fix turning address status index on and off

This commit is contained in:
Jack Robison 2022-05-12 15:21:51 -04:00
parent 51a753c4d2
commit bf1667b44d
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 184 additions and 18 deletions

View file

@ -6,14 +6,15 @@ class BlockchainEnv(Env):
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
index_address_status=None):
index_address_status=None, rebuild_address_status_from_height=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.db_max_open_files = db_max_open_files
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000)
self.rebuild_address_status_from_height = rebuild_address_status_from_height \
if isinstance(rebuild_address_status_from_height, int) else -1
@classmethod
def contribute_to_arg_parser(cls, parser):
@ -31,6 +32,9 @@ class BlockchainEnv(Env):
help="LRU cache size for address histories, used when processing new blocks "
"and when processing mempool updates. Can be set in env with "
"'ADDRESS_HISTORY_CACHE_SIZE'")
parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1,
help="Rebuild address statuses, set to 0 to reindex all address statuses or provide a "
"block height to start reindexing from. Defaults to -1 (off).")
@classmethod
def from_arg_parser(cls, args):
@ -39,5 +43,6 @@ class BlockchainEnv(Env):
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
hashX_history_cache_size=args.address_history_cache_size
hashX_history_cache_size=args.address_history_cache_size,
rebuild_address_status_from_height=args.rebuild_address_status_from_height
)

View file

@ -45,6 +45,7 @@ class BlockchainProcessorService(BlockchainService):
def __init__(self, env: 'BlockchainEnv'):
super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
self.env = env
self.daemon = LBCDaemon(env.coin, env.daemon_url)
self.mempool = MemPool(env.coin, self.db)
self.coin = env.coin
@ -1704,6 +1705,10 @@ class BlockchainProcessorService(BlockchainService):
async def _finished_initial_catch_up(self):
self.log.info(f'caught up to height {self.height}')
# if self.env.index_address_status and self.db.last_indexed_address_status_height != self.db.db_height:
# await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
# Flush everything but with catching_up->False state.
self.db.catching_up = False
@ -1719,6 +1724,9 @@ class BlockchainProcessorService(BlockchainService):
while self.db.db_version < max(self.db.DB_VERSIONS):
if self.db.db_version == 7:
from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION
elif self.db.db_version == 8:
from scribe.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION
self.db._index_address_status = self.env.index_address_status
else:
raise RuntimeError("unknown db version")
self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")
@ -1726,6 +1734,18 @@ class BlockchainProcessorService(BlockchainService):
self.log.info("finished migration")
self.db.read_db_state()
# update the hashX status index if was off before and is now on of if requested from a height
if (self.env.index_address_status and not self.db._index_address_status and self.db.last_indexed_address_status_height < self.db.db_height) or self.env.rebuild_address_status_from_height >= 0:
starting_height = self.db.last_indexed_address_status_height
if self.env.rebuild_address_status_from_height >= 0:
starting_height = self.env.rebuild_address_status_from_height
yield self.db.rebuild_hashX_status_index(starting_height)
elif self.db._index_address_status and not self.env.index_address_status:
self.log.warning("turned off address indexing at block %i", self.db.db_height)
self.db._index_address_status = False
self.db.write_db_state()
self.db.prefix_db.unsafe_commit()
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count

View file

@ -18,7 +18,7 @@ from scribe.schema.url import URL, normalize_name
from scribe.schema.claim import guess_stream_type
from scribe.schema.result import Censor
from scribe.blockchain.transaction import TxInput
from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics
from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256
from scribe.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem
from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedResolveResult, DBError, UTXO
from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
@ -34,7 +34,7 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
class HubDB:
DB_VERSIONS = [7, 8]
DB_VERSIONS = [7, 8, 9]
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
@ -63,6 +63,7 @@ class HubDB:
self.hist_comp_cursor = -1
self.es_sync_height = 0
self.last_indexed_address_status_height = 0
# blocking/filtering dicts
blocking_channels = blocking_channel_ids or []
@ -864,6 +865,78 @@ class HubDB:
self.prefix_db.close()
self.prefix_db = None
def _rebuild_hashX_status_index(self, start_height: int):
self.logger.warning("rebuilding the address status index...")
prefix_db = self.prefix_db
def hashX_iterator():
last_hashX = None
for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
hashX = k[1:12]
if last_hashX is None:
last_hashX = hashX
if last_hashX != hashX:
yield hashX
last_hashX = hashX
if last_hashX:
yield last_hashX
def hashX_status_from_history(history: bytes) -> bytes:
tx_counts = self.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)
hist = ''
for tx_num in hist_tx_nums:
hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
return sha256(hist.encode())
start = time.perf_counter()
if start_height <= 0:
self.logger.info("loading all blockchain addresses, this will take a little while...")
hashXs = [hashX for hashX in hashX_iterator()]
else:
self.logger.info("loading addresses since block %i...", start_height)
hashXs = set()
for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
include_key=False):
hashXs.update(touched.touched_hashXs)
hashXs = list(hashXs)
self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
f"now building the status index...")
op_cnt = 0
hashX_cnt = 0
for hashX in hashXs:
hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX)
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
status = hashX_status_from_history(history)
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
if existing_status and existing_status == status:
continue
elif not existing_status:
prefix_db.stage_raw_put(key, status)
op_cnt += 1
else:
prefix_db.stage_raw_delete(key, existing_status)
prefix_db.stage_raw_put(key, status)
op_cnt += 2
if op_cnt > 100000:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
op_cnt = 0
if op_cnt:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
self._index_address_status = True
self.write_db_state()
self.prefix_db.unsafe_commit()
self.logger.info("finished indexing address statuses")
def rebuild_hashX_status_index(self, start_height: int):
return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
def _get_hashX_status(self, hashX: bytes):
mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
if mempool_status:
@ -1126,13 +1199,18 @@ class HubDB:
def write_db_state(self):
"""Write (UTXO) state to the batch."""
last_indexed_address_status = 0
if self.db_height > 0:
self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get())
existing = self.prefix_db.db_state.get()
last_indexed_address_status = existing.hashX_status_last_indexed_height
self.prefix_db.db_state.stage_delete((), existing.expanded)
if self._index_address_status:
last_indexed_address_status = self.db_height
self.prefix_db.db_state.stage_put((), (
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.catching_up, self.db_version,
self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
self.es_sync_height
self.es_sync_height, last_indexed_address_status
)
)
@ -1152,6 +1230,7 @@ class HubDB:
self.hist_comp_cursor = -1
self.hist_db_version = max(self.DB_VERSIONS)
self.es_sync_height = 0
self.last_indexed_address_status_height = 0
else:
self.db_version = state.db_version
if self.db_version not in self.DB_VERSIONS:
@ -1173,6 +1252,7 @@ class HubDB:
self.hist_comp_cursor = state.comp_cursor
self.hist_db_version = state.db_version
self.es_sync_height = state.es_sync_height
self.last_indexed_address_status_height = state.hashX_status_last_indexed_height
return state
def assert_db_state(self):

