initial RETURN_OP based purchase metadata

This commit is contained in:
Lex Berezhny 2019-10-27 13:54:48 -04:00
parent 41e0e6762c
commit 459716bd6e
13 changed files with 264 additions and 59 deletions

View file

@ -927,14 +927,15 @@ class Daemon(metaclass=JSONRPCServerType):
@requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT,
STREAM_MANAGER_COMPONENT)
async def jsonrpc_get(self, uri, file_name=None, download_directory=None, timeout=None, save_file=None):
async def jsonrpc_get(
self, uri, file_name=None, download_directory=None, timeout=None, save_file=None, wallet_id=None):
"""
Download stream from a LBRY name.
Usage:
get <uri> [<file_name> | --file_name=<file_name>]
[<download_directory> | --download_directory=<download_directory>] [<timeout> | --timeout=<timeout>]
[--save_file=<save_file>]
[--save_file=<save_file>] [--wallet_id=<wallet_id>]
Options:
@ -943,14 +944,17 @@ class Daemon(metaclass=JSONRPCServerType):
--download_directory=<download_directory> : (str) full path to the directory to download into
--timeout=<timeout> : (int) download timeout in number of seconds
--save_file=<save_file> : (bool) save the file to the downloads directory
--wallet_id=<wallet_id> : (str) restrict operation to specific wallet
Returns: {File}
"""
wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
if download_directory and not os.path.isdir(download_directory):
return {"error": f"specified download directory \"{download_directory}\" does not exist"}
try:
stream = await self.stream_manager.download_stream_from_uri(
uri, self.exchange_rate_manager, timeout, file_name, download_directory, save_file=save_file
uri, self.exchange_rate_manager, timeout, file_name, download_directory,
save_file=save_file, wallet=wallet
)
if not stream:
raise DownloadSDTimeout(uri)

View file

@ -159,7 +159,7 @@ class JSONResponseEncoder(JSONEncoder):
'nout': txo.position,
'height': tx_height,
'amount': dewies_to_lbc(txo.amount),
'address': txo.get_address(self.ledger),
'address': txo.get_address(self.ledger) if txo.has_address else None,
'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height,
'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None
}
@ -176,6 +176,10 @@ class JSONResponseEncoder(JSONEncoder):
output['claim_op'] = 'update'
elif txo.script.is_support_claim:
output['type'] = 'support'
elif txo.is_purchase_data:
output['type'] = 'purchase'
if txo.can_decode_purchase_data:
output['claim_id'] = txo.purchase_data.claim_id
else:
output['type'] = 'payment'

View file

@ -0,0 +1,47 @@
from google.protobuf.message import DecodeError
from google.protobuf.json_format import MessageToDict
from lbry.schema.types.v2.purchase_pb2 import Purchase as PurchaseMessage
from .attrs import ClaimReference
class Purchase(ClaimReference):
START_BYTE = ord('P')
__slots__ = ()
def __init__(self, claim_id=None):
super().__init__(PurchaseMessage())
if claim_id is not None:
self.claim_id = claim_id
def to_dict(self):
return MessageToDict(self.message)
def to_message_bytes(self) -> bytes:
return self.message.SerializeToString()
def to_bytes(self) -> bytes:
pieces = bytearray()
pieces.append(self.START_BYTE)
pieces.extend(self.to_message_bytes())
return bytes(pieces)
@classmethod
def has_start_byte(cls, data: bytes):
return data and data[0] == cls.START_BYTE
@classmethod
def from_bytes(cls, data: bytes):
purchase = cls()
if purchase.has_start_byte(data):
purchase.message.ParseFromString(data[1:])
else:
raise DecodeError('Message does not start with correct byte.')
return purchase
def __len__(self):
return len(self.to_bytes())
def __bytes__(self):
return self.to_bytes()

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,69 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: purchase.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='purchase.proto',
package='pb',
syntax='proto3',
serialized_options=None,
serialized_pb=_b('\n\x0epurchase.proto\x12\x02pb\"\x1e\n\x08Purchase\x12\x12\n\nclaim_hash\x18\x01 \x01(\x0c\x62\x06proto3')
)
_PURCHASE = _descriptor.Descriptor(
name='Purchase',
full_name='pb.Purchase',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='claim_hash', full_name='pb.Purchase.claim_hash', index=0,
number=1, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=22,
serialized_end=52,
)
DESCRIPTOR.message_types_by_name['Purchase'] = _PURCHASE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Purchase = _reflection.GeneratedProtocolMessageType('Purchase', (_message.Message,), dict(
DESCRIPTOR = _PURCHASE,
__module__ = 'purchase_pb2'
# @@protoc_insertion_point(class_scope:pb.Purchase)
))
_sym_db.RegisterMessage(Purchase)
# @@protoc_insertion_point(module_scope)

View file

