add a migrator to build the hashX status index and to compactify histories

Jack Robison 2022-04-06 22:01:55 -04:00
parent b1bb5927c7
commit 5d8a32368a
4 changed files with 100 additions and 1 deletion

@@ -1637,6 +1637,16 @@ class BlockchainProcessorService(BlockchainService):
         await self.run_in_thread_with_lock(flush)
 
     def _iter_start_tasks(self):
+        while self.db.db_version < max(self.db.DB_VERSIONS):
+            if self.db.db_version == 7:
+                from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION
+            else:
+                raise RuntimeError("unknown db version")
+            self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")
+            migrate(self.db)
+            self.log.info("finished migration")
+            self.db.read_db_state()
+
         self.height = self.db.db_height
         self.tip = self.db.db_tip
         self.tx_count = self.db.db_tx_count
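
The startup hook steps the database through one migrator per iteration until db_version reaches max(DB_VERSIONS), re-reading the persisted state after each step; since migrate7to8 bumps and writes db_version before its final commit, an interrupted chain resumes at the right step on restart. A sketch of how a later migrator could be chained into this dispatch (migrate8to9 and its module are hypothetical, not part of this commit):

    # method-body fragment; migrate8to9 is hypothetical, shown only to
    # illustrate how the dispatch loop is meant to grow
    while self.db.db_version < max(self.db.DB_VERSIONS):
        if self.db.db_version == 7:
            from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION
        elif self.db.db_version == 8:
            from scribe.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION
        else:
            raise RuntimeError("unknown db version")
        self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")
        migrate(self.db)
        self.db.read_db_state()  # pick up the bumped version before the next iteration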

@@ -32,7 +32,7 @@ TXO_STRUCT_pack = TXO_STRUCT.pack
 
 class HubDB:
-    DB_VERSIONS = HIST_DB_VERSIONS = [7]
+    DB_VERSIONS = [7, 8]
 
     def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200,
                  cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
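
Listing both 7 and 8 in DB_VERSIONS lets a version-7 database pass the open-time version check and then be upgraded by the startup loop above, rather than being rejected outright; the separate HIST_DB_VERSIONS alias is dropped along the way. Presumably the gate looks something like this sketch (the exact check lives in scribe's db-state handling and may be worded differently):

    # assumed shape of the open-time version gate, not verbatim scribe code
    if state.db_version not in self.DB_VERSIONS:
        raise RuntimeError(f"unsupported db version {state.db_version}, "
                           f"expected one of {self.DB_VERSIONS}")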

@@ -0,0 +1,89 @@
import logging
import time
import array
import typing
from bisect import bisect_right
from scribe.common import sha256

if typing.TYPE_CHECKING:
    from scribe.db.db import HubDB

FROM_VERSION = 7
TO_VERSION = 8

log = logging.getLogger()
def get_all_hashXs(db):
    def iterator():
        last_hashX = None
        for k in db.prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
            hashX = k[1:12]  # strip the 1-byte table prefix; hashXs are 11 bytes
            if last_hashX is None:
                last_hashX = hashX
            if last_hashX != hashX:
                # keys are sorted, so a change in hashX means the previous one is complete
                yield last_hashX
                last_hashX = hashX
        if last_hashX:
            yield last_hashX
    return [hashX for hashX in iterator()]
def hashX_history(db: 'HubDB', hashX: bytes):
    # concatenate every history row for this hashX, remembering the raw
    # key/value pairs so they can be deleted during compaction
    history = b''
    to_delete = []
    for k, v in db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, deserialize_key=False):
        to_delete.append((k, v))
        history += v
    return history, to_delete
def hashX_status_from_history(db: 'HubDB', history: bytes) -> bytes:
    tx_counts = db.tx_counts  # cumulative tx count at each height
    hist_tx_nums = array.array('I')
    hist_tx_nums.frombytes(history)  # a history is a flat array of uint32 tx numbers
    hist = ''
    for tx_num in hist_tx_nums:
        # electrum-style status: sha256 over concatenated "txid:height:" pairs
        hist += f'{db.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
    return sha256(hist.encode())
def migrate(db):
    start = time.perf_counter()
    prefix_db = db.prefix_db
    hashXs = get_all_hashXs(db)
    log.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
             f"now building the status index...")
    op_cnt = 0
    hashX_cnt = 0
    for hashX in hashXs:
        hashX_cnt += 1
        key = prefix_db.hashX_status.pack_key(hashX)
        history, to_delete = hashX_history(db, hashX)
        status = hashX_status_from_history(db, history)
        existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
        if existing_status and existing_status != status:
            # a stale status from an interrupted run: replace it with the fresh one
            prefix_db.stage_raw_delete(key, existing_status)
            prefix_db.stage_raw_put(key, status)
            op_cnt += 2
        elif existing_status == status:
            pass  # already correct, nothing to do
        else:
            prefix_db.stage_raw_put(key, status)
            op_cnt += 1
        if len(to_delete) > 1:
            # compactify: collapse a history spread over many rows into a single row
            for k, v in to_delete:
                prefix_db.stage_raw_delete(k, v)
                op_cnt += 1
            if history:
                prefix_db.stage_raw_put(prefix_db.hashX_history.pack_key(hashX, 0), history)
                op_cnt += 1
        if op_cnt > 100000:
            prefix_db.unsafe_commit()
            log.info(f"wrote {hashX_cnt}/{len(hashXs)} hashX statuses")
            op_cnt = 0
    if op_cnt:
        prefix_db.unsafe_commit()
        log.info(f"wrote {hashX_cnt}/{len(hashXs)} hashX statuses")
    db.db_version = 8
    db.write_db_state()
    db.prefix_db.unsafe_commit()
    log.info("finished migration")
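
For reference, the status being indexed is the ElectrumX-style fingerprint of a script hash's history: the sha256 of the concatenated "txid:height:" pairs, with the txid hex-encoded in reversed (display) byte order and the height recovered by bisecting the cumulative tx_counts table. A self-contained example with made-up numbers (the tx bytes and counts are illustrative only, standing in for db.get_tx_hash and db.tx_counts):

    from bisect import bisect_right
    from hashlib import sha256

    # cumulative tx counts: blocks 0, 1, 2 contain txs 0-2, 3-4, 5-8
    tx_counts = [3, 5, 9]
    tx_num = 5                                  # first tx of block 2
    height = bisect_right(tx_counts, tx_num)    # -> 2
    tx_hash = bytes(range(32))                  # stand-in for db.get_tx_hash(tx_num)
    txid = tx_hash[::-1].hex()                  # reversed, as explorers display it
    status = sha256(f"{txid}:{height}:".encode()).digest()

Clients compare this digest against a cached copy to cheaply detect address history changes; the commit both precomputes the digest per hashX and compacts each history into a single row, so later status recomputation touches far fewer keys.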