diff --git a/lbrynet/schema/claim.py b/lbrynet/schema/claim.py index 25215543b..7014b5f2c 100644 --- a/lbrynet/schema/claim.py +++ b/lbrynet/schema/claim.py @@ -7,19 +7,21 @@ from collections import OrderedDict from lbrynet.schema.schema.claim import Claim from lbrynet.schema.proto import claim_pb2 +from lbrynet.schema.signature import Signature from lbrynet.schema.validator import get_validator from lbrynet.schema.signer import get_signer -from lbrynet.schema.schema import NIST256p, CURVE_NAMES, CLAIM_TYPE_NAMES +from lbrynet.schema.schema import NIST256p, CURVE_NAMES, CLAIM_TYPE_NAMES, SECP256k1 from lbrynet.schema.encoding import decode_fields, decode_b64_fields, encode_fields from lbrynet.schema.error import DecodeError from lbrynet.schema.fee import Fee class ClaimDict(OrderedDict): - def __init__(self, claim_dict=None): + def __init__(self, claim_dict=None, detached_signature: Signature=None): if isinstance(claim_dict, claim_pb2.Claim): raise Exception("To initialize %s with a Claim protobuf use %s.load_protobuf" % (self.__class__.__name__, self.__class__.__name__)) + self.detached_signature = detached_signature OrderedDict.__init__(self, claim_dict or []) @property @@ -37,7 +39,8 @@ class ClaimDict(OrderedDict): @property def serialized(self): """Serialized Claim protobuf""" - + if self.detached_signature: + return self.detached_signature.serialized + self.protobuf.SerializeToString() return self.protobuf.SerializeToString() @property @@ -91,6 +94,8 @@ class ClaimDict(OrderedDict): @property def certificate_id(self): + if self.detached_signature: + return binascii.hexlify(self.detached_signature.certificate_id) if not self.has_signature: return None return binascii.hexlify(self.protobuf.publisherSignature.certificateId) @@ -120,53 +125,61 @@ class ClaimDict(OrderedDict): return dict(encode_fields(self)) @classmethod - def load_protobuf_dict(cls, protobuf_dict): + def load_protobuf_dict(cls, protobuf_dict, detached_signature=None): """ Load a ClaimDict from a dictionary with base64 encoded bytes (as returned by the protobuf json formatter) """ - return cls(decode_b64_fields(protobuf_dict)) + return cls(decode_b64_fields(protobuf_dict), detached_signature=detached_signature) @classmethod - def load_protobuf(cls, protobuf_claim): + def load_protobuf(cls, protobuf_claim, detached_signature=None): """Load ClaimDict from a protobuf Claim message""" - return cls.load_protobuf_dict(json.loads(json_format.MessageToJson(protobuf_claim, True))) + return cls.load_protobuf_dict(json.loads(json_format.MessageToJson(protobuf_claim, True)), detached_signature) @classmethod def load_dict(cls, claim_dict): """Load ClaimDict from a dictionary with hex and base58 encoded bytes""" + detached_signature = claim_dict.detached_signature if hasattr(claim_dict, 'detached_signature') else None try: - return cls.load_protobuf(cls(decode_fields(claim_dict)).protobuf) + return cls.load_protobuf(cls(decode_fields(claim_dict)).protobuf, detached_signature) except json_format.ParseError as err: raise DecodeError(str(err)) @classmethod def deserialize(cls, serialized): """Load a ClaimDict from a serialized protobuf string""" + serialized, detached_signature = Signature.flagged_parse(serialized) temp_claim = claim_pb2.Claim() try: temp_claim.ParseFromString(serialized) except DecodeError_pb: raise DecodeError(DecodeError_pb) - return cls.load_protobuf(temp_claim) + return cls.load_protobuf(temp_claim, detached_signature=detached_signature) @classmethod def generate_certificate(cls, private_key, curve=NIST256p): signer = get_signer(curve).load_pem(private_key) return cls.load_protobuf(signer.certificate) - def sign(self, private_key, claim_address, cert_claim_id, curve=NIST256p): + def sign(self, private_key, claim_address, cert_claim_id, curve=NIST256p, name=None): signer = get_signer(curve).load_pem(private_key) + if name: + signature = signer.detached_sign_stream_claim(self, claim_address, cert_claim_id, name) + return ClaimDict(self, detached_signature=signature) signed = signer.sign_stream_claim(self, claim_address, cert_claim_id) return ClaimDict.load_protobuf(signed) - def validate_signature(self, claim_address, certificate): + def validate_signature(self, claim_address, certificate, name=None): if isinstance(certificate, ClaimDict): certificate = certificate.protobuf curve = CURVE_NAMES[certificate.certificate.keyType] validator = get_validator(curve).load_from_certificate(certificate, self.certificate_id) + if self.detached_signature: + assert name is not None, "Name is required for verifying detached signatures." + return validator.validate_detached_claim_signature(self, claim_address, name) return validator.validate_claim_signature(self, claim_address) def validate_private_key(self, private_key, certificate_id):