expose rocksdb multi_get api for column families

-use multi_get for fetching tx hashes when they're not all held in memory
This commit is contained in:
Jack Robison 2022-04-13 15:49:29 -04:00
parent ce50da9abe
commit 4f0359c177
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 30 additions and 8 deletions

View file

@ -1255,8 +1255,8 @@ class BlockchainProcessorService(BlockchainService):
else: else:
tx_nums = self.hashX_history_cache[hashX] tx_nums = self.hashX_history_cache[hashX]
history = '' history = ''
for tx_num in tx_nums: for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:' history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
self.hashX_full_cache[hashX] = history self.hashX_full_cache[hashX] = history
return history return history
@ -1269,8 +1269,8 @@ class BlockchainProcessorService(BlockchainService):
else: else:
tx_nums = self.hashX_history_cache[hashX] tx_nums = self.hashX_history_cache[hashX]
history = '' history = ''
for tx_num in tx_nums: for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:' history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
for tx_hash, height in new_history: for tx_hash, height in new_history:
history += f'{hash_to_hex_str(tx_hash)}:{height:d}:' history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
if history: if history:

View file

@ -863,6 +863,11 @@ class HubDB:
return self.total_transactions[tx_num] return self.total_transactions[tx_num]
return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
def get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]:
if self._cache_all_tx_hashes:
return [None if tx_num > self.db_tx_count else self.total_transactions[tx_num] for tx_num in tx_nums]
return self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in tx_nums], deserialize_value=False)
def get_raw_mempool_tx(self, tx_hash: bytes) -> Optional[bytes]: def get_raw_mempool_tx(self, tx_hash: bytes) -> Optional[bytes]:
return self.prefix_db.mempool_tx.get(tx_hash, deserialize_value=False) return self.prefix_db.mempool_tx.get(tx_hash, deserialize_value=False)
@ -1042,10 +1047,11 @@ class HubDB:
tx_nums = await run_in_executor(self._executor, self.read_history, hashX, limit) tx_nums = await run_in_executor(self._executor, self.read_history, hashX, limit)
history = [] history = []
append_history = history.append append_history = history.append
for tx_num in tx_nums: while tx_nums:
tx_hash = self.get_tx_hash(tx_num) \ batch, tx_nums = tx_nums[:100], tx_nums[100:]
if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hash, tx_num) batch_result = self.get_tx_hashes(batch) if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hashes, batch)
append_history((tx_hash, bisect_right(self.tx_counts, tx_num))) for tx_num, tx_hash in zip(batch, batch_result):
append_history((tx_hash, bisect_right(self.tx_counts, tx_num)))
await asyncio.sleep(0) await asyncio.sleep(0)
return history return history

View file

@ -88,6 +88,22 @@ class PrefixRow(metaclass=PrefixRowType):
if v: if v:
return v if not deserialize_value else self.unpack_value(v) return v if not deserialize_value else self.unpack_value(v)
def multi_get(self, key_args: typing.List[typing.Tuple], fill_cache=True, deserialize_value=True):
packed_keys = {tuple(args): self.pack_key(*args) for args in key_args}
result = {
k[-1]: v for k, v in (
self._db.multi_get([(self._column_family, packed_keys[tuple(args)]) for args in key_args],
fill_cache=fill_cache) or {}
).items()
}
def handle_value(v):
return None if v is None else v if not deserialize_value else self.unpack_value(v)
return [
handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args
]
def get_pending(self, *key_args, fill_cache=True, deserialize_value=True): def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
packed_key = self.pack_key(*key_args) packed_key = self.pack_key(*key_args)
last_op = self._op_stack.get_last_op_for_key(packed_key) last_op = self._op_stack.get_last_op_for_key(packed_key)