indexing fixes

This commit is contained in:
Lex Berezhny 2020-07-13 21:00:24 -04:00
parent 69c45d43d3
commit 8e683c9cd0
5 changed files with 90 additions and 89 deletions

View file

@ -6,6 +6,7 @@ from sqlalchemy.schema import CreateTable
from lbry.db.tables import Block as BlockTable, TX, TXO, TXI from lbry.db.tables import Block as BlockTable, TX, TXO, TXI
from lbry.db.tables import ( from lbry.db.tables import (
pg_add_tx_constraints_and_indexes,
pg_add_txo_constraints_and_indexes, pg_add_txo_constraints_and_indexes,
pg_add_txi_constraints_and_indexes, pg_add_txi_constraints_and_indexes,
) )
@ -62,19 +63,24 @@ def sync_block_file(
@event_emitter("blockchain.sync.spends.main", "steps") @event_emitter("blockchain.sync.spends.main", "steps")
def sync_spends(initial_sync: bool, p: ProgressContext): def sync_spends(initial_sync: bool, p: ProgressContext):
if initial_sync: if initial_sync:
p.start(9) p.start(
6 +
len(pg_add_tx_constraints_and_indexes) +
len(pg_add_txi_constraints_and_indexes) +
len(pg_add_txo_constraints_and_indexes)
)
for constraint in pg_add_tx_constraints_and_indexes:
if p.ctx.is_postgres:
p.ctx.execute(text(constraint))
p.step()
# A. Update TXIs to have the address of TXO they are spending. # A. Update TXIs to have the address of TXO they are spending.
# 1. add tx constraints # 1. txi table reshuffling
if p.ctx.is_postgres:
p.ctx.execute(text("ALTER TABLE tx ADD PRIMARY KEY (tx_hash);"))
p.step()
# 2. txi table reshuffling
p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;")) p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;"))
p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[])) p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[]))
if p.ctx.is_postgres: if p.ctx.is_postgres:
p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;")) p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;"))
p.step() p.step()
# 3. insert # 2. insert
old_txi = table("old_txi", *(c.copy() for c in TXI.columns)) # pylint: disable=not-an-iterable old_txi = table("old_txi", *(c.copy() for c in TXI.columns)) # pylint: disable=not-an-iterable
columns = [c for c in old_txi.columns if c.name != "address"] + [TXO.c.address] columns = [c for c in old_txi.columns if c.name != "address"] + [TXO.c.address]
join_txi_on_txo = old_txi.join(TXO, old_txi.c.txo_hash == TXO.c.txo_hash) join_txi_on_txo = old_txi.join(TXO, old_txi.c.txo_hash == TXO.c.txo_hash)
@ -82,48 +88,45 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
insert_txis = TXI.insert().from_select(columns, select_txis) insert_txis = TXI.insert().from_select(columns, select_txis)
p.ctx.execute(insert_txis) p.ctx.execute(insert_txis)
p.step() p.step()
# 4. drop old txi and vacuum # 3. drop old txi and vacuum
p.ctx.execute(text("DROP TABLE old_txi;")) p.ctx.execute(text("DROP TABLE old_txi;"))
if p.ctx.is_postgres: if p.ctx.is_postgres:
with p.ctx.engine.connect() as c: with p.ctx.engine.connect() as c:
c.execute(text("COMMIT;")) c.execute(text("COMMIT;"))
c.execute(text("VACUUM ANALYZE txi;")) c.execute(text("VACUUM ANALYZE txi;"))
p.step() p.step()
# 5. restore integrity constraint for constraint in pg_add_txi_constraints_and_indexes:
if p.ctx.is_postgres: if p.ctx.is_postgres:
pg_add_txi_constraints_and_indexes(p.ctx.execute) p.ctx.execute(text(constraint))
p.step() p.step()
# 6. txo table reshuffling # B. Update TXOs to have the height at which they were spent (if they were).
# 4. txo table reshuffling
p.ctx.execute(text("ALTER TABLE txo RENAME TO old_txo;")) p.ctx.execute(text("ALTER TABLE txo RENAME TO old_txo;"))
p.ctx.execute(CreateTable(TXO, include_foreign_key_constraints=[])) p.ctx.execute(CreateTable(TXO, include_foreign_key_constraints=[]))
if p.ctx.is_postgres: if p.ctx.is_postgres:
p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;")) p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;"))
p.step() p.step()
# 7. insert # 5. insert
old_txo = table("old_txo", *(c.copy() for c in TXO.columns)) # pylint: disable=not-an-iterable old_txo = table("old_txo", *(c.copy() for c in TXO.columns)) # pylint: disable=not-an-iterable
columns = [c for c in old_txo.columns if c.name != "spent_height"] columns = [c for c in old_txo.columns if c.name != "spent_height"]
insert_columns = columns + [TXO.c.spent_height] insert_columns = columns + [TXO.c.spent_height]
select_columns = columns + [ select_columns = columns + [func.coalesce(TXI.c.height, 0).label("spent_height")]
func.coalesce(TXI.c.height, 0).label("spent_height") join_txo_on_txi = old_txo.join(TXI, old_txo.c.txo_hash == TXI.c.txo_hash, isouter=True)
]
join_txo_on_txi = old_txo.join(
TXI, old_txo.c.txo_hash == TXI.c.txo_hash, isouter=True
)
select_txos = select(*select_columns).select_from(join_txo_on_txi) select_txos = select(*select_columns).select_from(join_txo_on_txi)
insert_txos = TXO.insert().from_select(insert_columns, select_txos) insert_txos = TXO.insert().from_select(insert_columns, select_txos)
p.ctx.execute(insert_txos) p.ctx.execute(insert_txos)
p.step() p.step()
# 8. drop old txo # 6. drop old txo
p.ctx.execute(text("DROP TABLE old_txo;")) p.ctx.execute(text("DROP TABLE old_txo;"))
if p.ctx.is_postgres: if p.ctx.is_postgres:
with p.ctx.engine.connect() as c: with p.ctx.engine.connect() as c:
c.execute(text("COMMIT;")) c.execute(text("COMMIT;"))
c.execute(text("VACUUM ANALYZE txo;")) c.execute(text("VACUUM ANALYZE txo;"))
p.step() p.step()
# 9. restore integrity constraint for constraint in pg_add_txo_constraints_and_indexes:
if p.ctx.is_postgres: if p.ctx.is_postgres:
pg_add_txo_constraints_and_indexes(p.ctx.execute) p.ctx.execute(text(constraint))
p.step() p.step()
else: else:
p.start(2) p.start(2)
# 1. Update spent TXOs setting spent_height # 1. Update spent TXOs setting spent_height

