From fdd2562f3205784c519dabcb2a32ce289164251c Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Tue, 29 Oct 2019 01:26:25 -0400 Subject: [PATCH] added purchase_list and purchase_create --- lbry/lbry/extras/daemon/Daemon.py | 131 ++++++++++++--- lbry/lbry/extras/daemon/__init__.py | 1 - .../extras/daemon/exchange_rate_manager.py | 13 +- .../extras/daemon/json_response_encoder.py | 12 +- lbry/lbry/stream/stream_manager.py | 118 ++++++------- lbry/lbry/testcase.py | 19 ++- lbry/lbry/wallet/constants.py | 3 +- lbry/lbry/wallet/database.py | 59 ++++++- lbry/lbry/wallet/ledger.py | 64 +++++-- lbry/lbry/wallet/manager.py | 67 ++++++-- lbry/lbry/wallet/server/db/reader.py | 4 +- lbry/lbry/wallet/transaction.py | 37 +++- lbry/scripts/release.py | 10 +- lbry/tests/integration/test_file_commands.py | 10 +- .../integration/test_purchase_command.py | 159 +++++++++++++++--- lbry/tests/unit/stream/test_stream_manager.py | 120 ++++++++----- torba/torba/client/basedatabase.py | 8 +- 17 files changed, 629 insertions(+), 206 deletions(-) diff --git a/lbry/lbry/extras/daemon/Daemon.py b/lbry/lbry/extras/daemon/Daemon.py index 9d1851e80..4bbadaf49 100644 --- a/lbry/lbry/extras/daemon/Daemon.py +++ b/lbry/lbry/extras/daemon/Daemon.py @@ -837,15 +837,16 @@ class Daemon(metaclass=JSONRPCServerType): return platform_info @requires(WALLET_COMPONENT) - async def jsonrpc_resolve(self, urls: typing.Union[str, list]): + async def jsonrpc_resolve(self, urls: typing.Union[str, list], wallet_id=None): """ Get the claim that a URL refers to. Usage: - resolve ... + resolve ... [--wallet_id=] Options: - --urls= : (str, list) one or more urls to resolve + --urls= : (str, list) one or more urls to resolve + --wallet_id= : (str) wallet to check for claim purchase reciepts Returns: Dictionary of results, keyed by url @@ -903,6 +904,7 @@ class Daemon(metaclass=JSONRPCServerType): } } """ + wallet = self.wallet_manager.get_wallet_or_default(wallet_id) if isinstance(urls, str): urls = [urls] @@ -917,7 +919,7 @@ class Daemon(metaclass=JSONRPCServerType): except ValueError: results[u] = {"error": f"{u} is not a valid url"} - resolved = await self.resolve(list(valid_urls)) + resolved = await self.resolve(wallet.accounts, list(valid_urls)) for resolved_uri in resolved: results[resolved_uri] = resolved[resolved_uri] if resolved[resolved_uri] is not None else \ @@ -944,7 +946,7 @@ class Daemon(metaclass=JSONRPCServerType): --download_directory= : (str) full path to the directory to download into --timeout= : (int) download timeout in number of seconds --save_file= : (bool) save the file to the downloads directory - --wallet_id= : (str) restrict operation to specific wallet + --wallet_id= : (str) wallet to check for claim purchase reciepts Returns: {File} """ @@ -1947,6 +1949,91 @@ class Daemon(metaclass=JSONRPCServerType): await stream.save_file(file_name, download_directory) return stream + PURCHASE_DOC = """ + List and make purchases of claims. + """ + + @requires(WALLET_COMPONENT) + def jsonrpc_purchase_list( + self, claim_id=None, resolve=False, account_id=None, wallet_id=None, page=None, page_size=None): + """ + List my claim purchases. + + Usage: + purchase_list [ | --claim_id=] [--resolve] + [--account_id=] [--wallet_id=] + [--page=] [--page_size=] + + Options: + --claim_id= : (str) purchases for specific claim + --resolve : (str) include resolved claim information + --account_id= : (str) id of the account to query + --wallet_id= : (str) restrict results to specific wallet + --page= : (int) page to return during paginating + --page_size= : (int) number of items on page during pagination + + Returns: {Paginated[Output]} + """ + wallet = self.wallet_manager.get_wallet_or_default(wallet_id) + constraints = { + "wallet": wallet, + "accounts": [wallet.get_account_or_error(account_id)] if account_id else wallet.accounts, + "resolve": resolve, + } + if claim_id: + constraints["purchased_claim_id"] = claim_id + return paginate_rows( + self.ledger.get_purchases, + self.ledger.get_purchase_count, + page, page_size, **constraints + ) + + @requires(WALLET_COMPONENT) + async def jsonrpc_purchase_create( + self, claim_id, wallet_id=None, funding_account_ids=None, + allow_duplicate_purchase=False, override_max_key_fee=False, preview=False, blocking=False): + """ + Purchase a claim. + + Usage: + purchase_create ( | --claim_id=) [--wallet_id=] + [--funding_account_ids=...] + [--allow_duplicate_purchase] [--override_max_key_fee] [--preview] [--blocking] + + Options: + --claim_id= : (str) id of claim to purchase + --wallet_id= : (str) restrict operation to specific wallet + --funding_account_ids=: (list) ids of accounts to fund this transaction + --allow_duplicate_purchase : (bool) allow purchasing claim_id you already own + --override_max_key_fee : (bool) ignore max key fee for this purchase + --preview : (bool) do not broadcast the transaction + --blocking : (bool) wait until transaction is in mempool + + Returns: {Transaction} + """ + wallet = self.wallet_manager.get_wallet_or_default(wallet_id) + assert not wallet.is_locked, "Cannot spend funds with locked wallet, unlock first." + accounts = wallet.get_accounts_or_all(funding_account_ids) + txo = await self.ledger.get_claim_by_claim_id(accounts, claim_id) + if not txo or not txo.is_claim: + raise Exception(f"Could not find claim with claim_id '{claim_id}'. ") + if not allow_duplicate_purchase and txo.purchase_receipt: + raise Exception( + f"You already have a purchase for claim_id '{claim_id}'. " + f"Use --allow-duplicate-purchase flag to override." + ) + claim = txo.claim + if not claim.is_stream or not claim.stream.has_fee: + raise Exception(f"Claim '{claim_id}' does not have a purchase price.") + tx = await self.wallet_manager.create_purchase_transaction( + accounts, txo, self.exchange_rate_manager, override_max_key_fee + ) + if not preview: + await self.broadcast_or_release(tx, blocking) + else: + await self.ledger.release_tx(tx) + return tx + CLAIM_DOC = """ List and search all types of claims. """ @@ -1988,7 +2075,8 @@ class Daemon(metaclass=JSONRPCServerType): eg. --height=">400000" would limit results to only claims above 400k block height. Usage: - claim_search [ | --name=] [--claim_id=] [--txid=] [--nout=] + claim_search [ | --name=] [--txid=] [--nout=] + [--claim_id= | --claim_ids=...] [--channel= | [[--channel_ids=...] [--not_channel_ids=...]]] [--has_channel_signature] [--valid_channel_signature | --invalid_channel_signature] @@ -2008,10 +2096,12 @@ class Daemon(metaclass=JSONRPCServerType): [--any_locations=...] [--all_locations=...] [--not_locations=...] [--order_by=...] [--page=] [--page_size=] + [--wallet_id=] Options: --name= : (str) claim name (normalized) --claim_id= : (str) full or partial claim id + --claim_ids= : (list) list of full claim ids --txid= : (str) transaction id --nout= : (str) position in the transaction --channel= : (str) claims signed by this channel (argument is @@ -2101,16 +2191,20 @@ class Daemon(metaclass=JSONRPCServerType): 'trending_local', 'trending_global', 'activation_height' --no_totals : (bool) do not calculate the total number of pages and items in result set (significant performance boost) + --wallet_id= : (str) wallet to check for claim purchase reciepts Returns: {Paginated[Output]} """ + wallet = self.wallet_manager.get_wallet_or_default(kwargs.pop('wallet_id', None)) + if {'claim_id', 'claim_ids'}.issubset(kwargs): + raise ValueError("Only 'claim_id' or 'claim_ids' is allowed, not both.") if kwargs.pop('valid_channel_signature', False): kwargs['signature_valid'] = 1 if kwargs.pop('invalid_channel_signature', False): kwargs['signature_valid'] = 0 page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50) kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size}) - txos, offset, total = await self.ledger.claim_search(**kwargs) + txos, offset, total = await self.ledger.claim_search(wallet.accounts, **kwargs) result = {"items": txos, "page": page_num, "page_size": page_size} if not kwargs.pop('no_totals', False): result['total_pages'] = int((total + (page_size - 1)) / page_size) @@ -2533,6 +2627,8 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (dict) Result dictionary """ + wallet = self.wallet_manager.get_wallet_or_default(wallet_id) + decoded = base58.b58decode(channel_data) data = json.loads(decoded) channel_private_key = ecdsa.SigningKey.from_pem( @@ -2543,12 +2639,11 @@ class Daemon(metaclass=JSONRPCServerType): # check that the holding_address hasn't changed since the export was made holding_address = data['holding_address'] channels, _, _ = await self.ledger.claim_search( - public_key_id=self.ledger.public_key_to_address(public_key_der) + wallet.accounts, public_key_id=self.ledger.public_key_to_address(public_key_der) ) if channels and channels[0].get_address(self.ledger) != holding_address: holding_address = channels[0].get_address(self.ledger) - wallet = self.wallet_manager.get_wallet_or_default(wallet_id) account: LBCAccount = await self.ledger.get_account_for_address(wallet, holding_address) if account: # Case 1: channel holding address is in one of the accounts we already have @@ -3188,7 +3283,7 @@ class Daemon(metaclass=JSONRPCServerType): assert not wallet.is_locked, "Cannot spend funds with locked wallet, unlock first." funding_accounts = wallet.get_accounts_or_all(funding_account_ids) amount = self.get_dewies_or_error("amount", amount) - claim = await self.ledger.get_claim_by_claim_id(claim_id) + claim = await self.ledger.get_claim_by_claim_id(wallet.accounts, claim_id) claim_address = claim.get_address(self.ledger) if not tip: account = wallet.get_account_or_default(account_id) @@ -3997,7 +4092,7 @@ class Daemon(metaclass=JSONRPCServerType): self.conf.comment_server, 'get_comments_by_id', comment_ids=comment_ids ) claim_ids = {comment['claim_id'] for comment in comments} - claims = {cid: await self.ledger.get_claim_by_claim_id(claim_id=cid) for cid in claim_ids} + claims = {cid: await self.ledger.get_claim_by_claim_id(wallet.accounts, claim_id=cid) for cid in claim_ids} pieces = [] for comment in comments: claim = claims.get(comment['claim_id']) @@ -4015,13 +4110,7 @@ class Daemon(metaclass=JSONRPCServerType): return await comment_client.jsonrpc_post(self.conf.comment_server, 'hide_comments', pieces=pieces) async def broadcast_or_release(self, tx, blocking=False): - try: - await self.ledger.broadcast(tx) - if blocking: - await self.ledger.wait(tx) - except: - await self.ledger.release_tx(tx) - raise + await self.wallet_manager.broadcast_or_release(tx, blocking) def valid_address_or_error(self, address): try: @@ -4114,11 +4203,11 @@ class Daemon(metaclass=JSONRPCServerType): except ValueError as e: raise ValueError(f"Invalid value for '{argument}': {e.args[0]}") - async def resolve(self, urls): - results = await self.ledger.resolve(urls) + async def resolve(self, accounts, urls): + results = await self.ledger.resolve(accounts, urls) if results: try: - claims = self.stream_manager._convert_to_old_resolve_output(results) + claims = self.stream_manager._convert_to_old_resolve_output(self.wallet_manager, results) await self.storage.save_claims_for_resolve([ value for value in claims.values() if 'error' not in value ]) diff --git a/lbry/lbry/extras/daemon/__init__.py b/lbry/lbry/extras/daemon/__init__.py index e3c55f014..e69de29bb 100644 --- a/lbry/lbry/extras/daemon/__init__.py +++ b/lbry/lbry/extras/daemon/__init__.py @@ -1 +0,0 @@ -from lbry.extras.daemon import Components # register Component classes diff --git a/lbry/lbry/extras/daemon/exchange_rate_manager.py b/lbry/lbry/extras/daemon/exchange_rate_manager.py index 7a91cc232..1b328e4d8 100644 --- a/lbry/lbry/extras/daemon/exchange_rate_manager.py +++ b/lbry/lbry/extras/daemon/exchange_rate_manager.py @@ -7,6 +7,7 @@ from typing import Optional from aiohttp.client_exceptions import ClientError from lbry.error import InvalidExchangeRateResponse, CurrencyConversionError from lbry.utils import aiohttp_request +from lbry.wallet.dewies import lbc_to_dewies log = logging.getLogger(__name__) @@ -222,19 +223,23 @@ class ExchangeRateManager: rates = [market.rate for market in self.market_feeds] log.debug("Converting %f %s to %s, rates: %s" % (amount, from_currency, to_currency, rates)) if from_currency == to_currency: - return amount + return round(amount, 8) for market in self.market_feeds: if (market.rate_is_initialized() and market.is_online() and market.rate.currency_pair == (from_currency, to_currency)): - return amount * Decimal(market.rate.spot) + return round(amount * Decimal(market.rate.spot), 8) for market in self.market_feeds: if (market.rate_is_initialized() and market.is_online() and market.rate.currency_pair[0] == from_currency): - return self.convert_currency( - market.rate.currency_pair[1], to_currency, amount * Decimal(market.rate.spot)) + return round(self.convert_currency( + market.rate.currency_pair[1], to_currency, amount * Decimal(market.rate.spot)), 8) raise CurrencyConversionError( f'Unable to convert {amount} from {from_currency} to {to_currency}') + def to_dewies(self, currency, amount) -> int: + converted = self.convert_currency(currency, "LBC", amount) + return lbc_to_dewies(str(converted)) + def fee_dict(self): return {market: market.rate.as_dict() for market in self.market_feeds} diff --git a/lbry/lbry/extras/daemon/json_response_encoder.py b/lbry/lbry/extras/daemon/json_response_encoder.py index 28537524e..08a13c412 100644 --- a/lbry/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/lbry/extras/daemon/json_response_encoder.py @@ -38,6 +38,7 @@ def encode_txo_doc(): 'permanent_url': "when type is 'claim' or 'support', this is the long permanent claim URL", 'signing_channel': "for signed claims only, metadata of signing channel", 'is_channel_signature_valid': "for signed claims only, whether signature is valid", + 'purchase_receipt': "metadata for the purchase transaction associated with this claim" } @@ -176,10 +177,13 @@ class JSONResponseEncoder(JSONEncoder): output['claim_op'] = 'update' elif txo.script.is_support_claim: output['type'] = 'support' - elif txo.is_purchase_data: + elif txo.script.is_return_data: + output['type'] = 'data' + elif txo.purchase is not None: output['type'] = 'purchase' - if txo.can_decode_purchase_data: - output['claim_id'] = txo.purchase_data.claim_id + output['claim_id'] = txo.purchased_claim_id + if txo.purchased_claim is not None: + output['claim'] = self.encode_output(txo.purchased_claim) else: output['type'] = 'payment' @@ -201,6 +205,8 @@ class JSONResponseEncoder(JSONEncoder): output['value_type'] = txo.claim.claim_type if self.include_protobuf: output['protobuf'] = hexlify(txo.claim.to_bytes()) + if txo.purchase_receipt is not None: + output['purchase_receipt'] = self.encode_output(txo.purchase_receipt) if txo.claim.is_channel: output['has_signing_key'] = txo.has_private_key if check_signature and txo.claim.is_signed: diff --git a/lbry/lbry/stream/stream_manager.py b/lbry/lbry/stream/stream_manager.py index b9a6e36bb..e81246b92 100644 --- a/lbry/lbry/stream/stream_manager.py +++ b/lbry/lbry/stream/stream_manager.py @@ -1,19 +1,19 @@ import os import asyncio -import typing import binascii import logging import random -from decimal import Decimal +import typing +from typing import Optional from aiohttp.web import Request -from lbry.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError +from lbry.error import ResolveError, InvalidStreamDescriptorError from lbry.error import ResolveTimeout, DownloadDataTimeout from lbry.utils import cache_concurrent from lbry.stream.descriptor import StreamDescriptor from lbry.stream.managed_stream import ManagedStream from lbry.schema.claim import Claim from lbry.schema.url import URL -from lbry.extras.daemon.storage import lbc_to_dewies, dewies_to_lbc +from lbry.wallet.dewies import dewies_to_lbc from lbry.wallet.transaction import Output if typing.TYPE_CHECKING: from lbry.conf import Config @@ -58,7 +58,7 @@ comparison_operators = { } -def path_or_none(p) -> typing.Optional[str]: +def path_or_none(p) -> Optional[str]: if not p: return return binascii.unhexlify(p).decode() @@ -66,8 +66,8 @@ def path_or_none(p) -> typing.Optional[str]: class StreamManager: def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', - wallet_manager: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'], - analytics_manager: typing.Optional['AnalyticsManager'] = None): + wallet_manager: 'LbryWalletManager', storage: 'SQLiteStorage', node: Optional['Node'], + analytics_manager: Optional['AnalyticsManager'] = None): self.loop = loop self.config = config self.blob_manager = blob_manager @@ -76,8 +76,8 @@ class StreamManager: self.node = node self.analytics_manager = analytics_manager self.streams: typing.Dict[str, ManagedStream] = {} - self.resume_saving_task: typing.Optional[asyncio.Task] = None - self.re_reflect_task: typing.Optional[asyncio.Task] = None + self.resume_saving_task: Optional[asyncio.Task] = None + self.re_reflect_task: Optional[asyncio.Task] = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] self.running_reflector_uploads: typing.List[asyncio.Task] = [] self.started = asyncio.Event(loop=self.loop) @@ -91,7 +91,7 @@ class StreamManager: async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str, suggested_file_name: str, key: str, - content_fee: typing.Optional['Transaction']) -> typing.Optional[StreamDescriptor]: + content_fee: Optional['Transaction']) -> Optional[StreamDescriptor]: sd_blob = self.blob_manager.get_blob(sd_hash) blobs = await self.storage.get_blobs_for_stream(stream_hash) descriptor = await StreamDescriptor.recover( @@ -115,10 +115,10 @@ class StreamManager: # if self.blob_manager._save_blobs: # log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos)) - async def add_stream(self, rowid: int, sd_hash: str, file_name: typing.Optional[str], - download_directory: typing.Optional[str], status: str, - claim: typing.Optional['StoredStreamClaim'], content_fee: typing.Optional['Transaction'], - added_on: typing.Optional[int]): + async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str], + download_directory: Optional[str], status: str, + claim: Optional['StoredStreamClaim'], content_fee: Optional['Transaction'], + added_on: Optional[int]): try: descriptor = await self.blob_manager.get_stream_descriptor(sd_hash) except InvalidStreamDescriptorError as err: @@ -217,8 +217,8 @@ class StreamManager: self.started.clear() log.info("finished stopping the stream manager") - async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None, - iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: + async def create_stream(self, file_path: str, key: Optional[bytes] = None, + iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator) self.streams[stream.sd_hash] = stream self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) @@ -232,7 +232,7 @@ class StreamManager: ) return stream - async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False): + async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False): stream.stop_tasks() if stream.sd_hash in self.streams: del self.streams[stream.sd_hash] @@ -242,13 +242,13 @@ class StreamManager: if delete_file and stream.output_file_exists: os.remove(stream.full_path) - def get_stream_by_stream_hash(self, stream_hash: str) -> typing.Optional[ManagedStream]: + def get_stream_by_stream_hash(self, stream_hash: str) -> Optional[ManagedStream]: streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams.values())) if streams: return streams[0] - def get_filtered_streams(self, sort_by: typing.Optional[str] = None, reverse: typing.Optional[bool] = False, - comparison: typing.Optional[str] = None, + def get_filtered_streams(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False, + comparison: Optional[str] = None, **search_by) -> typing.List[ManagedStream]: """ Get a list of filtered and sorted ManagedStream objects @@ -284,7 +284,7 @@ class StreamManager: return streams async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: Claim) -> typing.Tuple[ - typing.Optional[ManagedStream], typing.Optional[ManagedStream]]: + Optional[ManagedStream], Optional[ManagedStream]]: existing = self.get_filtered_streams(outpoint=outpoint) if existing: return existing[0], None @@ -305,12 +305,13 @@ class StreamManager: return None, existing_for_claim_id[0] return None, None - def _convert_to_old_resolve_output(self, resolves): + @staticmethod + def _convert_to_old_resolve_output(wallet_manager, resolves): result = {} for url, txo in resolves.items(): if isinstance(txo, Output): tx_height = txo.tx_ref.height - best_height = self.wallet_manager.ledger.headers.height + best_height = wallet_manager.ledger.headers.height result[url] = { 'name': txo.claim_name, 'value': txo.claim, @@ -323,9 +324,9 @@ class StreamManager: 'height': tx_height, 'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height, 'claim_sequence': -1, - 'address': txo.get_address(self.wallet_manager.ledger), + 'address': txo.get_address(wallet_manager.ledger), 'valid_at_height': txo.meta.get('activation_height', None), - 'timestamp': self.wallet_manager.ledger.headers[tx_height]['timestamp'], + 'timestamp': wallet_manager.ledger.headers[tx_height]['timestamp'], 'supports': [] } else: @@ -334,17 +335,19 @@ class StreamManager: @cache_concurrent async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', - timeout: typing.Optional[float] = None, - file_name: typing.Optional[str] = None, - download_directory: typing.Optional[str] = None, - save_file: typing.Optional[bool] = None, + timeout: Optional[float] = None, + file_name: Optional[str] = None, + download_directory: Optional[str] = None, + save_file: Optional[bool] = None, resolve_timeout: float = 3.0, - wallet: typing.Optional['Wallet'] = None) -> ManagedStream: - wallet = wallet or self.wallet_manager.default_wallet + wallet: Optional['Wallet'] = None) -> ManagedStream: + manager = self.wallet_manager + wallet = wallet or manager.default_wallet timeout = timeout or self.config.download_timeout start_time = self.loop.time() resolved_time = None stream = None + txo: Optional[Output] = None error = None outpoint = None if save_file is None: @@ -356,19 +359,23 @@ class StreamManager: else: download_directory = None + payment = None try: # resolve the claim if not URL.parse(uri).has_stream: raise ResolveError("cannot download a channel claim, specify a /path") try: - resolved_result = self._convert_to_old_resolve_output( - await asyncio.wait_for(self.wallet_manager.ledger.resolve([uri]), resolve_timeout) + response = await asyncio.wait_for( + manager.ledger.resolve(wallet.accounts, [uri]), + resolve_timeout ) + resolved_result = self._convert_to_old_resolve_output(manager, response) except asyncio.TimeoutError: raise ResolveTimeout(uri) except Exception as err: if isinstance(err, asyncio.CancelledError): raise + log.exception("Unexpected error resolving stream:") raise ResolveError(f"Unexpected error resolving stream: {str(err)}") await self.storage.save_claims_for_resolve([ value for value in resolved_result.values() if 'error' not in value @@ -379,12 +386,13 @@ class StreamManager: raise ResolveError(f"Failed to resolve stream at '{uri}'") if 'error' in resolved: raise ResolveError(f"error resolving stream: {resolved['error']}") + txo = response[uri] claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf'])) outpoint = f"{resolved['txid']}:{resolved['nout']}" resolved_time = self.loop.time() - start_time - # resume or update an existing stream, if the stream changed download it and delete the old one after + # resume or update an existing stream, if the stream changed: download it and delete the old one after updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim) if updated_stream: log.info("already have stream for %s", uri) @@ -397,31 +405,14 @@ class StreamManager: ) return updated_stream - content_fee = None - fee_amount, fee_address = None, None - - # check that the fee is payable - if not to_replace and claim.stream.has_fee and claim.stream.fee.amount: - fee_amount = round(exchange_rate_manager.convert_currency( - claim.stream.fee.currency, "LBC", claim.stream.fee.amount - ), 5) - max_fee_amount = round(exchange_rate_manager.convert_currency( - self.config.max_key_fee['currency'], "LBC", Decimal(self.config.max_key_fee['amount']) - ), 5) if self.config.max_key_fee else None - if max_fee_amount and fee_amount > max_fee_amount: - msg = f"fee of {fee_amount} exceeds max configured to allow of {max_fee_amount}" - log.warning(msg) - raise KeyFeeAboveMaxAllowed(msg) - balance = await self.wallet_manager.get_balance(wallet) - if lbc_to_dewies(str(fee_amount)) > balance: - msg = f"fee of {fee_amount} exceeds max available balance" - log.warning(msg) - raise InsufficientFundsError(msg) - fee_address = claim.stream.fee.address or resolved['address'] + if not to_replace and txo.has_price and not txo.purchase_receipt: + payment = await manager.create_purchase_transaction( + wallet.accounts, txo, exchange_rate_manager + ) stream = ManagedStream( self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory, - file_name, ManagedStream.STATUS_RUNNING, content_fee=content_fee, + file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, analytics_manager=self.analytics_manager ) log.info("starting download for %s", uri) @@ -431,12 +422,11 @@ class StreamManager: stream.set_claim(resolved, claim) if to_replace: # delete old stream now that the replacement has started downloading await self.delete_stream(to_replace) - elif fee_address: - stream.content_fee = await self.wallet_manager.buy_claim( - stream.claim_id, lbc_to_dewies(str(fee_amount)), - fee_address.encode('latin1'), wallet.accounts - ) - log.info("paid fee of %s for %s", fee_amount, uri) + + if payment is not None: + await manager.broadcast_or_release(payment) + payment = None # to avoid releasing in `finally` later + log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri) await self.storage.save_content_fee(stream.stream_hash, stream.content_fee) self.streams[stream.sd_hash] = stream @@ -450,9 +440,13 @@ class StreamManager: error = DownloadDataTimeout(stream.sd_hash) raise error except Exception as err: # forgive data timeout, don't delete stream + log.exception("Unexpected error downloading stream:") error = err raise finally: + if payment is not None: + # payment is set to None after broadcasting, if we're here an exception probably happened + await manager.ledger.release_tx(payment) if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or stream.downloader.time_to_first_bytes))): server = self.wallet_manager.ledger.network.client.server diff --git a/lbry/lbry/testcase.py b/lbry/lbry/testcase.py index a8d1ba64c..847f815c0 100644 --- a/lbry/lbry/testcase.py +++ b/lbry/lbry/testcase.py @@ -20,13 +20,19 @@ from lbry.extras.daemon.Components import ( UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT ) from lbry.extras.daemon.ComponentManager import ComponentManager +from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager as BaseExchangeRateManager, ExchangeRate from lbry.extras.daemon.storage import SQLiteStorage from lbry.blob.blob_manager import BlobManager from lbry.stream.reflector.server import ReflectorServer from lbry.blob_exchange.server import BlobServer -class ExchangeRateManager: +class FakeExchangeRateManager(BaseExchangeRateManager): + + def __init__(self): + super().__init__() + for i, feed in enumerate(self.market_feeds): + feed._save_price(i+1) def start(self): pass @@ -34,22 +40,16 @@ class ExchangeRateManager: def stop(self): pass - def convert_currency(self, from_currency, to_currency, amount): - return amount - - def fee_dict(self): - return {} - class ExchangeRateManagerComponent(Component): component_name = EXCHANGE_RATE_MANAGER_COMPONENT def __init__(self, component_manager): super().__init__(component_manager) - self.exchange_rate_manager = ExchangeRateManager() + self.exchange_rate_manager = FakeExchangeRateManager() @property - def component(self) -> ExchangeRateManager: + def component(self) -> BaseExchangeRateManager: return self.exchange_rate_manager async def start(self): @@ -141,6 +141,7 @@ class CommandTestCase(IntegrationTestCase): DHT_COMPONENT, UPNP_COMPONENT, HASH_ANNOUNCER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT ] + wallet_node.manager.config = conf def wallet_maker(component_manager): wallet_component = WalletComponent(component_manager) diff --git a/lbry/lbry/wallet/constants.py b/lbry/lbry/wallet/constants.py index d4b51e41b..75413ef22 100644 --- a/lbry/lbry/wallet/constants.py +++ b/lbry/lbry/wallet/constants.py @@ -1,5 +1,6 @@ TXO_TYPES = { "stream": 1, "channel": 2, - "support": 3 + "support": 3, + "purchase": 4 } diff --git a/lbry/lbry/wallet/database.py b/lbry/lbry/wallet/database.py index d3dfab8ce..c78bcfa43 100644 --- a/lbry/lbry/wallet/database.py +++ b/lbry/lbry/wallet/database.py @@ -8,6 +8,20 @@ from lbry.wallet.constants import TXO_TYPES class WalletDatabase(BaseDatabase): + SCHEMA_VERSION = f"{BaseDatabase.SCHEMA_VERSION}+1" + + CREATE_TX_TABLE = """ + create table if not exists tx ( + txid text primary key, + raw blob not null, + height integer not null, + position integer not null, + is_verified boolean not null default 0, + purchased_claim_id text + ); + create index if not exists tx_purchased_claim_id_idx on tx (purchased_claim_id); + """ + CREATE_TXO_TABLE = """ create table if not exists txo ( txid text references tx, @@ -22,6 +36,7 @@ class WalletDatabase(BaseDatabase): claim_id text, claim_name text ); + create index if not exists txo_txid_idx on txo (txid); create index if not exists txo_address_idx on txo (address); create index if not exists txo_claim_id_idx on txo (claim_id); create index if not exists txo_txo_type_idx on txo (txo_type); @@ -31,11 +46,19 @@ class WalletDatabase(BaseDatabase): BaseDatabase.PRAGMAS + BaseDatabase.CREATE_ACCOUNT_TABLE + BaseDatabase.CREATE_PUBKEY_ADDRESS_TABLE + - BaseDatabase.CREATE_TX_TABLE + + CREATE_TX_TABLE + CREATE_TXO_TABLE + BaseDatabase.CREATE_TXI_TABLE ) + def tx_to_row(self, tx): + row = super().tx_to_row(tx) + txos = tx.outputs + if len(txos) >= 2 and txos[1].can_decode_purchase_data: + txos[0].purchase = txos[1] + row['purchased_claim_id'] = txos[1].purchase_data.claim_id + return row + def txo_to_row(self, tx, address, txo): row = super().txo_to_row(tx, address, txo) if txo.is_claim: @@ -45,11 +68,45 @@ class WalletDatabase(BaseDatabase): row['txo_type'] = TXO_TYPES['stream'] elif txo.is_support: row['txo_type'] = TXO_TYPES['support'] + elif txo.purchase is not None: + row['txo_type'] = TXO_TYPES['purchase'] + row['claim_id'] = txo.purchased_claim_id if txo.script.is_claim_involved: row['claim_id'] = txo.claim_id row['claim_name'] = txo.claim_name return row + async def get_transactions(self, **constraints): + txs = await super().get_transactions(**constraints) + for tx in txs: + txos = tx.outputs + if len(txos) >= 2 and txos[1].can_decode_purchase_data: + txos[0].purchase = txos[1] + return txs + + @staticmethod + def constrain_purchases(constraints): + accounts = constraints.pop('accounts', None) + assert accounts, "'accounts' argument required to find purchases" + if not {'purchased_claim_id', 'purchased_claim_id__in'}.intersection(constraints): + constraints['purchased_claim_id__is_not_null'] = True + constraints.update({ + f'$account{i}': a.public_key.address for i, a in enumerate(accounts) + }) + account_values = ', '.join([f':$account{i}' for i in range(len(accounts))]) + constraints['txid__in'] = f""" + SELECT txid FROM txi JOIN account_address USING (address) + WHERE account_address.account IN ({account_values}) + """ + + async def get_purchases(self, **constraints): + self.constrain_purchases(constraints) + return [tx.outputs[0] for tx in await self.get_transactions(**constraints)] + + def get_purchase_count(self, **constraints): + self.constrain_purchases(constraints) + return self.get_transaction_count(**constraints) + async def get_txos(self, wallet=None, no_tx=False, **constraints) -> List[Output]: txos = await super().get_txos(wallet=wallet, no_tx=no_tx, **constraints) diff --git a/lbry/lbry/wallet/ledger.py b/lbry/lbry/wallet/ledger.py index 5355735e8..d10af6e65 100644 --- a/lbry/lbry/wallet/ledger.py +++ b/lbry/lbry/wallet/ledger.py @@ -53,16 +53,34 @@ class MainNetLedger(BaseLedger): super().__init__(*args, **kwargs) self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char) - async def _inflate_outputs(self, query): + async def _inflate_outputs(self, query, accounts): outputs = Outputs.from_base64(await query) txs = [] if len(outputs.txs) > 0: - txs = await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs)) + txs: List[Transaction] = await asyncio.gather(*( + self.cache_transaction(*tx) for tx in outputs.txs + )) + if accounts: + priced_claims = [] + for tx in txs: + for txo in tx.outputs: + if txo.has_price: + priced_claims.append(txo) + if priced_claims: + receipts = { + txo.purchased_claim_id: txo for txo in + await self.db.get_purchases( + accounts=accounts, + purchased_claim_id__in=[c.claim_id for c in priced_claims] + ) + } + for txo in priced_claims: + txo.purchase_receipt = receipts.get(txo.claim_id) return outputs.inflate(txs), outputs.offset, outputs.total - async def resolve(self, urls): + async def resolve(self, accounts, urls): resolve = partial(self.network.retriable_call, self.network.resolve) - txos = (await self._inflate_outputs(resolve(urls)))[0] + txos = (await self._inflate_outputs(resolve(urls), accounts))[0] assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received." result = {} for url, txo in zip(urls, txos): @@ -75,11 +93,11 @@ class MainNetLedger(BaseLedger): result[url] = {'error': f'{url} did not resolve to a claim'} return result - async def claim_search(self, **kwargs) -> Tuple[List[Output], int, int]: - return await self._inflate_outputs(self.network.claim_search(**kwargs)) + async def claim_search(self, accounts, **kwargs) -> Tuple[List[Output], int, int]: + return await self._inflate_outputs(self.network.claim_search(**kwargs), accounts) - async def get_claim_by_claim_id(self, claim_id) -> Output: - for claim in (await self.claim_search(claim_id=claim_id))[0]: + async def get_claim_by_claim_id(self, accounts, claim_id) -> Output: + for claim in (await self.claim_search(accounts, claim_id=claim_id))[0]: return claim async def start(self): @@ -122,6 +140,23 @@ class MainNetLedger(BaseLedger): self.constraint_spending_utxos(constraints) return super().get_utxo_count(**constraints) + async def get_purchases(self, resolve=False, **constraints): + purchases = await self.db.get_purchases(**constraints) + if resolve: + claim_ids = [p.purchased_claim_id for p in purchases] + try: + resolved, _, _ = await self.claim_search([], claim_ids=claim_ids) + except: + log.exception("Resolve failed while looking up purchased claim ids:") + resolved = [] + lookup = {claim.claim_id: claim for claim in resolved} + for purchase in purchases: + purchase.purchased_claim = lookup.get(purchase.purchased_claim_id) + return purchases + + def get_purchase_count(self, resolve=False, **constraints): + return self.db.get_purchase_count(**constraints) + def get_claims(self, **constraints): return self.db.get_claims(**constraints) @@ -147,7 +182,7 @@ class MainNetLedger(BaseLedger): return self.db.get_support_count(**constraints) async def get_transaction_history(self, **constraints): - txs = await self.db.get_transactions(**constraints) + txs: List[Transaction] = await self.db.get_transactions(**constraints) headers = self.headers history = [] for tx in txs: @@ -160,7 +195,8 @@ class MainNetLedger(BaseLedger): 'claim_info': [], 'update_info': [], 'support_info': [], - 'abandon_info': [] + 'abandon_info': [], + 'purchase_info': [] } is_my_inputs = all([txi.is_my_account for txi in tx.inputs]) if is_my_inputs: @@ -238,6 +274,14 @@ class MainNetLedger(BaseLedger): 'claim_name': txo.claim_name, 'nout': txo.position }) + for txo in tx.any_purchase_outputs: + item['purchase_info'].append({ + 'address': txo.get_address(self), + 'balance_delta': dewies_to_lbc(txo.amount), + 'amount': dewies_to_lbc(txo.amount), + 'claim_id': txo.purchased_claim_id, + 'nout': txo.position + }) history.append(item) return history diff --git a/lbry/lbry/wallet/manager.py b/lbry/lbry/wallet/manager.py index 8c3cd31b6..a076e4ca5 100644 --- a/lbry/lbry/wallet/manager.py +++ b/lbry/lbry/wallet/manager.py @@ -2,15 +2,20 @@ import os import json import logging from binascii import unhexlify - +from typing import Optional, List +from decimal import Decimal from torba.client.basemanager import BaseWalletManager from torba.client.wallet import ENCRYPT_ON_DISK from torba.rpc.jsonrpc import CodeMessageError +from lbry.error import KeyFeeAboveMaxAllowed +from lbry.wallet.dewies import dewies_to_lbc +from lbry.wallet.account import Account from lbry.wallet.ledger import MainNetLedger -from lbry.wallet.transaction import Transaction +from lbry.wallet.transaction import Transaction, Output from lbry.wallet.database import WalletDatabase +from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.conf import Config @@ -19,6 +24,10 @@ log = logging.getLogger(__name__) class LbryWalletManager(BaseWalletManager): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.config: Optional[Config] = None + @property def ledger(self) -> MainNetLedger: return self.default_account.ledger @@ -87,21 +96,21 @@ class LbryWalletManager(BaseWalletManager): return receiving_addresses, change_addresses @classmethod - async def from_lbrynet_config(cls, settings: Config): + async def from_lbrynet_config(cls, config: Config): ledger_id = { 'lbrycrd_main': 'lbc_mainnet', 'lbrycrd_testnet': 'lbc_testnet', 'lbrycrd_regtest': 'lbc_regtest' - }[settings.blockchain_name] + }[config.blockchain_name] ledger_config = { 'auto_connect': True, - 'default_servers': settings.lbryum_servers, - 'data_path': settings.wallet_dir, + 'default_servers': config.lbryum_servers, + 'data_path': config.wallet_dir, } - wallets_directory = os.path.join(settings.wallet_dir, 'wallets') + wallets_directory = os.path.join(config.wallet_dir, 'wallets') if not os.path.exists(wallets_directory): os.mkdir(wallets_directory) @@ -112,11 +121,12 @@ class LbryWalletManager(BaseWalletManager): manager = cls.from_config({ 'ledgers': {ledger_id: ledger_config}, 'wallets': [ - os.path.join(wallets_directory, wallet_file) for wallet_file in settings.wallets + os.path.join(wallets_directory, wallet_file) for wallet_file in config.wallets ] }) + manager.config = config ledger = manager.get_or_create_ledger(ledger_id) - ledger.coin_selection_strategy = settings.coin_selection_strategy + ledger.coin_selection_strategy = config.coin_selection_strategy default_wallet = manager.default_wallet if default_wallet.default_account is None: log.info('Wallet at %s is empty, generating a default account.', default_wallet.id) @@ -161,13 +171,6 @@ class LbryWalletManager(BaseWalletManager): def get_unused_address(self): return self.default_account.receiving.get_or_create_usable_address() - async def buy_claim(self, claim_id: str, amount: int, destination_address: bytes, accounts): - tx = await Transaction.purchase( - claim_id, amount, destination_address, accounts, accounts[0] - ) - await self.ledger.broadcast(tx) - return tx - async def get_transaction(self, txid): tx = await self.db.get_transaction(txid=txid) if not tx: @@ -181,3 +184,35 @@ class LbryWalletManager(BaseWalletManager): tx = self.ledger.transaction_class(unhexlify(raw)) await self.ledger.maybe_verify_transaction(tx, height) return tx + + async def create_purchase_transaction( + self, accounts: List[Account], txo: Output, exchange: ExchangeRateManager, override_max_key_fee=False): + fee = txo.claim.stream.fee + fee_amount = exchange.to_dewies(fee.currency, fee.amount) + if not override_max_key_fee and self.config.max_key_fee: + max_fee = self.config.max_key_fee + max_fee_amount = exchange.to_dewies(max_fee['currency'], Decimal(max_fee['amount'])) + if max_fee_amount and fee_amount > max_fee_amount: + error_fee = f"{dewies_to_lbc(fee_amount)} LBC" + if fee.currency != 'LBC': + error_fee += f" ({fee.amount} {fee.currency})" + error_max_fee = f"{dewies_to_lbc(max_fee_amount)} LBC" + if max_fee['currency'] != 'LBC': + error_max_fee += f" ({max_fee['amount']} {max_fee['currency']})" + raise KeyFeeAboveMaxAllowed( + f"Purchase price of {error_fee} exceeds maximum " + f"configured price of {error_max_fee}." + ) + fee_address = fee.address or txo.get_address(self.ledger) + return await Transaction.purchase( + txo.claim_id, fee_amount, fee_address, accounts, accounts[0] + ) + + async def broadcast_or_release(self, tx, blocking=False): + try: + await self.ledger.broadcast(tx) + if blocking: + await self.ledger.wait(tx) + except: + await self.ledger.release_tx(tx) + raise diff --git a/lbry/lbry/wallet/server/db/reader.py b/lbry/lbry/wallet/server/db/reader.py index c24697aab..6b086b377 100644 --- a/lbry/lbry/wallet/server/db/reader.py +++ b/lbry/lbry/wallet/server/db/reader.py @@ -45,7 +45,7 @@ INTEGER_PARAMS = { } SEARCH_PARAMS = { - 'name', 'claim_id', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids', + 'name', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids', 'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency', 'has_channel_signature', 'signature_valid', 'any_tags', 'all_tags', 'not_tags', @@ -226,6 +226,8 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]: constraints['claim.claim_id'] = claim_id else: constraints['claim.claim_id__like'] = f'{claim_id[:40]}%' + elif 'claim_ids' in constraints: + constraints['claim.claim_id__in'] = constraints.pop('claim_ids') if 'name' in constraints: constraints['claim.normalized'] = normalize_name(constraints.pop('name')) diff --git a/lbry/lbry/wallet/transaction.py b/lbry/lbry/wallet/transaction.py index 4b55a4fc8..fba9d3506 100644 --- a/lbry/lbry/wallet/transaction.py +++ b/lbry/lbry/wallet/transaction.py @@ -29,13 +29,19 @@ class Output(BaseOutput): script: OutputScript script_class = OutputScript - __slots__ = 'channel', 'private_key', 'meta' + __slots__ = ( + 'channel', 'private_key', 'meta', + 'purchase', 'purchased_claim', 'purchase_receipt', + ) def __init__(self, *args, channel: Optional['Output'] = None, private_key: Optional[str] = None, **kwargs) -> None: super().__init__(*args, **kwargs) self.channel = channel self.private_key = private_key + self.purchase: 'Output' = None # txo containing purchase metadata + self.purchased_claim: 'Output' = None # resolved claim pointed to by purchase + self.purchase_receipt: 'Output' = None # txo representing purchase receipt for this claim self.meta = {} def update_annotations(self, annotated): @@ -215,6 +221,26 @@ class Output(BaseOutput): except: return False + @property + def purchased_claim_id(self): + if self.purchase is not None: + return self.purchase.purchase_data.claim_id + if self.purchased_claim is not None: + return self.purchased_claim.claim_id + + @property + def has_price(self): + if self.can_decode_claim: + claim = self.claim + if claim.is_stream: + stream = claim.stream + return stream.has_fee and stream.fee.amount and stream.fee.amount > 0 + return False + + @property + def price(self): + return self.claim.stream.fee + class Transaction(BaseTransaction): @@ -292,6 +318,11 @@ class Transaction(BaseTransaction): if not txo.is_my_account and f(txo.script): yield txo + def _filter_any_outputs(self, f): + for txo in self.outputs: + if f(txo): + yield txo + @property def my_claim_outputs(self): return self._filter_my_outputs(lambda s: s.is_claim_name) @@ -304,6 +335,10 @@ class Transaction(BaseTransaction): def my_support_outputs(self): return self._filter_my_outputs(lambda s: s.is_support_claim) + @property + def any_purchase_outputs(self): + return self._filter_any_outputs(lambda o: o.purchase is not None) + @property def other_support_outputs(self): return self._filter_other_outputs(lambda s: s.is_support_claim) diff --git a/lbry/scripts/release.py b/lbry/scripts/release.py index 8854239d1..3735dbab6 100644 --- a/lbry/scripts/release.py +++ b/lbry/scripts/release.py @@ -135,6 +135,7 @@ def release(args): incompats = [] release_texts = [] unlabeled = [] + fixups = [] areas = {} for pr in gh.search_issues(f"merged:>={previous_release._json_data['created_at']} repo:lbryio/lbry-sdk"): area_labels = list(get_labels(pr, 'area')) @@ -145,7 +146,9 @@ def release(args): incompats.append(f' * [{area_name}] {incompat.strip()} ({pr.html_url})') for release_text in get_release_text(pr.body or ""): release_texts.append(f'{release_text.strip()} ({pr.html_url})') - if type_label != 'fixup': + if type_label == 'fixup': + fixups.append(f' * {pr.title} ({pr.html_url}) by {pr.user["login"]}') + else: area = areas.setdefault(area_name, []) area.append(f' * [{type_label}] {pr.title} ({pr.html_url}) by {pr.user["login"]}') else: @@ -183,6 +186,11 @@ def release(args): for skipped in unlabeled: print(skipped) + if fixups: + print('The following PRs were marked as fixups and not included in changelog:') + for skipped in fixups: + print(skipped) + if args.confirm: commit = version_file.update( diff --git a/lbry/tests/integration/test_file_commands.py b/lbry/tests/integration/test_file_commands.py index 769c2b8ed..1c5ade6ba 100644 --- a/lbry/tests/integration/test_file_commands.py +++ b/lbry/tests/integration/test_file_commands.py @@ -149,11 +149,11 @@ class FileCommands(CommandTestCase): f.flush() second_path = await self.daemon.jsonrpc_get('lbry://foo', save_file=True) await self.wait_files_to_complete() - self.assertNotEquals(first_path, second_path) + self.assertNotEqual(first_path, second_path) async def test_file_list_updated_metadata_on_resolve(self): await self.stream_create('foo', '0.01') - txo = (await self.daemon.resolve(['lbry://foo']))['lbry://foo'] + txo = (await self.daemon.resolve(self.wallet.accounts, ['lbry://foo']))['lbry://foo'] claim = txo.claim await self.daemon.jsonrpc_file_delete(claim_name='foo') txid = await self.blockchain_claim_name('bar', hexlify(claim.to_bytes()).decode(), '0.01') @@ -325,7 +325,7 @@ class FileCommands(CommandTestCase): ) await self.daemon.jsonrpc_file_delete(claim_name='expensive') response = await self.out(self.daemon.jsonrpc_get('lbry://expensive')) - self.assertEqual(response['error'], 'fee of 11.00000 exceeds max available balance') + self.assertEqual(response['error'], 'Not enough funds to cover this transaction.') self.assertEqual(len(self.file_list()), 0) # FAIL: beyond maximum key fee @@ -336,7 +336,9 @@ class FileCommands(CommandTestCase): await self.daemon.jsonrpc_file_delete(claim_name='maxkey') response = await self.out(self.daemon.jsonrpc_get('lbry://maxkey')) self.assertEqual(len(self.file_list()), 0) - self.assertEqual(response['error'], 'fee of 111.00000 exceeds max configured to allow of 50.00000') + self.assertEqual( + response['error'], 'Purchase price of 111.0 LBC exceeds maximum configured price of 100.0 LBC (50.0 USD).' + ) # PASS: purchase is successful await self.stream_create( diff --git a/lbry/tests/integration/test_purchase_command.py b/lbry/tests/integration/test_purchase_command.py index 2016382f9..a2cdc0b7e 100644 --- a/lbry/tests/integration/test_purchase_command.py +++ b/lbry/tests/integration/test_purchase_command.py @@ -1,35 +1,142 @@ +from typing import Optional from lbry.testcase import CommandTestCase from lbry.schema.purchase import Purchase +from lbry.wallet.transaction import Transaction +from lbry.wallet.dewies import lbc_to_dewies, dewies_to_lbc -class PurchaseCommand(CommandTestCase): +class PurchaseCommandTests(CommandTestCase): - async def test_purchase_via_get(self): - starting_balance = await self.blockchain.get_balance() - target_address = await self.blockchain.get_raw_change_address() - stream = await self.stream_create( - 'stream', '0.01', data=b'high value content', - fee_currency='LBC', fee_amount='1.0', fee_address=target_address + async def asyncSetUp(self): + await super().asyncSetUp() + self.merchant_address = await self.blockchain.get_raw_change_address() + + async def priced_stream(self, name='stream', price: Optional[str] = '2.0', currency='LBC') -> Transaction: + kwargs = {} + if price and currency: + kwargs = { + 'fee_amount': price, + 'fee_currency': currency, + 'fee_address': self.merchant_address + } + file_path = self.create_upload_file(data=b'high value content') + tx = await self.daemon.jsonrpc_stream_create( + name, '0.01', file_path=file_path, **kwargs ) - await self.daemon.jsonrpc_file_delete(claim_name='stream') - - await self.assertBalance(self.account, '9.977893') - response = await self.daemon.jsonrpc_get('lbry://stream') - tx = response.content_fee await self.ledger.wait(tx) - await self.assertBalance(self.account, '8.977752') - - self.assertEqual(len(tx.outputs), 3) - txo = tx.outputs[1] - self.assertTrue(txo.is_purchase_data) - self.assertTrue(txo.can_decode_purchase_data) - self.assertIsInstance(txo.purchase_data, Purchase) - self.assertEqual(txo.purchase_data.claim_id, self.get_claim_id(stream)) - await self.generate(1) + await self.ledger.wait(tx) + await self.daemon.jsonrpc_file_delete(claim_name=name) + return tx + + async def create_purchase(self, name, price): + stream = await self.priced_stream(name, price) + claim_id = stream.outputs[0].claim_id + purchase = await self.daemon.jsonrpc_purchase_create(claim_id) + await self.ledger.wait(purchase) + return claim_id + + async def assertStreamPurchased(self, stream: Transaction, purchase: Transaction): + stream_txo, purchase_txo = stream.outputs[0], purchase.outputs[0] + stream_fee = stream_txo.claim.stream.fee + self.assertEqual(stream_fee.dewies, purchase_txo.amount) + self.assertEqual(stream_fee.address, purchase_txo.get_address(self.ledger)) + + await self.account.release_all_outputs() + buyer_balance = await self.account.get_balance() + merchant_balance = lbc_to_dewies(str(await self.blockchain.get_balance())) + pre_purchase_count = (await self.daemon.jsonrpc_purchase_list())['total_items'] + + await self.ledger.wait(purchase) + await self.generate(1) + merchant_balance += lbc_to_dewies('1.0') # block reward + await self.ledger.wait(purchase) + self.assertEqual( - await self.blockchain.get_balance(), - starting_balance + - 2.0 + # block rewards - 1.0 # content payment - ) + await self.account.get_balance(), buyer_balance - (purchase.input_sum-purchase.outputs[2].amount)) + self.assertEqual( + str(await self.blockchain.get_balance()), dewies_to_lbc(merchant_balance + purchase_txo.amount)) + + purchases = await self.daemon.jsonrpc_purchase_list() + self.assertEqual(purchases['total_items'], pre_purchase_count+1) + + tx = purchases['items'][0].tx_ref.tx + self.assertEqual(len(tx.outputs), 3) # purchase txo, purchase data, change + + txo0 = tx.outputs[0] + txo1 = tx.outputs[1] + self.assertEqual(txo0.purchase, txo1) # purchase txo has reference to purchase data + self.assertTrue(txo1.is_purchase_data) + self.assertTrue(txo1.can_decode_purchase_data) + self.assertIsInstance(txo1.purchase_data, Purchase) + self.assertEqual(txo1.purchase_data.claim_id, stream_txo.claim_id) + + async def test_purchasing(self): + stream = await self.priced_stream() + claim_id = stream.outputs[0].claim_id + + # explicit purchase of claim + tx = await self.daemon.jsonrpc_purchase_create(claim_id) + await self.assertStreamPurchased(stream, tx) + + # check that `get` doesn't purchase it again + balance = await self.account.get_balance() + response = await self.daemon.jsonrpc_get('lbry://stream') + self.assertIsNone(response.content_fee) + self.assertEqual(await self.account.get_balance(), balance) + self.assertItemCount(await self.daemon.jsonrpc_purchase_list(), 1) + + # `get` does purchase a stream we don't have yet + another_stream = await self.priced_stream('another') + response = await self.daemon.jsonrpc_get('lbry://another') + await self.assertStreamPurchased(another_stream, response.content_fee) + + # purchase non-existent claim fails + with self.assertRaisesRegex(Exception, "Could not find claim with claim_id"): + await self.daemon.jsonrpc_purchase_create('abc123') + + # purchase stream with no price fails + no_price_stream = await self.priced_stream('no_price_stream', price=None) + with self.assertRaisesRegex(Exception, "does not have a purchase price"): + await self.daemon.jsonrpc_purchase_create(no_price_stream.outputs[0].claim_id) + + # purchase claim you already own fails + with self.assertRaisesRegex(Exception, "You already have a purchase for claim_id"): + await self.daemon.jsonrpc_purchase_create(claim_id) + + # force purchasing claim you already own + tx = await self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True) + await self.assertStreamPurchased(stream, tx) + + async def test_purchase_and_transaction_list(self): + self.assertItemCount(await self.daemon.jsonrpc_purchase_list(), 0) + self.assertItemCount(await self.daemon.jsonrpc_transaction_list(), 1) + + claim_id1 = await self.create_purchase('a', '1.0') + claim_id2 = await self.create_purchase('b', '1.0') + + result = await self.out(self.daemon.jsonrpc_purchase_list()) + self.assertItemCount(await self.daemon.jsonrpc_transaction_list(), 5) + self.assertItemCount(result, 2) + self.assertEqual(result['items'][0]['type'], 'purchase') + self.assertEqual(result['items'][0]['claim_id'], claim_id2) + self.assertNotIn('claim', result['items'][0]) + self.assertEqual(result['items'][1]['type'], 'purchase') + self.assertEqual(result['items'][1]['claim_id'], claim_id1) + self.assertNotIn('claim', result['items'][1]) + + result = await self.out(self.daemon.jsonrpc_purchase_list(resolve=True)) + self.assertEqual(result['items'][0]['claim']['name'], 'b') + self.assertEqual(result['items'][1]['claim']['name'], 'a') + + result = await self.daemon.jsonrpc_transaction_list() + self.assertEqual(result['items'][0]['purchase_info'][0]['claim_id'], claim_id2) + self.assertEqual(result['items'][2]['purchase_info'][0]['claim_id'], claim_id1) + + result = await self.claim_search() + self.assertEqual(result[0]['claim_id'], result[0]['purchase_receipt']['claim_id']) + self.assertEqual(result[1]['claim_id'], result[1]['purchase_receipt']['claim_id']) + + url = result[0]['canonical_url'] + resolve = await self.resolve(url) + self.assertEqual(result[0]['claim_id'], resolve[url]['purchase_receipt']['claim_id']) diff --git a/lbry/tests/unit/stream/test_stream_manager.py b/lbry/tests/unit/stream/test_stream_manager.py index 3f03f3f1e..af8e6588e 100644 --- a/lbry/tests/unit/stream/test_stream_manager.py +++ b/lbry/tests/unit/stream/test_stream_manager.py @@ -9,8 +9,14 @@ from decimal import Decimal from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager from lbry.utils import generate_id -from lbry.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout, \ - DownloadDataTimeout +from torba.client.errors import InsufficientFundsError +from lbry.error import KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout, DownloadDataTimeout +from torba.client.wallet import Wallet +from torba.client.constants import CENT, NULL_HASH32 +from torba.client.basenetwork import ClientSession +from lbry.conf import Config +from lbry.wallet.ledger import MainNetLedger +from lbry.wallet.transaction import Transaction, Input, Output from lbry.wallet.manager import LbryWalletManager from lbry.extras.daemon.analytics import AnalyticsManager from lbry.stream.stream_manager import StreamManager @@ -39,52 +45,84 @@ def get_mock_node(peer=None): return mock_node -def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None): - claim = { - "address": "bYFeMtSL7ARuG1iMpjFyrnTe4oJHSAVNXF", - "amount": "0.1", - "claim_id": "c49566d631226492317d06ad7fdbe1ed32925124", - "claim_sequence": 1, - "decoded_claim": True, - "confirmations": 1057, - "effective_amount": "0.1", - "has_signature": False, - "height": 514081, - "hex": "", - "name": "33rpm", - "nout": 0, - "permanent_url": "33rpm#c49566d631226492317d06ad7fdbe1ed32925124", - "supports": [], - "txid": "81ac52662af926fdf639d56920069e0f63449d4cde074c61717cb99ddde40e3c", - } - claim_obj = Claim() +def get_output(amount=CENT, pubkey_hash=NULL_HASH32): + return Transaction() \ + .add_outputs([Output.pay_pubkey_hash(amount, pubkey_hash)]) \ + .outputs[0] + + +def get_input(): + return Input.spend(get_output()) + + +def get_transaction(txo=None): + return Transaction() \ + .add_inputs([get_input()]) \ + .add_outputs([txo or Output.pay_pubkey_hash(CENT, NULL_HASH32)]) + + +def get_claim_transaction(claim_name, claim=b''): + return get_transaction( + Output.pay_claim_name_pubkey_hash(CENT, claim_name, claim, NULL_HASH32) + ) + + +async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None): + claim = Claim() if fee: if fee['currency'] == 'LBC': - claim_obj.stream.fee.lbc = Decimal(fee['amount']) + claim.stream.fee.lbc = Decimal(fee['amount']) elif fee['currency'] == 'USD': - claim_obj.stream.fee.usd = Decimal(fee['amount']) - claim_obj.stream.title = "33rpm" - claim_obj.stream.languages.append("en") - claim_obj.stream.source.sd_hash = sd_hash - claim_obj.stream.source.media_type = "image/png" - claim['value'] = claim_obj - claim['protobuf'] = binascii.hexlify(claim_obj.to_bytes()).decode() + claim.stream.fee.usd = Decimal(fee['amount']) + claim.stream.title = "33rpm" + claim.stream.languages.append("en") + claim.stream.source.sd_hash = sd_hash + claim.stream.source.media_type = "image/png" + + tx = get_claim_transaction("33rpm", claim.to_bytes()) + tx.height = 514081 + txo = tx.outputs[0] + txo.meta.update({ + "permanent_url": "33rpm#c49566d631226492317d06ad7fdbe1ed32925124", + + }) + + class FakeHeaders: + def __init__(self, height): + self.height = height + + def __getitem__(self, item): + return {'timestamp': 1984} + + wallet = Wallet() + ledger = MainNetLedger({ + 'db': MainNetLedger.database_class(':memory:'), + 'headers': FakeHeaders(514082) + }) + await ledger.db.open() + wallet.generate_account(ledger) + manager = LbryWalletManager() + manager.config = Config() + manager.wallets.append(wallet) + manager.ledgers[MainNetLedger] = ledger + manager.ledger.network.client = ClientSession( + network=manager.ledger.network, server=('fakespv.lbry.com', 50001) + ) async def mock_resolve(*args): - await storage.save_claims([claim]) - return { - claim['permanent_url']: claim - } - - mock_wallet = mock.Mock(spec=LbryWalletManager) - mock_wallet.ledger.resolve = mock_resolve - mock_wallet.ledger.network.client.server = ('fakespv.lbry.com', 50001) + result = {txo.meta['permanent_url']: txo} + claims = [ + StreamManager._convert_to_old_resolve_output(manager, result)[txo.meta['permanent_url']] + ] + await storage.save_claims(claims) + return result + manager.ledger.resolve = mock_resolve async def get_balance(*_): return balance + manager.get_balance = get_balance - mock_wallet.get_balance = get_balance - return mock_wallet, claim['permanent_url'] + return manager, txo.meta['permanent_url'] class TestStreamManager(BlobExchangeTestBase): @@ -96,7 +134,7 @@ class TestStreamManager(BlobExchangeTestBase): self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort ) self.sd_hash = descriptor.sd_hash - self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage, balance, fee) + self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee) self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, self.client_storage, get_mock_node(self.server_from_client), AnalyticsManager(self.client_config, @@ -226,7 +264,7 @@ class TestStreamManager(BlobExchangeTestBase): start = self.loop.time() await self._test_time_to_first_bytes(check_post, DownloadSDTimeout) duration = self.loop.time() - start - self.assertLessEqual(duration, 4.7) + self.assertLessEqual(duration, 5) self.assertGreaterEqual(duration, 3.0) async def test_download_stop_resume_delete(self): diff --git a/torba/torba/client/basedatabase.py b/torba/torba/client/basedatabase.py index fadbc6f9b..9a6570806 100644 --- a/torba/torba/client/basedatabase.py +++ b/torba/torba/client/basedatabase.py @@ -387,7 +387,7 @@ class BaseDatabase(SQLiteMixin): conn.execute(*self._insert_sql('tx', self.tx_to_row(tx), replace=True)) for txo in tx.outputs: - if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash: + if txo.script.is_pay_pubkey_hash and txo.pubkey_hash == txhash: conn.execute(*self._insert_sql( "txo", self.txo_to_row(tx, address, txo), ignore_duplicate=True )).fetchall() @@ -433,7 +433,7 @@ class BaseDatabase(SQLiteMixin): return True async def select_transactions(self, cols, accounts=None, **constraints): - if not set(constraints) & {'txid', 'txid__in'}: + if not {'txid', 'txid__in'}.intersection(constraints): assert accounts, "'accounts' argument required when no 'txid' constraint is present" constraints.update({ f'$account{i}': a.public_key.address for i, a in enumerate(accounts) @@ -517,10 +517,10 @@ class BaseDatabase(SQLiteMixin): return txs[0] async def select_txos(self, cols, **constraints): - sql = "SELECT {} FROM txo JOIN tx USING (txid)" + sql = f"SELECT {cols} FROM txo JOIN tx USING (txid)" if 'accounts' in constraints: sql += " JOIN account_address USING (address)" - return await self.db.execute_fetchall(*query(sql.format(cols), **constraints)) + return await self.db.execute_fetchall(*query(sql, **constraints)) async def get_txos(self, wallet=None, no_tx=False, **constraints): my_accounts = {a.public_key.address for a in wallet.accounts} if wallet else set()