fixed issues with database transaction isolation

Lex Berezhny 2020-07-14 16:46:24 -04:00
parent 1c29ae7204
commit a4680878c4
5 changed files with 80 additions and 77 deletions

View file

@@ -91,9 +91,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
     # 3. drop old txi and vacuum
     p.ctx.execute(text("DROP TABLE old_txi;"))
     if p.ctx.is_postgres:
-        with p.ctx.engine.connect() as c:
-            c.execute(text("COMMIT;"))
-            c.execute(text("VACUUM ANALYZE txi;"))
+        p.ctx.execute_notx(text("VACUUM ANALYZE txi;"))
     p.step()
     for constraint in pg_add_txi_constraints_and_indexes:
         if p.ctx.is_postgres:
@@ -119,9 +117,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
     # 6. drop old txo
     p.ctx.execute(text("DROP TABLE old_txo;"))
     if p.ctx.is_postgres:
-        with p.ctx.engine.connect() as c:
-            c.execute(text("COMMIT;"))
-            c.execute(text("VACUUM ANALYZE txo;"))
+        p.ctx.execute_notx(text("VACUUM ANALYZE txo;"))
     p.step()
     for constraint in pg_add_txo_constraints_and_indexes:
         if p.ctx.is_postgres:
@@ -137,9 +133,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
     p.step()
     # 3. Update visibility map, which speeds up index-only scans.
     if p.ctx.is_postgres:
-        with p.ctx.engine.connect() as c:
-            c.execute(text("COMMIT;"))
-            c.execute(text("VACUUM txo;"))
+        p.ctx.execute_notx(text("VACUUM txo;"))
     p.step()
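Note on the pattern above: PostgreSQL refuses to run VACUUM inside a transaction block, which is why the old code issued a bare COMMIT on a fresh connection before vacuuming. The new execute_notx() helper (added in query_context.py below) runs the statement on an AUTOCOMMIT connection instead. A minimal standalone sketch of the same idea in plain SQLAlchemy, with an illustrative connection string and table:

    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql:///lbry")  # illustrative DSN

    # VACUUM cannot run inside a transaction, so switch the connection to
    # AUTOCOMMIT rather than manually issuing COMMIT first.
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as c:
        c.execute(text("VACUUM ANALYZE txi;"))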

View file

@@ -116,7 +116,7 @@ def claims_insert(
         ), progress_id=blocks[0], label=make_label("add claims", blocks)
     )
-    with p.ctx.engine.connect().execution_options(stream_results=True) as c:
+    with p.ctx.connect_streaming() as c:
         loader = p.ctx.get_bulk_loader()
         cursor = c.execute(select_claims_for_saving(
             blocks, missing_in_claims_table=missing_in_claims_table
@@ -146,12 +146,12 @@ def claims_insert(
 @event_emitter("blockchain.sync.claims.indexes", "steps")
 def claims_constraints_and_indexes(p: ProgressContext):
-    p.start(1 + len(pg_add_claim_and_tag_constraints_and_indexes))
+    p.start(2 + len(pg_add_claim_and_tag_constraints_and_indexes))
     if p.ctx.is_postgres:
-        with p.ctx.engine.connect() as c:
-            c.execute(text("COMMIT;"))
-            c.execute(text("VACUUM ANALYZE claim;"))
-            c.execute(text("VACUUM ANALYZE tag;"))
+        p.ctx.execute_notx(text("VACUUM ANALYZE claim;"))
+    p.step()
+    if p.ctx.is_postgres:
+        p.ctx.execute_notx(text("VACUUM ANALYZE tag;"))
     p.step()
     for constraint in pg_add_claim_and_tag_constraints_and_indexes:
         if p.ctx.is_postgres:
@@ -162,14 +162,12 @@ def claims_constraints_and_indexes(p: ProgressContext):
 @event_emitter("blockchain.sync.claims.vacuum", "steps")
 def claims_vacuum(p: ProgressContext):
     p.start(2)
-    with p.ctx.engine.connect() as c:
-        if p.ctx.is_postgres:
-            c.execute(text("COMMIT;"))
-            c.execute(text("VACUUM claim;"))
-        p.step()
-        if p.ctx.is_postgres:
-            c.execute(text("VACUUM tag;"))
-        p.step()
+    if p.ctx.is_postgres:
+        p.ctx.execute_notx(text("VACUUM claim;"))
+    p.step()
+    if p.ctx.is_postgres:
+        p.ctx.execute_notx(text("VACUUM tag;"))
+    p.step()


 @event_emitter("blockchain.sync.claims.update", "claims")
@@ -178,7 +176,7 @@ def claims_update(blocks: Tuple[int, int], p: ProgressContext):
         count_unspent_txos(CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True),
         progress_id=blocks[0], label=make_label("mod claims", blocks)
     )
-    with p.ctx.engine.connect().execution_options(stream_results=True) as c:
+    with p.ctx.connect_streaming() as c:
         loader = p.ctx.get_bulk_loader()
         cursor = c.execute(select_claims_for_saving(
             blocks, missing_or_stale_in_claims_table=True
@@ -202,24 +200,25 @@ def claims_delete(claims, p: ProgressContext):
 def update_takeovers(blocks: Tuple[int, int], takeovers, p: ProgressContext):
     p.start(takeovers, label=make_label("mod winner", blocks))
     chain = get_or_initialize_lbrycrd(p.ctx)
-    for takeover in chain.db.sync_get_takeovers(start_height=blocks[0], end_height=blocks[-1]):
-        update_claims = (
-            Claim.update()
-            .where(Claim.c.normalized == takeover['normalized'])
-            .values(
-                is_controlling=case(
-                    [(Claim.c.claim_hash == takeover['claim_hash'], True)],
-                    else_=False
-                ),
-                takeover_height=case(
-                    [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])],
-                    else_=None
-                ),
-                activation_height=least(Claim.c.activation_height, takeover['height']),
-            )
-        )
-        result = p.ctx.execute(update_claims)
-        p.add(result.rowcount)
+    with p.ctx.engine.begin() as c:
+        for takeover in chain.db.sync_get_takeovers(start_height=blocks[0], end_height=blocks[-1]):
+            update_claims = (
+                Claim.update()
+                .where(Claim.c.normalized == takeover['normalized'])
+                .values(
+                    is_controlling=case(
+                        [(Claim.c.claim_hash == takeover['claim_hash'], True)],
+                        else_=False
+                    ),
+                    takeover_height=case(
+                        [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])],
+                        else_=None
+                    ),
+                    activation_height=least(Claim.c.activation_height, takeover['height']),
+                )
+            )
+            result = c.execute(update_claims)
+            p.add(result.rowcount)


 @event_emitter("blockchain.sync.claims.stakes", "claims")
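Note: update_takeovers() previously issued one p.ctx.execute() per takeover, and with the reworked context each of those calls now opens its own transaction; wrapping the loop in engine.begin() keeps all takeover updates in a single transaction that commits on success and rolls back on error. A rough sketch of the engine.begin() semantics, using an illustrative in-memory SQLite database and table:

    from sqlalchemy import create_engine, text

    engine = create_engine("sqlite:///:memory:")  # illustrative engine

    with engine.begin() as c:
        # Both statements commit together; an exception rolls both back.
        c.execute(text("CREATE TABLE claim (normalized TEXT, is_controlling BOOLEAN)"))
        c.execute(text("UPDATE claim SET is_controlling = 1 WHERE normalized = 'name'"))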

View file

@@ -48,7 +48,7 @@ def supports_insert(blocks: Tuple[int, int], missing_in_supports_table: bool, p:
             missing_in_supports_table=missing_in_supports_table,
         )
     )
-    with p.ctx.engine.connect().execution_options(stream_results=True) as c:
+    with p.ctx.connect_streaming() as c:
         loader = p.ctx.get_bulk_loader()
         for row in c.execute(select_supports):
             txo = row_to_txo(row)
@@ -74,9 +74,7 @@ def supports_delete(supports, p: ProgressContext):
 def supports_constraints_and_indexes(p: ProgressContext):
     p.start(1 + len(pg_add_support_constraints_and_indexes))
     if p.ctx.is_postgres:
-        with p.ctx.engine.connect() as c:
-            c.execute(text("COMMIT;"))
-            c.execute(text("VACUUM ANALYZE support;"))
+        p.ctx.execute_notx(text("VACUUM ANALYZE support;"))
     p.step()
     for constraint in pg_add_support_constraints_and_indexes:
         if p.ctx.is_postgres:
@@ -87,8 +85,6 @@ def supports_constraints_and_indexes(p: ProgressContext):
 @event_emitter("blockchain.sync.supports.vacuum", "steps")
 def supports_vacuum(p: ProgressContext):
     p.start(1)
-    with p.ctx.engine.connect() as c:
-        if p.ctx.is_postgres:
-            c.execute(text("COMMIT;"))
-            c.execute(text("VACUUM support;"))
-        p.step()
+    if p.ctx.is_postgres:
+        p.ctx.execute_notx(text("VACUUM support;"))
+    p.step()

View file

@@ -36,7 +36,6 @@ _context: ContextVar['QueryContext'] = ContextVar('_context')
 @dataclass
 class QueryContext:
     engine: Engine
-    connection: Connection
     ledger: Ledger
     message_queue: mp.Queue
     stop_event: mp.Event
@@ -58,14 +57,14 @@ class QueryContext:
     @property
     def is_postgres(self):
-        return self.connection.dialect.name == 'postgresql'
+        return self.engine.dialect.name == 'postgresql'

     @property
     def is_sqlite(self):
-        return self.connection.dialect.name == 'sqlite'
+        return self.engine.dialect.name == 'sqlite'

     def raise_unsupported_dialect(self):
-        raise RuntimeError(f'Unsupported database dialect: {self.connection.dialect.name}.')
+        raise RuntimeError(f'Unsupported database dialect: {self.engine.dialect.name}.')

     def get_resolve_censor(self) -> Censor:
         return Censor(self.blocked_streams, self.blocked_channels)
@@ -74,25 +73,39 @@ class QueryContext:
         return Censor(self.filtered_streams, self.filtered_channels)

     def pg_copy(self, table, rows):
-        connection = self.connection.connection
-        copy_manager = self.copy_managers.get(table.name)
-        if copy_manager is None:
-            self.copy_managers[table.name] = copy_manager = CopyManager(
-                self.connection.connection, table.name, rows[0].keys()
-            )
-        copy_manager.copy(map(dict.values, rows), BytesIO)
-        connection.commit()
+        with self.engine.begin() as c:
+            copy_manager = self.copy_managers.get(table.name)
+            if copy_manager is None:
+                self.copy_managers[table.name] = copy_manager = CopyManager(
+                    c.connection, table.name, rows[0].keys()
+                )
+            copy_manager.conn = c.connection
+            copy_manager.copy(map(dict.values, rows), BytesIO)
+            copy_manager.conn = None
+
+    def connect_without_transaction(self):
+        return self.engine.connect().execution_options(isolation_level="AUTOCOMMIT")
+
+    def connect_streaming(self):
+        return self.engine.connect().execution_options(stream_results=True)
+
+    def execute_notx(self, sql, *args):
+        with self.connect_without_transaction() as c:
+            return c.execute(sql, *args)

     def execute(self, sql, *args):
-        return self.connection.execute(sql, *args)
+        with self.engine.begin() as c:
+            return c.execute(sql, *args)

     def fetchone(self, sql, *args):
-        row = self.connection.execute(sql, *args).fetchone()
-        return dict(row._mapping) if row else row
+        with self.engine.begin() as c:
+            row = c.execute(sql, *args).fetchone()
+            return dict(row._mapping) if row else row

     def fetchall(self, sql, *args):
-        rows = self.connection.execute(sql, *args).fetchall()
-        return [dict(row._mapping) for row in rows]
+        with self.engine.begin() as c:
+            rows = c.execute(sql, *args).fetchall()
+            return [dict(row._mapping) for row in rows]

     def fetchtotal(self, condition) -> int:
         sql = select(func.count('*').label('total')).where(condition)
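Note: the new helpers split connection handling by use case. execute(), fetchone() and fetchall() each run inside a short engine.begin() transaction; execute_notx() uses an AUTOCOMMIT connection for statements such as VACUUM that cannot run inside a transaction; connect_streaming() turns on stream_results so large result sets are read through a server-side cursor instead of being buffered in memory. A rough usage sketch, assuming a QueryContext instance ctx as built by initialize(); the table, query, and handle_row callback are illustrative:

    from sqlalchemy import text

    def vacuum_and_scan(ctx, handle_row):
        # One-off maintenance statement outside any transaction.
        if ctx.is_postgres:
            ctx.execute_notx(text("VACUUM ANALYZE claim;"))
        # Stream a potentially huge result set row by row.
        with ctx.connect_streaming() as c:
            for row in c.execute(text("SELECT claim_hash FROM claim")):
                handle_row(row)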
@@ -166,11 +179,17 @@ def set_postgres_settings(connection, _):

 def set_sqlite_settings(connection, _):
+    connection.isolation_level = None
     cursor = connection.cursor()
     cursor.execute('PRAGMA journal_mode=WAL;')
     cursor.close()


+def do_sqlite_begin(connection):
+    # see: https://bit.ly/3j4vvXm
+    connection.exec_driver_sql("BEGIN")
+
+
 def initialize(
     ledger: Ledger, message_queue: mp.Queue, stop_event: mp.Event,
     track_metrics=False, block_and_filter=None):
@@ -180,15 +199,14 @@ def initialize(
         sqlalchemy_event.listen(engine, "connect", set_postgres_settings)
     elif engine.name == "sqlite":
         sqlalchemy_event.listen(engine, "connect", set_sqlite_settings)
-    connection = engine.connect()
+        sqlalchemy_event.listen(engine, "begin", do_sqlite_begin)
     if block_and_filter is not None:
         blocked_streams, blocked_channels, filtered_streams, filtered_channels = block_and_filter
     else:
         blocked_streams = blocked_channels = filtered_streams = filtered_channels = {}
     _context.set(
         QueryContext(
-            pid=os.getpid(),
-            engine=engine, connection=connection,
+            pid=os.getpid(), engine=engine,
             ledger=ledger, message_queue=message_queue, stop_event=stop_event,
             stack=[], metrics={}, is_tracking_metrics=track_metrics,
             blocked_streams=blocked_streams, blocked_channels=blocked_channels,
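Note: the two SQLite listeners follow the approach SQLAlchemy documents for pysqlite. Setting isolation_level = None on the raw DBAPI connection stops pysqlite from issuing its own implicit BEGIN/COMMIT, and the "begin" listener re-issues an explicit BEGIN whenever SQLAlchemy starts a transaction, so engine.begin() means the same thing on SQLite as on PostgreSQL. A self-contained sketch of that recipe, with an illustrative in-memory database and table:

    from sqlalchemy import create_engine, event, text

    engine = create_engine("sqlite:///:memory:")  # illustrative database

    @event.listens_for(engine, "connect")
    def _on_connect(dbapi_connection, _record):
        # Stop pysqlite from starting/ending transactions on its own.
        dbapi_connection.isolation_level = None

    @event.listens_for(engine, "begin")
    def _on_begin(conn):
        # Emit our own BEGIN when SQLAlchemy opens a transaction.
        conn.exec_driver_sql("BEGIN")

    with engine.begin() as c:
        c.execute(text("CREATE TABLE t (x INTEGER)"))
        c.execute(text("INSERT INTO t (x) VALUES (1)"))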
@@ -200,10 +218,7 @@ def initialize(
 def uninitialize():
     ctx = _context.get(None)
     if ctx is not None:
-        if ctx.connection:
-            ctx.connection.close()
-        if ctx.engine:
-            ctx.engine.dispose()
+        ctx.engine.dispose()
         _context.set(None)
@@ -664,7 +679,6 @@ class BulkLoader:
         )

     def flush(self, return_row_count_for_table) -> int:
-        execute = self.ctx.connection.execute
         done = 0
         for sql, rows in self.get_queries():
             if not rows:
@@ -672,7 +686,7 @@ class BulkLoader:
             if self.ctx.is_postgres and isinstance(sql, Insert):
                 self.ctx.pg_copy(sql.table, rows)
             else:
-                execute(sql, rows)
+                self.ctx.execute(sql, rows)
             if sql.table == return_row_count_for_table:
                 done += len(rows)
             rows.clear()

View file

@@ -559,7 +559,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
         )
         self.assertConsumingEvents(
             events, "blockchain.sync.claims.indexes", ("steps",), [
-                (0, None, (7,), (1,), (2,), (3,), (4,), (5,), (6,), (7,))
+                (0, None, (8,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,))
             ]
         )
        self.assertEqual(
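Note: the expected step count grows from 7 to 8 because claims_constraints_and_indexes now reports the claim and tag VACUUM ANALYZE statements as two separate steps on top of the constraint/index DDL. A quick check of the arithmetic, assuming the constraint list has six entries as the old expectation of 7 implied:

    # Old: p.start(1 + 6) -> 7 steps; new: p.start(2 + 6) -> 8 steps.
    num_constraints = 6  # implied by the previous assertion, not counted from the source
    assert 1 + num_constraints == 7
    assert 2 + num_constraints == 8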