From c5f18a416696178f57ac3e8ec1a526c9a41ae9a2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 6 May 2022 11:54:25 -0400 Subject: [PATCH 01/15] expose `--address_history_cache_size` setting for `scribe` --- README.md | 1 + scribe/blockchain/env.py | 9 ++++++++- scribe/blockchain/service.py | 17 ++++------------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 91180bf..fa06aab 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,7 @@ For various reasons it may be desirable to block or filtering content from claim #### Options for `scribe` - `--db_max_open_files` This setting translates into the max_open_files option given to rocksdb. A higher number will use more memory. Defaults to 64. + - `--address_history_cache_size` The count of items in the address history cache used for processing blocks and mempool updates. A higher number will use more memory, shouldn't ever need to be higher than 10000. Defaults to 1000. #### Options for `scribe-elastic-sync` - `--reindex` If this flag is set drop and rebuild the elasticsearch index. diff --git a/scribe/blockchain/env.py b/scribe/blockchain/env.py index ee07519..7a7fc09 100644 --- a/scribe/blockchain/env.py +++ b/scribe/blockchain/env.py @@ -5,11 +5,13 @@ class BlockchainEnv(Env): def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, blocking_channel_ids=None, filtering_channel_ids=None, - db_max_open_files=64, daemon_url=None): + db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) self.db_max_open_files = db_max_open_files self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') + self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \ + else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000) @classmethod def contribute_to_arg_parser(cls, parser): @@ -22,6 +24,11 @@ class BlockchainEnv(Env): parser.add_argument('--db_max_open_files', type=int, default=64, help='This setting translates into the max_open_files option given to rocksdb. ' 'A higher number will use more memory. Defaults to 64.') + parser.add_argument('--address_history_cache_size', type=int, + default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000), + help="LRU cache size for address histories, used when processing new blocks " + "and when processing mempool updates. 
Can be set in env with " + "'ADDRESS_HISTORY_CACHE_SIZE'") @classmethod def from_arg_parser(cls, args): diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 865c859..a3a4dc8 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -50,25 +50,16 @@ class BlockchainProcessorService(BlockchainService): self.coin = env.coin self.wait_for_blocks_duration = 0.1 self._ready_to_stop = asyncio.Event() - + self.blocks_event = asyncio.Event() + self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event) self._caught_up_event: Optional[asyncio.Event] = None self.height = 0 self.tip = bytes.fromhex(self.coin.GENESIS_HASH)[::-1] self.tx_count = 0 - self.blocks_event = asyncio.Event() - self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event) - # self.logger = logging.getLogger(__name__) - - # Meta self.touched_hashXs: Set[bytes] = set() - - # UTXO cache self.utxo_cache: Dict[Tuple[bytes, int], Tuple[bytes, int]] = {} - # Claimtrie cache - self.db_op_stack: Optional['RevertableOpStack'] = None - ################################# # attributes used for calculating stake activations and takeovers per block ################################# @@ -125,8 +116,8 @@ class BlockchainProcessorService(BlockchainService): self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transactions: Dict[int, bytes] = {} - self.hashX_history_cache = LRUCache(1000) - self.hashX_full_cache = LRUCache(1000) + self.hashX_history_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) + self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that From 02922845dd9082e37c5a9b4ccd3a4b40bd6bb147 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 May 2022 18:47:14 -0400 Subject: [PATCH 02/15] add multi_put and multi_delete api to the db class --- scribe/db/interface.py | 16 +++++++- scribe/db/revertable.py | 88 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 101 insertions(+), 3 deletions(-) diff --git a/scribe/db/interface.py b/scribe/db/interface.py index b4f430b..5045705 100644 --- a/scribe/db/interface.py +++ b/scribe/db/interface.py @@ -101,6 +101,9 @@ class PrefixRow(metaclass=PrefixRowType): handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args ] + def stage_multi_put(self, items): + self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items]) + def get_pending(self, *key_args, fill_cache=True, deserialize_value=True): packed_key = self.pack_key(*key_args) last_op = self._op_stack.get_last_op_for_key(packed_key) @@ -178,7 +181,7 @@ class BasePrefixDB: cf = self._db.get_column_family(prefix.value) self.column_families[prefix.value] = cf - self._op_stack = RevertableOpStack(self.get, unsafe_prefixes=unsafe_prefixes) + self._op_stack = RevertableOpStack(self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes) self._max_undo_depth = max_undo_depth def unsafe_commit(self): @@ -259,6 +262,17 @@ class BasePrefixDB: cf = self.column_families[key[:1]] return self._db.get((cf, key), fill_cache=fill_cache) + def multi_get(self, keys: typing.List[bytes], fill_cache=True): + first_key = keys[0] + if not all(first_key[0] == key[0] for key in keys): + raise ValueError('cannot multi-delete across column families') + cf = self.column_families[first_key[:1]] + db_result = self._db.multi_get([(cf, k) for k in keys], fill_cache=fill_cache) + return 
list(db_result.values()) + + def multi_delete(self, items: typing.List[typing.Tuple[bytes, bytes]]): + self._op_stack.multi_delete([RevertableDelete(k, v) for k, v in items]) + def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None, iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None, reverse: bool = False, include_key: bool = True, include_value: bool = True, diff --git a/scribe/db/revertable.py b/scribe/db/revertable.py index 64e1d88..a982a97 100644 --- a/scribe/db/revertable.py +++ b/scribe/db/revertable.py @@ -2,7 +2,7 @@ import struct import logging from string import printable from collections import defaultdict -from typing import Tuple, Iterable, Callable, Optional +from typing import Tuple, Iterable, Callable, Optional, List from scribe.db.common import DB_PREFIXES _OP_STRUCT = struct.Struct('>BLL') @@ -82,7 +82,8 @@ class OpStackIntegrity(Exception): class RevertableOpStack: - def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None): + def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], + multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None): """ This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity violations when applying the puts and deletes. The integrity checks assure that keys that do not exist @@ -95,6 +96,7 @@ class RevertableOpStack: :param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged """ self._get = get_fn + self._multi_get = multi_get_fn self._items = defaultdict(list) self._unsafe_prefixes = unsafe_prefixes or set() @@ -133,6 +135,88 @@ class RevertableOpStack: raise err self._items[op.key].append(op) + def multi_put(self, ops: List[RevertablePut]): + """ + Apply a put or delete op, checking that it introduces no integrity errors + """ + + if not ops: + return + + need_put = [] + + if not all(op.is_put for op in ops): + raise ValueError(f"list must contain only puts") + if not len(set(map(lambda op: op.key, ops))) == len(ops): + raise ValueError(f"list must contain unique keys") + + for op in ops: + if self._items[op.key] and op.invert() == self._items[op.key][-1]: + self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both + continue + elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op + continue # raise an error? 
+ else: + need_put.append(op) + + for op, stored_val in zip(need_put, self._multi_get(list(map(lambda item: item.key, need_put)))): + has_stored_val = stored_val is not None + delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val) + will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key]) + try: + if has_stored_val and not will_delete_existing_stored: + raise OpStackIntegrity(f"db op tries to overwrite before deleting existing: {op}") + except OpStackIntegrity as err: + if op.key[:1] in self._unsafe_prefixes: + log.debug(f"skipping over integrity error: {err}") + else: + raise err + self._items[op.key].append(op) + + def multi_delete(self, ops: List[RevertableDelete]): + """ + Apply a put or delete op, checking that it introduces no integrity errors + """ + + if not ops: + return + + need_delete = [] + + if not all(op.is_delete for op in ops): + raise ValueError(f"list must contain only deletes") + if not len(set(map(lambda op: op.key, ops))) == len(ops): + raise ValueError(f"list must contain unique keys") + + for op in ops: + if self._items[op.key] and op.invert() == self._items[op.key][-1]: + self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both + continue + elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op + continue # raise an error? + else: + need_delete.append(op) + + for op, stored_val in zip(need_delete, self._multi_get(list(map(lambda item: item.key, need_delete)))): + has_stored_val = stored_val is not None + delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val) + will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key]) + try: + if op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored: + # there is a value and we're not deleting it in this op + # check that a delete for the stored value is in the stack + raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}") + elif not stored_val: + raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}") + elif op.is_delete and stored_val != op.value: + raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}") + except OpStackIntegrity as err: + if op.key[:1] in self._unsafe_prefixes: + log.debug(f"skipping over integrity error: {err}") + else: + raise err + self._items[op.key].append(op) + def extend_ops(self, ops: Iterable[RevertableOp]): """ Apply a sequence of put or delete ops, checking that they introduce no integrity errors From 869fc1698ca3b8d744b24fa2a3e87f57f630b224 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 May 2022 18:49:22 -0400 Subject: [PATCH 03/15] add tx caching layer to scribe writer to improve performance when cache_all_tx_hashes isn't on --- scribe/blockchain/service.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index a3a4dc8..222e269 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -118,6 +118,7 @@ class BlockchainProcessorService(BlockchainService): self.hashX_history_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) + self.history_tx_info_cache = LRUCache(2 ** 20) async def run_in_thread_with_lock(self, func, *args): # Run in a 
thread to prevent blocking. Shielded so that @@ -1245,9 +1246,21 @@ class BlockchainProcessorService(BlockchainService): self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None) else: tx_nums = self.hashX_history_cache[hashX] + needed_tx_infos = [] + append_needed_tx_info = needed_tx_infos.append + tx_infos = {} + for tx_num in tx_nums: + if tx_num in self.history_tx_info_cache: + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] + else: + append_needed_tx_info(tx_num) + if needed_tx_infos: + for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)): + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:' + history = '' - for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)): - history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:' + for tx_num in tx_nums: + history += tx_infos[tx_num] self.hashX_full_cache[hashX] = history return history @@ -1491,9 +1504,15 @@ class BlockchainProcessorService(BlockchainService): if self.env.cache_all_tx_hashes: while len(self.db.total_transactions) > self.db.tx_counts[-1]: self.db.tx_num_mapping.pop(self.db.total_transactions.pop()) + if self.tx_count in self.history_tx_info_cache: + self.history_tx_info_cache.pop(self.tx_count) self.tx_count -= 1 else: - self.tx_count = self.db.tx_counts[-1] + new_tx_count = self.db.tx_counts[-1] + while self.tx_count > new_tx_count: + if self.tx_count in self.history_tx_info_cache: + self.history_tx_info_cache.pop(self.tx_count) + self.tx_count -= 1 self.height -= 1 # self.touched can include other addresses which is From e4ac106b982c4ad6a24af10b88ae73939d458516 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 May 2022 18:49:53 -0400 Subject: [PATCH 04/15] _get_clear_mempool_ops --- scribe/blockchain/service.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 222e269..ee858c7 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -1395,9 +1395,8 @@ class BlockchainProcessorService(BlockchainService): self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,)) - for k, v in self.db.prefix_db.hashX_mempool_status.iterate( - start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), deserialize_key=False, deserialize_value=False): - self.db.prefix_db.stage_raw_delete(k, v) + # clear the mempool tx index + self._get_clear_mempool_ops() for hashX, new_history in self.hashXs_by_tx.items(): # TODO: combine this with compaction so that we only read the history once @@ -1450,6 +1449,12 @@ class BlockchainProcessorService(BlockchainService): # print("*************\n") return txo_count + def _get_clear_mempool_ops(self): + self.db.prefix_db.multi_delete( + list(self.db.prefix_db.hashX_mempool_status.iterate(start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), + deserialize_key=False, deserialize_value=False)) + ) + def clear_after_advance_or_reorg(self): self.txo_to_claim.clear() self.claim_hash_to_txo.clear() From d244136efd13be742d4ec27b44e59ce34bb6f9fd Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 May 2022 18:55:55 -0400 Subject: [PATCH 05/15] add `--index_address_statuses` option -scribe no longer writes address statuses nor compacts them during initial sync -scribe will only precompute address statuses if `--index_address_statuses` is set -combine history compaction with updating the address status --- 
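Note: the address "status" referred to throughout this patch is the Electrum-style status hash, i.e. the sha256 of the address's history serialized as a string of `txid:height:` entries, with mempool transactions appended using a height of 0 (or -1 when they spend unconfirmed inputs). The snippet below is only a sketch of that encoding, not the actual scribe code; the arguments stand in for what the db and mempool lookups return.

    from hashlib import sha256

    def address_status(confirmed, mempool):
        # confirmed: iterable of (tx_hash: bytes, height: int), oldest first
        # mempool: iterable of (tx_hash: bytes, spends_unconfirmed: bool)
        history = ''.join(f'{tx_hash[::-1].hex()}:{height:d}:' for tx_hash, height in confirmed)
        history += ''.join(f'{tx_hash[::-1].hex()}:{-int(unconf):d}:' for tx_hash, unconf in mempool)
        return sha256(history.encode()).hexdigest() if history else None
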
scribe/blockchain/env.py | 6 +- scribe/blockchain/service.py | 117 +++++++++++++++++++++++++++-------- scribe/db/db.py | 4 +- scribe/env.py | 10 ++- scribe/hub/mempool.py | 9 ++- scribe/hub/session.py | 13 +++- scribe/service.py | 3 +- 7 files changed, 128 insertions(+), 34 deletions(-) diff --git a/scribe/blockchain/env.py b/scribe/blockchain/env.py index 7a7fc09..c6c2cdd 100644 --- a/scribe/blockchain/env.py +++ b/scribe/blockchain/env.py @@ -5,14 +5,16 @@ class BlockchainEnv(Env): def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, blocking_channel_ids=None, filtering_channel_ids=None, - db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None): + db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, + index_address_status=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, - cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.db_max_open_files = db_max_open_files self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \ else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000) + @classmethod def contribute_to_arg_parser(cls, parser): super().contribute_to_arg_parser(parser) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index ee858c7..767442f 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -146,8 +146,9 @@ class BlockchainProcessorService(BlockchainService): def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete): self.mempool.remove(to_delete) touched_hashXs = self.mempool.update_mempool(to_put) - for hashX in touched_hashXs: - self._get_update_hashX_mempool_status_ops(hashX) + if self.env.index_address_status: + for hashX in touched_hashXs: + self._get_update_hashX_mempool_status_ops(hashX) for tx_hash, raw_tx in to_put: mempool_prefix.stage_put((tx_hash,), (raw_tx,)) for tx_hash, raw_tx in to_delete.items(): @@ -1290,22 +1291,6 @@ class BlockchainProcessorService(BlockchainService): status = sha256(history.encode()) self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,)) - def _get_compactify_hashX_history_ops(self, height: int, hashX: bytes): - if height > self.env.reorg_limit: # compactify existing history - hist_txs = b'' - # accumulate and delete all of the tx histories between height 1 and current - reorg_limit - for k, hist in self.db.prefix_db.hashX_history.iterate( - start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit), - deserialize_key=False, deserialize_value=False): - hist_txs += hist - self.db.prefix_db.stage_raw_delete(k, hist) - if hist_txs: - # add the accumulated histories onto the existing compacted history at height 0 - key = self.db.prefix_db.hashX_history.pack_key(hashX, 0) - existing = self.db.prefix_db.get(key) - if existing is not None: - self.db.prefix_db.stage_raw_delete(key, existing) - self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs) def advance_block(self, block: Block): height = self.height + 1 @@ -1398,15 +1383,11 @@ class BlockchainProcessorService(BlockchainService): # clear the mempool tx index self._get_clear_mempool_ops() - for hashX, new_history in self.hashXs_by_tx.items(): - # TODO: combine this with compaction 
so that we only read the history once - self._get_update_hashX_status_ops( - hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history] - ) - self._get_compactify_hashX_history_ops(height, hashX) - if not new_history: - continue - self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,)) + # update hashX history status hashes and compactify the histories + self._get_update_hashX_histories_ops(height) + + if not self.db.catching_up and self.env.index_address_status: + self._get_compactify_ops(height) self.tx_count = tx_count self.db.tx_counts.append(self.tx_count) @@ -1455,6 +1436,88 @@ class BlockchainProcessorService(BlockchainService): deserialize_key=False, deserialize_value=False)) ) + def _get_update_hashX_histories_ops(self, height: int): + self.db.prefix_db.hashX_history.stage_multi_put( + [((hashX, height), (new_tx_nums,)) for hashX, new_tx_nums in self.hashXs_by_tx.items()] + ) + + def _get_compactify_ops(self, height: int): + existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False) + if existing_hashX_statuses: + pack_key = self.db.prefix_db.hashX_status.pack_key + keys = [ + pack_key(hashX) for hashX, existing in zip( + self.hashXs_by_tx, existing_hashX_statuses + ) + ] + self.db.prefix_db.multi_delete([(k, v) for k, v in zip(keys, existing_hashX_statuses) if v is not None]) + + block_hashX_history_deletes = [] + append_deletes_hashX_history = block_hashX_history_deletes.append + block_hashX_history_puts = [] + + for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses): + new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums] + + tx_nums = [] + txs_extend = tx_nums.extend + compact_hist_txs = [] + compact_txs_extend = compact_hist_txs.extend + history_item_0 = None + existing_item_0 = None + reorg_limit = self.env.reorg_limit + unpack_history = self.db.prefix_db.hashX_history.unpack_value + unpack_key = self.db.prefix_db.hashX_history.unpack_key + needs_compaction = False + + total_hist_txs = b'' + for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False, + deserialize_value=False): + hist_txs = unpack_history(hist) + total_hist_txs += hist + txs_extend(hist_txs) + hist_height = unpack_key(k).height + if height > reorg_limit and hist_height < height - reorg_limit: + compact_txs_extend(hist_txs) + if hist_height == 0: + history_item_0 = (k, hist) + elif hist_height > 0: + needs_compaction = True + # self.db.prefix_db.stage_raw_delete(k, hist) + append_deletes_hashX_history((k, hist)) + existing_item_0 = history_item_0 + if needs_compaction: + # add the accumulated histories onto the existing compacted history at height 0 + if existing_item_0 is not None: # delete if key 0 exists + key, existing = existing_item_0 + append_deletes_hashX_history((key, existing)) + block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,))) + if not new_history: + continue + + needed_tx_infos = [] + append_needed_tx_info = needed_tx_infos.append + tx_infos = {} + for tx_num in tx_nums: + if tx_num in self.history_tx_info_cache: + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] + else: + append_needed_tx_info(tx_num) + if needed_tx_infos: + for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)): + tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:' + tx_infos[tx_num] = tx_info + 
self.history_tx_info_cache[tx_num] = tx_info + history = ''.join(map(tx_infos.__getitem__, tx_nums)) + for tx_hash, height in new_history: + history += f'{tx_hash[::-1].hex()}:{height:d}:' + if history: + status = sha256(history.encode()) + self.db.prefix_db.hashX_status.stage_put((hashX,), (status,)) + + self.db.prefix_db.multi_delete(block_hashX_history_deletes) + self.db.prefix_db.hashX_history.stage_multi_put(block_hashX_history_puts) + def clear_after_advance_or_reorg(self): self.txo_to_claim.clear() self.claim_hash_to_txo.clear() diff --git a/scribe/db/db.py b/scribe/db/db.py index ee34112..e4db988 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -39,7 +39,8 @@ class HubDB: def __init__(self, coin, db_dir: str, reorg_limit: int = 200, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None, - filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None): + filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, + index_address_status=False): self.logger = logging.getLogger(__name__) self.coin = coin self._executor = executor @@ -52,6 +53,7 @@ class HubDB: if secondary_name: assert max_open_files == -1, 'max open files must be -1 for secondary readers' self._db_max_open_files = max_open_files + self._index_address_status = index_address_status self.prefix_db: typing.Optional[PrefixDB] = None self.hist_unflushed = defaultdict(partial(array.array, 'I')) diff --git a/scribe/env.py b/scribe/env.py index a019540..6ca1146 100644 --- a/scribe/env.py +++ b/scribe/env.py @@ -31,7 +31,7 @@ class Env: def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, - blocking_channel_ids=None, filtering_channel_ids=None): + blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None): self.logger = logging.getLogger(__name__) self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') @@ -52,6 +52,8 @@ class Env: 'BLOCKING_CHANNEL_IDS', '').split(' ') self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default( 'FILTERING_CHANNEL_IDS', '').split(' ') + self.index_address_status = index_address_status if index_address_status is not None else \ + self.boolean('INDEX_ADDRESS_STATUS', False) @classmethod def default(cls, envvar, default): @@ -187,6 +189,12 @@ class Env: "Claims that are reposted by these channels aren't returned in search results. " "Can be set in env with 'FILTERING_CHANNEL_IDS'", default=cls.default('FILTERING_CHANNEL_IDS', '').split(' ')) + parser.add_argument('--index_address_statuses', action='store_true', + help="Use precomputed address statuses, must be enabled in the reader and the writer to " + "use it. If disabled (the default), the status of an address must be calculated at " + "runtime when clients request it (address subscriptions, address history sync). 
" + "If enabled, scribe will maintain an index of precomputed statuses", + default=cls.boolean('INDEX_ADDRESS_STATUS', False)) @classmethod def from_arg_parser(cls, args): diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index 8d7dbd9..a2db139 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -157,6 +157,14 @@ class HubMemPool: result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui)) return result + def mempool_history(self, hashX: bytes) -> str: + result = '' + for tx_hash in self.touched_hashXs.get(hashX, ()): + if tx_hash not in self.txs: + continue # the tx hash for the touched address is an input that isn't in mempool anymore + result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:' + return result + def unordered_UTXOs(self, hashX): """Return an unordered list of UTXO named tuples from mempool transactions that pay to hashX. @@ -276,7 +284,6 @@ class HubMemPool: if session.subscribe_headers and height_changed: sent_headers += 1 self._notification_q.put_nowait((session_id, height_changed, hashXes)) - if sent_headers: self.logger.info(f'notified {sent_headers} sessions of new block header') if session_hashxes_to_notify: diff --git a/scribe/hub/session.py b/scribe/hub/session.py index e350d5c..21df5be 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -1089,7 +1089,18 @@ class LBRYElectrumX(asyncio.Protocol): return len(self.hashX_subs) async def get_hashX_status(self, hashX: bytes): - return await self.loop.run_in_executor(self.db._executor, self.db.get_hashX_status, hashX) + self.session_manager.db.last_flush + if self.env.index_address_status: + loop = self.loop + return await loop.run_in_executor(None, self.db.get_hashX_status, hashX) + history = ''.join( + f"{tx_hash[::-1].hex()}:{height:d}:" + for tx_hash, height in await self.db.limited_history(hashX, limit=None) + ) + self.mempool.mempool_history(hashX) + if not history: + return + status = sha256(history.encode()) + return status.hex() async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): notifications = [] diff --git a/scribe/service.py b/scribe/service.py index bcde306..d2227fd 100644 --- a/scribe/service.py +++ b/scribe/service.py @@ -30,7 +30,8 @@ class BlockchainService: self.db = HubDB( env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, - filtering_channel_ids=env.filtering_channel_ids, executor=self._executor + filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, + index_address_status=env.index_address_status ) self._stopping = False From f91d2be91e5a7485ada5e8f3173c444094165005 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 May 2022 23:53:15 -0400 Subject: [PATCH 06/15] log if address status index is turned on --- scribe/db/db.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scribe/db/db.py b/scribe/db/db.py index e4db988..8cdb643 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -828,6 +828,8 @@ class HubDB: # read db state self.read_db_state() + self.logger.info("index address statuses: %s", self._index_address_status) + # These are our state as we move ahead of DB state self.fs_height = self.db_height self.fs_tx_count = self.db_tx_count From 460a06ec0441cf597a78775b506c46dac7c2893f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 11 May 2022 12:46:29 -0400 Subject: [PATCH 07/15] reduce history_tx_info_cache size --- 
scribe/blockchain/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 767442f..0de6285 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -118,7 +118,7 @@ class BlockchainProcessorService(BlockchainService): self.hashX_history_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) - self.history_tx_info_cache = LRUCache(2 ** 20) + self.history_tx_info_cache = LRUCache(2 ** 16) async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that From 708c45504a6bbd4134fd7e8e2f23f67fc3ca74a0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 12 May 2022 10:48:48 -0400 Subject: [PATCH 08/15] delete unused code --- scribe/blockchain/service.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 0de6285..e67700a 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -1265,23 +1265,6 @@ class BlockchainProcessorService(BlockchainService): self.hashX_full_cache[hashX] = history return history - def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]): - existing = self.db.prefix_db.hashX_status.get(hashX) - if existing: - self.db.prefix_db.hashX_status.stage_delete((hashX,), existing) - if hashX not in self.hashX_history_cache: - tx_nums = self.db.read_history(hashX, limit=None) - else: - tx_nums = self.hashX_history_cache[hashX] - history = '' - for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)): - history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:' - for tx_hash, height in new_history: - history += f'{hash_to_hex_str(tx_hash)}:{height:d}:' - if history: - status = sha256(history.encode()) - self.db.prefix_db.hashX_status.stage_put((hashX,), (status,)) - def _get_update_hashX_mempool_status_ops(self, hashX: bytes): existing = self.db.prefix_db.hashX_mempool_status.get(hashX) if existing: @@ -1291,7 +1274,6 @@ class BlockchainProcessorService(BlockchainService): status = sha256(history.encode()) self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,)) - def advance_block(self, block: Block): height = self.height + 1 # print("advance ", height) From 32c21a26a9b5b25014b9dd800aa419af75146c9f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 12 May 2022 12:48:11 -0400 Subject: [PATCH 09/15] cleanup --- scribe/db/db.py | 5 ++++- scribe/hub/session.py | 4 +--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/scribe/db/db.py b/scribe/db/db.py index 8cdb643..b569a47 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -864,7 +864,7 @@ class HubDB: self.prefix_db.close() self.prefix_db = None - def get_hashX_status(self, hashX: bytes): + def _get_hashX_status(self, hashX: bytes): mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False) if mempool_status: return mempool_status.hex() @@ -872,6 +872,9 @@ class HubDB: if status: return status.hex() + async def get_hashX_status(self, hashX: bytes): + return await asyncio.get_event_loop().run_in_executor(self._executor, self._get_hashX_status, hashX) + def get_tx_hash(self, tx_num: int) -> bytes: if self._cache_all_tx_hashes: return self.total_transactions[tx_num] diff --git a/scribe/hub/session.py b/scribe/hub/session.py index 21df5be..594ee7a 100644 --- 
a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -1089,10 +1089,8 @@ class LBRYElectrumX(asyncio.Protocol): return len(self.hashX_subs) async def get_hashX_status(self, hashX: bytes): - self.session_manager.db.last_flush if self.env.index_address_status: - loop = self.loop - return await loop.run_in_executor(None, self.db.get_hashX_status, hashX) + return await self.db.get_hashX_status(hashX) history = ''.join( f"{tx_hash[::-1].hex()}:{height:d}:" for tx_hash, height in await self.db.limited_history(hashX, limit=None) From 51a753c4d26812b93af180d36e1fd65eb4ede269 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 12 May 2022 14:36:14 -0400 Subject: [PATCH 10/15] fix settings --- scribe/blockchain/env.py | 3 ++- scribe/hub/env.py | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/scribe/blockchain/env.py b/scribe/blockchain/env.py index c6c2cdd..e98ac06 100644 --- a/scribe/blockchain/env.py +++ b/scribe/blockchain/env.py @@ -38,5 +38,6 @@ class BlockchainEnv(Env): db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files, max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit, prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, - cache_all_claim_txos=args.cache_all_claim_txos + cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses, + hashX_history_cache_size=args.address_history_cache_size ) diff --git a/scribe/hub/env.py b/scribe/hub/env.py index 8babd3e..85ada99 100644 --- a/scribe/hub/env.py +++ b/scribe/hub/env.py @@ -10,9 +10,10 @@ class ServerEnv(Env): payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, session_timeout=None, drop_client=None, description=None, daily_fee=None, database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, - blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None): + blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None, + index_address_status=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, - cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids) + cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.host = host if host is not None else self.default('HOST', 'localhost') self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') @@ -109,5 +110,5 @@ class ServerEnv(Env): drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, - elastic_notifier_port=args.elastic_notifier_port + elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses ) From bf1667b44d54beb764c385e19d0c587dd9d8e44e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 12 May 2022 15:21:51 -0400 Subject: [PATCH 11/15] fix turning address status index on and off --- scribe/blockchain/env.py | 11 +++- scribe/blockchain/service.py | 20 +++++++ scribe/db/db.py | 90 ++++++++++++++++++++++++++++-- scribe/db/migrators/migrate8to9.py | 26 +++++++++ scribe/db/prefixes.py | 55 
++++++++++++++---- 5 files changed, 184 insertions(+), 18 deletions(-) create mode 100644 scribe/db/migrators/migrate8to9.py diff --git a/scribe/blockchain/env.py b/scribe/blockchain/env.py index e98ac06..ea5c560 100644 --- a/scribe/blockchain/env.py +++ b/scribe/blockchain/env.py @@ -6,14 +6,15 @@ class BlockchainEnv(Env): prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, blocking_channel_ids=None, filtering_channel_ids=None, db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, - index_address_status=None): + index_address_status=None, rebuild_address_status_from_height=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.db_max_open_files = db_max_open_files self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \ else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000) - + self.rebuild_address_status_from_height = rebuild_address_status_from_height \ + if isinstance(rebuild_address_status_from_height, int) else -1 @classmethod def contribute_to_arg_parser(cls, parser): @@ -31,6 +32,9 @@ class BlockchainEnv(Env): help="LRU cache size for address histories, used when processing new blocks " "and when processing mempool updates. Can be set in env with " "'ADDRESS_HISTORY_CACHE_SIZE'") + parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1, + help="Rebuild address statuses, set to 0 to reindex all address statuses or provide a " + "block height to start reindexing from. Defaults to -1 (off).") @classmethod def from_arg_parser(cls, args): @@ -39,5 +43,6 @@ class BlockchainEnv(Env): max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit, prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses, - hashX_history_cache_size=args.address_history_cache_size + hashX_history_cache_size=args.address_history_cache_size, + rebuild_address_status_from_height=args.rebuild_address_status_from_height ) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index e67700a..952899f 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -45,6 +45,7 @@ class BlockchainProcessorService(BlockchainService): def __init__(self, env: 'BlockchainEnv'): super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') + self.env = env self.daemon = LBCDaemon(env.coin, env.daemon_url) self.mempool = MemPool(env.coin, self.db) self.coin = env.coin @@ -1704,6 +1705,10 @@ class BlockchainProcessorService(BlockchainService): async def _finished_initial_catch_up(self): self.log.info(f'caught up to height {self.height}') + + # if self.env.index_address_status and self.db.last_indexed_address_status_height != self.db.db_height: + # await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height) + # Flush everything but with catching_up->False state. 
self.db.catching_up = False @@ -1719,6 +1724,9 @@ class BlockchainProcessorService(BlockchainService): while self.db.db_version < max(self.db.DB_VERSIONS): if self.db.db_version == 7: from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION + elif self.db.db_version == 8: + from scribe.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION + self.db._index_address_status = self.env.index_address_status else: raise RuntimeError("unknown db version") self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}") @@ -1726,6 +1734,18 @@ class BlockchainProcessorService(BlockchainService): self.log.info("finished migration") self.db.read_db_state() + # update the hashX status index if was off before and is now on of if requested from a height + if (self.env.index_address_status and not self.db._index_address_status and self.db.last_indexed_address_status_height < self.db.db_height) or self.env.rebuild_address_status_from_height >= 0: + starting_height = self.db.last_indexed_address_status_height + if self.env.rebuild_address_status_from_height >= 0: + starting_height = self.env.rebuild_address_status_from_height + yield self.db.rebuild_hashX_status_index(starting_height) + elif self.db._index_address_status and not self.env.index_address_status: + self.log.warning("turned off address indexing at block %i", self.db.db_height) + self.db._index_address_status = False + self.db.write_db_state() + self.db.prefix_db.unsafe_commit() + self.height = self.db.db_height self.tip = self.db.db_tip self.tx_count = self.db.db_tx_count diff --git a/scribe/db/db.py b/scribe/db/db.py index b569a47..6a509d6 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -18,7 +18,7 @@ from scribe.schema.url import URL, normalize_name from scribe.schema.claim import guess_stream_type from scribe.schema.result import Censor from scribe.blockchain.transaction import TxInput -from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics +from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256 from scribe.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedResolveResult, DBError, UTXO from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB @@ -34,7 +34,7 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db" class HubDB: - DB_VERSIONS = [7, 8] + DB_VERSIONS = [7, 8, 9] def __init__(self, coin, db_dir: str, reorg_limit: int = 200, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, @@ -63,6 +63,7 @@ class HubDB: self.hist_comp_cursor = -1 self.es_sync_height = 0 + self.last_indexed_address_status_height = 0 # blocking/filtering dicts blocking_channels = blocking_channel_ids or [] @@ -864,6 +865,78 @@ class HubDB: self.prefix_db.close() self.prefix_db = None + def _rebuild_hashX_status_index(self, start_height: int): + self.logger.warning("rebuilding the address status index...") + prefix_db = self.prefix_db + + def hashX_iterator(): + last_hashX = None + for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False): + hashX = k[1:12] + if last_hashX is None: + last_hashX = hashX + if last_hashX != hashX: + yield hashX + last_hashX = hashX + if last_hashX: + yield last_hashX + + def hashX_status_from_history(history: bytes) -> bytes: + tx_counts = self.tx_counts + hist_tx_nums = array.array('I') + hist_tx_nums.frombytes(history) + hist = '' + for tx_num in hist_tx_nums: + hist 
+= f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:' + return sha256(hist.encode()) + + start = time.perf_counter() + + if start_height <= 0: + self.logger.info("loading all blockchain addresses, this will take a little while...") + hashXs = [hashX for hashX in hashX_iterator()] + else: + self.logger.info("loading addresses since block %i...", start_height) + hashXs = set() + for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,), + include_key=False): + hashXs.update(touched.touched_hashXs) + hashXs = list(hashXs) + + self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, " + f"now building the status index...") + op_cnt = 0 + hashX_cnt = 0 + for hashX in hashXs: + hashX_cnt += 1 + key = prefix_db.hashX_status.pack_key(hashX) + history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False)) + status = hashX_status_from_history(history) + existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False) + if existing_status and existing_status == status: + continue + elif not existing_status: + prefix_db.stage_raw_put(key, status) + op_cnt += 1 + else: + prefix_db.stage_raw_delete(key, existing_status) + prefix_db.stage_raw_put(key, status) + op_cnt += 2 + if op_cnt > 100000: + prefix_db.unsafe_commit() + self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...") + op_cnt = 0 + if op_cnt: + prefix_db.unsafe_commit() + self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...") + self._index_address_status = True + self.write_db_state() + self.prefix_db.unsafe_commit() + self.logger.info("finished indexing address statuses") + + def rebuild_hashX_status_index(self, start_height: int): + return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height) + def _get_hashX_status(self, hashX: bytes): mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False) if mempool_status: @@ -1126,13 +1199,18 @@ class HubDB: def write_db_state(self): """Write (UTXO) state to the batch.""" + last_indexed_address_status = 0 if self.db_height > 0: - self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get()) + existing = self.prefix_db.db_state.get() + last_indexed_address_status = existing.hashX_status_last_indexed_height + self.prefix_db.db_state.stage_delete((), existing.expanded) + if self._index_address_status: + last_indexed_address_status = self.db_height self.prefix_db.db_state.stage_put((), ( self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, - self.utxo_flush_count, int(self.wall_time), self.catching_up, self.db_version, + self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version, self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor, - self.es_sync_height + self.es_sync_height, last_indexed_address_status ) ) @@ -1152,6 +1230,7 @@ class HubDB: self.hist_comp_cursor = -1 self.hist_db_version = max(self.DB_VERSIONS) self.es_sync_height = 0 + self.last_indexed_address_status_height = 0 else: self.db_version = state.db_version if self.db_version not in self.DB_VERSIONS: @@ -1173,6 +1252,7 @@ class HubDB: self.hist_comp_cursor = state.comp_cursor self.hist_db_version = state.db_version self.es_sync_height = state.es_sync_height + self.last_indexed_address_status_height = state.hashX_status_last_indexed_height return state def assert_db_state(self): 
diff --git a/scribe/db/migrators/migrate8to9.py b/scribe/db/migrators/migrate8to9.py new file mode 100644 index 0000000..d74a398 --- /dev/null +++ b/scribe/db/migrators/migrate8to9.py @@ -0,0 +1,26 @@ +import logging + +FROM_VERSION = 8 +TO_VERSION = 9 + + +def migrate(db): + log = logging.getLogger(__name__) + prefix_db = db.prefix_db + index_address_status = db._index_address_status + + log.info("migrating the db to version 9") + + if not index_address_status: + log.info("deleting the existing address status index") + to_delete = list(prefix_db.hashX_status.iterate(deserialize_key=False, deserialize_value=False)) + while to_delete: + batch, to_delete = to_delete[:10000], to_delete[10000:] + if batch: + prefix_db.multi_delete(batch) + prefix_db.unsafe_commit() + + db.db_version = 9 + db.write_db_state() + db.prefix_db.unsafe_commit() + log.info("finished migration") diff --git a/scribe/db/prefixes.py b/scribe/db/prefixes.py index cb46b06..4cb214b 100644 --- a/scribe/db/prefixes.py +++ b/scribe/db/prefixes.py @@ -420,12 +420,40 @@ class DBState(typing.NamedTuple): tip: bytes utxo_flush_count: int wall_time: int - catching_up: bool + bit_fields: int db_version: int hist_flush_count: int comp_flush_count: int comp_cursor: int es_sync_height: int + hashX_status_last_indexed_height: int + + @property + def catching_up(self) -> bool: + return self.bit_fields & 1 == 1 + + @property + def index_address_statuses(self) -> bool: + return self.bit_fields & 2 == 1 + + @property + def expanded(self): + return ( + self.genesis, + self.height, + self.tx_count, + self.tip, + self.utxo_flush_count, + self.wall_time, + self.catching_up, + self.index_address_statuses, + self.db_version, + self.hist_flush_count, + self.comp_flush_count, + self.comp_cursor, + self.es_sync_height, + self.hashX_status_last_indexed_height + ) class ActiveAmountPrefixRow(PrefixRow): @@ -1420,7 +1448,7 @@ class SupportAmountPrefixRow(PrefixRow): class DBStatePrefixRow(PrefixRow): prefix = DB_PREFIXES.db_state.value - value_struct = struct.Struct(b'>32sLL32sLLBBlllL') + value_struct = struct.Struct(b'>32sLL32sLLBBlllLL') key_struct = struct.Struct(b'') key_part_lambdas = [ @@ -1437,12 +1465,16 @@ class DBStatePrefixRow(PrefixRow): @classmethod def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, - catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, - comp_cursor: int, es_sync_height: int) -> bytes: + catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int, + comp_flush_count: int, comp_cursor: int, es_sync_height: int, + last_indexed_address_statuses: int) -> bytes: + bit_fields = 0 + bit_fields |= int(catching_up) << 0 + bit_fields |= int(index_address_statuses) << 1 return super().pack_value( genesis, height, tx_count, tip, utxo_flush_count, - wall_time, 1 if catching_up else 0, db_version, hist_flush_count, - comp_flush_count, comp_cursor, es_sync_height + wall_time, bit_fields, db_version, hist_flush_count, + comp_flush_count, comp_cursor, es_sync_height, last_indexed_address_statuses ) @classmethod @@ -1451,15 +1483,18 @@ class DBStatePrefixRow(PrefixRow): # TODO: delete this after making a new snapshot - 10/20/21 # migrate in the es_sync_height if it doesnt exist data += data[32:36] + if len(data) == 98: + data += data[32:36] return DBState(*super().unpack_value(data)) @classmethod def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int, - 
catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int, - comp_cursor: int, es_sync_height: int): + catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int, + comp_flush_count: int, comp_cursor: int, es_sync_height: int, last_indexed_address_statuses: int): return cls.pack_key(), cls.pack_value( - genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, db_version, hist_flush_count, - comp_flush_count, comp_cursor, es_sync_height + genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, index_address_statuses, + db_version, hist_flush_count, comp_flush_count, comp_cursor, es_sync_height, + last_indexed_address_statuses ) From f747637688bf174032f55488b5506782490405aa Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 17 May 2022 10:02:36 -0400 Subject: [PATCH 12/15] reduce cache sizes --- scribe/blockchain/daemon.py | 4 ++-- scribe/db/db.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/scribe/blockchain/daemon.py b/scribe/blockchain/daemon.py index fdf0e4e..745eaa3 100644 --- a/scribe/blockchain/daemon.py +++ b/scribe/blockchain/daemon.py @@ -55,8 +55,8 @@ class LBCDaemon: self._height = None self.available_rpcs = {} self.connector = aiohttp.TCPConnector(ssl=False) - self._block_hash_cache = LRUCacheWithMetrics(100000) - self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE) + self._block_hash_cache = LRUCacheWithMetrics(1024) + self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE) async def close(self): if self.connector: diff --git a/scribe/db/db.py b/scribe/db/db.py index 6a509d6..bcb3cc5 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -83,7 +83,7 @@ class HubDB: self.tx_counts = None self.headers = None self.block_hashes = None - self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace=NAMESPACE) + self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE) self.last_flush = time.time() # Header merkle cache From 25a8c6b5589c6734c974b0993f32a3b173a75023 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 17 May 2022 11:52:23 -0400 Subject: [PATCH 13/15] remove unneeded labels from prometheus --- scribe/hub/session.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/scribe/hub/session.py b/scribe/hub/session.py index 594ee7a..3ea3cff 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -128,7 +128,7 @@ class SessionManager: session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE, labelnames=("version",)) request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, - labelnames=("method", "version")) + labelnames=("method",)) tx_request_count_metric = Counter("requested_transaction", "Number of transactions requested", namespace=NAMESPACE) tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE) urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE) @@ -644,9 +644,9 @@ class LBRYElectrumX(asyncio.Protocol): MAX_CHUNK_SIZE = 40960 session_counter = itertools.count() RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE, - labelnames=("method", "version"), buckets=HISTOGRAM_BUCKETS) + labelnames=("method",), buckets=HISTOGRAM_BUCKETS) 
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)", - namespace=NAMESPACE, labelnames=("method", "version")) + namespace=NAMESPACE, labelnames=("method",)) REQUEST_ERRORS_COUNT = Counter( "request_error", "Number of requests that returned errors", namespace=NAMESPACE, labelnames=("method", "version") @@ -793,7 +793,7 @@ class LBRYElectrumX(asyncio.Protocol): """Handle an incoming request. ElectrumX doesn't receive notifications from client sessions. """ - self.session_manager.request_count_metric.labels(method=request.method, version=self.client_version).inc() + self.session_manager.request_count_metric.labels(method=request.method).inc() if isinstance(request, Request): method = request.method @@ -981,10 +981,7 @@ class LBRYElectrumX(asyncio.Protocol): 'internal server error') if isinstance(request, Request): message = request.send_result(result) - self.RESPONSE_TIMES.labels( - method=request.method, - version=self.client_version - ).observe(time.perf_counter() - start) + self.RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start) if message: await self._send_message(message) if isinstance(result, Exception): @@ -1014,7 +1011,7 @@ class LBRYElectrumX(asyncio.Protocol): async def send_notification(self, method, args=()) -> bool: """Send an RPC notification over the network.""" message = self.connection.send_notification(Notification(method, args)) - self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() + self.NOTIFICATION_COUNT.labels(method=method).inc() try: await self._send_message(message) return True @@ -1119,7 +1116,7 @@ class LBRYElectrumX(asyncio.Protocol): start = time.perf_counter() self.session_manager.notifications_in_flight_metric.inc() for method, args in notifications: - self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() + self.NOTIFICATION_COUNT.labels(method=method,).inc() try: await self.send_notifications( Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications]) From 830ee294efca147f95d34ae086483d810d904c4f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 17 May 2022 19:20:31 -0400 Subject: [PATCH 14/15] batched catch up for the address status index --- scribe/blockchain/service.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 952899f..078f06b 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -1693,6 +1693,8 @@ class BlockchainProcessorService(BlockchainService): self._ready_to_stop.set() async def _need_catch_up(self): + self.log.info("database has fallen behind blockchain daemon, catching up") + self.db.catching_up = True def flush(): @@ -1706,8 +1708,8 @@ class BlockchainProcessorService(BlockchainService): async def _finished_initial_catch_up(self): self.log.info(f'caught up to height {self.height}') - # if self.env.index_address_status and self.db.last_indexed_address_status_height != self.db.db_height: - # await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height) + if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height: + await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height) # Flush everything but with catching_up->False state. 
        self.db.catching_up = False

From 14b14686f40ad5cf7a437fa66d850176e4cbc931 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 17 May 2022 19:58:01 -0400
Subject: [PATCH 15/15] update readme

---
 README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/README.md b/README.md
index fa06aab..fadc822 100644
--- a/README.md
+++ b/README.md
@@ -89,6 +89,7 @@ For various reasons it may be desirable to block or filtering content from claim
 #### Options for `scribe`
 - `--db_max_open_files` This setting translates into the max_open_files option given to rocksdb. A higher number will use more memory. Defaults to 64.
 - `--address_history_cache_size` The count of items in the address history cache used for processing blocks and mempool updates. A higher number will use more memory, shouldn't ever need to be higher than 10000. Defaults to 1000.
+ - `--index_address_statuses` Maintain an index of the statuses of address transaction histories; this makes handling notifications for transactions in a block uniformly fast, at the expense of more time to process new blocks and somewhat more disk space (~10gb as of block 1161417).
 #### Options for `scribe-elastic-sync`
 - `--reindex` If this flag is set drop and rebuild the elasticsearch index.
@@ -104,6 +105,7 @@ For various reasons it may be desirable to block or filtering content from claim
 - `--query_timeout_ms` Timeout for claim searches in elasticsearch in milliseconds. Can be set from the environment with `QUERY_TIMEOUT_MS`
 - `--blocking_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels can't be resolved or returned in search results. Can be set from the environment with `BLOCKING_CHANNEL_IDS`.
 - `--filtering_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels aren't returned in search results. Can be set from the environment with `FILTERING_CHANNEL_IDS`
+ - `--index_address_statuses` Use the address history status index; this makes handling notifications for transactions in a block uniformly fast (must be turned on in `scribe` too).
 ## Contributing
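Both `--index_address_statuses` notes above point out that the option has to be enabled on the writer (`scribe`) as well as on the hub. The writer persists whether the index is enabled, along with the height it was last built at, in the packed bit field added to the db state row in PATCH 11; that is how a restart detects the option being toggled and either rebuilds the index or logs that it was turned off. A rough sketch of the flag packing (illustrative, not the exact DBState code):

    def pack_state_flags(catching_up: bool, index_address_statuses: bool) -> int:
        bit_fields = 0
        bit_fields |= int(catching_up) << 0
        bit_fields |= int(index_address_statuses) << 1
        return bit_fields

    def unpack_state_flags(bit_fields: int):
        catching_up = bit_fields & 1 == 1
        index_address_statuses = bit_fields & 2 == 2
        return catching_up, index_address_statuses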