View file

@ -0,0 +1,26 @@
import logging
FROM_VERSION = 8
TO_VERSION = 9
def migrate(db):
log = logging.getLogger(__name__)
prefix_db = db.prefix_db
index_address_status = db._index_address_status
log.info("migrating the db to version 9")
if not index_address_status:
log.info("deleting the existing address status index")
to_delete = list(prefix_db.hashX_status.iterate(deserialize_key=False, deserialize_value=False))
while to_delete:
batch, to_delete = to_delete[:10000], to_delete[10000:]
if batch:
prefix_db.multi_delete(batch)
prefix_db.unsafe_commit()
db.db_version = 9
db.write_db_state()
db.prefix_db.unsafe_commit()
log.info("finished migration")

View file

@ -420,12 +420,40 @@ class DBState(typing.NamedTuple):
tip: bytes
utxo_flush_count: int
wall_time: int
catching_up: bool
bit_fields: int
db_version: int
hist_flush_count: int
comp_flush_count: int
comp_cursor: int
es_sync_height: int
hashX_status_last_indexed_height: int
@property
def catching_up(self) -> bool:
return self.bit_fields & 1 == 1
@property
def index_address_statuses(self) -> bool:
return self.bit_fields & 2 == 1
@property
def expanded(self):
return (
self.genesis,
self.height,
self.tx_count,
self.tip,
self.utxo_flush_count,
self.wall_time,
self.catching_up,
self.index_address_statuses,
self.db_version,
self.hist_flush_count,
self.comp_flush_count,
self.comp_cursor,
self.es_sync_height,
self.hashX_status_last_indexed_height
)
class ActiveAmountPrefixRow(PrefixRow):
@ -1420,7 +1448,7 @@ class SupportAmountPrefixRow(PrefixRow):
class DBStatePrefixRow(PrefixRow):
prefix = DB_PREFIXES.db_state.value
value_struct = struct.Struct(b'>32sLL32sLLBBlllL')
value_struct = struct.Struct(b'>32sLL32sLLBBlllLL')
key_struct = struct.Struct(b'')
key_part_lambdas = [
@ -1437,12 +1465,16 @@ class DBStatePrefixRow(PrefixRow):
@classmethod
def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
comp_cursor: int, es_sync_height: int) -> bytes:
catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int,
comp_flush_count: int, comp_cursor: int, es_sync_height: int,
last_indexed_address_statuses: int) -> bytes:
bit_fields = 0
bit_fields |= int(catching_up) << 0
bit_fields |= int(index_address_statuses) << 1
return super().pack_value(
genesis, height, tx_count, tip, utxo_flush_count,
wall_time, 1 if catching_up else 0, db_version, hist_flush_count,
comp_flush_count, comp_cursor, es_sync_height
wall_time, bit_fields, db_version, hist_flush_count,
comp_flush_count, comp_cursor, es_sync_height, last_indexed_address_statuses
)
@classmethod
@ -1451,15 +1483,18 @@ class DBStatePrefixRow(PrefixRow):
# TODO: delete this after making a new snapshot - 10/20/21
# migrate in the es_sync_height if it doesnt exist
data += data[32:36]
if len(data) == 98:
data += data[32:36]
return DBState(*super().unpack_value(data))
@classmethod
def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
comp_cursor: int, es_sync_height: int):
catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int,
comp_flush_count: int, comp_cursor: int, es_sync_height: int, last_indexed_address_statuses: int):
return cls.pack_key(), cls.pack_value(
genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, db_version, hist_flush_count,
comp_flush_count, comp_cursor, es_sync_height
genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, index_address_statuses,
db_version, hist_flush_count, comp_flush_count, comp_cursor, es_sync_height,
last_indexed_address_statuses
)