fixups to make lbrynet work on cli with postgres

This commit is contained in:
Lex Berezhny 2020-06-21 23:21:43 -04:00
parent a3ef8d7411
commit 54a0bf9290
7 changed files with 20 additions and 14 deletions

View file

@ -9,8 +9,8 @@ from .bcd_data_stream import BCDataStream
FILES = [ FILES = [
'claims',
'block_index', 'block_index',
'claims'
] ]
@ -55,9 +55,9 @@ class BlockchainDB:
f"ATTACH DATABASE '{os.path.join(self.directory, file+'.sqlite')}' AS {file}" f"ATTACH DATABASE '{os.path.join(self.directory, file+'.sqlite')}' AS {file}"
) )
self.connection.create_aggregate("find_shortest_id", 2, FindShortestID) self.connection.create_aggregate("find_shortest_id", 2, FindShortestID)
#self.connection.execute( self.connection.execute("CREATE INDEX IF NOT EXISTS claim_originalheight ON claim (originalheight);")
# "CREATE INDEX IF NOT EXISTS claim_originalheight_claimid ON claim (originalheight, claimid);" self.connection.execute("CREATE INDEX IF NOT EXISTS claim_updateheight ON claim (updateheight);")
#) self.connection.execute("create index IF NOT EXISTS support_blockheight on support (blockheight);")
self.connection.row_factory = sqlite3.Row self.connection.row_factory = sqlite3.Row
async def open(self): async def open(self):
@ -195,7 +195,7 @@ class BlockchainDB:
) AS shortestID ) AS shortestID
FROM claim FROM claim
WHERE originalHeight BETWEEN ? AND ? WHERE originalHeight BETWEEN ? AND ?
ORDER BY originalHeight, claimid ORDER BY originalHeight
""", (start_height, end_height) """, (start_height, end_height)
return [{ return [{
"name": r["name"], "name": r["name"],

View file

@ -447,7 +447,8 @@ class BlockchainSync(Sync):
if self.on_block_subscription is not None: if self.on_block_subscription is not None:
self.on_block_subscription.cancel() self.on_block_subscription.cancel()
self.db.stop_event.set() self.db.stop_event.set()
self.advance_loop_task.cancel() if self.advance_loop_task is not None:
self.advance_loop_task.cancel()
async def run(self, f, *args): async def run(self, f, *args):
return await asyncio.get_running_loop().run_in_executor( return await asyncio.get_running_loop().run_in_executor(
@ -498,6 +499,8 @@ class BlockchainSync(Sync):
self.db.stop_event.set() self.db.stop_event.set()
for future in pending: for future in pending:
future.cancel() future.cancel()
for future in done:
future.result()
return return
best_height_processed = max(f.result() for f in done) best_height_processed = max(f.result() for f in done)
# putting event in queue instead of add to progress_controller because # putting event in queue instead of add to progress_controller because

View file

@ -71,11 +71,11 @@ class Advanced(Basic):
def start_sync_block_bars(self, d): def start_sync_block_bars(self, d):
self.bars.clear() self.bars.clear()
self.get_or_create_bar("parse", "total parsing", "blocks", d['blocks'], True) self.get_or_create_bar("read", "total reading", "blocks", d['blocks'], True)
self.get_or_create_bar("save", "total saving", "txs", d['txs'], True) self.get_or_create_bar("save", "total saving", "txs", d['txs'], True)
def close_sync_block_bars(self): def close_sync_block_bars(self):
self.bars.pop("parse").close() self.bars.pop("read").close()
self.bars.pop("save").close() self.bars.pop("save").close()
def update_sync_block_bars(self, event, d): def update_sync_block_bars(self, event, d):
@ -106,7 +106,7 @@ class Advanced(Basic):
self.start_sync_block_bars(d) self.start_sync_block_bars(d)
elif e.endswith("block.done"): elif e.endswith("block.done"):
self.close_sync_block_bars() self.close_sync_block_bars()
elif e.endswith("block.parse"): elif e.endswith("block.read"):
self.update_sync_block_bars("parse", d) self.update_sync_block_bars("read", d)
elif e.endswith("block.save"): elif e.endswith("block.save"):
self.update_sync_block_bars("save", d) self.update_sync_block_bars("save", d)

View file

@ -78,10 +78,10 @@ class Result(Generic[ResultType]):
class Database: class Database:
def __init__(self, ledger: 'Ledger', processes=-1): def __init__(self, ledger: 'Ledger'):
self.url = ledger.conf.db_url_or_default self.url = ledger.conf.db_url_or_default
self.ledger = ledger self.ledger = ledger
self.processes = self._normalize_processes(processes) self.processes = self._normalize_processes(ledger.conf.processes)
self.executor: Optional[Executor] = None self.executor: Optional[Executor] = None
self.message_queue = mp.Queue() self.message_queue = mp.Queue()
self.stop_event = mp.Event() self.stop_event = mp.Event()

View file

@ -54,7 +54,7 @@ def check_version_and_create_tables():
ctx.execute(text("ALTER TABLE txo DISABLE TRIGGER ALL;")) ctx.execute(text("ALTER TABLE txo DISABLE TRIGGER ALL;"))
ctx.execute(text("ALTER TABLE tx DISABLE TRIGGER ALL;")) ctx.execute(text("ALTER TABLE tx DISABLE TRIGGER ALL;"))
ctx.execute(text("ALTER TABLE claim DISABLE TRIGGER ALL;")) ctx.execute(text("ALTER TABLE claim DISABLE TRIGGER ALL;"))
ctx.execute(text("ALTER TABLE claimtrie DISABLE TRIGGER ALL;")) ctx.execute(text("ALTER TABLE support DISABLE TRIGGER ALL;"))
ctx.execute(text("ALTER TABLE block DISABLE TRIGGER ALL;")) ctx.execute(text("ALTER TABLE block DISABLE TRIGGER ALL;"))

View file

@ -521,6 +521,9 @@ class BulkLoader:
if txo.script.is_claim_name: if txo.script.is_claim_name:
claim['creation_height'] = tx.height claim['creation_height'] = tx.height
claim['creation_timestamp'] = tx.timestamp claim['creation_timestamp'] = tx.timestamp
else:
claim['creation_height'] = None
claim['creation_timestamp'] = None
self.claims.append(claim) self.claims.append(claim)
self.tags.extend(tags) self.tags.extend(tags)
return self return self

View file

@ -195,6 +195,6 @@ Support = Table(
Takeover = Table( Takeover = Table(
'takeover', metadata, 'takeover', metadata,
Column('normalized', Text), Column('normalized', Text),
Column('claim_hash', LargeBinary, ForeignKey(TXO.columns.claim_hash)), Column('claim_hash', LargeBinary),
Column('height', Integer), Column('height', Integer),
) )