From 8e683c9cd03efe804cc1a3befbc2646a9b4194d4 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Mon, 13 Jul 2020 21:00:24 -0400 Subject: [PATCH] indexing fixes --- lbry/blockchain/sync/blocks.py | 53 +++++----- lbry/blockchain/sync/claims.py | 11 ++- lbry/blockchain/sync/supports.py | 9 +- lbry/db/tables.py | 97 +++++++++---------- .../integration/blockchain/test_blockchain.py | 9 +- 5 files changed, 90 insertions(+), 89 deletions(-) diff --git a/lbry/blockchain/sync/blocks.py b/lbry/blockchain/sync/blocks.py index 5760d6bd2..41c986cb8 100644 --- a/lbry/blockchain/sync/blocks.py +++ b/lbry/blockchain/sync/blocks.py @@ -6,6 +6,7 @@ from sqlalchemy.schema import CreateTable from lbry.db.tables import Block as BlockTable, TX, TXO, TXI from lbry.db.tables import ( + pg_add_tx_constraints_and_indexes, pg_add_txo_constraints_and_indexes, pg_add_txi_constraints_and_indexes, ) @@ -62,19 +63,24 @@ def sync_block_file( @event_emitter("blockchain.sync.spends.main", "steps") def sync_spends(initial_sync: bool, p: ProgressContext): if initial_sync: - p.start(9) + p.start( + 6 + + len(pg_add_tx_constraints_and_indexes) + + len(pg_add_txi_constraints_and_indexes) + + len(pg_add_txo_constraints_and_indexes) + ) + for constraint in pg_add_tx_constraints_and_indexes: + if p.ctx.is_postgres: + p.ctx.execute(text(constraint)) + p.step() # A. Update TXIs to have the address of TXO they are spending. - # 1. add tx constraints - if p.ctx.is_postgres: - p.ctx.execute(text("ALTER TABLE tx ADD PRIMARY KEY (tx_hash);")) - p.step() - # 2. txi table reshuffling + # 1. txi table reshuffling p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;")) p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[])) if p.ctx.is_postgres: p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;")) p.step() - # 3. insert + # 2. 
insert old_txi = table("old_txi", *(c.copy() for c in TXI.columns)) # pylint: disable=not-an-iterable columns = [c for c in old_txi.columns if c.name != "address"] + [TXO.c.address] join_txi_on_txo = old_txi.join(TXO, old_txi.c.txo_hash == TXO.c.txo_hash) @@ -82,48 +88,45 @@ def sync_spends(initial_sync: bool, p: ProgressContext): insert_txis = TXI.insert().from_select(columns, select_txis) p.ctx.execute(insert_txis) p.step() - # 4. drop old txi and vacuum + # 3. drop old txi and vacuum p.ctx.execute(text("DROP TABLE old_txi;")) if p.ctx.is_postgres: with p.ctx.engine.connect() as c: c.execute(text("COMMIT;")) c.execute(text("VACUUM ANALYZE txi;")) p.step() - # 5. restore integrity constraint - if p.ctx.is_postgres: - pg_add_txi_constraints_and_indexes(p.ctx.execute) - p.step() - # 6. txo table reshuffling + for constraint in pg_add_txi_constraints_and_indexes: + if p.ctx.is_postgres: + p.ctx.execute(text(constraint)) + p.step() + # B. Update TXOs to have the height at which they were spent (if they were). + # 4. txo table reshuffling p.ctx.execute(text("ALTER TABLE txo RENAME TO old_txo;")) p.ctx.execute(CreateTable(TXO, include_foreign_key_constraints=[])) if p.ctx.is_postgres: p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;")) p.step() - # 7. insert + # 5. 
insert old_txo = table("old_txo", *(c.copy() for c in TXO.columns)) # pylint: disable=not-an-iterable columns = [c for c in old_txo.columns if c.name != "spent_height"] insert_columns = columns + [TXO.c.spent_height] - select_columns = columns + [ - func.coalesce(TXI.c.height, 0).label("spent_height") - ] - join_txo_on_txi = old_txo.join( - TXI, old_txo.c.txo_hash == TXI.c.txo_hash, isouter=True - ) + select_columns = columns + [func.coalesce(TXI.c.height, 0).label("spent_height")] + join_txo_on_txi = old_txo.join(TXI, old_txo.c.txo_hash == TXI.c.txo_hash, isouter=True) select_txos = select(*select_columns).select_from(join_txo_on_txi) insert_txos = TXO.insert().from_select(insert_columns, select_txos) p.ctx.execute(insert_txos) p.step() - # 8. drop old txo + # 6. drop old txo p.ctx.execute(text("DROP TABLE old_txo;")) if p.ctx.is_postgres: with p.ctx.engine.connect() as c: c.execute(text("COMMIT;")) c.execute(text("VACUUM ANALYZE txo;")) p.step() - # 9. restore integrity constraint - if p.ctx.is_postgres: - pg_add_txo_constraints_and_indexes(p.ctx.execute) - p.step() + for constraint in pg_add_txo_constraints_and_indexes: + if p.ctx.is_postgres: + p.ctx.execute(text(constraint)) + p.step() else: p.start(2) # 1. 
Update spent TXOs setting spent_height diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index 9cc3a2ab1..2086020f1 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -11,7 +11,7 @@ from lbry.db.queries.txio import ( where_abandoned_claims, count_channels_with_changed_content ) from lbry.db.query_context import ProgressContext, event_emitter -from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_constraints_and_indexes +from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes from lbry.db.utils import least from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES from lbry.blockchain.transaction import Output @@ -149,15 +149,16 @@ def claims_insert( @event_emitter("blockchain.sync.claims.indexes", "steps") def claims_constraints_and_indexes(p: ProgressContext): - p.start(2) + p.start(1 + len(pg_add_claim_and_tag_constraints_and_indexes)) if p.ctx.is_postgres: with p.ctx.engine.connect() as c: c.execute(text("COMMIT;")) c.execute(text("VACUUM ANALYZE claim;")) p.step() - if p.ctx.is_postgres: - pg_add_claim_constraints_and_indexes(p.ctx.execute) - p.step() + for constraint in pg_add_claim_and_tag_constraints_and_indexes: + if p.ctx.is_postgres: + p.ctx.execute(text(constraint)) + p.step() @event_emitter("blockchain.sync.claims.update", "claims") diff --git a/lbry/blockchain/sync/supports.py b/lbry/blockchain/sync/supports.py index f2a951214..6db46a262 100644 --- a/lbry/blockchain/sync/supports.py +++ b/lbry/blockchain/sync/supports.py @@ -65,15 +65,16 @@ def supports_insert(blocks: Tuple[int, int], missing_in_supports_table: bool, p: @event_emitter("blockchain.sync.supports.indexes", "steps") def supports_constraints_and_indexes(p: ProgressContext): - p.start(2) + p.start(1 + len(pg_add_support_constraints_and_indexes)) if p.ctx.is_postgres: with p.ctx.engine.connect() as c: c.execute(text("COMMIT;")) c.execute(text("VACUUM ANALYZE support;")) p.step() - if 
p.ctx.is_postgres: - pg_add_support_constraints_and_indexes(p.ctx.execute) - p.step() + for constraint in pg_add_support_constraints_and_indexes: + if p.ctx.is_postgres: + p.ctx.execute(text(constraint)) + p.step() @event_emitter("blockchain.sync.supports.delete", "supports") diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 2421a3fa7..ee931f211 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -1,9 +1,8 @@ # pylint: skip-file from sqlalchemy import ( - MetaData, Table, Column, ForeignKey, PrimaryKeyConstraint, + MetaData, Table, Column, ForeignKey, LargeBinary, Text, SmallInteger, Integer, BigInteger, Boolean, - text ) from .constants import TXO_TYPES, CLAIM_TYPE_CODES @@ -65,6 +64,11 @@ TX = Table( ) +pg_add_tx_constraints_and_indexes = [ + "ALTER TABLE tx ADD PRIMARY KEY (tx_hash);", +] + + TXO = Table( 'txo', metadata, Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)), @@ -95,36 +99,25 @@ TXO = Table( txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address) -def pg_add_txo_constraints_and_indexes(execute): - execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);")) +pg_add_txo_constraints_and_indexes = [ + "ALTER TABLE txo ADD PRIMARY KEY (txo_hash);", # find appropriate channel public key for signing a content claim - execute(text(f""" - CREATE INDEX txo_channel_hash_w_height_desc_and_pub_key - ON txo (claim_hash, height desc) INCLUDE (public_key) - WHERE txo_type={TXO_TYPES['channel']}; - """)) - # update supports for a claim - execute(text(f""" - CREATE INDEX txo_unspent_supports - ON txo (claim_hash) INCLUDE (amount) - WHERE spent_height = 0 AND txo_type={TXO_TYPES['support']}; - """)) - # claim changes by height - execute(text(f""" - CREATE INDEX txo_claim_changes - ON txo (height DESC) INCLUDE (txo_hash) - WHERE spent_height = 0 AND txo_type IN {tuple(CLAIM_TYPE_CODES)}; - """)) - # supports added - execute(text(f""" - CREATE INDEX txo_added_supports_by_height ON txo (height DESC) - 
INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']}; - """)) - # supports spent - execute(text(f""" - CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) - INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']}; - """)) + f"CREATE INDEX txo_channel_hash_by_height_desc_w_pub_key " + f"ON txo (claim_hash, height desc) INCLUDE (public_key) " + f"WHERE txo_type={TXO_TYPES['channel']};", + # for calculating supports on a claim + f"CREATE INDEX txo_unspent_supports ON txo (claim_hash) INCLUDE (amount) " + f"WHERE spent_height = 0 AND txo_type={TXO_TYPES['support']};", + # for finding modified claims in a block range + f"CREATE INDEX txo_claim_changes " + f"ON txo (height DESC) INCLUDE (txo_hash) " + f"WHERE spent_height = 0 AND txo_type IN {tuple(CLAIM_TYPE_CODES)};", + # for finding claims which need support totals re-calculated in a block range + f"CREATE INDEX txo_added_supports_by_height ON txo (height DESC) " + f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", + f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) " + f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", +] TXI = Table( @@ -139,8 +132,9 @@ TXI = Table( txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address) -def pg_add_txi_constraints_and_indexes(execute): - execute(text("ALTER TABLE txi ADD PRIMARY KEY (txo_hash);")) +pg_add_txi_constraints_and_indexes = [ + "ALTER TABLE txi ADD PRIMARY KEY (txo_hash);", +] Claim = Table( @@ -210,18 +204,19 @@ Tag = Table( ) -def pg_add_claim_constraints_and_indexes(execute): - execute(text("ALTER TABLE claim ADD PRIMARY KEY (claim_hash);")) - execute(text("ALTER TABLE tag ADD PRIMARY KEY (claim_hash, tag);")) - # take over updates are base on normalized name - execute(text("CREATE INDEX claim_normalized ON claim (normalized);")) - # finding claims that aren't updated with latest TXO - execute(text("CREATE UNIQUE INDEX claim_txo_hash ON claim 
(txo_hash);")) - # used to calculate content in a channel - execute(text(""" - CREATE INDEX signed_content ON claim (channel_hash) - INCLUDE (amount) WHERE is_signature_valid; - """)) +pg_add_claim_and_tag_constraints_and_indexes = [ + "ALTER TABLE claim ADD PRIMARY KEY (claim_hash);", + # for checking if claim is up-to-date + "CREATE UNIQUE INDEX claim_txo_hash ON claim (txo_hash);", + # used by takeover process to reset winning claims + "CREATE INDEX claim_normalized ON claim (normalized);", + # used to count()/sum() claims signed by channel + "CREATE INDEX signed_content ON claim (channel_hash) " + "INCLUDE (amount) WHERE is_signature_valid;", + # basic tag indexes + "ALTER TABLE tag ADD PRIMARY KEY (claim_hash, tag);", + "CREATE INDEX tags ON tag (tag) INCLUDE (claim_hash);", +] Support = Table( @@ -245,9 +240,9 @@ Support = Table( ) -def pg_add_support_constraints_and_indexes(execute): - execute(text("ALTER TABLE support ADD PRIMARY KEY (txo_hash);")) - execute(text(""" - CREATE INDEX signed_support ON support (channel_hash) - INCLUDE (amount) WHERE is_signature_valid; - """)) +pg_add_support_constraints_and_indexes = [ + "ALTER TABLE support ADD PRIMARY KEY (txo_hash);", + # used to count()/sum() supports signed by channel + "CREATE INDEX signed_support ON support (channel_hash) " + "INCLUDE (amount) WHERE is_signature_valid;", +] diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index f8cb658bd..52abfcde9 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -500,7 +500,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): events = [] self.sync.on_progress.listen(events.append) - # initial_sync = True + # initial sync await self.sync.advance() await asyncio.sleep(1) # give it time to collect events self.assertConsumingEvents( @@ -533,7 +533,8 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): ) 
self.assertConsumingEvents( events, "blockchain.sync.spends.main", ("steps",), [ - (0, None, (9,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)) + (0, None, (14,), (1,), (2,), (3,), (4,), (5,), (6,), + (7,), (8,), (9,), (10,), (11,), (12,), (13,), (14,)) ] ) self.assertEqual( @@ -558,7 +559,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): ) self.assertConsumingEvents( events, "blockchain.sync.claims.indexes", ("steps",), [ - (0, None, (2,), (1,), (2,)) + (0, None, (7,), (1,), (2,), (3,), (4,), (5,), (6,), (7,)) ] ) self.assertEqual( @@ -580,7 +581,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): ) self.assertConsumingEvents( events, "blockchain.sync.supports.indexes", ("steps",), [ - (0, None, (2,), (1,), (2,)) + (0, None, (3,), (1,), (2,), (3,)) ] ) self.assertEqual(