diff --git a/lbry/blockchain/sync/blocks.py b/lbry/blockchain/sync/blocks.py index 464d54cb7..33a836ca8 100644 --- a/lbry/blockchain/sync/blocks.py +++ b/lbry/blockchain/sync/blocks.py @@ -91,9 +91,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext): # 3. drop old txi and vacuum p.ctx.execute(text("DROP TABLE old_txi;")) if p.ctx.is_postgres: - with p.ctx.engine.connect() as c: - c.execute(text("COMMIT;")) - c.execute(text("VACUUM ANALYZE txi;")) + p.ctx.execute_notx(text("VACUUM ANALYZE txi;")) p.step() for constraint in pg_add_txi_constraints_and_indexes: if p.ctx.is_postgres: @@ -119,9 +117,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext): # 6. drop old txo p.ctx.execute(text("DROP TABLE old_txo;")) if p.ctx.is_postgres: - with p.ctx.engine.connect() as c: - c.execute(text("COMMIT;")) - c.execute(text("VACUUM ANALYZE txo;")) + p.ctx.execute_notx(text("VACUUM ANALYZE txo;")) p.step() for constraint in pg_add_txo_constraints_and_indexes: if p.ctx.is_postgres: @@ -137,9 +133,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext): p.step() # 3. Update visibility map, which speeds up index-only scans. if p.ctx.is_postgres: - with p.ctx.engine.connect() as c: - c.execute(text("COMMIT;")) - c.execute(text("VACUUM txo;")) + p.ctx.execute_notx(text("VACUUM txo;")) p.step() diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index 1778cb88d..16cadbcdd 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -116,7 +116,7 @@ def claims_insert( ), progress_id=blocks[0], label=make_label("add claims", blocks) ) - with p.ctx.engine.connect().execution_options(stream_results=True) as c: + with p.ctx.connect_streaming() as c: loader = p.ctx.get_bulk_loader() cursor = c.execute(select_claims_for_saving( blocks, missing_in_claims_table=missing_in_claims_table @@ -146,12 +146,12 @@ def claims_insert( @event_emitter("blockchain.sync.claims.indexes", "steps") def claims_constraints_and_indexes(p: ProgressContext): - p.start(1 + len(pg_add_claim_and_tag_constraints_and_indexes)) + p.start(2 + len(pg_add_claim_and_tag_constraints_and_indexes)) if p.ctx.is_postgres: - with p.ctx.engine.connect() as c: - c.execute(text("COMMIT;")) - c.execute(text("VACUUM ANALYZE claim;")) - c.execute(text("VACUUM ANALYZE tag;")) + p.ctx.execute_notx(text("VACUUM ANALYZE claim;")) + p.step() + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM ANALYZE tag;")) p.step() for constraint in pg_add_claim_and_tag_constraints_and_indexes: if p.ctx.is_postgres: @@ -162,14 +162,12 @@ def claims_constraints_and_indexes(p: ProgressContext): @event_emitter("blockchain.sync.claims.vacuum", "steps") def claims_vacuum(p: ProgressContext): p.start(2) - with p.ctx.engine.connect() as c: - if p.ctx.is_postgres: - c.execute(text("COMMIT;")) - c.execute(text("VACUUM claim;")) - p.step() - if p.ctx.is_postgres: - c.execute(text("VACUUM tag;")) - p.step() + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM claim;")) + p.step() + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM tag;")) + p.step() @event_emitter("blockchain.sync.claims.update", "claims") @@ -178,7 +176,7 @@ def claims_update(blocks: Tuple[int, int], p: ProgressContext): count_unspent_txos(CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True), progress_id=blocks[0], label=make_label("mod claims", blocks) ) - with p.ctx.engine.connect().execution_options(stream_results=True) as c: + with p.ctx.connect_streaming() as c: loader = p.ctx.get_bulk_loader() cursor = c.execute(select_claims_for_saving( blocks, missing_or_stale_in_claims_table=True @@ -202,24 +200,25 @@ def claims_delete(claims, p: ProgressContext): def update_takeovers(blocks: Tuple[int, int], takeovers, p: ProgressContext): p.start(takeovers, label=make_label("mod winner", blocks)) chain = get_or_initialize_lbrycrd(p.ctx) - for takeover in chain.db.sync_get_takeovers(start_height=blocks[0], end_height=blocks[-1]): - update_claims = ( - Claim.update() - .where(Claim.c.normalized == takeover['normalized']) - .values( - is_controlling=case( - [(Claim.c.claim_hash == takeover['claim_hash'], True)], - else_=False - ), - takeover_height=case( - [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])], - else_=None - ), - activation_height=least(Claim.c.activation_height, takeover['height']), + with p.ctx.engine.begin() as c: + for takeover in chain.db.sync_get_takeovers(start_height=blocks[0], end_height=blocks[-1]): + update_claims = ( + Claim.update() + .where(Claim.c.normalized == takeover['normalized']) + .values( + is_controlling=case( + [(Claim.c.claim_hash == takeover['claim_hash'], True)], + else_=False + ), + takeover_height=case( + [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])], + else_=None + ), + activation_height=least(Claim.c.activation_height, takeover['height']), + ) ) - ) - result = p.ctx.execute(update_claims) - p.add(result.rowcount) + result = c.execute(update_claims) + p.add(result.rowcount) @event_emitter("blockchain.sync.claims.stakes", "claims") diff --git a/lbry/blockchain/sync/supports.py b/lbry/blockchain/sync/supports.py index 06304d202..de7d014aa 100644 --- a/lbry/blockchain/sync/supports.py +++ b/lbry/blockchain/sync/supports.py @@ -48,7 +48,7 @@ def supports_insert(blocks: Tuple[int, int], missing_in_supports_table: bool, p: missing_in_supports_table=missing_in_supports_table, ) ) - with p.ctx.engine.connect().execution_options(stream_results=True) as c: + with p.ctx.connect_streaming() as c: loader = p.ctx.get_bulk_loader() for row in c.execute(select_supports): txo = row_to_txo(row) @@ -74,9 +74,7 @@ def supports_delete(supports, p: ProgressContext): def supports_constraints_and_indexes(p: ProgressContext): p.start(1 + len(pg_add_support_constraints_and_indexes)) if p.ctx.is_postgres: - with p.ctx.engine.connect() as c: - c.execute(text("COMMIT;")) - c.execute(text("VACUUM ANALYZE support;")) + p.ctx.execute_notx(text("VACUUM ANALYZE support;")) p.step() for constraint in pg_add_support_constraints_and_indexes: if p.ctx.is_postgres: @@ -87,8 +85,6 @@ def supports_constraints_and_indexes(p: ProgressContext): @event_emitter("blockchain.sync.supports.vacuum", "steps") def supports_vacuum(p: ProgressContext): p.start(1) - with p.ctx.engine.connect() as c: - if p.ctx.is_postgres: - c.execute(text("COMMIT;")) - c.execute(text("VACUUM support;")) - p.step() + if p.ctx.is_postgres: + p.ctx.execute_notx(text("VACUUM support;")) + p.step() diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index c94fca8ff..b335a21fb 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -36,7 +36,6 @@ _context: ContextVar['QueryContext'] = ContextVar('_context') @dataclass class QueryContext: engine: Engine - connection: Connection ledger: Ledger message_queue: mp.Queue stop_event: mp.Event @@ -58,14 +57,14 @@ class QueryContext: @property def is_postgres(self): - return self.connection.dialect.name == 'postgresql' + return self.engine.dialect.name == 'postgresql' @property def is_sqlite(self): - return self.connection.dialect.name == 'sqlite' + return self.engine.dialect.name == 'sqlite' def raise_unsupported_dialect(self): - raise RuntimeError(f'Unsupported database dialect: {self.connection.dialect.name}.') + raise RuntimeError(f'Unsupported database dialect: {self.engine.dialect.name}.') def get_resolve_censor(self) -> Censor: return Censor(self.blocked_streams, self.blocked_channels) @@ -74,25 +73,39 @@ class QueryContext: return Censor(self.filtered_streams, self.filtered_channels) def pg_copy(self, table, rows): - connection = self.connection.connection - copy_manager = self.copy_managers.get(table.name) - if copy_manager is None: - self.copy_managers[table.name] = copy_manager = CopyManager( - self.connection.connection, table.name, rows[0].keys() - ) - copy_manager.copy(map(dict.values, rows), BytesIO) - connection.commit() + with self.engine.begin() as c: + copy_manager = self.copy_managers.get(table.name) + if copy_manager is None: + self.copy_managers[table.name] = copy_manager = CopyManager( + c.connection, table.name, rows[0].keys() + ) + copy_manager.conn = c.connection + copy_manager.copy(map(dict.values, rows), BytesIO) + copy_manager.conn = None + + def connect_without_transaction(self): + return self.engine.connect().execution_options(isolation_level="AUTOCOMMIT") + + def connect_streaming(self): + return self.engine.connect().execution_options(stream_results=True) + + def execute_notx(self, sql, *args): + with self.connect_without_transaction() as c: + return c.execute(sql, *args) def execute(self, sql, *args): - return self.connection.execute(sql, *args) + with self.engine.begin() as c: + return c.execute(sql, *args) def fetchone(self, sql, *args): - row = self.connection.execute(sql, *args).fetchone() - return dict(row._mapping) if row else row + with self.engine.begin() as c: + row = c.execute(sql, *args).fetchone() + return dict(row._mapping) if row else row def fetchall(self, sql, *args): - rows = self.connection.execute(sql, *args).fetchall() - return [dict(row._mapping) for row in rows] + with self.engine.begin() as c: + rows = c.execute(sql, *args).fetchall() + return [dict(row._mapping) for row in rows] def fetchtotal(self, condition) -> int: sql = select(func.count('*').label('total')).where(condition) @@ -166,11 +179,17 @@ def set_postgres_settings(connection, _): def set_sqlite_settings(connection, _): + connection.isolation_level = None cursor = connection.cursor() cursor.execute('PRAGMA journal_mode=WAL;') cursor.close() +def do_sqlite_begin(connection): + # see: https://bit.ly/3j4vvXm + connection.exec_driver_sql("BEGIN") + + def initialize( ledger: Ledger, message_queue: mp.Queue, stop_event: mp.Event, track_metrics=False, block_and_filter=None): @@ -180,15 +199,14 @@ def initialize( sqlalchemy_event.listen(engine, "connect", set_postgres_settings) elif engine.name == "sqlite": sqlalchemy_event.listen(engine, "connect", set_sqlite_settings) - connection = engine.connect() + sqlalchemy_event.listen(engine, "begin", do_sqlite_begin) if block_and_filter is not None: blocked_streams, blocked_channels, filtered_streams, filtered_channels = block_and_filter else: blocked_streams = blocked_channels = filtered_streams = filtered_channels = {} _context.set( QueryContext( - pid=os.getpid(), - engine=engine, connection=connection, + pid=os.getpid(), engine=engine, ledger=ledger, message_queue=message_queue, stop_event=stop_event, stack=[], metrics={}, is_tracking_metrics=track_metrics, blocked_streams=blocked_streams, blocked_channels=blocked_channels, @@ -200,10 +218,7 @@ def initialize( def uninitialize(): ctx = _context.get(None) if ctx is not None: - if ctx.connection: - ctx.connection.close() - if ctx.engine: - ctx.engine.dispose() + ctx.engine.dispose() _context.set(None) @@ -664,7 +679,6 @@ class BulkLoader: ) def flush(self, return_row_count_for_table) -> int: - execute = self.ctx.connection.execute done = 0 for sql, rows in self.get_queries(): if not rows: @@ -672,7 +686,7 @@ class BulkLoader: if self.ctx.is_postgres and isinstance(sql, Insert): self.ctx.pg_copy(sql.table, rows) else: - execute(sql, rows) + self.ctx.execute(sql, rows) if sql.table == return_row_count_for_table: done += len(rows) rows.clear() diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 903f5a157..9463454c8 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -559,7 +559,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): ) self.assertConsumingEvents( events, "blockchain.sync.claims.indexes", ("steps",), [ - (0, None, (7,), (1,), (2,), (3,), (4,), (5,), (6,), (7,)) + (0, None, (8,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,)) ] ) self.assertEqual(