more commands

This commit is contained in:
Lex Berezhny 2020-11-11 10:57:51 -05:00
parent d459d6a26b
commit 1e4613fd8a
7 changed files with 175 additions and 146 deletions

View file

@ -12,7 +12,7 @@ import base58
from aiohttp import ClientSession
from lbry.conf import Setting, NOT_SET
from lbry.db import TXO_TYPES
from lbry.db import TXO_TYPES, CLAIM_TYPE_NAMES
from lbry.db.utils import constrain_single_or_list
from lbry.wallet import Wallet, Account, SingleKey, HierarchicalDeterministic
from lbry.blockchain import Transaction, Output, dewies_to_lbc, dict_values_to_lbc
@ -1501,11 +1501,16 @@ class API:
{kwargs}
"""
kwargs = claim_filter_and_and_signed_filter_and_stream_filter_and_channel_filter_and_pagination_kwargs
kwargs = claim_filter_and_stream_filter_and_pagination_kwargs
kwargs['type'] = claim_type or CLAIM_TYPE_NAMES
if 'is_spent' not in kwargs:
kwargs['is_not_spent'] = True
return await self.txo_list(**kwargs)
return await self.txo_list(
account_id=account_id, wallet_id=wallet_id,
is_spent=is_spent, resolve=resolve,
include_received_tips=include_received_tips,
**kwargs
)
async def claim_search(
self,
@ -1631,9 +1636,11 @@ class API:
amount = self.ledger.get_dewies_or_error('bid', bid, positive_value=True)
holding_account = wallet.accounts.get_or_default(channel_dict.pop('account_id'))
funding_accounts = wallet.accounts.get_or_all(tx_dict.pop('fund_account_id'))
change_account = wallet.accounts.get_or_default(tx_dict.pop('change_account_id'))
await wallet.verify_duplicate(name, allow_duplicate_name)
tx = await wallet.channels.create(
name=name, amount=amount, holding_account=holding_account, funding_accounts=funding_accounts,
name=name, amount=amount, holding_account=holding_account,
funding_accounts=funding_accounts, change_account=change_account,
save_key=not tx_dict['preview'], **remove_nulls(channel_dict)
)
await self.service.maybe_broadcast_or_release(tx, **tx_dict)
@ -2113,15 +2120,19 @@ class API:
List my stream claims.
Usage:
stream list [<account_id> | --account_id=<account_id>] [--wallet_id=<wallet_id>]
stream list [--account_id=<account_id>] [--wallet_id=<wallet_id>]
[--is_spent] [--resolve]
{kwargs}
"""
kwargs['type'] = 'stream'
if 'is_spent' not in kwargs:
kwargs['is_not_spent'] = True
return await self.txo_list(*args, **kwargs)
claim_filter_and_pagination_kwargs['type'] = 'stream'
if 'is_spent' not in claim_filter_and_pagination_kwargs:
claim_filter_and_pagination_kwargs['is_not_spent'] = True
return await self.txo_list(
account_id=account_id, wallet_id=wallet_id,
is_spent=is_spent, resolve=resolve,
**claim_filter_and_pagination_kwargs
)
async def stream_cost_estimate(
self,
@ -2410,6 +2421,7 @@ class API:
{kwargs}
"""
kwargs = pagination_kwargs
kwargs['type'] = 'support'
if 'is_spent' not in kwargs:
kwargs['is_not_spent'] = True
@ -2425,7 +2437,9 @@ class API:
elif staked:
kwargs['is_my_input'] = True
kwargs['is_my_output'] = True
return await self.txo_list(*args, **kwargs)
return await self.txo_list(
account_id=account_id, wallet_id=wallet_id, is_spent=is_spent, **kwargs
)
async def support_search(
self,
@ -2722,7 +2736,7 @@ class API:
else:
raise ValueError(f"'{order_by}' is not a valid --order_by value.")
self._constrain_txo_from_kwargs(constraints, **txo_dict)
return await paginate_rows(
return await Paginated.from_getter(
self.service.get_txos,
wallet=wallet, accounts=accounts,
**pagination, **constraints
@ -2820,13 +2834,13 @@ class API:
List unspent transaction outputs
Usage:
utxo_list
utxo list
{kwargs}
"""
kwargs['type'] = ['other', 'purchase']
kwargs['is_not_spent'] = True
return await self.txo_list(*args, **kwargs)
txo_filter_and_pagination_kwargs['type'] = ['other', 'purchase']
txo_filter_and_pagination_kwargs['is_not_spent'] = True
return await self.txo_list(**txo_filter_and_pagination_kwargs)
async def utxo_release(
self,

View file

@ -11,11 +11,12 @@ from typing import List, Optional, DefaultDict, NamedTuple
#from lbry.crypto.hash import double_sha256, sha256
from lbry.tasks import TaskGroup
from lbry.blockchain.transaction import Transaction
from lbry.blockchain.block import get_address_filter
from lbry.blockchain import Transaction
from lbry.blockchain.block import Block, get_address_filter
from lbry.event import BroadcastSubscription, EventController
from lbry.wallet.account import AddressManager
from lbry.blockchain import Ledger, Transaction
from lbry.db import Database
from .base import Service, Sync
from .api import Client as APIClient
@ -36,14 +37,21 @@ class LightClient(Service):
f"http://{ledger.conf.full_nodes[0][0]}:{ledger.conf.full_nodes[0][1]}/ws"
)
self.sync = FastSync(self, self.client)
self.blocks = BlockHeaderManager(self.db, self.client)
self.filters = FilterManager(self.db, self.client)
async def start(self):
await self.client.connect()
await super().start()
await self.client.start_event_streams()
async def stop(self):
await super().stop()
await self.client.disconnect()
async def search_transactions(self, txids):
return await self.client.transaction_search(txids=txids)
async def get_address_filters(self, start_height: int, end_height: int = None, granularity: int = 0):
return await self.filters.get_filters(
return await self.sync.filters.get_filters(
start_height=start_height, end_height=end_height, granularity=granularity
)
@ -108,74 +116,16 @@ class FilterManager:
self.client = client
self.cache = {}
async def get_filters(self, start_height, end_height, granularity):
return await self.client.address_filter(
start_height=start_height, end_height=end_height, granularity=granularity
)
class BlockHeaderManager:
"""
Efficient on-demand block header access.
Stores and retrieves from local db what it previously downloaded and
downloads on-demand what it doesn't have from full node.
"""
def __init__(self, db, client):
self.db = db
self.client = client
self.cache = {}
async def get_header(self, height):
blocks = await self.client.block_list(height)
if blocks:
return blocks[0]
async def add(self, header):
pass
async def download(self):
pass
filters_response = await self.client.get_address_filters(0, 500)
filters = await filters_response.first
address_array = [bytearray(self.client.ledger.address_to_hash160(address))]
for filter in filters:
print(filter)
filter = get_address_filter(unhexlify(filter['filter']))
print(filter.MatchAny(address_array))
class FastSync(Sync):
def __init__(self, service: Service, client: APIClient):
super().__init__(service.ledger, service.db)
self.service = service
self.client = client
self.advance_loop_task: Optional[asyncio.Task] = None
self.on_block = client.get_event_stream('blockchain.block')
self.on_block_event = asyncio.Event()
self.on_block_subscription: Optional[BroadcastSubscription] = None
self.on_mempool = client.get_event_stream('blockchain.mempool')
self.on_mempool_event = asyncio.Event()
self.on_mempool_subscription: Optional[BroadcastSubscription] = None
async def wait_for_client_ready(self):
await self.client.connect()
async def start(self):
return
self.db.stop_event.clear()
await self.wait_for_client_ready()
self.advance_loop_task = asyncio.create_task(self.advance())
await self.advance_loop_task
await self.client.subscribe()
self.advance_loop_task = asyncio.create_task(self.advance_loop())
self.on_block_subscription = self.on_block.listen(
lambda e: self.on_block_event.set()
)
self.on_mempool_subscription = self.on_mempool.listen(
lambda e: self.on_mempool_event.set()
)
await self.download_filters()
await self.download_headers()
async def stop(self):
await self.client.disconnect()
async def advance(self):
address_array = [
bytearray(a['address'].encode())
for a in await self.service.db.get_all_addresses()
@ -194,6 +144,98 @@ class FastSync(Sync):
tx = Transaction(unhexlify(txs[txid]))
await self.service.db.insert_transaction(tx)
async def get_filters(self, start_height, end_height, granularity):
return await self.client.address_filter(
start_height=start_height, end_height=end_height, granularity=granularity
)
class BlockHeaderManager:
"""
Efficient on-demand block header access.
Stores and retrieves from local db what it previously downloaded and
downloads on-demand what it doesn't have from full node.
"""
def __init__(self, db: Database, client: APIClient):
self.db = db
self.client = client
self.cache = {}
async def download(self):
our_height = await self.db.get_best_block_height()
best_height = await self.client.block_tip()
for block in await self.client.block_list(our_height+1, best_height):
await self.db.insert_block(Block(
height=block["height"],
version=0,
file_number=0,
block_hash=block["block_hash"],
prev_block_hash=block["previous_hash"],
merkle_root=block["merkle_root"],
claim_trie_root=block["claim_trie_root"],
timestamp=block["timestamp"],
bits=block["bits"],
nonce=block["nonce"],
txs=[]
))
async def get_header(self, height):
blocks = await self.client.first.block_list(height=height)
if blocks:
return blocks[0]
class FastSync(Sync):
def __init__(self, service: Service, client: APIClient):
super().__init__(service.ledger, service.db)
self.service = service
self.client = client
self.advance_loop_task: Optional[asyncio.Task] = None
self.on_block = client.get_event_stream('blockchain.block')
self.on_block_event = asyncio.Event()
self.on_block_subscription: Optional[BroadcastSubscription] = None
self.blocks = BlockHeaderManager(self.db, self.client)
self.filters = FilterManager(self.db, self.client)
async def get_block_headers(self, start_height: int, end_height: int = None):
return await self.client.block_list(start_height, end_height)
async def get_best_block_height(self) -> int:
return await self.client.block_tip()
async def start(self):
self.advance_loop_task = asyncio.create_task(self.advance())
await self.advance_loop_task
self.advance_loop_task = asyncio.create_task(self.loop())
self.on_block_subscription = self.on_block.listen(
lambda e: self.on_block_event.set()
)
async def stop(self):
for task in (self.on_block_subscription, self.advance_loop_task):
if task is not None:
task.cancel()
async def advance(self):
await asyncio.wait([
self.blocks.download(),
self.filters.download()
])
async def loop(self):
while True:
try:
await self.on_block_event.wait()
self.on_block_event.clear()
await self.advance()
except asyncio.CancelledError:
return
except Exception as e:
log.exception(e)
await self.stop()
# async def get_local_status_and_history(self, address, history=None):
# if not history:
# address_details = await self.db.get_address(address=address)

View file

@ -810,6 +810,9 @@ class CommandTestCase(IntegrationTestCase):
async def file_list(self, *args, **kwargs):
return (await self.out(self.api.file_list(*args, **kwargs)))['items']
async def utxo_list(self, *args, **kwargs):
return (await self.out(self.api.utxo_list(*args, **kwargs)))['items']
async def txo_list(self, *args, **kwargs):
return (await self.out(self.api.txo_list(*args, **kwargs)))['items']
@ -834,6 +837,9 @@ class CommandTestCase(IntegrationTestCase):
async def collection_resolve(self, *args, **kwargs):
return (await self.out(self.api.collection_resolve(*args, **kwargs)))['items']
async def support_list(self, *args, **kwargs):
return (await self.out(self.api.support_list(*args, **kwargs)))['items']
async def transaction_list(self, *args, **kwargs):
return (await self.out(self.api.transaction_list(*args, **kwargs)))['items']

View file

@ -458,23 +458,6 @@ class Account:
self._channel_keys_deserialized[channel_pubkey_hash] = private_key
return private_key
async def maybe_migrate_certificates(self):
def to_der(private_key_pem):
return ecdsa.SigningKey.from_pem(private_key_pem, hashfunc=sha256).get_verifying_key().to_der()
if not self.channel_keys:
return
channel_keys = {}
for private_key_pem in self.channel_keys.values():
if not isinstance(private_key_pem, str):
continue
if "-----BEGIN EC PRIVATE KEY-----" not in private_key_pem:
continue
public_key_der = await asyncio.get_running_loop().run_in_executor(None, to_der, private_key_pem)
channel_keys[self.ledger.public_key_to_address(public_key_der)] = private_key_pem
if self.channel_keys != channel_keys:
self.channel_keys = channel_keys
async def save_max_gap(self):
gap_changed = False
if issubclass(self.address_generator, HierarchicalDeterministic):

View file

@ -572,7 +572,7 @@ class ChannelListManager(ClaimListManager):
async def create(
self, name: str, amount: int, holding_account: Account,
funding_accounts: List[Account], save_key=True, **kwargs
funding_accounts: List[Account], change_account: Account, save_key=True, **kwargs
) -> Transaction:
holding_address = await holding_account.receiving.get_or_create_usable_address()
@ -586,7 +586,7 @@ class ChannelListManager(ClaimListManager):
await txo.generate_channel_private_key()
tx = await self.wallet.create_transaction(
[], [txo], funding_accounts, funding_accounts[0]
[], [txo], funding_accounts, change_account
)
await self.wallet.sign(tx)

View file

View file

@ -65,79 +65,63 @@ class AccountManagement(CommandTestCase):
self.assertEqual(len(accounts), 1)
self.assertEqual(accounts[0]['name'], 'recreated account')
async def test_wallet_migration(self):
# null certificates should get deleted
await self.channel_create('@foo1')
await self.channel_create('@foo2')
await self.channel_create('@foo3')
keys = list(self.account.channel_keys.keys())
self.account.channel_keys[keys[0]] = None
self.account.channel_keys[keys[1]] = "some invalid junk"
await self.account.maybe_migrate_certificates()
self.assertEqual(list(self.account.channel_keys.keys()), [keys[2]])
async def assertFindsClaims(self, claim_names, awaitable):
self.assertEqual(claim_names, [txo.claim_name for txo in (await awaitable)['items']])
self.assertEqual(claim_names, [txo["name"] for txo in await awaitable])
async def assertOutputAmount(self, amounts, awaitable):
self.assertEqual(amounts, [dewies_to_lbc(txo.amount) for txo in (await awaitable)['items']])
self.assertEqual(amounts, [txo["amount"] for txo in await awaitable])
async def test_commands_across_accounts(self):
channel_list = self.daemon.jsonrpc_channel_list
stream_list = self.daemon.jsonrpc_stream_list
support_list = self.daemon.jsonrpc_support_list
utxo_list = self.daemon.jsonrpc_utxo_list
default_account = self.wallet.default_account
second_account = await self.daemon.jsonrpc_account_create('second account')
account1 = self.wallet.accounts.default.id
account2 = (await self.account_create('second account'))["id"]
tx = await self.daemon.jsonrpc_account_send(
'0.05', await self.daemon.jsonrpc_address_unused(account_id=second_account.id)
)
await self.confirm_tx(tx.id)
await self.assertOutputAmount(['0.05', '9.949876'], utxo_list())
await self.assertOutputAmount(['0.05'], utxo_list(account_id=second_account.id))
await self.assertOutputAmount(['9.949876'], utxo_list(account_id=default_account.id))
address2 = await self.address_unused(account2)
await self.wallet_send('0.05', address2, fund_account_id=self.account.id)
await self.generate(1)
await self.assertOutputAmount(['0.05', '9.949876'], self.utxo_list())
await self.assertOutputAmount(['9.949876'], self.utxo_list(account_id=account1))
await self.assertOutputAmount(['0.05'], self.utxo_list(account_id=account2))
channel1 = await self.channel_create('@channel-in-account1', '0.01')
channel2 = await self.channel_create(
'@channel-in-account2', '0.01', account_id=second_account.id, funding_account_ids=[default_account.id]
'@channel-in-account2', '0.01', account_id=account2, fund_account_id=[account1]
)
await self.assertFindsClaims(['@channel-in-account2', '@channel-in-account1'], channel_list())
await self.assertFindsClaims(['@channel-in-account1'], channel_list(account_id=default_account.id))
await self.assertFindsClaims(['@channel-in-account2'], channel_list(account_id=second_account.id))
await self.assertFindsClaims(['@channel-in-account2', '@channel-in-account1'], self.channel_list())
await self.assertFindsClaims(['@channel-in-account1'], self.channel_list(account_id=account1))
await self.assertFindsClaims(['@channel-in-account2'], self.channel_list(account_id=account2))
stream1 = await self.stream_create('stream-in-account1', '0.01', channel_id=self.get_claim_id(channel1))
stream2 = await self.stream_create(
'stream-in-account2', '0.01', channel_id=self.get_claim_id(channel2),
account_id=second_account.id, funding_account_ids=[default_account.id]
account_id=account2, fund_account_id=[account1]
)
await self.assertFindsClaims(['stream-in-account2', 'stream-in-account1'], stream_list())
await self.assertFindsClaims(['stream-in-account1'], stream_list(account_id=default_account.id))
await self.assertFindsClaims(['stream-in-account2'], stream_list(account_id=second_account.id))
await self.assertFindsClaims(['stream-in-account2', 'stream-in-account1'], self.stream_list())
await self.assertFindsClaims(['stream-in-account1'], self.stream_list(account_id=account1))
await self.assertFindsClaims(['stream-in-account2'], self.stream_list(account_id=account2))
await self.assertFindsClaims(
['stream-in-account2', 'stream-in-account1', '@channel-in-account2', '@channel-in-account1'],
self.daemon.jsonrpc_claim_list()
self.claim_list()
)
await self.assertFindsClaims(
['stream-in-account1', '@channel-in-account1'],
self.daemon.jsonrpc_claim_list(account_id=default_account.id)
self.claim_list(account_id=account1)
)
await self.assertFindsClaims(
['stream-in-account2', '@channel-in-account2'],
self.daemon.jsonrpc_claim_list(account_id=second_account.id)
self.claim_list(account_id=account2)
)
support1 = await self.support_create(self.get_claim_id(stream1), '0.01')
support2 = await self.support_create(
self.get_claim_id(stream2), '0.01', account_id=second_account.id, funding_account_ids=[default_account.id]
self.get_claim_id(stream2), '0.01', account_id=account2, fund_account_id=[account1]
)
self.assertEqual([support2['txid'], support1['txid']], [txo.tx_ref.id for txo in (await support_list())['items']])
self.assertEqual([support1['txid']], [txo.tx_ref.id for txo in (await support_list(account_id=default_account.id))['items']])
self.assertEqual([support2['txid']], [txo.tx_ref.id for txo in (await support_list(account_id=second_account.id))['items']])
self.assertEqual([support2['txid'], support1['txid']], [txo['txid'] for txo in await self.support_list()])
self.assertEqual([support1['txid']], [txo['txid'] for txo in await self.support_list(account_id=account1)])
self.assertEqual([support2['txid']], [txo['txid'] for txo in await self.support_list(account_id=account2)])
history = await self.daemon.jsonrpc_transaction_list()
history = await self.transaction_list()
self.assertItemCount(history, 8)
history = history['items']
self.assertEqual(extract(history[0]['support_info'][0], ['claim_name', 'is_tip', 'amount', 'balance_delta']), {