From 30dff02674bd08b2111ca49dd72a2bb460c22492 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 17 Sep 2020 11:25:28 -0400 Subject: [PATCH 1/3] logging formatter --- lbry/console.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/console.py b/lbry/console.py index f786d45cf..e4140ebf7 100644 --- a/lbry/console.py +++ b/lbry/console.py @@ -85,7 +85,7 @@ class Basic(Console): super().__init__(service) self.service.sync.on_progress.listen(self.on_sync_progress) self.tasks = {} - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-8s %(name)s:%(lineno)d: %(message)s") def starting(self): conf = self.service.conf From 5b9e40c0c09c9ba0b257861e74ad3410f56bf8b8 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 17 Sep 2020 11:25:50 -0400 Subject: [PATCH 2/3] sync block filters --- lbry/blockchain/block.py | 7 ++- lbry/blockchain/ledger.py | 2 +- lbry/blockchain/sync/blocks.py | 80 +++++++++++++++++----------- lbry/blockchain/sync/synchronizer.py | 30 ++++++----- lbry/db/tables.py | 20 ++++++- 5 files changed, 87 insertions(+), 52 deletions(-) diff --git a/lbry/blockchain/block.py b/lbry/blockchain/block.py index 07fd4e7e2..b7acc7e72 100644 --- a/lbry/blockchain/block.py +++ b/lbry/blockchain/block.py @@ -1,6 +1,5 @@ import struct -from typing import Set -from typing import NamedTuple, List +from typing import NamedTuple, List, Tuple from chiabip158 import PyBIP158 # pylint: disable=no-name-in-module @@ -12,8 +11,8 @@ from lbry.blockchain.bcd_data_stream import BCDataStream ZERO_BLOCK = bytes((0,)*32) -def create_block_filter(address_hashes: Set[bytes]) -> bytes: - return bytes(PyBIP158([bytearray(a) for a in address_hashes]).GetEncoded()) +def create_block_filter(address_hashes: List[bytearray]) -> bytes: + return bytes(PyBIP158(address_hashes).GetEncoded()) def get_block_filter(block_filter: bytes) -> PyBIP158: diff --git a/lbry/blockchain/ledger.py b/lbry/blockchain/ledger.py index dddb282b8..d0ec5d582 100644 --- a/lbry/blockchain/ledger.py +++ b/lbry/blockchain/ledger.py @@ -51,7 +51,7 @@ class Ledger: return Base58.encode(bytearray(raw_address + double_sha256(raw_address)[0:4])) @staticmethod - def address_to_hash160(address): + def address_to_hash160(address) -> bytes: return Base58.decode(address)[1:21] @classmethod diff --git a/lbry/blockchain/sync/blocks.py b/lbry/blockchain/sync/blocks.py index 9c4ddc91c..b7bf24e51 100644 --- a/lbry/blockchain/sync/blocks.py +++ b/lbry/blockchain/sync/blocks.py @@ -4,11 +4,14 @@ from sqlalchemy import table, bindparam, text, func, union from sqlalchemy.future import select from sqlalchemy.schema import CreateTable -from lbry.db.tables import Block as BlockTable, TX, TXO, TXI, Claim, Tag, Support +from lbry.db.tables import Block as BlockTable, TX, TXO, TXI, Claim, Tag, Support, BlockFilter from lbry.db.tables import ( + join_block_filter, + pg_add_block_constraints_and_indexes, + pg_add_block_filter_constraints_and_indexes, pg_add_tx_constraints_and_indexes, pg_add_txo_constraints_and_indexes, - pg_add_txi_constraints_and_indexes, + pg_add_txi_constraints_and_indexes ) from lbry.db.query_context import ProgressContext, event_emitter, context from lbry.db.sync import set_input_addresses, update_spent_outputs @@ -65,10 +68,27 @@ def sync_spends(initial_sync: bool, p: ProgressContext): if initial_sync: p.start( 7 + + len(pg_add_block_constraints_and_indexes) + + len(pg_add_block_filter_constraints_and_indexes) + len(pg_add_tx_constraints_and_indexes) + 
len(pg_add_txi_constraints_and_indexes) + len(pg_add_txo_constraints_and_indexes) ) + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM ANALYZE block;")) + p.step() + for constraint in pg_add_block_constraints_and_indexes: + if p.ctx.is_postgres: + p.ctx.execute(text(constraint)) + p.step() + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM ANALYZE block_filter;")) + p.step() + for constraint in pg_add_block_filter_constraints_and_indexes: + if p.ctx.is_postgres: + p.ctx.execute(text(constraint)) + p.step() + # 1. tx table stuff if p.ctx.is_postgres: p.ctx.execute_notx(text("VACUUM ANALYZE tx;")) @@ -79,6 +99,8 @@ def sync_spends(initial_sync: bool, p: ProgressContext): p.step() # A. Update TXIs to have the address of TXO they are spending. # 2. txi table reshuffling + + p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;")) p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[])) if p.ctx.is_postgres: @@ -150,44 +172,38 @@ def sync_spends(initial_sync: bool, p: ProgressContext): @event_emitter("blockchain.sync.filter.generate", "blocks") -def sync_block_filters(p: ProgressContext): - blocks = [] - all_filters = [] - all_addresses = [] - for block in get_blocks_without_filters(): - addresses = { - p.ctx.ledger.address_to_hash160(r["address"]) - for r in get_block_tx_addresses(block_hash=block["block_hash"]) - } - all_addresses.extend(addresses) - block_filter = create_block_filter(addresses) - all_filters.append(block_filter) - blocks.append({"pk": block["block_hash"], "block_filter": block_filter}) - p.ctx.execute( - BlockTable.update().where(BlockTable.c.block_hash == bindparam("pk")), blocks +def sync_block_filters(blocks, p: ProgressContext): + p.start( + len(blocks), progress_id=blocks[0][1], label=f"generate filters {blocks[0][1]}-{blocks[-1][1]}" ) + for block_hash, block_height in blocks: + block_filter = create_block_filter([ + bytearray(p.ctx.ledger.address_to_hash160(r["address"])) + for r in get_block_tx_addresses(block_height) + ]) + p.ctx.execute( + BlockFilter.insert().values(block_hash=block_hash, block_filter=block_filter) + ) + p.step() def get_blocks_without_filters(): - return context().fetchall( - select(BlockTable.c.block_hash) - .where(BlockTable.c.block_filter.is_(None)) - ) + result = [] + for block in context().fetchall( + select(BlockTable.c.block_hash, BlockTable.c.height).select_from( + join_block_filter + ).where(BlockFilter.c.block_filter.is_(None))): + result.append((block['block_hash'], block['height'])) + return result -def get_block_tx_addresses(block_hash=None, tx_hash=None): - if block_hash is not None: - constraint = (TX.c.block_hash == block_hash) - elif tx_hash is not None: - constraint = (TX.c.tx_hash == tx_hash) - else: - raise ValueError('block_hash or tx_hash must be provided.') +def get_block_tx_addresses(block_height): return context().fetchall( union( - select(TXO.c.address).select_from(TXO.join(TX)) - .where((TXO.c.address.isnot_(None)) & constraint), - select(TXI.c.address).select_from(TXI.join(TX)) - .where((TXI.c.address.isnot_(None)) & constraint), + select(TXO.c.address).select_from(TXO) + .where((TXO.c.address.isnot(None)) & (TXO.c.height == block_height)), + select(TXI.c.address).select_from(TXI.join(TXO)) + .where((TXI.c.address.isnot(None)) & (TXI.c.height == block_height)), ) ) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index cea8bf751..5940266d2 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -18,7 +18,6 @@ log = 
logging.getLogger(__name__) BLOCKS_INIT_EVENT = Event.add("blockchain.sync.blocks.init", "steps") BLOCKS_MAIN_EVENT = Event.add("blockchain.sync.blocks.main", "blocks", "txs") -FILTER_INIT_EVENT = Event.add("blockchain.sync.filter.init", "steps") FILTER_MAIN_EVENT = Event.add("blockchain.sync.filter.main", "blocks") CLAIMS_INIT_EVENT = Event.add("blockchain.sync.claims.init", "steps") CLAIMS_MAIN_EVENT = Event.add("blockchain.sync.claims.main", "claims") @@ -142,15 +141,20 @@ class BlockchainSync(Sync): return starting_height, best_height_processed async def sync_filters(self): - if not self.conf.spv_address_filters: - return - with Progress(self.db.message_queue, FILTER_MAIN_EVENT) as p: - blocks = 0 - tasks = [] - # for chunk in range(select min(height), max(height) from block where filter is null): - # tasks.append(self.db.run(block_phase.sync_filters, chunk, self.FILTER_FLUSH_SIZE)) - p.start(blocks) - await self.run_tasks(tasks) + # if not self.conf.spv_address_filters: + # return + blocks = await self.db.run(block_phase.get_blocks_without_filters) + batch_size = (len(blocks) // self.db.workers) + 1 + batches = [ + blocks[index:index + batch_size] for index in range(0, len(blocks), batch_size) + ] + if batches: + with Progress(self.db.message_queue, FILTER_MAIN_EVENT) as p: + p.start(len(blocks)) + await self.run_tasks([ + self.db.run(block_phase.sync_block_filters, batch) + for batch in batches + ]) async def sync_spends(self, blocks_added): if blocks_added: @@ -298,14 +302,12 @@ class BlockchainSync(Sync): async def advance(self): blocks_added = await self.sync_blocks() - sync_filters_task = asyncio.create_task(self.sync_filters()) - sync_trends_task = asyncio.create_task(self.sync_trends()) await self.sync_spends(blocks_added) + await self.sync_filters() initial_claim_sync = await self.sync_claims(blocks_added) await self.sync_supports(blocks_added) await self.sync_channel_stats(blocks_added, initial_claim_sync) - await sync_trends_task - await sync_filters_task + await self.sync_trends() if blocks_added: await self._on_block_controller.add(BlockEvent(blocks_added[-1])) diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 2ae6733fd..042d1af5c 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -45,9 +45,25 @@ Block = Table( Column('file_number', SmallInteger), Column('height', Integer), Column('timestamp', Integer), - Column('block_filter', LargeBinary, nullable=True) ) +pg_add_block_constraints_and_indexes = [ + "ALTER TABLE block ADD PRIMARY KEY (block_hash);" +] + +BlockFilter = Table( + 'block_filter', metadata, + Column('block_hash', LargeBinary, primary_key=True), + Column('block_filter', LargeBinary, nullable=True), +) +join_block_filter = Block.join(BlockFilter, BlockFilter.columns.block_hash == Block.columns.block_hash, full=True) +pg_add_block_filter_constraints_and_indexes = [ + "ALTER TABLE block_filter ADD PRIMARY KEY (block_hash);", + "ALTER TABLE block_filter ADD CONSTRAINT fk_block_filter " + " FOREIGN KEY(block_hash) " + " REFERENCES block(block_hash) " + " ON DELETE CASCADE;" +] TX = Table( 'tx', metadata, @@ -117,6 +133,7 @@ pg_add_txo_constraints_and_indexes = [ f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) " f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", + "CREATE INDEX txo_tx_height ON txo (height);" ] @@ -134,6 +151,7 @@ txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddres pg_add_txi_constraints_and_indexes = [ "ALTER 
TABLE txi ADD PRIMARY KEY (txo_hash);", + "CREATE INDEX txi_tx_height ON txi (height);" ] From 0b34c4ea8696b346ac846322b50f9ea33fe6d991 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Mon, 21 Sep 2020 10:26:19 -0400 Subject: [PATCH 3/3] got tests to pass on sqlite and postgres and added filter groups at various block height scales --- lbry/blockchain/block.py | 10 +- lbry/blockchain/sync/blocks.py | 168 ++++++++++++------ lbry/blockchain/sync/filter_builder.py | 79 ++++++++ lbry/blockchain/sync/synchronizer.py | 47 +++-- lbry/db/database.py | 3 + lbry/db/queries/base.py | 10 +- lbry/db/query_context.py | 33 +++- lbry/db/tables.py | 63 ++++--- lbry/testcase.py | 76 +++++++- .../integration/blockchain/test_blockchain.py | 1 - tests/unit/blockchain/test_filters.py | 106 +++++++++++ 11 files changed, 496 insertions(+), 100 deletions(-) create mode 100644 lbry/blockchain/sync/filter_builder.py create mode 100644 tests/unit/blockchain/test_filters.py diff --git a/lbry/blockchain/block.py b/lbry/blockchain/block.py index b7acc7e72..8921dbdfe 100644 --- a/lbry/blockchain/block.py +++ b/lbry/blockchain/block.py @@ -1,5 +1,5 @@ import struct -from typing import NamedTuple, List, Tuple +from typing import NamedTuple, List from chiabip158 import PyBIP158 # pylint: disable=no-name-in-module @@ -11,12 +11,12 @@ from lbry.blockchain.bcd_data_stream import BCDataStream ZERO_BLOCK = bytes((0,)*32) -def create_block_filter(address_hashes: List[bytearray]) -> bytes: - return bytes(PyBIP158(address_hashes).GetEncoded()) +def create_address_filter(address_hashes: List[bytes]) -> bytes: + return bytes(PyBIP158([bytearray(a) for a in address_hashes]).GetEncoded()) -def get_block_filter(block_filter: bytes) -> PyBIP158: - return PyBIP158(bytearray(block_filter)) +def get_address_filter(address_filter: bytes) -> PyBIP158: + return PyBIP158(bytearray(address_filter)) class Block(NamedTuple): diff --git a/lbry/blockchain/sync/blocks.py b/lbry/blockchain/sync/blocks.py index b7bf24e51..ca6b95fdb 100644 --- a/lbry/blockchain/sync/blocks.py +++ b/lbry/blockchain/sync/blocks.py @@ -1,24 +1,29 @@ import logging +from typing import Tuple -from sqlalchemy import table, bindparam, text, func, union +from sqlalchemy import table, text, func, union, between from sqlalchemy.future import select from sqlalchemy.schema import CreateTable -from lbry.db.tables import Block as BlockTable, TX, TXO, TXI, Claim, Tag, Support, BlockFilter from lbry.db.tables import ( - join_block_filter, + Block as BlockTable, BlockFilter, BlockGroupFilter, + TX, TXFilter, MempoolFilter, TXO, TXI, Claim, Tag, Support +) +from lbry.db.tables import ( pg_add_block_constraints_and_indexes, pg_add_block_filter_constraints_and_indexes, pg_add_tx_constraints_and_indexes, + pg_add_tx_filter_constraints_and_indexes, pg_add_txo_constraints_and_indexes, - pg_add_txi_constraints_and_indexes + pg_add_txi_constraints_and_indexes, ) from lbry.db.query_context import ProgressContext, event_emitter, context from lbry.db.sync import set_input_addresses, update_spent_outputs -from lbry.blockchain.block import Block, create_block_filter +from lbry.blockchain.block import Block, create_address_filter from lbry.blockchain.bcd_data_stream import BCDataStream from .context import get_or_initialize_lbrycrd +from .filter_builder import FilterBuilder log = logging.getLogger(__name__) @@ -63,32 +68,35 @@ def sync_block_file( return last_block_processed +@event_emitter("blockchain.sync.blocks.indexes", "steps") +def blocks_constraints_and_indexes(p: ProgressContext): + 
p.start(1 + len(pg_add_block_constraints_and_indexes)) + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM ANALYZE block;")) + p.step() + for constraint in pg_add_block_constraints_and_indexes: + if p.ctx.is_postgres: + p.ctx.execute(text(constraint)) + p.step() + + +@event_emitter("blockchain.sync.blocks.vacuum", "steps") +def blocks_vacuum(p: ProgressContext): + p.start(1) + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM block;")) + p.step() + + @event_emitter("blockchain.sync.spends.main", "steps") def sync_spends(initial_sync: bool, p: ProgressContext): if initial_sync: p.start( 7 + - len(pg_add_block_constraints_and_indexes) + - len(pg_add_block_filter_constraints_and_indexes) + len(pg_add_tx_constraints_and_indexes) + len(pg_add_txi_constraints_and_indexes) + len(pg_add_txo_constraints_and_indexes) ) - if p.ctx.is_postgres: - p.ctx.execute_notx(text("VACUUM ANALYZE block;")) - p.step() - for constraint in pg_add_block_constraints_and_indexes: - if p.ctx.is_postgres: - p.ctx.execute(text(constraint)) - p.step() - if p.ctx.is_postgres: - p.ctx.execute_notx(text("VACUUM ANALYZE block_filter;")) - p.step() - for constraint in pg_add_block_filter_constraints_and_indexes: - if p.ctx.is_postgres: - p.ctx.execute(text(constraint)) - p.step() - # 1. tx table stuff if p.ctx.is_postgres: p.ctx.execute_notx(text("VACUUM ANALYZE tx;")) @@ -99,8 +107,6 @@ def sync_spends(initial_sync: bool, p: ProgressContext): p.step() # A. Update TXIs to have the address of TXO they are spending. # 2. txi table reshuffling - - p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;")) p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[])) if p.ctx.is_postgres: @@ -171,41 +177,96 @@ def sync_spends(initial_sync: bool, p: ProgressContext): p.step() -@event_emitter("blockchain.sync.filter.generate", "blocks") -def sync_block_filters(blocks, p: ProgressContext): - p.start( - len(blocks), progress_id=blocks[0][1], label=f"generate filters {blocks[0][1]}-{blocks[-1][1]}" +@event_emitter("blockchain.sync.filters.generate", "blocks", throttle=100) +def sync_filters(start, end, p: ProgressContext): + fp = FilterBuilder(start, end) + p.start((end-start)+1, progress_id=start, label=f"generate filters {start}-{end}") + with p.ctx.connect_streaming() as c: + loader = p.ctx.get_bulk_loader() + + tx_hash, height, addresses, last_added = None, None, set(), None + address_to_hash = p.ctx.ledger.address_to_hash160 + for row in c.execute(get_block_tx_addresses_sql(*fp.query_heights)): + if tx_hash != row.tx_hash: + if tx_hash is not None: + last_added = tx_hash + fp.add(tx_hash, height, addresses) + tx_hash, height, addresses = row.tx_hash, row.height, set() + addresses.add(address_to_hash(row.address)) + if all([last_added, tx_hash]) and last_added != tx_hash: # pickup last tx + fp.add(tx_hash, height, addresses) + + for tx_hash, height, addresses in fp.tx_filters: + loader.add_transaction_filter( + tx_hash, height, create_address_filter(list(addresses)) + ) + + for height, addresses in fp.block_filters.items(): + loader.add_block_filter( + height, create_address_filter(list(addresses)) + ) + + for group_filter in fp.group_filters: + for height, addresses in group_filter.groups.items(): + loader.add_group_filter( + height, group_filter.factor, create_address_filter(list(addresses)) + ) + + p.add(loader.flush(BlockFilter)) + + +@event_emitter("blockchain.sync.filters.indexes", "steps") +def filters_constraints_and_indexes(p: ProgressContext): + constraints = ( + pg_add_tx_filter_constraints_and_indexes + 
+ pg_add_block_filter_constraints_and_indexes ) - for block_hash, block_height in blocks: - block_filter = create_block_filter([ - bytearray(p.ctx.ledger.address_to_hash160(r["address"])) - for r in get_block_tx_addresses(block_height) - ]) - p.ctx.execute( - BlockFilter.insert().values(block_hash=block_hash, block_filter=block_filter) - ) + p.start(2 + len(constraints)) + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM ANALYZE block_filter;")) + p.step() + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM ANALYZE tx_filter;")) + p.step() + for constraint in constraints: + if p.ctx.is_postgres: + p.ctx.execute(text(constraint)) p.step() -def get_blocks_without_filters(): - result = [] - for block in context().fetchall( - select(BlockTable.c.block_hash, BlockTable.c.height).select_from( - join_block_filter - ).where(BlockFilter.c.block_filter.is_(None))): - result.append((block['block_hash'], block['height'])) - return result +@event_emitter("blockchain.sync.filters.vacuum", "steps") +def filters_vacuum(p: ProgressContext): + p.start(2) + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM block_filter;")) + p.step() + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM tx_filter;")) + p.step() -def get_block_tx_addresses(block_height): - return context().fetchall( - union( - select(TXO.c.address).select_from(TXO) - .where((TXO.c.address.isnot(None)) & (TXO.c.height == block_height)), - select(TXI.c.address).select_from(TXI.join(TXO)) - .where((TXI.c.address.isnot(None)) & (TXI.c.height == block_height)), +def get_block_range_without_filters() -> Tuple[int, int]: + sql = ( + select( + func.coalesce(func.min(BlockTable.c.height), -1).label('start_height'), + func.coalesce(func.max(BlockTable.c.height), -1).label('end_height'), ) + .select_from(BlockTable) + .where(BlockTable.c.height.notin_(select(BlockFilter.c.height))) ) + result = context().fetchone(sql) + return result['start_height'], result['end_height'] + + +def get_block_tx_addresses_sql(start_height, end_height): + return union( + select(TXO.c.tx_hash, TXO.c.height, TXO.c.address).where( + (TXO.c.address.isnot(None)) & between(TXO.c.height, start_height, end_height) + ), + select(TXI.c.tx_hash, TXI.c.height, TXI.c.address).where( + (TXI.c.address.isnot(None)) & between(TXI.c.height, start_height, end_height) + ), + ).order_by('height', 'tx_hash') @event_emitter("blockchain.sync.rewind.main", "steps") @@ -222,6 +283,11 @@ def rewind(height: int, p: ProgressContext): ), Claim.delete().where(Claim.c.height >= height), Support.delete().where(Support.c.height >= height), + BlockFilter.delete().where(BlockFilter.c.height >= height), + # TODO: group and tx filters need where() clauses (below actually breaks things) + BlockGroupFilter.delete(), + TXFilter.delete(), + MempoolFilter.delete() ] for delete in p.iter(deletes): p.ctx.execute(delete) diff --git a/lbry/blockchain/sync/filter_builder.py b/lbry/blockchain/sync/filter_builder.py new file mode 100644 index 000000000..9b21a0508 --- /dev/null +++ b/lbry/blockchain/sync/filter_builder.py @@ -0,0 +1,79 @@ +from typing import Dict + + +def split_range_into_10k_batches(start, end): + batch = [start, end] + batches = [batch] + for block in range(start, end+1): + if 0 < block != batch[0] and block % 10_000 == 0: + batch = [block, block] + batches.append(batch) + else: + batch[1] = block + return batches + + +class GroupFilter: + """ + Collects addresses into buckets of specific sizes defined by 10 raised to power of factor. + eg. 
a factor of 2 (10**2) would create block buckets 100-199, 200-299, etc + a factor of 3 (10**3) would create block buckets 1000-1999, 2000-2999, etc + """ + def __init__(self, start, end, factor): + self.start = start + self.end = end + self.factor = factor + self.resolution = resolution = 10**factor + last_height_in_group, groups = resolution-1, {} + for block in range(start, end+1): + if block % resolution == last_height_in_group: + groups[block-last_height_in_group] = set() + self.last_height_in_group = last_height_in_group + self.groups: Dict[int, set] = groups + + @property + def coverage(self): + return list(self.groups.keys()) + + def add(self, height, addresses): + group = self.groups.get(height - (height % self.resolution)) + if group is not None: + group.update(addresses) + + +class FilterBuilder: + """ + Creates filter groups, calculates the necessary block range to fulfill creation + of filter groups and collects tx filters, block filters and group filters. + """ + def __init__(self, start, end): + self.start = start + self.end = end + self.group_filters = [ + GroupFilter(start, end, 4), + GroupFilter(start, end, 3), + GroupFilter(start, end, 2), + ] + self.start_tx_height, self.end_tx_height = self._calculate_tx_heights_for_query() + self.tx_filters = [] + self.block_filters: Dict[int, set] = {} + + def _calculate_tx_heights_for_query(self): + for group_filter in self.group_filters: + if group_filter.groups: + return group_filter.coverage[0], self.end + return self.start, self.end + + @property + def query_heights(self): + return self.start_tx_height, self.end_tx_height + + def add(self, tx_hash, height, addresses): + if self.start <= height <= self.end: + self.tx_filters.append((tx_hash, height, addresses)) + block_filter = self.block_filters.get(height) + if block_filter is None: + block_filter = self.block_filters[height] = set() + block_filter.update(addresses) + for group_filter in self.group_filters: + group_filter.add(height, addresses) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 5940266d2..f55299320 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -13,12 +13,14 @@ from lbry.blockchain.lbrycrd import Lbrycrd from lbry.error import LbrycrdEventSubscriptionError from . 
import blocks as block_phase, claims as claim_phase, supports as support_phase +from .filter_builder import split_range_into_10k_batches log = logging.getLogger(__name__) BLOCKS_INIT_EVENT = Event.add("blockchain.sync.blocks.init", "steps") BLOCKS_MAIN_EVENT = Event.add("blockchain.sync.blocks.main", "blocks", "txs") -FILTER_MAIN_EVENT = Event.add("blockchain.sync.filter.main", "blocks") +FILTER_INIT_EVENT = Event.add("blockchain.sync.filters.init", "steps") +FILTER_MAIN_EVENT = Event.add("blockchain.sync.filters.main", "blocks") CLAIMS_INIT_EVENT = Event.add("blockchain.sync.claims.init", "steps") CLAIMS_MAIN_EVENT = Event.add("blockchain.sync.claims.main", "claims") TRENDS_INIT_EVENT = Event.add("blockchain.sync.trends.init", "steps") @@ -137,24 +139,39 @@ class BlockchainSync(Sync): }) completed = await self.run_tasks(tasks) if completed: + if starting_height == 0: + await self.db.run(block_phase.blocks_constraints_and_indexes) + else: + await self.db.run(block_phase.blocks_vacuum) best_height_processed = max(f.result() for f in completed) return starting_height, best_height_processed async def sync_filters(self): - # if not self.conf.spv_address_filters: - # return - blocks = await self.db.run(block_phase.get_blocks_without_filters) - batch_size = (len(blocks) // self.db.workers) + 1 - batches = [ - blocks[index:index + batch_size] for index in range(0, len(blocks), batch_size) - ] - if batches: - with Progress(self.db.message_queue, FILTER_MAIN_EVENT) as p: - p.start(len(blocks)) - await self.run_tasks([ - self.db.run(block_phase.sync_block_filters, batch) - for batch in batches - ]) + if not self.conf.spv_address_filters: + return + with Progress(self.db.message_queue, FILTER_INIT_EVENT) as p: + p.start(2) + initial_sync = not await self.db.has_filters() + p.step() + if initial_sync: + blocks = [0, await self.db.get_best_block_height()] + else: + blocks = await self.db.run(block_phase.get_block_range_without_filters) + if blocks != (-1, -1): + batches = split_range_into_10k_batches(*blocks) + p.step() + else: + p.step() + return + with Progress(self.db.message_queue, FILTER_MAIN_EVENT) as p: + p.start((blocks[1]-blocks[0])+1) + await self.run_tasks([ + self.db.run(block_phase.sync_filters, *batch) for batch in batches + ]) + if initial_sync: + await self.db.run(block_phase.filters_constraints_and_indexes) + else: + await self.db.run(block_phase.filters_vacuum) async def sync_spends(self, blocks_added): if blocks_added: diff --git a/lbry/db/database.py b/lbry/db/database.py index e9d912b12..20b4e2fdf 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -199,6 +199,9 @@ class Database: async def execute_fetchall(self, sql): return await self.run(q.execute_fetchall, sql) + async def has_filters(self): + return await self.run(q.has_filters) + async def has_claims(self): return await self.run(q.has_claims) diff --git a/lbry/db/queries/base.py b/lbry/db/queries/base.py index cd4716b14..f758adb23 100644 --- a/lbry/db/queries/base.py +++ b/lbry/db/queries/base.py @@ -2,7 +2,7 @@ from sqlalchemy import text from sqlalchemy.future import select from ..query_context import context -from ..tables import SCHEMA_VERSION, metadata, Version, Claim, Support, Block, TX +from ..tables import SCHEMA_VERSION, metadata, Version, Claim, Support, Block, BlockFilter, TX def execute(sql): @@ -13,6 +13,10 @@ def execute_fetchall(sql): return context().fetchall(text(sql)) +def has_filters(): + return context().has_records(BlockFilter) + + def has_claims(): return context().has_records(Claim) @@ -22,7 
+26,7 @@ def has_supports(): def get_best_block_height(): - context().fetchmax(Block.c.height, -1) + return context().fetchmax(Block.c.height, -1) def insert_block(block): @@ -50,7 +54,7 @@ def disable_trigger_and_constraints(table_name): ctx = context() if ctx.is_postgres: ctx.execute(text(f"ALTER TABLE {table_name} DISABLE TRIGGER ALL;")) - if table_name in ('tag', 'stake'): + if table_name in ('tag', 'stake', 'block_group_filter', 'mempool_filter'): return if ctx.is_postgres: ctx.execute(text( diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 8239c5c4f..5bbd51c5c 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -26,7 +26,10 @@ from lbry.schema.result import Censor from lbry.schema.mime_types import guess_stream_type from .utils import pg_insert -from .tables import Block, TX, TXO, TXI, Claim, Tag, Support +from .tables import ( + Block, BlockFilter, BlockGroupFilter, + TX, TXFilter, TXO, TXI, Claim, Tag, Support +) from .constants import TXO_TYPES, STREAM_TYPES @@ -414,6 +417,9 @@ class BulkLoader: self.tags = [] self.update_claims = [] self.delete_tags = [] + self.tx_filters = [] + self.block_filters = [] + self.group_filters = [] @staticmethod def block_to_row(block: Block) -> dict: @@ -429,7 +435,7 @@ class BulkLoader: def tx_to_row(block_hash: bytes, tx: Transaction) -> dict: row = { 'tx_hash': tx.hash, - 'block_hash': block_hash, + #'block_hash': block_hash, 'raw': tx.raw, 'height': tx.height, 'position': tx.position, @@ -621,6 +627,19 @@ class BulkLoader: self.add_transaction(block.block_hash, tx) return self + def add_block_filter(self, height: int, address_filter: bytes): + self.block_filters.append({ + 'height': height, + 'address_filter': address_filter + }) + + def add_group_filter(self, height: int, factor: int, address_filter: bytes): + self.group_filters.append({ + 'height': height, + 'factor': factor, + 'address_filter': address_filter + }) + def add_transaction(self, block_hash: bytes, tx: Transaction): self.txs.append(self.tx_to_row(block_hash, tx)) for txi in tx.inputs: @@ -630,6 +649,13 @@ class BulkLoader: self.txos.append(self.txo_to_row(tx, txo)) return self + def add_transaction_filter(self, tx_hash: bytes, height: int, address_filter: bytes): + self.tx_filters.append({ + 'tx_hash': tx_hash, + 'height': height, + 'address_filter': address_filter + }) + def add_support(self, txo: Output, **extra): self.supports.append(self.support_to_row(txo, **extra)) @@ -669,7 +695,10 @@ class BulkLoader: def get_queries(self): return ( (Block.insert(), self.blocks), + (BlockFilter.insert(), self.block_filters), + (BlockGroupFilter.insert(), self.group_filters), (TX.insert(), self.txs), + (TXFilter.insert(), self.tx_filters), (TXO.insert(), self.txos), (TXI.insert(), self.txis), (Claim.insert(), self.claims), diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 042d1af5c..0a2d3d169 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -40,34 +40,41 @@ AccountAddress = Table( Block = Table( 'block', metadata, - Column('block_hash', LargeBinary, primary_key=True), + Column('height', Integer, primary_key=True), + Column('block_hash', LargeBinary), Column('previous_hash', LargeBinary), Column('file_number', SmallInteger), - Column('height', Integer), Column('timestamp', Integer), ) pg_add_block_constraints_and_indexes = [ - "ALTER TABLE block ADD PRIMARY KEY (block_hash);" + "ALTER TABLE block ADD PRIMARY KEY (height);", ] + BlockFilter = Table( 'block_filter', metadata, - Column('block_hash', LargeBinary, primary_key=True), - 
Column('block_filter', LargeBinary, nullable=True), + Column('height', Integer, primary_key=True), + Column('address_filter', LargeBinary), ) -join_block_filter = Block.join(BlockFilter, BlockFilter.columns.block_hash == Block.columns.block_hash, full=True) + pg_add_block_filter_constraints_and_indexes = [ - "ALTER TABLE block_filter ADD PRIMARY KEY (block_hash);", - "ALTER TABLE block_filter ADD CONSTRAINT fk_block_filter " - " FOREIGN KEY(block_hash) " - " REFERENCES block(block_hash) " - " ON DELETE CASCADE;" + "ALTER TABLE block_filter ADD PRIMARY KEY (height);", + "ALTER TABLE block_filter ADD CONSTRAINT fk_block_filter" + " FOREIGN KEY (height) REFERENCES block (height) ON DELETE CASCADE;", ] + +BlockGroupFilter = Table( + 'block_group_filter', metadata, + Column('height', Integer), + Column('factor', SmallInteger), + Column('address_filter', LargeBinary), +) + + TX = Table( 'tx', metadata, - Column('block_hash', LargeBinary, nullable=True), Column('tx_hash', LargeBinary, primary_key=True), Column('raw', LargeBinary), Column('height', Integer), @@ -76,15 +83,34 @@ TX = Table( Column('day', Integer, nullable=True), Column('is_verified', Boolean, server_default='FALSE'), Column('purchased_claim_hash', LargeBinary, nullable=True), - Column('tx_filter', LargeBinary, nullable=True) ) - pg_add_tx_constraints_and_indexes = [ "ALTER TABLE tx ADD PRIMARY KEY (tx_hash);", ] +TXFilter = Table( + 'tx_filter', metadata, + Column('tx_hash', LargeBinary, primary_key=True), + Column('height', Integer), + Column('address_filter', LargeBinary), +) + +pg_add_tx_filter_constraints_and_indexes = [ + "ALTER TABLE tx_filter ADD PRIMARY KEY (tx_hash);", + "ALTER TABLE tx_filter ADD CONSTRAINT fk_tx_filter" + " FOREIGN KEY (tx_hash) REFERENCES tx (tx_hash) ON DELETE CASCADE;" +] + + +MempoolFilter = Table( + 'mempool_filter', metadata, + Column('filter_number', Integer), + Column('mempool_filter', LargeBinary), +) + + TXO = Table( 'txo', metadata, Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)), @@ -114,7 +140,6 @@ TXO = Table( txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address) - pg_add_txo_constraints_and_indexes = [ "ALTER TABLE txo ADD PRIMARY KEY (txo_hash);", # find appropriate channel public key for signing a content claim @@ -133,7 +158,7 @@ pg_add_txo_constraints_and_indexes = [ f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) " f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", - "CREATE INDEX txo_tx_height ON txo (height);" + "CREATE INDEX txo_height ON txo (height);", ] @@ -148,10 +173,9 @@ TXI = Table( txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address) - pg_add_txi_constraints_and_indexes = [ "ALTER TABLE txi ADD PRIMARY KEY (txo_hash);", - "CREATE INDEX txi_tx_height ON txi (height);" + "CREATE INDEX txi_height ON txi (height);", ] @@ -214,14 +238,12 @@ Claim = Table( Column('trending_global', BigInteger, server_default='0'), ) - Tag = Table( 'tag', metadata, Column('claim_hash', LargeBinary), Column('tag', Text), ) - pg_add_claim_and_tag_constraints_and_indexes = [ "ALTER TABLE claim ADD PRIMARY KEY (claim_hash);", # for checking if claim is up-to-date @@ -259,7 +281,6 @@ Support = Table( Column('is_signature_valid', Boolean, nullable=True), ) - pg_add_support_constraints_and_indexes = [ "ALTER TABLE support ADD PRIMARY KEY (txo_hash);", # used to count()/sum() supports signed by channel 
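Note on the group filters introduced in lbry/blockchain/sync/filter_builder.py above: a GroupFilter with factor F buckets addresses into spans of 10**F blocks starting at heights divisible by 10**F, and a bucket is only emitted once the range being synced reaches that bucket's final height. A minimal standalone sketch of that coverage rule (the helper name group_coverage is illustrative only, not part of the patch):

from typing import List


def group_coverage(start: int, end: int, factor: int) -> List[int]:
    # Starting heights of the 10**factor buckets whose *last* block falls in
    # [start, end]; mirrors GroupFilter.coverage, which skips any bucket the
    # requested range has not yet completed.
    size = 10 ** factor
    return [
        height - (size - 1)
        for height in range(start, end + 1)
        if height % size == size - 1
    ]


# factor=2 buckets are 100 blocks wide (1700-1799, 1800-1899, ...):
assert group_coverage(1799, 1915, 2) == [1700, 1800]
# 1893-1898 never reaches height 1899, so the 1800 bucket is not emitted yet:
assert group_coverage(1893, 1898, 2) == []

These values match the GroupFilter coverage cases asserted in tests/unit/blockchain/test_filters.py below.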
diff --git a/lbry/testcase.py b/lbry/testcase.py index 95a58a367..8b17175f8 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -769,10 +769,23 @@ class EventGenerator: yield from self.blocks_main_start() for block_file in self.block_files: yield from self.blocks_file(*block_file) + if self.initial_sync: + yield from self.blocks_indexes() + else: + yield from self.blocks_vacuum() yield from self.blocks_main_finish() - yield from self.spends_steps() + yield from self.filters_init() + if self.block_files: + yield from self.filters_main_start() + yield from self.filters_generate() + if self.initial_sync: + yield from self.filters_indexes() + else: + yield from self.filters_vacuum() + yield from self.filters_main_finish() + if self.claims: if not self.initial_sync: yield from self.claims_init() @@ -845,10 +858,69 @@ class EventGenerator: "data": {"id": file, "done": step} } + def blocks_indexes(self): + yield from self.generate( + "blockchain.sync.blocks.indexes", ("steps",), 0, None, (2,), (1,) + ) + + def blocks_vacuum(self): + yield from self.generate( + "blockchain.sync.blocks.vacuum", ("steps",), 0, None, (1,), (1,) + ) + + def filters_init(self): + yield from self.generate("blockchain.sync.filters.init", ("steps",), 0, None, (2,), (1,)) + + def filters_main_start(self): + yield { + "event": "blockchain.sync.filters.main", + "data": { + "id": 0, "done": (0,), + "total": ((self.end_height-self.start_height)+1,), + "units": ("blocks",)} + } + + @staticmethod + def filters_main_finish(): + yield { + "event": "blockchain.sync.filters.main", + "data": {"id": 0, "done": (-1,)} + } + + def filters_generate(self): + #yield from self.generate( + # "blockchain.sync.filters.generate", ("blocks",), 0, + # f"generate filters 0-{blocks-1}", (blocks,), (100,) + #) + blocks = (self.end_height-self.start_height)+1 + yield { + "event": "blockchain.sync.filters.generate", + "data": { + "id": self.start_height, "done": (0,), + "total": (blocks,), + "units": ("blocks",), + "label": f"generate filters {self.start_height}-{self.end_height}", + } + } + yield { + "event": "blockchain.sync.filters.generate", + "data": {"id": self.start_height, "done": (blocks,)} + } + + def filters_indexes(self): + yield from self.generate( + "blockchain.sync.filters.indexes", ("steps",), 0, None, (6,), (1,) + ) + + def filters_vacuum(self): + yield from self.generate( + "blockchain.sync.filters.vacuum", ("steps",), 0, None, (2,), (1,) + ) + def spends_steps(self): yield from self.generate( "blockchain.sync.spends.main", ("steps",), 0, None, - (15 if self.initial_sync else 5,), + (17 if self.initial_sync else 5,), (1,) ) diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index f99836fd5..f3a512f47 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -372,7 +372,6 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): generate = not os.path.exists(self.TEST_DATA_CACHE_DIR) self.db = await self.make_db(self.chain) - self.chain.ledger.conf.spv_address_filters = False self.sync = BlockchainSync(self.chain, self.db) if not generate: diff --git a/tests/unit/blockchain/test_filters.py b/tests/unit/blockchain/test_filters.py new file mode 100644 index 000000000..156ab7fdb --- /dev/null +++ b/tests/unit/blockchain/test_filters.py @@ -0,0 +1,106 @@ +from unittest import TestCase +from lbry.blockchain.sync.filter_builder import ( + FilterBuilder as FB, GroupFilter as GF, split_range_into_10k_batches as split 
+) + + +class TestFilterGenerationComponents(TestCase): + + def test_split_range_into_10k_batches(self): + # single block (same start-end) + self.assertEqual(split(901_123, 901_123), [[901_123, 901_123]]) + # spans a 10k split + self.assertEqual(split(901_123, 911_123), [[901_123, 909_999], [910_000, 911_123]]) + # starts on last element before split + self.assertEqual(split(909_999, 911_123), [[909_999, 909_999], [910_000, 911_123]]) + # starts on first element after split + self.assertEqual(split(910_000, 911_123), [[910_000, 911_123]]) + # ends on last element before split + self.assertEqual(split(901_123, 909_999), [[901_123, 909_999]]) + # ends on first element after split + self.assertEqual(split(901_123, 910_000), [[901_123, 909_999], [910_000, 910_000]]) + # initial sync from 0 onwards + self.assertEqual(split(0, 37645), [ + [0, 9_999], + [10_000, 19_999], + [20_000, 29_999], + [30_000, 37645] + ]) + + def test_group_filter_coverage(self): + # single block (same start-end) + self.assertEqual(GF(1893, 1898, 2).coverage, []) + # spans a group split + self.assertEqual(GF(1893, 1905, 2).coverage, [1800]) + # starts on last element before split and + self.assertEqual(GF(1799, 1915, 2).coverage, [1700, 1800]) + # starts on first element after split + self.assertEqual(GF(1800, 1915, 2).coverage, [1800]) + # ends on last element before split + self.assertEqual(GF(1893, 1899, 2).coverage, [1800]) + # ends on first element after split + self.assertEqual(GF(1899, 1900, 2).coverage, [1800]) + self.assertEqual(GF(1599, 1899, 2).coverage, [1500, 1600, 1700, 1800]) + self.assertEqual(GF(1600, 1899, 2).coverage, [1600, 1700, 1800]) + + def test_group_filter_add_tx(self): + gf = GF(1898, 2002, 2) + gf.add(1798, ['a']) # outside range + gf.add(1800, ['b']) # first element in group 1800 + gf.add(1801, ['c']) + gf.add(1898, ['d']) + gf.add(1899, ['e']) # last element in group 1800 + gf.add(1900, ['f']) # first element in group 1900 + gf.add(1901, ['g']) + gf.add(2001, ['h']) # outside range + self.assertEqual(gf.groups, { + 1800: {'b', 'c', 'd', 'e'}, + 1900: {'f', 'g'} + }) + + def test_filter_builder_query_heights(self): + self.assertEqual(FB(893, 898).query_heights, (893, 898)) + self.assertEqual(FB(893, 899).query_heights, (800, 899)) + self.assertEqual(FB(913, 998).query_heights, (913, 998)) + self.assertEqual(FB(913, 999).query_heights, (0, 999)) + self.assertEqual(FB(1_913, 1_999).query_heights, (1_000, 1_999)) + self.assertEqual(FB(9_913, 9_998).query_heights, (9_913, 9_998)) + self.assertEqual(FB(9_913, 9_999).query_heights, (0, 9_999)) + self.assertEqual(FB(19_913, 19_999).query_heights, (10_000, 19_999)) + self.assertEqual(FB(819_913, 819_999).query_heights, (810_000, 819_999)) + + def test_filter_builder_add(self): + fb = FB(818_813, 819_999) + self.assertEqual(fb.query_heights, (810_000, 819_999)) + self.assertEqual(fb.group_filters[0].coverage, [810_000]) + self.assertEqual(fb.group_filters[1].coverage, [818_000, 819_000]) + self.assertEqual(fb.group_filters[2].coverage, [ + 818_800, 818_900, 819_000, 819_100, 819_200, 819_300, + 819_400, 819_500, 819_600, 819_700, 819_800, 819_900 + ]) + fb.add(b'beef0', 810_000, ['a']) + fb.add(b'beef1', 815_001, ['b']) + fb.add(b'beef2', 818_412, ['c']) + fb.add(b'beef3', 818_812, ['d']) + fb.add(b'beef4', 818_813, ['e']) + fb.add(b'beef5', 819_000, ['f']) + fb.add(b'beef6', 819_999, ['g']) + fb.add(b'beef7', 819_999, ['h']) + fb.add(b'beef8', 820_000, ['i']) + self.assertEqual(fb.group_filters[0].groups, { + 810_000: {'a', 'b', 'c', 'd', 'e', 'f', 
'g', 'h'} + }) + self.assertEqual(fb.group_filters[1].groups, { + 818_000: {'c', 'd', 'e'}, + 819_000: {'f', 'g', 'h'} + }) + self.assertEqual(fb.group_filters[2].groups[818_800], {'d', 'e'}) + self.assertEqual(fb.group_filters[2].groups[819_000], {'f'}) + self.assertEqual(fb.group_filters[2].groups[819_900], {'g', 'h'}) + self.assertEqual(fb.block_filters, {818813: {'e'}, 819000: {'f'}, 819999: {'g', 'h'}}) + self.assertEqual(fb.tx_filters, [ + (b'beef4', 818813, ['e']), + (b'beef5', 819000, ['f']), + (b'beef6', 819999, ['g']), + (b'beef7', 819999, ['h']) + ])
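The block, group and tx filters written by sync_filters are BIP-158 style address sets built from hash160s, so a client can later probe a stored filter to decide whether a block, block group, or transaction could involve its addresses. A minimal sketch of that round trip, assuming chiabip158's PyBIP158 exposes a MatchAny membership test taking a list of bytearrays; the addresses argument and helper name are illustrative only:

from typing import Iterable

from lbry.blockchain.block import create_address_filter, get_address_filter
from lbry.blockchain.ledger import Ledger


def filter_matches_any(address_filter: bytes, addresses: Iterable[str]) -> bool:
    # Decode a stored address_filter and probe it with the hash160 of each
    # base58 address. BIP-158 filters can produce false positives but never
    # false negatives, so a miss means the block definitely has no match.
    hashes = [bytearray(Ledger.address_to_hash160(address)) for address in addresses]
    return get_address_filter(address_filter).MatchAny(hashes)


# Building an encoded filter the same way sync_filters does:
# encoded = create_address_filter([Ledger.address_to_hash160(a) for a in addresses])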