match many addresses at once and use 100k filters

This commit is contained in:
Lex Berezhny 2021-01-18 09:47:25 -05:00
parent 5562e84722
commit a2c4608bc8
4 changed files with 89 additions and 86 deletions

View file

@@ -26,17 +26,29 @@ class DatabaseAddressIterator:
self.chain = chain
self.n = -1
def __iter__(self) -> Iterator[Tuple[bytes, int, bool]]:
with context().connect_streaming() as c:
sql = (
@staticmethod
def get_sql(account_id, chain):
return (
select(
AccountAddress.c.pubkey,
AccountAddress.c.n
).where(
(AccountAddress.c.account == self.account_id) &
(AccountAddress.c.chain == self.chain)
(AccountAddress.c.account == account_id) &
(AccountAddress.c.chain == chain)
).order_by(AccountAddress.c.n)
)
@staticmethod
def get_address_hash_bytes(account_id, chain):
return [
bytearray(hash160(row['pubkey'])) for row in context().fetchall(
DatabaseAddressIterator.get_sql(account_id, chain)
)
]
def __iter__(self) -> Iterator[Tuple[bytes, int, bool]]:
with context().connect_streaming() as c:
sql = self.get_sql(self.account_id, self.chain)
for row in c.execute(sql):
self.n = row['n']
yield hash160(row['pubkey']), self.n, False
@@ -105,12 +117,11 @@ def generate_addresses_using_filters(best_height, allowed_gap, address_manager)
def get_missing_sub_filters_for_addresses(granularity, address_manager):
need = set()
for matcher, filter_range in get_filter_matchers_at_granularity(granularity):
for address_hash, _, _ in DatabaseAddressIterator(*address_manager):
address_bytes = bytearray(address_hash)
if matcher.Match(address_bytes) and not has_filter_range(*filter_range):
filters = get_filter_matchers_at_granularity(granularity)
addresses = DatabaseAddressIterator.get_address_hash_bytes(*address_manager)
for matcher, filter_range in filters:
if matcher.MatchAny(addresses) and not has_filter_range(*filter_range):
need.add(filter_range)
break
return need

View file

@@ -3447,7 +3447,7 @@ class Client(API):
async def connect(self):
self.session = ClientSession()
self.ws = await self.session.ws_connect(self.url, max_msg_size=20*1024*1024)
self.ws = await self.session.ws_connect(self.url, max_msg_size=50*1024*1024)
self.receive_messages_task = asyncio.create_task(self.receive_messages())
async def disconnect(self):

View file

@@ -1,3 +1,4 @@
import time
import asyncio
import logging
from typing import Dict
@@ -155,6 +156,7 @@ class FilterManager:
f" matching addresses for account {account.id} "
f"address group {address_manager.chain_number}..."
)
start = time.perf_counter()
missing = await self.db.generate_addresses_using_filters(
best_height, address_manager.gap, (
account.id,
@@ -164,6 +166,7 @@
address_manager.public_key.depth
)
)
print(f" {time.perf_counter()-start}s")
if missing:
print("downloading level 3 filters")
await self.download_and_save_filters(missing)

View file

