added purchase_list and purchase_create

This commit is contained in:
Lex Berezhny 2019-10-29 01:26:25 -04:00
parent 28457021f8
commit fdd2562f32
17 changed files with 629 additions and 206 deletions

View file

@ -837,15 +837,16 @@ class Daemon(metaclass=JSONRPCServerType):
return platform_info
@requires(WALLET_COMPONENT)
async def jsonrpc_resolve(self, urls: typing.Union[str, list]):
async def jsonrpc_resolve(self, urls: typing.Union[str, list], wallet_id=None):
"""
Get the claim that a URL refers to.
Usage:
resolve <urls>...
resolve <urls>... [--wallet_id=<wallet_id>]
Options:
--urls=<urls> : (str, list) one or more urls to resolve
--wallet_id=<wallet_id> : (str) wallet to check for claim purchase reciepts
Returns:
Dictionary of results, keyed by url
@ -903,6 +904,7 @@ class Daemon(metaclass=JSONRPCServerType):
}
}
"""
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
if isinstance(urls, str):
urls = [urls]
@ -917,7 +919,7 @@ class Daemon(metaclass=JSONRPCServerType):
except ValueError:
results[u] = {"error": f"{u} is not a valid url"}
resolved = await self.resolve(list(valid_urls))
resolved = await self.resolve(wallet.accounts, list(valid_urls))
for resolved_uri in resolved:
results[resolved_uri] = resolved[resolved_uri] if resolved[resolved_uri] is not None else \
@ -944,7 +946,7 @@ class Daemon(metaclass=JSONRPCServerType):
--download_directory=<download_directory> : (str) full path to the directory to download into
--timeout=<timeout> : (int) download timeout in number of seconds
--save_file=<save_file> : (bool) save the file to the downloads directory
--wallet_id=<wallet_id> : (str) restrict operation to specific wallet
--wallet_id=<wallet_id> : (str) wallet to check for claim purchase reciepts
Returns: {File}
"""
@ -1947,6 +1949,91 @@ class Daemon(metaclass=JSONRPCServerType):
await stream.save_file(file_name, download_directory)
return stream
PURCHASE_DOC = """
List and make purchases of claims.
"""
@requires(WALLET_COMPONENT)
def jsonrpc_purchase_list(
self, claim_id=None, resolve=False, account_id=None, wallet_id=None, page=None, page_size=None):
"""
List my claim purchases.
Usage:
purchase_list [<claim_id> | --claim_id=<claim_id>] [--resolve]
[--account_id=<account_id>] [--wallet_id=<wallet_id>]
[--page=<page>] [--page_size=<page_size>]
Options:
--claim_id=<claim_id> : (str) purchases for specific claim
--resolve : (str) include resolved claim information
--account_id=<account_id> : (str) id of the account to query
--wallet_id=<wallet_id> : (str) restrict results to specific wallet
--page=<page> : (int) page to return during paginating
--page_size=<page_size> : (int) number of items on page during pagination
Returns: {Paginated[Output]}
"""
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
constraints = {
"wallet": wallet,
"accounts": [wallet.get_account_or_error(account_id)] if account_id else wallet.accounts,
"resolve": resolve,
}
if claim_id:
constraints["purchased_claim_id"] = claim_id
return paginate_rows(
self.ledger.get_purchases,
self.ledger.get_purchase_count,
page, page_size, **constraints
)
@requires(WALLET_COMPONENT)
async def jsonrpc_purchase_create(
self, claim_id, wallet_id=None, funding_account_ids=None,
allow_duplicate_purchase=False, override_max_key_fee=False, preview=False, blocking=False):
"""
Purchase a claim.
Usage:
purchase_create (<claim_id> | --claim_id=<claim_id>) [--wallet_id=<wallet_id>]
[--funding_account_ids=<funding_account_ids>...]
[--allow_duplicate_purchase] [--override_max_key_fee] [--preview] [--blocking]
Options:
--claim_id=<claim_id> : (str) id of claim to purchase
--wallet_id=<wallet_id> : (str) restrict operation to specific wallet
--funding_account_ids=<funding_account_ids>: (list) ids of accounts to fund this transaction
--allow_duplicate_purchase : (bool) allow purchasing claim_id you already own
--override_max_key_fee : (bool) ignore max key fee for this purchase
--preview : (bool) do not broadcast the transaction
--blocking : (bool) wait until transaction is in mempool
Returns: {Transaction}
"""
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
assert not wallet.is_locked, "Cannot spend funds with locked wallet, unlock first."
accounts = wallet.get_accounts_or_all(funding_account_ids)
txo = await self.ledger.get_claim_by_claim_id(accounts, claim_id)
if not txo or not txo.is_claim:
raise Exception(f"Could not find claim with claim_id '{claim_id}'. ")
if not allow_duplicate_purchase and txo.purchase_receipt:
raise Exception(
f"You already have a purchase for claim_id '{claim_id}'. "
f"Use --allow-duplicate-purchase flag to override."
)
claim = txo.claim
if not claim.is_stream or not claim.stream.has_fee:
raise Exception(f"Claim '{claim_id}' does not have a purchase price.")
tx = await self.wallet_manager.create_purchase_transaction(
accounts, txo, self.exchange_rate_manager, override_max_key_fee
)
if not preview:
await self.broadcast_or_release(tx, blocking)
else:
await self.ledger.release_tx(tx)
return tx
CLAIM_DOC = """
List and search all types of claims.
"""
@ -1988,7 +2075,8 @@ class Daemon(metaclass=JSONRPCServerType):
eg. --height=">400000" would limit results to only claims above 400k block height.
Usage:
claim_search [<name> | --name=<name>] [--claim_id=<claim_id>] [--txid=<txid>] [--nout=<nout>]
claim_search [<name> | --name=<name>] [--txid=<txid>] [--nout=<nout>]
[--claim_id=<claim_id> | --claim_ids=<claim_ids>...]
[--channel=<channel> |
[[--channel_ids=<channel_ids>...] [--not_channel_ids=<not_channel_ids>...]]]
[--has_channel_signature] [--valid_channel_signature | --invalid_channel_signature]
@ -2008,10 +2096,12 @@ class Daemon(metaclass=JSONRPCServerType):
[--any_locations=<any_locations>...] [--all_locations=<all_locations>...]
[--not_locations=<not_locations>...]
[--order_by=<order_by>...] [--page=<page>] [--page_size=<page_size>]
[--wallet_id=<wallet_id>]
Options:
--name=<name> : (str) claim name (normalized)
--claim_id=<claim_id> : (str) full or partial claim id
--claim_ids=<claim_ids> : (list) list of full claim ids
--txid=<txid> : (str) transaction id
--nout=<nout> : (str) position in the transaction
--channel=<channel> : (str) claims signed by this channel (argument is
@ -2101,16 +2191,20 @@ class Daemon(metaclass=JSONRPCServerType):
'trending_local', 'trending_global', 'activation_height'
--no_totals : (bool) do not calculate the total number of pages and items in result set
(significant performance boost)
--wallet_id=<wallet_id> : (str) wallet to check for claim purchase reciepts
Returns: {Paginated[Output]}
"""
wallet = self.wallet_manager.get_wallet_or_default(kwargs.pop('wallet_id', None))
if {'claim_id', 'claim_ids'}.issubset(kwargs):
raise ValueError("Only 'claim_id' or 'claim_ids' is allowed, not both.")
if kwargs.pop('valid_channel_signature', False):
kwargs['signature_valid'] = 1
if kwargs.pop('invalid_channel_signature', False):
kwargs['signature_valid'] = 0
page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50)
kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size})
txos, offset, total = await self.ledger.claim_search(**kwargs)
txos, offset, total = await self.ledger.claim_search(wallet.accounts, **kwargs)
result = {"items": txos, "page": page_num, "page_size": page_size}
if not kwargs.pop('no_totals', False):
result['total_pages'] = int((total + (page_size - 1)) / page_size)
@ -2533,6 +2627,8 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
(dict) Result dictionary
"""
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
decoded = base58.b58decode(channel_data)
data = json.loads(decoded)
channel_private_key = ecdsa.SigningKey.from_pem(
@ -2543,12 +2639,11 @@ class Daemon(metaclass=JSONRPCServerType):
# check that the holding_address hasn't changed since the export was made
holding_address = data['holding_address']
channels, _, _ = await self.ledger.claim_search(
public_key_id=self.ledger.public_key_to_address(public_key_der)
wallet.accounts, public_key_id=self.ledger.public_key_to_address(public_key_der)
)
if channels and channels[0].get_address(self.ledger) != holding_address:
holding_address = channels[0].get_address(self.ledger)
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
account: LBCAccount = await self.ledger.get_account_for_address(wallet, holding_address)
if account:
# Case 1: channel holding address is in one of the accounts we already have
@ -3188,7 +3283,7 @@ class Daemon(metaclass=JSONRPCServerType):
assert not wallet.is_locked, "Cannot spend funds with locked wallet, unlock first."
funding_accounts = wallet.get_accounts_or_all(funding_account_ids)
amount = self.get_dewies_or_error("amount", amount)
claim = await self.ledger.get_claim_by_claim_id(claim_id)
claim = await self.ledger.get_claim_by_claim_id(wallet.accounts, claim_id)
claim_address = claim.get_address(self.ledger)
if not tip:
account = wallet.get_account_or_default(account_id)
@ -3997,7 +4092,7 @@ class Daemon(metaclass=JSONRPCServerType):
self.conf.comment_server, 'get_comments_by_id', comment_ids=comment_ids
)
claim_ids = {comment['claim_id'] for comment in comments}
claims = {cid: await self.ledger.get_claim_by_claim_id(claim_id=cid) for cid in claim_ids}
claims = {cid: await self.ledger.get_claim_by_claim_id(wallet.accounts, claim_id=cid) for cid in claim_ids}
pieces = []
for comment in comments:
claim = claims.get(comment['claim_id'])
@ -4015,13 +4110,7 @@ class Daemon(metaclass=JSONRPCServerType):
return await comment_client.jsonrpc_post(self.conf.comment_server, 'hide_comments', pieces=pieces)
async def broadcast_or_release(self, tx, blocking=False):
try:
await self.ledger.broadcast(tx)
if blocking:
await self.ledger.wait(tx)
except:
await self.ledger.release_tx(tx)
raise
await self.wallet_manager.broadcast_or_release(tx, blocking)
def valid_address_or_error(self, address):
try:
@ -4114,11 +4203,11 @@ class Daemon(metaclass=JSONRPCServerType):
except ValueError as e:
raise ValueError(f"Invalid value for '{argument}': {e.args[0]}")
async def resolve(self, urls):
results = await self.ledger.resolve(urls)
async def resolve(self, accounts, urls):
results = await self.ledger.resolve(accounts, urls)
if results:
try:
claims = self.stream_manager._convert_to_old_resolve_output(results)
claims = self.stream_manager._convert_to_old_resolve_output(self.wallet_manager, results)
await self.storage.save_claims_for_resolve([
value for value in claims.values() if 'error' not in value
])

View file

@ -1 +0,0 @@
from lbry.extras.daemon import Components # register Component classes

View file

@ -7,6 +7,7 @@ from typing import Optional
from aiohttp.client_exceptions import ClientError
from lbry.error import InvalidExchangeRateResponse, CurrencyConversionError
from lbry.utils import aiohttp_request
from lbry.wallet.dewies import lbc_to_dewies
log = logging.getLogger(__name__)
@ -222,19 +223,23 @@ class ExchangeRateManager:
rates = [market.rate for market in self.market_feeds]
log.debug("Converting %f %s to %s, rates: %s" % (amount, from_currency, to_currency, rates))
if from_currency == to_currency:
return amount
return round(amount, 8)
for market in self.market_feeds:
if (market.rate_is_initialized() and market.is_online() and
market.rate.currency_pair == (from_currency, to_currency)):
return amount * Decimal(market.rate.spot)
return round(amount * Decimal(market.rate.spot), 8)
for market in self.market_feeds:
if (market.rate_is_initialized() and market.is_online() and
market.rate.currency_pair[0] == from_currency):
return self.convert_currency(
market.rate.currency_pair[1], to_currency, amount * Decimal(market.rate.spot))
return round(self.convert_currency(
market.rate.currency_pair[1], to_currency, amount * Decimal(market.rate.spot)), 8)
raise CurrencyConversionError(
f'Unable to convert {amount} from {from_currency} to {to_currency}')
def to_dewies(self, currency, amount) -> int:
converted = self.convert_currency(currency, "LBC", amount)
return lbc_to_dewies(str(converted))
def fee_dict(self):
return {market: market.rate.as_dict() for market in self.market_feeds}

View file

@ -38,6 +38,7 @@ def encode_txo_doc():
'permanent_url': "when type is 'claim' or 'support', this is the long permanent claim URL",
'signing_channel': "for signed claims only, metadata of signing channel",
'is_channel_signature_valid': "for signed claims only, whether signature is valid",
'purchase_receipt': "metadata for the purchase transaction associated with this claim"
}
@ -176,10 +177,13 @@ class JSONResponseEncoder(JSONEncoder):
output['claim_op'] = 'update'
elif txo.script.is_support_claim:
output['type'] = 'support'
elif txo.is_purchase_data:
elif txo.script.is_return_data:
output['type'] = 'data'
elif txo.purchase is not None:
output['type'] = 'purchase'
if txo.can_decode_purchase_data:
output['claim_id'] = txo.purchase_data.claim_id
output['claim_id'] = txo.purchased_claim_id
if txo.purchased_claim is not None:
output['claim'] = self.encode_output(txo.purchased_claim)
else:
output['type'] = 'payment'
@ -201,6 +205,8 @@ class JSONResponseEncoder(JSONEncoder):
output['value_type'] = txo.claim.claim_type
if self.include_protobuf:
output['protobuf'] = hexlify(txo.claim.to_bytes())
if txo.purchase_receipt is not None:
output['purchase_receipt'] = self.encode_output(txo.purchase_receipt)
if txo.claim.is_channel:
output['has_signing_key'] = txo.has_private_key
if check_signature and txo.claim.is_signed:

View file

@ -1,19 +1,19 @@
import os
import asyncio
import typing
import binascii
import logging
import random
from decimal import Decimal
import typing
from typing import Optional
from aiohttp.web import Request
from lbry.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError
from lbry.error import ResolveError, InvalidStreamDescriptorError
from lbry.error import ResolveTimeout, DownloadDataTimeout
from lbry.utils import cache_concurrent
from lbry.stream.descriptor import StreamDescriptor
from lbry.stream.managed_stream import ManagedStream
from lbry.schema.claim import Claim
from lbry.schema.url import URL
from lbry.extras.daemon.storage import lbc_to_dewies, dewies_to_lbc
from lbry.wallet.dewies import dewies_to_lbc
from lbry.wallet.transaction import Output
if typing.TYPE_CHECKING:
from lbry.conf import Config
@ -58,7 +58,7 @@ comparison_operators = {
}
def path_or_none(p) -> typing.Optional[str]:
def path_or_none(p) -> Optional[str]:
if not p:
return
return binascii.unhexlify(p).decode()
@ -66,8 +66,8 @@ def path_or_none(p) -> typing.Optional[str]:
class StreamManager:
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
wallet_manager: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'],
analytics_manager: typing.Optional['AnalyticsManager'] = None):
wallet_manager: 'LbryWalletManager', storage: 'SQLiteStorage', node: Optional['Node'],
analytics_manager: Optional['AnalyticsManager'] = None):
self.loop = loop
self.config = config
self.blob_manager = blob_manager
@ -76,8 +76,8 @@ class StreamManager:
self.node = node
self.analytics_manager = analytics_manager
self.streams: typing.Dict[str, ManagedStream] = {}
self.resume_saving_task: typing.Optional[asyncio.Task] = None
self.re_reflect_task: typing.Optional[asyncio.Task] = None
self.resume_saving_task: Optional[asyncio.Task] = None
self.re_reflect_task: Optional[asyncio.Task] = None
self.update_stream_finished_futs: typing.List[asyncio.Future] = []
self.running_reflector_uploads: typing.List[asyncio.Task] = []
self.started = asyncio.Event(loop=self.loop)
@ -91,7 +91,7 @@ class StreamManager:
async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str,
suggested_file_name: str, key: str,
content_fee: typing.Optional['Transaction']) -> typing.Optional[StreamDescriptor]:
content_fee: Optional['Transaction']) -> Optional[StreamDescriptor]:
sd_blob = self.blob_manager.get_blob(sd_hash)
blobs = await self.storage.get_blobs_for_stream(stream_hash)
descriptor = await StreamDescriptor.recover(
@ -115,10 +115,10 @@ class StreamManager:
# if self.blob_manager._save_blobs:
# log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))
async def add_stream(self, rowid: int, sd_hash: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], status: str,
claim: typing.Optional['StoredStreamClaim'], content_fee: typing.Optional['Transaction'],
added_on: typing.Optional[int]):
async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
download_directory: Optional[str], status: str,
claim: Optional['StoredStreamClaim'], content_fee: Optional['Transaction'],
added_on: Optional[int]):
try:
descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
except InvalidStreamDescriptorError as err:
@ -217,8 +217,8 @@ class StreamManager:
self.started.clear()
log.info("finished stopping the stream manager")
async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None,
iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
async def create_stream(self, file_path: str, key: Optional[bytes] = None,
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator)
self.streams[stream.sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
@ -232,7 +232,7 @@ class StreamManager:
)
return stream
async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):
async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False):
stream.stop_tasks()
if stream.sd_hash in self.streams:
del self.streams[stream.sd_hash]
@ -242,13 +242,13 @@ class StreamManager:
if delete_file and stream.output_file_exists:
os.remove(stream.full_path)
def get_stream_by_stream_hash(self, stream_hash: str) -> typing.Optional[ManagedStream]:
def get_stream_by_stream_hash(self, stream_hash: str) -> Optional[ManagedStream]:
streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams.values()))
if streams:
return streams[0]
def get_filtered_streams(self, sort_by: typing.Optional[str] = None, reverse: typing.Optional[bool] = False,
comparison: typing.Optional[str] = None,
def get_filtered_streams(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
comparison: Optional[str] = None,
**search_by) -> typing.List[ManagedStream]:
"""
Get a list of filtered and sorted ManagedStream objects
@ -284,7 +284,7 @@ class StreamManager:
return streams
async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: Claim) -> typing.Tuple[
typing.Optional[ManagedStream], typing.Optional[ManagedStream]]:
Optional[ManagedStream], Optional[ManagedStream]]:
existing = self.get_filtered_streams(outpoint=outpoint)
if existing:
return existing[0], None
@ -305,12 +305,13 @@ class StreamManager:
return None, existing_for_claim_id[0]
return None, None
def _convert_to_old_resolve_output(self, resolves):
@staticmethod
def _convert_to_old_resolve_output(wallet_manager, resolves):
result = {}
for url, txo in resolves.items():
if isinstance(txo, Output):
tx_height = txo.tx_ref.height
best_height = self.wallet_manager.ledger.headers.height
best_height = wallet_manager.ledger.headers.height
result[url] = {
'name': txo.claim_name,
'value': txo.claim,
@ -323,9 +324,9 @@ class StreamManager:
'height': tx_height,
'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height,
'claim_sequence': -1,
'address': txo.get_address(self.wallet_manager.ledger),
'address': txo.get_address(wallet_manager.ledger),
'valid_at_height': txo.meta.get('activation_height', None),
'timestamp': self.wallet_manager.ledger.headers[tx_height]['timestamp'],
'timestamp': wallet_manager.ledger.headers[tx_height]['timestamp'],
'supports': []
}
else:
@ -334,17 +335,19 @@ class StreamManager:
@cache_concurrent
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
timeout: typing.Optional[float] = None,
file_name: typing.Optional[str] = None,
download_directory: typing.Optional[str] = None,
save_file: typing.Optional[bool] = None,
timeout: Optional[float] = None,
file_name: Optional[str] = None,
download_directory: Optional[str] = None,
save_file: Optional[bool] = None,
resolve_timeout: float = 3.0,
wallet: typing.Optional['Wallet'] = None) -> ManagedStream:
wallet = wallet or self.wallet_manager.default_wallet
wallet: Optional['Wallet'] = None) -> ManagedStream:
manager = self.wallet_manager
wallet = wallet or manager.default_wallet
timeout = timeout or self.config.download_timeout
start_time = self.loop.time()
resolved_time = None
stream = None
txo: Optional[Output] = None
error = None
outpoint = None
if save_file is None:
@ -356,19 +359,23 @@ class StreamManager:
else:
download_directory = None
payment = None
try:
# resolve the claim
if not URL.parse(uri).has_stream:
raise ResolveError("cannot download a channel claim, specify a /path")
try:
resolved_result = self._convert_to_old_resolve_output(
await asyncio.wait_for(self.wallet_manager.ledger.resolve([uri]), resolve_timeout)
response = await asyncio.wait_for(
manager.ledger.resolve(wallet.accounts, [uri]),
resolve_timeout
)
resolved_result = self._convert_to_old_resolve_output(manager, response)
except asyncio.TimeoutError:
raise ResolveTimeout(uri)
except Exception as err:
if isinstance(err, asyncio.CancelledError):
raise
log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
await self.storage.save_claims_for_resolve([
value for value in resolved_result.values() if 'error' not in value
@ -379,12 +386,13 @@ class StreamManager:
raise ResolveError(f"Failed to resolve stream at '{uri}'")
if 'error' in resolved:
raise ResolveError(f"error resolving stream: {resolved['error']}")
txo = response[uri]
claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf']))
outpoint = f"{resolved['txid']}:{resolved['nout']}"
resolved_time = self.loop.time() - start_time
# resume or update an existing stream, if the stream changed download it and delete the old one after
# resume or update an existing stream, if the stream changed: download it and delete the old one after
updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
if updated_stream:
log.info("already have stream for %s", uri)
@ -397,31 +405,14 @@ class StreamManager:
)
return updated_stream
content_fee = None
fee_amount, fee_address = None, None
# check that the fee is payable
if not to_replace and claim.stream.has_fee and claim.stream.fee.amount:
fee_amount = round(exchange_rate_manager.convert_currency(
claim.stream.fee.currency, "LBC", claim.stream.fee.amount
), 5)
max_fee_amount = round(exchange_rate_manager.convert_currency(
self.config.max_key_fee['currency'], "LBC", Decimal(self.config.max_key_fee['amount'])
), 5) if self.config.max_key_fee else None
if max_fee_amount and fee_amount > max_fee_amount:
msg = f"fee of {fee_amount} exceeds max configured to allow of {max_fee_amount}"
log.warning(msg)
raise KeyFeeAboveMaxAllowed(msg)
balance = await self.wallet_manager.get_balance(wallet)
if lbc_to_dewies(str(fee_amount)) > balance:
msg = f"fee of {fee_amount} exceeds max available balance"
log.warning(msg)
raise InsufficientFundsError(msg)
fee_address = claim.stream.fee.address or resolved['address']
if not to_replace and txo.has_price and not txo.purchase_receipt:
payment = await manager.create_purchase_transaction(
wallet.accounts, txo, exchange_rate_manager
)
stream = ManagedStream(
self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
file_name, ManagedStream.STATUS_RUNNING, content_fee=content_fee,
file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
analytics_manager=self.analytics_manager
)
log.info("starting download for %s", uri)
@ -431,12 +422,11 @@ class StreamManager:
stream.set_claim(resolved, claim)
if to_replace: # delete old stream now that the replacement has started downloading
await self.delete_stream(to_replace)
elif fee_address:
stream.content_fee = await self.wallet_manager.buy_claim(
stream.claim_id, lbc_to_dewies(str(fee_amount)),
fee_address.encode('latin1'), wallet.accounts
)
log.info("paid fee of %s for %s", fee_amount, uri)
if payment is not None:
await manager.broadcast_or_release(payment)
payment = None # to avoid releasing in `finally` later
log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
self.streams[stream.sd_hash] = stream
@ -450,9 +440,13 @@ class StreamManager:
error = DownloadDataTimeout(stream.sd_hash)
raise error
except Exception as err: # forgive data timeout, don't delete stream
log.exception("Unexpected error downloading stream:")
error = err
raise
finally:
if payment is not None:
# payment is set to None after broadcasting, if we're here an exception probably happened
await manager.ledger.release_tx(payment)
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))):
server = self.wallet_manager.ledger.network.client.server

