From 4f0359c17777426b29cc7f0ebccb0c52e124ef75 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 13 Apr 2022 15:49:29 -0400
Subject: [PATCH] expose rocksdb `multi_get` api for column families

-use multi_get for fetching tx hashes when they're not all held in memory
---
 scribe/blockchain/service.py |  8 ++++----
 scribe/db/db.py              | 15 +++++++++++----
 scribe/db/interface.py       | 23 +++++++++++++++++++++++
 3 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py
index 1204213..8375031 100644
--- a/scribe/blockchain/service.py
+++ b/scribe/blockchain/service.py
@@ -1255,8 +1255,8 @@ class BlockchainProcessorService(BlockchainService):
         else:
             tx_nums = self.hashX_history_cache[hashX]
         history = ''
-        for tx_num in tx_nums:
-            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+        for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
+            history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
         self.hashX_full_cache[hashX] = history
         return history
 
@@ -1269,8 +1269,8 @@ class BlockchainProcessorService(BlockchainService):
         else:
             tx_nums = self.hashX_history_cache[hashX]
         history = ''
-        for tx_num in tx_nums:
-            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+        for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
+            history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
         for tx_hash, height in new_history:
             history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
         if history:
diff --git a/scribe/db/db.py b/scribe/db/db.py
index 98fccae..879e4ca 100644
--- a/scribe/db/db.py
+++ b/scribe/db/db.py
@@ -863,6 +863,12 @@ class HubDB:
             return self.total_transactions[tx_num]
         return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
 
+    def get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]:
+        """Return the tx hash for each of `tx_nums`, in order; None for tx nums that have no hash to return."""
+        if self._cache_all_tx_hashes:
+            return [None if tx_num > self.db_tx_count else self.total_transactions[tx_num] for tx_num in tx_nums]
+        return self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in tx_nums], deserialize_value=False)
+
     def get_raw_mempool_tx(self, tx_hash: bytes) -> Optional[bytes]:
         return self.prefix_db.mempool_tx.get(tx_hash, deserialize_value=False)
 
@@ -1042,10 +1048,11 @@ class HubDB:
         tx_nums = await run_in_executor(self._executor, self.read_history, hashX, limit)
         history = []
         append_history = history.append
-        for tx_num in tx_nums:
-            tx_hash = self.get_tx_hash(tx_num) \
-                if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hash, tx_num)
-            append_history((tx_hash, bisect_right(self.tx_counts, tx_num)))
+        while tx_nums:
+            batch, tx_nums = tx_nums[:100], tx_nums[100:]
+            batch_result = self.get_tx_hashes(batch) if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hashes, batch)
+            for tx_num, tx_hash in zip(batch, batch_result):
+                append_history((tx_hash, bisect_right(self.tx_counts, tx_num)))
             await asyncio.sleep(0)
         return history
 
diff --git a/scribe/db/interface.py b/scribe/db/interface.py
index 4845f87..57278f2 100644
--- a/scribe/db/interface.py
+++ b/scribe/db/interface.py
@@ -88,6 +88,29 @@ class PrefixRow(metaclass=PrefixRowType):
         if v:
             return v if not deserialize_value else self.unpack_value(v)
 
+    def multi_get(self, key_args: typing.List[typing.Tuple], fill_cache=True, deserialize_value=True):
+        """
+        Look up several keys of this row's column family with a single rocksdb `multi_get` call.
+
+        Returns one item per entry of `key_args`, in the same order: None where no value was
+        returned, otherwise the raw value or, if `deserialize_value` is set, the unpacked value.
+        """
+        packed_keys = {tuple(args): self.pack_key(*args) for args in key_args}
+        result = {
+            k[-1]: v for k, v in (
+                self._db.multi_get([(self._column_family, packed_keys[tuple(args)]) for args in key_args],
+                                   fill_cache=fill_cache) or {}
+            ).items()
+        }
+
+        def handle_value(v):
+            return None if v is None else v if not deserialize_value else self.unpack_value(v)
+
+        # use .get(): the `or {}` fallback above leaves `result` without entries for the requested keys
+        return [
+            handle_value(result.get(packed_keys[tuple(k_args)])) for k_args in key_args
+        ]
+
     def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
         packed_key = self.pack_key(*key_args)
         last_op = self._op_stack.get_last_op_for_key(packed_key)