View file

@ -11,7 +11,7 @@ from lbry.db.queries.txio import (
where_abandoned_claims, count_channels_with_changed_content where_abandoned_claims, count_channels_with_changed_content
) )
from lbry.db.query_context import ProgressContext, event_emitter from lbry.db.query_context import ProgressContext, event_emitter
from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_constraints_and_indexes from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes
from lbry.db.utils import least from lbry.db.utils import least
from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES
from lbry.blockchain.transaction import Output from lbry.blockchain.transaction import Output
@ -149,15 +149,16 @@ def claims_insert(
@event_emitter("blockchain.sync.claims.indexes", "steps") @event_emitter("blockchain.sync.claims.indexes", "steps")
def claims_constraints_and_indexes(p: ProgressContext): def claims_constraints_and_indexes(p: ProgressContext):
p.start(2) p.start(1 + len(pg_add_claim_and_tag_constraints_and_indexes))
if p.ctx.is_postgres: if p.ctx.is_postgres:
with p.ctx.engine.connect() as c: with p.ctx.engine.connect() as c:
c.execute(text("COMMIT;")) c.execute(text("COMMIT;"))
c.execute(text("VACUUM ANALYZE claim;")) c.execute(text("VACUUM ANALYZE claim;"))
p.step() p.step()
if p.ctx.is_postgres: for constraint in pg_add_claim_and_tag_constraints_and_indexes:
pg_add_claim_constraints_and_indexes(p.ctx.execute) if p.ctx.is_postgres:
p.step() p.ctx.execute(constraint)
p.step()
@event_emitter("blockchain.sync.claims.update", "claims") @event_emitter("blockchain.sync.claims.update", "claims")

View file

@ -65,15 +65,16 @@ def supports_insert(blocks: Tuple[int, int], missing_in_supports_table: bool, p:
@event_emitter("blockchain.sync.supports.indexes", "steps") @event_emitter("blockchain.sync.supports.indexes", "steps")
def supports_constraints_and_indexes(p: ProgressContext): def supports_constraints_and_indexes(p: ProgressContext):
p.start(2) p.start(1 + len(pg_add_support_constraints_and_indexes))
if p.ctx.is_postgres: if p.ctx.is_postgres:
with p.ctx.engine.connect() as c: with p.ctx.engine.connect() as c:
c.execute(text("COMMIT;")) c.execute(text("COMMIT;"))
c.execute(text("VACUUM ANALYZE support;")) c.execute(text("VACUUM ANALYZE support;"))
p.step() p.step()
if p.ctx.is_postgres: for constraint in pg_add_support_constraints_and_indexes:
pg_add_support_constraints_and_indexes(p.ctx.execute) if p.ctx.is_postgres:
p.step() p.ctx.execute(constraint)
p.step()
@event_emitter("blockchain.sync.supports.delete", "supports") @event_emitter("blockchain.sync.supports.delete", "supports")

View file

@ -1,9 +1,8 @@
# pylint: skip-file # pylint: skip-file
from sqlalchemy import ( from sqlalchemy import (
MetaData, Table, Column, ForeignKey, PrimaryKeyConstraint, MetaData, Table, Column, ForeignKey,
LargeBinary, Text, SmallInteger, Integer, BigInteger, Boolean, LargeBinary, Text, SmallInteger, Integer, BigInteger, Boolean,
text
) )
from .constants import TXO_TYPES, CLAIM_TYPE_CODES from .constants import TXO_TYPES, CLAIM_TYPE_CODES
@ -65,6 +64,11 @@ TX = Table(
) )
pg_add_tx_constraints_and_indexes = [
"ALTER TABLE tx ADD PRIMARY KEY (tx_hash);",
]
TXO = Table( TXO = Table(
'txo', metadata, 'txo', metadata,
Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)), Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)),
@ -95,36 +99,25 @@ TXO = Table(
txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address) txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address)
def pg_add_txo_constraints_and_indexes(execute): pg_add_txo_constraints_and_indexes = [
execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);")) "ALTER TABLE txo ADD PRIMARY KEY (txo_hash);",
# find appropriate channel public key for signing a content claim # find appropriate channel public key for signing a content claim
execute(text(f""" f"CREATE INDEX txo_channel_hash_by_height_desc_w_pub_key "
CREATE INDEX txo_channel_hash_w_height_desc_and_pub_key f"ON txo (claim_hash, height desc) INCLUDE (public_key) "
ON txo (claim_hash, height desc) INCLUDE (public_key) f"WHERE txo_type={TXO_TYPES['channel']};",
WHERE txo_type={TXO_TYPES['channel']}; # for calculating supports on a claim
""")) f"CREATE INDEX txo_unspent_supports ON txo (claim_hash) INCLUDE (amount) "
# update supports for a claim f"WHERE spent_height = 0 AND txo_type={TXO_TYPES['support']};",
execute(text(f""" # for finding modified claims in a block range
CREATE INDEX txo_unspent_supports f"CREATE INDEX txo_claim_changes "
ON txo (claim_hash) INCLUDE (amount) f"ON txo (height DESC) INCLUDE (txo_hash) "
WHERE spent_height = 0 AND txo_type={TXO_TYPES['support']}; f"WHERE spent_height = 0 AND txo_type IN {tuple(CLAIM_TYPE_CODES)};",
""")) # for finding claims which need support totals re-calculated in a block range
# claim changes by height f"CREATE INDEX txo_added_supports_by_height ON txo (height DESC) "
execute(text(f""" f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
CREATE INDEX txo_claim_changes f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) "
ON txo (height DESC) INCLUDE (txo_hash) f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
WHERE spent_height = 0 AND txo_type IN {tuple(CLAIM_TYPE_CODES)}; ]
"""))
# supports added
execute(text(f"""
CREATE INDEX txo_added_supports_by_height ON txo (height DESC)
INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};
"""))
# supports spent
execute(text(f"""
CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC)
INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};
"""))
TXI = Table( TXI = Table(
@ -139,8 +132,9 @@ TXI = Table(
txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address) txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address)
def pg_add_txi_constraints_and_indexes(execute): pg_add_txi_constraints_and_indexes = [
execute(text("ALTER TABLE txi ADD PRIMARY KEY (txo_hash);")) "ALTER TABLE txi ADD PRIMARY KEY (txo_hash);",
]
Claim = Table( Claim = Table(
@ -210,18 +204,19 @@ Tag = Table(
) )
def pg_add_claim_constraints_and_indexes(execute): pg_add_claim_and_tag_constraints_and_indexes = [
execute(text("ALTER TABLE claim ADD PRIMARY KEY (claim_hash);")) "ALTER TABLE claim ADD PRIMARY KEY (claim_hash);",
execute(text("ALTER TABLE tag ADD PRIMARY KEY (claim_hash, tag);")) # for checking if claim is up-to-date
# take over updates are base on normalized name "CREATE UNIQUE INDEX claim_txo_hash ON claim (txo_hash);",
execute(text("CREATE INDEX claim_normalized ON claim (normalized);")) # used by takeover process to reset winning claims
# finding claims that aren't updated with latest TXO "CREATE INDEX claim_normalized ON claim (normalized);",
execute(text("CREATE UNIQUE INDEX claim_txo_hash ON claim (txo_hash);")) # used to count()/sum() claims signed by channel
# used to calculate content in a channel "CREATE INDEX signed_content ON claim (channel_hash) "
execute(text(""" "INCLUDE (amount) WHERE is_signature_valid;",
CREATE INDEX signed_content ON claim (channel_hash) # basic tag indexes
INCLUDE (amount) WHERE is_signature_valid; "ALTER TABLE tag ADD PRIMARY KEY (claim_hash, tag);",
""")) "CREATE INDEX tags ON tag (tag) INCLUDE (claim_hash);",
]
Support = Table( Support = Table(
@ -245,9 +240,9 @@ Support = Table(
) )
def pg_add_support_constraints_and_indexes(execute): pg_add_support_constraints_and_indexes = [
execute(text("ALTER TABLE support ADD PRIMARY KEY (txo_hash);")) "ALTER TABLE support ADD PRIMARY KEY (txo_hash);",
execute(text(""" # used to count()/sum() supports signed by channel
CREATE INDEX signed_support ON support (channel_hash) "CREATE INDEX signed_support ON support (channel_hash) "
INCLUDE (amount) WHERE is_signature_valid; "INCLUDE (amount) WHERE is_signature_valid;",
""")) ]

View file

@ -500,7 +500,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
events = [] events = []
self.sync.on_progress.listen(events.append) self.sync.on_progress.listen(events.append)
# initial_sync = True # initial sync
await self.sync.advance() await self.sync.advance()
await asyncio.sleep(1) # give it time to collect events await asyncio.sleep(1) # give it time to collect events
self.assertConsumingEvents( self.assertConsumingEvents(
@ -533,7 +533,8 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
) )
self.assertConsumingEvents( self.assertConsumingEvents(
events, "blockchain.sync.spends.main", ("steps",), [ events, "blockchain.sync.spends.main", ("steps",), [
(0, None, (9,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)) (0, None, (14,), (1,), (2,), (3,), (4,), (5,), (6,),
(7,), (8,), (9,), (10,), (11,), (12,), (13,), (14,))
] ]
) )
self.assertEqual( self.assertEqual(
@ -558,7 +559,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
) )
self.assertConsumingEvents( self.assertConsumingEvents(
events, "blockchain.sync.claims.indexes", ("steps",), [ events, "blockchain.sync.claims.indexes", ("steps",), [
(0, None, (2,), (1,), (2,)) (0, None, (7,), (1,), (2,), (3,), (4,), (5,), (6,), (7,))
] ]
) )
self.assertEqual( self.assertEqual(
@ -580,7 +581,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
) )
self.assertConsumingEvents( self.assertConsumingEvents(
events, "blockchain.sync.supports.indexes", ("steps",), [ events, "blockchain.sync.supports.indexes", ("steps",), [
(0, None, (2,), (1,), (2,)) (0, None, (3,), (1,), (2,), (3,))
] ]
) )
self.assertEqual( self.assertEqual(