@ -24,6 +24,7 @@ if typing.TYPE_CHECKING:
from lbry.wallet import LbryWalletManager
from lbry.wallet.transaction import Transaction
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
from torba.client.wallet import Wallet
log = logging.getLogger(__name__)
@ -65,12 +66,12 @@ def path_or_none(p) -> typing.Optional[str]:
class StreamManager:
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'],
wallet_manager: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'],
analytics_manager: typing.Optional['AnalyticsManager'] = None):
self.loop = loop
self.config = config
self.blob_manager = blob_manager
self.wallet = wallet
self.wallet_manager = wallet_manager
self.storage = storage
self.node = node
self.analytics_manager = analytics_manager
@ -309,7 +310,7 @@ class StreamManager:
for url, txo in resolves.items():
if isinstance(txo, Output):
tx_height = txo.tx_ref.height
best_height = self.wallet.ledger.headers.height
best_height = self.wallet_manager.ledger.headers.height
result[url] = {
'name': txo.claim_name,
'value': txo.claim,
@ -322,9 +323,9 @@ class StreamManager:
'height': tx_height,
'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height,
'claim_sequence': -1,
'address': txo.get_address(self.wallet.ledger),
'address': txo.get_address(self.wallet_manager.ledger),
'valid_at_height': txo.meta.get('activation_height', None),
'timestamp': self.wallet.ledger.headers[tx_height]['timestamp'],
'timestamp': self.wallet_manager.ledger.headers[tx_height]['timestamp'],
'supports': []
}
else:
@ -337,7 +338,9 @@ class StreamManager:
file_name: typing.Optional[str] = None,
download_directory: typing.Optional[str] = None,
save_file: typing.Optional[bool] = None,
resolve_timeout: float = 3.0) -> ManagedStream:
resolve_timeout: float = 3.0,
wallet: typing.Optional['Wallet'] = None) -> ManagedStream:
wallet = wallet or self.wallet_manager.default_wallet
timeout = timeout or self.config.download_timeout
start_time = self.loop.time()
resolved_time = None
@ -359,7 +362,7 @@ class StreamManager:
raise ResolveError("cannot download a channel claim, specify a /path")
try:
resolved_result = self._convert_to_old_resolve_output(
await asyncio.wait_for(self.wallet.ledger.resolve([uri]), resolve_timeout)
await asyncio.wait_for(self.wallet_manager.ledger.resolve([uri]), resolve_timeout)
)
except asyncio.TimeoutError:
raise ResolveTimeout(uri)
@ -409,7 +412,7 @@ class StreamManager:
msg = f"fee of {fee_amount} exceeds max configured to allow of {max_fee_amount}"
log.warning(msg)
raise KeyFeeAboveMaxAllowed(msg)
balance = await self.wallet.default_account.get_balance()
balance = await self.wallet_manager.get_balance(wallet)
if lbc_to_dewies(str(fee_amount)) > balance:
msg = f"fee of {fee_amount} exceeds max available balance"
log.warning(msg)
@ -429,8 +432,9 @@ class StreamManager:
if to_replace: # delete old stream now that the replacement has started downloading
await self.delete_stream(to_replace)
elif fee_address:
stream.content_fee = await self.wallet.send_amount_to_address(
lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')
stream.content_fee = await self.wallet_manager.buy_claim(
stream.claim_id, lbc_to_dewies(str(fee_amount)),
fee_address.encode('latin1'), wallet.accounts
)
log.info("paid fee of %s for %s", fee_amount, uri)
await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
@ -451,7 +455,7 @@ class StreamManager:
finally:
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))):
server = self.wallet.ledger.network.client.server
server = self.wallet_manager.ledger.network.client.server
self.loop.create_task(
self.analytics_manager.send_time_to_first_bytes(
resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,

View file

@ -161,10 +161,11 @@ class LbryWalletManager(BaseWalletManager):
def get_unused_address(self):
return self.default_account.receiving.get_or_create_usable_address()
async def send_amount_to_address(self, amount: int, destination_address: bytes, account=None):
account = account or self.default_account
tx = await Transaction.pay(amount, destination_address, [account], account)
await account.ledger.broadcast(tx)
async def buy_claim(self, claim_id: str, amount: int, destination_address: bytes, accounts):
tx = await Transaction.purchase(
claim_id, amount, destination_address, accounts, accounts[0]
)
await self.ledger.broadcast(tx)
return tx
async def get_transaction(self, txid):

View file

@ -14,6 +14,7 @@ from cryptography.exceptions import InvalidSignature
from torba.client.basetransaction import BaseTransaction, BaseInput, BaseOutput, ReadOnlyList
from torba.client.hash import hash160, sha256, Base58
from lbry.schema.claim import Claim
from lbry.schema.purchase import Purchase
from lbry.schema.url import normalize_name
from lbry.wallet.account import Account
from lbry.wallet.script import InputScript, OutputScript
@ -188,9 +189,31 @@ class Output(BaseOutput):
return cls(amount, script)
@classmethod
def purchase_claim_pubkey_hash(cls, amount: int, claim_id: str, pubkey_hash: bytes) -> 'Output':
script = cls.script_class.purchase_claim_pubkey_hash(unhexlify(claim_id)[::-1], pubkey_hash)
return cls(amount, script)
def add_purchase_data(cls, purchase: Purchase) -> 'Output':
script = cls.script_class.return_data(purchase)
return cls(0, script)
@property
def is_purchase_data(self) -> bool:
return self.script.is_return_data and (
isinstance(self.script.values['data'], Purchase) or
Purchase.has_start_byte(self.script.values['data'])
)
@property
def purchase_data(self) -> Purchase:
if self.is_purchase_data:
if not isinstance(self.script.values['data'], Purchase):
self.script.values['data'] = Purchase.from_bytes(self.script.values['data'])
return self.script.values['data']
raise ValueError('Output does not have purchase data.')
@property
def can_decode_purchase_data(self):
try:
return self.purchase_data
except:
return False
class Transaction(BaseTransaction):
@ -246,13 +269,12 @@ class Transaction(BaseTransaction):
return cls.create([], [support_output], funding_accounts, change_account)
@classmethod
def purchase(cls, claim: Output, amount: int, merchant_address: bytes,
def purchase(cls, claim_id: str, amount: int, merchant_address: bytes,
funding_accounts: List[Account], change_account: Account):
ledger, wallet = cls.ensure_all_have_same_ledger_and_wallet(funding_accounts, change_account)
claim_output = Output.purchase_claim_pubkey_hash(
amount, claim.claim_id, ledger.address_to_hash160(merchant_address)
)
return cls.create([], [claim_output], funding_accounts, change_account)
payment = Output.pay_pubkey_hash(amount, ledger.address_to_hash160(merchant_address))
data = Output.add_purchase_data(Purchase(claim_id))
return cls.create([], [payment, data], funding_accounts, change_account)
@property
def my_inputs(self):

View file

@ -348,7 +348,7 @@ class FileCommands(CommandTestCase):
response = await self.daemon.jsonrpc_get('lbry://icanpay')
raw_content_fee = response.content_fee.raw
await self.ledger.wait(response.content_fee)
await self.assertBalance(self.account, '8.925555')
await self.assertBalance(self.account, '8.925538')
self.assertEqual(len(self.file_list()), 1)
await asyncio.wait_for(self.wait_files_to_complete(), timeout=1)
@ -399,7 +399,7 @@ class FileCommands(CommandTestCase):
await self.assertBalance(self.account, '9.483893')
response = await self.daemon.jsonrpc_get('lbry://somename')
await self.ledger.wait(response.content_fee)
await self.assertBalance(self.account, '8.483769')
await self.assertBalance(self.account, '8.483752')
# Assert the file downloads
await asyncio.wait_for(self.wait_files_to_complete(), timeout=1)

View file

@ -0,0 +1,35 @@
from lbry.testcase import CommandTestCase
from lbry.schema.purchase import Purchase
class PurchaseCommand(CommandTestCase):
async def test_purchase_via_get(self):
starting_balance = await self.blockchain.get_balance()
target_address = await self.blockchain.get_raw_change_address()
stream = await self.stream_create(
'stream', '0.01', data=b'high value content',
fee_currency='LBC', fee_amount='1.0', fee_address=target_address
)
await self.daemon.jsonrpc_file_delete(claim_name='stream')
await self.assertBalance(self.account, '9.977893')
response = await self.daemon.jsonrpc_get('lbry://stream')
tx = response.content_fee
await self.ledger.wait(tx)
await self.assertBalance(self.account, '8.977752')
self.assertEqual(len(tx.outputs), 3)
txo = tx.outputs[1]
self.assertTrue(txo.is_purchase_data)
self.assertTrue(txo.can_decode_purchase_data)
self.assertIsInstance(txo.purchase_data, Purchase)
self.assertEqual(txo.purchase_data.claim_id, self.get_claim_id(stream))
await self.generate(1)
self.assertEqual(
await self.blockchain.get_balance(),
starting_balance +
2.0 + # block rewards
1.0 # content payment
)

View file

@ -560,12 +560,11 @@ class BaseLedger(metaclass=LedgerRegistry):
for txi in tx.inputs:
if txi.txo_ref.txo is not None:
addresses.add(
self.hash160_to_address(txi.txo_ref.txo.script.values['pubkey_hash'])
self.hash160_to_address(txi.txo_ref.txo.pubkey_hash)
)
for txo in tx.outputs:
addresses.add(
self.hash160_to_address(txo.script.values['pubkey_hash'])
)
if txo.has_address:
addresses.add(self.hash160_to_address(txo.pubkey_hash))
records = await self.db.get_addresses(address__in=addresses)
_, pending = await asyncio.wait([
self.on_transaction.where(partial(

View file

@ -78,3 +78,10 @@ class BaseWalletManager:
if wallet.id == wallet_id:
return wallet
raise ValueError(f"Couldn't find wallet: {wallet_id}.")
@staticmethod
def get_balance(wallet):
accounts = wallet.accounts
if not accounts:
return 0
return accounts[0].ledger.db.get_balance(wallet=wallet, accounts=accounts)

View file

@ -423,6 +423,12 @@ class BaseOutputScript(Script):
'script_hash': script_hash
})
@classmethod
def return_data(cls, data):
return cls(template=cls.RETURN_DATA, values={
'data': data
})
@property
def is_pay_pubkey(self):
return self.template.name.endswith('pay_pubkey_full')