diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index 982e48e93..ddb65a5f6 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -165,17 +165,16 @@ def claims_insert( @event_emitter("blockchain.sync.claims.indexes", "steps") def claims_constraints_and_indexes(p: ProgressContext): - if p.ctx.is_postgres and pg_has_claim_primary_key(p): - return + is_postgres = p.ctx.is_postgres and p.ctx.pg_has_pk_constraint('claim') p.start(2 + len(pg_add_claim_and_tag_constraints_and_indexes)) - if p.ctx.is_postgres: + if is_postgres: p.ctx.execute_notx(text("VACUUM ANALYZE claim;")) p.step() - if p.ctx.is_postgres: + if is_postgres: p.ctx.execute_notx(text("VACUUM ANALYZE tag;")) p.step() for constraint in pg_add_claim_and_tag_constraints_and_indexes: - if p.ctx.is_postgres: + if is_postgres: p.ctx.execute(text(constraint)) p.step() @@ -311,11 +310,3 @@ def update_claim_filters(resolve_censor_channel_hashes, search_censor_channel_ha ['claim_hash', 'censor_type', 'censoring_channel_hash'], select_reposts(search_censor_channel_hashes, Censor.SEARCH) )) - - -def pg_has_claim_primary_key(p: ProgressContext): - claim_constraints = p.ctx.execute( - "select * from information_schema.table_constraints as tc " - "where tc.table_name='claim' and constraint_type='PRIMARY KEY'" - ).fetchall() - return len(claim_constraints) > 0 diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 68e192500..00e69f3f1 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -12,7 +12,7 @@ from contextvars import ContextVar from sqlalchemy import create_engine, inspect, bindparam, func, exists, event as sqlalchemy_event from sqlalchemy.future import select from sqlalchemy.engine import Engine -from sqlalchemy.sql import Insert +from sqlalchemy.sql import Insert, text try: from pgcopy import CopyManager except ImportError: @@ -104,6 +104,13 @@ class QueryContext: copy_manager.copy(map(dict.values, rows), BytesIO) copy_manager.conn = None + def pg_has_pk_constraint(self, table_name): + claim_constraints = self.fetchall(text( + f"select * from information_schema.table_constraints as tc " + f"where tc.table_name='{table_name}' and constraint_type='PRIMARY KEY'" + )) + return len(claim_constraints) > 0 + def connect_without_transaction(self): return self.engine.connect().execution_options(isolation_level="AUTOCOMMIT") diff --git a/lbry/testcase.py b/lbry/testcase.py index edeb9983c..448eb1884 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -883,7 +883,7 @@ class EventGenerator: yield from self.claims_main_start() yield from self.claims_insert(self.claims) if self.initial_sync: - yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (10,), (1,)) + yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (11,), (1,)) else: yield from self.claims_takeovers(self.takeovers) yield from self.claims_stakes() @@ -1011,7 +1011,7 @@ class EventGenerator: def spends_steps(self): yield from self.generate( "blockchain.sync.spends.main", ("steps",), 0, None, - (20 if self.initial_sync else 5,), + (23 if self.initial_sync else 5,), (1,) )