forked from LBRYCommunity/lbry-sdk
614 lines
17 KiB
614 lines
17 KiB
import json
from collections import OrderedDict
from typing import List, Tuple
from decimal import Decimal
from binascii import hexlify, unhexlify
from google.protobuf import json_format # pylint: disable=no-name-in-module
from google.protobuf.message import DecodeError as DecodeError_pb # pylint: disable=no-name-in-module,import-error
from torba.client.constants import COIN
from lbrynet.schema.signature import Signature
from lbrynet.schema.validator import get_validator
from lbrynet.schema.signer import get_signer
from lbrynet.schema.constants import CURVE_NAMES, SECP256k1
from lbrynet.schema.encoding import decode_fields, decode_b64_fields, encode_fields
from lbrynet.schema.error import DecodeError
from lbrynet.schema.types.v2.claim_pb2 import Claim as ClaimMessage, Fee as FeeMessage
from lbrynet.schema.base import b58decode, b58encode
from lbrynet.schema import compat
class ClaimDict(OrderedDict):
def __init__(self, claim_dict=None, detached_signature: Signature=None):
if isinstance(claim_dict, legacy_claim_pb2.Claim):
raise Exception("To initialize %s with a Claim protobuf use %s.load_protobuf" %
(self.__class__.__name__, self.__class__.__name__))
self.detached_signature = detached_signature
OrderedDict.__init__(self, claim_dict or [])
def protobuf_dict(self):
"""Claim dictionary using base64 to represent bytes"""
return json.loads(json_format.MessageToJson(self.protobuf, True))
def protobuf(self):
"""Claim message object"""
return LegacyClaim.load(self)
def serialized(self):
"""Serialized Claim protobuf"""
if self.detached_signature:
return self.detached_signature.serialized
return self.protobuf.SerializeToString()
def serialized_no_signature(self):
"""Serialized Claim protobuf without publisherSignature field"""
claim = self.protobuf
return ClaimDict.load_protobuf(claim).serialized
def has_signature(self):
return self.protobuf.HasField("publisherSignature") or (
self.detached_signature and self.detached_signature.raw_signature
def is_certificate(self):
claim = self.protobuf
return CLAIM_TYPE_NAMES[claim.claimType] == "certificate"
def is_stream(self):
claim = self.protobuf
return CLAIM_TYPE_NAMES[claim.claimType] == "stream"
def source_hash(self):
claim = self.protobuf
if not CLAIM_TYPE_NAMES[claim.claimType] == "stream":
return None
return binascii.hexlify(
def has_fee(self):
claim = self.protobuf
if not CLAIM_TYPE_NAMES[claim.claimType] == "stream":
return None
return True
return False
def source_fee(self):
claim = self.protobuf
if not CLAIM_TYPE_NAMES[claim.claimType] == "stream":
return None
return Fee.load_protobuf(
return None
def certificate_id(self) -> str:
if self.protobuf.HasField("publisherSignature"):
return binascii.hexlify(self.protobuf.publisherSignature.certificateId).decode()
if self.detached_signature and self.detached_signature.certificate_id:
return binascii.hexlify(self.detached_signature.certificate_id).decode()
def signature(self):
if not self.has_signature:
return None
return binascii.hexlify(self.protobuf.publisherSignature.signature)
def protobuf_len(self):
"""Length of serialized string"""
return self.protobuf.ByteSize()
def json_len(self):
"""Length of json encoded string"""
return len(json.dumps(self.claim_dict))
def claim_dict(self):
"""Claim dictionary with bytes represented as hex and base58"""
return dict(encode_fields(self, self.detached_signature))
def load_protobuf_dict(cls, protobuf_dict, detached_signature=None):
Load a ClaimDict from a dictionary with base64 encoded bytes
(as returned by the protobuf json formatter)
return cls(decode_b64_fields(protobuf_dict), detached_signature=detached_signature)
def load_protobuf(cls, protobuf_claim, detached_signature=None):
"""Load ClaimDict from a protobuf Claim message"""
return cls.load_protobuf_dict(json.loads(json_format.MessageToJson(protobuf_claim, True)), detached_signature)
def load_dict(cls, claim_dict):
"""Load ClaimDict from a dictionary with hex and base58 encoded bytes"""
claim_dict, detached_signature = decode_fields(claim_dict)
return cls.load_protobuf(cls(claim_dict).protobuf, detached_signature)
except json_format.ParseError as err:
raise DecodeError(str(err))
def deserialize(cls, serialized):
"""Load a ClaimDict from a serialized protobuf string"""
detached_signature = Signature.flagged_parse(serialized)
temp_claim = legacy_claim_pb2.Claim()
except DecodeError_pb:
raise DecodeError(DecodeError_pb)
return cls.load_protobuf(temp_claim, detached_signature=detached_signature)
def generate_certificate(cls, private_key, curve=SECP256k1):
signer = get_signer(curve).load_pem(private_key)
return cls.load_protobuf(signer.certificate)
def sign(self, private_key, claim_address, cert_claim_id, curve=SECP256k1, name=None, force_detached=False):
signer = get_signer(curve).load_pem(private_key)
signed, signature = signer.sign_stream_claim(self, claim_address, cert_claim_id, name, force_detached)
return ClaimDict.load_protobuf(signed, signature)
def validate_signature(self, claim_address, certificate, name=None):
if isinstance(certificate, ClaimDict):
certificate = certificate.protobuf
curve = CURVE_NAMES[certificate.certificate.keyType]
validator = get_validator(curve).load_from_certificate(certificate, self.certificate_id)
return validator.validate_claim_signature(self, claim_address, name)
def validate_private_key(self, private_key, certificate_id):
certificate = self.protobuf
if CLAIM_TYPE_NAMES[certificate.claimType] != "certificate":
curve = CURVE_NAMES[certificate.certificate.keyType]
validator = get_validator(curve).load_from_certificate(certificate, certificate_id)
signing_key = validator.signing_key_from_pem(private_key)
return validator.validate_private_key(signing_key)
def get_validator(self, certificate_id):
Get a lbrynet.schema.validator.Validator object for a certificate claim
:param certificate_id: claim id of this certificate claim
:return: None or lbrynet.schema.validator.Validator object
claim = self.protobuf
if CLAIM_TYPE_NAMES[claim.claimType] != "certificate":
curve = CURVE_NAMES[claim.certificate.keyType]
return get_validator(curve).load_from_certificate(claim, certificate_id)
class Claim:
__slots__ = '_claim',
def __init__(self, claim_message=None):
self._claim = claim_message or ClaimMessage()
def is_undetermined(self):
return self._claim.WhichOneof('type') is None
def is_stream(self):
return self._claim.WhichOneof('type') == 'stream'
def is_channel(self):
return self._claim.WhichOneof('type') == 'channel'
def stream_message(self):
if self.is_undetermined:
if not self.is_stream:
raise ValueError('Claim is not a stream.')
def stream(self) -> 'Stream':
return Stream(self)
def channel_message(self):
if self.is_undetermined:
if not self.is_channel:
raise ValueError('Claim is not a channel.')
def channel(self) -> 'Channel':
return Channel(self)
def to_bytes(self) -> bytes:
return self._claim.SerializeToString()
def from_bytes(cls, data: bytes) -> 'Claim':
claim = ClaimMessage()
if data[0] == 0:
return cls(claim)
elif data[0] == 1:
return cls(claim).from_message(payload[1:21], payload[21:85])
elif data[0] == ord('{'):
return compat.from_old_json_schema(cls(claim), data)
return compat.from_types_v1(cls(claim), data)
class Video:
__slots__ = '_video',
def __init__(self, video_message):
self._video = video_message
def width(self) -> int:
return self._video.width
def width(self, width: int):
self._video.width = width
def height(self) -> int:
return self._video.height
def height(self, height: int):
self._video.height = height
def dimensions(self) -> Tuple[int, int]:
return self.width, self.height
def dimensions(self, dimensions: Tuple[int, int]):
self._video.width, self._video.height = dimensions
class File:
__slots__ = '_file',
def __init__(self, file_message):
self._file = file_message
def name(self) -> str:
def name(self, name: str):
| = name
def size(self) -> int:
return self._file.size
def size(self, size: int):
self._file.size = size
class Fee:
__slots__ = '_fee',
def __init__(self, fee_message):
self._fee = fee_message
def currency(self) -> str:
return FeeMessage.Currency.Name(self._fee.currency)
def address(self) -> str:
return b58encode(self._fee.address).decode()
def address(self, address: str):
self._fee.address = b58decode(address)
def address_bytes(self) -> bytes:
return self._fee.address
def address_bytes(self, address: bytes):
self._fee.address = address
def amount(self) -> Decimal:
if self.currency == 'LBC':
return self.lbc
if self.currency == 'USD':
return self.usd
DEWIES = Decimal(COIN)
def lbc(self) -> Decimal:
if self._fee.currency != FeeMessage.LBC:
raise ValueError('LBC can only be returned for LBC fees.')
return Decimal(self._fee.amount / self.DEWIES)
def lbc(self, amount: Decimal):
self.dewies = int(amount * self.DEWIES)
def dewies(self) -> int:
if self._fee.currency != FeeMessage.LBC:
raise ValueError('Dewies can only be returned for LBC fees.')
return self._fee.amount
def dewies(self, amount: int):
self._fee.amount = amount
self._fee.currency = FeeMessage.LBC
PENNIES = Decimal(100.0)
def usd(self) -> Decimal:
if self._fee.currency != FeeMessage.USD:
raise ValueError('USD can only be returned for USD fees.')
return Decimal(self._fee.amount / self.PENNIES)
def usd(self, amount: Decimal):
self.pennies = int(amount * self.PENNIES)
def pennies(self) -> int:
if self._fee.currency != FeeMessage.USD:
raise ValueError('Pennies can only be returned for USD fees.')
return self._fee.amount
def pennies(self, amount: int):
self._fee.amount = amount
self._fee.currency = FeeMessage.USD
class Stream:
__slots__ = '_claim', '_stream'
def __init__(self, claim: Claim = None):
self._claim = claim or Claim()
self._stream = self._claim.stream_message
def claim(self) -> Claim:
return self._claim
def video(self) -> Video:
return Video(
def file(self) -> File:
return File(self._stream.file)
def fee(self) -> Fee:
return Fee(self._stream.fee)
def tags(self) -> List:
return self._stream.tags
def hash(self) -> str:
return hexlify(self._stream.hash).decode()
def hash(self, sd_hash: str):
self._stream.hash = unhexlify(sd_hash.encode())
def hash_bytes(self) -> bytes:
return self._stream.hash
def hash_bytes(self, hash: bytes):
self._stream.hash = hash
def language(self) -> str:
return self._stream.language
def language(self, language: str):
self._stream.language = language
def title(self) -> str:
return self._stream.title
def title(self, title: str):
self._stream.title = title
def author(self) -> str:
def author(self, author: str):
| = author
def description(self) -> str:
return self._stream.description
def description(self, description: str):
self._stream.description = description
def media_type(self) -> str:
return self._stream.media_type
def media_type(self, media_type: str):
self._stream.media_type = media_type
def license(self) -> str:
return self._stream.license
def license(self, license: str):
self._stream.license = license
def license_url(self) -> str:
return self._stream.license_url
def license_url(self, license_url: str):
self._stream.license_url = license_url
def thumbnail_url(self) -> str:
return self._stream.thumbnail_url
def thumbnail_url(self, thumbnail_url: str):
self._stream.thumbnail_url = thumbnail_url
def duration(self) -> int:
return self._stream.duration
def duration(self, duration: int):
self._stream.duration = duration
def release_time(self) -> int:
return self._stream.release_time
def release_time(self, release_time: int):
self._stream.release_time = release_time
class Channel:
__slots__ = '_claim', '_channel'
def __init__(self, claim: Claim = None):
self._claim = claim or Claim()
self._channel = self._claim.channel_message
def claim(self) -> Claim:
return self._claim
def tags(self) -> List:
return self._channel.tags
def public_key(self) -> str:
return hexlify(self._channel.public_key).decode()
def public_key(self, sd_public_key: str):
self._channel.public_key = unhexlify(sd_public_key.encode())
def public_key_bytes(self) -> bytes:
return self._channel.public_key
def public_key_bytes(self, public_key: bytes):
self._channel.public_key = public_key
def language(self) -> str:
return self._channel.language
def language(self, language: str):
self._channel.language = language
def title(self) -> str:
return self._channel.title
def title(self, title: str):
self._channel.title = title
def description(self) -> str:
return self._channel.description
def description(self, description: str):
self._channel.description = description
def contact_email(self) -> str:
return self._channel.contact_email
def contact_email(self, contact_email: str):
self._channel.contact_email = contact_email
def homepage_url(self) -> str:
return self._channel.homepage_url
def homepage_url(self, homepage_url: str):
self._channel.homepage_url = homepage_url
def thumbnail_url(self) -> str:
return self._channel.thumbnail_url
def thumbnail_url(self, thumbnail_url: str):
self._channel.thumbnail_url = thumbnail_url
def cover_url(self) -> str:
return self._channel.cover_url
def cover_url(self, cover_url: str):
self._channel.cover_url = cover_url