diff --git a/lbrynet/extras/daemon/json_response_encoder.py b/lbrynet/extras/daemon/json_response_encoder.py index e020644a9..494b9e92f 100644 --- a/lbrynet/extras/daemon/json_response_encoder.py +++ b/lbrynet/extras/daemon/json_response_encoder.py @@ -75,7 +75,7 @@ class JSONResponseEncoder(JSONEncoder): output['channel_name'] = txo.channel.claim_name try: output['valid_signature'] = claim.validate_signature( - txo.get_address(self.ledger), txo.channel.claim + txo.get_address(self.ledger), txo.channel.claim, name=txo.claim_name ) except BadSignatureError: output['valid_signature'] = False diff --git a/lbrynet/schema/claim.py b/lbrynet/schema/claim.py index 49922b26a..faf4dbc79 100644 --- a/lbrynet/schema/claim.py +++ b/lbrynet/schema/claim.py @@ -120,7 +120,7 @@ class ClaimDict(OrderedDict): def claim_dict(self): """Claim dictionary with bytes represented as hex and base58""" - return dict(encode_fields(self)) + return dict(encode_fields(self, self.detached_signature)) @classmethod def load_protobuf_dict(cls, protobuf_dict, detached_signature=None): @@ -139,9 +139,9 @@ class ClaimDict(OrderedDict): @classmethod def load_dict(cls, claim_dict): """Load ClaimDict from a dictionary with hex and base58 encoded bytes""" - detached_signature = claim_dict.detached_signature if hasattr(claim_dict, 'detached_signature') else None try: - return cls.load_protobuf(cls(decode_fields(claim_dict)).protobuf, detached_signature) + claim_dict, detached_signature = decode_fields(claim_dict) + return cls.load_protobuf(cls(claim_dict).protobuf, detached_signature) except json_format.ParseError as err: raise DecodeError(str(err)) diff --git a/lbrynet/schema/encoding.py b/lbrynet/schema/encoding.py index 9d8aea4a4..33c3eb30d 100644 --- a/lbrynet/schema/encoding.py +++ b/lbrynet/schema/encoding.py @@ -4,9 +4,10 @@ from lbrynet.schema.address import decode_address, encode_address from lbrynet.schema.schema import CLAIM_TYPES, CLAIM_TYPE, STREAM_TYPE, CERTIFICATE_TYPE from lbrynet.schema.schema import SIGNATURE from lbrynet.schema.error import DecodeError, InvalidAddress +from lbrynet.schema.signature import Signature -def encode_fields(claim_dictionary): +def encode_fields(claim_dictionary, detached_signature: Signature): """Encode bytes to hex and b58 for return by ClaimDict""" claim_dictionary = deepcopy(claim_dictionary) claim_type = CLAIM_TYPES[claim_dictionary[CLAIM_TYPE]] @@ -27,12 +28,19 @@ def encode_fields(claim_dictionary): encoded_cert_id = binascii.hexlify(claim_dictionary[SIGNATURE]['certificateId']).decode() claim_dictionary[SIGNATURE]['signature'] = encoded_sig claim_dictionary[SIGNATURE]['certificateId'] = encoded_cert_id + elif detached_signature and detached_signature.raw_signature: + claim_dictionary[SIGNATURE] = { + 'detached_signature': binascii.hexlify(detached_signature.serialized).decode(), + 'certificateId': binascii.hexlify(detached_signature.certificate_id).decode() + } + claim_dictionary[claim_type] = claim_value return claim_dictionary def decode_fields(claim_dictionary): """Decode hex and b58 encoded bytes in dictionaries given to ClaimDict""" + detached_signature = None claim_dictionary = deepcopy(claim_dictionary) claim_type = CLAIM_TYPES[claim_dictionary[CLAIM_TYPE]] claim_value = claim_dictionary[claim_type] @@ -47,13 +55,17 @@ def decode_fields(claim_dictionary): elif claim_type == CLAIM_TYPES[CERTIFICATE_TYPE]: public_key = binascii.unhexlify(claim_value["publicKey"]) claim_value["publicKey"] = public_key - if SIGNATURE in claim_dictionary: + if SIGNATURE in claim_dictionary and not claim_dictionary[SIGNATURE].get('detached_signature'): decoded_sig = binascii.unhexlify(claim_dictionary[SIGNATURE]['signature']) decoded_cert_id = binascii.unhexlify(claim_dictionary[SIGNATURE]['certificateId']) claim_dictionary[SIGNATURE]['signature'] = decoded_sig claim_dictionary[SIGNATURE]['certificateId'] = decoded_cert_id + elif claim_dictionary.get(SIGNATURE, {}).get('detached_signature'): + hex_detached_signature = claim_dictionary[SIGNATURE]['detached_signature'] + detached_signature = Signature.flagged_parse(binascii.unhexlify(hex_detached_signature)) + del claim_dictionary[SIGNATURE] claim_dictionary[claim_type] = claim_value - return claim_dictionary + return claim_dictionary, detached_signature def decode_b64_fields(claim_dictionary): diff --git a/tests/unit/schema/test_lbryschema.py b/tests/unit/schema/test_lbryschema.py index 841806333..b61f85f99 100644 --- a/tests/unit/schema/test_lbryschema.py +++ b/tests/unit/schema/test_lbryschema.py @@ -336,10 +336,21 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): self.assertDictEqual(cert.claim_dict, secp256k1_cert) signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1, curve=SECP256k1, name='example', force_detached=True) - #self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1) signed_copy = ClaimDict.deserialize(signed.serialized) self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True) + def test_validate_detached_named_ecdsa_signature_from_dict(self): + cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) + self.assertDictEqual(cert.claim_dict, secp256k1_cert) + signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1, + curve=SECP256k1, name='example', force_detached=True) + self.assertEqual( + signed.claim_dict['publisherSignature']['detached_signature'], + binascii.hexlify(signed.serialized) + ) + signed_copy = ClaimDict.load_dict(signed.claim_dict) + self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True) + def test_validate_what_cant_be_serialized_back(self): cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) self.assertDictEqual(cert.claim_dict, secp256k1_cert) @@ -366,6 +377,33 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True) self.assertEqual(signed, signed_copy.serialized) + def test_validate_what_cant_be_serialized_back_even_by_loading_back_from_dictionary(self): + cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) + self.assertDictEqual(cert.claim_dict, secp256k1_cert) + original = ClaimDict.load_dict(example_010).serialized + altered = original + b'\x00\x01\x02\x30\x50\x80\x99' # pretend this extra trash is from some unknown protobuf + + # manually sign + signer = get_signer(SECP256k1).load_pem(secp256k1_private_key) + signature = signer.sign( + b'example', + decode_address(claim_address_2), + altered, + binascii.unhexlify(claim_id_1), + ) + detached_sig = Signature(NAMED_SECP256K1( + signature, + binascii.unhexlify(claim_id_1), + altered + )) + + signed = detached_sig.serialized + self.assertEqual(signed[85:], altered) + signed_copy = ClaimDict.deserialize(signed) + signed_copy = ClaimDict.load_dict(signed_copy.claim_dict) + self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True) + self.assertEqual(signed, signed_copy.serialized) + def test_fail_to_sign_with_no_claim_address(self): cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1) self.assertDictEqual(cert.claim_dict, secp256k1_cert) @@ -377,7 +415,6 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): self.assertDictEqual(cert.claim_dict, secp256k1_cert) signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1, curve=SECP256k1, name='example', force_detached=True) - #self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1) signed_copy = ClaimDict.load_protobuf(signed.protobuf) self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name='example') @@ -386,7 +423,6 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest): self.assertDictEqual(cert.claim_dict, secp256k1_cert) signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1, curve=SECP256k1, name='example', force_detached=True) - #self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1) signed_copy = ClaimDict.load_protobuf(signed.protobuf) self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name=None)