2018-09-17 17:13:30 -03:00
|
|
|
import base64, binascii
|
|
|
|
from copy import deepcopy
|
2018-09-17 17:31:44 -03:00
|
|
|
from lbrynet.schema.address import decode_address, encode_address
|
|
|
|
from lbrynet.schema.schema import CLAIM_TYPES, CLAIM_TYPE, STREAM_TYPE, CERTIFICATE_TYPE
|
|
|
|
from lbrynet.schema.schema import SIGNATURE
|
|
|
|
from lbrynet.schema.error import DecodeError, InvalidAddress
|
2019-01-17 22:20:47 -03:00
|
|
|
from lbrynet.schema.signature import Signature
|
2018-09-17 17:13:30 -03:00
|
|
|
|
|
|
|
|
2019-01-17 22:20:47 -03:00
|
|
|
def encode_fields(claim_dictionary, detached_signature: Signature):
    """Encode bytes to hex and b58 for return by ClaimDict.

    The input is deep-copied first, so the caller's dictionary is never
    modified.

    Args:
        claim_dictionary: claim dictionary holding a CLAIM_TYPE entry, the
            claim value stored under the matching CLAIM_TYPES name, and
            optionally a SIGNATURE entry.
        detached_signature: consulted only when the dictionary has no
            SIGNATURE entry; if it is truthy and has a truthy
            ``raw_signature``, a SIGNATURE entry is built from its
            ``serialized`` and ``certificate_id`` attributes.

    Returns:
        The encoded copy of ``claim_dictionary``.

    Raises:
        DecodeError: if ``encode_address`` rejects the fee address with
            InvalidAddress (the original error is chained as the cause).
    """
    claim_dictionary = deepcopy(claim_dictionary)
    claim_type = CLAIM_TYPES[claim_dictionary[CLAIM_TYPE]]
    claim_value = claim_dictionary[claim_type]

    if claim_type == CLAIM_TYPES[STREAM_TYPE]:
        source = claim_value['source']
        source['source'] = binascii.hexlify(source['source']).decode()

        metadata = claim_value['metadata']
        if 'releaseTime' in metadata:
            # Normalize to int and drop the key entirely when it is zero.
            release_time = int(metadata.pop('releaseTime'))
            if release_time != 0:
                metadata['releaseTime'] = release_time
        if 'fee' in metadata:
            try:
                address = encode_address(metadata['fee']['address'])
            except InvalidAddress as err:
                # Chain explicitly so the traceback shows the root cause.
                raise DecodeError("Invalid fee address: %s" % err) from err
            metadata['fee']['address'] = address
    elif claim_type == CLAIM_TYPES[CERTIFICATE_TYPE]:
        public_key = claim_value["publicKey"]
        claim_value["publicKey"] = binascii.hexlify(public_key).decode()

    if SIGNATURE in claim_dictionary:
        # Encode both values before storing either of them.
        encoded_sig = binascii.hexlify(claim_dictionary[SIGNATURE]['signature']).decode()
        encoded_cert_id = binascii.hexlify(claim_dictionary[SIGNATURE]['certificateId']).decode()
        claim_dictionary[SIGNATURE]['signature'] = encoded_sig
        claim_dictionary[SIGNATURE]['certificateId'] = encoded_cert_id
    elif detached_signature and detached_signature.raw_signature:
        claim_dictionary[SIGNATURE] = {
            'detached_signature': binascii.hexlify(detached_signature.serialized).decode(),
            'certificateId': binascii.hexlify(detached_signature.certificate_id).decode(),
        }

    claim_dictionary[claim_type] = claim_value
    return claim_dictionary
