diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index ccb704d73..90d8cc039 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -10,7 +10,7 @@ from lbry.wallet.server.db.writer import SQLDB from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger -from lbry.wallet.server.leveldb import FlushData +from lbry.wallet.server.leveldb import FlushData, reopen_rocksdb_ctx class Prefetcher: @@ -346,6 +346,7 @@ class BlockProcessor: self.db.flush_dbs(self.flush_data(), flush_utxos, self.estimate_txs_remaining) await self.run_in_thread_with_lock(flush) + await reopen_rocksdb_ctx(self.db.executor) async def _maybe_flush(self): # If caught up, flush everything as client queries are diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index e0616070a..35ae51a83 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -25,18 +25,16 @@ from glob import glob from struct import pack, unpack from contextvars import ContextVar from dataclasses import dataclass -from concurrent.futures.process import ProcessPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor +from concurrent.futures.process import ProcessPoolExecutor import attr from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from, pack_le_int32 -from lbry.wallet.server.storage import db_class +from lbry.wallet.server.storage import db_class, RocksDB from lbry.wallet.server.history import History -if typing.TYPE_CHECKING: - from lbry.wallet.server.storage import RocksDB UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") HEADER_PREFIX = b'H' @@ -49,7 +47,7 @@ HISTORY_PREFIX = b'A' HASHX_UTXO_PREFIX = b'h' UTXO_PREFIX = b'u' HASHX_HISTORY_PREFIX = b'x' - +UNDO_PREFIX = b'U' UTXO_STATE = b'state-utxo' HIST_STATE = b'state-hist' @@ -72,12 +70,13 @@ class RocksDBState(typing.NamedTuple): @dataclass class RocksReaderContext: db: 'RocksDB' + db_dir: str name: str merkle: Merkle tx_counts: List[int] - state: RocksDBState block_txs_cache: pylru.lrucache merkle_tx_cache: pylru.lrucache + txid_cache: pylru.lrucache def close(self): self.db.close() @@ -92,24 +91,17 @@ class RocksReaderContext: for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) ] - flush_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'flush_count', b'\xff\xff\xff\xff')) - comp_flush_count, = unpack_le_int32_from( - self.db.get(STATE_PREFIX + b'comp_flush_count', b'\xff\xff\xff\xff') - ) - comp_cursor, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'comp_cursor', b'\xff\xff\xff\xff')) - db_version, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'db_version', b'\xff\xff\xff\xff')) - genesis = self.db.get(STATE_PREFIX + b'genesis') - tip = self.db.get(STATE_PREFIX + b'tip') - height, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'height', b'\xff\xff\xff\xff')) - tx_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'tx_count', b'\xff\xff\xff\xff')) - utxo_flush_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'utxo_flush_count', b'\xff\xff\xff\xff')) - wall_time, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'wall_time', b'\xff\xff\xff\xff')) - first_sync, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'first_sync', b'\xff\xff\xff\xff')) - - self.state = RocksDBState( - db_version, genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync == -1, - flush_count, comp_flush_count, comp_cursor - ) + def ctx_tx_hash(self, tx_num): + tx_height = bisect_right(self.tx_counts, tx_num) + if tx_height > len(self.tx_counts): + return None, tx_height + key = TX_HASH_PREFIX + util.pack_be_uint64(tx_num) + if key in self.txid_cache: + return self.txid_cache[key], tx_height + tx_hash = self.db.get(key) + if tx_height + 100 <= len(self.tx_counts): + self.txid_cache[key] = tx_hash + return tx_hash, tx_height proc_ctx: ContextVar[Optional[RocksReaderContext]] = ContextVar('proc_ctx') @@ -120,17 +112,27 @@ def _update_rocksdb_ctx(): ctx.update_state() +def _reopen_rocksdb_ctx(): + ctx = proc_ctx.get() + ctx.db.close() + ctx.db = RocksDB(ctx.db_dir, ctx.name, for_sync=False, read_only=True) + ctx.update_state() + + async def update_rocksdb_ctx(executor: ProcessPoolExecutor): loop = asyncio.get_event_loop() await asyncio.wait([loop.run_in_executor(executor, _update_rocksdb_ctx) for _ in range(executor._max_workers)]) +async def reopen_rocksdb_ctx(executor: ProcessPoolExecutor): + loop = asyncio.get_event_loop() + await asyncio.wait([loop.run_in_executor(executor, _reopen_rocksdb_ctx) for _ in range(executor._max_workers)]) def _initializer(path, name): db = RocksDB(path, name, for_sync=False, read_only=True) - state = RocksDBState(-1, '', -1, -1, b'', -1, -1, True, -1, -1, -1) - proc_ctx.set(RocksReaderContext(db, name, Merkle(), [], state, pylru.lrucache(50000), pylru.lrucache(100000))) + proc_ctx.set(RocksReaderContext(db, path, name, Merkle(), [], pylru.lrucache(50000), pylru.lrucache(100000), + pylru.lrucache(1000000))) def _teardown(): @@ -159,6 +161,187 @@ async def teardown_executor(executor: ProcessPoolExecutor): executor.shutdown(True) +def lookup_hashXs_utxos(prevouts): + """Return (hashX, suffix) pairs, or None if not found, + for each prevout. + """ + ctx = proc_ctx.get() + iterator = ctx.db.iterator + get = ctx.db.get + + def lookup_hashXs(): + """Return (hashX, suffix) pairs, or None if not found, + for each prevout. + """ + + def lookup_hashX(tx_hash, tx_idx): + idx_packed = pack('<H', tx_idx) + # Key: b'h' + compressed_tx_hash + tx_idx + tx_num + # Value: hashX + prefix = b'h' + tx_hash[:4] + idx_packed + # Find which entry, if any, the TX_HASH matches. + for db_key, hashX in iterator(prefix=prefix): + tx_num_packed = db_key[-4:] + tx_num, = unpack('<I', tx_num_packed) + hash, height = ctx.ctx_tx_hash(tx_num) + if hash == tx_hash: + return hashX, idx_packed + tx_num_packed + return None, None + + return [lookup_hashX(*prevout) for prevout in prevouts] + + def lookup_utxo(hashX, suffix): + if not hashX: + # This can happen when the daemon is a block ahead + # of us and has mempool txs spending outputs from + # that new block + return None + # Key: b'u' + address_hashX + tx_idx + tx_num + # Value: the UTXO value as a 64-bit unsigned integer + db_value = get(b'u' + hashX + suffix) + if not db_value: + # This can happen if the DB was updated between + # getting the hashXs and getting the UTXOs + return None + value, = unpack('<Q', db_value) + return hashX, value + + return [lookup_utxo(*hashX_pair) for hashX_pair in lookup_hashXs()] + + +def get_counts(): + return tuple( + util.unpack_be_uint64(tx_count) + for tx_count in proc_ctx.get().db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) + ) + + +def read_txids(): + return list(proc_ctx.get().db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) + + +def read_headers(): + return [ + header for header in proc_ctx.get().db.iterator(prefix=HEADER_PREFIX, include_key=False) + ] + + +def tx_hash(self, tx_num): + """Return a par (tx_hash, tx_height) for the given tx number. + + If the tx_height is not on disk, returns (None, tx_height).""" + tx_height = bisect_right(self.tx_counts, tx_num) + if tx_height > self.db_height: + return None, tx_height + return self.total_transactions[tx_num], tx_height + + +def read_utxos(hashX): + utxos = [] + utxos_append = utxos.append + s_unpack = unpack + # Key: b'u' + address_hashX + tx_idx + tx_num + # Value: the UTXO value as a 64-bit unsigned integer + for db_key, db_value in proc_ctx.get().db.iterator(prefix=UTXO_PREFIX + hashX): + tx_pos, tx_num = s_unpack('<HI', db_key[-6:]) + value, = unpack('<Q', db_value) + utxos_append((tx_num, tx_pos, value)) + return utxos + + +def limited_history(hashX, limit=None): + ctx = proc_ctx.get() + tx_counts = ctx.tx_counts + db_height = len(tx_counts) + cnt = 0 + txs = [] + + for hist in ctx.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + tx_height = bisect_right(tx_counts, tx_num) + if tx_height > db_height: + return + txs.append((tx_num, tx_height)) + cnt += 1 + if limit and cnt >= limit: + break + if limit and cnt >= limit: + break + return txs + + +def tx_merkle(tx_num, tx_height): + ctx = proc_ctx.get() + db = ctx.db + tx_counts = ctx.tx_counts + db_height = len(tx_counts) + + if tx_height == -1: + return { + 'block_height': -1 + } + tx_pos = tx_num - tx_counts[tx_height - 1] + + uncached = None + if (tx_num, tx_height) in ctx.merkle_tx_cache: + return ctx.merkle_tx_cache[(tx_num, tx_height)] + if tx_height not in ctx.block_txs_cache: + block_txs = list(db.iterator( + start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]), + stop=None if tx_height + 1 == len(tx_counts) else + TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False + )) + if tx_height + 100 <= db_height: + ctx.block_txs_cache[tx_height] = block_txs + else: + block_txs = ctx.block_txs_cache.get(tx_height, uncached) + branch, root = ctx.merkle.branch_and_root(block_txs, tx_pos) + merkle = { + 'block_height': tx_height, + 'merkle': [ + hash_to_hex_str(hash) + for hash in branch + ], + 'pos': tx_pos + } + if tx_height + 100 < db_height: + ctx.merkle_tx_cache[(tx_num, tx_height)] = merkle + return merkle + + +def transaction_info_get_batch(txids: Iterable[str]): + ctx = proc_ctx.get() + db_height = len(ctx.tx_counts) + tx_counts = ctx.tx_counts + tx_db_get = ctx.db.get + tx_infos = {} + + for tx_hash in txids: + tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] + tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) + tx = None + tx_height = -1 + if tx_num is not None: + tx_num = unpack_be_uint64(tx_num) + tx_height = bisect_right(tx_counts, tx_num) + if tx_height < db_height: + tx = tx_db_get(TX_PREFIX + tx_hash_bytes) + tx_infos[tx_hash] = ( + None if not tx else tx.hex(), {'block_height': -1} if tx_height == -1 else tx_merkle( + tx_num, tx_height + ) + ) + + return tx_infos + + +def _update_block_txs_cache(tx_num, tx_height): + ctx = proc_ctx.get() + db = ctx.db + tx_counts = ctx.tx_counts + db_height = len(tx_counts) @attr.s(slots=True) @@ -218,12 +401,6 @@ class LevelDB: # tx_counts[N] has the cumulative number of txs at the end of # height N. So tx_counts[0] is 1 - the genesis coinbase - def get_counts(): - return tuple( - util.unpack_be_uint64(tx_count) - for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) - ) - tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}" self.tx_counts = array.array('I', tx_counts) @@ -235,12 +412,9 @@ class LevelDB: assert self.db_tx_count == 0 async def _read_txids(self): - def get_txids(): - return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) - start = time.perf_counter() self.logger.info("loading txids") - txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids) + txids = await asyncio.get_event_loop().run_in_executor(self.executor, read_txids) assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1] self.total_transactions = txids ts = time.perf_counter() - start @@ -250,28 +424,19 @@ class LevelDB: if self.headers is not None: return - def get_headers(): - return [ - header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False) - ] - - headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers) + headers = await asyncio.get_event_loop().run_in_executor(self.executor, read_headers) assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" self.headers = headers async def _open_dbs(self, for_sync, compacting): + name = f'lbry-{self.env.db_engine}' if self.executor is None: - self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1)) - coin_path = os.path.join(self.env.db_dir, 'COIN') - if not os.path.isfile(coin_path): - with util.open_file(coin_path, create=True) as f: - f.write(f'ElectrumX databases and metadata for ' - f'{self.coin.NAME} {self.coin.NET}'.encode()) + self.executor, self.db = await initialize_executor( + max(1, os.cpu_count() - 1), self.env.db_dir, for_sync, name + ) - assert self.db is None - self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync) if self.db.is_new: - self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}') + self.logger.info('created new db: %s', name) self.logger.info(f'opened DB (for sync: {for_sync})') self.read_utxo_state() @@ -416,13 +581,11 @@ class LevelDB: for i, header in enumerate(flush_data.headers): batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header) self.headers.append(header) - flush_data.headers.clear() + flush_data.headers.clear() - height_start = self.fs_height + 1 - tx_num = prior_tx_count + height_start = self.fs_height + 1 + tx_num = prior_tx_count - with self.db.write_batch() as batch: - batch_put = batch.put for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs): tx_count = self.tx_counts[height_start] batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1]) @@ -441,9 +604,13 @@ class LevelDB: self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count + elapsed = time.perf_counter() - start_time self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') + + + def flush_history(self): self.history.flush() @@ -468,8 +635,8 @@ class LevelDB: # suffix = tx_idx + tx_num hashX = value[:-12] suffix = key[-2:] + value[-12:-8] - batch_put(b'h' + key[:4] + suffix, hashX) - batch_put(b'u' + hashX + suffix, value[-8:]) + batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX) + batch_put(UTXO_PREFIX + hashX + suffix, value[-8:]) flush_data.adds.clear() # New undo information @@ -601,39 +768,10 @@ class LevelDB: self._merkle_tx_cache[(tx_num, tx_height)] = merkle return merkle - def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]: - unpack_be_uint64 = util.unpack_be_uint64 - tx_counts = self.tx_counts - tx_db_get = self.db.get - tx_infos = [] - - for tx_hash in txids: - tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) - tx = None - tx_height = -1 - if tx_num is not None: - tx_num = unpack_be_uint64(tx_num) - tx_height = bisect_right(tx_counts, tx_num) - if tx_height < self.db_height: - tx = tx_db_get(TX_PREFIX + tx_hash_bytes) - tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height)) - - return tx_infos - async def fs_transactions(self, txids): - txs = await asyncio.get_event_loop().run_in_executor( - self.executor, self._fs_transactions, txids + return await asyncio.get_event_loop().run_in_executor( + self.executor, transaction_info_get_batch, txids ) - unsorted_result = {} - - async def add_result(item): - _txid, _tx, _tx_num, _tx_height = item - unsorted_result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height)) - - if txs: - await asyncio.gather(*map(add_result, txs)) - return {txid: unsorted_result[txid] for txid, _, _, _ in txs} async def fs_block_hashes(self, height, count): if height + count > len(self.headers): @@ -647,49 +785,9 @@ class LevelDB: transactions. By default returns at most 1000 entries. Set limit to None to get them all. """ - # def read_history(): - # hashx_history = [] - # for key, hist in self.history.db.iterator(prefix=hashX): - # a = array.array('I') - # a.frombytes(hist) - # for tx_num in a: - # tx_height = bisect_right(self.tx_counts, tx_num) - # if tx_height > self.db_height: - # tx_hash = None - # else: - # tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) - # - # hashx_history.append((tx_hash, tx_height)) - # if limit and len(hashx_history) >= limit: - # return hashx_history - # return hashx_history - - def read_history(): - db_height = self.db_height - tx_counts = self.tx_counts - tx_db_get = self.db.get - pack_be_uint64 = util.pack_be_uint64 - - cnt = 0 - txs = [] - - for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - tx_height = bisect_right(tx_counts, tx_num) - if tx_height > db_height: - return - txs.append((tx_num, tx_height)) - cnt += 1 - if limit and cnt >= limit: - break - if limit and cnt >= limit: - break - return txs while True: - history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) + history = await asyncio.get_event_loop().run_in_executor(self.executor, limited_history, hashX, limit) if history is not None: return [(self.total_transactions[tx_num], tx_height) for (tx_num, tx_height) in history] self.logger.warning(f'limited_history: tx hash ' @@ -704,7 +802,7 @@ class LevelDB: def undo_key(self, height): """DB key for undo information at the given height.""" - return b'U' + pack('>I', height) + return UNDO_PREFIX + pack('>I', height) def read_undo_info(self, height): """Read undo information from a file for the current height.""" @@ -744,10 +842,9 @@ class LevelDB: def clear_excess_undo_info(self): """Clear excess undo info. Only most recent N are kept.""" - prefix = b'U' min_height = self.min_undo_height(self.db_height) keys = [] - for key, hist in self.db.iterator(prefix=prefix): + for key, hist in self.db.iterator(prefix=UNDO_PREFIX): height, = unpack('>I', key[-4:]) if height >= min_height: break @@ -844,23 +941,9 @@ class LevelDB: async def all_utxos(self, hashX): """Return all UTXOs for an address sorted in no particular order.""" - def read_utxos(): - utxos = [] - utxos_append = utxos.append - s_unpack = unpack - fs_tx_hash = self.fs_tx_hash - # Key: b'u' + address_hashX + tx_idx + tx_num - # Value: the UTXO value as a 64-bit unsigned integer - prefix = b'u' + hashX - for db_key, db_value in self.db.iterator(prefix=prefix): - tx_pos, tx_num = s_unpack('<HI', db_key[-6:]) - value, = unpack('<Q', db_value) - tx_hash, height = fs_tx_hash(tx_num) - utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value)) - return utxos - while True: utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos) + utxos = [UTXO(tx_num, tx_pos, *self.fs_tx_hash(tx_num), value=value) for (tx_num, tx_pos, value) in utxos] if all(utxo.tx_hash is not None for utxo in utxos): return utxos self.logger.warning(f'all_utxos: tx hash not ' @@ -873,45 +956,4 @@ class LevelDB: Used by the mempool code. """ - def lookup_hashXs(): - """Return (hashX, suffix) pairs, or None if not found, - for each prevout. - """ - def lookup_hashX(tx_hash, tx_idx): - idx_packed = pack('<H', tx_idx) - - # Key: b'h' + compressed_tx_hash + tx_idx + tx_num - # Value: hashX - prefix = b'h' + tx_hash[:4] + idx_packed - - # Find which entry, if any, the TX_HASH matches. - for db_key, hashX in self.db.iterator(prefix=prefix): - tx_num_packed = db_key[-4:] - tx_num, = unpack('<I', tx_num_packed) - hash, height = self.fs_tx_hash(tx_num) - if hash == tx_hash: - return hashX, idx_packed + tx_num_packed - return None, None - return [lookup_hashX(*prevout) for prevout in prevouts] - - def lookup_utxos(hashX_pairs): - def lookup_utxo(hashX, suffix): - if not hashX: - # This can happen when the daemon is a block ahead - # of us and has mempool txs spending outputs from - # that new block - return None - # Key: b'u' + address_hashX + tx_idx + tx_num - # Value: the UTXO value as a 64-bit unsigned integer - key = b'u' + hashX + suffix - db_value = self.db.get(key) - if not db_value: - # This can happen if the DB was updated between - # getting the hashXs and getting the UTXOs - return None - value, = unpack('<Q', db_value) - return hashX, value - return [lookup_utxo(*hashX_pair) for hashX_pair in hashX_pairs] - - hashX_pairs = await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs) - return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos, hashX_pairs) + return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs_utxos, prevouts) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 3696f8bc8..4a2a06a1b 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1274,11 +1274,11 @@ class LBRYElectrumX(SessionBase): hashX = self.address_to_hashX(address) return await self.hashX_unsubscribe(hashX, address) - async def get_balance(self, hashX): - utxos = await self.db.all_utxos(hashX) - confirmed = sum(utxo.value for utxo in utxos) - unconfirmed = await self.mempool.balance_delta(hashX) - return {'confirmed': confirmed, 'unconfirmed': unconfirmed} + # async def get_balance(self, hashX): + # utxos = await self.db.all_utxos(hashX) + # confirmed = sum(utxo.value for utxo in utxos) + # unconfirmed = await self.mempool.balance_delta(hashX) + # return {'confirmed': confirmed, 'unconfirmed': unconfirmed} async def scripthash_get_balance(self, scripthash): """Return the confirmed and unconfirmed balance of a scripthash.""" @@ -1543,15 +1543,12 @@ class LBRYElectrumX(SessionBase): else: batch_result[tx_hash] = [raw_tx, {'block_height': -1}] - def threaded_get_merkle(): - for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items(): - batch_result[tx_hash] = raw_tx, { - 'merkle': self._get_merkle_branch(block_txs, pos), - 'pos': pos, - 'block_height': block_height - } - if needed_merkles: - await asyncio.get_running_loop().run_in_executor(self.db.executor, threaded_get_merkle) + for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items(): + batch_result[tx_hash] = raw_tx, { + 'merkle': self._get_merkle_branch(block_txs, pos), + 'pos': pos, + 'block_height': block_height + } self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes)) return batch_result