From 5671224fe08783f6342e94cb1ec992108060c43f Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Tue, 19 Jan 2021 12:32:05 -0500 Subject: [PATCH] insert_transactions and other client speed ups --- lbry/db/database.py | 4 +-- lbry/db/queries/address.py | 42 +++++++++++++++++++++----------- lbry/db/queries/base.py | 7 ++++-- lbry/service/light_client.py | 18 ++++++++------ lbry/testcase.py | 2 +- tests/unit/wallet/test_sync.py | 4 +-- tests/unit/wallet/test_wallet.py | 2 +- 7 files changed, 50 insertions(+), 29 deletions(-) diff --git a/lbry/db/database.py b/lbry/db/database.py index 3ca8de3e4..602681e5c 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -269,8 +269,8 @@ class Database: async def insert_tx_filters(self, filters): return await self.run(q.insert_tx_filters, filters) - async def insert_transaction(self, block_hash, tx): - return await self.run(q.insert_transaction, block_hash, tx) + async def insert_transactions(self, txs): + return await self.run(q.insert_transactions, txs) async def update_address_used_times(self, addresses): return await self.run(q.update_address_used_times, addresses) diff --git a/lbry/db/queries/address.py b/lbry/db/queries/address.py index 6fcd650c8..efb68fec7 100644 --- a/lbry/db/queries/address.py +++ b/lbry/db/queries/address.py @@ -94,23 +94,33 @@ class PersistingAddressIterator(DatabaseAddressIterator): yield hash160(pubkey_child.pubkey_bytes), self.n, True +class BatchAddressIterator: + + def __init__(self, iterator: PersistingAddressIterator, size): + self.iterator = iterator + self.size = size + + def __iter__(self) -> Iterator[bytearray]: + i = iter(self.iterator) + while True: + yield [bytearray(next(i)[0]) for _ in range(self.size)] + + def generate_addresses_using_filters(best_height, allowed_gap, address_manager) -> Set: need, have = set(), set() matchers = get_filter_matchers(best_height) - with PersistingAddressIterator(*address_manager) as addresses: - gap = 0 - for address_hash, n, is_new in addresses: # pylint: disable=unused-variable - gap += 1 - address_bytes = bytearray(address_hash) + with PersistingAddressIterator(*address_manager) as address_iterator: + for addresses in BatchAddressIterator(address_iterator, allowed_gap): + has_match = False for matcher, filter_range in matchers: - if matcher.Match(address_bytes): - gap = 0 + if matcher.MatchAny(addresses): + has_match = True if filter_range not in need and filter_range not in have: if has_filter_range(*filter_range): have.add(filter_range) else: need.add(filter_range) - if gap >= allowed_gap: + if not has_match: break return need @@ -127,12 +137,16 @@ def get_missing_sub_filters_for_addresses(granularity, address_manager): def get_missing_tx_for_addresses(address_manager): need = set() - for tx_hash, matcher in get_tx_matchers_for_missing_txs(): - for address_hash, _, _ in DatabaseAddressIterator(*address_manager): - address_bytes = bytearray(address_hash) - if matcher.Match(address_bytes): - need.add(tx_hash) - break + filters = get_tx_matchers_for_missing_txs() + print(f' loaded tx filters ({len(filters)})') + addresses = DatabaseAddressIterator.get_address_hash_bytes(*address_manager) + print(f' loaded addresses ({len(addresses)})') + print(' matching...') + for i, (tx_hash, matcher) in enumerate(filters): + if i > 0 and i % 1000 == 0: + print(f' {i} of {len(filters)} processed') + if matcher.MatchAny(addresses): + need.add(tx_hash) return need diff --git a/lbry/db/queries/base.py b/lbry/db/queries/base.py index d6a2a93e7..ab2714400 100644 --- a/lbry/db/queries/base.py +++ b/lbry/db/queries/base.py @@ -64,8 +64,11 @@ def get_block_headers(first, last=None): return rows -def insert_transaction(block_hash, tx): - context().get_bulk_loader().add_transaction(block_hash, tx).flush(TX) +def insert_transactions(txs): + loader = context().get_bulk_loader() + for block_hash, tx in txs: + loader.add_transaction(block_hash, tx) + loader.flush(return_row_count_for_table=None) def check_version_and_create_tables(): diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index a7d097003..93ba1fc95 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -136,13 +136,17 @@ class FilterManager: async def download_and_save_txs(self, tx_hashes): if not tx_hashes: return - txids = [hexlify(tx_hash[::-1]).decode() for tx_hash in tx_hashes] - print(f'=> transaction_search(len(txids): {len(txids)})') - txs = await self.client.first.transaction_search(txids=txids, raw=True) - print(f' @ transaction_search(len(txids): {len(txids)})') - for raw_tx in txs.values(): - await self.db.insert_transaction(None, Transaction(unhexlify(raw_tx))) - print(f' # transaction_search(len(txids): {len(txids)})') + all_txids = [hexlify(tx_hash[::-1]).decode() for tx_hash in tx_hashes] + chunk_size = 10 + for i in range(0, len(all_txids), chunk_size): + txids = all_txids[i:i + chunk_size] + print(f' => transaction_search(len(txids): {len(txids)})') + txs = await self.client.first.transaction_search(txids=txids, raw=True) + print(f' <= transaction_search(len(txids): {len(txids)})') + await self.db.insert_transactions([ + (None, Transaction(unhexlify(raw_tx))) for raw_tx in txs.values() + ]) + print(f' saved {len(txids)}) transactions') async def download_initial_filters(self, best_height): missing = await self.db.get_missing_required_filters(best_height) diff --git a/lbry/testcase.py b/lbry/testcase.py index b407bbde8..5fdaf3ed4 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -260,7 +260,7 @@ class AsyncUnitDBTestCase(AsyncioTestCase): self.outputs.extend(tx.outputs) return block_or_tx elif isinstance(block_or_tx, Transaction): - await self.db.insert_transaction(block_hash, block_or_tx) + await self.db.insert_transactions([(block_hash, block_or_tx)]) self.outputs.extend(block_or_tx.outputs) return block_or_tx.outputs[0] else: diff --git a/tests/unit/wallet/test_sync.py b/tests/unit/wallet/test_sync.py index e9a588b3d..2b4e08f3e 100644 --- a/tests/unit/wallet/test_sync.py +++ b/tests/unit/wallet/test_sync.py @@ -126,8 +126,8 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase): q.insert_tx_filters([(hexlify(f'tx{height}'.encode()), height, create_address_filter(addresses))]) def test_generate_from_filters_and_download_txs(self): - # 15 addresses will get generated, 9 due to filters and 6 due to gap - pubkeys = [self.receiving_pubkey.child(n) for n in range(15)] + # 18 addresses will get generated + pubkeys = [self.receiving_pubkey.child(n) for n in range(18)] hashes = [hash160(key.pubkey_bytes) for key in pubkeys] # create all required filters (include 9 of the addresses in the filters) diff --git a/tests/unit/wallet/test_wallet.py b/tests/unit/wallet/test_wallet.py index 9fd2c2e01..6727218cd 100644 --- a/tests/unit/wallet/test_wallet.py +++ b/tests/unit/wallet/test_wallet.py @@ -255,7 +255,7 @@ class TransactionIOBalancing(WalletTestCase): .add_inputs([self.txi(self.txo(sum(amounts)+0.1))]) \ .add_outputs(utxos) - await self.db.insert_transaction(b'beef', self.funding_tx) + await self.db.insert_transactions([(b'beef', self.funding_tx)]) return utxos