diff --git a/lbrynet/schema/claim.py b/lbrynet/schema/claim.py index 1f1fb0b11..ce739c31f 100644 --- a/lbrynet/schema/claim.py +++ b/lbrynet/schema/claim.py @@ -164,9 +164,9 @@ class ClaimDict(OrderedDict): signer = get_signer(curve).load_pem(private_key) return cls.load_protobuf(signer.certificate) - def sign(self, private_key, claim_address, cert_claim_id, curve=SECP256k1, name=None): + def sign(self, private_key, claim_address, cert_claim_id, curve=SECP256k1, name=None, force_detached=False): signer = get_signer(curve).load_pem(private_key) - signed, signature = signer.sign_stream_claim(self, claim_address, cert_claim_id, name) + signed, signature = signer.sign_stream_claim(self, claim_address, cert_claim_id, name, force_detached) return ClaimDict.load_protobuf(signed, signature) def validate_signature(self, claim_address, certificate, name=None): diff --git a/lbrynet/schema/signer.py b/lbrynet/schema/signer.py index 17fb04697..eca2fb467 100644 --- a/lbrynet/schema/signer.py +++ b/lbrynet/schema/signer.py @@ -16,7 +16,6 @@ class NIST_ECDSASigner(object): CURVE_NAME = None HASHFUNC = hashlib.sha256 HASHFUNC_NAME = SHA256 - DETACHED = False def __init__(self, private_key): self._private_key = private_key @@ -47,9 +46,9 @@ class NIST_ECDSASigner(object): def generate(cls): return cls(ecdsa.SigningKey.generate(curve=cls.CURVE, hashfunc=cls.HASHFUNC_NAME)) - def sign_stream_claim(self, claim, claim_address, cert_claim_id, name): + def sign_stream_claim(self, claim, claim_address, cert_claim_id, name, detached=False): to_sign = bytearray() - if self.DETACHED: + if detached: assert name, "Name is required for detached signatures" assert self.CURVE_NAME == SECP256k1, f"Only SECP256k1 is supported, not: {self.CURVE_NAME}" to_sign.extend(name.lower().encode()) @@ -63,10 +62,11 @@ class NIST_ECDSASigner(object): to_sign.extend(raw_cert_id) digest = self.HASHFUNC(to_sign).digest() - if self.DETACHED: + if detached: return Claim.load(decode_b64_fields(claim.protobuf_dict)), Signature( self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC), raw_cert_id ) + # -- Legacy signer (signature inside protobuf) -- if not isinstance(self.private_key, ecdsa.SigningKey): raise Exception("Not given a signing key") diff --git a/tests/unit/schema/test_lbryschema.py b/tests/unit/schema/test_lbryschema.py index 72aca1557..daa8ba92c 100644 --- a/tests/unit/schema/test_lbryschema.py +++ b/tests/unit/schema/test_lbryschema.py @@ -333,8 +333,8 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): def test_validate_detached_named_ecdsa_signature(self): cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) self.assertDictEqual(cert.claim_dict, secp256k1_cert) - signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, - claim_id_1, curve=SECP256k1, name='example') + signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1, + curve=SECP256k1, name='example', force_detached=True) #self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1) signed_copy = ClaimDict.deserialize(signed.serialized) self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True) @@ -343,13 +343,13 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) self.assertDictEqual(cert.claim_dict, secp256k1_cert) self.assertRaises(Exception, ClaimDict.load_dict(example_010).sign, secp256k1_private_key, - None, claim_id_1, curve=SECP256k1, name='example') + None, claim_id_1, curve=SECP256k1, name='example', force_detached=True) def test_fail_to_validate_with_no_claim_address(self): cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) self.assertDictEqual(cert.claim_dict, secp256k1_cert) - signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, - claim_id_1, curve=SECP256k1, name='example') + signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1, + curve=SECP256k1, name='example', force_detached=True) #self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1) signed_copy = ClaimDict.load_protobuf(signed.protobuf) self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name='example') @@ -357,20 +357,21 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): def test_fail_to_validate_with_no_name(self): cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) self.assertDictEqual(cert.claim_dict, secp256k1_cert) - signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, - claim_id_1, curve=SECP256k1, name='example') + signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1, + curve=SECP256k1, name='example', force_detached=True) #self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1) signed_copy = ClaimDict.load_protobuf(signed.protobuf) self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name=None) def test_remove_signature_equals_unsigned(self): unsigned = ClaimDict.load_dict(example_010) - signed = unsigned.sign(secp256k1_private_key, claim_address_1, claim_id_1, curve=SECP256k1, name='example') + signed = unsigned.sign(secp256k1_private_key, claim_address_1, claim_id_1, + curve=SECP256k1, name='example', force_detached=True) self.assertEqual(unsigned.serialized, signed.serialized_no_signature) def test_fail_to_validate_fake_ecdsa_signature(self): - signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, - claim_id_1, curve=SECP256k1, name='example') + signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, claim_id_1, + curve=SECP256k1, name='example', force_detached=True) signed_copy = ClaimDict.deserialize(signed.serialized) fake_key = get_signer(SECP256k1).generate().private_key.to_pem() fake_cert = ClaimDict.generate_certificate(fake_key, curve=SECP256k1) @@ -379,8 +380,8 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): def test_fail_to_validate_ecdsa_sig_for_altered_claim(self): cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) - altered = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, - claim_id_1, curve=SECP256k1, name='example') + altered = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, claim_id_1, + curve=SECP256k1, name='example', force_detached=True) sd_hash = altered['stream']['source']['source'] altered['stream']['source']['source'] = sd_hash[::-1] altered_copy = ClaimDict.deserialize(altered.serialized)