import struct
import hashlib
from binascii import hexlify, unhexlify
from typing import List, Optional

import ecdsa
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_der_public_key
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.exceptions import InvalidSignature

from lbry.crypto.base58 import Base58
from lbry.crypto.hash import hash160, sha256
from lbry.wallet.client.basetransaction import BaseTransaction, BaseInput, BaseOutput, ReadOnlyList
from lbry.schema.claim import Claim
from lbry.schema.purchase import Purchase
from lbry.schema.url import normalize_name
from lbry.wallet.account import Account
from lbry.wallet.script import InputScript, OutputScript


class Input(BaseInput):
    """Transaction input whose script is an ``InputScript``."""

    script: InputScript
    script_class = InputScript


class Output(BaseOutput):
    """Transaction output with claim, support, channel and purchase helpers."""

    script: OutputScript
    script_class = OutputScript

    __slots__ = (
        'channel', 'private_key', 'meta',
        'purchase', 'purchased_claim', 'purchase_receipt',
        'reposted_claim', 'claims',
    )

    def __init__(self, *args, channel: Optional['Output'] = None,
                 private_key: Optional[str] = None, **kwargs) -> None:
        """Create an output; positional and extra keyword args go to ``BaseOutput``.

        :param channel: output of the channel associated with this output, if any.
        :param private_key: channel private key, if any.
            NOTE(review): annotated as ``str`` but other methods call
            ``.sign_digest_deterministic`` / ``.get_verifying_key`` on it,
            so it is presumably an ``ecdsa.SigningKey`` -- confirm.
        """
        super().__init__(*args, **kwargs)
        self.channel = channel
        self.private_key = private_key
        self.purchase: Optional['Output'] = None  # txo containing purchase metadata
        self.purchased_claim: Optional['Output'] = None  # resolved claim pointed to by purchase
        self.purchase_receipt: Optional['Output'] = None  # txo representing purchase receipt for this claim
        self.reposted_claim: Optional['Output'] = None  # txo representing claim being reposted
        self.claims: Optional[List['Output']] = None  # resolved claims for collection
        self.meta = {}

    def update_annotations(self, annotated):
        """Copy ``channel`` and ``private_key`` from ``annotated``; reset both to None if it is falsy."""
        super().update_annotations(annotated)
        self.channel = annotated.channel if annotated else None
        self.private_key = annotated.private_key if annotated else None

    def get_fee(self, ledger):
        """Return the larger of the base fee and the per-character claim name fee.

        The name fee only applies to claim-name scripts; otherwise it is 0.
        """
        name_fee = 0
        if self.script.is_claim_name:
            name_fee = len(self.script.values['claim_name']) * ledger.fee_per_name_char
        return max(name_fee, super().get_fee(ledger))

    @property
    def is_claim(self) -> bool:
        """True when the script is a claim-name or an update-claim script."""
        return self.script.is_claim_name or self.script.is_update_claim

    @property
    def is_support(self) -> bool:
        """True when the script is a support-claim script."""
        return self.script.is_support_claim

    @property
    def claim_hash(self) -> bytes:
        """Return the claim hash as bytes.

        For a new claim it is derived from the transaction hash plus the
        output position (packed as a big-endian unsigned 32-bit int); for
        updates and supports it is read from the script's ``claim_id`` value.

        :raises ValueError: if the script is none of the above.
        """
        if self.script.is_claim_name:
            return hash160(self.tx_ref.hash + struct.pack('>I', self.position))
        if self.script.is_update_claim or self.script.is_support_claim:
            return self.script.values['claim_id']
        raise ValueError('No claim_id associated.')

    @property
    def claim_id(self) -> str:
        """Hex string of the byte-reversed ``claim_hash``."""
        return hexlify(self.claim_hash[::-1]).decode()

    @property
    def claim_name(self) -> str:
        """Decoded claim name from the script.

        :raises ValueError: if the script does not involve a claim.
        """
        if self.script.is_claim_involved:
            return self.script.values['claim_name'].decode()
        raise ValueError('No claim_name associated.')

    @property
    def normalized_name(self) -> str:
        """``claim_name`` passed through ``normalize_name``."""
        return normalize_name(self.claim_name)

    @property
    def claim(self) -> Claim:
        """Return the ``Claim`` payload, parsing and caching it on first access.

        :raises ValueError: if this output is neither a claim nor a claim update.
        """
        if self.is_claim:
            # Raw bytes are replaced in-place by the parsed Claim so that
            # later accesses do not parse again.
            if not isinstance(self.script.values['claim'], Claim):
                self.script.values['claim'] = Claim.from_bytes(self.script.values['claim'])
            return self.script.values['claim']
        raise ValueError('Only claim name and claim update have the claim payload.')

    @property
    def can_decode_claim(self):
        """Return the decoded claim, or False if accessing ``claim`` raises."""
        try:
            return self.claim
        except Exception:
            # Deliberately best-effort, but must not swallow
            # KeyboardInterrupt / SystemExit the way a bare ``except`` does.
            return False

    @property
    def permanent_url(self) -> str:
        """``lbry://<claim_name>#<claim_id>`` URL for this output.

        :raises ValueError: if the script does not involve a claim.
        """
        if self.script.is_claim_involved:
            return f"lbry://{self.claim_name}#{self.claim_id}"
        raise ValueError('No claim associated.')

    @property
    def has_private_key(self):
        """True when a private key has been attached to this output."""
        return self.private_key is not None

    def get_signature_digest(self, ledger):
        """Return the sha256 digest of the pieces the claim signature covers.

        Two layouts exist, selected by whether ``claim.unsigned_payload`` is
        truthy. ``ledger`` is only used by the ``unsigned_payload`` layout.
        """
        if self.claim.unsigned_payload:
            pieces = [
                Base58.decode(self.get_address(ledger)),
                self.claim.unsigned_payload,
                self.claim.signing_channel_hash[::-1]
            ]
        else:
            pieces = [
                self.tx_ref.tx.inputs[0].txo_ref.hash,
                self.claim.signing_channel_hash,
                self.claim.to_message_bytes()
            ]
        return sha256(b''.join(pieces))

    def get_encoded_signature(self):
        """Return the claim signature DER-encoded.

        The stored signature is treated as two equal-length halves (r, then s).
        """
        signature = hexlify(self.claim.signature)
        # Integer division: slice indices must be ints, no float round trip.
        half = len(signature) // 2
        r = int(signature[:half], 16)
        s = int(signature[half:], 16)
        return ecdsa.util.sigencode_der(r, s, len(signature) * 4)

    @staticmethod
    def is_signature_valid(encoded_signature, signature_digest, public_key_bytes):
        """Verify a DER signature over an already-hashed SHA-256 digest.

        :param public_key_bytes: DER-encoded public key.
        :return: True if verification succeeds; False if loading the key or
            verifying raises ``ValueError`` or ``InvalidSignature``.
        """
        try:
            public_key = load_der_public_key(public_key_bytes, default_backend())
            public_key.verify(encoded_signature, signature_digest, ec.ECDSA(Prehashed(hashes.SHA256())))
            return True
        except (ValueError, InvalidSignature):
            return False

    def is_signed_by(self, channel: 'Output', ledger=None):
        """Check this claim's signature against ``channel``'s public key."""
        return self.is_signature_valid(
            self.get_encoded_signature(),
            self.get_signature_digest(ledger),
            channel.claim.channel.public_key_bytes
        )

    def sign(self, channel: 'Output', first_input_id=None):
        """Sign this claim with ``channel``'s private key and regenerate the script.

        :param first_input_id: bytes used in place of the first input's txo
            hash when truthy; otherwise the transaction's first input is used.
        """
        self.channel = channel
        self.claim.signing_channel_hash = channel.claim_hash
        digest = sha256(b''.join([
            first_input_id or self.tx_ref.tx.inputs[0].txo_ref.hash,
            self.claim.signing_channel_hash,
            self.claim.to_message_bytes()
        ]))
        self.claim.signature = channel.private_key.sign_digest_deterministic(digest, hashfunc=hashlib.sha256)
        self.script.generate()

    def clear_signature(self):
        """Detach the channel and clear the signature on the claim."""
        self.channel = None
        self.claim.clear_signature()

    def generate_channel_private_key(self):
        """Generate a SECP256k1 key, store its DER public key on the channel claim.

        The script is regenerated afterwards.

        :return: the newly generated private key.
        """
        self.private_key = ecdsa.SigningKey.generate(curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256)
        self.claim.channel.public_key_bytes = self.private_key.get_verifying_key().to_der()
        self.script.generate()
        return self.private_key

    def is_channel_private_key(self, private_key):
        """True when ``private_key``'s DER public key matches the channel's."""
        return self.claim.channel.public_key_bytes == private_key.get_verifying_key().to_der()

    @classmethod
    def pay_claim_name_pubkey_hash(
            cls, amount: int, claim_name: str, claim: Claim, pubkey_hash: bytes) -> 'Output':
        """Build an output carrying a new claim paid to ``pubkey_hash``."""
        script = cls.script_class.pay_claim_name_pubkey_hash(
            claim_name.encode(), claim, pubkey_hash)
        return cls(amount, script)

    @classmethod
    def pay_update_claim_pubkey_hash(
            cls, amount: int, claim_name: str, claim_id: str, claim: Claim, pubkey_hash: bytes) -> 'Output':
        """Build an output updating claim ``claim_id`` (hex), paid to ``pubkey_hash``."""
        # claim_id is hex of the reversed claim hash, so reverse it back.
        script = cls.script_class.pay_update_claim_pubkey_hash(
            claim_name.encode(), unhexlify(claim_id)[::-1], claim, pubkey_hash)
        return cls(amount, script)

    @classmethod
    def pay_support_pubkey_hash(cls, amount: int, claim_name: str, claim_id: str, pubkey_hash: bytes) -> 'Output':
        """Build an output supporting claim ``claim_id`` (hex), paid to ``pubkey_hash``."""
        script = cls.script_class.pay_support_pubkey_hash(claim_name.encode(), unhexlify(claim_id)[::-1], pubkey_hash)
        return cls(amount, script)

    @classmethod
    def add_purchase_data(cls, purchase: Purchase) -> 'Output':
        """Build a zero-amount return-data output carrying ``purchase``."""
        script = cls.script_class.return_data(purchase)
        return cls(0, script)

    @property
    def is_purchase_data(self) -> bool:
        """True for a return-data script whose data is, or looks like, a ``Purchase``."""
        return self.script.is_return_data and (
            isinstance(self.script.values['data'], Purchase) or
            Purchase.has_start_byte(self.script.values['data'])
        )

    @property
    def purchase_data(self) -> Purchase:
        """Return the ``Purchase`` payload, parsing and caching it on first access.

        :raises ValueError: if this output carries no purchase data.
        """
        if self.is_purchase_data:
            if not isinstance(self.script.values['data'], Purchase):
                self.script.values['data'] = Purchase.from_bytes(self.script.values['data'])
            return self.script.values['data']
        raise ValueError('Output does not have purchase data.')

    @property
    def can_decode_purchase_data(self):
        """Return the decoded purchase data, or False if accessing it raises."""
        try:
            return self.purchase_data
        except Exception:
            # Best-effort, without swallowing KeyboardInterrupt / SystemExit.
            return False

    @property
    def purchased_claim_id(self):
        """Claim id from ``purchase`` if set, else from ``purchased_claim``, else None."""
        if self.purchase is not None:
            return self.purchase.purchase_data.claim_id
        if self.purchased_claim is not None:
            return self.purchased_claim.claim_id
        return None

    @property
    def has_price(self):
        """Truthy when the claim decodes to a stream with a fee amount above zero.

        The result of the ``and`` chain is returned as-is, so a falsy result
        is not necessarily the ``False`` object.
        """
        if self.can_decode_claim:
            claim = self.claim
            if claim.is_stream:
                stream = claim.stream
                return stream.has_fee and stream.fee.amount and stream.fee.amount > 0
        return False

    @property
    def price(self):
        """The fee object of the claim's stream."""
        return self.claim.stream.fee


