minor refactor and fix for a test
This commit is contained in:
parent
546205ba69
commit
f69426bbad
3 changed files with 14 additions and 16 deletions
|
@ -165,17 +165,16 @@ def claims_insert(
|
||||||
|
|
||||||
@event_emitter("blockchain.sync.claims.indexes", "steps")
|
@event_emitter("blockchain.sync.claims.indexes", "steps")
|
||||||
def claims_constraints_and_indexes(p: ProgressContext):
|
def claims_constraints_and_indexes(p: ProgressContext):
|
||||||
if p.ctx.is_postgres and pg_has_claim_primary_key(p):
|
is_postgres = p.ctx.is_postgres and p.ctx.pg_has_pk_constraint('claim')
|
||||||
return
|
|
||||||
p.start(2 + len(pg_add_claim_and_tag_constraints_and_indexes))
|
p.start(2 + len(pg_add_claim_and_tag_constraints_and_indexes))
|
||||||
if p.ctx.is_postgres:
|
if is_postgres:
|
||||||
p.ctx.execute_notx(text("VACUUM ANALYZE claim;"))
|
p.ctx.execute_notx(text("VACUUM ANALYZE claim;"))
|
||||||
p.step()
|
p.step()
|
||||||
if p.ctx.is_postgres:
|
if is_postgres:
|
||||||
p.ctx.execute_notx(text("VACUUM ANALYZE tag;"))
|
p.ctx.execute_notx(text("VACUUM ANALYZE tag;"))
|
||||||
p.step()
|
p.step()
|
||||||
for constraint in pg_add_claim_and_tag_constraints_and_indexes:
|
for constraint in pg_add_claim_and_tag_constraints_and_indexes:
|
||||||
if p.ctx.is_postgres:
|
if is_postgres:
|
||||||
p.ctx.execute(text(constraint))
|
p.ctx.execute(text(constraint))
|
||||||
p.step()
|
p.step()
|
||||||
|
|
||||||
|
@ -311,11 +310,3 @@ def update_claim_filters(resolve_censor_channel_hashes, search_censor_channel_ha
|
||||||
['claim_hash', 'censor_type', 'censoring_channel_hash'],
|
['claim_hash', 'censor_type', 'censoring_channel_hash'],
|
||||||
select_reposts(search_censor_channel_hashes, Censor.SEARCH)
|
select_reposts(search_censor_channel_hashes, Censor.SEARCH)
|
||||||
))
|
))
|
||||||
|
|
||||||
|
|
||||||
def pg_has_claim_primary_key(p: ProgressContext):
|
|
||||||
claim_constraints = p.ctx.execute(
|
|
||||||
"select * from information_schema.table_constraints as tc "
|
|
||||||
"where tc.table_name='claim' and constraint_type='PRIMARY KEY'"
|
|
||||||
).fetchall()
|
|
||||||
return len(claim_constraints) > 0
|
|
||||||
|
|
|
@ -12,7 +12,7 @@ from contextvars import ContextVar
|
||||||
from sqlalchemy import create_engine, inspect, bindparam, func, exists, event as sqlalchemy_event
|
from sqlalchemy import create_engine, inspect, bindparam, func, exists, event as sqlalchemy_event
|
||||||
from sqlalchemy.future import select
|
from sqlalchemy.future import select
|
||||||
from sqlalchemy.engine import Engine
|
from sqlalchemy.engine import Engine
|
||||||
from sqlalchemy.sql import Insert
|
from sqlalchemy.sql import Insert, text
|
||||||
try:
|
try:
|
||||||
from pgcopy import CopyManager
|
from pgcopy import CopyManager
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
@ -104,6 +104,13 @@ class QueryContext:
|
||||||
copy_manager.copy(map(dict.values, rows), BytesIO)
|
copy_manager.copy(map(dict.values, rows), BytesIO)
|
||||||
copy_manager.conn = None
|
copy_manager.conn = None
|
||||||
|
|
||||||
|
def pg_has_pk_constraint(self, table_name):
|
||||||
|
claim_constraints = self.fetchall(text(
|
||||||
|
f"select * from information_schema.table_constraints as tc "
|
||||||
|
f"where tc.table_name='{table_name}' and constraint_type='PRIMARY KEY'"
|
||||||
|
))
|
||||||
|
return len(claim_constraints) > 0
|
||||||
|
|
||||||
def connect_without_transaction(self):
|
def connect_without_transaction(self):
|
||||||
return self.engine.connect().execution_options(isolation_level="AUTOCOMMIT")
|
return self.engine.connect().execution_options(isolation_level="AUTOCOMMIT")
|
||||||
|
|
||||||
|
|
|
@ -883,7 +883,7 @@ class EventGenerator:
|
||||||
yield from self.claims_main_start()
|
yield from self.claims_main_start()
|
||||||
yield from self.claims_insert(self.claims)
|
yield from self.claims_insert(self.claims)
|
||||||
if self.initial_sync:
|
if self.initial_sync:
|
||||||
yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (10,), (1,))
|
yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (11,), (1,))
|
||||||
else:
|
else:
|
||||||
yield from self.claims_takeovers(self.takeovers)
|
yield from self.claims_takeovers(self.takeovers)
|
||||||
yield from self.claims_stakes()
|
yield from self.claims_stakes()
|
||||||
|
@ -1011,7 +1011,7 @@ class EventGenerator:
|
||||||
def spends_steps(self):
|
def spends_steps(self):
|
||||||
yield from self.generate(
|
yield from self.generate(
|
||||||
"blockchain.sync.spends.main", ("steps",), 0, None,
|
"blockchain.sync.spends.main", ("steps",), 0, None,
|
||||||
(20 if self.initial_sync else 5,),
|
(23 if self.initial_sync else 5,),
|
||||||
(1,)
|
(1,)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue