This commit is contained in:
Jack Robison 2020-11-29 13:53:06 -05:00
parent d960ba7412
commit 70e5ce4806
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 245 additions and 205 deletions

View file

@ -10,7 +10,7 @@ from lbry.wallet.server.db.writer import SQLDB
from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.leveldb import FlushData, reopen_rocksdb_ctx
class Prefetcher:
@ -346,6 +346,7 @@ class BlockProcessor:
self.db.flush_dbs(self.flush_data(), flush_utxos,
self.estimate_txs_remaining)
await self.run_in_thread_with_lock(flush)
await reopen_rocksdb_ctx(self.db.executor)
async def _maybe_flush(self):
# If caught up, flush everything as client queries are

View file

@ -25,18 +25,16 @@ from glob import glob
from struct import pack, unpack
from contextvars import ContextVar
from dataclasses import dataclass
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
import attr
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from, pack_le_int32
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.storage import db_class, RocksDB
from lbry.wallet.server.history import History
if typing.TYPE_CHECKING:
from lbry.wallet.server.storage import RocksDB
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
HEADER_PREFIX = b'H'
@ -49,7 +47,7 @@ HISTORY_PREFIX = b'A'
HASHX_UTXO_PREFIX = b'h'
UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
UNDO_PREFIX = b'U'
UTXO_STATE = b'state-utxo'
HIST_STATE = b'state-hist'
@ -72,12 +70,13 @@ class RocksDBState(typing.NamedTuple):
@dataclass
class RocksReaderContext:
db: 'RocksDB'
db_dir: str
name: str
merkle: Merkle
tx_counts: List[int]
state: RocksDBState
block_txs_cache: pylru.lrucache
merkle_tx_cache: pylru.lrucache
txid_cache: pylru.lrucache
def close(self):
self.db.close()
@ -92,24 +91,17 @@ class RocksReaderContext:
for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
]
flush_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'flush_count', b'\xff\xff\xff\xff'))
comp_flush_count, = unpack_le_int32_from(
self.db.get(STATE_PREFIX + b'comp_flush_count', b'\xff\xff\xff\xff')
)
comp_cursor, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'comp_cursor', b'\xff\xff\xff\xff'))
db_version, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'db_version', b'\xff\xff\xff\xff'))
genesis = self.db.get(STATE_PREFIX + b'genesis')
tip = self.db.get(STATE_PREFIX + b'tip')
height, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'height', b'\xff\xff\xff\xff'))
tx_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'tx_count', b'\xff\xff\xff\xff'))
utxo_flush_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'utxo_flush_count', b'\xff\xff\xff\xff'))
wall_time, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'wall_time', b'\xff\xff\xff\xff'))
first_sync, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'first_sync', b'\xff\xff\xff\xff'))
self.state = RocksDBState(
db_version, genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync == -1,
flush_count, comp_flush_count, comp_cursor
)
def ctx_tx_hash(self, tx_num):
tx_height = bisect_right(self.tx_counts, tx_num)
if tx_height > len(self.tx_counts):
return None, tx_height
key = TX_HASH_PREFIX + util.pack_be_uint64(tx_num)
if key in self.txid_cache:
return self.txid_cache[key], tx_height
tx_hash = self.db.get(key)
if tx_height + 100 <= len(self.tx_counts):
self.txid_cache[key] = tx_hash
return tx_hash, tx_height
proc_ctx: ContextVar[Optional[RocksReaderContext]] = ContextVar('proc_ctx')
@ -120,17 +112,27 @@ def _update_rocksdb_ctx():
ctx.update_state()
def _reopen_rocksdb_ctx():
ctx = proc_ctx.get()
ctx.db.close()
ctx.db = RocksDB(ctx.db_dir, ctx.name, for_sync=False, read_only=True)
ctx.update_state()
async def update_rocksdb_ctx(executor: ProcessPoolExecutor):
loop = asyncio.get_event_loop()
await asyncio.wait([loop.run_in_executor(executor, _update_rocksdb_ctx) for _ in range(executor._max_workers)])
async def reopen_rocksdb_ctx(executor: ProcessPoolExecutor):
loop = asyncio.get_event_loop()
await asyncio.wait([loop.run_in_executor(executor, _reopen_rocksdb_ctx) for _ in range(executor._max_workers)])
def _initializer(path, name):
db = RocksDB(path, name, for_sync=False, read_only=True)
state = RocksDBState(-1, '', -1, -1, b'', -1, -1, True, -1, -1, -1)
proc_ctx.set(RocksReaderContext(db, name, Merkle(), [], state, pylru.lrucache(50000), pylru.lrucache(100000)))
proc_ctx.set(RocksReaderContext(db, path, name, Merkle(), [], pylru.lrucache(50000), pylru.lrucache(100000),
pylru.lrucache(1000000)))
def _teardown():
@ -159,6 +161,187 @@ async def teardown_executor(executor: ProcessPoolExecutor):
executor.shutdown(True)
def lookup_hashXs_utxos(prevouts):
"""Return (hashX, suffix) pairs, or None if not found,
for each prevout.
"""
ctx = proc_ctx.get()
iterator = ctx.db.iterator
get = ctx.db.get
def lookup_hashXs():
"""Return (hashX, suffix) pairs, or None if not found,
for each prevout.
"""
def lookup_hashX(tx_hash, tx_idx):
idx_packed = pack('<H', tx_idx)
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches.
for db_key, hashX in iterator(prefix=prefix):
tx_num_packed = db_key[-4:]
tx_num, = unpack('<I', tx_num_packed)
hash, height = ctx.ctx_tx_hash(tx_num)
if hash == tx_hash:
return hashX, idx_packed + tx_num_packed
return None, None
return [lookup_hashX(*prevout) for prevout in prevouts]
def lookup_utxo(hashX, suffix):
if not hashX:
# This can happen when the daemon is a block ahead
# of us and has mempool txs spending outputs from
# that new block
return None
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
db_value = get(b'u' + hashX + suffix)
if not db_value:
# This can happen if the DB was updated between
# getting the hashXs and getting the UTXOs
return None
value, = unpack('<Q', db_value)
return hashX, value
return [lookup_utxo(*hashX_pair) for hashX_pair in lookup_hashXs()]
def get_counts():
return tuple(
util.unpack_be_uint64(tx_count)
for tx_count in proc_ctx.get().db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
)
def read_txids():
return list(proc_ctx.get().db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
def read_headers():
return [
header for header in proc_ctx.get().db.iterator(prefix=HEADER_PREFIX, include_key=False)
]
def tx_hash(self, tx_num):
"""Return a par (tx_hash, tx_height) for the given tx number.
If the tx_height is not on disk, returns (None, tx_height)."""
tx_height = bisect_right(self.tx_counts, tx_num)
if tx_height > self.db_height:
return None, tx_height
return self.total_transactions[tx_num], tx_height
def read_utxos(hashX):
utxos = []
utxos_append = utxos.append
s_unpack = unpack
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
for db_key, db_value in proc_ctx.get().db.iterator(prefix=UTXO_PREFIX + hashX):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
utxos_append((tx_num, tx_pos, value))
return utxos
def limited_history(hashX, limit=None):
ctx = proc_ctx.get()
tx_counts = ctx.tx_counts
db_height = len(tx_counts)
cnt = 0
txs = []
for hist in ctx.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
tx_height = bisect_right(tx_counts, tx_num)
if tx_height > db_height:
return
txs.append((tx_num, tx_height))
cnt += 1
if limit and cnt >= limit:
break
if limit and cnt >= limit:
break
return txs
def tx_merkle(tx_num, tx_height):
ctx = proc_ctx.get()
db = ctx.db
tx_counts = ctx.tx_counts
db_height = len(tx_counts)
if tx_height == -1:
return {
'block_height': -1
}
tx_pos = tx_num - tx_counts[tx_height - 1]
uncached = None
if (tx_num, tx_height) in ctx.merkle_tx_cache:
return ctx.merkle_tx_cache[(tx_num, tx_height)]
if tx_height not in ctx.block_txs_cache:
block_txs = list(db.iterator(
start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
stop=None if tx_height + 1 == len(tx_counts) else
TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False
))
if tx_height + 100 <= db_height:
ctx.block_txs_cache[tx_height] = block_txs
else:
block_txs = ctx.block_txs_cache.get(tx_height, uncached)
branch, root = ctx.merkle.branch_and_root(block_txs, tx_pos)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 100 < db_height:
ctx.merkle_tx_cache[(tx_num, tx_height)] = merkle
return merkle
def transaction_info_get_batch(txids: Iterable[str]):
ctx = proc_ctx.get()
db_height = len(ctx.tx_counts)
tx_counts = ctx.tx_counts
tx_db_get = ctx.db.get
tx_infos = {}
for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
tx = None
tx_height = -1
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
if tx_height < db_height:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
tx_infos[tx_hash] = (
None if not tx else tx.hex(), {'block_height': -1} if tx_height == -1 else tx_merkle(
tx_num, tx_height
)
)
return tx_infos
def _update_block_txs_cache(tx_num, tx_height):
ctx = proc_ctx.get()
db = ctx.db
tx_counts = ctx.tx_counts
db_height = len(tx_counts)
@attr.s(slots=True)
@ -218,12 +401,6 @@ class LevelDB:
# tx_counts[N] has the cumulative number of txs at the end of
# height N. So tx_counts[0] is 1 - the genesis coinbase
def get_counts():
return tuple(
util.unpack_be_uint64(tx_count)
for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
)
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
self.tx_counts = array.array('I', tx_counts)
@ -235,12 +412,9 @@ class LevelDB:
assert self.db_tx_count == 0
async def _read_txids(self):
def get_txids():
return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
start = time.perf_counter()
self.logger.info("loading txids")
txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids)
txids = await asyncio.get_event_loop().run_in_executor(self.executor, read_txids)
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
self.total_transactions = txids
ts = time.perf_counter() - start
@ -250,28 +424,19 @@ class LevelDB:
if self.headers is not None:
return
def get_headers():
return [
header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False)
]
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
headers = await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers
async def _open_dbs(self, for_sync, compacting):
name = f'lbry-{self.env.db_engine}'
if self.executor is None:
self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
coin_path = os.path.join(self.env.db_dir, 'COIN')
if not os.path.isfile(coin_path):
with util.open_file(coin_path, create=True) as f:
f.write(f'ElectrumX databases and metadata for '
f'{self.coin.NAME} {self.coin.NET}'.encode())
self.executor, self.db = await initialize_executor(
max(1, os.cpu_count() - 1), self.env.db_dir, for_sync, name
)
assert self.db is None
self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync)
if self.db.is_new:
self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
self.logger.info('created new db: %s', name)
self.logger.info(f'opened DB (for sync: {for_sync})')
self.read_utxo_state()
@ -416,13 +581,11 @@ class LevelDB:
for i, header in enumerate(flush_data.headers):
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
self.headers.append(header)
flush_data.headers.clear()
flush_data.headers.clear()
height_start = self.fs_height + 1
tx_num = prior_tx_count
height_start = self.fs_height + 1
tx_num = prior_tx_count
with self.db.write_batch() as batch:
batch_put = batch.put
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
tx_count = self.tx_counts[height_start]
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
@ -441,9 +604,13 @@ class LevelDB:
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
elapsed = time.perf_counter() - start_time
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
def flush_history(self):
self.history.flush()
@ -468,8 +635,8 @@ class LevelDB:
# suffix = tx_idx + tx_num
hashX = value[:-12]
suffix = key[-2:] + value[-12:-8]
batch_put(b'h' + key[:4] + suffix, hashX)
batch_put(b'u' + hashX + suffix, value[-8:])
batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX)
batch_put(UTXO_PREFIX + hashX + suffix, value[-8:])
flush_data.adds.clear()
# New undo information
@ -601,39 +768,10 @@ class LevelDB:
self._merkle_tx_cache[(tx_num, tx_height)] = merkle
return merkle
def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]:
unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts
tx_db_get = self.db.get
tx_infos = []
for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
tx = None
tx_height = -1
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
if tx_height < self.db_height:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height))
return tx_infos
async def fs_transactions(self, txids):
txs = await asyncio.get_event_loop().run_in_executor(
self.executor, self._fs_transactions, txids
return await asyncio.get_event_loop().run_in_executor(
self.executor, transaction_info_get_batch, txids
)
unsorted_result = {}
async def add_result(item):
_txid, _tx, _tx_num, _tx_height = item
unsorted_result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height))
if txs:
await asyncio.gather(*map(add_result, txs))
return {txid: unsorted_result[txid] for txid, _, _, _ in txs}
async def fs_block_hashes(self, height, count):
if height + count > len(self.headers):
@ -647,49 +785,9 @@ class LevelDB:
transactions. By default returns at most 1000 entries. Set
limit to None to get them all.
"""
# def read_history():
# hashx_history = []
# for key, hist in self.history.db.iterator(prefix=hashX):
# a = array.array('I')
# a.frombytes(hist)
# for tx_num in a:
# tx_height = bisect_right(self.tx_counts, tx_num)
# if tx_height > self.db_height:
# tx_hash = None
# else:
# tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
#
# hashx_history.append((tx_hash, tx_height))
# if limit and len(hashx_history) >= limit:
# return hashx_history
# return hashx_history
def read_history():
db_height = self.db_height
tx_counts = self.tx_counts
tx_db_get = self.db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0
txs = []
for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
tx_height = bisect_right(tx_counts, tx_num)
if tx_height > db_height:
return
txs.append((tx_num, tx_height))
cnt += 1
if limit and cnt >= limit:
break
if limit and cnt >= limit:
break
return txs
while True:
history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
history = await asyncio.get_event_loop().run_in_executor(self.executor, limited_history, hashX, limit)
if history is not None:
return [(self.total_transactions[tx_num], tx_height) for (tx_num, tx_height) in history]
self.logger.warning(f'limited_history: tx hash '
@ -704,7 +802,7 @@ class LevelDB:
def undo_key(self, height):
"""DB key for undo information at the given height."""
return b'U' + pack('>I', height)
return UNDO_PREFIX + pack('>I', height)
def read_undo_info(self, height):
"""Read undo information from a file for the current height."""
@ -744,10 +842,9 @@ class LevelDB:
def clear_excess_undo_info(self):
"""Clear excess undo info. Only most recent N are kept."""
prefix = b'U'
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.db.iterator(prefix=prefix):
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
height, = unpack('>I', key[-4:])
if height >= min_height:
break
@ -844,23 +941,9 @@ class LevelDB:
async def all_utxos(self, hashX):
"""Return all UTXOs for an address sorted in no particular order."""
def read_utxos():
utxos = []
utxos_append = utxos.append
s_unpack = unpack
fs_tx_hash = self.fs_tx_hash
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX
for db_key, db_value in self.db.iterator(prefix=prefix):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
tx_hash, height = fs_tx_hash(tx_num)
utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value))
return utxos
while True:
utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos)
utxos = [UTXO(tx_num, tx_pos, *self.fs_tx_hash(tx_num), value=value) for (tx_num, tx_pos, value) in utxos]
if all(utxo.tx_hash is not None for utxo in utxos):
return utxos
self.logger.warning(f'all_utxos: tx hash not '
@ -873,45 +956,4 @@ class LevelDB:
Used by the mempool code.
"""
def lookup_hashXs():
"""Return (hashX, suffix) pairs, or None if not found,
for each prevout.
"""
def lookup_hashX(tx_hash, tx_idx):
idx_packed = pack('<H', tx_idx)
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches.
for db_key, hashX in self.db.iterator(prefix=prefix):
tx_num_packed = db_key[-4:]
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num)
if hash == tx_hash:
return hashX, idx_packed + tx_num_packed
return None, None
return [lookup_hashX(*prevout) for prevout in prevouts]
def lookup_utxos(hashX_pairs):
def lookup_utxo(hashX, suffix):
if not hashX:
# This can happen when the daemon is a block ahead
# of us and has mempool txs spending outputs from
# that new block
return None
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
key = b'u' + hashX + suffix
db_value = self.db.get(key)
if not db_value:
# This can happen if the DB was updated between
# getting the hashXs and getting the UTXOs
return None
value, = unpack('<Q', db_value)
return hashX, value
return [lookup_utxo(*hashX_pair) for hashX_pair in hashX_pairs]
hashX_pairs = await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs)
return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos, hashX_pairs)
return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs_utxos, prevouts)

