diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py index 6da6ba71c..fc7854532 100644 --- a/lbry/lbry/extras/daemon/Components.py +++ b/lbry/lbry/extras/daemon/Components.py @@ -128,14 +128,13 @@ class WalletComponent(Component): if self.wallet_manager.ledger.network.remote_height: local_height = self.wallet_manager.ledger.local_height_including_downloaded_height disk_height = len(self.wallet_manager.ledger.headers) - download_height = local_height - disk_height if disk_height != local_height else local_height remote_height = self.wallet_manager.ledger.network.remote_height - target_height = remote_height - disk_height if disk_height != local_height else remote_height - best_hash = self.wallet_manager.get_best_blockhash() - if not target_height: - progress = 100 + if disk_height != local_height != remote_height: + download_height, target_height = local_height - disk_height, remote_height - disk_height else: - progress = min(max(math.ceil(float(download_height) / float(target_height) * 100), 0), 100) + download_height, target_height = local_height, remote_height + progress = min(max(math.ceil(float(download_height) / float(target_height) * 100), 0), 100) + best_hash = self.wallet_manager.get_best_blockhash() result.update({ 'headers_synchronization_progress': progress, 'blocks': max(local_height, 0), diff --git a/lbry/lbry/testcase.py b/lbry/lbry/testcase.py index c49206c47..23b880a6f 100644 --- a/lbry/lbry/testcase.py +++ b/lbry/lbry/testcase.py @@ -90,6 +90,7 @@ class CommandTestCase(IntegrationTestCase): self.daemons = [] self.extra_wallet_nodes = [] self.extra_wallet_node_port = 5280 + self.__height = 0 self.daemon = await self.add_daemon(self.wallet_node) await self.account.ensure_address_gap() @@ -196,6 +197,7 @@ class CommandTestCase(IntegrationTestCase): async def generate(self, blocks): """ Ask lbrycrd to generate some blocks and wait until ledger has them. """ await self.blockchain.generate(blocks) + self.__height += 1 await self.ledger.on_header.where(self.blockchain.is_expected_block) async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True): @@ -223,7 +225,7 @@ class CommandTestCase(IntegrationTestCase): if confirm: await self.ledger.wait(tx) await self.generate(1) - await self.ledger.wait(tx) + await self.ledger.wait(tx, self.__height) return self.sout(tx) def create_upload_file(self, data, prefix=None, suffix=None): diff --git a/lbry/lbry/wallet/manager.py b/lbry/lbry/wallet/manager.py index 5e9f5d8f6..8b454d38e 100644 --- a/lbry/lbry/wallet/manager.py +++ b/lbry/lbry/wallet/manager.py @@ -226,7 +226,7 @@ class LbryWalletManager(BaseWalletManager): try: await self.ledger.broadcast(tx) if blocking: - await self.ledger.wait(tx) + await self.ledger.wait(tx, timeout=None) except: await self.ledger.release_tx(tx) raise diff --git a/lbry/tests/integration/test_purchase_command.py b/lbry/tests/integration/test_purchase_command.py index 1e33d10dd..aac4bbcac 100644 --- a/lbry/tests/integration/test_purchase_command.py +++ b/lbry/tests/integration/test_purchase_command.py @@ -36,16 +36,17 @@ class PurchaseCommandTests(CommandTestCase): await self.ledger.wait(purchase) return claim_id - async def assertStreamPurchased(self, stream: Transaction, purchase: Transaction): - stream_txo, purchase_txo = stream.outputs[0], purchase.outputs[0] - stream_fee = stream_txo.claim.stream.fee - self.assertEqual(stream_fee.dewies, purchase_txo.amount) - self.assertEqual(stream_fee.address, purchase_txo.get_address(self.ledger)) + async def assertStreamPurchased(self, stream: Transaction, operation): await self.account.release_all_outputs() buyer_balance = await self.account.get_balance() merchant_balance = lbc_to_dewies(str(await self.blockchain.get_balance())) pre_purchase_count = (await self.daemon.jsonrpc_purchase_list())['total_items'] + purchase = await operation() + stream_txo, purchase_txo = stream.outputs[0], purchase.outputs[0] + stream_fee = stream_txo.claim.stream.fee + self.assertEqual(stream_fee.dewies, purchase_txo.amount) + self.assertEqual(stream_fee.address, purchase_txo.get_address(self.ledger)) await self.ledger.wait(purchase) await self.generate(1) @@ -76,8 +77,7 @@ class PurchaseCommandTests(CommandTestCase): claim_id = stream.outputs[0].claim_id # explicit purchase of claim - tx = await self.daemon.jsonrpc_purchase_create(claim_id) - await self.assertStreamPurchased(stream, tx) + await self.assertStreamPurchased(stream, lambda: self.daemon.jsonrpc_purchase_create(claim_id)) # check that `get` doesn't purchase it again balance = await self.account.get_balance() @@ -88,8 +88,12 @@ class PurchaseCommandTests(CommandTestCase): # `get` does purchase a stream we don't have yet another_stream = await self.priced_stream('another') - response = await self.daemon.jsonrpc_get('lbry://another') - await self.assertStreamPurchased(another_stream, response.content_fee) + + async def imagine_its_a_lambda(): + response = await self.daemon.jsonrpc_get('lbry://another') + return response.content_fee + + await self.assertStreamPurchased(another_stream, imagine_its_a_lambda) # purchase non-existent claim fails with self.assertRaisesRegex(Exception, "Could not find claim with claim_id"): @@ -105,13 +109,13 @@ class PurchaseCommandTests(CommandTestCase): await self.daemon.jsonrpc_purchase_create(claim_id) # force purchasing claim you already own - tx = await self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True) - await self.assertStreamPurchased(stream, tx) + await self.assertStreamPurchased( + stream, lambda: self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True) + ) # purchase by uri abc_stream = await self.priced_stream('abc') - tx = await self.daemon.jsonrpc_purchase_create(url='lbry://abc') - await self.assertStreamPurchased(abc_stream, tx) + await self.assertStreamPurchased(abc_stream, lambda: self.daemon.jsonrpc_purchase_create(url='lbry://abc')) async def test_purchase_and_transaction_list(self): self.assertItemCount(await self.daemon.jsonrpc_purchase_list(), 0) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index f3cf38518..68b5a2562 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -575,7 +575,7 @@ class BaseLedger(metaclass=LedgerRegistry): # broadcast can't be a retriable call yet return self.network.broadcast(hexlify(tx.raw).decode()) - async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=None): + async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=1): addresses = set() for txi in tx.inputs: if txi.txo_ref.txo is not None: @@ -593,4 +593,12 @@ class BaseLedger(metaclass=LedgerRegistry): )) for address_record in records ], timeout=timeout) if pending: - raise asyncio.TimeoutError('Timed out waiting for transaction.') + for record in records: + found = False + _, local_history = await self.get_local_status_and_history(None, history=record['history']) + for txid, local_height in local_history: + if txid == tx.id and local_height >= height: + found = True + if not found: + print(record['history'], addresses, tx.id) + raise asyncio.TimeoutError('Timed out waiting for transaction.') diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index e443d2fc1..5a45dc848 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -232,7 +232,7 @@ class BaseNetwork: def get_transaction(self, tx_hash, known_height=None): # use any server if its old, otherwise restrict to who gave us the history - restricted = not known_height or 0 > known_height > self.remote_height - 10 + restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10 return self.rpc('blockchain.transaction.get', [tx_hash], restricted) def get_transaction_height(self, tx_hash, known_height=None): diff --git a/torba/torba/server/mempool.py b/torba/torba/server/mempool.py index fc2c96c7e..f1c162c3a 100644 --- a/torba/torba/server/mempool.py +++ b/torba/torba/server/mempool.py @@ -11,7 +11,6 @@ import asyncio import itertools import time from abc import ABC, abstractmethod -from asyncio import Lock, sleep from collections import defaultdict import attr @@ -104,7 +103,8 @@ class MemPool: self.refresh_secs = refresh_secs self.log_status_secs = log_status_secs # Prevents mempool refreshes during fee histogram calculation - self.lock = Lock() + self.lock = asyncio.Lock() + self.wakeup = asyncio.Event() async def _logging(self, synchronized_event): """Print regular logs of mempool stats.""" @@ -117,7 +117,7 @@ class MemPool: while True: self.logger.info(f'{len(self.txs):,d} txs ' f'touching {len(self.hashXs):,d} addresses') - await sleep(self.log_status_secs) + await asyncio.sleep(self.log_status_secs) await synchronized_event.wait() async def _refresh_histogram(self, synchronized_event): @@ -126,7 +126,7 @@ class MemPool: async with self.lock: # Threaded as can be expensive await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000) - await sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) + await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) def _update_histogram(self, bin_size): # Build a histogram by fee rate @@ -212,7 +212,13 @@ class MemPool: synchronized_event.set() synchronized_event.clear() await self.api.on_mempool(touched, height) - await sleep(self.refresh_secs) + try: + # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event) + await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs) + except asyncio.TimeoutError: + pass + finally: + self.wakeup.clear() async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes diff --git a/torba/torba/server/session.py b/torba/torba/server/session.py index d9252d350..fe78133bc 100644 --- a/torba/torba/server/session.py +++ b/torba/torba/server/session.py @@ -567,6 +567,7 @@ class SessionManager: async def broadcast_transaction(self, raw_tx): hex_hash = await self.daemon.broadcast_transaction(raw_tx) + self.mempool.wakeup.set() self.txs_sent += 1 return hex_hash