adds a detached signature attribute to ClaimDict

This commit is contained in:
Victor Shyba 2018-12-15 03:25:46 -03:00 committed by Lex Berezhny
parent f8ed605da2
commit fe97f7a119

View file

@ -7,19 +7,21 @@ from collections import OrderedDict
from lbrynet.schema.schema.claim import Claim from lbrynet.schema.schema.claim import Claim
from lbrynet.schema.proto import claim_pb2 from lbrynet.schema.proto import claim_pb2
from lbrynet.schema.signature import Signature
from lbrynet.schema.validator import get_validator from lbrynet.schema.validator import get_validator
from lbrynet.schema.signer import get_signer from lbrynet.schema.signer import get_signer
from lbrynet.schema.schema import NIST256p, CURVE_NAMES, CLAIM_TYPE_NAMES from lbrynet.schema.schema import NIST256p, CURVE_NAMES, CLAIM_TYPE_NAMES, SECP256k1
from lbrynet.schema.encoding import decode_fields, decode_b64_fields, encode_fields from lbrynet.schema.encoding import decode_fields, decode_b64_fields, encode_fields
from lbrynet.schema.error import DecodeError from lbrynet.schema.error import DecodeError
from lbrynet.schema.fee import Fee from lbrynet.schema.fee import Fee
class ClaimDict(OrderedDict): class ClaimDict(OrderedDict):
def __init__(self, claim_dict=None): def __init__(self, claim_dict=None, detached_signature: Signature=None):
if isinstance(claim_dict, claim_pb2.Claim): if isinstance(claim_dict, claim_pb2.Claim):
raise Exception("To initialize %s with a Claim protobuf use %s.load_protobuf" % raise Exception("To initialize %s with a Claim protobuf use %s.load_protobuf" %
(self.__class__.__name__, self.__class__.__name__)) (self.__class__.__name__, self.__class__.__name__))
self.detached_signature = detached_signature
OrderedDict.__init__(self, claim_dict or []) OrderedDict.__init__(self, claim_dict or [])
@property @property
@ -37,7 +39,8 @@ class ClaimDict(OrderedDict):
@property @property
def serialized(self): def serialized(self):
"""Serialized Claim protobuf""" """Serialized Claim protobuf"""
if self.detached_signature:
return self.detached_signature.serialized + self.protobuf.SerializeToString()
return self.protobuf.SerializeToString() return self.protobuf.SerializeToString()
@property @property
@ -91,6 +94,8 @@ class ClaimDict(OrderedDict):
@property @property
def certificate_id(self): def certificate_id(self):
if self.detached_signature:
return binascii.hexlify(self.detached_signature.certificate_id)
if not self.has_signature: if not self.has_signature:
return None return None
return binascii.hexlify(self.protobuf.publisherSignature.certificateId) return binascii.hexlify(self.protobuf.publisherSignature.certificateId)
@ -120,53 +125,61 @@ class ClaimDict(OrderedDict):
return dict(encode_fields(self)) return dict(encode_fields(self))
@classmethod @classmethod
def load_protobuf_dict(cls, protobuf_dict): def load_protobuf_dict(cls, protobuf_dict, detached_signature=None):
""" """
Load a ClaimDict from a dictionary with base64 encoded bytes Load a ClaimDict from a dictionary with base64 encoded bytes
(as returned by the protobuf json formatter) (as returned by the protobuf json formatter)
""" """
return cls(decode_b64_fields(protobuf_dict)) return cls(decode_b64_fields(protobuf_dict), detached_signature=detached_signature)
@classmethod @classmethod
def load_protobuf(cls, protobuf_claim): def load_protobuf(cls, protobuf_claim, detached_signature=None):
"""Load ClaimDict from a protobuf Claim message""" """Load ClaimDict from a protobuf Claim message"""
return cls.load_protobuf_dict(json.loads(json_format.MessageToJson(protobuf_claim, True))) return cls.load_protobuf_dict(json.loads(json_format.MessageToJson(protobuf_claim, True)), detached_signature)
@classmethod @classmethod
def load_dict(cls, claim_dict): def load_dict(cls, claim_dict):
"""Load ClaimDict from a dictionary with hex and base58 encoded bytes""" """Load ClaimDict from a dictionary with hex and base58 encoded bytes"""
detached_signature = claim_dict.detached_signature if hasattr(claim_dict, 'detached_signature') else None
try: try:
return cls.load_protobuf(cls(decode_fields(claim_dict)).protobuf) return cls.load_protobuf(cls(decode_fields(claim_dict)).protobuf, detached_signature)
except json_format.ParseError as err: except json_format.ParseError as err:
raise DecodeError(str(err)) raise DecodeError(str(err))
@classmethod @classmethod
def deserialize(cls, serialized): def deserialize(cls, serialized):
"""Load a ClaimDict from a serialized protobuf string""" """Load a ClaimDict from a serialized protobuf string"""
serialized, detached_signature = Signature.flagged_parse(serialized)
temp_claim = claim_pb2.Claim() temp_claim = claim_pb2.Claim()
try: try:
temp_claim.ParseFromString(serialized) temp_claim.ParseFromString(serialized)
except DecodeError_pb: except DecodeError_pb:
raise DecodeError(DecodeError_pb) raise DecodeError(DecodeError_pb)
return cls.load_protobuf(temp_claim) return cls.load_protobuf(temp_claim, detached_signature=detached_signature)
@classmethod @classmethod
def generate_certificate(cls, private_key, curve=NIST256p): def generate_certificate(cls, private_key, curve=NIST256p):
signer = get_signer(curve).load_pem(private_key) signer = get_signer(curve).load_pem(private_key)
return cls.load_protobuf(signer.certificate) return cls.load_protobuf(signer.certificate)
def sign(self, private_key, claim_address, cert_claim_id, curve=NIST256p): def sign(self, private_key, claim_address, cert_claim_id, curve=NIST256p, name=None):
signer = get_signer(curve).load_pem(private_key) signer = get_signer(curve).load_pem(private_key)
if name:
signature = signer.detached_sign_stream_claim(self, claim_address, cert_claim_id, name)
return ClaimDict(self, detached_signature=signature)
signed = signer.sign_stream_claim(self, claim_address, cert_claim_id) signed = signer.sign_stream_claim(self, claim_address, cert_claim_id)
return ClaimDict.load_protobuf(signed) return ClaimDict.load_protobuf(signed)
def validate_signature(self, claim_address, certificate): def validate_signature(self, claim_address, certificate, name=None):
if isinstance(certificate, ClaimDict): if isinstance(certificate, ClaimDict):
certificate = certificate.protobuf certificate = certificate.protobuf
curve = CURVE_NAMES[certificate.certificate.keyType] curve = CURVE_NAMES[certificate.certificate.keyType]
validator = get_validator(curve).load_from_certificate(certificate, self.certificate_id) validator = get_validator(curve).load_from_certificate(certificate, self.certificate_id)
if self.detached_signature:
assert name is not None, "Name is required for verifying detached signatures."
return validator.validate_detached_claim_signature(self, claim_address, name)
return validator.validate_claim_signature(self, claim_address) return validator.validate_claim_signature(self, claim_address)
def validate_private_key(self, private_key, certificate_id): def validate_private_key(self, private_key, certificate_id):