View file

@ -1274,11 +1274,11 @@ class LBRYElectrumX(SessionBase):
hashX = self.address_to_hashX(address)
return await self.hashX_unsubscribe(hashX, address)
async def get_balance(self, hashX):
utxos = await self.db.all_utxos(hashX)
confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = await self.mempool.balance_delta(hashX)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
# async def get_balance(self, hashX):
# utxos = await self.db.all_utxos(hashX)
# confirmed = sum(utxo.value for utxo in utxos)
# unconfirmed = await self.mempool.balance_delta(hashX)
# return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
async def scripthash_get_balance(self, scripthash):
"""Return the confirmed and unconfirmed balance of a scripthash."""
@ -1543,15 +1543,12 @@ class LBRYElectrumX(SessionBase):
else:
batch_result[tx_hash] = [raw_tx, {'block_height': -1}]
def threaded_get_merkle():
for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
batch_result[tx_hash] = raw_tx, {
'merkle': self._get_merkle_branch(block_txs, pos),
'pos': pos,
'block_height': block_height
}
if needed_merkles:
await asyncio.get_running_loop().run_in_executor(self.db.executor, threaded_get_merkle)
for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
batch_result[tx_hash] = raw_tx, {
'merkle': self._get_merkle_branch(block_txs, pos),
'pos': pos,
'block_height': block_height
}
self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
return batch_result