View file

@ -20,13 +20,19 @@ from lbry.extras.daemon.Components import (
UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
)
from lbry.extras.daemon.ComponentManager import ComponentManager
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager as BaseExchangeRateManager, ExchangeRate
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.blob.blob_manager import BlobManager
from lbry.stream.reflector.server import ReflectorServer
from lbry.blob_exchange.server import BlobServer
class ExchangeRateManager:
class FakeExchangeRateManager(BaseExchangeRateManager):
def __init__(self):
super().__init__()
for i, feed in enumerate(self.market_feeds):
feed._save_price(i+1)
def start(self):
pass
@ -34,22 +40,16 @@ class ExchangeRateManager:
def stop(self):
pass
def convert_currency(self, from_currency, to_currency, amount):
return amount
def fee_dict(self):
return {}
class ExchangeRateManagerComponent(Component):
component_name = EXCHANGE_RATE_MANAGER_COMPONENT
def __init__(self, component_manager):
super().__init__(component_manager)
self.exchange_rate_manager = ExchangeRateManager()
self.exchange_rate_manager = FakeExchangeRateManager()
@property
def component(self) -> ExchangeRateManager:
def component(self) -> BaseExchangeRateManager:
return self.exchange_rate_manager
async def start(self):
@ -141,6 +141,7 @@ class CommandTestCase(IntegrationTestCase):
DHT_COMPONENT, UPNP_COMPONENT, HASH_ANNOUNCER_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT
]
wallet_node.manager.config = conf
def wallet_maker(component_manager):
wallet_component = WalletComponent(component_manager)

