Expose multi_get
from rocksdb, use it for fetching tx hashes #23
3 changed files with 32 additions and 10 deletions
|
@ -125,8 +125,8 @@ class BlockchainProcessorService(BlockchainService):
|
|||
self.pending_transaction_num_mapping: Dict[bytes, int] = {}
|
||||
self.pending_transactions: Dict[int, bytes] = {}
|
||||
|
||||
self.hashX_history_cache = LRUCache(100)
|
||||
self.hashX_full_cache = LRUCache(100)
|
||||
self.hashX_history_cache = LRUCache(1000)
|
||||
self.hashX_full_cache = LRUCache(1000)
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
# Run in a thread to prevent blocking. Shielded so that
|
||||
|
@ -1255,8 +1255,8 @@ class BlockchainProcessorService(BlockchainService):
|
|||
else:
|
||||
tx_nums = self.hashX_history_cache[hashX]
|
||||
history = ''
|
||||
for tx_num in tx_nums:
|
||||
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
|
||||
history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
self.hashX_full_cache[hashX] = history
|
||||
return history
|
||||
|
||||
|
@ -1269,8 +1269,8 @@ class BlockchainProcessorService(BlockchainService):
|
|||
else:
|
||||
tx_nums = self.hashX_history_cache[hashX]
|
||||
history = ''
|
||||
for tx_num in tx_nums:
|
||||
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
|
||||
history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
for tx_hash, height in new_history:
|
||||
history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
|
||||
if history:
|
||||
|
|
|
@ -863,6 +863,11 @@ class HubDB:
|
|||
return self.total_transactions[tx_num]
|
||||
return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
|
||||
|
||||
def get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]:
|
||||
if self._cache_all_tx_hashes:
|
||||
return [None if tx_num > self.db_tx_count else self.total_transactions[tx_num] for tx_num in tx_nums]
|
||||
return self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in tx_nums], deserialize_value=False)
|
||||
|
||||
def get_raw_mempool_tx(self, tx_hash: bytes) -> Optional[bytes]:
|
||||
return self.prefix_db.mempool_tx.get(tx_hash, deserialize_value=False)
|
||||
|
||||
|
@ -1042,10 +1047,11 @@ class HubDB:
|
|||
tx_nums = await run_in_executor(self._executor, self.read_history, hashX, limit)
|
||||
history = []
|
||||
append_history = history.append
|
||||
for tx_num in tx_nums:
|
||||
tx_hash = self.get_tx_hash(tx_num) \
|
||||
if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hash, tx_num)
|
||||
append_history((tx_hash, bisect_right(self.tx_counts, tx_num)))
|
||||
while tx_nums:
|
||||
batch, tx_nums = tx_nums[:100], tx_nums[100:]
|
||||
batch_result = self.get_tx_hashes(batch) if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hashes, batch)
|
||||
for tx_num, tx_hash in zip(batch, batch_result):
|
||||
append_history((tx_hash, bisect_right(self.tx_counts, tx_num)))
|
||||
await asyncio.sleep(0)
|
||||
return history
|
||||
|
||||
|
|
|
@ -88,6 +88,22 @@ class PrefixRow(metaclass=PrefixRowType):
|
|||
if v:
|
||||
return v if not deserialize_value else self.unpack_value(v)
|
||||
|
||||
def multi_get(self, key_args: typing.List[typing.Tuple], fill_cache=True, deserialize_value=True):
|
||||
packed_keys = {tuple(args): self.pack_key(*args) for args in key_args}
|
||||
result = {
|
||||
k[-1]: v for k, v in (
|
||||
self._db.multi_get([(self._column_family, packed_keys[tuple(args)]) for args in key_args],
|
||||
fill_cache=fill_cache) or {}
|
||||
).items()
|
||||
}
|
||||
|
||||
def handle_value(v):
|
||||
return None if v is None else v if not deserialize_value else self.unpack_value(v)
|
||||
|
||||
return [
|
||||
handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args
|
||||
]
|
||||
|
||||
def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
|
||||
packed_key = self.pack_key(*key_args)
|
||||
last_op = self._op_stack.get_last_op_for_key(packed_key)
|
||||
|
|
Loading…
Reference in a new issue