@@ -132,46 +132,48 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase):
# create all required filters (include 9 of the addresses in the filters)
q.insert_block_filters([(0, 4, create_address_filter(hashes[0:1]))])
q.insert_block_filters([(100_000, 4, create_address_filter(hashes[1:2]))])
q.insert_block_filters([(110_000, 4, create_address_filter([b'beef']))])
q.insert_block_filters([(120_000, 4, create_address_filter(hashes[2:3]))])
q.insert_block_filters([(0, 5, create_address_filter(hashes[0:1]))])
q.insert_block_filters([(100_000, 5, create_address_filter([b'beef']))])
q.insert_block_filters([(130_000, 3, create_address_filter(hashes[3:4]))])
q.insert_block_filters([(131_000, 3, create_address_filter([b'beef']))])
q.insert_block_filters([(133_000, 3, create_address_filter(hashes[4:5]))])
q.insert_block_filters([(200_000, 4, create_address_filter(hashes[1:2]))])
q.insert_block_filters([(210_000, 4, create_address_filter([b'beef']))])
q.insert_block_filters([(220_000, 4, create_address_filter(hashes[2:3]))])
q.insert_block_filters([(134_000, 2, create_address_filter(hashes[5:6]))])
q.insert_block_filters([(134_200, 2, create_address_filter([b'beef']))])
q.insert_block_filters([(134_400, 2, create_address_filter(hashes[6:7]))])
q.insert_block_filters([(230_000, 3, create_address_filter(hashes[3:4]))])
q.insert_block_filters([(231_000, 3, create_address_filter([b'beef']))])
q.insert_block_filters([(233_000, 3, create_address_filter(hashes[4:5]))])
q.insert_block_filters([(134_500, 1, create_address_filter(hashes[7:8]))])
q.insert_block_filters([(134_566, 1, create_address_filter([b'beef']))])
q.insert_block_filters([(134_567, 1, create_address_filter(hashes[8:9]))])
q.insert_block_filters([(234_000, 2, create_address_filter(hashes[5:6]))])
q.insert_block_filters([(234_200, 2, create_address_filter([b'beef']))])
q.insert_block_filters([(234_400, 2, create_address_filter(hashes[6:7]))])
q.insert_block_filters([(234_500, 1, create_address_filter(hashes[7:8]))])
q.insert_block_filters([(234_566, 1, create_address_filter([b'beef']))])
q.insert_block_filters([(234_567, 1, create_address_filter(hashes[8:9]))])
# check that all required filters did get created
self.assertEqual(q.get_missing_required_filters(134_567), set())
self.assertEqual(q.get_missing_required_filters(234_567), set())
# no addresses
self.assertEqual([], self.get_ordered_addresses())
# generate addresses with 6 address gap, returns new sub filters needed
self.assertEqual(
q.generate_addresses_using_filters(134_567, 6, (
q.generate_addresses_using_filters(234_567, 6, (
self.root_pubkey.address, self.RECEIVING_KEY_N,
self.receiving_pubkey.pubkey_bytes,
self.receiving_pubkey.chain_code,
self.receiving_pubkey.depth
)), {
(0, 134500, 134500),
(0, 134567, 134567),
(1, 134000, 134099),
(1, 134400, 134499),
(2, 130000, 130900),
(2, 133000, 133900),
(3, 0, 9000),
(3, 100000, 109000),
(3, 120000, 129000)
(0, 234500, 234500),
(0, 234567, 234567),
(1, 234000, 234099),
(1, 234400, 234499),
(2, 230000, 230900),
(2, 233000, 233900),
(3, 200000, 209000),
(3, 220000, 229000),
(4, 0, 90000),
}
)
@@ -179,19 +181,19 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase):
self.assertEqual([key.address for key in pubkeys], self.get_ordered_addresses())
# "download" missing sub filters
self.insert_sub_filters(4, hashes[0:1], 0)
self.insert_sub_filters(4, hashes[1:2], 100_000)
self.insert_sub_filters(4, hashes[2:3], 120_000)
self.insert_sub_filters(3, hashes[3:4], 130_000)
self.insert_sub_filters(3, hashes[4:5], 133_000)
self.insert_sub_filters(2, hashes[5:6], 134_000)
self.insert_sub_filters(2, hashes[6:7], 134_400)
self.insert_sub_filters(1, hashes[7:8], 134_500)
self.insert_sub_filters(1, hashes[8:9], 134_567)
self.insert_sub_filters(5, hashes[0:1], 0)
self.insert_sub_filters(4, hashes[1:2], 200_000)
self.insert_sub_filters(4, hashes[2:3], 220_000)
self.insert_sub_filters(3, hashes[3:4], 230_000)
self.insert_sub_filters(3, hashes[4:5], 233_000)
self.insert_sub_filters(2, hashes[5:6], 234_000)
self.insert_sub_filters(2, hashes[6:7], 234_400)
self.insert_sub_filters(1, hashes[7:8], 234_500)
self.insert_sub_filters(1, hashes[8:9], 234_567)
# no sub filters needed to be downloaded now when re-checking all addresses
self.assertEqual(
q.generate_addresses_using_filters(134_567, 6, (
q.generate_addresses_using_filters(234_567, 6, (
self.root_pubkey.address, self.RECEIVING_KEY_N,
self.receiving_pubkey.pubkey_bytes,
self.receiving_pubkey.chain_code,
@@ -206,15 +208,12 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase):
q.get_missing_sub_filters_for_addresses(3, (
self.root_pubkey.address, self.RECEIVING_KEY_N,
)), {
(2, 3000, 3900),
(2, 103000, 103900),
(2, 123000, 123900),
(2, 223000, 223900),
}
)
# "download" missing 1,000 sub filters
self.insert_sub_filters(3, hashes[0:1], 3000)
self.insert_sub_filters(3, hashes[1:2], 103_000)
self.insert_sub_filters(3, hashes[2:3], 123_000)
self.insert_sub_filters(3, hashes[1:2], 203_000)
self.insert_sub_filters(3, hashes[2:3], 223_000)
# no more missing sub filters at 1,000
self.assertEqual(
q.get_missing_sub_filters_for_addresses(3, (
@@ -227,19 +226,15 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase):
q.get_missing_sub_filters_for_addresses(2, (
self.root_pubkey.address, self.RECEIVING_KEY_N,
)), {
(1, 3300, 3399),
(1, 103300, 103399),
(1, 123300, 123399),
(1, 130300, 130399),
(1, 133300, 133399),
(1, 233300, 233399),
}
)
# "download" missing 100 sub filters
self.insert_sub_filters(2, hashes[0:1], 3300)
self.insert_sub_filters(2, hashes[1:2], 103_300)
self.insert_sub_filters(2, hashes[2:3], 123_300)
self.insert_sub_filters(2, hashes[3:4], 130_300)
self.insert_sub_filters(2, hashes[4:5], 133_300)
self.insert_sub_filters(2, hashes[1:2], 203_300)
self.insert_sub_filters(2, hashes[2:3], 223_300)
self.insert_sub_filters(2, hashes[3:4], 230_300)
self.insert_sub_filters(2, hashes[4:5], 233_300)
# no more missing sub filters at 100
self.assertEqual(
q.get_missing_sub_filters_for_addresses(2, (
@@ -252,23 +247,17 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase):
q.get_missing_sub_filters_for_addresses(1, (
self.root_pubkey.address, self.RECEIVING_KEY_N,
)), {
(0, 3303, 3303),
(0, 103303, 103303),
(0, 123303, 123303),
(0, 130303, 130303),
(0, 133303, 133303),
(0, 134003, 134003),
(0, 134403, 134403),
(0, 234403, 234403),
}
)
# "download" missing tx filters
self.insert_sub_filters(1, hashes[0:1], 3303)
self.insert_sub_filters(1, hashes[1:2], 103_303)
self.insert_sub_filters(1, hashes[2:3], 123_303)
self.insert_sub_filters(1, hashes[3:4], 130_303)
self.insert_sub_filters(1, hashes[4:5], 133_303)
self.insert_sub_filters(1, hashes[5:6], 134_003)
self.insert_sub_filters(1, hashes[6:7], 134_403)
self.insert_sub_filters(1, hashes[1:2], 203_303)
self.insert_sub_filters(1, hashes[2:3], 223_303)
self.insert_sub_filters(1, hashes[3:4], 230_303)
self.insert_sub_filters(1, hashes[4:5], 233_303)
self.insert_sub_filters(1, hashes[5:6], 234_003)
self.insert_sub_filters(1, hashes[6:7], 234_403)
# no more missing tx filters
self.assertEqual(
q.get_missing_sub_filters_for_addresses(1, (
@@ -278,14 +267,14 @@ class TestAddressGenerationAndTXSync(UnitDBTestCase):
# find TXs we need to download
missing_txs = {
b'7478313033333033',
b'7478313233333033',
b'7478313330333033',
b'7478313333333033',
b'7478313334303033',
b'7478313334343033',
b'7478313334353030',
b'7478313334353637',
b'7478323033333033',
b'7478323233333033',
b'7478323330333033',
b'7478323333333033',
b'7478323334303033',
b'7478323334343033',
b'7478323334353030',
b'7478323334353637',
b'747833333033'
}
self.assertEqual(