From 5562e84722811edc21cbdd24d03f81792f169d93 Mon Sep 17 00:00:00 2001 From: Lex Berezhny <lex@damoti.com> Date: Wed, 13 Jan 2021 00:44:23 -0500 Subject: [PATCH] 100k filters --- lbry/blockchain/sync/filter_builder.py | 5 +- lbry/blockchain/sync/synchronizer.py | 4 +- lbry/db/queries/filters.py | 12 +++- lbry/service/light_client.py | 2 + .../integration/service/test_light_client.py | 9 ++- tests/unit/blockchain/test_filters.py | 62 ++++++++-------- tests/unit/wallet/test_sync.py | 70 ++++++++++--------- 7 files changed, 96 insertions(+), 68 deletions(-) diff --git a/lbry/blockchain/sync/filter_builder.py b/lbry/blockchain/sync/filter_builder.py index 9b21a0508..f365ba767 100644 --- a/lbry/blockchain/sync/filter_builder.py +++ b/lbry/blockchain/sync/filter_builder.py @@ -1,11 +1,11 @@ from typing import Dict -def split_range_into_10k_batches(start, end): +def split_range_into_batches(start, end, batch_size=100_000): batch = [start, end] batches = [batch] for block in range(start, end+1): - if 0 < block != batch[0] and block % 10_000 == 0: + if 0 < block != batch[0] and block % batch_size == 0: batch = [block, block] batches.append(batch) else: @@ -50,6 +50,7 @@ class FilterBuilder: self.start = start self.end = end self.group_filters = [ + GroupFilter(start, end, 5), GroupFilter(start, end, 4), GroupFilter(start, end, 3), GroupFilter(start, end, 2), diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 83307e5f6..1a0a3f627 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -16,7 +16,7 @@ from lbry.error import LbrycrdEventSubscriptionError from . import blocks as block_phase, claims as claim_phase, supports as support_phase from .context import uninitialize -from .filter_builder import split_range_into_10k_batches +from .filter_builder import split_range_into_batches log = logging.getLogger(__name__) @@ -187,7 +187,7 @@ class BlockchainSync(Sync): else: blocks = await self.db.run(block_phase.get_block_range_without_filters) if blocks != (-1, -1): - batches = split_range_into_10k_batches(*blocks) + batches = split_range_into_batches(*blocks) p.step() else: p.step() diff --git a/lbry/db/queries/filters.py b/lbry/db/queries/filters.py index 662df4dac..89db1245a 100644 --- a/lbry/db/queries/filters.py +++ b/lbry/db/queries/filters.py @@ -46,7 +46,7 @@ def get_filters(start_height, end_height=None, granularity=0): .order_by(TXFilter.c.height) ) else: - factor = granularity if granularity <= 4 else log10(granularity) + factor = granularity if granularity < 100 else log10(granularity) if end_height is None: height_condition = (BlockFilter.c.height == start_height) elif end_height == -1: @@ -64,8 +64,11 @@ def get_filters(start_height, end_height=None, granularity=0): def get_minimal_required_filter_ranges(height) -> Dict[int, Tuple[int, int]]: minimal = {} + if height >= 100_000: + minimal[5] = (0, ((height // 100_000)-1) * 100_000) if height >= 10_000: - minimal[4] = (0, ((height // 10_000)-1) * 10_000) + start = height - height % 100_000 + minimal[4] = (start, start + (((height - start) // 10_000) - 1) * 10_000) if height >= 1_000: start = height - height % 10_000 minimal[3] = (start, start+(((height-start) // 1_000)-1) * 1_000) @@ -92,6 +95,9 @@ def get_maximum_known_filters() -> Dict[str, Optional[int]]: select(func.max(BlockFilter.c.height)) .where(BlockFilter.c.factor == 4) .scalar_subquery().label('4'), + select(func.max(BlockFilter.c.height)) + .where(BlockFilter.c.factor == 5) + .scalar_subquery().label('5'), ) return context().fetchone(query) @@ -101,7 +107,7 @@ def get_missing_required_filters(height) -> Set[Tuple[int, int, int]]: missing_filters = set() for granularity, (start, end) in get_minimal_required_filter_ranges(height).items(): known_height = known_filters.get(str(granularity)) - if known_height is not None and known_height > start: + if known_height is not None and known_height >= start: if granularity == 1: adjusted_height = known_height + 1 else: diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index 9c2c4b0c4..9fbe34a4f 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -193,6 +193,8 @@ class FilterManager: await self.download_initial_filters(best_height) print('generating addresses...') await self.generate_addresses(best_height, wallets) + print("downloading level 3 filters...") + await self.download_sub_filters(4, wallets) print("downloading level 2 filters...") await self.download_sub_filters(3, wallets) print("downloading level 1 filters...") diff --git a/tests/integration/service/test_light_client.py b/tests/integration/service/test_light_client.py index ddd32596a..09665fe8c 100644 --- a/tests/integration/service/test_light_client.py +++ b/tests/integration/service/test_light_client.py @@ -12,7 +12,7 @@ class LightClientTests(IntegrationTestCase): async def asyncSetUp(self): await super().asyncSetUp() await self.chain.generate(200) - self.full_node_daemon = await self.make_full_node_daemon() + self.full_node_daemon = await self.make_full_node_daemon(workers=2) self.full_node: FullNode = self.full_node_daemon.service self.light_client_daemon = await self.make_light_client_daemon(self.full_node_daemon, start=False) self.light_client: LightClient = self.light_client_daemon.service @@ -25,6 +25,7 @@ class LightClientTests(IntegrationTestCase): await self.light_client.wallets.open() await self.light_client.client.start_event_streams() self.db = self.light_client.db + self.api = self.light_client_daemon.api self.sync = self.light_client.sync self.client = self.light_client.client self.account = self.light_client.wallets.default.accounts.default @@ -44,3 +45,9 @@ class LightClientTests(IntegrationTestCase): await self.assertBalance(self.account, '0.0') await self.sync.on_synced.first await self.assertBalance(self.account, '5.0') + + await self.api.channel_create('@foo', '1.0') + await self.chain.generate(1) + await self.sync.on_synced.first + channels = await self.api.channel_list() + self.assertEqual(len(channels), 1) diff --git a/tests/unit/blockchain/test_filters.py b/tests/unit/blockchain/test_filters.py index 156ab7fdb..ee656b4bf 100644 --- a/tests/unit/blockchain/test_filters.py +++ b/tests/unit/blockchain/test_filters.py @@ -1,12 +1,13 @@ from unittest import TestCase from lbry.blockchain.sync.filter_builder import ( - FilterBuilder as FB, GroupFilter as GF, split_range_into_10k_batches as split + FilterBuilder as FB, GroupFilter as GF, split_range_into_batches ) class TestFilterGenerationComponents(TestCase): def test_split_range_into_10k_batches(self): + def split(a, b): return split_range_into_batches(a, b, 10_000) # single block (same start-end) self.assertEqual(split(901_123, 901_123), [[901_123, 901_123]]) # spans a 10k split @@ -70,37 +71,42 @@ class TestFilterGenerationComponents(TestCase): self.assertEqual(FB(819_913, 819_999).query_heights, (810_000, 819_999)) def test_filter_builder_add(self): - fb = FB(818_813, 819_999) - self.assertEqual(fb.query_heights, (810_000, 819_999)) - self.assertEqual(fb.group_filters[0].coverage, [810_000]) - self.assertEqual(fb.group_filters[1].coverage, [818_000, 819_000]) - self.assertEqual(fb.group_filters[2].coverage, [ - 818_800, 818_900, 819_000, 819_100, 819_200, 819_300, - 819_400, 819_500, 819_600, 819_700, 819_800, 819_900 - ]) - fb.add(b'beef0', 810_000, ['a']) - fb.add(b'beef1', 815_001, ['b']) - fb.add(b'beef2', 818_412, ['c']) - fb.add(b'beef3', 818_812, ['d']) - fb.add(b'beef4', 818_813, ['e']) - fb.add(b'beef5', 819_000, ['f']) - fb.add(b'beef6', 819_999, ['g']) - fb.add(b'beef7', 819_999, ['h']) + fb = FB(798_813, 809_999) + self.assertEqual(fb.query_heights, (700_000, 809_999)) + self.assertEqual(fb.group_filters[0].coverage, [700_000]) + self.assertEqual(fb.group_filters[1].coverage, [790_000, 800_000]) + self.assertEqual(fb.group_filters[2].coverage, list(range(798_000, 809_000+1, 1_000))) + self.assertEqual(fb.group_filters[3].coverage, list(range(798_800, 809_900+1, 100))) + fb.add(b'beef0', 787_111, ['a']) + fb.add(b'beef1', 798_222, ['b']) + fb.add(b'beef2', 798_812, ['c']) + fb.add(b'beef3', 798_813, ['d']) + fb.add(b'beef4', 798_814, ['e']) + fb.add(b'beef5', 809_000, ['f']) + fb.add(b'beef6', 809_999, ['g']) + fb.add(b'beef7', 809_999, ['h']) fb.add(b'beef8', 820_000, ['i']) self.assertEqual(fb.group_filters[0].groups, { - 810_000: {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'} + 700_000: {'a', 'b', 'c', 'd', 'e'} }) self.assertEqual(fb.group_filters[1].groups, { - 818_000: {'c', 'd', 'e'}, - 819_000: {'f', 'g', 'h'} + 790_000: {'b', 'c', 'd', 'e'}, + 800_000: {'f', 'g', 'h'} }) - self.assertEqual(fb.group_filters[2].groups[818_800], {'d', 'e'}) - self.assertEqual(fb.group_filters[2].groups[819_000], {'f'}) - self.assertEqual(fb.group_filters[2].groups[819_900], {'g', 'h'}) - self.assertEqual(fb.block_filters, {818813: {'e'}, 819000: {'f'}, 819999: {'g', 'h'}}) + self.assertEqual(fb.group_filters[2].groups, { + 798_000: {'b', 'c', 'd', 'e'}, 799_000: set(), + 800_000: set(), 801_000: set(), 802_000: set(), 803_000: set(), 804_000: set(), + 805_000: set(), 806_000: set(), 807_000: set(), 808_000: set(), + 809_000: {'f', 'g', 'h'} + }) + self.assertEqual(fb.group_filters[3].groups[798_800], {'c', 'd', 'e'}) + self.assertEqual(fb.group_filters[3].groups[809_000], {'f'}) + self.assertEqual(fb.group_filters[3].groups[809_900], {'g', 'h'}) + self.assertEqual(fb.block_filters, {798813: {'d'}, 798814: {'e'}, 809000: {'f'}, 809999: {'h', 'g'}}) self.assertEqual(fb.tx_filters, [ - (b'beef4', 818813, ['e']), - (b'beef5', 819000, ['f']), - (b'beef6', 819999, ['g']), - (b'beef7', 819999, ['h']) + (b'beef3', 798813, ['d']), + (b'beef4', 798814, ['e']), + (b'beef5', 809000, ['f']), + (b'beef6', 809999, ['g']), + (b'beef7', 809999, ['h']) ]) diff --git a/tests/unit/wallet/test_sync.py b/tests/unit/wallet/test_sync.py index b28aaec94..daef3e33a 100644 --- a/tests/unit/wallet/test_sync.py +++ b/tests/unit/wallet/test_sync.py @@ -17,49 +17,55 @@ class TestMissingRequiredFiltersCalculation(UnitDBTestCase): self.assertEqual(q.get_missing_required_filters(100), {(2, 0, 0)}) self.assertEqual(q.get_missing_required_filters(199), {(2, 0, 0), (1, 100, 199)}) self.assertEqual(q.get_missing_required_filters(201), {(2, 0, 100), (1, 200, 201)}) + # all filters missing - self.assertEqual(q.get_missing_required_filters(134_567), { - (4, 0, 120_000), - (3, 130_000, 133_000), - (2, 134_000, 134_400), - (1, 134_500, 134_567) + self.assertEqual(q.get_missing_required_filters(234_567), { + (5, 0, 100_000), + (4, 200_000, 220_000), + (3, 230_000, 233_000), + (2, 234_000, 234_400), + (1, 234_500, 234_567) }) - q.insert_block_filters([(110_000, 4, b'beef')]) - q.insert_block_filters([(129_000, 3, b'beef')]) - q.insert_block_filters([(133_900, 2, b'beef')]) - q.insert_block_filters([(134_499, 1, b'beef')]) - # we we have some filters, but not recent enough (all except 10k are adjusted) - self.assertEqual(q.get_missing_required_filters(134_567), { - (4, 120_000, 120_000), # 0 -> 120_000 - (3, 130_000, 133_000), - (2, 134_000, 134_400), - (1, 134_500, 134_567) + q.insert_block_filters([(0, 5, b'beef')]) + q.insert_block_filters([(190_000, 4, b'beef')]) + q.insert_block_filters([(229_000, 3, b'beef')]) + q.insert_block_filters([(233_900, 2, b'beef')]) + q.insert_block_filters([(234_499, 1, b'beef')]) + # we have some old filters but none useable as initial required (except one 100k filter) + self.assertEqual(q.get_missing_required_filters(234_567), { + (5, 100_000, 100_000), + (4, 200_000, 220_000), + (3, 230_000, 233_000), + (2, 234_000, 234_400), + (1, 234_500, 234_567) }) - q.insert_block_filters([(132_000, 3, b'beef')]) - q.insert_block_filters([(134_300, 2, b'beef')]) - q.insert_block_filters([(134_550, 1, b'beef')]) - # all filters get adjusted because we have recent of each - self.assertEqual(q.get_missing_required_filters(134_567), { - (4, 120_000, 120_000), # 0 -> 120_000 - (3, 133_000, 133_000), # 130_000 -> 133_000 - (2, 134_400, 134_400), # 134_000 -> 134_400 - (1, 134_551, 134_567) # 134_500 -> 134_551 + q.insert_block_filters([(100_000, 5, b'beef')]) + q.insert_block_filters([(210_000, 4, b'beef')]) + q.insert_block_filters([(232_000, 3, b'beef')]) + q.insert_block_filters([(234_300, 2, b'beef')]) + q.insert_block_filters([(234_550, 1, b'beef')]) + # we have some useable initial filters, but not all + self.assertEqual(q.get_missing_required_filters(234_567), { + (4, 220_000, 220_000), + (3, 233_000, 233_000), + (2, 234_400, 234_400), + (1, 234_551, 234_567) }) - q.insert_block_filters([(120_000, 4, b'beef')]) - q.insert_block_filters([(133_000, 3, b'beef')]) - q.insert_block_filters([(134_400, 2, b'beef')]) - q.insert_block_filters([(134_566, 1, b'beef')]) + q.insert_block_filters([(220_000, 4, b'beef')]) + q.insert_block_filters([(233_000, 3, b'beef')]) + q.insert_block_filters([(234_400, 2, b'beef')]) + q.insert_block_filters([(234_566, 1, b'beef')]) # we have latest filters for all except latest single block - self.assertEqual(q.get_missing_required_filters(134_567), { - (1, 134_567, 134_567) # 134_551 -> 134_567 + self.assertEqual(q.get_missing_required_filters(234_567), { + (1, 234_567, 234_567) }) - q.insert_block_filters([(134_567, 1, b'beef')]) + q.insert_block_filters([(234_567, 1, b'beef')]) # we have all latest filters - self.assertEqual(q.get_missing_required_filters(134_567), set()) + self.assertEqual(q.get_missing_required_filters(234_567), set()) class TestAddressGenerationAndTXSync(UnitDBTestCase):