diff --git a/scribe/blockchain/env.py b/scribe/blockchain/env.py
index e98ac06..ea5c560 100644
--- a/scribe/blockchain/env.py
+++ b/scribe/blockchain/env.py
@@ -6,14 +6,15 @@ class BlockchainEnv(Env):
                  prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
                  blocking_channel_ids=None, filtering_channel_ids=None,
                  db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
-                 index_address_status=None):
+                 index_address_status=None, rebuild_address_status_from_height=None):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
                          cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
         self.db_max_open_files = db_max_open_files
         self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
         self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
             else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000)
-
+        self.rebuild_address_status_from_height = rebuild_address_status_from_height \
+            if isinstance(rebuild_address_status_from_height, int) else -1
 
     @classmethod
     def contribute_to_arg_parser(cls, parser):
@@ -31,6 +32,9 @@ class BlockchainEnv(Env):
                                 help="LRU cache size for address histories, used when processing new blocks "
                                      "and when processing mempool updates. Can be set in env with "
                                      "'ADDRESS_HISTORY_CACHE_SIZE'")
+        parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1,
+                            help="Rebuild address statuses: set to 0 to reindex all address statuses, or provide a "
+                                 "block height to start reindexing from. Defaults to -1 (off).")
 
     @classmethod
     def from_arg_parser(cls, args):
@@ -39,5 +43,6 @@ class BlockchainEnv(Env):
             max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
             prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
             cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
-            hashX_history_cache_size=args.address_history_cache_size
+            hashX_history_cache_size=args.address_history_cache_size,
+            rebuild_address_status_from_height=args.rebuild_address_status_from_height
         )
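Note: the new flag uses -1 as an "off" sentinel. A minimal standalone sketch of that convention (only the argument name comes from this diff; the rest is illustrative):

```python
import argparse

# -1 (the default) leaves the status index alone, 0 reindexes every
# address, and N > 0 reindexes from block N.
parser = argparse.ArgumentParser()
parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1)

args = parser.parse_args(['--rebuild_address_status_from_height', '500000'])
assert args.rebuild_address_status_from_height == 500000
assert parser.parse_args([]).rebuild_address_status_from_height == -1
```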
diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py
index e67700a..952899f 100644
--- a/scribe/blockchain/service.py
+++ b/scribe/blockchain/service.py
@@ -45,6 +45,7 @@ class BlockchainProcessorService(BlockchainService):
 
     def __init__(self, env: 'BlockchainEnv'):
         super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
+        self.env = env
         self.daemon = LBCDaemon(env.coin, env.daemon_url)
         self.mempool = MemPool(env.coin, self.db)
         self.coin = env.coin
@@ -1704,6 +1705,10 @@ class BlockchainProcessorService(BlockchainService):
 
     async def _finished_initial_catch_up(self):
         self.log.info(f'caught up to height {self.height}')
+
+        # if self.env.index_address_status and self.db.last_indexed_address_status_height != self.db.db_height:
+        #     await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
+
         # Flush everything but with catching_up->False state.
         self.db.catching_up = False
@@ -1719,6 +1724,9 @@ class BlockchainProcessorService(BlockchainService):
         while self.db.db_version < max(self.db.DB_VERSIONS):
             if self.db.db_version == 7:
                 from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION
+            elif self.db.db_version == 8:
+                from scribe.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION
+                self.db._index_address_status = self.env.index_address_status
             else:
                 raise RuntimeError("unknown db version")
             self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")
@@ -1726,6 +1734,18 @@ class BlockchainProcessorService(BlockchainService):
             self.log.info("finished migration")
             self.db.read_db_state()
 
+        # update the hashX status index if it was off before and is now on, or if a rebuild was requested from a height
+        if (self.env.index_address_status and not self.db._index_address_status and self.db.last_indexed_address_status_height < self.db.db_height) or self.env.rebuild_address_status_from_height >= 0:
+            starting_height = self.db.last_indexed_address_status_height
+            if self.env.rebuild_address_status_from_height >= 0:
+                starting_height = self.env.rebuild_address_status_from_height
+            yield self.db.rebuild_hashX_status_index(starting_height)
+        elif self.db._index_address_status and not self.env.index_address_status:
+            self.log.warning("turned off address indexing at block %i", self.db.db_height)
+            self.db._index_address_status = False
+            self.db.write_db_state()
+            self.db.prefix_db.unsafe_commit()
+
         self.height = self.db.db_height
         self.tip = self.db.db_tip
         self.tx_count = self.db.db_tx_count
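The startup logic above decides between three cases: catch the index up because indexing was just turned on, rebuild from an explicitly requested height, or tear the index down because indexing was turned off. A hypothetical helper restating the first two as a pure function (the names mirror the diff, but this function is not part of scribe):

```python
from typing import Optional

def rebuild_start_height(index_now: bool, indexed_before: bool,
                         last_indexed_height: int, db_height: int,
                         requested_height: int) -> Optional[int]:
    """Return the height to rebuild the status index from, or None to skip."""
    if (index_now and not indexed_before and last_indexed_height < db_height) \
            or requested_height >= 0:
        # an explicit --rebuild_address_status_from_height wins over the
        # "indexing was just turned on" catch-up height
        return requested_height if requested_height >= 0 else last_indexed_height
    return None

assert rebuild_start_height(True, False, 0, 100, -1) == 0      # index newly enabled
assert rebuild_start_height(True, True, 100, 100, -1) is None  # already up to date
assert rebuild_start_height(False, False, 0, 100, 42) == 42    # explicit rebuild
```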
diff --git a/scribe/db/db.py b/scribe/db/db.py
index b569a47..6a509d6 100644
--- a/scribe/db/db.py
+++ b/scribe/db/db.py
@@ -18,7 +18,7 @@ from scribe.schema.url import URL, normalize_name
 from scribe.schema.claim import guess_stream_type
 from scribe.schema.result import Censor
 from scribe.blockchain.transaction import TxInput
-from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics
+from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256
 from scribe.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem
 from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedResolveResult, DBError, UTXO
 from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
@@ -34,7 +34,7 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
 
 
 class HubDB:
-    DB_VERSIONS = [7, 8]
+    DB_VERSIONS = [7, 8, 9]
 
     def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
                  cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
@@ -63,6 +63,7 @@ class HubDB:
         self.hist_comp_cursor = -1
 
         self.es_sync_height = 0
+        self.last_indexed_address_status_height = 0
 
         # blocking/filtering dicts
         blocking_channels = blocking_channel_ids or []
@@ -864,6 +865,78 @@ class HubDB:
             self.prefix_db.close()
             self.prefix_db = None
 
+    def _rebuild_hashX_status_index(self, start_height: int):
+        self.logger.warning("rebuilding the address status index...")
+        prefix_db = self.prefix_db
+
+        def hashX_iterator():
+            last_hashX = None
+            for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
+                hashX = k[1:12]
+                if last_hashX is None:
+                    last_hashX = hashX
+                if last_hashX != hashX:
+                    yield last_hashX
+                    last_hashX = hashX
+            if last_hashX:
+                yield last_hashX
+
+        def hashX_status_from_history(history: bytes) -> bytes:
+            tx_counts = self.tx_counts
+            hist_tx_nums = array.array('I')
+            hist_tx_nums.frombytes(history)
+            hist = ''
+            for tx_num in hist_tx_nums:
+                hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
+            return sha256(hist.encode())
+
+        start = time.perf_counter()
+
+        if start_height <= 0:
+            self.logger.info("loading all blockchain addresses, this will take a little while...")
+            hashXs = [hashX for hashX in hashX_iterator()]
+        else:
+            self.logger.info("loading addresses since block %i...", start_height)
+            hashXs = set()
+            for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
+                                                           include_key=False):
+                hashXs.update(touched.touched_hashXs)
+            hashXs = list(hashXs)
+
+        self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
+                         f"now building the status index...")
+        op_cnt = 0
+        hashX_cnt = 0
+        for hashX in hashXs:
+            hashX_cnt += 1
+            key = prefix_db.hashX_status.pack_key(hashX)
+            history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
+            status = hashX_status_from_history(history)
+            existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
+            if existing_status and existing_status == status:
+                continue
+            elif not existing_status:
+                prefix_db.stage_raw_put(key, status)
+                op_cnt += 1
+            else:
+                prefix_db.stage_raw_delete(key, existing_status)
+                prefix_db.stage_raw_put(key, status)
+                op_cnt += 2
+            if op_cnt > 100000:
+                prefix_db.unsafe_commit()
+                self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashX statuses...")
+                op_cnt = 0
+        if op_cnt:
+            prefix_db.unsafe_commit()
+            self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashX statuses...")
+        self._index_address_status = True
+        self.write_db_state()
+        self.prefix_db.unsafe_commit()
+        self.logger.info("finished indexing address statuses")
+
+    def rebuild_hashX_status_index(self, start_height: int):
+        return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
+
     def _get_hashX_status(self, hashX: bytes):
         mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
         if mempool_status:
@@ -1126,13 +1199,18 @@ class HubDB:
 
     def write_db_state(self):
         """Write (UTXO) state to the batch."""
+        last_indexed_address_status = 0
         if self.db_height > 0:
-            self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get())
+            existing = self.prefix_db.db_state.get()
+            last_indexed_address_status = existing.hashX_status_last_indexed_height
+            self.prefix_db.db_state.stage_delete((), existing.expanded)
+        if self._index_address_status:
+            last_indexed_address_status = self.db_height
         self.prefix_db.db_state.stage_put((), (
                 self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
-                self.utxo_flush_count, int(self.wall_time), self.catching_up, self.db_version,
+                self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
                 self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
-                self.es_sync_height
+                self.es_sync_height, last_indexed_address_status
             )
         )
@@ -1152,6 +1230,7 @@ class HubDB:
             self.hist_comp_cursor = -1
             self.hist_db_version = max(self.DB_VERSIONS)
             self.es_sync_height = 0
+            self.last_indexed_address_status_height = 0
         else:
             self.db_version = state.db_version
             if self.db_version not in self.DB_VERSIONS:
@@ -1173,6 +1252,7 @@ class HubDB:
             self.hist_comp_cursor = state.comp_cursor
             self.hist_db_version = state.db_version
             self.es_sync_height = state.es_sync_height
+            self.last_indexed_address_status_height = state.hashX_status_last_indexed_height
         return state
 
     def assert_db_state(self):
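A worked example of the status computation in hashX_status_from_history above, with made-up inputs: a status is the sha256 of the concatenated "txid:height:" string for every confirmed transaction touching the address, where the height is recovered from the cumulative tx_counts array (this sketch assumes scribe.common.sha256 returns the raw digest):

```python
import hashlib
from array import array
from bisect import bisect_right

tx_counts = [1, 3, 6]                            # total tx count as of heights 0, 1 and 2
tx_hashes = {0: b'\xaa' * 32, 4: b'\xbb' * 32}   # hypothetical tx_num -> raw tx hash

history = array('I', [0, 4]).tobytes()           # what a hashX_history value stores
hist_tx_nums = array('I')
hist_tx_nums.frombytes(history)
hist = ''
for tx_num in hist_tx_nums:
    # tx hashes are stored reversed; bisect_right maps a tx_num to its height
    hist += f'{tx_hashes[tx_num][::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
status = hashlib.sha256(hist.encode()).digest()
assert len(status) == 32
```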
diff --git a/scribe/db/migrators/migrate8to9.py b/scribe/db/migrators/migrate8to9.py
new file mode 100644
index 0000000..d74a398
--- /dev/null
+++ b/scribe/db/migrators/migrate8to9.py
@@ -0,0 +1,26 @@
+import logging
+
+FROM_VERSION = 8
+TO_VERSION = 9
+
+
+def migrate(db):
+    log = logging.getLogger(__name__)
+    prefix_db = db.prefix_db
+    index_address_status = db._index_address_status
+
+    log.info("migrating the db to version 9")
+
+    if not index_address_status:
+        log.info("deleting the existing address status index")
+        to_delete = list(prefix_db.hashX_status.iterate(deserialize_key=False, deserialize_value=False))
+        while to_delete:
+            batch, to_delete = to_delete[:10000], to_delete[10000:]
+            if batch:
+                prefix_db.multi_delete(batch)
+            prefix_db.unsafe_commit()
+
+    db.db_version = 9
+    db.write_db_state()
+    db.prefix_db.unsafe_commit()
+    log.info("finished migration")
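The deletion loop in migrate() commits in bounded chunks so no single write batch grows unbounded. A generic restatement of that pattern, with hypothetical callables standing in for the prefix_db methods:

```python
def delete_in_batches(to_delete, multi_delete, commit, batch_size=10000):
    """Stage and commit deletions in chunks of at most batch_size items."""
    while to_delete:
        batch, to_delete = to_delete[:batch_size], to_delete[batch_size:]
        if batch:
            multi_delete(batch)
        commit()

deleted = []
delete_in_batches(list(range(25000)), deleted.extend, lambda: None)
assert len(deleted) == 25000
```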
diff --git a/scribe/db/prefixes.py b/scribe/db/prefixes.py
index cb46b06..4cb214b 100644
--- a/scribe/db/prefixes.py
+++ b/scribe/db/prefixes.py
@@ -420,12 +420,40 @@ class DBState(typing.NamedTuple):
     tip: bytes
     utxo_flush_count: int
     wall_time: int
-    catching_up: bool
+    bit_fields: int
     db_version: int
     hist_flush_count: int
     comp_flush_count: int
     comp_cursor: int
     es_sync_height: int
+    hashX_status_last_indexed_height: int
+
+    @property
+    def catching_up(self) -> bool:
+        return self.bit_fields & 1 == 1
+
+    @property
+    def index_address_statuses(self) -> bool:
+        return self.bit_fields & 2 == 2
+
+    @property
+    def expanded(self):
+        return (
+            self.genesis,
+            self.height,
+            self.tx_count,
+            self.tip,
+            self.utxo_flush_count,
+            self.wall_time,
+            self.catching_up,
+            self.index_address_statuses,
+            self.db_version,
+            self.hist_flush_count,
+            self.comp_flush_count,
+            self.comp_cursor,
+            self.es_sync_height,
+            self.hashX_status_last_indexed_height
+        )
 
 
 class ActiveAmountPrefixRow(PrefixRow):
@@ -1420,7 +1448,7 @@ class SupportAmountPrefixRow(PrefixRow):
 
 class DBStatePrefixRow(PrefixRow):
     prefix = DB_PREFIXES.db_state.value
-    value_struct = struct.Struct(b'>32sLL32sLLBBlllL')
+    value_struct = struct.Struct(b'>32sLL32sLLBBlllLL')
     key_struct = struct.Struct(b'')
 
     key_part_lambdas = [
@@ -1437,12 +1465,16 @@ class DBStatePrefixRow(PrefixRow):
 
     @classmethod
     def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
-                   catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
-                   comp_cursor: int, es_sync_height: int) -> bytes:
+                   catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int,
+                   comp_flush_count: int, comp_cursor: int, es_sync_height: int,
+                   last_indexed_address_statuses: int) -> bytes:
+        bit_fields = 0
+        bit_fields |= int(catching_up) << 0
+        bit_fields |= int(index_address_statuses) << 1
         return super().pack_value(
             genesis, height, tx_count, tip, utxo_flush_count,
-            wall_time, 1 if catching_up else 0, db_version, hist_flush_count,
-            comp_flush_count, comp_cursor, es_sync_height
+            wall_time, bit_fields, db_version, hist_flush_count,
+            comp_flush_count, comp_cursor, es_sync_height, last_indexed_address_statuses
         )
@@ -1451,15 +1483,18 @@ class DBStatePrefixRow(PrefixRow):
     @classmethod
     def unpack_value(cls, data: bytes) -> DBState:
         if len(data) == 94:
             # TODO: delete this after making a new snapshot - 10/20/21
             # migrate in the es_sync_height if it doesnt exist
             data += data[32:36]
+        if len(data) == 98:
+            data += data[32:36]
         return DBState(*super().unpack_value(data))
 
     @classmethod
     def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
-                  catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
-                  comp_cursor: int, es_sync_height: int):
+                  catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int,
+                  comp_flush_count: int, comp_cursor: int, es_sync_height: int, last_indexed_address_statuses: int):
         return cls.pack_key(), cls.pack_value(
-            genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, db_version, hist_flush_count,
-            comp_flush_count, comp_cursor, es_sync_height
+            genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, index_address_statuses,
+            db_version, hist_flush_count, comp_flush_count, comp_cursor, es_sync_height,
+            last_indexed_address_statuses
        )
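A sketch of the DBState bit field layout introduced above: bit 0 carries catching_up and bit 1 carries index_address_statuses, packed into the single byte that previously held only the catching_up flag. Note that a mask test must compare against the mask itself (bits & 2 == 2), or use bool(); comparing bits & 2 against 1 can never be true, which is why the index_address_statuses property reads the way it does.

```python
def pack_bit_fields(catching_up: bool, index_address_statuses: bool) -> int:
    # mirrors DBStatePrefixRow.pack_value above
    bit_fields = 0
    bit_fields |= int(catching_up) << 0
    bit_fields |= int(index_address_statuses) << 1
    return bit_fields

bits = pack_bit_fields(catching_up=False, index_address_statuses=True)
assert bits == 2
assert bits & 1 == 0   # not catching up
assert bits & 2 == 2   # address statuses are indexed
```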