From 91846939f6bc0194062cebd5fe903e2fb345cd43 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 18 Dec 2019 19:09:33 -0300 Subject: [PATCH 1/9] organize logic for when its downloading --- lbry/lbry/extras/daemon/Components.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py index 6da6ba71c..fc7854532 100644 --- a/lbry/lbry/extras/daemon/Components.py +++ b/lbry/lbry/extras/daemon/Components.py @@ -128,14 +128,13 @@ class WalletComponent(Component): if self.wallet_manager.ledger.network.remote_height: local_height = self.wallet_manager.ledger.local_height_including_downloaded_height disk_height = len(self.wallet_manager.ledger.headers) - download_height = local_height - disk_height if disk_height != local_height else local_height remote_height = self.wallet_manager.ledger.network.remote_height - target_height = remote_height - disk_height if disk_height != local_height else remote_height - best_hash = self.wallet_manager.get_best_blockhash() - if not target_height: - progress = 100 + if disk_height != local_height != remote_height: + download_height, target_height = local_height - disk_height, remote_height - disk_height else: - progress = min(max(math.ceil(float(download_height) / float(target_height) * 100), 0), 100) + download_height, target_height = local_height, remote_height + progress = min(max(math.ceil(float(download_height) / float(target_height) * 100), 0), 100) + best_hash = self.wallet_manager.get_best_blockhash() result.update({ 'headers_synchronization_progress': progress, 'blocks': max(local_height, 0), From d66c8013500379c84a01047097eb18a1765478d0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 17 Dec 2019 23:00:28 -0300 Subject: [PATCH 2/9] wake up mempool on broadcast --- torba/torba/server/mempool.py | 15 ++++++++++----- torba/torba/server/session.py | 1 + 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/torba/torba/server/mempool.py b/torba/torba/server/mempool.py index fc2c96c7e..62449e40a 100644 --- a/torba/torba/server/mempool.py +++ b/torba/torba/server/mempool.py @@ -11,7 +11,6 @@ import asyncio import itertools import time from abc import ABC, abstractmethod -from asyncio import Lock, sleep from collections import defaultdict import attr @@ -104,7 +103,8 @@ class MemPool: self.refresh_secs = refresh_secs self.log_status_secs = log_status_secs # Prevents mempool refreshes during fee histogram calculation - self.lock = Lock() + self.lock = asyncio.Lock() + self.wakeup = asyncio.Event() async def _logging(self, synchronized_event): """Print regular logs of mempool stats.""" @@ -117,7 +117,7 @@ class MemPool: while True: self.logger.info(f'{len(self.txs):,d} txs ' f'touching {len(self.hashXs):,d} addresses') - await sleep(self.log_status_secs) + await asyncio.sleep(self.log_status_secs) await synchronized_event.wait() async def _refresh_histogram(self, synchronized_event): @@ -126,7 +126,7 @@ class MemPool: async with self.lock: # Threaded as can be expensive await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000) - await sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) + await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) def _update_histogram(self, bin_size): # Build a histogram by fee rate @@ -212,7 +212,12 @@ class MemPool: synchronized_event.set() synchronized_event.clear() await self.api.on_mempool(touched, height) - await sleep(self.refresh_secs) + try: + await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs) + except asyncio.TimeoutError: + pass + finally: + self.wakeup.clear() async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes diff --git a/torba/torba/server/session.py b/torba/torba/server/session.py index d9252d350..fe78133bc 100644 --- a/torba/torba/server/session.py +++ b/torba/torba/server/session.py @@ -567,6 +567,7 @@ class SessionManager: async def broadcast_transaction(self, raw_tx): hex_hash = await self.daemon.broadcast_transaction(raw_tx) + self.mempool.wakeup.set() self.txs_sent += 1 return hex_hash From ada03e12fcb3aa84d45dfcc7ce4cf64e5ce69774 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 17 Dec 2019 23:59:44 -0300 Subject: [PATCH 3/9] proper check for restrictions on gettx --- torba/torba/client/basenetwork.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index e443d2fc1..5a45dc848 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -232,7 +232,7 @@ class BaseNetwork: def get_transaction(self, tx_hash, known_height=None): # use any server if its old, otherwise restrict to who gave us the history - restricted = not known_height or 0 > known_height > self.remote_height - 10 + restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10 return self.rpc('blockchain.transaction.get', [tx_hash], restricted) def get_transaction_height(self, tx_hash, known_height=None): From 84b8a224239008297a06c5e125e45bd2adb64e49 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 19 Dec 2019 16:20:12 -0300 Subject: [PATCH 4/9] comment the broadcast interruption case --- torba/torba/server/mempool.py | 1 + 1 file changed, 1 insertion(+) diff --git a/torba/torba/server/mempool.py b/torba/torba/server/mempool.py index 62449e40a..f1c162c3a 100644 --- a/torba/torba/server/mempool.py +++ b/torba/torba/server/mempool.py @@ -213,6 +213,7 @@ class MemPool: synchronized_event.clear() await self.api.on_mempool(touched, height) try: + # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event) await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs) except asyncio.TimeoutError: pass From 3e03dd3e80ac65dbb967bdad3df745426b09d39b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 19 Dec 2019 20:20:42 -0300 Subject: [PATCH 5/9] apply timeout and check for expected height on wait --- lbry/lbry/testcase.py | 4 +++- torba/torba/client/baseledger.py | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/lbry/lbry/testcase.py b/lbry/lbry/testcase.py index c49206c47..23b880a6f 100644 --- a/lbry/lbry/testcase.py +++ b/lbry/lbry/testcase.py @@ -90,6 +90,7 @@ class CommandTestCase(IntegrationTestCase): self.daemons = [] self.extra_wallet_nodes = [] self.extra_wallet_node_port = 5280 + self.__height = 0 self.daemon = await self.add_daemon(self.wallet_node) await self.account.ensure_address_gap() @@ -196,6 +197,7 @@ class CommandTestCase(IntegrationTestCase): async def generate(self, blocks): """ Ask lbrycrd to generate some blocks and wait until ledger has them. """ await self.blockchain.generate(blocks) + self.__height += 1 await self.ledger.on_header.where(self.blockchain.is_expected_block) async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True): @@ -223,7 +225,7 @@ class CommandTestCase(IntegrationTestCase): if confirm: await self.ledger.wait(tx) await self.generate(1) - await self.ledger.wait(tx) + await self.ledger.wait(tx, self.__height) return self.sout(tx) def create_upload_file(self, data, prefix=None, suffix=None): diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index f3cf38518..fa0349769 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -575,7 +575,10 @@ class BaseLedger(metaclass=LedgerRegistry): # broadcast can't be a retriable call yet return self.network.broadcast(hexlify(tx.raw).decode()) - async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=None): + async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=1): + we_have = await self.get_transactions(txid=tx.id) + if we_have and we_have[0].height >= height: + return addresses = set() for txi in tx.inputs: if txi.txo_ref.txo is not None: From 6652d5545573334a0ee73b282888c7d9edb2fccf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 19 Dec 2019 21:46:14 -0300 Subject: [PATCH 6/9] wip --- torba/torba/client/baseledger.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index fa0349769..1d2a50479 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -575,10 +575,7 @@ class BaseLedger(metaclass=LedgerRegistry): # broadcast can't be a retriable call yet return self.network.broadcast(hexlify(tx.raw).decode()) - async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=1): - we_have = await self.get_transactions(txid=tx.id) - if we_have and we_have[0].height >= height: - return + async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=2): addresses = set() for txi in tx.inputs: if txi.txo_ref.txo is not None: @@ -596,4 +593,12 @@ class BaseLedger(metaclass=LedgerRegistry): )) for address_record in records ], timeout=timeout) if pending: - raise asyncio.TimeoutError('Timed out waiting for transaction.') + for record in records: + found = False + _, local_history = await self.get_local_status_and_history(None, history=record['history']) + for txid, local_height in local_history: + if txid == tx.id and local_height >= height: + found = True + if not found: + print(record['history'], addresses, tx.id) + raise asyncio.TimeoutError('Timed out waiting for transaction: %s', tx.id) From 4c6dedfa4f129a85f93faec093435536e1cf28bf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 19 Dec 2019 22:34:10 -0300 Subject: [PATCH 7/9] fix last test --- .../integration/test_purchase_command.py | 25 +++++++++++-------- torba/torba/client/baseledger.py | 4 +-- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/lbry/tests/integration/test_purchase_command.py b/lbry/tests/integration/test_purchase_command.py index 1e33d10dd..8e9cd6ade 100644 --- a/lbry/tests/integration/test_purchase_command.py +++ b/lbry/tests/integration/test_purchase_command.py @@ -36,16 +36,17 @@ class PurchaseCommandTests(CommandTestCase): await self.ledger.wait(purchase) return claim_id - async def assertStreamPurchased(self, stream: Transaction, purchase: Transaction): - stream_txo, purchase_txo = stream.outputs[0], purchase.outputs[0] - stream_fee = stream_txo.claim.stream.fee - self.assertEqual(stream_fee.dewies, purchase_txo.amount) - self.assertEqual(stream_fee.address, purchase_txo.get_address(self.ledger)) + async def assertStreamPurchased(self, stream: Transaction, operation): await self.account.release_all_outputs() buyer_balance = await self.account.get_balance() merchant_balance = lbc_to_dewies(str(await self.blockchain.get_balance())) pre_purchase_count = (await self.daemon.jsonrpc_purchase_list())['total_items'] + purchase = await operation() + stream_txo, purchase_txo = stream.outputs[0], purchase.outputs[0] + stream_fee = stream_txo.claim.stream.fee + self.assertEqual(stream_fee.dewies, purchase_txo.amount) + self.assertEqual(stream_fee.address, purchase_txo.get_address(self.ledger)) await self.ledger.wait(purchase) await self.generate(1) @@ -76,7 +77,7 @@ class PurchaseCommandTests(CommandTestCase): claim_id = stream.outputs[0].claim_id # explicit purchase of claim - tx = await self.daemon.jsonrpc_purchase_create(claim_id) + tx = lambda: self.daemon.jsonrpc_purchase_create(claim_id) await self.assertStreamPurchased(stream, tx) # check that `get` doesn't purchase it again @@ -88,8 +89,12 @@ class PurchaseCommandTests(CommandTestCase): # `get` does purchase a stream we don't have yet another_stream = await self.priced_stream('another') - response = await self.daemon.jsonrpc_get('lbry://another') - await self.assertStreamPurchased(another_stream, response.content_fee) + + async def imagine_its_a_lambda(): + response = await self.daemon.jsonrpc_get('lbry://another') + return response.content_fee + + await self.assertStreamPurchased(another_stream, imagine_its_a_lambda) # purchase non-existent claim fails with self.assertRaisesRegex(Exception, "Could not find claim with claim_id"): @@ -105,12 +110,12 @@ class PurchaseCommandTests(CommandTestCase): await self.daemon.jsonrpc_purchase_create(claim_id) # force purchasing claim you already own - tx = await self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True) + tx = lambda: self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True) await self.assertStreamPurchased(stream, tx) # purchase by uri abc_stream = await self.priced_stream('abc') - tx = await self.daemon.jsonrpc_purchase_create(url='lbry://abc') + tx = lambda: self.daemon.jsonrpc_purchase_create(url='lbry://abc') await self.assertStreamPurchased(abc_stream, tx) async def test_purchase_and_transaction_list(self): diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index 1d2a50479..68b5a2562 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -575,7 +575,7 @@ class BaseLedger(metaclass=LedgerRegistry): # broadcast can't be a retriable call yet return self.network.broadcast(hexlify(tx.raw).decode()) - async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=2): + async def wait(self, tx: basetransaction.BaseTransaction, height=-1, timeout=1): addresses = set() for txi in tx.inputs: if txi.txo_ref.txo is not None: @@ -601,4 +601,4 @@ class BaseLedger(metaclass=LedgerRegistry): found = True if not found: print(record['history'], addresses, tx.id) - raise asyncio.TimeoutError('Timed out waiting for transaction: %s', tx.id) + raise asyncio.TimeoutError('Timed out waiting for transaction.') From 0301768b79fba039eac266eab3dbeb94a56a56b9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 19 Dec 2019 22:58:36 -0300 Subject: [PATCH 8/9] lint on tests --- lbry/tests/integration/test_purchase_command.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lbry/tests/integration/test_purchase_command.py b/lbry/tests/integration/test_purchase_command.py index 8e9cd6ade..aac4bbcac 100644 --- a/lbry/tests/integration/test_purchase_command.py +++ b/lbry/tests/integration/test_purchase_command.py @@ -77,8 +77,7 @@ class PurchaseCommandTests(CommandTestCase): claim_id = stream.outputs[0].claim_id # explicit purchase of claim - tx = lambda: self.daemon.jsonrpc_purchase_create(claim_id) - await self.assertStreamPurchased(stream, tx) + await self.assertStreamPurchased(stream, lambda: self.daemon.jsonrpc_purchase_create(claim_id)) # check that `get` doesn't purchase it again balance = await self.account.get_balance() @@ -110,13 +109,13 @@ class PurchaseCommandTests(CommandTestCase): await self.daemon.jsonrpc_purchase_create(claim_id) # force purchasing claim you already own - tx = lambda: self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True) - await self.assertStreamPurchased(stream, tx) + await self.assertStreamPurchased( + stream, lambda: self.daemon.jsonrpc_purchase_create(claim_id, allow_duplicate_purchase=True) + ) # purchase by uri abc_stream = await self.priced_stream('abc') - tx = lambda: self.daemon.jsonrpc_purchase_create(url='lbry://abc') - await self.assertStreamPurchased(abc_stream, tx) + await self.assertStreamPurchased(abc_stream, lambda: self.daemon.jsonrpc_purchase_create(url='lbry://abc')) async def test_purchase_and_transaction_list(self): self.assertItemCount(await self.daemon.jsonrpc_purchase_list(), 0) From 5fa801f9f262d0d47c75618aacfee9e67d165fc6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 19 Dec 2019 23:42:31 -0300 Subject: [PATCH 9/9] broadcast_or_release uses None as timeout as opposed to tests --- lbry/lbry/wallet/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/lbry/wallet/manager.py b/lbry/lbry/wallet/manager.py index 5e9f5d8f6..8b454d38e 100644 --- a/lbry/lbry/wallet/manager.py +++ b/lbry/lbry/wallet/manager.py @@ -226,7 +226,7 @@ class LbryWalletManager(BaseWalletManager): try: await self.ledger.broadcast(tx) if blocking: - await self.ledger.wait(tx) + await self.ledger.wait(tx, timeout=None) except: await self.ledger.release_tx(tx) raise