encode and decode detached signatures from claim dict
This commit is contained in:
parent
0e9888be3b
commit
b707ee4844
4 changed files with 58 additions and 10 deletions
|
@ -75,7 +75,7 @@ class JSONResponseEncoder(JSONEncoder):
|
|||
output['channel_name'] = txo.channel.claim_name
|
||||
try:
|
||||
output['valid_signature'] = claim.validate_signature(
|
||||
txo.get_address(self.ledger), txo.channel.claim
|
||||
txo.get_address(self.ledger), txo.channel.claim, name=txo.claim_name
|
||||
)
|
||||
except BadSignatureError:
|
||||
output['valid_signature'] = False
|
||||
|
|
|
@ -120,7 +120,7 @@ class ClaimDict(OrderedDict):
|
|||
def claim_dict(self):
|
||||
"""Claim dictionary with bytes represented as hex and base58"""
|
||||
|
||||
return dict(encode_fields(self))
|
||||
return dict(encode_fields(self, self.detached_signature))
|
||||
|
||||
@classmethod
|
||||
def load_protobuf_dict(cls, protobuf_dict, detached_signature=None):
|
||||
|
@ -139,9 +139,9 @@ class ClaimDict(OrderedDict):
|
|||
@classmethod
|
||||
def load_dict(cls, claim_dict):
|
||||
"""Load ClaimDict from a dictionary with hex and base58 encoded bytes"""
|
||||
detached_signature = claim_dict.detached_signature if hasattr(claim_dict, 'detached_signature') else None
|
||||
try:
|
||||
return cls.load_protobuf(cls(decode_fields(claim_dict)).protobuf, detached_signature)
|
||||
claim_dict, detached_signature = decode_fields(claim_dict)
|
||||
return cls.load_protobuf(cls(claim_dict).protobuf, detached_signature)
|
||||
except json_format.ParseError as err:
|
||||
raise DecodeError(str(err))
|
||||
|
||||
|
|
|
@ -4,9 +4,10 @@ from lbrynet.schema.address import decode_address, encode_address
|
|||
from lbrynet.schema.schema import CLAIM_TYPES, CLAIM_TYPE, STREAM_TYPE, CERTIFICATE_TYPE
|
||||
from lbrynet.schema.schema import SIGNATURE
|
||||
from lbrynet.schema.error import DecodeError, InvalidAddress
|
||||
from lbrynet.schema.signature import Signature
|
||||
|
||||
|
||||
def encode_fields(claim_dictionary):
|
||||
def encode_fields(claim_dictionary, detached_signature: Signature):
|
||||
"""Encode bytes to hex and b58 for return by ClaimDict"""
|
||||
claim_dictionary = deepcopy(claim_dictionary)
|
||||
claim_type = CLAIM_TYPES[claim_dictionary[CLAIM_TYPE]]
|
||||
|
@ -27,12 +28,19 @@ def encode_fields(claim_dictionary):
|
|||
encoded_cert_id = binascii.hexlify(claim_dictionary[SIGNATURE]['certificateId']).decode()
|
||||
claim_dictionary[SIGNATURE]['signature'] = encoded_sig
|
||||
claim_dictionary[SIGNATURE]['certificateId'] = encoded_cert_id
|
||||
elif detached_signature and detached_signature.raw_signature:
|
||||
claim_dictionary[SIGNATURE] = {
|
||||
'detached_signature': binascii.hexlify(detached_signature.serialized).decode(),
|
||||
'certificateId': binascii.hexlify(detached_signature.certificate_id).decode()
|
||||
}
|
||||
|
||||
claim_dictionary[claim_type] = claim_value
|
||||
return claim_dictionary
|
||||
|
||||
|
||||
def decode_fields(claim_dictionary):
|
||||
"""Decode hex and b58 encoded bytes in dictionaries given to ClaimDict"""
|
||||
detached_signature = None
|
||||
claim_dictionary = deepcopy(claim_dictionary)
|
||||
claim_type = CLAIM_TYPES[claim_dictionary[CLAIM_TYPE]]
|
||||
claim_value = claim_dictionary[claim_type]
|
||||
|
@ -47,13 +55,17 @@ def decode_fields(claim_dictionary):
|
|||
elif claim_type == CLAIM_TYPES[CERTIFICATE_TYPE]:
|
||||
public_key = binascii.unhexlify(claim_value["publicKey"])
|
||||
claim_value["publicKey"] = public_key
|
||||
if SIGNATURE in claim_dictionary:
|
||||
if SIGNATURE in claim_dictionary and not claim_dictionary[SIGNATURE].get('detached_signature'):
|
||||
decoded_sig = binascii.unhexlify(claim_dictionary[SIGNATURE]['signature'])
|
||||
decoded_cert_id = binascii.unhexlify(claim_dictionary[SIGNATURE]['certificateId'])
|
||||
claim_dictionary[SIGNATURE]['signature'] = decoded_sig
|
||||
claim_dictionary[SIGNATURE]['certificateId'] = decoded_cert_id
|
||||
elif claim_dictionary.get(SIGNATURE, {}).get('detached_signature'):
|
||||
hex_detached_signature = claim_dictionary[SIGNATURE]['detached_signature']
|
||||
detached_signature = Signature.flagged_parse(binascii.unhexlify(hex_detached_signature))
|
||||
del claim_dictionary[SIGNATURE]
|
||||
claim_dictionary[claim_type] = claim_value
|
||||
return claim_dictionary
|
||||
return claim_dictionary, detached_signature
|
||||
|
||||
|
||||
def decode_b64_fields(claim_dictionary):
|
||||
|
|
|
@ -336,10 +336,21 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
|
|||
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
||||
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
|
||||
curve=SECP256k1, name='example', force_detached=True)
|
||||
#self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1)
|
||||
signed_copy = ClaimDict.deserialize(signed.serialized)
|
||||
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
|
||||
|
||||
def test_validate_detached_named_ecdsa_signature_from_dict(self):
|
||||
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
|
||||
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
||||
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
|
||||
curve=SECP256k1, name='example', force_detached=True)
|
||||
self.assertEqual(
|
||||
signed.claim_dict['publisherSignature']['detached_signature'],
|
||||
binascii.hexlify(signed.serialized)
|
||||
)
|
||||
signed_copy = ClaimDict.load_dict(signed.claim_dict)
|
||||
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
|
||||
|
||||
def test_validate_what_cant_be_serialized_back(self):
|
||||
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
|
||||
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
||||
|
@ -366,6 +377,33 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
|
|||
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
|
||||
self.assertEqual(signed, signed_copy.serialized)
|
||||
|
||||
def test_validate_what_cant_be_serialized_back_even_by_loading_back_from_dictionary(self):
|
||||
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
|
||||
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
||||
original = ClaimDict.load_dict(example_010).serialized
|
||||
altered = original + b'\x00\x01\x02\x30\x50\x80\x99' # pretend this extra trash is from some unknown protobuf
|
||||
|
||||
# manually sign
|
||||
signer = get_signer(SECP256k1).load_pem(secp256k1_private_key)
|
||||
signature = signer.sign(
|
||||
b'example',
|
||||
decode_address(claim_address_2),
|
||||
altered,
|
||||
binascii.unhexlify(claim_id_1),
|
||||
)
|
||||
detached_sig = Signature(NAMED_SECP256K1(
|
||||
signature,
|
||||
binascii.unhexlify(claim_id_1),
|
||||
altered
|
||||
))
|
||||
|
||||
signed = detached_sig.serialized
|
||||
self.assertEqual(signed[85:], altered)
|
||||
signed_copy = ClaimDict.deserialize(signed)
|
||||
signed_copy = ClaimDict.load_dict(signed_copy.claim_dict)
|
||||
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
|
||||
self.assertEqual(signed, signed_copy.serialized)
|
||||
|
||||
def test_fail_to_sign_with_no_claim_address(self):
|
||||
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
|
||||
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
||||
|
@ -377,7 +415,6 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
|
|||
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
||||
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
|
||||
curve=SECP256k1, name='example', force_detached=True)
|
||||
#self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1)
|
||||
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
|
||||
self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name='example')
|
||||
|
||||
|
@ -386,7 +423,6 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
|
|||
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
||||
signed = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_2, claim_id_1,
|
||||
curve=SECP256k1, name='example', force_detached=True)
|
||||
#self.assertDictEqual(signed.claim_dict, claim_010_signed_secp256k1)
|
||||
signed_copy = ClaimDict.load_protobuf(signed.protobuf)
|
||||
self.assertRaises(Exception, signed_copy.validate_signature, None, cert, name=None)
|
||||
|
||||
|
|
Loading…
Reference in a new issue