From 3fe19816572ac8aad4a71a23a04c683ae61f14af Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Fri, 3 Jul 2020 15:47:25 -0400 Subject: [PATCH] faster inputs/outputs sync --- lbry/blockchain/sync/steps.py | 103 +++++++++++++----- lbry/db/queries.py | 21 ++-- .../integration/blockchain/test_blockchain.py | 21 ++-- 3 files changed, 94 insertions(+), 51 deletions(-) diff --git a/lbry/blockchain/sync/steps.py b/lbry/blockchain/sync/steps.py index 48688f272..9a9b4f2b2 100644 --- a/lbry/blockchain/sync/steps.py +++ b/lbry/blockchain/sync/steps.py @@ -5,9 +5,10 @@ from contextvars import ContextVar from typing import Set from sqlalchemy import bindparam, case, distinct, text +from sqlalchemy.schema import CreateTable from lbry.db import queries -from lbry.db.tables import Block as BlockTable +from lbry.db.tables import Block as BlockTable, TXO, TXI from lbry.db.query_context import progress, context, Event from lbry.db.queries import rows_to_txos from lbry.db.sync import ( @@ -148,40 +149,82 @@ def process_block_save(block_file_number: int, loader, p=None): @sync_step(Event.INPUT_UPDATE, initial_sync=True, ongoing_sync=True) def process_inputs_outputs(initial_sync=False, p=None): - step = 1 - if initial_sync and p.ctx.is_postgres: - p.start(6) + step = 0 + + def next_step(): + nonlocal step + step += 1 + return step + + if initial_sync: + p.start(9) else: p.start(2) - # 0. Vacuum - if initial_sync and p.ctx.is_postgres: - with p.ctx.engine.connect() as c: - c.execute(text("COMMIT;")) - c.execute(text("VACUUM FULL ANALYZE txo;")) - p.step(step) - step += 1 - c.execute(text("VACUUM FULL ANALYZE txi;")) - p.step(step) - step += 1 + if initial_sync: + # A. add tx constraints + if p.ctx.is_postgres: + p.ctx.execute(text("ALTER TABLE tx ADD PRIMARY KEY (tx_hash);")) + p.step(next_step()) - # 1. Update spent TXOs setting is_spent = True - update_spent_outputs(p.ctx) - p.step(step) - step += 1 - if initial_sync and p.ctx.is_postgres: - p.ctx.execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);")) - p.step(step) - step += 1 + # 1. Update TXIs to have the address of TXO they are spending. + if initial_sync: + # B. txi table reshuffling + p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;")) + p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[])) + if p.ctx.is_postgres: + p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;")) + p.step(next_step()) + # C. insert + old_txi = TXI.alias('old_txi') + columns = [c for c in old_txi.columns if c.name != 'address'] + [TXO.c.address] + select_txis = select(*columns).select_from(old_txi.join(TXO)) + insert_txis = TXI.insert().from_select(columns, select_txis) + p.ctx.execute(text( + str(insert_txis.compile(p.ctx.engine)).replace('txi AS old_txi', 'old_txi') + )) + p.step(next_step()) + # D. drop old txi + p.ctx.execute(text("DROP TABLE old_txi;")) + p.step(next_step()) + # E. restore integrity constraint + if p.ctx.is_postgres: + p.ctx.execute(text("ALTER TABLE txi ADD PRIMARY KEY (txo_hash);")) + p.step(next_step()) + else: + set_input_addresses(p.ctx) + p.step(next_step()) - # 2. Update TXIs to have the address of TXO they are spending. - set_input_addresses(p.ctx) - p.step(step) - step += 1 - if initial_sync and p.ctx.is_postgres: - p.ctx.execute(text("ALTER TABLE txi ADD PRIMARY KEY (txo_hash);")) - p.step(step) - step += 1 + # 2. Update spent TXOs setting is_spent = True + if initial_sync: + # F. txo table reshuffling + p.ctx.execute(text("ALTER TABLE txo RENAME TO old_txo;")) + p.ctx.execute(CreateTable(TXO, include_foreign_key_constraints=[])) + if p.ctx.is_postgres: + p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;")) + p.step(next_step()) + # G. insert + old_txo = TXO.alias('old_txo') + columns = ( + [c for c in old_txo.columns if c.name != 'is_spent'] + + [(TXI.c.txo_hash != None).label('is_spent')] + ) + select_txos = select(*columns).select_from(old_txo.join(TXI, isouter=True)) + insert_txos = TXO.insert().from_select(columns, select_txos) + p.ctx.execute(text( + str(insert_txos.compile(p.ctx.engine)).replace('txo AS old_txo', 'old_txo') + )) + p.step(next_step()) + # H. drop old txo + p.ctx.execute(text("DROP TABLE old_txo;")) + p.step(next_step()) + # I. restore integrity constraint + if p.ctx.is_postgres: + p.ctx.execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);")) + p.step(next_step()) + else: + update_spent_outputs(p.ctx) + p.step(next_step()) @sync_step(Event.BLOCK_FILTER, initial_sync=True, ongoing_sync=True) diff --git a/lbry/db/queries.py b/lbry/db/queries.py index 745b09145..2c2d750b6 100644 --- a/lbry/db/queries.py +++ b/lbry/db/queries.py @@ -48,17 +48,18 @@ def check_version_and_create_tables(): metadata.drop_all(ctx.engine) metadata.create_all(ctx.engine) ctx.execute(Version.insert().values(version=SCHEMA_VERSION)) - if ctx.is_postgres: - disable_indexes_and_integrity_enforcement() - - -def disable_indexes_and_integrity_enforcement(): - with context('disable indexes and integrity enforcement (triggers, primary keys, etc)') as ctx: for table in metadata.sorted_tables: - ctx.execute(text(f"ALTER TABLE {table.name} DISABLE TRIGGER ALL;")) - if table.name == 'tag': - continue - ctx.execute(text(f"ALTER TABLE {table.name} DROP CONSTRAINT {table.name}_pkey CASCADE;")) + disable_trigger_and_constraints(table.name) + + +def disable_trigger_and_constraints(table_name): + ctx = context() + if ctx.is_postgres: + ctx.execute(text(f"ALTER TABLE {table_name} DISABLE TRIGGER ALL;")) + if table_name == 'tag': + return + if ctx.is_postgres: + ctx.execute(text(f"ALTER TABLE {table_name} DROP CONSTRAINT {table_name}_pkey CASCADE;")) def insert_block(block): diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 7e735a5b0..09287951a 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -535,17 +535,16 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): # 3 - db.sync.input self.assertEventsAlmostEqual( self.extract_events('db.sync.input', events), [ - [0, 6], - [1, 6], - [2, 6], - [3, 6], - [4, 6], - [5, 6], - [6, 6], - ] if self.db_driver == 'postgresql' else [ - [0, 2], - [1, 2], - [2, 2], + [0, 9], + [1, 9], + [2, 9], + [3, 9], + [4, 9], + [5, 9], + [6, 9], + [7, 9], + [8, 9], + [9, 9], ] ) # 4 - blockchain.sync.claim.meta