store original payload for signing and verifying

This commit is contained in:
Victor Shyba 2019-01-05 06:02:43 -03:00 committed by Lex Berezhny
parent d7ebf50602
commit 36aded3830
6 changed files with 53 additions and 26 deletions

View file

@ -39,8 +39,9 @@ class ClaimDict(OrderedDict):
@property
def serialized(self):
"""Serialized Claim protobuf"""
if self.detached_signature:
return self.detached_signature.serialized + self.protobuf.SerializeToString()
current_serialization = self.protobuf.SerializeToString()
if self.detached_signature and current_serialization == self.detached_signature.payload:
return self.detached_signature.serialized
return self.protobuf.SerializeToString()
@property
@ -94,7 +95,7 @@ class ClaimDict(OrderedDict):
@property
def certificate_id(self):
if self.detached_signature:
if self.detached_signature and self.detached_signature.certificate_id:
return binascii.hexlify(self.detached_signature.certificate_id)
if not self.has_signature:
return None
@ -150,11 +151,11 @@ class ClaimDict(OrderedDict):
@classmethod
def deserialize(cls, serialized):
"""Load a ClaimDict from a serialized protobuf string"""
serialized, detached_signature = Signature.flagged_parse(serialized)
detached_signature = Signature.flagged_parse(serialized)
temp_claim = claim_pb2.Claim()
try:
temp_claim.ParseFromString(serialized)
temp_claim.ParseFromString(detached_signature.payload)
except DecodeError_pb:
raise DecodeError(DecodeError_pb)
return cls.load_protobuf(temp_claim, detached_signature=detached_signature)

View file

@ -1,24 +1,43 @@
# Flags
LEGACY = 0x80 # Everything is contained in the protobuf.
NAMED_SECP256K1 = 0x01 # ECDSA SECP256k1 64 bytes. Claim name is also signed.
from collections import namedtuple
LEGACY = namedtuple('Legacy', 'payload')
NAMED_SECP256K1 = namedtuple('NamedSECP256k1', 'raw_signature certificate_id payload')
FLAGS = {
LEGACY: 0x80,
NAMED_SECP256K1: 0x01
}
class Signature:
def __init__(self, raw_signature: bytes, certificate_id: bytes, flag: int=NAMED_SECP256K1):
self.flag = flag
assert len(raw_signature) == 64, f"signature must be 64 bytes, not: {len(raw_signature)}"
self.raw_signature = raw_signature
assert len(certificate_id) == 20, f"certificate_id must be 20 bytes, not: {len(certificate_id)}"
self.certificate_id = certificate_id
def __init__(self, data: namedtuple):
assert isinstance(data, (LEGACY, NAMED_SECP256K1))
self.data = data
@property
def payload(self):
return self.data.payload
@property
def certificate_id(self):
if type(self.data) == NAMED_SECP256K1:
return self.data.certificate_id
@property
def raw_signature(self):
if type(self.data) == NAMED_SECP256K1:
return self.data.raw_signature
@classmethod
def flagged_parse(cls, binary: bytes):
if binary[0] == NAMED_SECP256K1:
return binary[85:], cls(binary[1:65], binary[65:85], NAMED_SECP256K1)
flag = binary[0]
if flag == FLAGS[NAMED_SECP256K1]:
return cls(NAMED_SECP256K1(binary[1:65], binary[65:85], binary[85:]))
else:
return binary, None
return cls(LEGACY(binary))
@property
def serialized(self):
return (bytes([self.flag]) + self.raw_signature + self.certificate_id) if self.flag != LEGACY else b''
if isinstance(self.data, NAMED_SECP256K1):
return (bytes([FLAGS[type(self.data)]]) + self.data.raw_signature + self.data.certificate_id + self.payload)
elif isinstance(self.data, LEGACY):
return self.payload

View file

@ -5,7 +5,7 @@ from lbrynet.schema.address import decode_address
from lbrynet.schema.encoding import decode_b64_fields
from lbrynet.schema.schema.certificate import Certificate
from lbrynet.schema.schema.claim import Claim
from lbrynet.schema.signature import Signature
from lbrynet.schema.signature import Signature, NAMED_SECP256K1
from lbrynet.schema.validator import validate_claim_id
from lbrynet.schema.schema import V_0_0_1, CLAIM_TYPE, CLAIM_TYPES, CERTIFICATE_TYPE, VERSION
from lbrynet.schema.schema import NIST256p, NIST384p, SECP256k1, SHA256, SHA384
@ -63,9 +63,11 @@ class NIST_ECDSASigner(object):
digest = self.HASHFUNC(to_sign).digest()
if detached:
return Claim.load(decode_b64_fields(claim.protobuf_dict)), Signature(
self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC), raw_cert_id
)
return Claim.load(decode_b64_fields(claim.protobuf_dict)), Signature(NAMED_SECP256K1(
self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC),
raw_cert_id,
claim.serialized_no_signature
))
# -- Legacy signer (signature inside protobuf) --
if not isinstance(self.private_key, ecdsa.SigningKey):
@ -83,7 +85,8 @@ class NIST_ECDSASigner(object):
"publisherSignature": sig_dict
}
return Claim.load(msg), None
proto = Claim.load(msg)
return proto, Signature.flagged_parse(proto.SerializeToString())
class NIST256pSigner(NIST_ECDSASigner):

View file

@ -78,7 +78,7 @@ class Validator:
def validate_claim_signature(self, claim, claim_address, name):
to_sign = bytearray()
if claim.detached_signature:
if claim.detached_signature and claim.detached_signature.raw_signature:
assert name is not None, "Name is required for verifying detached signatures."
to_sign.extend(name.lower().encode())
signature = claim.detached_signature.raw_signature

View file

@ -112,7 +112,7 @@ class BasicTransactionTest(IntegrationTestCase):
cert, key = generate_certificate()
cert_tx = await Transaction.claim('@bar', cert, l2d('1.0'), address1, [self.account], self.account)
claim = ClaimDict.load_dict(example_claim_dict)
claim = claim.sign(key, address1, cert_tx.outputs[0].claim_id, name='foo', curve=SECP256k1)
claim = claim.sign(key, address1, cert_tx.outputs[0].claim_id, name='foo', curve=SECP256k1, force_detached=True)
claim_tx = await Transaction.claim('foo', claim, l2d('1.0'), address1, [self.account], self.account)
await self.broadcast(cert_tx)

View file

@ -382,9 +382,13 @@ class TestDetachedNamedSECP256k1Signatures(UnitTest):
cert = ClaimDict.generate_certificate(secp256k1_private_key, curve=SECP256k1)
altered = ClaimDict.load_dict(example_010).sign(secp256k1_private_key, claim_address_1, claim_id_1,
curve=SECP256k1, name='example', force_detached=True)
original_serialization = altered.serialized
sd_hash = altered['stream']['source']['source']
altered['stream']['source']['source'] = sd_hash[::-1]
altered_copy = ClaimDict.deserialize(altered.serialized)
altered_serialization = altered.serialized
# keep signature, but replace serialization with the altered claim (check signature.py for slice sizes)
altered_copy = ClaimDict.deserialize(original_serialization[:85] + altered_serialization)
self.assertRaises(ecdsa.keys.BadSignatureError, altered_copy.validate_signature,
claim_address_1, cert, 'example')