class Transaction(BaseTransaction):
    """Transaction built from ``Input`` / ``Output`` with claim-aware constructors."""

    input_class = Input
    output_class = Output

    outputs: ReadOnlyList[Output]
    inputs: ReadOnlyList[Input]

    @classmethod
    def pay(cls, amount: int, address: bytes, funding_accounts: List[Account], change_account: Account):
        """Create a transaction paying ``amount`` to ``address``."""
        ledger, _ = cls.ensure_all_have_same_ledger_and_wallet(funding_accounts, change_account)
        output = Output.pay_pubkey_hash(amount, ledger.address_to_hash160(address))
        return cls.create([], [output], funding_accounts, change_account)

    @classmethod
    def claim_create(
            cls, name: str, claim: Claim, amount: int, holding_address: str,
            funding_accounts: List[Account], change_account: Account, signing_channel: Output = None):
        """Create a transaction with a new claim held at ``holding_address``.

        When ``signing_channel`` is given the claim is signed against a
        placeholder first-input id, and the transaction is created with
        ``sign=False``.
        NOTE(review): presumably the final signature is produced elsewhere
        once the real inputs are known -- confirm against callers.
        """
        ledger, _ = cls.ensure_all_have_same_ledger_and_wallet(funding_accounts, change_account)
        claim_output = Output.pay_claim_name_pubkey_hash(
            amount, name, claim, ledger.address_to_hash160(holding_address)
        )
        if signing_channel is not None:
            claim_output.sign(signing_channel, b'placeholder txid:nout')
        return cls.create([], [claim_output], funding_accounts, change_account, sign=False)

    @classmethod
    def claim_update(
            cls, previous_claim: Output, claim: Claim, amount: int, holding_address: str,
            funding_accounts: List[Account], change_account: Account, signing_channel: Output = None):
        """Create a transaction spending ``previous_claim`` into an updated claim.

        The update is signed against a placeholder first-input id when
        ``signing_channel`` is given; otherwise any signature is cleared.
        The transaction is created with ``sign=False``.
        """
        ledger, _ = cls.ensure_all_have_same_ledger_and_wallet(funding_accounts, change_account)
        updated_claim = Output.pay_update_claim_pubkey_hash(
            amount, previous_claim.claim_name, previous_claim.claim_id,
            claim, ledger.address_to_hash160(holding_address)
        )
        if signing_channel is not None:
            updated_claim.sign(signing_channel, b'placeholder txid:nout')
        else:
            updated_claim.clear_signature()
        return cls.create(
            [Input.spend(previous_claim)], [updated_claim], funding_accounts, change_account, sign=False
        )

    @classmethod
    def support(cls, claim_name: str, claim_id: str, amount: int, holding_address: str,
                funding_accounts: List[Account], change_account: Account):
        """Create a transaction supporting claim ``claim_id`` with ``amount``."""
        ledger, _ = cls.ensure_all_have_same_ledger_and_wallet(funding_accounts, change_account)
        support_output = Output.pay_support_pubkey_hash(
            amount, claim_name, claim_id, ledger.address_to_hash160(holding_address)
        )
        return cls.create([], [support_output], funding_accounts, change_account)

    @classmethod
    def purchase(cls, claim_id: str, amount: int, merchant_address: bytes,
                 funding_accounts: List[Account], change_account: Account):
        """Create a transaction paying ``merchant_address`` plus a purchase-data output."""
        ledger, _ = cls.ensure_all_have_same_ledger_and_wallet(funding_accounts, change_account)
        payment = Output.pay_pubkey_hash(amount, ledger.address_to_hash160(merchant_address))
        data = Output.add_purchase_data(Purchase(claim_id))
        return cls.create([], [payment, data], funding_accounts, change_account)

    @property
    def my_inputs(self):
        """Yield inputs whose referenced txo is known and flagged ``is_my_account``."""
        for txi in self.inputs:
            if txi.txo_ref.txo is not None and txi.txo_ref.txo.is_my_account:
                yield txi

    def _filter_my_outputs(self, f):
        """Yield ``is_my_account`` outputs whose *script* satisfies ``f``."""
        for txo in self.outputs:
            if txo.is_my_account and f(txo.script):
                yield txo

    def _filter_other_outputs(self, f):
        """Yield outputs not flagged ``is_my_account`` whose *script* satisfies ``f``."""
        for txo in self.outputs:
            if not txo.is_my_account and f(txo.script):
                yield txo

    def _filter_any_outputs(self, f):
        """Yield outputs satisfying ``f``; unlike its siblings, ``f`` gets the txo itself."""
        for txo in self.outputs:
            if f(txo):
                yield txo

    @property
    def my_claim_outputs(self):
        """Own outputs with a claim-name script."""
        return self._filter_my_outputs(lambda s: s.is_claim_name)

    @property
    def my_update_outputs(self):
        """Own outputs with an update-claim script."""
        return self._filter_my_outputs(lambda s: s.is_update_claim)

    @property
    def my_support_outputs(self):
        """Own outputs with a support-claim script."""
        return self._filter_my_outputs(lambda s: s.is_support_claim)

    @property
    def any_purchase_outputs(self):
        """Outputs, regardless of owner, that have ``purchase`` set."""
        return self._filter_any_outputs(lambda o: o.purchase is not None)

    @property
    def other_support_outputs(self):
        """Outputs not flagged ``is_my_account`` with a support-claim script."""
        return self._filter_other_outputs(lambda s: s.is_support_claim)

    @property
    def my_abandon_outputs(self):
        """Yield own claim-related txos spent by this transaction and not updated by it.

        A spent claim or claim update is skipped when one of this
        transaction's own update outputs has the same claim id.
        """
        for txi in self.inputs:
            abandon = txi.txo_ref.txo
            if abandon is not None and abandon.is_my_account and abandon.script.is_claim_involved:
                is_update = (
                    (abandon.script.is_claim_name or abandon.script.is_update_claim)
                    and any(abandon.claim_id == update.claim_id for update in self.my_update_outputs)
                )
                if not is_update:
                    yield abandon