forked from LBRYCommunity/lbry-sdk
tests and fixes for sig validation on undecode-able claims
This commit is contained in:
parent
36aded3830
commit
abf94357eb
4 changed files with 49 additions and 17 deletions
|
@ -39,8 +39,7 @@ class ClaimDict(OrderedDict):
|
||||||
@property
|
@property
|
||||||
def serialized(self):
|
def serialized(self):
|
||||||
"""Serialized Claim protobuf"""
|
"""Serialized Claim protobuf"""
|
||||||
current_serialization = self.protobuf.SerializeToString()
|
if self.detached_signature and self.detached_signature.payload:
|
||||||
if self.detached_signature and current_serialization == self.detached_signature.payload:
|
|
||||||
return self.detached_signature.serialized
|
return self.detached_signature.serialized
|
||||||
return self.protobuf.SerializeToString()
|
return self.protobuf.SerializeToString()
|
||||||
|
|
||||||
|
|
|
@ -46,25 +46,29 @@ class NIST_ECDSASigner(object):
|
||||||
def generate(cls):
|
def generate(cls):
|
||||||
return cls(ecdsa.SigningKey.generate(curve=cls.CURVE, hashfunc=cls.HASHFUNC_NAME))
|
return cls(ecdsa.SigningKey.generate(curve=cls.CURVE, hashfunc=cls.HASHFUNC_NAME))
|
||||||
|
|
||||||
def sign_stream_claim(self, claim, claim_address, cert_claim_id, name, detached=False):
|
def sign(self, *fields):
|
||||||
to_sign = bytearray()
|
digest = self.HASHFUNC(bytearray(b''.join(fields))).digest()
|
||||||
if detached:
|
return self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC)
|
||||||
assert name, "Name is required for detached signatures"
|
|
||||||
assert self.CURVE_NAME == SECP256k1, f"Only SECP256k1 is supported, not: {self.CURVE_NAME}"
|
|
||||||
to_sign.extend(name.lower().encode())
|
|
||||||
|
|
||||||
|
def sign_stream_claim(self, claim, claim_address, cert_claim_id, name, detached=False):
|
||||||
validate_claim_id(cert_claim_id)
|
validate_claim_id(cert_claim_id)
|
||||||
raw_cert_id = binascii.unhexlify(cert_claim_id)
|
raw_cert_id = binascii.unhexlify(cert_claim_id)
|
||||||
decoded_addr = decode_address(claim_address)
|
decoded_addr = decode_address(claim_address)
|
||||||
|
if detached:
|
||||||
|
assert name, "Name is required for detached signatures"
|
||||||
|
assert self.CURVE_NAME == SECP256k1, f"Only SECP256k1 is supported, not: {self.CURVE_NAME}"
|
||||||
|
signature = self.sign(
|
||||||
|
name.lower().encode(),
|
||||||
|
decoded_addr,
|
||||||
|
claim.serialized_no_signature,
|
||||||
|
raw_cert_id,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
signature = self.sign(decoded_addr, claim.serialized_no_signature, raw_cert_id)
|
||||||
|
|
||||||
to_sign.extend(decoded_addr)
|
|
||||||
to_sign.extend(claim.serialized_no_signature)
|
|
||||||
to_sign.extend(raw_cert_id)
|
|
||||||
|
|
||||||
digest = self.HASHFUNC(to_sign).digest()
|
|
||||||
if detached:
|
if detached:
|
||||||
return Claim.load(decode_b64_fields(claim.protobuf_dict)), Signature(NAMED_SECP256K1(
|
return Claim.load(decode_b64_fields(claim.protobuf_dict)), Signature(NAMED_SECP256K1(
|
||||||
self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC),
|
signature,
|
||||||
raw_cert_id,
|
raw_cert_id,
|
||||||
claim.serialized_no_signature
|
claim.serialized_no_signature
|
||||||
))
|
))
|
||||||
|
@ -75,7 +79,7 @@ class NIST_ECDSASigner(object):
|
||||||
sig_dict = {
|
sig_dict = {
|
||||||
"version": V_0_0_1,
|
"version": V_0_0_1,
|
||||||
"signatureType": self.CURVE_NAME,
|
"signatureType": self.CURVE_NAME,
|
||||||
"signature": self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC),
|
"signature": signature,
|
||||||
"certificateId": raw_cert_id
|
"certificateId": raw_cert_id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -82,9 +82,11 @@ class Validator:
|
||||||
assert name is not None, "Name is required for verifying detached signatures."
|
assert name is not None, "Name is required for verifying detached signatures."
|
||||||
to_sign.extend(name.lower().encode())
|
to_sign.extend(name.lower().encode())
|
||||||
signature = claim.detached_signature.raw_signature
|
signature = claim.detached_signature.raw_signature
|
||||||
|
payload = claim.detached_signature.payload
|
||||||
else:
|
else:
|
||||||
# extract and serialize the stream from the claim, then check the signature
|
# extract and serialize the stream from the claim, then check the signature
|
||||||
signature = binascii.unhexlify(claim.signature)
|
signature = binascii.unhexlify(claim.signature)
|
||||||
|
payload = claim.serialized_no_signature
|
||||||
decoded_address = decode_address(claim_address)
|
decoded_address = decode_address(claim_address)
|
||||||
|
|
||||||
|
|
||||||
|
@ -92,7 +94,7 @@ class Validator:
|
||||||
raise Exception("No signature to validate")
|
raise Exception("No signature to validate")
|
||||||
|
|
||||||
to_sign.extend(decoded_address)
|
to_sign.extend(decoded_address)
|
||||||
to_sign.extend(claim.serialized_no_signature)
|
to_sign.extend(payload)
|
||||||
to_sign.extend(binascii.unhexlify(self.certificate_claim_id))
|
to_sign.extend(binascii.unhexlify(self.certificate_claim_id))
|
||||||
|
|
||||||
return self.validate_signature(self.HASHFUNC(to_sign).digest(), signature)
|
return self.validate_signature(self.HASHFUNC(to_sign).digest(), signature)
|
||||||
|
|
|
@ -6,6 +6,7 @@ import binascii
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
|
from lbrynet.schema.signature import Signature, NAMED_SECP256K1
|
||||||
from .test_data import example_003, example_010, example_010_serialized
|
from .test_data import example_003, example_010, example_010_serialized
|
||||||
from .test_data import claim_id_1, claim_address_1, claim_address_2
|
from .test_data import claim_id_1, claim_address_1, claim_address_2
|
||||||
from .test_data import binary_claim, expected_binary_claim_decoded
|
from .test_data import binary_claim, expected_binary_claim_decoded
|
||||||
|
@ -339,6 +340,32 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
|
||||||
signed_copy = ClaimDict.deserialize(signed.serialized)
|
signed_copy = ClaimDict.deserialize(signed.serialized)
|
||||||
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
|
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
|
||||||
|
|
||||||
|
def test_validate_what_cant_be_serialized_back(self):
|
||||||
|
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
|
||||||
|
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
||||||
|
original = ClaimDict.load_dict(example_010).serialized
|
||||||
|
altered = original + b'\x00\x01\x02\x30\x50\x80\x99' # pretend this extra trash is from some unknown protobuf
|
||||||
|
|
||||||
|
# manually sign
|
||||||
|
signer = get_signer(SECP256k1).load_pem(secp256k1_private_key)
|
||||||
|
signature = signer.sign(
|
||||||
|
b'example',
|
||||||
|
decode_address(claim_address_2),
|
||||||
|
altered,
|
||||||
|
binascii.unhexlify(claim_id_1),
|
||||||
|
)
|
||||||
|
detached_sig = Signature(NAMED_SECP256K1(
|
||||||
|
signature,
|
||||||
|
binascii.unhexlify(claim_id_1),
|
||||||
|
altered
|
||||||
|
))
|
||||||
|
|
||||||
|
signed = detached_sig.serialized
|
||||||
|
self.assertEqual(signed[85:], altered)
|
||||||
|
signed_copy = ClaimDict.deserialize(signed)
|
||||||
|
self.assertEqual(signed_copy.validate_signature(claim_address_2, cert, name='example'), True)
|
||||||
|
self.assertEqual(signed, signed_copy.serialized)
|
||||||
|
|
||||||
def test_fail_to_sign_with_no_claim_address(self):
|
def test_fail_to_sign_with_no_claim_address(self):
|
||||||
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
|
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
|
||||||
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
self.assertDictEqual(cert.claim_dict, secp256k1_cert)
|
||||||
|
@ -385,7 +412,7 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
|
||||||
original_serialization = altered.serialized
|
original_serialization = altered.serialized
|
||||||
sd_hash = altered['stream']['source']['source']
|
sd_hash = altered['stream']['source']['source']
|
||||||
altered['stream']['source']['source'] = sd_hash[::-1]
|
altered['stream']['source']['source'] = sd_hash[::-1]
|
||||||
altered_serialization = altered.serialized
|
altered_serialization = altered.protobuf.SerializeToString()
|
||||||
|
|
||||||
# keep signature, but replace serialization with the altered claim (check signature.py for slice sizes)
|
# keep signature, but replace serialization with the altered claim (check signature.py for slice sizes)
|
||||||
altered_copy = ClaimDict.deserialize(original_serialization[:85] + altered_serialization)
|
altered_copy = ClaimDict.deserialize(original_serialization[:85] + altered_serialization)
|
||||||
|
|
Loading…
Add table
Reference in a new issue