View file

@ -1,5 +1,6 @@
TXO_TYPES = {
"stream": 1,
"channel": 2,
"support": 3
"support": 3,
"purchase": 4
}

View file

@ -8,6 +8,20 @@ from lbry.wallet.constants import TXO_TYPES
class WalletDatabase(BaseDatabase):
SCHEMA_VERSION = f"{BaseDatabase.SCHEMA_VERSION}+1"
CREATE_TX_TABLE = """
create table if not exists tx (
txid text primary key,
raw blob not null,
height integer not null,
position integer not null,
is_verified boolean not null default 0,
purchased_claim_id text
);
create index if not exists tx_purchased_claim_id_idx on tx (purchased_claim_id);
"""
CREATE_TXO_TABLE = """
create table if not exists txo (
txid text references tx,
@ -22,6 +36,7 @@ class WalletDatabase(BaseDatabase):
claim_id text,
claim_name text
);
create index if not exists txo_txid_idx on txo (txid);
create index if not exists txo_address_idx on txo (address);
create index if not exists txo_claim_id_idx on txo (claim_id);
create index if not exists txo_txo_type_idx on txo (txo_type);
@ -31,11 +46,19 @@ class WalletDatabase(BaseDatabase):
BaseDatabase.PRAGMAS +
BaseDatabase.CREATE_ACCOUNT_TABLE +
BaseDatabase.CREATE_PUBKEY_ADDRESS_TABLE +
BaseDatabase.CREATE_TX_TABLE +
CREATE_TX_TABLE +
CREATE_TXO_TABLE +
BaseDatabase.CREATE_TXI_TABLE
)
def tx_to_row(self, tx):
row = super().tx_to_row(tx)
txos = tx.outputs
if len(txos) >= 2 and txos[1].can_decode_purchase_data:
txos[0].purchase = txos[1]
row['purchased_claim_id'] = txos[1].purchase_data.claim_id
return row
def txo_to_row(self, tx, address, txo):
row = super().txo_to_row(tx, address, txo)
if txo.is_claim:
@ -45,11 +68,45 @@ class WalletDatabase(BaseDatabase):
row['txo_type'] = TXO_TYPES['stream']
elif txo.is_support:
row['txo_type'] = TXO_TYPES['support']
elif txo.purchase is not None:
row['txo_type'] = TXO_TYPES['purchase']
row['claim_id'] = txo.purchased_claim_id
if txo.script.is_claim_involved:
row['claim_id'] = txo.claim_id
row['claim_name'] = txo.claim_name
return row
async def get_transactions(self, **constraints):
txs = await super().get_transactions(**constraints)
for tx in txs:
txos = tx.outputs
if len(txos) >= 2 and txos[1].can_decode_purchase_data:
txos[0].purchase = txos[1]
return txs
@staticmethod
def constrain_purchases(constraints):
accounts = constraints.pop('accounts', None)
assert accounts, "'accounts' argument required to find purchases"
if not {'purchased_claim_id', 'purchased_claim_id__in'}.intersection(constraints):
constraints['purchased_claim_id__is_not_null'] = True
constraints.update({
f'$account{i}': a.public_key.address for i, a in enumerate(accounts)
})
account_values = ', '.join([f':$account{i}' for i in range(len(accounts))])
constraints['txid__in'] = f"""
SELECT txid FROM txi JOIN account_address USING (address)
WHERE account_address.account IN ({account_values})
"""
async def get_purchases(self, **constraints):
self.constrain_purchases(constraints)
return [tx.outputs[0] for tx in await self.get_transactions(**constraints)]
def get_purchase_count(self, **constraints):
self.constrain_purchases(constraints)
return self.get_transaction_count(**constraints)
async def get_txos(self, wallet=None, no_tx=False, **constraints) -> List[Output]:
txos = await super().get_txos(wallet=wallet, no_tx=no_tx, **constraints)

View file

@ -53,16 +53,34 @@ class MainNetLedger(BaseLedger):
super().__init__(*args, **kwargs)
self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
async def _inflate_outputs(self, query):
async def _inflate_outputs(self, query, accounts):
outputs = Outputs.from_base64(await query)
txs = []
if len(outputs.txs) > 0:
txs = await asyncio.gather(*(self.cache_transaction(*tx) for tx in outputs.txs))
txs: List[Transaction] = await asyncio.gather(*(
self.cache_transaction(*tx) for tx in outputs.txs
))
if accounts:
priced_claims = []
for tx in txs:
for txo in tx.outputs:
if txo.has_price:
priced_claims.append(txo)
if priced_claims:
receipts = {
txo.purchased_claim_id: txo for txo in
await self.db.get_purchases(
accounts=accounts,
purchased_claim_id__in=[c.claim_id for c in priced_claims]
)
}
for txo in priced_claims:
txo.purchase_receipt = receipts.get(txo.claim_id)
return outputs.inflate(txs), outputs.offset, outputs.total
async def resolve(self, urls):
async def resolve(self, accounts, urls):
resolve = partial(self.network.retriable_call, self.network.resolve)
txos = (await self._inflate_outputs(resolve(urls)))[0]
txos = (await self._inflate_outputs(resolve(urls), accounts))[0]
assert len(urls) == len(txos), "Mismatch between urls requested for resolve and responses received."
result = {}
for url, txo in zip(urls, txos):
@ -75,11 +93,11 @@ class MainNetLedger(BaseLedger):
result[url] = {'error': f'{url} did not resolve to a claim'}
return result
async def claim_search(self, **kwargs) -> Tuple[List[Output], int, int]:
return await self._inflate_outputs(self.network.claim_search(**kwargs))
async def claim_search(self, accounts, **kwargs) -> Tuple[List[Output], int, int]:
return await self._inflate_outputs(self.network.claim_search(**kwargs), accounts)
async def get_claim_by_claim_id(self, claim_id) -> Output:
for claim in (await self.claim_search(claim_id=claim_id))[0]:
async def get_claim_by_claim_id(self, accounts, claim_id) -> Output:
for claim in (await self.claim_search(accounts, claim_id=claim_id))[0]:
return claim
async def start(self):
@ -122,6 +140,23 @@ class MainNetLedger(BaseLedger):
self.constraint_spending_utxos(constraints)
return super().get_utxo_count(**constraints)
async def get_purchases(self, resolve=False, **constraints):
purchases = await self.db.get_purchases(**constraints)
if resolve:
claim_ids = [p.purchased_claim_id for p in purchases]
try:
resolved, _, _ = await self.claim_search([], claim_ids=claim_ids)
except:
log.exception("Resolve failed while looking up purchased claim ids:")
resolved = []
lookup = {claim.claim_id: claim for claim in resolved}
for purchase in purchases:
purchase.purchased_claim = lookup.get(purchase.purchased_claim_id)
return purchases
def get_purchase_count(self, resolve=False, **constraints):
return self.db.get_purchase_count(**constraints)
def get_claims(self, **constraints):
return self.db.get_claims(**constraints)
@ -147,7 +182,7 @@ class MainNetLedger(BaseLedger):
return self.db.get_support_count(**constraints)
async def get_transaction_history(self, **constraints):
txs = await self.db.get_transactions(**constraints)
txs: List[Transaction] = await self.db.get_transactions(**constraints)
headers = self.headers
history = []
for tx in txs:
@ -160,7 +195,8 @@ class MainNetLedger(BaseLedger):
'claim_info': [],
'update_info': [],
'support_info': [],
'abandon_info': []
'abandon_info': [],
'purchase_info': []
}
is_my_inputs = all([txi.is_my_account for txi in tx.inputs])
if is_my_inputs:
@ -238,6 +274,14 @@ class MainNetLedger(BaseLedger):
'claim_name': txo.claim_name,
'nout': txo.position
})
for txo in tx.any_purchase_outputs:
item['purchase_info'].append({
'address': txo.get_address(self),
'balance_delta': dewies_to_lbc(txo.amount),
'amount': dewies_to_lbc(txo.amount),
'claim_id': txo.purchased_claim_id,
'nout': txo.position
})
history.append(item)
return history

View file

@ -2,15 +2,20 @@ import os
import json
import logging
from binascii import unhexlify
from typing import Optional, List
from decimal import Decimal
from torba.client.basemanager import BaseWalletManager
from torba.client.wallet import ENCRYPT_ON_DISK
from torba.rpc.jsonrpc import CodeMessageError
from lbry.error import KeyFeeAboveMaxAllowed
from lbry.wallet.dewies import dewies_to_lbc
from lbry.wallet.account import Account
from lbry.wallet.ledger import MainNetLedger
from lbry.wallet.transaction import Transaction
from lbry.wallet.transaction import Transaction, Output
from lbry.wallet.database import WalletDatabase
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbry.conf import Config
@ -19,6 +24,10 @@ log = logging.getLogger(__name__)
class LbryWalletManager(BaseWalletManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.config: Optional[Config] = None
@property
def ledger(self) -> MainNetLedger:
return self.default_account.ledger
@ -87,21 +96,21 @@ class LbryWalletManager(BaseWalletManager):
return receiving_addresses, change_addresses
@classmethod
async def from_lbrynet_config(cls, settings: Config):
async def from_lbrynet_config(cls, config: Config):
ledger_id = {
'lbrycrd_main': 'lbc_mainnet',
'lbrycrd_testnet': 'lbc_testnet',
'lbrycrd_regtest': 'lbc_regtest'
}[settings.blockchain_name]
}[config.blockchain_name]
ledger_config = {
'auto_connect': True,
'default_servers': settings.lbryum_servers,
'data_path': settings.wallet_dir,
'default_servers': config.lbryum_servers,
'data_path': config.wallet_dir,
}
wallets_directory = os.path.join(settings.wallet_dir, 'wallets')
wallets_directory = os.path.join(config.wallet_dir, 'wallets')
if not os.path.exists(wallets_directory):
os.mkdir(wallets_directory)
@ -112,11 +121,12 @@ class LbryWalletManager(BaseWalletManager):
manager = cls.from_config({
'ledgers': {ledger_id: ledger_config},
'wallets': [
os.path.join(wallets_directory, wallet_file) for wallet_file in settings.wallets
os.path.join(wallets_directory, wallet_file) for wallet_file in config.wallets
]
})
manager.config = config
ledger = manager.get_or_create_ledger(ledger_id)
ledger.coin_selection_strategy = settings.coin_selection_strategy
ledger.coin_selection_strategy = config.coin_selection_strategy
default_wallet = manager.default_wallet
if default_wallet.default_account is None:
log.info('Wallet at %s is empty, generating a default account.', default_wallet.id)
@ -161,13 +171,6 @@ class LbryWalletManager(BaseWalletManager):
def get_unused_address(self):
return self.default_account.receiving.get_or_create_usable_address()
async def buy_claim(self, claim_id: str, amount: int, destination_address: bytes, accounts):
tx = await Transaction.purchase(
claim_id, amount, destination_address, accounts, accounts[0]
)
await self.ledger.broadcast(tx)
return tx
async def get_transaction(self, txid):
tx = await self.db.get_transaction(txid=txid)
if not tx:
@ -181,3 +184,35 @@ class LbryWalletManager(BaseWalletManager):
tx = self.ledger.transaction_class(unhexlify(raw))
await self.ledger.maybe_verify_transaction(tx, height)
return tx
async def create_purchase_transaction(
self, accounts: List[Account], txo: Output, exchange: ExchangeRateManager, override_max_key_fee=False):
fee = txo.claim.stream.fee
fee_amount = exchange.to_dewies(fee.currency, fee.amount)
if not override_max_key_fee and self.config.max_key_fee:
max_fee = self.config.max_key_fee
max_fee_amount = exchange.to_dewies(max_fee['currency'], Decimal(max_fee['amount']))
if max_fee_amount and fee_amount > max_fee_amount:
error_fee = f"{dewies_to_lbc(fee_amount)} LBC"
if fee.currency != 'LBC':
error_fee += f" ({fee.amount} {fee.currency})"
error_max_fee = f"{dewies_to_lbc(max_fee_amount)} LBC"
if max_fee['currency'] != 'LBC':
error_max_fee += f" ({max_fee['amount']} {max_fee['currency']})"
raise KeyFeeAboveMaxAllowed(
f"Purchase price of {error_fee} exceeds maximum "
f"configured price of {error_max_fee}."
)
fee_address = fee.address or txo.get_address(self.ledger)
return await Transaction.purchase(
txo.claim_id, fee_amount, fee_address, accounts, accounts[0]
)
async def broadcast_or_release(self, tx, blocking=False):
try:
await self.ledger.broadcast(tx)
if blocking:
await self.ledger.wait(tx)
except:
await self.ledger.release_tx(tx)
raise

View file

@ -45,7 +45,7 @@ INTEGER_PARAMS = {
}
SEARCH_PARAMS = {
'name', 'claim_id', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'name', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
'has_channel_signature', 'signature_valid',
'any_tags', 'all_tags', 'not_tags',
@ -226,6 +226,8 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
constraints['claim.claim_id'] = claim_id
else:
constraints['claim.claim_id__like'] = f'{claim_id[:40]}%'
elif 'claim_ids' in constraints:
constraints['claim.claim_id__in'] = constraints.pop('claim_ids')
if 'name' in constraints:
constraints['claim.normalized'] = normalize_name(constraints.pop('name'))

View file

@ -29,13 +29,19 @@ class Output(BaseOutput):
script: OutputScript
script_class = OutputScript
__slots__ = 'channel', 'private_key', 'meta'
__slots__ = (
'channel', 'private_key', 'meta',
'purchase', 'purchased_claim', 'purchase_receipt',
)
def __init__(self, *args, channel: Optional['Output'] = None,
private_key: Optional[str] = None, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.channel = channel
self.private_key = private_key
self.purchase: 'Output' = None # txo containing purchase metadata
self.purchased_claim: 'Output' = None # resolved claim pointed to by purchase
self.purchase_receipt: 'Output' = None # txo representing purchase receipt for this claim
self.meta = {}
def update_annotations(self, annotated):
@ -215,6 +221,26 @@ class Output(BaseOutput):
except:
return False
@property
def purchased_claim_id(self):
if self.purchase is not None:
return self.purchase.purchase_data.claim_id
if self.purchased_claim is not None:
return self.purchased_claim.claim_id
@property
def has_price(self):
if self.can_decode_claim:
claim = self.claim
if claim.is_stream:
stream = claim.stream
return stream.has_fee and stream.fee.amount and stream.fee.amount > 0
return False
@property
def price(self):
return self.claim.stream.fee
class Transaction(BaseTransaction):
@ -292,6 +318,11 @@ class Transaction(BaseTransaction):
if not txo.is_my_account and f(txo.script):
yield txo
def _filter_any_outputs(self, f):
for txo in self.outputs:
if f(txo):
yield txo
@property
def my_claim_outputs(self):
return self._filter_my_outputs(lambda s: s.is_claim_name)
@ -304,6 +335,10 @@ class Transaction(BaseTransaction):
def my_support_outputs(self):
return self._filter_my_outputs(lambda s: s.is_support_claim)
@property
def any_purchase_outputs(self):
return self._filter_any_outputs(lambda o: o.purchase is not None)
@property
def other_support_outputs(self):
return self._filter_other_outputs(lambda s: s.is_support_claim)

View file

@ -135,6 +135,7 @@ def release(args):
incompats = []
release_texts = []
unlabeled = []
fixups = []
areas = {}
for pr in gh.search_issues(f"merged:>={previous_release._json_data['created_at']} repo:lbryio/lbry-sdk"):
area_labels = list(get_labels(pr, 'area'))
@ -145,7 +146,9 @@ def release(args):
incompats.append(f' * [{area_name}] {incompat.strip()} ({pr.html_url})')
for release_text in get_release_text(pr.body or ""):
release_texts.append(f'{release_text.strip()} ({pr.html_url})')
if type_label != 'fixup':
if type_label == 'fixup':
fixups.append(f' * {pr.title} ({pr.html_url}) by {pr.user["login"]}')
else:
area = areas.setdefault(area_name, [])
area.append(f' * [{type_label}] {pr.title} ({pr.html_url}) by {pr.user["login"]}')
else:
@ -183,6 +186,11 @@ def release(args):
for skipped in unlabeled:
print(skipped)
if fixups:
print('The following PRs were marked as fixups and not included in changelog:')
for skipped in fixups:
print(skipped)
if args.confirm:
commit = version_file.update(

View file

@ -149,11 +149,11 @@ class FileCommands(CommandTestCase):
f.flush()
second_path = await self.daemon.jsonrpc_get('lbry://foo', save_file=True)
await self.wait_files_to_complete()
self.assertNotEquals(first_path, second_path)
self.assertNotEqual(first_path, second_path)
async def test_file_list_updated_metadata_on_resolve(self):
await self.stream_create('foo', '0.01')
txo = (await self.daemon.resolve(['lbry://foo']))['lbry://foo']
txo = (await self.daemon.resolve(self.wallet.accounts, ['lbry://foo']))['lbry://foo']
claim = txo.claim
await self.daemon.jsonrpc_file_delete(claim_name='foo')
txid = await self.blockchain_claim_name('bar', hexlify(claim.to_bytes()).decode(), '0.01')
@ -325,7 +325,7 @@ class FileCommands(CommandTestCase):
)
await self.daemon.jsonrpc_file_delete(claim_name='expensive')
response = await self.out(self.daemon.jsonrpc_get('lbry://expensive'))
self.assertEqual(response['error'], 'fee of 11.00000 exceeds max available balance')
self.assertEqual(response['error'], 'Not enough funds to cover this transaction.')
self.assertEqual(len(self.file_list()), 0)
# FAIL: beyond maximum key fee
@ -336,7 +336,9 @@ class FileCommands(CommandTestCase):
await self.daemon.jsonrpc_file_delete(claim_name='maxkey')
response = await self.out(self.daemon.jsonrpc_get('lbry://maxkey'))
self.assertEqual(len(self.file_list()), 0)
self.assertEqual(response['error'], 'fee of 111.00000 exceeds max configured to allow of 50.00000')
self.assertEqual(
response['error'], 'Purchase price of 111.0 LBC exceeds maximum configured price of 100.0 LBC (50.0 USD).'
)
# PASS: purchase is successful
await self.stream_create(

View file

@ -1,35 +1,142 @@
from typing import Optional
from lbry.testcase import CommandTestCase
from lbry.schema.purchase import Purchase
from lbry.wallet.transaction import Transaction
from lbry.wallet.dewies import lbc_to_dewies, dewies_to_lbc
class PurchaseCommand(CommandTestCase):
class PurchaseCommandTests(CommandTestCase):
async def test_purchase_via_get(self):
starting_balance = await self.blockchain.get_balance()
target_address = await self.blockchain.get_raw_change_address()
stream = await self.stream_create(
'stream', '0.01', data=b'high value content',
fee_currency='LBC', fee_amount='1.0', fee_address=target_address
async def asyncSetUp(self):
await super().asyncSetUp()
self.merchant_address = await self.blockchain.get_raw_change_address()
async def priced_stream(self, name='stream', price: Optional[str] = '2.0', currency='LBC') -> Transaction:
kwargs = {}
if price and currency:
kwargs = {
'fee_amount': price,
'fee_currency': currency,
'fee_address': self.merchant_address
}
file_path = self.create_upload_file(data=b'high value content')
tx = await self.daemon.jsonrpc_stream_create(
name, '0.01', file_path=file_path, **kwargs
)
await self.daemon.jsonrpc_file_delete(claim_name='stream')
await self.assertBalance(self.account, '9.977893')
response = await self.daemon.jsonrpc_get('lbry://stream')
tx = response.content_fee
await self.ledger.wait(tx)
await self.assertBalance(self.account, '8.977752')
self.assertEqual(len(tx.outputs), 3)
txo = tx.outputs[1]
self.assertTrue(txo.is_purchase_data)
self.assertTrue(txo.can_decode_purchase_data)
self.assertIsInstance(txo.purchase_data, Purchase)
self.assertEqual(txo.purchase_data.claim_id, self.get_claim_id(stream))
await self.generate(1)
await self.ledger.wait(tx)
await self.daemon.jsonrpc_file_delete(claim_name=name)
return tx
async def create_purchase(self, name, price):
stream = await self.priced_stream(name, price)
claim_id = stream.outputs[0].claim_id
purchase = await self.daemon.jsonrpc_purchase_create(claim_id)
await self.ledger.wait(purchase)
return claim_id
async def assertStreamPurchased(self, stream: Transaction, purchase: Transaction):
stream_txo, purchase_txo = stream.outputs[0], purchase.outputs[0]
stream_fee = stream_txo.claim.stream.fee
self.assertEqual(stream_fee.dewies, purchase_txo.amount)
self.assertEqual(stream_fee.address, purchase_txo.get_address(self.ledger))
await self.account.release_all_outputs()
buyer_balance = await self.account.get_balance()
merchant_balance = lbc_to_dewies(str(await self.blockchain.get_balance()))
pre_purchase_count = (await self.daemon.jsonrpc_purchase_list())['total_items']
await self.ledger.wait(purchase)
await self.generate(1)
merchant_balance += lbc_to_dewies('1.0') # block reward
await self.ledger.wait(purchase)
self.assertEqual(
await self.blockchain.get_balance(),
starting_balance +
2.0 + # block rewards
1.0 # content payment
)
await self.account.get_balance(), buyer_balance - (purchase.input_sum-purchase.outputs[2].amount))
self.assertEqual(
str(await self.blockchain.get_balance()), dewies_to_lbc(merchant_balance + purchase_txo.amount))
purchases = await self.daemon.jsonrpc_purchase_list()
self.assertEqual(purchases['total_items'], pre_purchase_count+1)
tx = purchases['items'][0].tx_ref.tx
self.assertEqual(len(tx.outputs), 3) # purchase txo, purchase data, change
txo0 = tx.outputs[0]
txo1 = tx.outputs[1]
self.assertEqual(txo0.purchase, txo1) # purchase txo has reference to purchase data
self.assertTrue(txo1.is_purchase_data)
self.assertTrue(txo1.can_decode_purchase_data)
self.assertIsInstance(txo1.purchase_data, Purchase)
self.assertEqual(txo1.purchase_data.claim_id, stream_txo.claim_id)
async def test_purchasing(self):
stream = await self.priced_stream()
claim_id = stream.outputs[0].claim_id
# explicit purchase of claim
tx = await self.daemon.jsonrpc_purchase_create(claim_id)
await self.assertStreamPurchased(stream, tx)
# check that `get` doesn't purchase it again
balance = await self.account.get_balance()
response = await self.daemon.jsonrpc_get('lbry://stream')
self.assertIsNone(response.content_fee)
self.assertEqual(await self.account.get_balance(), balance)
self.assertItemCount(await self.daemon.jsonrpc_purchase_list(), 1)
# `get` does purchase a stream we don't have yet
another_stream = await self.priced_stream('another')
response = await self.daemon.jsonrpc_get('lbry://another')
await self.assertStreamPurchased(another_stream, response.content_fee)
# purchase non-existent claim fails
with self.assertRaisesRegex(Exception, "Could not find claim with claim_id"):
await self.daemon.jsonrpc_purchase_create('abc123')
# purchase stream with no price fails
no_price_stream = await self.priced_stream('no_price_stream', price=None)
with self.assertRaisesRegex(Exception, "does not have a purchase price"):
await self.daemon.jsonrpc_purchase_create(no_price_stream.outputs[0].claim_id)
# purchase claim you already own fails
with self.assertRaisesRegex(Exception, "You already have a purchase for claim_id"):
await self.daemon.jsonrpc_purchase_create(claim_id)
# force purchasing claim you already own
tx = await self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True)
await self.assertStreamPurchased(stream, tx)
async def test_purchase_and_transaction_list(self):
self.assertItemCount(await self.daemon.jsonrpc_purchase_list(), 0)
self.assertItemCount(await self.daemon.jsonrpc_transaction_list(), 1)
claim_id1 = await self.create_purchase('a', '1.0')
claim_id2 = await self.create_purchase('b', '1.0')
result = await self.out(self.daemon.jsonrpc_purchase_list())
self.assertItemCount(await self.daemon.jsonrpc_transaction_list(), 5)
self.assertItemCount(result, 2)
self.assertEqual(result['items'][0]['type'], 'purchase')
self.assertEqual(result['items'][0]['claim_id'], claim_id2)
self.assertNotIn('claim', result['items'][0])
self.assertEqual(result['items'][1]['type'], 'purchase')
self.assertEqual(result['items'][1]['claim_id'], claim_id1)
self.assertNotIn('claim', result['items'][1])
result = await self.out(self.daemon.jsonrpc_purchase_list(resolve=True))
self.assertEqual(result['items'][0]['claim']['name'], 'b')
self.assertEqual(result['items'][1]['claim']['name'], 'a')
result = await self.daemon.jsonrpc_transaction_list()
self.assertEqual(result['items'][0]['purchase_info'][0]['claim_id'], claim_id2)
self.assertEqual(result['items'][2]['purchase_info'][0]['claim_id'], claim_id1)
result = await self.claim_search()
self.assertEqual(result[0]['claim_id'], result[0]['purchase_receipt']['claim_id'])
self.assertEqual(result[1]['claim_id'], result[1]['purchase_receipt']['claim_id'])
url = result[0]['canonical_url']
resolve = await self.resolve(url)
self.assertEqual(result[0]['claim_id'], resolve[url]['purchase_receipt']['claim_id'])

View file

@ -9,8 +9,14 @@ from decimal import Decimal
from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager
from lbry.utils import generate_id
from lbry.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout, \
DownloadDataTimeout
from torba.client.errors import InsufficientFundsError
from lbry.error import KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout, DownloadDataTimeout
from torba.client.wallet import Wallet
from torba.client.constants import CENT, NULL_HASH32
from torba.client.basenetwork import ClientSession
from lbry.conf import Config
from lbry.wallet.ledger import MainNetLedger
from lbry.wallet.transaction import Transaction, Input, Output
from lbry.wallet.manager import LbryWalletManager
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.stream.stream_manager import StreamManager
@ -39,52 +45,84 @@ def get_mock_node(peer=None):
return mock_node
def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
claim = {
"address": "bYFeMtSL7ARuG1iMpjFyrnTe4oJHSAVNXF",
"amount": "0.1",
"claim_id": "c49566d631226492317d06ad7fdbe1ed32925124",
"claim_sequence": 1,
"decoded_claim": True,
"confirmations": 1057,
"effective_amount": "0.1",
"has_signature": False,
"height": 514081,
"hex": "",
"name": "33rpm",
"nout": 0,
"permanent_url": "33rpm#c49566d631226492317d06ad7fdbe1ed32925124",
"supports": [],
"txid": "81ac52662af926fdf639d56920069e0f63449d4cde074c61717cb99ddde40e3c",
}
claim_obj = Claim()
def get_output(amount=CENT, pubkey_hash=NULL_HASH32):
return Transaction() \
.add_outputs([Output.pay_pubkey_hash(amount, pubkey_hash)]) \
.outputs[0]
def get_input():
return Input.spend(get_output())
def get_transaction(txo=None):
return Transaction() \
.add_inputs([get_input()]) \
.add_outputs([txo or Output.pay_pubkey_hash(CENT, NULL_HASH32)])
def get_claim_transaction(claim_name, claim=b''):
return get_transaction(
Output.pay_claim_name_pubkey_hash(CENT, claim_name, claim, NULL_HASH32)
)
async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
claim = Claim()
if fee:
if fee['currency'] == 'LBC':
claim_obj.stream.fee.lbc = Decimal(fee['amount'])
claim.stream.fee.lbc = Decimal(fee['amount'])
elif fee['currency'] == 'USD':
claim_obj.stream.fee.usd = Decimal(fee['amount'])
claim_obj.stream.title = "33rpm"
claim_obj.stream.languages.append("en")
claim_obj.stream.source.sd_hash = sd_hash
claim_obj.stream.source.media_type = "image/png"
claim['value'] = claim_obj
claim['protobuf'] = binascii.hexlify(claim_obj.to_bytes()).decode()
claim.stream.fee.usd = Decimal(fee['amount'])
claim.stream.title = "33rpm"
claim.stream.languages.append("en")
claim.stream.source.sd_hash = sd_hash
claim.stream.source.media_type = "image/png"
tx = get_claim_transaction("33rpm", claim.to_bytes())
tx.height = 514081
txo = tx.outputs[0]
txo.meta.update({
"permanent_url": "33rpm#c49566d631226492317d06ad7fdbe1ed32925124",
})
class FakeHeaders:
def __init__(self, height):
self.height = height
def __getitem__(self, item):
return {'timestamp': 1984}
wallet = Wallet()
ledger = MainNetLedger({
'db': MainNetLedger.database_class(':memory:'),
'headers': FakeHeaders(514082)
})
await ledger.db.open()
wallet.generate_account(ledger)
manager = LbryWalletManager()
manager.config = Config()
manager.wallets.append(wallet)
manager.ledgers[MainNetLedger] = ledger
manager.ledger.network.client = ClientSession(
network=manager.ledger.network, server=('fakespv.lbry.com', 50001)
)
async def mock_resolve(*args):
await storage.save_claims([claim])
return {
claim['permanent_url']: claim
}
mock_wallet = mock.Mock(spec=LbryWalletManager)
mock_wallet.ledger.resolve = mock_resolve
mock_wallet.ledger.network.client.server = ('fakespv.lbry.com', 50001)
result = {txo.meta['permanent_url']: txo}
claims = [
StreamManager._convert_to_old_resolve_output(manager, result)[txo.meta['permanent_url']]
]
await storage.save_claims(claims)
return result
manager.ledger.resolve = mock_resolve
async def get_balance(*_):
return balance
manager.get_balance = get_balance
mock_wallet.get_balance = get_balance
return mock_wallet, claim['permanent_url']
return manager, txo.meta['permanent_url']
class TestStreamManager(BlobExchangeTestBase):
@ -96,7 +134,7 @@ class TestStreamManager(BlobExchangeTestBase):
self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort
)
self.sd_hash = descriptor.sd_hash
self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet,
self.client_storage, get_mock_node(self.server_from_client),
AnalyticsManager(self.client_config,
@ -226,7 +264,7 @@ class TestStreamManager(BlobExchangeTestBase):
start = self.loop.time()
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout)
duration = self.loop.time() - start
self.assertLessEqual(duration, 4.7)
self.assertLessEqual(duration, 5)
self.assertGreaterEqual(duration, 3.0)
async def test_download_stop_resume_delete(self):

View file

@ -387,7 +387,7 @@ class BaseDatabase(SQLiteMixin):
conn.execute(*self._insert_sql('tx', self.tx_to_row(tx), replace=True))
for txo in tx.outputs:
if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash:
if txo.script.is_pay_pubkey_hash and txo.pubkey_hash == txhash:
conn.execute(*self._insert_sql(
"txo", self.txo_to_row(tx, address, txo), ignore_duplicate=True
)).fetchall()
@ -433,7 +433,7 @@ class BaseDatabase(SQLiteMixin):
return True
async def select_transactions(self, cols, accounts=None, **constraints):
if not set(constraints) & {'txid', 'txid__in'}:
if not {'txid', 'txid__in'}.intersection(constraints):
assert accounts, "'accounts' argument required when no 'txid' constraint is present"
constraints.update({
f'$account{i}': a.public_key.address for i, a in enumerate(accounts)
@ -517,10 +517,10 @@ class BaseDatabase(SQLiteMixin):
return txs[0]
async def select_txos(self, cols, **constraints):
sql = "SELECT {} FROM txo JOIN tx USING (txid)"
sql = f"SELECT {cols} FROM txo JOIN tx USING (txid)"
if 'accounts' in constraints:
sql += " JOIN account_address USING (address)"
return await self.db.execute_fetchall(*query(sql.format(cols), **constraints))
return await self.db.execute_fetchall(*query(sql, **constraints))
async def get_txos(self, wallet=None, no_tx=False, **constraints):
my_accounts = {a.public_key.address for a in wallet.accounts} if wallet else set()