From 5b9e40c0c09c9ba0b257861e74ad3410f56bf8b8 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Thu, 17 Sep 2020 11:25:50 -0400
Subject: [PATCH] sync block filters

---
 lbry/blockchain/block.py             |  7 ++-
 lbry/blockchain/ledger.py            |  2 +-
 lbry/blockchain/sync/blocks.py       | 80 +++++++++++++++++-----------
 lbry/blockchain/sync/synchronizer.py | 30 ++++++-----
 lbry/db/tables.py                    | 20 ++++++-
 5 files changed, 87 insertions(+), 52 deletions(-)

diff --git a/lbry/blockchain/block.py b/lbry/blockchain/block.py
index 07fd4e7e2..b7acc7e72 100644
--- a/lbry/blockchain/block.py
+++ b/lbry/blockchain/block.py
@@ -1,6 +1,5 @@
 import struct
-from typing import Set
-from typing import NamedTuple, List
+from typing import NamedTuple, List, Tuple
 
 from chiabip158 import PyBIP158  # pylint: disable=no-name-in-module
 
@@ -12,8 +11,8 @@ from lbry.blockchain.bcd_data_stream import BCDataStream
 ZERO_BLOCK = bytes((0,)*32)
 
 
-def create_block_filter(address_hashes: Set[bytes]) -> bytes:
-    return bytes(PyBIP158([bytearray(a) for a in address_hashes]).GetEncoded())
+def create_block_filter(address_hashes: List[bytearray]) -> bytes:
+    return bytes(PyBIP158(address_hashes).GetEncoded())
 
 
 def get_block_filter(block_filter: bytes) -> PyBIP158:
diff --git a/lbry/blockchain/ledger.py b/lbry/blockchain/ledger.py
index dddb282b8..d0ec5d582 100644
--- a/lbry/blockchain/ledger.py
+++ b/lbry/blockchain/ledger.py
@@ -51,7 +51,7 @@ class Ledger:
         return Base58.encode(bytearray(raw_address + double_sha256(raw_address)[0:4]))
 
     @staticmethod
-    def address_to_hash160(address):
+    def address_to_hash160(address) -> bytes:
         return Base58.decode(address)[1:21]
 
     @classmethod
diff --git a/lbry/blockchain/sync/blocks.py b/lbry/blockchain/sync/blocks.py
index 9c4ddc91c..b7bf24e51 100644
--- a/lbry/blockchain/sync/blocks.py
+++ b/lbry/blockchain/sync/blocks.py
@@ -4,11 +4,14 @@ from sqlalchemy import table, bindparam, text, func, union
 from sqlalchemy.future import select
 from sqlalchemy.schema import CreateTable
 
-from lbry.db.tables import Block as BlockTable, TX, TXO, TXI, Claim, Tag, Support
+from lbry.db.tables import Block as BlockTable, TX, TXO, TXI, Claim, Tag, Support, BlockFilter
 from lbry.db.tables import (
+    join_block_filter,
+    pg_add_block_constraints_and_indexes,
+    pg_add_block_filter_constraints_and_indexes,
     pg_add_tx_constraints_and_indexes,
     pg_add_txo_constraints_and_indexes,
-    pg_add_txi_constraints_and_indexes,
+    pg_add_txi_constraints_and_indexes
 )
 from lbry.db.query_context import ProgressContext, event_emitter, context
 from lbry.db.sync import set_input_addresses, update_spent_outputs
@@ -65,10 +68,27 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
     if initial_sync:
         p.start(
             7 +
+            len(pg_add_block_constraints_and_indexes) +
+            len(pg_add_block_filter_constraints_and_indexes) +
             len(pg_add_tx_constraints_and_indexes) +
             len(pg_add_txi_constraints_and_indexes) +
             len(pg_add_txo_constraints_and_indexes)
         )
+        if p.ctx.is_postgres:
+            p.ctx.execute_notx(text("VACUUM ANALYZE block;"))
+        p.step()
+        for constraint in pg_add_block_constraints_and_indexes:
+            if p.ctx.is_postgres:
+                p.ctx.execute(text(constraint))
+            p.step()
+        if p.ctx.is_postgres:
+            p.ctx.execute_notx(text("VACUUM ANALYZE block_filter;"))
+        p.step()
+        for constraint in pg_add_block_filter_constraints_and_indexes:
+            if p.ctx.is_postgres:
+                p.ctx.execute(text(constraint))
+            p.step()
+        # 1. tx table stuff
         if p.ctx.is_postgres:
             p.ctx.execute_notx(text("VACUUM ANALYZE tx;"))
         p.step()
@@ -79,6 +99,8 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
             p.step()
         # A. Update TXIs to have the address of TXO they are spending.
         # 2. txi table reshuffling
+
+
         p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;"))
         p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[]))
         if p.ctx.is_postgres:
@@ -150,44 +172,38 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
 
 
 @event_emitter("blockchain.sync.filter.generate", "blocks")
-def sync_block_filters(p: ProgressContext):
-    blocks = []
-    all_filters = []
-    all_addresses = []
-    for block in get_blocks_without_filters():
-        addresses = {
-            p.ctx.ledger.address_to_hash160(r["address"])
-            for r in get_block_tx_addresses(block_hash=block["block_hash"])
-        }
-        all_addresses.extend(addresses)
-        block_filter = create_block_filter(addresses)
-        all_filters.append(block_filter)
-        blocks.append({"pk": block["block_hash"], "block_filter": block_filter})
-    p.ctx.execute(
-        BlockTable.update().where(BlockTable.c.block_hash == bindparam("pk")), blocks
+def sync_block_filters(blocks, p: ProgressContext):
+    p.start(
+        len(blocks), progress_id=blocks[0][1], label=f"generate filters {blocks[0][1]}-{blocks[-1][1]}"
     )
+    for block_hash, block_height in blocks:
+        block_filter = create_block_filter([
+            bytearray(p.ctx.ledger.address_to_hash160(r["address"]))
+            for r in get_block_tx_addresses(block_height)
+        ])
+        p.ctx.execute(
+            BlockFilter.insert().values(block_hash=block_hash, block_filter=block_filter)
+        )
+        p.step()
 
 
 def get_blocks_without_filters():
-    return context().fetchall(
-        select(BlockTable.c.block_hash)
-        .where(BlockTable.c.block_filter.is_(None))
-    )
+    result = []
+    for block in context().fetchall(
+            select(BlockTable.c.block_hash, BlockTable.c.height).select_from(
+                join_block_filter
+            ).where(BlockFilter.c.block_filter.is_(None))):
+        result.append((block['block_hash'], block['height']))
+    return result
 
 
-def get_block_tx_addresses(block_hash=None, tx_hash=None):
-    if block_hash is not None:
-        constraint = (TX.c.block_hash == block_hash)
-    elif tx_hash is not None:
-        constraint = (TX.c.tx_hash == tx_hash)
-    else:
-        raise ValueError('block_hash or tx_hash must be provided.')
+def get_block_tx_addresses(block_height):
     return context().fetchall(
         union(
-            select(TXO.c.address).select_from(TXO.join(TX))
-            .where((TXO.c.address.isnot(None)) & constraint),
-            select(TXI.c.address).select_from(TXI.join(TX))
-            .where((TXI.c.address.isnot(None)) & constraint),
+            select(TXO.c.address).select_from(TXO)
+            .where((TXO.c.address.isnot(None)) & (TXO.c.height == block_height)),
+            select(TXI.c.address).select_from(TXI.join(TXO))
+            .where((TXI.c.address.isnot(None)) & (TXI.c.height == block_height)),
         )
     )
diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py
index cea8bf751..5940266d2 100644
--- a/lbry/blockchain/sync/synchronizer.py
+++ b/lbry/blockchain/sync/synchronizer.py
@@ -18,7 +18,6 @@ log = logging.getLogger(__name__)
 
 BLOCKS_INIT_EVENT = Event.add("blockchain.sync.blocks.init", "steps")
 BLOCKS_MAIN_EVENT = Event.add("blockchain.sync.blocks.main", "blocks", "txs")
-FILTER_INIT_EVENT = Event.add("blockchain.sync.filter.init", "steps")
 FILTER_MAIN_EVENT = Event.add("blockchain.sync.filter.main", "blocks")
 CLAIMS_INIT_EVENT = Event.add("blockchain.sync.claims.init", "steps")
 CLAIMS_MAIN_EVENT = Event.add("blockchain.sync.claims.main", "claims")
@@ -142,15 +141,20 @@
         return starting_height, best_height_processed
 
     async def sync_filters(self):
-        if not self.conf.spv_address_filters:
-            return
-        with Progress(self.db.message_queue, FILTER_MAIN_EVENT) as p:
-            blocks = 0
-            tasks = []
-            # for chunk in range(select min(height), max(height) from block where filter is null):
-            #     tasks.append(self.db.run(block_phase.sync_filters, chunk, self.FILTER_FLUSH_SIZE))
-            p.start(blocks)
-            await self.run_tasks(tasks)
+        # if not self.conf.spv_address_filters:
+        #     return
+        blocks = await self.db.run(block_phase.get_blocks_without_filters)
+        batch_size = (len(blocks) // self.db.workers) + 1
+        batches = [
+            blocks[index:index + batch_size] for index in range(0, len(blocks), batch_size)
+        ]
+        if batches:
+            with Progress(self.db.message_queue, FILTER_MAIN_EVENT) as p:
+                p.start(len(blocks))
+                await self.run_tasks([
+                    self.db.run(block_phase.sync_block_filters, batch)
+                    for batch in batches
+                ])
 
     async def sync_spends(self, blocks_added):
         if blocks_added:
@@ -298,14 +302,12 @@ class BlockchainSync(Sync):
 
     async def advance(self):
         blocks_added = await self.sync_blocks()
-        sync_filters_task = asyncio.create_task(self.sync_filters())
-        sync_trends_task = asyncio.create_task(self.sync_trends())
         await self.sync_spends(blocks_added)
+        await self.sync_filters()
         initial_claim_sync = await self.sync_claims(blocks_added)
         await self.sync_supports(blocks_added)
         await self.sync_channel_stats(blocks_added, initial_claim_sync)
-        await sync_trends_task
-        await sync_filters_task
+        await self.sync_trends()
         if blocks_added:
             await self._on_block_controller.add(BlockEvent(blocks_added[-1]))
 
diff --git a/lbry/db/tables.py b/lbry/db/tables.py
index 2ae6733fd..042d1af5c 100644
--- a/lbry/db/tables.py
+++ b/lbry/db/tables.py
@@ -45,9 +45,25 @@
     Column('file_number', SmallInteger),
     Column('height', Integer),
     Column('timestamp', Integer),
-    Column('block_filter', LargeBinary, nullable=True)
 )
+pg_add_block_constraints_and_indexes = [
+    "ALTER TABLE block ADD PRIMARY KEY (block_hash);"
+]
+
+BlockFilter = Table(
+    'block_filter', metadata,
+    Column('block_hash', LargeBinary, primary_key=True),
+    Column('block_filter', LargeBinary, nullable=True),
+)
+join_block_filter = Block.join(BlockFilter, BlockFilter.columns.block_hash == Block.columns.block_hash, full=True)
+pg_add_block_filter_constraints_and_indexes = [
+    "ALTER TABLE block_filter ADD PRIMARY KEY (block_hash);",
+    "ALTER TABLE block_filter ADD CONSTRAINT fk_block_filter "
+    "   FOREIGN KEY(block_hash) "
+    "   REFERENCES block(block_hash) "
+    "   ON DELETE CASCADE;"
+]
 
 TX = Table(
     'tx', metadata,
@@ -117,6 +133,7 @@ pg_add_txo_constraints_and_indexes = [
     f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
     f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) "
     f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
+    "CREATE INDEX txo_tx_height ON txo (height);"
 ]
 
 
@@ -134,6 +151,7 @@ txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddres
 
 pg_add_txi_constraints_and_indexes = [
     "ALTER TABLE txi ADD PRIMARY KEY (txo_hash);",
+    "CREATE INDEX txi_tx_height ON txi (height);"
 ]
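
Note for reviewers: the new block_filter rows hold BIP158-style probabilistic filters built from the hash160 of every address seen in a block's TXOs and TXIs. Below is a minimal sketch of the round trip, reusing create_block_filter and get_block_filter from this patch; the random stand-in hash160s and the Match/MatchAny query methods (assumed from chiabip158's Python binding) are illustrative, not part of the change.

```python
import os
from chiabip158 import PyBIP158  # pylint: disable=no-name-in-module

from lbry.blockchain.block import create_block_filter, get_block_filter

# Stand-ins for the hash160 values that sync_block_filters collects from a
# block's TXO/TXI addresses via ledger.address_to_hash160().
block_address_hashes = [bytearray(os.urandom(20)) for _ in range(100)]
member = block_address_hashes[42]        # a hash160 that is in the block
non_member = bytearray(os.urandom(20))   # a hash160 that (almost surely) is not

# Encode the filter exactly as the new BlockFilter.block_filter column stores it.
encoded: bytes = create_block_filter(block_address_hashes)

# A client decodes the stored filter and tests membership without fetching the
# block. Match/MatchAny are assumed from chiabip158's binding: a hit may be a
# false positive, but a miss is definitive.
bip158: PyBIP158 = get_block_filter(encoded)
assert bip158.Match(member)              # real members always match
print("non-member possibly present:", bip158.MatchAny([non_member]))
```

Because a hit only means the block may contain relevant transactions, a wallet still downloads and verifies on match; a miss lets it skip the block entirely, which is what makes the per-block filter table useful to SPV clients.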