Expose multi_get from rocksdb, use it for fetching tx hashes #23
3 changed files with 32 additions and 10 deletions
@@ -125,8 +125,8 @@ class BlockchainProcessorService(BlockchainService):
         self.pending_transaction_num_mapping: Dict[bytes, int] = {}
         self.pending_transactions: Dict[int, bytes] = {}

-        self.hashX_history_cache = LRUCache(100)
-        self.hashX_full_cache = LRUCache(100)
+        self.hashX_history_cache = LRUCache(1000)
+        self.hashX_full_cache = LRUCache(1000)

     async def run_in_thread_with_lock(self, func, *args):
         # Run in a thread to prevent blocking. Shielded so that
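Side note on the cache bump: LRUCache appears to be the codebase's own capacity-bounded cache, so growing it from 100 to 1000 entries just raises the eviction threshold. A minimal sketch of the assumed behavior (illustrative names, not the project's class):

```python
# Sketch of a capacity-bounded LRU cache, assuming LRUCache(n) evicts the
# least recently used entry once n items are held. Illustrative only.
from collections import OrderedDict

class SketchLRUCache:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self._data = OrderedDict()

    def __contains__(self, key) -> bool:
        return key in self._data

    def __getitem__(self, key):
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict least recently used
```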
@@ -1255,8 +1255,8 @@ class BlockchainProcessorService(BlockchainService):
         else:
             tx_nums = self.hashX_history_cache[hashX]
         history = ''
-        for tx_num in tx_nums:
-            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+        for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
+            history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
         self.hashX_full_cache[hashX] = history
         return history
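This hunk swaps N single-key `get_tx_hash` reads for one `get_tx_hashes` batch read, then pairs results back up positionally with `zip`. A runnable toy of the pattern, assuming (as the new HubDB method suggests) that the batch call preserves input order; `FakeDB` is a stand-in, not the project's class:

```python
# Before: one DB lookup per tx_num.  After: one batched call, results zipped
# back positionally, which only works because input order is preserved.
from typing import List, Optional

class FakeDB:
    def __init__(self, hashes):
        self._hashes = hashes  # tx_num -> tx_hash

    def get_tx_hash(self, tx_num: int) -> Optional[bytes]:
        return self._hashes.get(tx_num)                # one lookup per call

    def get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]:
        return [self._hashes.get(n) for n in tx_nums]  # one batched call

db = FakeDB({1: b'\xaa' * 32, 2: b'\xbb' * 32})
tx_nums = [1, 2, 3]
for tx_num, tx_hash in zip(tx_nums, db.get_tx_hashes(tx_nums)):
    print(tx_num, None if tx_hash is None else tx_hash.hex())
```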
@@ -1269,8 +1269,8 @@ class BlockchainProcessorService(BlockchainService):
         else:
             tx_nums = self.hashX_history_cache[hashX]
         history = ''
-        for tx_num in tx_nums:
-            history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+        for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
+            history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
         for tx_hash, height in new_history:
             history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
         if history:
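Same batching change as the previous hunk; the remaining subtlety in both is deriving a height from `bisect_right(self.db.tx_counts, tx_num)`. A small sketch of that mapping, assuming `tx_counts` holds cumulative transaction counts per block:

```python
# tx_counts[h] is assumed to be the cumulative tx count through block h, so
# the first index whose cumulative count exceeds tx_num is the block height
# containing that transaction.
from bisect import bisect_right

tx_counts = [1, 3, 6, 10]   # blocks 0..3 hold 1, 2, 3, 4 txs respectively
for tx_num in (0, 2, 5, 9):
    print(tx_num, '-> height', bisect_right(tx_counts, tx_num))
# 0 -> height 0, 2 -> height 1, 5 -> height 2, 9 -> height 3
```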
@@ -863,6 +863,11 @@ class HubDB:
             return self.total_transactions[tx_num]
         return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)

+    def get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]:
+        if self._cache_all_tx_hashes:
+            return [None if tx_num > self.db_tx_count else self.total_transactions[tx_num] for tx_num in tx_nums]
+        return self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in tx_nums], deserialize_value=False)
+
     def get_raw_mempool_tx(self, tx_hash: bytes) -> Optional[bytes]:
         return self.prefix_db.mempool_tx.get(tx_hash, deserialize_value=False)
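The new batch accessor takes two paths: an in-memory list comprehension when all tx hashes are cached, or a single `multi_get` against the prefix DB otherwise, returning `None` for tx_nums past `db_tx_count`. A toy model of the cached path (names illustrative, not HubDB itself):

```python
# When every tx hash is held in memory, the batch lookup is just a guarded
# list comprehension; tx_nums beyond the highest known count come back None.
from typing import List, Optional

total_transactions: List[bytes] = [bytes([i]) * 32 for i in range(5)]
db_tx_count = len(total_transactions) - 1  # highest known tx_num

def get_tx_hashes_cached(tx_nums: List[int]) -> List[Optional[bytes]]:
    return [
        None if tx_num > db_tx_count else total_transactions[tx_num]
        for tx_num in tx_nums
    ]

print(get_tx_hashes_cached([0, 4, 7]))  # last entry is None
```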
@@ -1042,10 +1047,11 @@ class HubDB:
         tx_nums = await run_in_executor(self._executor, self.read_history, hashX, limit)
         history = []
         append_history = history.append
-        for tx_num in tx_nums:
-            tx_hash = self.get_tx_hash(tx_num) \
-                if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hash, tx_num)
-            append_history((tx_hash, bisect_right(self.tx_counts, tx_num)))
+        while tx_nums:
+            batch, tx_nums = tx_nums[:100], tx_nums[100:]
+            batch_result = self.get_tx_hashes(batch) if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hashes, batch)
+            for tx_num, tx_hash in zip(batch, batch_result):
+                append_history((tx_hash, bisect_right(self.tx_counts, tx_num)))
             await asyncio.sleep(0)
         return history
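Here the per-item loop becomes chunked iteration: take 100 tx_nums at a time, batch-fetch their hashes, and `await asyncio.sleep(0)` between chunks so a long history read yields the event loop. A self-contained sketch of that pattern, where `drain_in_batches` and the doubling stand in for the real fetch:

```python
# Consume a work list in fixed-size batches, yielding control between
# batches so one long scan cannot starve other coroutines. The batch size
# of 100 mirrors the diff.
import asyncio
from typing import List

async def drain_in_batches(items: List[int], batch_size: int = 100) -> List[int]:
    results: List[int] = []
    while items:
        batch, items = items[:batch_size], items[batch_size:]
        results.extend(n * 2 for n in batch)  # stand-in for a real batch fetch
        await asyncio.sleep(0)  # let other tasks run between batches
    return results

print(len(asyncio.run(drain_in_batches(list(range(250))))))  # 250, in 3 batches
```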
@@ -88,6 +88,22 @@ class PrefixRow(metaclass=PrefixRowType):
         if v:
             return v if not deserialize_value else self.unpack_value(v)

+    def multi_get(self, key_args: typing.List[typing.Tuple], fill_cache=True, deserialize_value=True):
+        packed_keys = {tuple(args): self.pack_key(*args) for args in key_args}
+        result = {
+            k[-1]: v for k, v in (
+                self._db.multi_get([(self._column_family, packed_keys[tuple(args)]) for args in key_args],
+                                   fill_cache=fill_cache) or {}
+            ).items()
+        }
+
+        def handle_value(v):
+            return None if v is None else v if not deserialize_value else self.unpack_value(v)
+
+        return [
+            handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args
+        ]
+
     def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
         packed_key = self.pack_key(*key_args)
         last_op = self._op_stack.get_last_op_for_key(packed_key)
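The new `PrefixRow.multi_get` packs each key tuple, hands `(column_family, packed_key)` pairs to the underlying store's `multi_get`, then re-indexes the returned mapping by packed key alone so results come back in input order, with `None` for misses. A toy model under those assumptions; `FakeStore` and the packing are illustrative, not the real rocksdb binding:

```python
# Order-preserving wrapper over a dict-returning batch read, assuming (as
# the diff shows) the store's multi_get takes (cf, packed_key) pairs and
# returns a mapping from those pairs to raw values or None.
import struct
from typing import List, Optional, Tuple

class FakeStore:
    def __init__(self, data):
        self._data = data

    def multi_get(self, cf_key_pairs, fill_cache=True):
        return {pair: self._data.get(pair[1]) for pair in cf_key_pairs}

def pack_key(tx_num: int) -> bytes:
    return struct.pack('>L', tx_num)

store = FakeStore({pack_key(1): b'one', pack_key(2): b'two'})
cf = 'tx_hash'

def multi_get(key_args: List[Tuple]) -> List[Optional[bytes]]:
    packed = {tuple(args): pack_key(*args) for args in key_args}
    raw = store.multi_get([(cf, packed[tuple(args)]) for args in key_args]) or {}
    result = {k[-1]: v for k, v in raw.items()}  # index by packed key alone
    # reassemble in input order; missing keys surface as None
    return [result[packed[tuple(args)]] for args in key_args]

print(multi_get([(1,), (3,), (2,)]))  # [b'one', None, b'two']
```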