simplify detached signing and verifying

This commit is contained in:
Victor Shyba 2018-12-30 21:23:03 -03:00 committed by Lex Berezhny
parent 70471eebfa
commit 74cccdbfc7
3 changed files with 28 additions and 55 deletions

View file

@ -166,21 +166,15 @@ class ClaimDict(OrderedDict):
def sign(self, private_key, claim_address, cert_claim_id, curve=NIST256p, name=None): def sign(self, private_key, claim_address, cert_claim_id, curve=NIST256p, name=None):
signer = get_signer(curve).load_pem(private_key) signer = get_signer(curve).load_pem(private_key)
if name: signed, signature = signer.sign_stream_claim(self, claim_address, cert_claim_id, name)
signature = signer.detached_sign_stream_claim(self, claim_address, cert_claim_id, name) return ClaimDict.load_protobuf(signed, signature)
return ClaimDict(self, detached_signature=signature)
signed = signer.sign_stream_claim(self, claim_address, cert_claim_id)
return ClaimDict.load_protobuf(signed)
def validate_signature(self, claim_address, certificate, name=None): def validate_signature(self, claim_address, certificate, name=None):
if isinstance(certificate, ClaimDict): if isinstance(certificate, ClaimDict):
certificate = certificate.protobuf certificate = certificate.protobuf
curve = CURVE_NAMES[certificate.certificate.keyType] curve = CURVE_NAMES[certificate.certificate.keyType]
validator = get_validator(curve).load_from_certificate(certificate, self.certificate_id) validator = get_validator(curve).load_from_certificate(certificate, self.certificate_id)
if self.detached_signature: return validator.validate_claim_signature(self, claim_address, name)
assert name is not None, "Name is required for verifying detached signatures."
return validator.validate_detached_claim_signature(self, claim_address, name)
return validator.validate_claim_signature(self, claim_address)
def validate_private_key(self, private_key, certificate_id): def validate_private_key(self, private_key, certificate_id):
certificate = self.protobuf certificate = self.protobuf

View file

@ -16,6 +16,7 @@ class NIST_ECDSASigner(object):
CURVE_NAME = None CURVE_NAME = None
HASHFUNC = hashlib.sha256 HASHFUNC = hashlib.sha256
HASHFUNC_NAME = SHA256 HASHFUNC_NAME = SHA256
DETACHED = False
def __init__(self, private_key): def __init__(self, private_key):
self._private_key = private_key self._private_key = private_key
@ -46,16 +47,26 @@ class NIST_ECDSASigner(object):
def generate(cls): def generate(cls):
return cls(ecdsa.SigningKey.generate(curve=cls.CURVE, hashfunc=cls.HASHFUNC_NAME)) return cls(ecdsa.SigningKey.generate(curve=cls.CURVE, hashfunc=cls.HASHFUNC_NAME))
def sign_stream_claim(self, claim, claim_address, cert_claim_id): def sign_stream_claim(self, claim, claim_address, cert_claim_id, name):
to_sign = bytearray()
if self.DETACHED:
assert name, "Name is required for detached signatures"
assert self.CURVE_NAME == SECP256k1, f"Only SECP256k1 is supported, not: {self.CURVE_NAME}"
to_sign.extend(name.lower().encode())
validate_claim_id(cert_claim_id) validate_claim_id(cert_claim_id)
raw_cert_id = binascii.unhexlify(cert_claim_id)
decoded_addr = decode_address(claim_address) decoded_addr = decode_address(claim_address)
to_sign = bytearray()
to_sign.extend(decoded_addr) to_sign.extend(decoded_addr)
to_sign.extend(claim.serialized_no_signature) to_sign.extend(claim.serialized_no_signature)
to_sign.extend(binascii.unhexlify(cert_claim_id)) to_sign.extend(raw_cert_id)
digest = self.HASHFUNC(to_sign).digest() digest = self.HASHFUNC(to_sign).digest()
if self.DETACHED:
return claim.protobuf_dict, Signature(
self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC), raw_cert_id
)
if not isinstance(self.private_key, ecdsa.SigningKey): if not isinstance(self.private_key, ecdsa.SigningKey):
raise Exception("Not given a signing key") raise Exception("Not given a signing key")
@ -63,7 +74,7 @@ class NIST_ECDSASigner(object):
"version": V_0_0_1, "version": V_0_0_1,
"signatureType": self.CURVE_NAME, "signatureType": self.CURVE_NAME,
"signature": self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC), "signature": self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC),
"certificateId": binascii.unhexlify(cert_claim_id) "certificateId": raw_cert_id
} }
msg = { msg = {
@ -72,25 +83,7 @@ class NIST_ECDSASigner(object):
"publisherSignature": sig_dict "publisherSignature": sig_dict
} }
return Claim.load(msg) return Claim.load(msg), None
def detached_sign_stream_claim(self, claim, claim_address, cert_claim_id, name: str):
assert self.CURVE_NAME == SECP256k1, f"Only SECP256k1 is supported, not: {self.CURVE_NAME}"
validate_claim_id(cert_claim_id)
if not isinstance(self.private_key, ecdsa.SigningKey):
raise Exception("Not given a signing key")
decoded_addr = decode_address(claim_address)
name = name.lower().encode()
raw_cert_id = binascii.unhexlify(cert_claim_id)
to_sign = bytearray()
to_sign.extend(name)
to_sign.extend(decoded_addr)
to_sign.extend(claim.serialized_no_signature)
to_sign.extend(raw_cert_id)
digest = self.HASHFUNC(to_sign).digest()
return Signature(self.private_key.sign_digest_deterministic(digest, hashfunc=self.HASHFUNC), raw_cert_id)
class NIST256pSigner(NIST_ECDSASigner): class NIST256pSigner(NIST_ECDSASigner):

View file

@ -76,35 +76,21 @@ class Validator:
from ecdsa import BadSignatureError from ecdsa import BadSignatureError
raise BadSignatureError raise BadSignatureError
def validate_detached_claim_signature(self, claim, claim_address, name): def validate_claim_signature(self, claim, claim_address, name):
decoded_address = decode_address(claim_address)
# extract and serialize the stream from the claim, then check the signature
signature = claim.detached_signature.raw_signature
if signature is None:
raise Exception("No signature to validate")
name = name.lower().encode()
to_sign = bytearray() to_sign = bytearray()
to_sign.extend(name) if claim.detached_signature:
to_sign.extend(decoded_address) assert name is not None, "Name is required for verifying detached signatures."
to_sign.extend(claim.serialized_no_signature) to_sign.extend(name.lower().encode())
to_sign.extend(binascii.unhexlify(self.certificate_claim_id)) signature = claim.detached_signature.raw_signature
else:
return self.validate_signature(self.HASHFUNC(to_sign).digest(), signature)
def validate_claim_signature(self, claim, claim_address):
decoded_address = decode_address(claim_address)
# extract and serialize the stream from the claim, then check the signature # extract and serialize the stream from the claim, then check the signature
signature = binascii.unhexlify(claim.signature) signature = binascii.unhexlify(claim.signature)
decoded_address = decode_address(claim_address)
if signature is None: if signature is None:
raise Exception("No signature to validate") raise Exception("No signature to validate")
to_sign = bytearray()
to_sign.extend(decoded_address) to_sign.extend(decoded_address)
to_sign.extend(claim.serialized_no_signature) to_sign.extend(claim.serialized_no_signature)
to_sign.extend(binascii.unhexlify(self.certificate_claim_id)) to_sign.extend(binascii.unhexlify(self.certificate_claim_id))