use multi_get
for fetching transactions #25
2 changed files with 43 additions and 38 deletions
|
@ -976,41 +976,44 @@ class HubDB:
|
||||||
|
|
||||||
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
|
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
|
||||||
tx_infos = {}
|
tx_infos = {}
|
||||||
for tx_hash in tx_hashes:
|
needed = []
|
||||||
tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor(
|
needed_confirmed = []
|
||||||
self._executor, self._get_transaction_and_merkle, tx_hash
|
needed_mempool = []
|
||||||
)
|
run_in_executor = asyncio.get_event_loop().run_in_executor
|
||||||
await asyncio.sleep(0)
|
|
||||||
return tx_infos
|
|
||||||
|
|
||||||
def _get_transaction_and_merkle(self, tx_hash):
|
for tx_hash in tx_hashes:
|
||||||
cached_tx = self._tx_and_merkle_cache.get(tx_hash)
|
cached_tx = self._tx_and_merkle_cache.get(tx_hash)
|
||||||
if cached_tx:
|
if cached_tx:
|
||||||
tx, merkle = cached_tx
|
tx, merkle = cached_tx
|
||||||
|
tx_infos[tx_hash] = None if not tx else tx.hex(), merkle
|
||||||
else:
|
else:
|
||||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||||
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
|
if self._cache_all_tx_hashes and tx_hash_bytes in self.tx_num_mapping:
|
||||||
tx = None
|
needed_confirmed.append((tx_hash_bytes, self.tx_num_mapping[tx_hash_bytes]))
|
||||||
tx_height = -1
|
else:
|
||||||
tx_num = None if not tx_num else tx_num.tx_num
|
needed.append(tx_hash_bytes)
|
||||||
|
|
||||||
|
if needed:
|
||||||
|
for tx_hash_bytes, v in zip(needed, await run_in_executor(
|
||||||
|
self._executor, self.prefix_db.tx_num.multi_get, [(tx_hash,) for tx_hash in needed],
|
||||||
|
True, True)):
|
||||||
|
tx_num = None if v is None else v.tx_num
|
||||||
if tx_num is not None:
|
if tx_num is not None:
|
||||||
if self._cache_all_claim_txos:
|
needed_confirmed.append((tx_hash_bytes, tx_num))
|
||||||
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
|
|
||||||
else:
|
else:
|
||||||
fill_cache = True
|
needed_mempool.append(tx_hash_bytes)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
if needed_confirmed:
|
||||||
|
for (tx_hash_bytes, tx_num), tx in zip(needed_confirmed, await run_in_executor(
|
||||||
|
self._executor, self.prefix_db.tx.multi_get, [(tx_hash,) for tx_hash, _ in needed_confirmed],
|
||||||
|
True, False)):
|
||||||
tx_height = bisect_right(self.tx_counts, tx_num)
|
tx_height = bisect_right(self.tx_counts, tx_num)
|
||||||
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
|
||||||
if tx_height == -1:
|
|
||||||
merkle = {
|
|
||||||
'block_height': -1
|
|
||||||
}
|
|
||||||
tx = self.prefix_db.mempool_tx.get(tx_hash_bytes, deserialize_value=False)
|
|
||||||
else:
|
|
||||||
tx_pos = tx_num - self.tx_counts[tx_height - 1]
|
tx_pos = tx_num - self.tx_counts[tx_height - 1]
|
||||||
branch, root = self.merkle.branch_and_root(
|
branch, root = self.merkle.branch_and_root(
|
||||||
self.get_block_txs(tx_height), tx_pos
|
self.get_block_txs(tx_height), tx_pos
|
||||||
)
|
)
|
||||||
merkle = {
|
tx_infos[tx_hash_bytes[::-1].hex()] = None if not tx else tx.hex(), {
|
||||||
'block_height': tx_height,
|
'block_height': tx_height,
|
||||||
'merkle': [
|
'merkle': [
|
||||||
hash_to_hex_str(_hash)
|
hash_to_hex_str(_hash)
|
||||||
|
@ -1018,9 +1021,14 @@ class HubDB:
|
||||||
],
|
],
|
||||||
'pos': tx_pos
|
'pos': tx_pos
|
||||||
}
|
}
|
||||||
if tx_height > 0 and tx_height + 10 < self.db_height:
|
await asyncio.sleep(0)
|
||||||
self._tx_and_merkle_cache[tx_hash] = tx, merkle
|
if needed_mempool:
|
||||||
return None if not tx else tx.hex(), merkle
|
for tx_hash_bytes, tx in zip(needed_mempool, await run_in_executor(
|
||||||
|
self._executor, self.prefix_db.mempool_tx.multi_get, [(tx_hash,) for tx_hash in needed_mempool],
|
||||||
|
True, False)):
|
||||||
|
tx_infos[tx_hash_bytes[::-1].hex()] = None if not tx else tx.hex(), {'block_height': -1}
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
return tx_infos
|
||||||
|
|
||||||
async def fs_block_hashes(self, height, count):
|
async def fs_block_hashes(self, height, count):
|
||||||
if height + count > len(self.headers):
|
if height + count > len(self.headers):
|
||||||
|
|
|
@ -90,12 +90,9 @@ class PrefixRow(metaclass=PrefixRowType):
|
||||||
|
|
||||||
def multi_get(self, key_args: typing.List[typing.Tuple], fill_cache=True, deserialize_value=True):
|
def multi_get(self, key_args: typing.List[typing.Tuple], fill_cache=True, deserialize_value=True):
|
||||||
packed_keys = {tuple(args): self.pack_key(*args) for args in key_args}
|
packed_keys = {tuple(args): self.pack_key(*args) for args in key_args}
|
||||||
result = {
|
db_result = self._db.multi_get([(self._column_family, packed_keys[tuple(args)]) for args in key_args],
|
||||||
k[-1]: v for k, v in (
|
fill_cache=fill_cache)
|
||||||
self._db.multi_get([(self._column_family, packed_keys[tuple(args)]) for args in key_args],
|
result = {k[-1]: v for k, v in (db_result or {}).items()}
|
||||||
fill_cache=fill_cache) or {}
|
|
||||||
).items()
|
|
||||||
}
|
|
||||||
|
|
||||||
def handle_value(v):
|
def handle_value(v):
|
||||||
return None if v is None else v if not deserialize_value else self.unpack_value(v)
|
return None if v is None else v if not deserialize_value else self.unpack_value(v)
|
||||||
|
|
Loading…
Reference in a new issue