From a2c4608bc8dd845e9b8fedf4911e273883a5ef78 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Mon, 18 Jan 2021 09:47:25 -0500 Subject: [PATCH] match many addresses at once and use 100k filters --- lbry/db/queries/address.py | 41 +++++++---- lbry/service/api.py | 2 +- lbry/service/light_client.py | 3 + tests/unit/wallet/test_sync.py | 129 +++++++++++++++------------------ 4 files changed, 89 insertions(+), 86 deletions(-) diff --git a/lbry/db/queries/address.py b/lbry/db/queries/address.py index 6ac655f68..6fcd650c8 100644 --- a/lbry/db/queries/address.py +++ b/lbry/db/queries/address.py @@ -26,17 +26,29 @@ class DatabaseAddressIterator: self.chain = chain self.n = -1 + @staticmethod + def get_sql(account_id, chain): + return ( + select( + AccountAddress.c.pubkey, + AccountAddress.c.n + ).where( + (AccountAddress.c.account == account_id) & + (AccountAddress.c.chain == chain) + ).order_by(AccountAddress.c.n) + ) + + @staticmethod + def get_address_hash_bytes(account_id, chain): + return [ + bytearray(hash160(row['pubkey'])) for row in context().fetchall( + DatabaseAddressIterator.get_sql(account_id, chain) + ) + ] + def __iter__(self) -> Iterator[Tuple[bytes, int, bool]]: with context().connect_streaming() as c: - sql = ( - select( - AccountAddress.c.pubkey, - AccountAddress.c.n - ).where( - (AccountAddress.c.account == self.account_id) & - (AccountAddress.c.chain == self.chain) - ).order_by(AccountAddress.c.n) - ) + sql = self.get_sql(self.account_id, self.chain) for row in c.execute(sql): self.n = row['n'] yield hash160(row['pubkey']), self.n, False @@ -105,12 +117,11 @@ def generate_addresses_using_filters(best_height, allowed_gap, address_manager) def get_missing_sub_filters_for_addresses(granularity, address_manager): need = set() - for matcher, filter_range in get_filter_matchers_at_granularity(granularity): - for address_hash, _, _ in DatabaseAddressIterator(*address_manager): - address_bytes = bytearray(address_hash) - if matcher.Match(address_bytes) and not has_filter_range(*filter_range): - need.add(filter_range) - break + filters = get_filter_matchers_at_granularity(granularity) + addresses = DatabaseAddressIterator.get_address_hash_bytes(*address_manager) + for matcher, filter_range in filters: + if matcher.MatchAny(addresses) and not has_filter_range(*filter_range): + need.add(filter_range) return need diff --git a/lbry/service/api.py b/lbry/service/api.py index b12c44bf5..5a4366548 100644 --- a/lbry/service/api.py +++ b/lbry/service/api.py @@ -3447,7 +3447,7 @@ class Client(API): async def connect(self): self.session = ClientSession() - self.ws = await self.session.ws_connect(self.url, max_msg_size=20*1024*1024) + self.ws = await self.session.ws_connect(self.url, max_msg_size=50*1024*1024) self.receive_messages_task = asyncio.create_task(self.receive_messages()) async def disconnect(self): diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index 9fbe34a4f..a7d097003 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -1,3 +1,4 @@ +import time import asyncio import logging from typing import Dict @@ -155,6 +156,7 @@ class FilterManager: f" matching addresses for account {account.id} " f"address group {address_manager.chain_number}..." ) + start = time.perf_counter() missing = await self.db.generate_addresses_using_filters( best_height, address_manager.gap, ( account.id, @@ -164,6 +166,7 @@ class FilterManager: address_manager.public_key.depth ) ) + print(f" {time.perf_counter()-start}s") if missing: print("downloading level 3 filters") await self.download_and_save_filters(missing) diff --git a/tests/unit/wallet/test_sync.py b/tests/unit/wallet/test_sync.py index daef3e33a..6d240afd0 100644 --- a/tests/unit/wallet/test_sync.py +++ b/tests/unit/wallet/test_sync.py @@ -132,46 +132,48 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase): # create all required filters (include 9 of the addresses in the filters) - q.insert_block_filters([(0, 4, create_address_filter(hashes[0:1]))]) - q.insert_block_filters([(100_000, 4, create_address_filter(hashes[1:2]))]) - q.insert_block_filters([(110_000, 4, create_address_filter([b'beef']))]) - q.insert_block_filters([(120_000, 4, create_address_filter(hashes[2:3]))]) + q.insert_block_filters([(0, 5, create_address_filter(hashes[0:1]))]) + q.insert_block_filters([(100_000, 5, create_address_filter([b'beef']))]) - q.insert_block_filters([(130_000, 3, create_address_filter(hashes[3:4]))]) - q.insert_block_filters([(131_000, 3, create_address_filter([b'beef']))]) - q.insert_block_filters([(133_000, 3, create_address_filter(hashes[4:5]))]) + q.insert_block_filters([(200_000, 4, create_address_filter(hashes[1:2]))]) + q.insert_block_filters([(210_000, 4, create_address_filter([b'beef']))]) + q.insert_block_filters([(220_000, 4, create_address_filter(hashes[2:3]))]) - q.insert_block_filters([(134_000, 2, create_address_filter(hashes[5:6]))]) - q.insert_block_filters([(134_200, 2, create_address_filter([b'beef']))]) - q.insert_block_filters([(134_400, 2, create_address_filter(hashes[6:7]))]) + q.insert_block_filters([(230_000, 3, create_address_filter(hashes[3:4]))]) + q.insert_block_filters([(231_000, 3, create_address_filter([b'beef']))]) + q.insert_block_filters([(233_000, 3, create_address_filter(hashes[4:5]))]) - q.insert_block_filters([(134_500, 1, create_address_filter(hashes[7:8]))]) - q.insert_block_filters([(134_566, 1, create_address_filter([b'beef']))]) - q.insert_block_filters([(134_567, 1, create_address_filter(hashes[8:9]))]) + q.insert_block_filters([(234_000, 2, create_address_filter(hashes[5:6]))]) + q.insert_block_filters([(234_200, 2, create_address_filter([b'beef']))]) + q.insert_block_filters([(234_400, 2, create_address_filter(hashes[6:7]))]) + + q.insert_block_filters([(234_500, 1, create_address_filter(hashes[7:8]))]) + q.insert_block_filters([(234_566, 1, create_address_filter([b'beef']))]) + q.insert_block_filters([(234_567, 1, create_address_filter(hashes[8:9]))]) # check that all required filters did get created - self.assertEqual(q.get_missing_required_filters(134_567), set()) + self.assertEqual(q.get_missing_required_filters(234_567), set()) # no addresses self.assertEqual([], self.get_ordered_addresses()) # generate addresses with 6 address gap, returns new sub filters needed self.assertEqual( - q.generate_addresses_using_filters(134_567, 6, ( + q.generate_addresses_using_filters(234_567, 6, ( self.root_pubkey.address, self.RECEIVING_KEY_N, self.receiving_pubkey.pubkey_bytes, self.receiving_pubkey.chain_code, self.receiving_pubkey.depth )), { - (0, 134500, 134500), - (0, 134567, 134567), - (1, 134000, 134099), - (1, 134400, 134499), - (2, 130000, 130900), - (2, 133000, 133900), - (3, 0, 9000), - (3, 100000, 109000), - (3, 120000, 129000) + (0, 234500, 234500), + (0, 234567, 234567), + (1, 234000, 234099), + (1, 234400, 234499), + (2, 230000, 230900), + (2, 233000, 233900), + (3, 200000, 209000), + (3, 220000, 229000), + (4, 0, 90000), } ) @@ -179,19 +181,19 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase): self.assertEqual([key.address for key in pubkeys], self.get_ordered_addresses()) # "download" missing sub filters - self.insert_sub_filters(4, hashes[0:1], 0) - self.insert_sub_filters(4, hashes[1:2], 100_000) - self.insert_sub_filters(4, hashes[2:3], 120_000) - self.insert_sub_filters(3, hashes[3:4], 130_000) - self.insert_sub_filters(3, hashes[4:5], 133_000) - self.insert_sub_filters(2, hashes[5:6], 134_000) - self.insert_sub_filters(2, hashes[6:7], 134_400) - self.insert_sub_filters(1, hashes[7:8], 134_500) - self.insert_sub_filters(1, hashes[8:9], 134_567) + self.insert_sub_filters(5, hashes[0:1], 0) + self.insert_sub_filters(4, hashes[1:2], 200_000) + self.insert_sub_filters(4, hashes[2:3], 220_000) + self.insert_sub_filters(3, hashes[3:4], 230_000) + self.insert_sub_filters(3, hashes[4:5], 233_000) + self.insert_sub_filters(2, hashes[5:6], 234_000) + self.insert_sub_filters(2, hashes[6:7], 234_400) + self.insert_sub_filters(1, hashes[7:8], 234_500) + self.insert_sub_filters(1, hashes[8:9], 234_567) # no sub filters needed to be downloaded now when re-checking all addresses self.assertEqual( - q.generate_addresses_using_filters(134_567, 6, ( + q.generate_addresses_using_filters(234_567, 6, ( self.root_pubkey.address, self.RECEIVING_KEY_N, self.receiving_pubkey.pubkey_bytes, self.receiving_pubkey.chain_code, @@ -206,15 +208,12 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase): q.get_missing_sub_filters_for_addresses(3, ( self.root_pubkey.address, self.RECEIVING_KEY_N, )), { - (2, 3000, 3900), - (2, 103000, 103900), - (2, 123000, 123900), + (2, 223000, 223900), } ) # "download" missing 1,000 sub filters - self.insert_sub_filters(3, hashes[0:1], 3000) - self.insert_sub_filters(3, hashes[1:2], 103_000) - self.insert_sub_filters(3, hashes[2:3], 123_000) + self.insert_sub_filters(3, hashes[1:2], 203_000) + self.insert_sub_filters(3, hashes[2:3], 223_000) # no more missing sub filters at 1,000 self.assertEqual( q.get_missing_sub_filters_for_addresses(3, ( @@ -227,19 +226,15 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase): q.get_missing_sub_filters_for_addresses(2, ( self.root_pubkey.address, self.RECEIVING_KEY_N, )), { - (1, 3300, 3399), - (1, 103300, 103399), - (1, 123300, 123399), - (1, 130300, 130399), - (1, 133300, 133399), + (1, 233300, 233399), } ) # "download" missing 100 sub filters self.insert_sub_filters(2, hashes[0:1], 3300) - self.insert_sub_filters(2, hashes[1:2], 103_300) - self.insert_sub_filters(2, hashes[2:3], 123_300) - self.insert_sub_filters(2, hashes[3:4], 130_300) - self.insert_sub_filters(2, hashes[4:5], 133_300) + self.insert_sub_filters(2, hashes[1:2], 203_300) + self.insert_sub_filters(2, hashes[2:3], 223_300) + self.insert_sub_filters(2, hashes[3:4], 230_300) + self.insert_sub_filters(2, hashes[4:5], 233_300) # no more missing sub filters at 100 self.assertEqual( q.get_missing_sub_filters_for_addresses(2, ( @@ -252,23 +247,17 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase): q.get_missing_sub_filters_for_addresses(1, ( self.root_pubkey.address, self.RECEIVING_KEY_N, )), { - (0, 3303, 3303), - (0, 103303, 103303), - (0, 123303, 123303), - (0, 130303, 130303), - (0, 133303, 133303), - (0, 134003, 134003), - (0, 134403, 134403), + (0, 234403, 234403), } ) # "download" missing tx filters self.insert_sub_filters(1, hashes[0:1], 3303) - self.insert_sub_filters(1, hashes[1:2], 103_303) - self.insert_sub_filters(1, hashes[2:3], 123_303) - self.insert_sub_filters(1, hashes[3:4], 130_303) - self.insert_sub_filters(1, hashes[4:5], 133_303) - self.insert_sub_filters(1, hashes[5:6], 134_003) - self.insert_sub_filters(1, hashes[6:7], 134_403) + self.insert_sub_filters(1, hashes[1:2], 203_303) + self.insert_sub_filters(1, hashes[2:3], 223_303) + self.insert_sub_filters(1, hashes[3:4], 230_303) + self.insert_sub_filters(1, hashes[4:5], 233_303) + self.insert_sub_filters(1, hashes[5:6], 234_003) + self.insert_sub_filters(1, hashes[6:7], 234_403) # no more missing tx filters self.assertEqual( q.get_missing_sub_filters_for_addresses(1, ( @@ -278,14 +267,14 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase): # find TXs we need to download missing_txs = { - b'7478313033333033', - b'7478313233333033', - b'7478313330333033', - b'7478313333333033', - b'7478313334303033', - b'7478313334343033', - b'7478313334353030', - b'7478313334353637', + b'7478323033333033', + b'7478323233333033', + b'7478323330333033', + b'7478323333333033', + b'7478323334303033', + b'7478323334343033', + b'7478323334353030', + b'7478323334353637', b'747833333033' } self.assertEqual(