|
|
|
|
|
|
|
|
|
|
|
|
def decode_fields(claim_dictionary):
    """Decode hex and b58 encoded bytes in dictionaries given to ClaimDict.

    The input is deep-copied first, so the caller's dictionary is never
    modified.

    Args:
        claim_dictionary: claim dictionary holding a CLAIM_TYPE entry, the
            claim value stored under the matching CLAIM_TYPES name, and
            optionally a SIGNATURE entry.

    Returns:
        A ``(claim_dictionary, detached_signature)`` tuple. The first item
        is the decoded copy. The second is the result of
        ``Signature.flagged_parse`` when the SIGNATURE entry carries a
        truthy ``'detached_signature'`` value (the SIGNATURE entry is then
        removed from the returned dictionary), otherwise ``None``.

    Raises:
        DecodeError: if ``decode_address`` rejects the fee address with
            InvalidAddress (the original error is chained as the cause).
        Errors raised by ``binascii.unhexlify`` on malformed hex input are
        not caught here and propagate unchanged.
    """
    detached_signature = None
    claim_dictionary = deepcopy(claim_dictionary)
    claim_type = CLAIM_TYPES[claim_dictionary[CLAIM_TYPE]]
    claim_value = claim_dictionary[claim_type]

    if claim_type == CLAIM_TYPES[STREAM_TYPE]:
        source = claim_value['source']
        source['source'] = binascii.unhexlify(source['source'])

        metadata = claim_value['metadata']
        if 'fee' in metadata:
            try:
                address = decode_address(metadata['fee']['address'])
            except InvalidAddress as err:
                # Chain explicitly so the traceback shows the root cause.
                raise DecodeError("Invalid fee address: %s" % err) from err
            metadata['fee']['address'] = address
    elif claim_type == CLAIM_TYPES[CERTIFICATE_TYPE]:
        public_key = binascii.unhexlify(claim_value["publicKey"])
        claim_value["publicKey"] = public_key

    if SIGNATURE in claim_dictionary and not claim_dictionary[SIGNATURE].get('detached_signature'):
        # Inline signature: decode both values before storing either.
        decoded_sig = binascii.unhexlify(claim_dictionary[SIGNATURE]['signature'])
        decoded_cert_id = binascii.unhexlify(claim_dictionary[SIGNATURE]['certificateId'])
        claim_dictionary[SIGNATURE]['signature'] = decoded_sig
        claim_dictionary[SIGNATURE]['certificateId'] = decoded_cert_id
    elif claim_dictionary.get(SIGNATURE, {}).get('detached_signature'):
        # Detached signature: hand it back separately and strip the entry
        # from the returned dictionary.
        hex_detached_signature = claim_dictionary[SIGNATURE]['detached_signature']
        detached_signature = Signature.flagged_parse(binascii.unhexlify(hex_detached_signature))
        del claim_dictionary[SIGNATURE]

    claim_dictionary[claim_type] = claim_value
    return claim_dictionary, detached_signature
|
2018-09-17 17:13:30 -03:00
|
|
|
|
|
|
|
|
|
|
|
def decode_b64_fields(claim_dictionary):
    """Decode b64 encoded bytes in protobuf generated dictionary to be given to ClaimDict.

    A deep copy of the input is decoded and returned; the caller's
    dictionary is left untouched.

    Fields decoded with ``base64.b64decode``:
      * stream claims: ``source.source`` and, when a fee is present,
        ``metadata.fee.address``
      * certificate claims: ``publicKey``
      * any claim with a SIGNATURE entry: ``signature`` and ``certificateId``
    """
    decoded = deepcopy(claim_dictionary)
    type_name = CLAIM_TYPES[decoded[CLAIM_TYPE]]
    payload = decoded[type_name]

    if type_name == CLAIM_TYPES[STREAM_TYPE]:
        source = payload['source']
        source['source'] = base64.b64decode(source['source'])
        metadata = payload['metadata']
        if 'fee' in metadata:
            fee = metadata['fee']
            fee['address'] = base64.b64decode(fee['address'])
    elif type_name == CLAIM_TYPES[CERTIFICATE_TYPE]:
        payload["publicKey"] = base64.b64decode(payload["publicKey"])

    if SIGNATURE in decoded:
        signature = decoded[SIGNATURE]
        for key in ('signature', 'certificateId'):
            signature[key] = base64.b64decode(signature[key])

    decoded[type_name] = payload
    return decoded
|