diff --git a/README.md b/README.md index 91180bf..fadc822 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,8 @@ For various reasons it may be desirable to block or filtering content from claim #### Options for `scribe` - `--db_max_open_files` This setting translates into the max_open_files option given to rocksdb. A higher number will use more memory. Defaults to 64. + - `--address_history_cache_size` The count of items in the address history cache used for processing blocks and mempool updates. A higher number will use more memory; it shouldn't ever need to be higher than 10000. Defaults to 1000. + - `--index_address_statuses` Maintain an index of the statuses of address transaction histories. This makes handling notifications for transactions in a block uniformly fast, at the expense of more time to process new blocks and somewhat more disk space (~10GB as of block 1161417). #### Options for `scribe-elastic-sync` - `--reindex` If this flag is set drop and rebuild the elasticsearch index. @@ -103,6 +105,7 @@ For various reasons it may be desirable to block or filtering content from claim - `--query_timeout_ms` Timeout for claim searches in elasticsearch in milliseconds. Can be set from the environment with `QUERY_TIMEOUT_MS` - `--blocking_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels can't be resolved or returned in search results. Can be set from the environment with `BLOCKING_CHANNEL_IDS`. - `--filtering_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels aren't returned in search results. Can be set from the environment with `FILTERING_CHANNEL_IDS` + - `--index_address_statuses` Use the address history status index. This makes handling notifications for transactions in a block uniformly fast (must be turned on in `scribe` too).
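The status value that `--index_address_statuses` precomputes is the sha256 of an address's serialized history: one `txid:height:` entry per transaction touching the address, confirmed transactions first, followed by mempool entries. A rough standalone sketch of that computation, using plain `hashlib`; `address_status` and `history` are illustrative names, not scribe's API:

```python
from hashlib import sha256
from typing import Iterable, Tuple


def address_status(history: Iterable[Tuple[bytes, int]]) -> str:
    """Hash an address's history into the status string clients subscribe to.

    `history` yields (tx_hash, height) pairs: confirmed txs in order, then
    mempool txs with height 0, or -1 if they spend another mempool tx.
    """
    concatenated = ''
    for tx_hash, height in history:
        # tx hashes are stored little-endian; reverse for the familiar hex txid
        concatenated += f'{tx_hash[::-1].hex()}:{height:d}:'
    return sha256(concatenated.encode()).hexdigest()
```

With the index on, this value is kept up to date in the `hashX_status` column as blocks and mempool updates are processed; with it off, the hub recomputes it on demand for each address subscription.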
## Contributing diff --git a/scribe/blockchain/daemon.py b/scribe/blockchain/daemon.py index fdf0e4e..745eaa3 100644 --- a/scribe/blockchain/daemon.py +++ b/scribe/blockchain/daemon.py @@ -55,8 +55,8 @@ class LBCDaemon: self._height = None self.available_rpcs = {} self.connector = aiohttp.TCPConnector(ssl=False) - self._block_hash_cache = LRUCacheWithMetrics(100000) - self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE) + self._block_hash_cache = LRUCacheWithMetrics(1024) + self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE) async def close(self): if self.connector: diff --git a/scribe/blockchain/env.py b/scribe/blockchain/env.py index ee07519..ea5c560 100644 --- a/scribe/blockchain/env.py +++ b/scribe/blockchain/env.py @@ -5,11 +5,16 @@ class BlockchainEnv(Env): def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, blocking_channel_ids=None, filtering_channel_ids=None, - db_max_open_files=64, daemon_url=None): + db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, + index_address_status=None, rebuild_address_status_from_height=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, - cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.db_max_open_files = db_max_open_files self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') + self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \ + else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000) + self.rebuild_address_status_from_height = rebuild_address_status_from_height \ + if isinstance(rebuild_address_status_from_height, int) else -1 @classmethod def contribute_to_arg_parser(cls, parser): @@ -22,6 +27,14 @@ class BlockchainEnv(Env): parser.add_argument('--db_max_open_files', type=int, default=64, help='This setting translates into the max_open_files option given to rocksdb. ' 'A higher number will use more memory. Defaults to 64.') + parser.add_argument('--address_history_cache_size', type=int, + default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000), + help="LRU cache size for address histories, used when processing new blocks " + "and when processing mempool updates. Can be set in env with " + "'ADDRESS_HISTORY_CACHE_SIZE'") + parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1, + help="Rebuild address statuses, set to 0 to reindex all address statuses or provide a " + "block height to start reindexing from. 
Defaults to -1 (off).") @classmethod def from_arg_parser(cls, args): @@ -29,5 +42,7 @@ class BlockchainEnv(Env): db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files, max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit, prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, - cache_all_claim_txos=args.cache_all_claim_txos + cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses, + hashX_history_cache_size=args.address_history_cache_size, + rebuild_address_status_from_height=args.rebuild_address_status_from_height ) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 865c859..078f06b 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -45,30 +45,22 @@ class BlockchainProcessorService(BlockchainService): def __init__(self, env: 'BlockchainEnv'): super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') + self.env = env self.daemon = LBCDaemon(env.coin, env.daemon_url) self.mempool = MemPool(env.coin, self.db) self.coin = env.coin self.wait_for_blocks_duration = 0.1 self._ready_to_stop = asyncio.Event() - + self.blocks_event = asyncio.Event() + self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event) self._caught_up_event: Optional[asyncio.Event] = None self.height = 0 self.tip = bytes.fromhex(self.coin.GENESIS_HASH)[::-1] self.tx_count = 0 - self.blocks_event = asyncio.Event() - self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event) - # self.logger = logging.getLogger(__name__) - - # Meta self.touched_hashXs: Set[bytes] = set() - - # UTXO cache self.utxo_cache: Dict[Tuple[bytes, int], Tuple[bytes, int]] = {} - # Claimtrie cache - self.db_op_stack: Optional['RevertableOpStack'] = None - ################################# # attributes used for calculating stake activations and takeovers per block ################################# @@ -125,8 +117,9 @@ class BlockchainProcessorService(BlockchainService): self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transactions: Dict[int, bytes] = {} - self.hashX_history_cache = LRUCache(1000) - self.hashX_full_cache = LRUCache(1000) + self.hashX_history_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) + self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) + self.history_tx_info_cache = LRUCache(2 ** 16) async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. 
Shielded so that @@ -154,8 +147,9 @@ class BlockchainProcessorService(BlockchainService): def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete): self.mempool.remove(to_delete) touched_hashXs = self.mempool.update_mempool(to_put) - for hashX in touched_hashXs: - self._get_update_hashX_mempool_status_ops(hashX) + if self.env.index_address_status: + for hashX in touched_hashXs: + self._get_update_hashX_mempool_status_ops(hashX) for tx_hash, raw_tx in to_put: mempool_prefix.stage_put((tx_hash,), (raw_tx,)) for tx_hash, raw_tx in to_delete.items(): @@ -1254,29 +1248,24 @@ class BlockchainProcessorService(BlockchainService): self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None) else: tx_nums = self.hashX_history_cache[hashX] + needed_tx_infos = [] + append_needed_tx_info = needed_tx_infos.append + tx_infos = {} + for tx_num in tx_nums: + if tx_num in self.history_tx_info_cache: + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] + else: + append_needed_tx_info(tx_num) + if needed_tx_infos: + for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)): + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:' + history = '' - for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)): - history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:' + for tx_num in tx_nums: + history += tx_infos[tx_num] self.hashX_full_cache[hashX] = history return history - def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]): - existing = self.db.prefix_db.hashX_status.get(hashX) - if existing: - self.db.prefix_db.hashX_status.stage_delete((hashX,), existing) - if hashX not in self.hashX_history_cache: - tx_nums = self.db.read_history(hashX, limit=None) - else: - tx_nums = self.hashX_history_cache[hashX] - history = '' - for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)): - history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:' - for tx_hash, height in new_history: - history += f'{hash_to_hex_str(tx_hash)}:{height:d}:' - if history: - status = sha256(history.encode()) - self.db.prefix_db.hashX_status.stage_put((hashX,), (status,)) - def _get_update_hashX_mempool_status_ops(self, hashX: bytes): existing = self.db.prefix_db.hashX_mempool_status.get(hashX) if existing: @@ -1286,23 +1275,6 @@ class BlockchainProcessorService(BlockchainService): status = sha256(history.encode()) self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,)) - def _get_compactify_hashX_history_ops(self, height: int, hashX: bytes): - if height > self.env.reorg_limit: # compactify existing history - hist_txs = b'' - # accumulate and delete all of the tx histories between height 1 and current - reorg_limit - for k, hist in self.db.prefix_db.hashX_history.iterate( - start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit), - deserialize_key=False, deserialize_value=False): - hist_txs += hist - self.db.prefix_db.stage_raw_delete(k, hist) - if hist_txs: - # add the accumulated histories onto the existing compacted history at height 0 - key = self.db.prefix_db.hashX_history.pack_key(hashX, 0) - existing = self.db.prefix_db.get(key) - if existing is not None: - self.db.prefix_db.stage_raw_delete(key, existing) - self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs) - def advance_block(self, block: Block): height = self.height + 1 # print("advance ", height) @@ 
-1391,19 +1363,14 @@ class BlockchainProcessorService(BlockchainService): self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,)) - for k, v in self.db.prefix_db.hashX_mempool_status.iterate( - start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), deserialize_key=False, deserialize_value=False): - self.db.prefix_db.stage_raw_delete(k, v) + # clear the mempool tx index + self._get_clear_mempool_ops() - for hashX, new_history in self.hashXs_by_tx.items(): - # TODO: combine this with compaction so that we only read the history once - self._get_update_hashX_status_ops( - hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history] - ) - self._get_compactify_hashX_history_ops(height, hashX) - if not new_history: - continue - self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,)) + # update hashX history status hashes and compactify the histories + self._get_update_hashX_histories_ops(height) + + if not self.db.catching_up and self.env.index_address_status: + self._get_compactify_ops(height) self.tx_count = tx_count self.db.tx_counts.append(self.tx_count) @@ -1446,6 +1413,94 @@ class BlockchainProcessorService(BlockchainService): # print("*************\n") return txo_count + def _get_clear_mempool_ops(self): + self.db.prefix_db.multi_delete( + list(self.db.prefix_db.hashX_mempool_status.iterate(start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), + deserialize_key=False, deserialize_value=False)) + ) + + def _get_update_hashX_histories_ops(self, height: int): + self.db.prefix_db.hashX_history.stage_multi_put( + [((hashX, height), (new_tx_nums,)) for hashX, new_tx_nums in self.hashXs_by_tx.items()] + ) + + def _get_compactify_ops(self, height: int): + existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False) + if existing_hashX_statuses: + pack_key = self.db.prefix_db.hashX_status.pack_key + keys = [ + pack_key(hashX) for hashX, existing in zip( + self.hashXs_by_tx, existing_hashX_statuses + ) + ] + self.db.prefix_db.multi_delete([(k, v) for k, v in zip(keys, existing_hashX_statuses) if v is not None]) + + block_hashX_history_deletes = [] + append_deletes_hashX_history = block_hashX_history_deletes.append + block_hashX_history_puts = [] + + for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses): + new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums] + + tx_nums = [] + txs_extend = tx_nums.extend + compact_hist_txs = [] + compact_txs_extend = compact_hist_txs.extend + history_item_0 = None + existing_item_0 = None + reorg_limit = self.env.reorg_limit + unpack_history = self.db.prefix_db.hashX_history.unpack_value + unpack_key = self.db.prefix_db.hashX_history.unpack_key + needs_compaction = False + + total_hist_txs = b'' + for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False, + deserialize_value=False): + hist_txs = unpack_history(hist) + total_hist_txs += hist + txs_extend(hist_txs) + hist_height = unpack_key(k).height + if height > reorg_limit and hist_height < height - reorg_limit: + compact_txs_extend(hist_txs) + if hist_height == 0: + history_item_0 = (k, hist) + elif hist_height > 0: + needs_compaction = True + # self.db.prefix_db.stage_raw_delete(k, hist) + append_deletes_hashX_history((k, hist)) + existing_item_0 = history_item_0 + if needs_compaction: + # add the accumulated histories onto the existing compacted history at 
height 0 + if existing_item_0 is not None: # delete if key 0 exists + key, existing = existing_item_0 + append_deletes_hashX_history((key, existing)) + block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,))) + if not new_history: + continue + + needed_tx_infos = [] + append_needed_tx_info = needed_tx_infos.append + tx_infos = {} + for tx_num in tx_nums: + if tx_num in self.history_tx_info_cache: + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] + else: + append_needed_tx_info(tx_num) + if needed_tx_infos: + for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)): + tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:' + tx_infos[tx_num] = tx_info + self.history_tx_info_cache[tx_num] = tx_info + history = ''.join(map(tx_infos.__getitem__, tx_nums)) + for tx_hash, height in new_history: + history += f'{tx_hash[::-1].hex()}:{height:d}:' + if history: + status = sha256(history.encode()) + self.db.prefix_db.hashX_status.stage_put((hashX,), (status,)) + + self.db.prefix_db.multi_delete(block_hashX_history_deletes) + self.db.prefix_db.hashX_history.stage_multi_put(block_hashX_history_puts) + def clear_after_advance_or_reorg(self): self.txo_to_claim.clear() self.claim_hash_to_txo.clear() @@ -1500,9 +1555,15 @@ class BlockchainProcessorService(BlockchainService): if self.env.cache_all_tx_hashes: while len(self.db.total_transactions) > self.db.tx_counts[-1]: self.db.tx_num_mapping.pop(self.db.total_transactions.pop()) + if self.tx_count in self.history_tx_info_cache: + self.history_tx_info_cache.pop(self.tx_count) self.tx_count -= 1 else: - self.tx_count = self.db.tx_counts[-1] + new_tx_count = self.db.tx_counts[-1] + while self.tx_count > new_tx_count: + if self.tx_count in self.history_tx_info_cache: + self.history_tx_info_cache.pop(self.tx_count) + self.tx_count -= 1 self.height -= 1 # self.touched can include other addresses which is @@ -1632,6 +1693,8 @@ class BlockchainProcessorService(BlockchainService): self._ready_to_stop.set() async def _need_catch_up(self): + self.log.info("database has fallen behind blockchain daemon, catching up") + self.db.catching_up = True def flush(): @@ -1644,6 +1707,10 @@ class BlockchainProcessorService(BlockchainService): async def _finished_initial_catch_up(self): self.log.info(f'caught up to height {self.height}') + + if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height: + await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height) + # Flush everything but with catching_up->False state. 
self.db.catching_up = False @@ -1659,6 +1726,9 @@ class BlockchainProcessorService(BlockchainService): while self.db.db_version < max(self.db.DB_VERSIONS): if self.db.db_version == 7: from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION + elif self.db.db_version == 8: + from scribe.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION + self.db._index_address_status = self.env.index_address_status else: raise RuntimeError("unknown db version") self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}") @@ -1666,6 +1736,18 @@ class BlockchainProcessorService(BlockchainService): self.log.info("finished migration") self.db.read_db_state() + # update the hashX status index if was off before and is now on of if requested from a height + if (self.env.index_address_status and not self.db._index_address_status and self.db.last_indexed_address_status_height < self.db.db_height) or self.env.rebuild_address_status_from_height >= 0: + starting_height = self.db.last_indexed_address_status_height + if self.env.rebuild_address_status_from_height >= 0: + starting_height = self.env.rebuild_address_status_from_height + yield self.db.rebuild_hashX_status_index(starting_height) + elif self.db._index_address_status and not self.env.index_address_status: + self.log.warning("turned off address indexing at block %i", self.db.db_height) + self.db._index_address_status = False + self.db.write_db_state() + self.db.prefix_db.unsafe_commit() + self.height = self.db.db_height self.tip = self.db.db_tip self.tx_count = self.db.db_tx_count diff --git a/scribe/db/db.py b/scribe/db/db.py index ee34112..bcb3cc5 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -18,7 +18,7 @@ from scribe.schema.url import URL, normalize_name from scribe.schema.claim import guess_stream_type from scribe.schema.result import Censor from scribe.blockchain.transaction import TxInput -from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics +from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256 from scribe.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedResolveResult, DBError, UTXO from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB @@ -34,12 +34,13 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db" class HubDB: - DB_VERSIONS = [7, 8] + DB_VERSIONS = [7, 8, 9] def __init__(self, coin, db_dir: str, reorg_limit: int = 200, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None, - filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None): + filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, + index_address_status=False): self.logger = logging.getLogger(__name__) self.coin = coin self._executor = executor @@ -52,6 +53,7 @@ class HubDB: if secondary_name: assert max_open_files == -1, 'max open files must be -1 for secondary readers' self._db_max_open_files = max_open_files + self._index_address_status = index_address_status self.prefix_db: typing.Optional[PrefixDB] = None self.hist_unflushed = defaultdict(partial(array.array, 'I')) @@ -61,6 +63,7 @@ class HubDB: self.hist_comp_cursor = -1 self.es_sync_height = 0 + self.last_indexed_address_status_height = 0 # blocking/filtering dicts blocking_channels = blocking_channel_ids or [] @@ -80,7 +83,7 @@ class HubDB: 
self.tx_counts = None self.headers = None self.block_hashes = None - self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace=NAMESPACE) + self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE) self.last_flush = time.time() # Header merkle cache @@ -826,6 +829,8 @@ class HubDB: # read db state self.read_db_state() + self.logger.info("index address statuses: %s", self._index_address_status) + # These are our state as we move ahead of DB state self.fs_height = self.db_height self.fs_tx_count = self.db_tx_count @@ -860,7 +865,79 @@ class HubDB: self.prefix_db.close() self.prefix_db = None - def get_hashX_status(self, hashX: bytes): + def _rebuild_hashX_status_index(self, start_height: int): + self.logger.warning("rebuilding the address status index...") + prefix_db = self.prefix_db + + def hashX_iterator(): + last_hashX = None + for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False): + hashX = k[1:12] + if last_hashX is None: + last_hashX = hashX + if last_hashX != hashX: + yield hashX + last_hashX = hashX + if last_hashX: + yield last_hashX + + def hashX_status_from_history(history: bytes) -> bytes: + tx_counts = self.tx_counts + hist_tx_nums = array.array('I') + hist_tx_nums.frombytes(history) + hist = '' + for tx_num in hist_tx_nums: + hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:' + return sha256(hist.encode()) + + start = time.perf_counter() + + if start_height <= 0: + self.logger.info("loading all blockchain addresses, this will take a little while...") + hashXs = [hashX for hashX in hashX_iterator()] + else: + self.logger.info("loading addresses since block %i...", start_height) + hashXs = set() + for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,), + include_key=False): + hashXs.update(touched.touched_hashXs) + hashXs = list(hashXs) + + self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, " + f"now building the status index...") + op_cnt = 0 + hashX_cnt = 0 + for hashX in hashXs: + hashX_cnt += 1 + key = prefix_db.hashX_status.pack_key(hashX) + history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False)) + status = hashX_status_from_history(history) + existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False) + if existing_status and existing_status == status: + continue + elif not existing_status: + prefix_db.stage_raw_put(key, status) + op_cnt += 1 + else: + prefix_db.stage_raw_delete(key, existing_status) + prefix_db.stage_raw_put(key, status) + op_cnt += 2 + if op_cnt > 100000: + prefix_db.unsafe_commit() + self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...") + op_cnt = 0 + if op_cnt: + prefix_db.unsafe_commit() + self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...") + self._index_address_status = True + self.write_db_state() + self.prefix_db.unsafe_commit() + self.logger.info("finished indexing address statuses") + + def rebuild_hashX_status_index(self, start_height: int): + return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height) + + def _get_hashX_status(self, hashX: bytes): mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False) if mempool_status: return mempool_status.hex() @@ -868,6 +945,9 @@ class HubDB: if status: return status.hex() + async def 
get_hashX_status(self, hashX: bytes): + return await asyncio.get_event_loop().run_in_executor(self._executor, self._get_hashX_status, hashX) + def get_tx_hash(self, tx_num: int) -> bytes: if self._cache_all_tx_hashes: return self.total_transactions[tx_num] @@ -1119,13 +1199,18 @@ class HubDB: def write_db_state(self): """Write (UTXO) state to the batch.""" + last_indexed_address_status = 0 if self.db_height > 0: - self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get()) + existing = self.prefix_db.db_state.get() + last_indexed_address_status = existing.hashX_status_last_indexed_height + self.prefix_db.db_state.stage_delete((), existing.expanded) + if self._index_address_status: + last_indexed_address_status = self.db_height self.prefix_db.db_state.stage_put((), ( self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, - self.utxo_flush_count, int(self.wall_time), self.catching_up, self.db_version, + self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version, self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor, - self.es_sync_height + self.es_sync_height, last_indexed_address_status ) ) @@ -1145,6 +1230,7 @@ class HubDB: self.hist_comp_cursor = -1 self.hist_db_version = max(self.DB_VERSIONS) self.es_sync_height = 0 + self.last_indexed_address_status_height = 0 else: self.db_version = state.db_version if self.db_version not in self.DB_VERSIONS: @@ -1166,6 +1252,7 @@ class HubDB: self.hist_comp_cursor = state.comp_cursor self.hist_db_version = state.db_version self.es_sync_height = state.es_sync_height + self.last_indexed_address_status_height = state.hashX_status_last_indexed_height return state def assert_db_state(self): diff --git a/scribe/db/interface.py b/scribe/db/interface.py index b4f430b..5045705 100644 --- a/scribe/db/interface.py +++ b/scribe/db/interface.py @@ -101,6 +101,9 @@ class PrefixRow(metaclass=PrefixRowType): handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args ] + def stage_multi_put(self, items): + self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items]) + def get_pending(self, *key_args, fill_cache=True, deserialize_value=True): packed_key = self.pack_key(*key_args) last_op = self._op_stack.get_last_op_for_key(packed_key) @@ -178,7 +181,7 @@ class BasePrefixDB: cf = self._db.get_column_family(prefix.value) self.column_families[prefix.value] = cf - self._op_stack = RevertableOpStack(self.get, unsafe_prefixes=unsafe_prefixes) + self._op_stack = RevertableOpStack(self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes) self._max_undo_depth = max_undo_depth def unsafe_commit(self): @@ -259,6 +262,17 @@ class BasePrefixDB: cf = self.column_families[key[:1]] return self._db.get((cf, key), fill_cache=fill_cache) + def multi_get(self, keys: typing.List[bytes], fill_cache=True): + first_key = keys[0] + if not all(first_key[0] == key[0] for key in keys): + raise ValueError('cannot multi-delete across column families') + cf = self.column_families[first_key[:1]] + db_result = self._db.multi_get([(cf, k) for k in keys], fill_cache=fill_cache) + return list(db_result.values()) + + def multi_delete(self, items: typing.List[typing.Tuple[bytes, bytes]]): + self._op_stack.multi_delete([RevertableDelete(k, v) for k, v in items]) + def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None, iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None, reverse: bool = False, include_key: bool = True, 
include_value: bool = True, diff --git a/scribe/db/migrators/migrate8to9.py b/scribe/db/migrators/migrate8to9.py new file mode 100644 index 0000000..d74a398 --- /dev/null +++ b/scribe/db/migrators/migrate8to9.py @@ -0,0 +1,26 @@ +import logging + +FROM_VERSION = 8 +TO_VERSION = 9 + + +def migrate(db): + log = logging.getLogger(__name__) + prefix_db = db.prefix_db + index_address_status = db._index_address_status + + log.info("migrating the db to version 9") + + if not index_address_status: + log.info("deleting the existing address status index") + to_delete = list(prefix_db.hashX_status.iterate(deserialize_key=False, deserialize_value=False)) + while to_delete: + batch, to_delete = to_delete[:10000], to_delete[10000:] + if batch: + prefix_db.multi_delete(batch) + prefix_db.unsafe_commit() + + db.db_version = 9 + db.write_db_state() + db.prefix_db.unsafe_commit() + log.info("finished migration") diff --git a/scribe/db/prefixes.py b/scribe/db/prefixes.py index cb46b06..4cb214b 100644 --- a/scribe/db/prefixes.py +++ b/scribe/db/prefixes.py @@ -420,12 +420,40 @@ class DBState(typing.NamedTuple): tip: bytes utxo_flush_count: int wall_time: int - catching_up: bool + bit_fields: int db_version: int hist_flush_count: int comp_flush_count: int comp_cursor: int es_sync_height: int + hashX_status_last_indexed_height: int + + @property + def catching_up(self) -> bool: + return self.bit_fields & 1 == 1 + + @property + def index_address_statuses(self) -> bool: + return self.bit_fields & 2 == 2 + + @property + def expanded(self): + return ( + self.genesis, + self.height, + self.tx_count, + self.tip, + self.utxo_flush_count, + self.wall_time, + self.catching_up, + self.index_address_statuses, + self.db_version, + self.hist_flush_count, + self.comp_flush_count, + self.comp_cursor, + self.es_sync_height, + self.hashX_status_last_indexed_height + ) class ActiveAmountPrefixRow(PrefixRow): @@ -1420,7 +1448,7 @@ class SupportAmountPrefixRow(PrefixRow): class DBStatePrefixRow(PrefixRow): prefix = DB_PREFIXES.db_state.value - value_struct = struct.Struct(b'>32sLL32sLLBBlllL') + value_struct = struct.Struct(b'>32sLL32sLLBBlllLL') key_struct = struct.Struct(b'') key_part_lambdas = [ @@ -1437,12 +1465,16 @@ class DBStatePrefixRow(PrefixRow): @classmethod def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, - catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, - comp_cursor: int, es_sync_height: int) -> bytes: + catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int, + comp_flush_count: int, comp_cursor: int, es_sync_height: int, + last_indexed_address_statuses: int) -> bytes: + bit_fields = 0 + bit_fields |= int(catching_up) << 0 + bit_fields |= int(index_address_statuses) << 1 return super().pack_value( genesis, height, tx_count, tip, utxo_flush_count, - wall_time, 1 if catching_up else 0, db_version, hist_flush_count, - comp_flush_count, comp_cursor, es_sync_height + wall_time, bit_fields, db_version, hist_flush_count, + comp_flush_count, comp_cursor, es_sync_height, last_indexed_address_statuses ) @classmethod @@ -1451,15 +1483,18 @@ class DBStatePrefixRow(PrefixRow): # TODO: delete this after making a new snapshot - 10/20/21 # migrate in the es_sync_height if it doesnt exist data += data[32:36] + if len(data) == 98: + data += data[32:36] return DBState(*super().unpack_value(data)) @classmethod def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int,
wall_time: int, - catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, - comp_cursor: int, es_sync_height: int): + catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int, + comp_flush_count: int, comp_cursor: int, es_sync_height: int, last_indexed_address_statuses: int): return cls.pack_key(), cls.pack_value( - genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, db_version, hist_flush_count, - comp_flush_count, comp_cursor, es_sync_height + genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, index_address_statuses, + db_version, hist_flush_count, comp_flush_count, comp_cursor, es_sync_height, + last_indexed_address_statuses ) diff --git a/scribe/db/revertable.py b/scribe/db/revertable.py index 64e1d88..a982a97 100644 --- a/scribe/db/revertable.py +++ b/scribe/db/revertable.py @@ -2,7 +2,7 @@ import struct import logging from string import printable from collections import defaultdict -from typing import Tuple, Iterable, Callable, Optional +from typing import Tuple, Iterable, Callable, Optional, List from scribe.db.common import DB_PREFIXES _OP_STRUCT = struct.Struct('>BLL') @@ -82,7 +82,8 @@ class OpStackIntegrity(Exception): class RevertableOpStack: - def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None): + def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], + multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None): """ This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity violations when applying the puts and deletes. The integrity checks assure that keys that do not exist @@ -95,6 +96,7 @@ class RevertableOpStack: :param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged """ self._get = get_fn + self._multi_get = multi_get_fn self._items = defaultdict(list) self._unsafe_prefixes = unsafe_prefixes or set() @@ -133,6 +135,88 @@ class RevertableOpStack: raise err self._items[op.key].append(op) + def multi_put(self, ops: List[RevertablePut]): + """ + Apply a put or delete op, checking that it introduces no integrity errors + """ + + if not ops: + return + + need_put = [] + + if not all(op.is_put for op in ops): + raise ValueError(f"list must contain only puts") + if not len(set(map(lambda op: op.key, ops))) == len(ops): + raise ValueError(f"list must contain unique keys") + + for op in ops: + if self._items[op.key] and op.invert() == self._items[op.key][-1]: + self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both + continue + elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op + continue # raise an error? 
+ else: + need_put.append(op) + + for op, stored_val in zip(need_put, self._multi_get(list(map(lambda item: item.key, need_put)))): + has_stored_val = stored_val is not None + delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val) + will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key]) + try: + if has_stored_val and not will_delete_existing_stored: + raise OpStackIntegrity(f"db op tries to overwrite before deleting existing: {op}") + except OpStackIntegrity as err: + if op.key[:1] in self._unsafe_prefixes: + log.debug(f"skipping over integrity error: {err}") + else: + raise err + self._items[op.key].append(op) + + def multi_delete(self, ops: List[RevertableDelete]): + """ + Apply a put or delete op, checking that it introduces no integrity errors + """ + + if not ops: + return + + need_delete = [] + + if not all(op.is_delete for op in ops): + raise ValueError(f"list must contain only deletes") + if not len(set(map(lambda op: op.key, ops))) == len(ops): + raise ValueError(f"list must contain unique keys") + + for op in ops: + if self._items[op.key] and op.invert() == self._items[op.key][-1]: + self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both + continue + elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op + continue # raise an error? + else: + need_delete.append(op) + + for op, stored_val in zip(need_delete, self._multi_get(list(map(lambda item: item.key, need_delete)))): + has_stored_val = stored_val is not None + delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val) + will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key]) + try: + if op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored: + # there is a value and we're not deleting it in this op + # check that a delete for the stored value is in the stack + raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}") + elif not stored_val: + raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") + elif op.is_delete and stored_val != op.value: + raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") + except OpStackIntegrity as err: + if op.key[:1] in self._unsafe_prefixes: + log.debug(f"skipping over integrity error: {err}") + else: + raise err + self._items[op.key].append(op) + def extend_ops(self, ops: Iterable[RevertableOp]): """ Apply a sequence of put or delete ops, checking that they introduce no integrity errors diff --git a/scribe/env.py b/scribe/env.py index a019540..6ca1146 100644 --- a/scribe/env.py +++ b/scribe/env.py @@ -31,7 +31,7 @@ class Env: def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, - blocking_channel_ids=None, filtering_channel_ids=None): + blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None): self.logger = logging.getLogger(__name__) self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') @@ -52,6 +52,8 @@ class Env: 'BLOCKING_CHANNEL_IDS', '').split(' ') self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default( 'FILTERING_CHANNEL_IDS', '').split(' ') + self.index_address_status = index_address_status if index_address_status is not None else \ + 
self.boolean('INDEX_ADDRESS_STATUS', False) @classmethod def default(cls, envvar, default): @@ -187,6 +189,12 @@ class Env: "Claims that are reposted by these channels aren't returned in search results. " "Can be set in env with 'FILTERING_CHANNEL_IDS'", default=cls.default('FILTERING_CHANNEL_IDS', '').split(' ')) + parser.add_argument('--index_address_statuses', action='store_true', + help="Use precomputed address statuses, must be enabled in the reader and the writer to " + "use it. If disabled (the default), the status of an address must be calculated at " + "runtime when clients request it (address subscriptions, address history sync). " + "If enabled, scribe will maintain an index of precomputed statuses", + default=cls.boolean('INDEX_ADDRESS_STATUS', False)) @classmethod def from_arg_parser(cls, args): diff --git a/scribe/hub/env.py b/scribe/hub/env.py index 8babd3e..85ada99 100644 --- a/scribe/hub/env.py +++ b/scribe/hub/env.py @@ -10,9 +10,10 @@ class ServerEnv(Env): payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, session_timeout=None, drop_client=None, description=None, daily_fee=None, database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, - blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None): + blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None, + index_address_status=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, - cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.host = host if host is not None else self.default('HOST', 'localhost') self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') @@ -109,5 +110,5 @@ class ServerEnv(Env): drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, - elastic_notifier_port=args.elastic_notifier_port + elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses ) diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index 8d7dbd9..a2db139 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -157,6 +157,14 @@ class HubMemPool: result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui)) return result + def mempool_history(self, hashX: bytes) -> str: + result = '' + for tx_hash in self.touched_hashXs.get(hashX, ()): + if tx_hash not in self.txs: + continue # the tx hash for the touched address is an input that isn't in mempool anymore + result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:' + return result + def unordered_UTXOs(self, hashX): """Return an unordered list of UTXO named tuples from mempool transactions that pay to hashX. 
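The `mempool_history` helper added in the hunk above encodes each mempool transaction as `txid:height:`, where the height comes from `-any(...)`: `-1` when the transaction spends another unconfirmed transaction, `0` otherwise, matching the Electrum status convention. A small illustrative sketch of the same rule with the boolean made explicit (`mempool_history_entry` is a hypothetical name, not part of scribe):

```python
def mempool_history_entry(tx_hash: bytes, has_unconfirmed_inputs: bool) -> str:
    # Electrum convention: unconfirmed tx -> height 0; unconfirmed tx whose
    # inputs are themselves unconfirmed -> height -1
    height = -1 if has_unconfirmed_inputs else 0
    return f'{tx_hash[::-1].hex()}:{height:d}:'
```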
@@ -276,7 +284,6 @@ class HubMemPool: if session.subscribe_headers and height_changed: sent_headers += 1 self._notification_q.put_nowait((session_id, height_changed, hashXes)) - if sent_headers: self.logger.info(f'notified {sent_headers} sessions of new block header') if session_hashxes_to_notify: diff --git a/scribe/hub/session.py b/scribe/hub/session.py index e350d5c..3ea3cff 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -128,7 +128,7 @@ class SessionManager: session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE, labelnames=("version",)) request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, - labelnames=("method", "version")) + labelnames=("method",)) tx_request_count_metric = Counter("requested_transaction", "Number of transactions requested", namespace=NAMESPACE) tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE) urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE) @@ -644,9 +644,9 @@ class LBRYElectrumX(asyncio.Protocol): MAX_CHUNK_SIZE = 40960 session_counter = itertools.count() RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, - labelnames=("method", "version"), buckets=HISTOGRAM_BUCKETS) + labelnames=("method",), buckets=HISTOGRAM_BUCKETS) NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", - namespace=NAMESPACE, labelnames=("method", "version")) + namespace=NAMESPACE, labelnames=("method",)) REQUEST_ERRORS_COUNT = Counter( "request_error", "Number of requests that returned errors", namespace=NAMESPACE, labelnames=("method", "version") @@ -793,7 +793,7 @@ class LBRYElectrumX(asyncio.Protocol): """Handle an incoming request. ElectrumX doesn't receive notifications from client sessions. 
""" - self.session_manager.request_count_metric.labels(method=request.method, version=self.client_version).inc() + self.session_manager.request_count_metric.labels(method=request.method).inc() if isinstance(request, Request): method = request.method @@ -981,10 +981,7 @@ class LBRYElectrumX(asyncio.Protocol): 'internal server error') if isinstance(request, Request): message = request.send_result(result) - self.RESPONSE_TIMES.labels( - method=request.method, - version=self.client_version - ).observe(time.perf_counter() - start) + self.RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start) if message: await self._send_message(message) if isinstance(result, Exception): @@ -1014,7 +1011,7 @@ class LBRYElectrumX(asyncio.Protocol): async def send_notification(self, method, args=()) -> bool: """Send an RPC notification over the network.""" message = self.connection.send_notification(Notification(method, args)) - self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() + self.NOTIFICATION_COUNT.labels(method=method).inc() try: await self._send_message(message) return True @@ -1089,7 +1086,16 @@ class LBRYElectrumX(asyncio.Protocol): return len(self.hashX_subs) async def get_hashX_status(self, hashX: bytes): - return await self.loop.run_in_executor(self.db._executor, self.db.get_hashX_status, hashX) + if self.env.index_address_status: + return await self.db.get_hashX_status(hashX) + history = ''.join( + f"{tx_hash[::-1].hex()}:{height:d}:" + for tx_hash, height in await self.db.limited_history(hashX, limit=None) + ) + self.mempool.mempool_history(hashX) + if not history: + return + status = sha256(history.encode()) + return status.hex() async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): notifications = [] @@ -1110,7 +1116,7 @@ class LBRYElectrumX(asyncio.Protocol): start = time.perf_counter() self.session_manager.notifications_in_flight_metric.inc() for method, args in notifications: - self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() + self.NOTIFICATION_COUNT.labels(method=method,).inc() try: await self.send_notifications( Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications]) diff --git a/scribe/service.py b/scribe/service.py index bcde306..d2227fd 100644 --- a/scribe/service.py +++ b/scribe/service.py @@ -30,7 +30,8 @@ class BlockchainService: self.db = HubDB( env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, - filtering_channel_ids=env.filtering_channel_ids, executor=self._executor + filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, + index_address_status=env.index_address_status ) self._stopping = False