From a144fae8b252f8cb997dfeeaf2a1773d1786515c Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 22 Jan 2019 21:27:29 -0300
Subject: [PATCH] activate new signature model
---
 lbrynet/extras/wallet/manager.py | 3 +-
 lbrynet/schema/claim.py | 4 +--
 lbrynet/schema/signer.py | 6 ++--
 tests/unit/schema/test_lbryschema.py | 42 ++++++++++++++--------------
 4 files changed, 27 insertions(+), 28 deletions(-)
diff --git a/lbrynet/extras/wallet/manager.py b/lbrynet/extras/wallet/manager.py
index f4b35c65d..249d00591 100644
--- a/lbrynet/extras/wallet/manager.py
+++ b/lbrynet/extras/wallet/manager.py
@@ -411,8 +411,7 @@ class LbryWalletManager(BaseWalletManager):
 claim_address = await account.receiving.get_or_create_usable_address()
 if certificate:
 claim = claim.sign(
- certificate.private_key, claim_address, certificate.claim_id, curve=SECP256k1, name=name,
- force_detached=False # TODO: delete it and make True default everywhere when its out
+ certificate.private_key, claim_address, certificate.claim_id, curve=SECP256k1, name=name
 )
 existing_claims = await account.get_claims(
 claim_name_type__any={'is_claim': 1, 'is_update': 1}, # exclude is_supports
diff --git a/lbrynet/schema/claim.py b/lbrynet/schema/claim.py
index 5576aea53..4f887c65c 100644
--- a/lbrynet/schema/claim.py
+++ b/lbrynet/schema/claim.py
@@ -162,9 +162,9 @@ class ClaimDict(OrderedDict):
 signer = get_signer(curve).load_pem(private_key)
 return cls.load_protobuf(signer.certificate)
- def sign(self, private_key, claim_address, cert_claim_id, curve=SECP256k1, name=None, force_detached=False):
+ def sign(self, private_key, claim_address, cert_claim_id, curve=SECP256k1, name=None, legacy=False):
 signer = get_signer(curve).load_pem(private_key)
- signed, signature = signer.sign_stream_claim(self, claim_address, cert_claim_id, name, force_detached)
+ signed, signature = signer.sign_stream_claim(self, claim_address, cert_claim_id, name, legacy)
 return ClaimDict.load_protobuf(signed, signature)
 def validate_signature(self, claim_address, certificate, name=None):
diff --git a/lbrynet/schema/signer.py b/lbrynet/schema/signer.py
index 2daac0dbd..c5147d646 100644
--- a/lbrynet/schema/signer.py
+++ b/lbrynet/schema/signer.py
@@ -50,11 +50,11 @@ class NIST_ECDSASigner(object):
 digest = self.HASHFUNC(bytearray(b''.join(fields))).digest()
 return self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC)
- def sign_stream_claim(self, claim, claim_address, cert_claim_id, name, detached=False):
+ def sign_stream_claim(self, claim, claim_address, cert_claim_id, name, legacy=False):
 validate_claim_id(cert_claim_id)
 raw_cert_id = binascii.unhexlify(cert_claim_id)
 decoded_addr = decode_address(claim_address)
- if detached:
+ if not legacy:
 assert name, "Name is required for detached signatures"
 assert self.CURVE_NAME == SECP256k1, f"Only SECP256k1 is supported, not: {self.CURVE_NAME}"
 signature = self.sign(
@@ -66,7 +66,7 @@ class NIST_ECDSASigner(object):
 else:
 signature = self.sign(decoded_addr, claim.serialized_no_signature, raw_cert_id)
- if detached:
+ if not legacy:
 return Claim.load(decode_b64_fields(claim.protobuf_dict)), Signature(NAMED_SECP256K1(
 signature,
 raw_cert_id,
diff --git a/tests/unit/schema/test_lbryschema.py b/tests/unit/schema/test_lbryschema.py
index ebf154876..3db269343 100644
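Review bot: In `ClaimDict.sign` (lbrynet/schema/claim.py) the sixth parameter keeps its position and its `False` default but now means the opposite, so a caller that passes it positionally silently switches signature model, while one that still passes `force_detached=` fails with a TypeError. Making the flag keyword-only removes the silent case: `def sign(self, private_key, claim_address, cert_claim_id, curve=SECP256k1, name=None, *, legacy=False):`. A short docstring such as `"""Sign with the channel key; produces a detached named SECP256k1 signature unless legacy=True."""` would also record that `name` is needed on the default path. Callers outside this patch are not visible here, so whether any pass the flag positionally needs checking.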
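Review bot: In `sign_stream_claim` (lbrynet/schema/signer.py) the two `assert` statements under `if not legacy:` are now the only precondition checks on the default signing path, and `assert` is stripped when Python runs with `-O`. With optimisation on, a missing `name` or a non-SECP256k1 signer would reach `self.sign(` unchecked instead of being rejected. Raise explicitly instead: `if not name: raise ValueError("Name is required for detached signatures")` and `if self.CURVE_NAME != SECP256k1: raise ValueError(f"Only SECP256k1 is supported, not: {self.CURVE_NAME}")`. The arguments of the `self.sign(` call lie between the two hunks and are not visible, so how a `None` name would be handled there is unknown.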
--- a/tests/unit/schema/test_lbryschema.py
+++ b/tests/unit/schema/test_lbryschema.py
@@ -206,19 +206,19 @@ class TestNIST256pSignatures(UnitTest):
 def test_validate_ecdsa_signature(self):
 cert = ClaimDict.generate_certificate(nist256p_private_key, curve=NIST256p)
 signed = ClaimDict.load_dict(example_010).sign(nist256p_private_key,
- claim_address_2, claim_id_1, curve=NIST256p)
+ claim_address_2, claim_id_1, curve=NIST256p, legacy=True)
 self.assertDictEqual(signed.claim_dict, claim_010_signed_nist256p)
 signed_copy = ClaimDict.load_protobuf(signed.protobuf)
 self.assertEqual(signed_copy.validate_signature(claim_address_2, cert), True)
 def test_remove_signature_equals_unsigned(self):
 unsigned = ClaimDict.load_dict(example_010)
- signed = unsigned.sign(nist256p_private_key, claim_address_1, claim_id_1, curve=NIST256p)
+ signed = unsigned.sign(nist256p_private_key, claim_address_1, claim_id_1, curve=NIST256p, legacy=True)
 self.assertEqual(unsigned.serialized, signed.serialized_no_signature)
 def test_fail_to_validate_fake_ecdsa_signature(self):
 signed = ClaimDict.load_dict(example_010).sign(nist256p_private_key, claim_address_1,
- claim_id_1, curve=NIST256p)
+ claim_id_1, curve=NIST256p, legacy=True)
 signed_copy = ClaimDict.load_protobuf(signed.protobuf)
 fake_key = get_signer(NIST256p).generate().private_key.to_pem()
 fake_cert = ClaimDict.generate_certificate(fake_key, curve=NIST256p)
@@ -228,7 +228,7 @@ class TestNIST256pSignatures(UnitTest):
 def test_fail_to_validate_ecdsa_sig_for_altered_claim(self):
 cert = ClaimDict.generate_certificate(nist256p_private_key, curve=NIST256p)
 altered = ClaimDict.load_dict(example_010).sign(nist256p_private_key, claim_address_1,
- claim_id_1, curve=NIST256p)
+ claim_id_1, curve=NIST256p, legacy=True)
 sd_hash = altered['stream']['source']['source']
 altered['stream']['source']['source'] = sd_hash[::-1]
 altered_copy = ClaimDict.load_dict(altered.claim_dict)
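Review bot: Every NIST256p call in these two hunks gains `legacy=True`, so no test pins what the new default does when the curve is not SECP256k1 or when `name` is omitted. The signer change suggests such a call should be rejected, and a negative test would lock that in, for example `self.assertRaises(AssertionError, ClaimDict.load_dict(example_010).sign, nist256p_private_key, claim_address_1, claim_id_1, curve=NIST256p, name='example')`. The expected exception type depends on how the signer enforces the precondition. The same gap applies to the NIST384p and legacy SECP256k1 hunks that follow.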
@@ -245,19 +245,19 @@ class TestNIST384pSignatures(UnitTest):
 def test_validate_ecdsa_signature(self):
 cert = ClaimDict.generate_certificate(nist384p_private_key, curve=NIST384p)
 signed = ClaimDict.load_dict(example_010).sign(nist384p_private_key,
- claim_address_2, claim_id_1, curve=NIST384p)
+ claim_address_2, claim_id_1, curve=NIST384p, legacy=True)
 self.assertDictEqual(signed.claim_dict, claim_010_signed_nist384p)
 signed_copy = ClaimDict.load_protobuf(signed.protobuf)
 self.assertEqual(signed_copy.validate_signature(claim_address_2, cert), True)
 def test_remove_signature_equals_unsigned(self):
 unsigned = ClaimDict.load_dict(example_010)
- signed = unsigned.sign(nist384p_private_key, claim_address_1, claim_id_1, curve=NIST384p)
+ signed = unsigned.sign(nist384p_private_key, claim_address_1, claim_id_1, curve=NIST384p, legacy=True)
 self.assertEqual(unsigned.serialized, signed.serialized_no_signature)
 def test_fail_to_validate_fake_ecdsa_signature(self):
 signed = ClaimDict.load_dict(example_010).sign(nist384p_private_key, claim_address_1,
- claim_id_1, curve=NIST384p)
+ claim_id_1, curve=NIST384p, legacy=True)
 signed_copy = ClaimDict.load_protobuf(signed.protobuf)
 fake_key = get_signer(NIST384p).generate().private_key.to_pem()
 fake_cert = ClaimDict.generate_certificate(fake_key, curve=NIST384p)
@@ -267,7 +267,7 @@ class TestNIST384pSignatures(UnitTest):
 def test_fail_to_validate_ecdsa_sig_for_altered_claim(self):
 cert = ClaimDict.generate_certificate(nist384p_private_key, curve=NIST384p)
 altered = ClaimDict.load_dict(example_010).sign(nist384p_private_key, claim_address_1,
- claim_id_1, curve=NIST384p)
+ claim_id_1, curve=NIST384p, legacy=True)
 sd_hash = altered['stream']['source']['source']
 altered['stream']['source']['source'] = sd_hash[::-1]
 altered_copy = ClaimDict.load_dict(altered.claim_dict)
@@ -285,7 +285,7 @@ class TestSECP256k1Signatures(UnitTest):
 cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
 self.assertDictEqual(cert.claim_dict, secp256k1_cert)
 signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2,
- claim_id_1, curve=SECP256k1)
+ claim_id_1, curve=SECP256k1, legacy=True)
 self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1)
 signed_copy = ClaimDict.load_protobuf(signed.protobuf)
 self.assertEqual(signed_copy.validate_signature(claim_address_2, cert), True)
@@ -300,19 +300,19 @@ class TestSECP256k1Signatures(UnitTest):
 cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
 self.assertDictEqual(cert.claim_dict, secp256k1_cert)
 signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2,
- claim_id_1, curve=SECP256k1)
+ claim_id_1, curve=SECP256k1, legacy=True)
 self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1)
 signed_copy = ClaimDict.load_protobuf(signed.protobuf)
 self.assertRaises(Exception, signed_copy.validate_signature, None, cert)
 def test_remove_signature_equals_unsigned(self):
 unsigned = ClaimDict.load_dict(example_010)
- signed = unsigned.sign(secp256k1_private_key, claim_address_1, claim_id_1, curve=SECP256k1)
+ signed = unsigned.sign(secp256k1_private_key, claim_address_1, claim_id_1, curve=SECP256k1, legacy=True)
 self.assertEqual(unsigned.serialized, signed.serialized_no_signature)
 def test_fail_to_validate_fake_ecdsa_signature(self):
 signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1,
- claim_id_1, curve=SECP256k1)
+ claim_id_1, curve=SECP256k1, legacy=True)
 signed_copy = ClaimDict.load_protobuf(signed.protobuf)
 fake_key = get_signer(SECP256k1).generate().private_key.to_pem()
 fake_cert = ClaimDict.generate_certificate(fake_key, curve=SECP256k1)
@@ -322,7 +322,7 @@ class TestSECP256k1Signatures(UnitTest):
 def test_fail_to_validate_ecdsa_sig_for_altered_claim(self):
 cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
 altered = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1,
- claim_id_1, curve=SECP256k1)
+ claim_id_1, curve=SECP256k1, legacy=True)
 sd_hash = altered['stream']['source']['source']
 altered['stream']['source']['source'] = sd_hash[::-1]
 altered_copy = ClaimDict.load_dict(altered.claim_dict)
@@ -335,7 +335,7 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
 cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
 self.assertDictEqual(cert.claim_dict, secp256k1_cert)
 signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
- curve=SECP256k1, name='example', force_detached=True)
+ curve=SECP256k1, name='example')
 signed_copy = ClaimDict.deserialize(signed.serialized)
 self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
@@ -343,7 +343,7 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
 cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
 self.assertDictEqual(cert.claim_dict, secp256k1_cert)
 signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
- curve=SECP256k1, name='example', force_detached=True)
+ curve=SECP256k1, name='example')
 self.assertEqual(
 signed.claim_dict['publisherSignature']['detached_signature'],
 binascii.hexlify(signed.serialized).decode()
@@ -408,13 +408,13 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
 cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
 self.assertDictEqual(cert.claim_dict, secp256k1_cert)
 self.assertRaises(Exception, ClaimDict.load_dict(example_010).sign, secp256k1_private_key,
- None, claim_id_1, curve=SECP256k1, name='example', force_detached=True)
+ None, claim_id_1, curve=SECP256k1, name='example')
 def test_fail_to_validate_with_no_claim_address(self):
 cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
 self.assertDictEqual(cert.claim_dict, secp256k1_cert)
 signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
- curve=SECP256k1, name='example', force_detached=True)
+ curve=SECP256k1, name='example')
 signed_copy = ClaimDict.load_protobuf(signed.protobuf)
 self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name='example')
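Review bot: The first assertion in this hunk, `self.assertRaises(Exception, ClaimDict.load_dict(example_010).sign, ...)`, accepts any exception, so it cannot tell a rejected `None` claim address from an unrelated failure such as a `TypeError` raised by a stale keyword argument. That matters for this very change: had `force_detached=True` been left in the call, the test would still have passed after the rename. Narrow it to the exception the address check is expected to raise, or use `assertRaisesRegex` with the message; which type `decode_address` raises is not visible here. The same broad `assertRaises(Exception, ...)` appears on the `validate_signature` check in this hunk and in the hunks at -300 and -422. The first test's `def` line is above the hunk.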
@@ -422,19 +422,19 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
 cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
 self.assertDictEqual(cert.claim_dict, secp256k1_cert)
 signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
- curve=SECP256k1, name='example', force_detached=True)
+ curve=SECP256k1, name='example')
 signed_copy = ClaimDict.load_protobuf(signed.protobuf)
 self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name=None)
 def test_remove_signature_equals_unsigned(self):
 unsigned = ClaimDict.load_dict(example_010)
 signed = unsigned.sign(secp256k1_private_key, claim_address_1, claim_id_1,
- curve=SECP256k1, name='example', force_detached=True)
+ curve=SECP256k1, name='example')
 self.assertEqual(unsigned.serialized, signed.serialized_no_signature)
 def test_fail_to_validate_fake_ecdsa_signature(self):
 signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, claim_id_1,
- curve=SECP256k1, name='example', force_detached=True)
+ curve=SECP256k1, name='example')
 signed_copy = ClaimDict.deserialize(signed.serialized)
 fake_key = get_signer(SECP256k1).generate().private_key.to_pem()
 fake_cert = ClaimDict.generate_certificate(fake_key, curve=SECP256k1)
@@ -444,7 +444,7 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
 def test_fail_to_validate_ecdsa_sig_for_altered_claim(self):
 cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
 altered = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, claim_id_1,
- curve=SECP256k1, name='example', force_detached=True)
+ curve=SECP256k1, name='example')
 original_serialization = altered.serialized
 sd_hash = altered['stream']['source']['source']
 altered['stream']['source']['source'] = sd_hash[::-1]