diff --git a/lbry/blockchain/sync/blocks.py b/lbry/blockchain/sync/blocks.py
index 405b4ed62..823b31db2 100644
--- a/lbry/blockchain/sync/blocks.py
+++ b/lbry/blockchain/sync/blocks.py
@@ -27,7 +27,7 @@ def get_best_block_height_for_file(file_number):
     )['height']
 
 
-@event_emitter("blockchain.sync.block.file", "blocks", "txs", throttle=100)
+@event_emitter("blockchain.sync.blocks.file", "blocks", "txs", throttle=100)
 def sync_block_file(
     file_number: int, start_height: int, txs: int, flush_size: int, p: ProgressContext
 ):
@@ -59,8 +59,8 @@ def sync_block_file(
     return last_block_processed
 
 
-@event_emitter("blockchain.sync.txoi.main", "steps")
-def sync_txoi(initial_sync: bool, p: ProgressContext):
+@event_emitter("blockchain.sync.spends.main", "steps")
+def sync_spends(initial_sync: bool, p: ProgressContext):
     if initial_sync:
         p.start(9)
         # A. Update TXIs to have the address of TXO they are spending.
diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py
index 006a23f5f..11c755a8a 100644
--- a/lbry/blockchain/sync/claims.py
+++ b/lbry/blockchain/sync/claims.py
@@ -1,7 +1,7 @@
 import logging
 from typing import Tuple, Union
 
-from sqlalchemy import case, func, desc
+from sqlalchemy import case, func, desc, text
 from sqlalchemy.future import select
 
 from lbry.db.queries.txio import (
@@ -11,7 +11,7 @@ from lbry.db.queries.txio import (
     where_abandoned_claims
 )
 from lbry.db.query_context import ProgressContext, event_emitter
-from lbry.db.tables import TX, TXO, Claim, Support
+from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_constraints_and_indexes
 from lbry.db.utils import least
 from lbry.db.constants import TXO_TYPES
 from lbry.blockchain.transaction import Output
@@ -79,10 +79,9 @@ def select_claims_for_saving(
             missing_in_claims_table=missing_in_claims_table,
             missing_or_stale_in_claims_table=missing_or_stale_in_claims_table,
         )
-    )
+    ).select_from(TXO.join(TX))
     if txo_types != TXO_TYPES['channel']:
         channel_txo = TXO.alias('channel_txo')
-        channel_claim = Claim.alias('channel_claim')
         return (
             select_claims.add_columns(
                 TXO.c.signature, TXO.c.signature_digest,
@@ -92,15 +91,10 @@ def select_claims_for_saving(
                         (channel_txo.c.claim_hash == TXO.c.channel_hash) &
                         (channel_txo.c.height <= TXO.c.height)
                     ).order_by(desc(channel_txo.c.height)).limit(1).scalar_subquery()
-                )]).label('channel_public_key'),
-                channel_claim.c.short_url.label('channel_url')
-            ).select_from(
-                TXO.join(TX).join(
-                    channel_claim, channel_claim.c.claim_hash == TXO.c.channel_hash, isouter=True
-                )
+                )]).label('channel_public_key')
             )
         )
-    return select_claims.select_from(TXO.join(TX))
+    return select_claims
 
 
 def row_to_claim_for_saving(row) -> Tuple[Output, dict]:
@@ -114,18 +108,17 @@ def row_to_claim_for_saving(row) -> Tuple[Output, dict]:
         extra.update({
             'signature': row.signature,
             'signature_digest': row.signature_digest,
-            'channel_public_key': row.channel_public_key,
-            'channel_url': row.channel_url
+            'channel_public_key': row.channel_public_key
         })
     return txo, extra
 
 
 @event_emitter("blockchain.sync.claims.insert", "claims")
 def claims_insert(
-    txo_types: Union[int, Tuple[int, ...]],
-    blocks: Tuple[int, int],
-    missing_in_claims_table: bool,
-    p: ProgressContext
+        txo_types: Union[int, Tuple[int, ...]],
+        blocks: Tuple[int, int],
+        missing_in_claims_table: bool,
+        p: ProgressContext
 ):
     chain = get_or_initialize_lbrycrd(p.ctx)
 
@@ -136,29 +129,26 @@ def claims_insert(
             missing_in_claims_table=missing_in_claims_table,
         ), progress_id=blocks[0], label=make_label("add claims at", blocks)
     )
+    channel_url_cache = {}
     with p.ctx.engine.connect().execution_options(stream_results=True) as c:
         loader = p.ctx.get_bulk_loader()
         cursor = c.execute(select_claims_for_saving(
             txo_types, blocks, missing_in_claims_table=missing_in_claims_table
         ).order_by(TXO.c.claim_hash))
         for rows in cursor.partitions(900):
-            claim_metadata = iter(chain.db.sync_get_claim_metadata(
+            claim_metadata = chain.db.sync_get_claim_metadata(
                 claim_hashes=[row['claim_hash'] for row in rows]
-            ))
+            )
+            i, txos_w_extra, unknown_channel_urls, txos_wo_channel_url = 0, [], set(), []
             for row in rows:
-                metadata = next(claim_metadata, None)
-                if metadata is None or metadata['claim_hash'] != row.claim_hash:
-                    log.error(
-                        r"During sync'ing a claim in our db couldn't find a "
-                        r"match in lbrycrd's db. This could be because lbrycrd "
-                        r"moved a block forward and updated its own claim table "
-                        r"while we were still on a previous block, or it could be "
-                        r"a more fundamental issue... ¯\_(ツ)_/¯"
-                    )
-                if metadata is None:
-                    break
-                if metadata['claim_hash'] != row.claim_hash:
-                    continue
+                metadata = claim_metadata[i] if i < len(claim_metadata) else None
+                if metadata is None:
+                    break
+                elif metadata['claim_hash'] != row.claim_hash:
+                    continue
+                else:
+                    i += 1
                 txo, extra = row_to_claim_for_saving(row)
                 extra.update({
                     'short_url': metadata['short_url'],
@@ -167,25 +157,72 @@ def claims_insert(
                     'expiration_height': metadata['expiration_height'],
                     'takeover_height': metadata['takeover_height'],
                 })
+                txos_w_extra.append((txo, extra))
+                set_or_add_to_url_lookup(
+                    channel_url_cache, txo, extra, unknown_channel_urls, txos_wo_channel_url
+                )
+            perform_url_lookup(chain, channel_url_cache, unknown_channel_urls, txos_wo_channel_url)
+            for txo, extra in txos_w_extra:
                 loader.add_claim(txo, **extra)
             if len(loader.claims) >= 25_000:
                 p.add(loader.flush(Claim))
     p.add(loader.flush(Claim))
 
 
+def set_or_add_to_url_lookup(cache: dict, txo: Output, extra: dict, to_lookup: set, to_set: list):
+    claim = txo.can_decode_claim
+    if claim and claim.is_signed:
+        if claim.signing_channel_hash not in cache:
+            to_lookup.add(claim.signing_channel_hash)
+            to_set.append((claim.signing_channel_hash, extra))
+        else:
+            extra['channel_url'] = cache[claim.signing_channel_hash]
+
+
+def perform_url_lookup(chain, cache, to_lookup: set, to_set: list):
+    if to_lookup:
+        channels = chain.db.sync_get_claim_metadata(claim_hashes=list(to_lookup))
+        for channel in channels:
+            cache[channel['claim_hash']] = channel['short_url']
+    for channel_hash, extra in to_set:
+        extra['channel_url'] = cache.get(channel_hash)
+
+
+@event_emitter("blockchain.sync.claims.indexes", "steps")
+def claims_constraints_and_indexes(p: ProgressContext):
+    p.start(2)
+    if p.ctx.is_postgres:
+        with p.ctx.engine.connect() as c:
+            c.execute(text("COMMIT;"))
+            c.execute(text("VACUUM ANALYZE claim;"))
+    p.step()
+    if p.ctx.is_postgres:
+        pg_add_claim_constraints_and_indexes(p.ctx.execute)
+    p.step()
+
+
 @event_emitter("blockchain.sync.claims.update", "claims")
 def claims_update(txo_types: Union[int, Tuple[int, ...]], blocks: Tuple[int, int], p: ProgressContext):
     p.start(
         count_unspent_txos(txo_types, blocks, missing_or_stale_in_claims_table=True),
         progress_id=blocks[0], label=make_label("update claims at", blocks)
     )
+    chain = get_or_initialize_lbrycrd(p.ctx)
     with p.ctx.engine.connect().execution_options(stream_results=True) as c:
         loader = p.ctx.get_bulk_loader()
         cursor = c.execute(select_claims_for_saving(
             txo_types, blocks, missing_or_stale_in_claims_table=True
         ))
+        channel_url_cache = {}
         for row in cursor:
             txo, extra = row_to_claim_for_saving(row)
+            claim = txo.can_decode_claim
+            if claim and claim.is_signed:
+                if claim.signing_channel_hash not in channel_url_cache:
+                    channels = chain.db.sync_get_claim_metadata(claim_hashes=[claim.signing_channel_hash])
+                    if channels:
+                        channel_url_cache[channels[0]['claim_hash']] = channels[0]['short_url']
+                extra['channel_url'] = channel_url_cache.get(claim.signing_channel_hash)
             loader.update_claim(txo, **extra)
             if len(loader.update_claims) >= 500:
                 p.add(loader.flush(Claim))
diff --git a/lbry/blockchain/sync/supports.py b/lbry/blockchain/sync/supports.py
index 737cca53b..1c580b710 100644
--- a/lbry/blockchain/sync/supports.py
+++ b/lbry/blockchain/sync/supports.py
@@ -1,10 +1,10 @@
 import logging
 from typing import Tuple
 
-from sqlalchemy import case, desc
+from sqlalchemy import case, desc, text
 from sqlalchemy.future import select
 
-from lbry.db.tables import TX, TXO, Support
+from lbry.db.tables import TX, TXO, Support, pg_add_support_constraints_and_indexes
 from lbry.db.query_context import ProgressContext, event_emitter
 from lbry.db.queries import row_to_txo
 from lbry.db.constants import TXO_TYPES
@@ -63,6 +63,19 @@ def supports_insert(blocks: Tuple[int, int], missing_in_supports_table: bool, p:
     p.add(loader.flush(Support))
 
 
+@event_emitter("blockchain.sync.supports.indexes", "steps")
+def supports_constraints_and_indexes(p: ProgressContext):
+    p.start(2)
+    if p.ctx.is_postgres:
+        with p.ctx.engine.connect() as c:
+            c.execute(text("COMMIT;"))
+            c.execute(text("VACUUM ANALYZE support;"))
+    p.step()
+    if p.ctx.is_postgres:
+        pg_add_support_constraints_and_indexes(p.ctx.execute)
+    p.step()
+
+
 @event_emitter("blockchain.sync.supports.delete", "supports")
 def supports_delete(supports, p: ProgressContext):
     p.start(supports, label="delete supports")
diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py
index ca4125257..5b0c5c45b 100644
--- a/lbry/blockchain/sync/synchronizer.py
+++ b/lbry/blockchain/sync/synchronizer.py
@@ -1,7 +1,6 @@
 import os
 import asyncio
 import logging
-from functools import partial
 from typing import Optional, Tuple, Set, List, Coroutine
 
 from lbry.db import Database
@@ -17,16 +16,16 @@ from . import blocks as block_phase, claims as claim_phase, supports as support_
 
 log = logging.getLogger(__name__)
 
-BLOCK_INIT_EVENT = Event.add("blockchain.sync.block.init", "steps")
-BLOCK_MAIN_EVENT = Event.add("blockchain.sync.block.main", "blocks", "txs")
+BLOCKS_INIT_EVENT = Event.add("blockchain.sync.blocks.init", "steps")
+BLOCKS_MAIN_EVENT = Event.add("blockchain.sync.blocks.main", "blocks", "txs")
 FILTER_INIT_EVENT = Event.add("blockchain.sync.filter.init", "steps")
 FILTER_MAIN_EVENT = Event.add("blockchain.sync.filter.main", "blocks")
-CLAIM_INIT_EVENT = Event.add("blockchain.sync.claims.init", "steps")
-CLAIM_MAIN_EVENT = Event.add("blockchain.sync.claims.main", "claims")
-SUPPORT_INIT_EVENT = Event.add("blockchain.sync.supports.init", "steps")
-SUPPORT_MAIN_EVENT = Event.add("blockchain.sync.supports.main", "supports")
-TREND_INIT_EVENT = Event.add("blockchain.sync.trends.init", "steps")
-TREND_MAIN_EVENT = Event.add("blockchain.sync.trends.main", "blocks")
+CLAIMS_INIT_EVENT = Event.add("blockchain.sync.claims.init", "steps")
+CLAIMS_MAIN_EVENT = Event.add("blockchain.sync.claims.main", "claims")
+TRENDS_INIT_EVENT = Event.add("blockchain.sync.trends.init", "steps")
+TRENDS_MAIN_EVENT = Event.add("blockchain.sync.trends.main", "blocks")
+SUPPORTS_INIT_EVENT = Event.add("blockchain.sync.supports.init", "steps")
+SUPPORTS_MAIN_EVENT = Event.add("blockchain.sync.supports.main", "supports")
 
 
 class BlockchainSync(Sync):
@@ -87,7 +86,7 @@ class BlockchainSync(Sync):
         tasks = []
         starting_height = None
         tx_count = block_count = 0
-        with Progress(self.db.message_queue, BLOCK_INIT_EVENT) as p:
+        with Progress(self.db.message_queue, BLOCKS_INIT_EVENT) as p:
             ending_height = await self.chain.db.get_best_height()
             for chain_file in p.iter(await self.chain.db.get_block_files()):
                 # block files may be read and saved out of order, need to check
@@ -113,7 +112,7 @@ class BlockchainSync(Sync):
                     block_phase.sync_block_file, chain_file['file_number'], our_best_file_height+1,
                     chain_file['txs'], self.TX_FLUSH_SIZE
                 ))
-        with Progress(self.db.message_queue, BLOCK_MAIN_EVENT) as p:
+        with Progress(self.db.message_queue, BLOCKS_MAIN_EVENT) as p:
             p.start(block_count, tx_count, extra={
                 "starting_height": starting_height,
                 "ending_height": ending_height,
@@ -137,9 +136,9 @@ class BlockchainSync(Sync):
                 p.start(blocks)
                 await self.run_tasks(tasks)
 
-    async def sync_txios(self, blocks_added):
+    async def sync_spends(self, blocks_added):
         if blocks_added:
-            await self.db.run(block_phase.sync_txoi, blocks_added[0] == 0)
+            await self.db.run(block_phase.sync_spends, blocks_added[0] == 0)
 
     async def count_unspent_txos(
         self,
@@ -191,7 +190,7 @@ class BlockchainSync(Sync):
     async def sync_claims(self, blocks):
         total = delete_claims = takeovers = claims_with_changed_supports = 0
         initial_sync = not await self.db.has_claims()
-        with Progress(self.db.message_queue, CLAIM_INIT_EVENT) as p:
+        with Progress(self.db.message_queue, CLAIMS_INIT_EVENT) as p:
             if initial_sync:
                 p.start(2)
                 # 1. distribute channel insertion load
@@ -237,7 +236,7 @@ class BlockchainSync(Sync):
                 p.step()
             else:
                 return
-        with Progress(self.db.message_queue, CLAIM_MAIN_EVENT) as p:
+        with Progress(self.db.message_queue, CLAIMS_MAIN_EVENT) as p:
             p.start(total)
             insertions = [
                 (TXO_TYPES['channel'], channel_batches),
@@ -261,13 +260,15 @@ class BlockchainSync(Sync):
             await self.db.run(claim_phase.update_takeovers, blocks, takeovers)
         if claims_with_changed_supports:
             await self.db.run(claim_phase.update_stakes, blocks, claims_with_changed_supports)
+        if initial_sync:
+            await self.db.run(claim_phase.claims_constraints_and_indexes)
         if channels_with_changed_content:
             return initial_sync, channels_with_changed_content
 
     async def sync_supports(self, blocks):
         delete_supports = 0
         initial_sync = not await self.db.has_supports()
-        with Progress(self.db.message_queue, SUPPORT_INIT_EVENT) as p:
+        with Progress(self.db.message_queue, SUPPORTS_INIT_EVENT) as p:
             if initial_sync:
                 total, support_batches = await self.distribute_unspent_txos(TXO_TYPES['support'])
             elif blocks:
@@ -284,7 +285,7 @@ class BlockchainSync(Sync):
                 p.step()
             else:
                 return
-        with Progress(self.db.message_queue, SUPPORT_MAIN_EVENT) as p:
+        with Progress(self.db.message_queue, SUPPORTS_MAIN_EVENT) as p:
             p.start(total)
             if support_batches:
                 await self.run_tasks([
@@ -294,6 +295,8 @@ class BlockchainSync(Sync):
             ])
         if delete_supports:
             await self.db.run(support_phase.supports_delete, delete_supports)
+        if initial_sync:
+            await self.db.run(support_phase.supports_constraints_and_indexes)
 
     async def sync_channel_stats(self, blocks, initial_sync, channels_with_changed_content):
         if channels_with_changed_content:
@@ -308,7 +311,7 @@ class BlockchainSync(Sync):
         blocks_added = await self.sync_blocks()
         sync_filters_task = asyncio.create_task(self.sync_filters())
         sync_trends_task = asyncio.create_task(self.sync_trends())
-        await self.sync_txios(blocks_added)
+        await self.sync_spends(blocks_added)
         channel_stats = await self.sync_claims(blocks_added)
         await self.sync_supports(blocks_added)
         if channel_stats:
diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py
index 7cb23ba2d..747535544 100644
--- a/lbry/db/queries/txio.py
+++ b/lbry/db/queries/txio.py
@@ -129,7 +129,7 @@ def distribute_unspent_txos(
             func.count('*').label('items'),
             func.min(chunks.c.height).label('start_height'),
             func.max(chunks.c.height).label('end_height'),
-        ).group_by(chunks.c.chunk)
+        ).group_by(chunks.c.chunk).order_by(chunks.c.chunk)
     )
     total = 0
     buckets = []
diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py
index dfb7a2f7f..b658fa019 100644
--- a/lbry/db/query_context.py
+++ b/lbry/db/query_context.py
@@ -545,8 +545,11 @@ class BulkLoader:
             d['claim_type'] = TXO_TYPES['channel']
         if claim.is_signed:
             d['channel_hash'] = claim.signing_channel_hash
-            d['is_signature_valid'] = Output.is_signature_valid(
-                signature, signature_digest, channel_public_key
+            d['is_signature_valid'] = (
+                all((signature, signature_digest, channel_public_key)) &
+                Output.is_signature_valid(
+                    signature, signature_digest, channel_public_key
+                )
             )
 
         tags = []
@@ -580,8 +583,11 @@ class BulkLoader:
             d['emoji'] = support.emoji
         if support.is_signed:
             d['channel_hash'] = support.signing_channel_hash
-            d['is_signature_valid'] = Output.is_signature_valid(
-                signature, signature_digest, channel_public_key
+            d['is_signature_valid'] = (
+                all((signature, signature_digest, channel_public_key)) &
+                Output.is_signature_valid(
+                    signature, signature_digest, channel_public_key
+                )
             )
 
         return d
diff --git a/lbry/db/tables.py b/lbry/db/tables.py
index 21545681b..9944ba760 100644
--- a/lbry/db/tables.py
+++ b/lbry/db/tables.py
@@ -185,6 +185,14 @@ Claim = Table(
 )
 
 
+def pg_add_claim_constraints_and_indexes(execute):
+    execute(text("ALTER TABLE claim ADD PRIMARY KEY (claim_hash);"))
+    execute(text("""
+        CREATE INDEX signed_content ON claim (channel_hash)
+        INCLUDE (amount) WHERE is_signature_valid;
+    """))
+
+
 Tag = Table(
     'tag', metadata,
     Column('claim_hash', LargeBinary),
@@ -211,3 +219,11 @@ Support = Table(
     Column('signature_digest', LargeBinary, nullable=True),
     Column('is_signature_valid', Boolean, nullable=True),
 )
+
+
+def pg_add_support_constraints_and_indexes(execute):
+    execute(text("ALTER TABLE support ADD PRIMARY KEY (txo_hash);"))
+    execute(text("""
+        CREATE INDEX signed_support ON support (channel_hash)
+        INCLUDE (amount) WHERE is_signature_valid;
+    """))
diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py
index 2d4e5ce8d..5f4efee48 100644
--- a/tests/integration/blockchain/test_blockchain.py
+++ b/tests/integration/blockchain/test_blockchain.py
@@ -257,7 +257,8 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase):
         active = []
         for txo in await self.db.search_claims(
                 activation_height__lte=self.current_height,
-                expiration_height__gt=self.current_height):
+                expiration_height__gt=self.current_height,
+                order_by=['^height']):
             if controlling and controlling[0] == txo.claim.stream.title:
                 continue
             active.append((
@@ -503,13 +504,13 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
             await self.sync.advance()
             await asyncio.sleep(1)  # give it time to collect events
             self.assertConsumingEvents(
-                events, "blockchain.sync.block.init", ("steps",), [
+                events, "blockchain.sync.blocks.init", ("steps",), [
                     (0, None, (3,), (1,), (2,), (3,))
                 ]
             )
             self.assertEqual(
                 events.pop(0), {
-                    "event": "blockchain.sync.block.main",
+                    "event": "blockchain.sync.blocks.main",
                     "data": {
                         "id": 0, "done": (0, 0), "total": (353, 544), "units": ("blocks", "txs"),
                         "starting_height": 0, "ending_height": 352,
@@ -518,7 +519,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
                 }
             )
             self.assertConsumingEvents(
-                events, "blockchain.sync.block.file", ("blocks", "txs"), [
+                events, "blockchain.sync.blocks.file", ("blocks", "txs"), [
                     (0, "blk00000.dat", (191, 280), (100, 0), (191, 280)),
                     (1, "blk00001.dat", (89, 178), (89, 178)),
                     (2, "blk00002.dat", (73, 86), (73, 86)),
@@ -526,12 +527,12 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
             )
             self.assertEqual(
                 events.pop(0), {
-                    "event": "blockchain.sync.block.main",
+                    "event": "blockchain.sync.blocks.main",
                     "data": {"id": 0, "done": (-1, -1)}
                 }
             )
             self.assertConsumingEvents(
-                events, "blockchain.sync.txoi.main", ("steps",), [
+                events, "blockchain.sync.spends.main", ("steps",), [
                     (0, None, (9,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,))
                 ]
             )
@@ -560,6 +561,11 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
                     (273, "add claims at 273-291", (361,), (361,)),
                 ]
             )
+            self.assertConsumingEvents(
+                events, "blockchain.sync.claims.indexes", ("steps",), [
+                    (0, None, (2,), (1,), (2,))
+                ]
+            )
             self.assertEqual(
                 events.pop(0), {
                     "event": "blockchain.sync.claims.main",
@@ -577,6 +583,11 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
                     (352, "add supports at 352", (2,), (2,)),
                 ]
             )
+            self.assertConsumingEvents(
+                events, "blockchain.sync.supports.indexes", ("steps",), [
+                    (0, None, (2,), (1,), (2,))
+                ]
+            )
             self.assertEqual(
                 events.pop(0), {
                     "event": "blockchain.sync.supports.main",
@@ -589,7 +600,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
             await self.sync.advance()  # should be no-op
             await asyncio.sleep(1)  # give it time to collect events
             self.assertConsumingEvents(
-                events, "blockchain.sync.block.init", ("steps",), [
+                events, "blockchain.sync.blocks.init", ("steps",), [
                     (0, None, (3,), (1,), (2,), (3,))
                 ]
             )
@@ -605,13 +616,13 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
             await self.sync.advance()
             await asyncio.sleep(1)  # give it time to collect events
             self.assertConsumingEvents(
-                events, "blockchain.sync.block.init", ("steps",), [
+                events, "blockchain.sync.blocks.init", ("steps",), [
                     (0, None, (3,), (1,), (2,), (3,))
                 ]
             )
             self.assertEqual(
                 events.pop(0), {
-                    "event": "blockchain.sync.block.main",
+                    "event": "blockchain.sync.blocks.main",
                    "data": {
                         "id": 0, "done": (0, 0), "total": (2, 4), "units": ("blocks", "txs"),
                         "starting_height": 353, "ending_height": 354,
@@ -620,18 +631,18 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
                 }
             )
             self.assertConsumingEvents(
-                events, "blockchain.sync.block.file", ("blocks", "txs"), [
+                events, "blockchain.sync.blocks.file", ("blocks", "txs"), [
                     (2, "blk00002.dat", (2, 4), (2, 4)),
                 ]
             )
             self.assertEqual(
                 events.pop(0), {
-                    "event": "blockchain.sync.block.main",
+                    "event": "blockchain.sync.blocks.main",
                     "data": {"id": 0, "done": (-1, -1)}
                 }
            )
             self.assertConsumingEvents(
-                events, "blockchain.sync.txoi.main", ("steps",), [
+                events, "blockchain.sync.spends.main", ("steps",), [
                     (0, None, (2,), (1,), (2,))
                 ]
             )
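
Note on the deferred-constraints pattern introduced by this patch: during an initial sync the `claim` and `support` tables are bulk-loaded with no primary key or secondary indexes, and `claims_constraints_and_indexes` / `supports_constraints_and_indexes` add them in a single pass once loading finishes, running `VACUUM ANALYZE` first so the planner has fresh statistics. A minimal standalone sketch of that sequence for the `claim` table is below; it assumes a PostgreSQL 11+ server (required for `INCLUDE` indexes), SQLAlchemy 1.x connection semantics as used in this codebase, and a placeholder DSN — it is an illustration of the pattern, not part of the patch:

```python
from sqlalchemy import create_engine, text


def add_claim_constraints_and_indexes(dsn: str = "postgresql:///lbry"):  # placeholder DSN
    engine = create_engine(dsn)
    with engine.connect() as c:
        # VACUUM cannot run inside a transaction block, so first end the
        # transaction the DBAPI opened implicitly -- the same COMMIT;
        # trick used by claims_constraints_and_indexes in the patch.
        c.execute(text("COMMIT;"))
        c.execute(text("VACUUM ANALYZE claim;"))
    with engine.connect() as c:
        # Adding the primary key once, after the bulk load, avoids paying
        # index-maintenance cost on every one of millions of inserts.
        c.execute(text("ALTER TABLE claim ADD PRIMARY KEY (claim_hash);"))
        # Partial covering index: only rows with a valid signature are
        # indexed, and `amount` rides along in the index so signed-content
        # queries can be answered without touching the table heap.
        c.execute(text("""
            CREATE INDEX signed_content ON claim (channel_hash)
            INCLUDE (amount) WHERE is_signature_valid;
        """))
```

Guarding the call with `if initial_sync:` (as `sync_claims` and `sync_supports` do above) keeps incremental syncs cheap: they touch few rows and benefit from the constraints already being in place, while the expensive index build is paid exactly once.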