From abf94357eb6c9c215deb04c4214c11f442ed6c1c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 9 Jan 2019 12:18:52 -0300 Subject: [PATCH] tests and fixes for sig validation on undecode-able claims --- lbrynet/schema/claim.py | 3 +-- lbrynet/schema/signer.py | 30 ++++++++++++++++------------ lbrynet/schema/validator.py | 4 +++- tests/unit/schema/test_lbryschema.py | 29 ++++++++++++++++++++++++++- 4 files changed, 49 insertions(+), 17 deletions(-) diff --git a/lbrynet/schema/claim.py b/lbrynet/schema/claim.py index f74e44625..a97073eda 100644 --- a/lbrynet/schema/claim.py +++ b/lbrynet/schema/claim.py @@ -39,8 +39,7 @@ class ClaimDict(OrderedDict): @property def serialized(self): """Serialized Claim protobuf""" - current_serialization = self.protobuf.SerializeToString() - if self.detached_signature and current_serialization == self.detached_signature.payload: + if self.detached_signature and self.detached_signature.payload: return self.detached_signature.serialized return self.protobuf.SerializeToString() diff --git a/lbrynet/schema/signer.py b/lbrynet/schema/signer.py index ae9c0d97d..2daac0dbd 100644 --- a/lbrynet/schema/signer.py +++ b/lbrynet/schema/signer.py @@ -46,25 +46,29 @@ class NIST_ECDSASigner(object): def generate(cls): return cls(ecdsa.SigningKey.generate(curve=cls.CURVE, hashfunc=cls.HASHFUNC_NAME)) - def sign_stream_claim(self, claim, claim_address, cert_claim_id, name, detached=False): - to_sign = bytearray() - if detached: - assert name, "Name is required for detached signatures" - assert self.CURVE_NAME == SECP256k1, f"Only SECP256k1 is supported, not: {self.CURVE_NAME}" - to_sign.extend(name.lower().encode()) + def sign(self, *fields): + digest = self.HASHFUNC(bytearray(b''.join(fields))).digest() + return self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC) + def sign_stream_claim(self, claim, claim_address, cert_claim_id, name, detached=False): validate_claim_id(cert_claim_id) raw_cert_id = binascii.unhexlify(cert_claim_id) decoded_addr = decode_address(claim_address) + if detached: + assert name, "Name is required for detached signatures" + assert self.CURVE_NAME == SECP256k1, f"Only SECP256k1 is supported, not: {self.CURVE_NAME}" + signature = self.sign( + name.lower().encode(), + decoded_addr, + claim.serialized_no_signature, + raw_cert_id, + ) + else: + signature = self.sign(decoded_addr, claim.serialized_no_signature, raw_cert_id) - to_sign.extend(decoded_addr) - to_sign.extend(claim.serialized_no_signature) - to_sign.extend(raw_cert_id) - - digest = self.HASHFUNC(to_sign).digest() if detached: return Claim.load(decode_b64_fields(claim.protobuf_dict)), Signature(NAMED_SECP256K1( - self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC), + signature, raw_cert_id, claim.serialized_no_signature )) @@ -75,7 +79,7 @@ class NIST_ECDSASigner(object): sig_dict = { "version": V_0_0_1, "signatureType": self.CURVE_NAME, - "signature": self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC), + "signature": signature, "certificateId": raw_cert_id } diff --git a/lbrynet/schema/validator.py b/lbrynet/schema/validator.py index dead162df..751f2a052 100644 --- a/lbrynet/schema/validator.py +++ b/lbrynet/schema/validator.py @@ -82,9 +82,11 @@ class Validator: assert name is not None, "Name is required for verifying detached signatures." to_sign.extend(name.lower().encode()) signature = claim.detached_signature.raw_signature + payload = claim.detached_signature.payload else: # extract and serialize the stream from the claim, then check the signature signature = binascii.unhexlify(claim.signature) + payload = claim.serialized_no_signature decoded_address = decode_address(claim_address) @@ -92,7 +94,7 @@ class Validator: raise Exception("No signature to validate") to_sign.extend(decoded_address) - to_sign.extend(claim.serialized_no_signature) + to_sign.extend(payload) to_sign.extend(binascii.unhexlify(self.certificate_claim_id)) return self.validate_signature(self.HASHFUNC(to_sign).digest(), signature) diff --git a/tests/unit/schema/test_lbryschema.py b/tests/unit/schema/test_lbryschema.py index 7d78db2c6..841806333 100644 --- a/tests/unit/schema/test_lbryschema.py +++ b/tests/unit/schema/test_lbryschema.py @@ -6,6 +6,7 @@ import binascii from copy import deepcopy import unittest +from lbrynet.schema.signature import Signature, NAMED_SECP256K1 from .test_data import example_003, example_010, example_010_serialized from .test_data import claim_id_1, claim_address_1, claim_address_2 from .test_data import binary_claim, expected_binary_claim_decoded @@ -339,6 +340,32 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): signed_copy = ClaimDict.deserialize(signed.serialized) self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True) + def test_validate_what_cant_be_serialized_back(self): + cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) + self.assertDictEqual(cert.claim_dict, secp256k1_cert) + original = ClaimDict.load_dict(example_010).serialized + altered = original + b'\x00\x01\x02\x30\x50\x80\x99' # pretend this extra trash is from some unknown protobuf + + # manually sign + signer = get_signer(SECP256k1).load_pem(secp256k1_private_key) + signature = signer.sign( + b'example', + decode_address(claim_address_2), + altered, + binascii.unhexlify(claim_id_1), + ) + detached_sig = Signature(NAMED_SECP256K1( + signature, + binascii.unhexlify(claim_id_1), + altered + )) + + signed = detached_sig.serialized + self.assertEqual(signed[85:], altered) + signed_copy = ClaimDict.deserialize(signed) + self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True) + self.assertEqual(signed, signed_copy.serialized) + def test_fail_to_sign_with_no_claim_address(self): cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) self.assertDictEqual(cert.claim_dict, secp256k1_cert) @@ -385,7 +412,7 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): original_serialization = altered.serialized sd_hash = altered['stream']['source']['source'] altered['stream']['source']['source'] = sd_hash[::-1] - altered_serialization = altered.serialized + altered_serialization = altered.protobuf.SerializeToString() # keep signature, but replace serialization with the altered claim (check signature.py for slice sizes) altered_copy = ClaimDict.deserialize(original_serialization[:85] + altered_serialization)