faster blockchain.transaction.get_batch

This commit is contained in:
Jack Robison 2020-11-23 12:05:47 -05:00
parent 190d238a1f
commit faeba9a7e4
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -15,7 +15,9 @@ import ast
import os
import time
import zlib
import base64
import pylru
import typing
from typing import Optional, List, Tuple, Iterable
from asyncio import sleep
from bisect import bisect_right
from collections import namedtuple
@ -92,6 +94,9 @@ class LevelDB:
self.headers_db = None
self.tx_db = None
self._block_txs_cache = pylru.lrucache(50000)
self._merkle_tx_cache = pylru.lrucache(100000)
async def _read_tx_counts(self):
if self.tx_counts is not None:
return
@ -452,49 +457,75 @@ class LevelDB:
tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
return tx_hash, tx_height
def _fs_transactions(self, txids):
def _iter_transactions():
block_txs = {}
branch_and_root = self.merkle.branch_and_root
tx_iterator = self.tx_db.iterator
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
unpack_be_uint64 = util.unpack_be_uint64
pack_be_uint64 = util.pack_be_uint64
async def tx_merkle(self, tx_num, tx_height):
if tx_height == -1:
return {
'block_height': -1
}
tx_counts = self.tx_counts
tx_pos = tx_num - tx_counts[tx_height - 1]
for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
else:
yield tx_hash, (None, {'block_height': -1})
continue
if tx_height >= self.db_height:
yield tx_hash, (None, {'block_height': -1})
continue
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
if tx_height not in block_txs:
block_txs[tx_height] = list(tx_iterator(
start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]),
stop=None if tx_height + 1 == len(tx_counts) else
TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height] + 1), include_key=False
))
tx_pos = tx_num - tx_counts[tx_height - 1]
branch, root = branch_and_root(block_txs[tx_height], tx_pos)
merkle = {
'block_height': tx_height,
'merkle': [hash_to_hex_str(hash) for hash in branch],
'pos': tx_pos
}
yield tx_hash, (None if not tx else tx.hex(), merkle)
return {
_tx_hash: _val for (_tx_hash, _val) in _iter_transactions()
def _update_block_txs_cache():
block_txs = list(self.tx_db.iterator(
start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
stop=None if tx_height + 1 == len(tx_counts) else
TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height] + 1), include_key=False
))
if tx_height + 100 > self.db_height:
return block_txs
self._block_txs_cache[tx_height] = block_txs
uncached = None
if (tx_num, tx_height) in self._merkle_tx_cache:
return self._merkle_tx_cache[(tx_num, tx_height)]
if tx_height not in self._block_txs_cache:
uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash) for hash in self.merkle.branch_and_root(
self._block_txs_cache.get(tx_height, uncached), tx_pos
)[0]
],
'pos': tx_pos
}
if tx_height + 100 < self.db_height:
self._merkle_tx_cache[(tx_num, tx_height)] = merkle
return merkle
def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]:
unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
tx_infos = []
for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
tx = None
tx_height = -1
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
if tx_height < self.db_height:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height))
return tx_infos
async def fs_transactions(self, txids):
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
txs = await asyncio.get_event_loop().run_in_executor(
self.executor, self._fs_transactions, txids
)
async def add_result(item):
_txid, _tx, _tx_num, _tx_height = item
result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height))
result = {}
if txs:
await asyncio.gather(*map(add_result, txs))
return result
async def fs_block_hashes(self, height, count):
if height + count > len(self.headers):