Merge pull request from lbryio/faster-leveldb

store transactions for wallet server in leveldb instead of requesting from lbrycrd
This commit is contained in:
Jack Robison 2020-11-17 17:02:11 -05:00 committed by GitHub
commit ac752d5ec2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 190 additions and 142 deletions

View file

@ -722,7 +722,7 @@ class Ledger(metaclass=LedgerRegistry):
if cache_item is None: if cache_item is None:
cache_item = TransactionCacheItem() cache_item = TransactionCacheItem()
self._tx_cache[txid] = cache_item self._tx_cache[txid] = cache_item
tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height) tx = cache_item.tx or Transaction(bytes.fromhex(raw), height=remote_height)
tx.height = remote_height tx.height = remote_height
cache_item.tx = tx cache_item.tx = tx
if 'merkle' in merkle and remote_heights[txid] > 0: if 'merkle' in merkle and remote_heights[txid] > 0:

View file

@ -171,7 +171,8 @@ class BlockProcessor:
# Caches of unflushed items. # Caches of unflushed items.
self.headers = [] self.headers = []
self.tx_hashes = [] self.block_hashes = []
self.block_txs = []
self.undo_infos = [] self.undo_infos = []
# UTXO cache # UTXO cache
@ -336,8 +337,8 @@ class BlockProcessor:
def flush_data(self): def flush_data(self):
"""The data for a flush. The lock must be taken.""" """The data for a flush. The lock must be taken."""
assert self.state_lock.locked() assert self.state_lock.locked()
return FlushData(self.height, self.tx_count, self.headers, return FlushData(self.height, self.tx_count, self.headers, self.block_hashes,
self.tx_hashes, self.undo_infos, self.utxo_cache, self.block_txs, self.undo_infos, self.utxo_cache,
self.db_deletes, self.tip) self.db_deletes, self.tip)
async def flush(self, flush_utxos): async def flush(self, flush_utxos):
@ -392,7 +393,8 @@ class BlockProcessor:
for block in blocks: for block in blocks:
height += 1 height += 1
undo_info = self.advance_txs( undo_info = self.advance_txs(
height, block.transactions, self.coin.electrum_header(block.header, height) height, block.transactions, self.coin.electrum_header(block.header, height),
self.coin.header_hash(block.header)
) )
if height >= min_height: if height >= min_height:
self.undo_infos.append((undo_info, height)) self.undo_infos.append((undo_info, height))
@ -403,25 +405,26 @@ class BlockProcessor:
self.headers.extend(headers) self.headers.extend(headers)
self.tip = self.coin.header_hash(headers[-1]) self.tip = self.coin.header_hash(headers[-1])
def advance_txs(self, height, txs, header): def advance_txs(self, height, txs, header, block_hash):
self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs)) self.block_hashes.append(block_hash)
self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))
# Use local vars for speed in the loops
undo_info = [] undo_info = []
tx_num = self.tx_count tx_num = self.tx_count
script_hashX = self.coin.hashX_from_script hashXs_by_tx = []
s_pack = pack
# Use local vars for speed in the loops
put_utxo = self.utxo_cache.__setitem__ put_utxo = self.utxo_cache.__setitem__
spend_utxo = self.spend_utxo spend_utxo = self.spend_utxo
undo_info_append = undo_info.append undo_info_append = undo_info.append
update_touched = self.touched.update update_touched = self.touched.update
hashXs_by_tx = [] append_hashX_by_tx = hashXs_by_tx.append
append_hashXs = hashXs_by_tx.append hashX_from_script = self.coin.hashX_from_script
for tx, tx_hash in txs: for tx, tx_hash in txs:
hashXs = [] hashXs = []
append_hashX = hashXs.append append_hashX = hashXs.append
tx_numb = s_pack('<I', tx_num) tx_numb = pack('<I', tx_num)
# Spend the inputs # Spend the inputs
for txin in tx.inputs: for txin in tx.inputs:
@ -434,18 +437,16 @@ class BlockProcessor:
# Add the new UTXOs # Add the new UTXOs
for idx, txout in enumerate(tx.outputs): for idx, txout in enumerate(tx.outputs):
# Get the hashX. Ignore unspendable outputs # Get the hashX. Ignore unspendable outputs
hashX = script_hashX(txout.pk_script) hashX = hashX_from_script(txout.pk_script)
if hashX: if hashX:
append_hashX(hashX) append_hashX(hashX)
put_utxo(tx_hash + s_pack('<H', idx), put_utxo(tx_hash + pack('<H', idx), hashX + tx_numb + pack('<Q', txout.value))
hashX + tx_numb + s_pack('<Q', txout.value))
append_hashXs(hashXs) append_hashX_by_tx(hashXs)
update_touched(hashXs) update_touched(hashXs)
tx_num += 1 tx_num += 1
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count) self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
self.tx_count = tx_num self.tx_count = tx_num
self.db.tx_counts.append(tx_num) self.db.tx_counts.append(tx_num)
@ -487,20 +488,16 @@ class BlockProcessor:
# Use local vars for speed in the loops # Use local vars for speed in the loops
s_pack = pack s_pack = pack
put_utxo = self.utxo_cache.__setitem__
spend_utxo = self.spend_utxo
script_hashX = self.coin.hashX_from_script
touched = self.touched
undo_entry_len = 12 + HASHX_LEN undo_entry_len = 12 + HASHX_LEN
for tx, tx_hash in reversed(txs): for tx, tx_hash in reversed(txs):
for idx, txout in enumerate(tx.outputs): for idx, txout in enumerate(tx.outputs):
# Spend the TX outputs. Be careful with unspendable # Spend the TX outputs. Be careful with unspendable
# outputs - we didn't save those in the first place. # outputs - we didn't save those in the first place.
hashX = script_hashX(txout.pk_script) hashX = self.coin.hashX_from_script(txout.pk_script)
if hashX: if hashX:
cache_value = spend_utxo(tx_hash, idx) cache_value = self.spend_utxo(tx_hash, idx)
touched.add(cache_value[:-12]) self.touched.add(cache_value[:-12])
# Restore the inputs # Restore the inputs
for txin in reversed(tx.inputs): for txin in reversed(tx.inputs):
@ -508,9 +505,8 @@ class BlockProcessor:
continue continue
n -= undo_entry_len n -= undo_entry_len
undo_item = undo_info[n:n + undo_entry_len] undo_item = undo_info[n:n + undo_entry_len]
put_utxo(txin.prev_hash + s_pack('<H', txin.prev_idx), self.utxo_cache[txin.prev_hash + s_pack('<H', txin.prev_idx)] = undo_item
undo_item) self.touched.add(undo_item[:-12])
touched.add(undo_item[:-12])
assert n == 0 assert n == 0
self.tx_count -= len(txs) self.tx_count -= len(txs)
@ -770,9 +766,9 @@ class LBRYBlockProcessor(BlockProcessor):
self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES') self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES')
self.timer.run(self.sql.execute, self.sql.LANGUAGE_INDEXES, timer_name='executing LANGUAGE_INDEXES') self.timer.run(self.sql.execute, self.sql.LANGUAGE_INDEXES, timer_name='executing LANGUAGE_INDEXES')
def advance_txs(self, height, txs, header): def advance_txs(self, height, txs, header, block_hash):
timer = self.timer.sub_timers['advance_blocks'] timer = self.timer.sub_timers['advance_blocks']
undo = timer.run(super().advance_txs, height, txs, header, timer_name='super().advance_txs') undo = timer.run(super().advance_txs, height, txs, header, block_hash, timer_name='super().advance_txs')
timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True) timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10): if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10):
self.timer.show(height=height) self.timer.show(height=height)

View file

@ -18,6 +18,7 @@ from asyncio import sleep
from bisect import bisect_right from bisect import bisect_right
from collections import namedtuple from collections import namedtuple
from functools import partial from functools import partial
from binascii import unhexlify, hexlify
from glob import glob from glob import glob
from struct import pack, unpack from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
@ -35,6 +36,10 @@ UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
HEADER_PREFIX = b'H' HEADER_PREFIX = b'H'
TX_COUNT_PREFIX = b'T' TX_COUNT_PREFIX = b'T'
TX_HASH_PREFIX = b'X' TX_HASH_PREFIX = b'X'
TX_PREFIX = b'B'
TX_NUM_PREFIX = b'N'
BLOCK_HASH_PREFIX = b'C'
@attr.s(slots=True) @attr.s(slots=True)
@ -42,7 +47,8 @@ class FlushData:
height = attr.ib() height = attr.ib()
tx_count = attr.ib() tx_count = attr.ib()
headers = attr.ib() headers = attr.ib()
block_tx_hashes = attr.ib() block_hashes = attr.ib()
block_txs = attr.ib()
# The following are flushed to the UTXO DB if undo_infos is not None # The following are flushed to the UTXO DB if undo_infos is not None
undo_infos = attr.ib() undo_infos = attr.ib()
adds = attr.ib() adds = attr.ib()
@ -82,9 +88,8 @@ class LevelDB:
self.merkle = Merkle() self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self.hashes_db = None
self.headers_db = None self.headers_db = None
self.tx_count_db = None self.tx_db = None
async def _read_tx_counts(self): async def _read_tx_counts(self):
if self.tx_counts is not None: if self.tx_counts is not None:
@ -95,7 +100,7 @@ class LevelDB:
def get_counts(): def get_counts():
return tuple( return tuple(
util.unpack_be_uint64(tx_count) util.unpack_be_uint64(tx_count)
for tx_count in self.tx_count_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
) )
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
@ -123,17 +128,11 @@ class LevelDB:
self.logger.info('created new headers db') self.logger.info('created new headers db')
self.logger.info(f'opened headers DB (for sync: {for_sync})') self.logger.info(f'opened headers DB (for sync: {for_sync})')
assert self.tx_count_db is None assert self.tx_db is None
self.tx_count_db = self.db_class('tx_count', for_sync) self.tx_db = self.db_class('tx', for_sync)
if self.tx_count_db.is_new: if self.tx_db.is_new:
self.logger.info('created new tx count db') self.logger.info('created new tx db')
self.logger.info(f'opened tx count DB (for sync: {for_sync})') self.logger.info(f'opened tx DB (for sync: {for_sync})')
assert self.hashes_db is None
self.hashes_db = self.db_class('hashes', for_sync)
if self.hashes_db.is_new:
self.logger.info('created new tx hashes db')
self.logger.info(f'opened tx hashes DB (for sync: {for_sync})')
assert self.utxo_db is None assert self.utxo_db is None
# First UTXO DB # First UTXO DB
@ -156,8 +155,7 @@ class LevelDB:
self.utxo_db.close() self.utxo_db.close()
self.history.close_db() self.history.close_db()
self.headers_db.close() self.headers_db.close()
self.tx_count_db.close() self.tx_db.close()
self.hashes_db.close()
self.executor.shutdown(wait=True) self.executor.shutdown(wait=True)
self.executor = None self.executor = None
@ -187,12 +185,9 @@ class LevelDB:
if self.headers_db: if self.headers_db:
self.headers_db.close() self.headers_db.close()
self.headers_db = None self.headers_db = None
if self.tx_count_db: if self.tx_db:
self.tx_count_db.close() self.tx_db.close()
self.tx_count_db = None self.tx_db = None
if self.hashes_db:
self.hashes_db.close()
self.hashes_db = None
await self._open_dbs(False, False) await self._open_dbs(False, False)
self.logger.info("opened for serving") self.logger.info("opened for serving")
@ -217,7 +212,7 @@ class LevelDB:
assert flush_data.height == self.fs_height == self.db_height assert flush_data.height == self.fs_height == self.db_height
assert flush_data.tip == self.db_tip assert flush_data.tip == self.db_tip
assert not flush_data.headers assert not flush_data.headers
assert not flush_data.block_tx_hashes assert not flush_data.block_txs
assert not flush_data.adds assert not flush_data.adds
assert not flush_data.deletes assert not flush_data.deletes
assert not flush_data.undo_infos assert not flush_data.undo_infos
@ -275,30 +270,38 @@ class LevelDB:
""" """
prior_tx_count = (self.tx_counts[self.fs_height] prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0) if self.fs_height >= 0 else 0)
assert len(flush_data.block_tx_hashes) == len(flush_data.headers) assert len(flush_data.block_txs) == len(flush_data.headers)
assert flush_data.height == self.fs_height + len(flush_data.headers) assert flush_data.height == self.fs_height + len(flush_data.headers)
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
else 0) else 0)
assert len(self.tx_counts) == flush_data.height + 1 assert len(self.tx_counts) == flush_data.height + 1
assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count assert len(
b''.join(hashes for hashes, _ in flush_data.block_txs)
) // 32 == flush_data.tx_count - prior_tx_count
# Write the headers, tx counts, and tx hashes # Write the headers, tx counts, and tx hashes
start_time = time.perf_counter() start_time = time.perf_counter()
height_start = self.fs_height + 1 height_start = self.fs_height + 1
tx_num = prior_tx_count tx_num = prior_tx_count
for header, tx_hashes in zip(flush_data.headers, flush_data.block_tx_hashes): for header, block_hash, (tx_hashes, txs) in zip(
flush_data.headers, flush_data.block_hashes, flush_data.block_txs):
tx_count = self.tx_counts[height_start] tx_count = self.tx_counts[height_start]
self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) self.tx_db.put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
height_start += 1 height_start += 1
offset = 0 offset = 0
while offset < len(tx_hashes): while offset < len(tx_hashes):
self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
self.tx_db.put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
self.tx_db.put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
tx_num += 1 tx_num += 1
offset += 32 offset += 32
flush_data.block_tx_hashes.clear() flush_data.block_txs.clear()
flush_data.block_hashes.clear()
self.fs_height = flush_data.height self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count self.fs_tx_count = flush_data.tx_count
flush_data.headers.clear() flush_data.headers.clear()
@ -362,7 +365,7 @@ class LevelDB:
def flush_backup(self, flush_data, touched): def flush_backup(self, flush_data, touched):
"""Like flush_dbs() but when backing up. All UTXOs are flushed.""" """Like flush_dbs() but when backing up. All UTXOs are flushed."""
assert not flush_data.headers assert not flush_data.headers
assert not flush_data.block_tx_hashes assert not flush_data.block_txs
assert flush_data.height < self.db_height assert flush_data.height < self.db_height
self.history.assert_flushed() self.history.assert_flushed()
@ -431,9 +434,54 @@ class LevelDB:
if tx_height > self.db_height: if tx_height > self.db_height:
tx_hash = None tx_hash = None
else: else:
tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
return tx_hash, tx_height return tx_hash, tx_height
def _fs_transactions(self, txids):
def _iter_transactions():
block_txs = {}
branch_and_root = self.merkle.branch_and_root
tx_iterator = self.tx_db.iterator
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
unpack_be_uint64 = util.unpack_be_uint64
pack_be_uint64 = util.pack_be_uint64
for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
else:
yield tx_hash, (None, {'block_height': -1})
continue
if tx_height >= self.db_height:
yield tx_hash, (None, {'block_height': -1})
continue
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
if tx_height not in block_txs:
block_txs[tx_height] = list(tx_iterator(
start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]),
stop=None if tx_height + 1 == len(tx_counts) else
TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height]),
include_key=False
))
tx_pos = tx_counts[tx_height] - tx_num
branch, root = branch_and_root(block_txs[tx_height], tx_pos)
merkle = {
'block_height': tx_height,
'merkle': [hash_to_hex_str(hash) for hash in branch],
'pos': tx_pos
}
yield tx_hash, (None if not tx else tx.hex(), merkle)
return {
_tx_hash: _val for (_tx_hash, _val) in _iter_transactions()
}
async def fs_transactions(self, txids):
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
async def fs_block_hashes(self, height, count): async def fs_block_hashes(self, height, count):
headers_concat, headers_count = await self.read_headers(height, count) headers_concat, headers_count = await self.read_headers(height, count)
if headers_count != count: if headers_count != count:
@ -453,25 +501,54 @@ class LevelDB:
transactions. By default returns at most 1000 entries. Set transactions. By default returns at most 1000 entries. Set
limit to None to get them all. limit to None to get them all.
""" """
def read_history(): # def read_history():
hashx_history = [] # hashx_history = []
# for key, hist in self.history.db.iterator(prefix=hashX):
# a = array.array('I')
# a.frombytes(hist)
# for tx_num in a:
# tx_height = bisect_right(self.tx_counts, tx_num)
# if tx_height > self.db_height:
# tx_hash = None
# else:
# tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
#
# hashx_history.append((tx_hash, tx_height))
# if limit and len(hashx_history) >= limit:
# return hashx_history
# return hashx_history
def iter_tx_heights():
db_height = self.db_height
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0
for key, hist in self.history.db.iterator(prefix=hashX): for key, hist in self.history.db.iterator(prefix=hashX):
a = array.array('I') a = array.array('I')
a.frombytes(hist) a.frombytes(hist)
for tx_num in a: for tx_num in a:
tx_height = bisect_right(self.tx_counts, tx_num) tx_height = bisect_right(tx_counts, tx_num)
if tx_height > self.db_height: if tx_height > db_height:
tx_hash = None yield None, tx_height
else: return
tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) yield tx_db_get(TX_HASH_PREFIX + pack_be_uint64(tx_num)), tx_height
hashx_history.append((tx_hash, tx_height)) cnt += 1
if limit and len(hashx_history) >= limit: if limit and cnt >= limit:
return hashx_history return
return hashx_history if limit and cnt >= limit:
return
def read_history():
return [
(tx_num, tx_height) for (tx_num, tx_height) in iter_tx_heights()
]
while True: while True:
history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
if all(hash is not None for hash, height in history): if not history or history[-1][0] is not None:
return history return history
self.logger.warning(f'limited_history: tx hash ' self.logger.warning(f'limited_history: tx hash '
f'not found (reorg?), retrying...') f'not found (reorg?), retrying...')
@ -629,13 +706,14 @@ class LevelDB:
utxos = [] utxos = []
utxos_append = utxos.append utxos_append = utxos.append
s_unpack = unpack s_unpack = unpack
fs_tx_hash = self.fs_tx_hash
# Key: b'u' + address_hashX + tx_idx + tx_num # Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer # Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX prefix = b'u' + hashX
for db_key, db_value in self.utxo_db.iterator(prefix=prefix): for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:]) tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value) value, = unpack('<Q', db_value)
tx_hash, height = self.fs_tx_hash(tx_num) tx_hash, height = fs_tx_hash(tx_num)
utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value)) utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value))
return utxos return utxos

View file

@ -63,7 +63,7 @@ class Merkle:
raise TypeError('index must be an integer') raise TypeError('index must be an integer')
# This also asserts hashes is not empty # This also asserts hashes is not empty
if not 0 <= index < len(hashes): if not 0 <= index < len(hashes):
raise ValueError('index out of range') raise ValueError(f"index '{index}/{len(hashes)}' out of range")
natural_length = self.branch_length(len(hashes)) natural_length = self.branch_length(len(hashes))
if length is None: if length is None:
length = natural_length length = natural_length

View file

@ -17,7 +17,7 @@ from asyncio import Event, sleep
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
from binascii import hexlify from binascii import hexlify, unhexlify
from pylru import lrucache from pylru import lrucache
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from prometheus_client import Counter, Info, Histogram, Gauge from prometheus_client import Counter, Info, Histogram, Gauge
@ -141,7 +141,11 @@ class SessionManager:
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE, session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
labelnames=("version",)) labelnames=("version",))
request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE, request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
labelnames=("method", "version")) labelnames=("method", "version"))
tx_request_count_metric = Counter("requested_transaction", "Number of transactions requested", namespace=NAMESPACE)
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
db_operational_error_metric = Counter( db_operational_error_metric = Counter(
@ -1045,7 +1049,12 @@ class LBRYElectrumX(SessionBase):
async def claimtrie_resolve(self, *urls): async def claimtrie_resolve(self, *urls):
if urls: if urls:
return await self.run_and_cache_query('resolve', reader.resolve_to_bytes, urls) count = len(urls)
try:
self.session_mgr.urls_to_resolve_count_metric.inc(count)
return await self.run_and_cache_query('resolve', reader.resolve_to_bytes, urls)
finally:
self.session_mgr.resolved_url_count_metric.inc(count)
async def get_server_height(self): async def get_server_height(self):
return self.bp.height return self.bp.height
@ -1523,27 +1532,22 @@ class LBRYElectrumX(SessionBase):
f'network rules.\n\n{message}\n[{raw_tx}]') f'network rules.\n\n{message}\n[{raw_tx}]')
async def transaction_info(self, tx_hash: str): async def transaction_info(self, tx_hash: str):
assert_tx_hash(tx_hash) return (await self.transaction_get_batch(tx_hash))[tx_hash]
tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
raw_tx = tx_info['hex']
block_hash = tx_info.get('blockhash')
if not block_hash:
return raw_tx, {'block_height': -1}
merkle_height = (await self.daemon.deserialised_block(block_hash))['height']
merkle = await self.transaction_merkle(tx_hash, merkle_height)
return raw_tx, merkle
async def transaction_get_batch(self, *tx_hashes): async def transaction_get_batch(self, *tx_hashes):
self.session_mgr.tx_request_count_metric.inc(len(tx_hashes))
if len(tx_hashes) > 100: if len(tx_hashes) > 100:
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}') raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
assert_tx_hash(tx_hash) assert_tx_hash(tx_hash)
batch_result = {} batch_result = await self.db.fs_transactions(tx_hashes)
needed_merkles = {}
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
if tx_hash in batch_result and batch_result[tx_hash][0]:
continue
tx_info = await self.daemon_request('getrawtransaction', tx_hash, True) tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
raw_tx = tx_info['hex'] raw_tx = tx_info['hex']
block_hash = tx_info.get('blockhash') block_hash = tx_info.get('blockhash')
merkle = {}
if block_hash: if block_hash:
block = await self.daemon.deserialised_block(block_hash) block = await self.daemon.deserialised_block(block_hash)
height = block['height'] height = block['height']
@ -1552,12 +1556,21 @@ class LBRYElectrumX(SessionBase):
except ValueError: except ValueError:
raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in ' raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in '
f'block {block_hash} at height {height:,d}') f'block {block_hash} at height {height:,d}')
merkle["merkle"] = self._get_merkle_branch(block['tx'], pos) needed_merkles[tx_hash] = raw_tx, block['tx'], pos, height
merkle["pos"] = pos
else: else:
height = -1 batch_result[tx_hash] = [raw_tx, {'block_height': -1}]
merkle['block_height'] = height
batch_result[tx_hash] = [raw_tx, merkle] def threaded_get_merkle():
for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
batch_result[tx_hash] = raw_tx, {
'merkle': self._get_merkle_branch(block_txs, pos),
'pos': pos,
'block_height': block_height
}
if needed_merkles:
await asyncio.get_running_loop().run_in_executor(self.db.executor, threaded_get_merkle)
self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
return batch_result return batch_result
async def transaction_get(self, tx_hash, verbose=False): async def transaction_get(self, tx_hash, verbose=False):
@ -1572,19 +1585,6 @@ class LBRYElectrumX(SessionBase):
return await self.daemon_request('getrawtransaction', tx_hash, verbose) return await self.daemon_request('getrawtransaction', tx_hash, verbose)
async def _block_hash_and_tx_hashes(self, height):
"""Returns a pair (block_hash, tx_hashes) for the main chain block at
the given height.
block_hash is a hexadecimal string, and tx_hashes is an
ordered list of hexadecimal strings.
"""
height = non_negative_integer(height)
hex_hashes = await self.daemon_request('block_hex_hashes', height, 1)
block_hash = hex_hashes[0]
block = await self.daemon.deserialised_block(block_hash)
return block_hash, block['tx']
def _get_merkle_branch(self, tx_hashes, tx_pos): def _get_merkle_branch(self, tx_hashes, tx_pos):
"""Return a merkle branch to a transaction. """Return a merkle branch to a transaction.
@ -1604,35 +1604,11 @@ class LBRYElectrumX(SessionBase):
height: the height of the block it is in height: the height of the block it is in
""" """
assert_tx_hash(tx_hash) assert_tx_hash(tx_hash)
block_hash, tx_hashes = await self._block_hash_and_tx_hashes(height) result = await self.transaction_get_batch(tx_hash)
try: if tx_hash not in result or result[tx_hash][1]['block_height'] <= 0:
pos = tx_hashes.index(tx_hash)
except ValueError:
raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in ' raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in '
f'block {block_hash} at height {height:,d}') f'block at height {height:,d}')
branch = self._get_merkle_branch(tx_hashes, pos) return result[tx_hash][1]
return {"block_height": height, "merkle": branch, "pos": pos}
async def transaction_id_from_pos(self, height, tx_pos, merkle=False):
"""Return the txid and optionally a merkle proof, given
a block height and position in the block.
"""
tx_pos = non_negative_integer(tx_pos)
if merkle not in (True, False):
raise RPCError(BAD_REQUEST, f'"merkle" must be a boolean')
block_hash, tx_hashes = await self._block_hash_and_tx_hashes(height)
try:
tx_hash = tx_hashes[tx_pos]
except IndexError:
raise RPCError(BAD_REQUEST, f'no tx at position {tx_pos:,d} in '
f'block {block_hash} at height {height:,d}')
if merkle:
branch = self._get_merkle_branch(tx_hashes, tx_pos)
return {"tx_hash": tx_hash, "merkle": branch}
else:
return tx_hash
class LocalRPC(SessionBase): class LocalRPC(SessionBase):

View file

@ -77,18 +77,16 @@ class LevelDB(Storage):
import plyvel import plyvel
cls.module = plyvel cls.module = plyvel
def open(self, name, create): def open(self, name, create, lru_cache_size=None):
mof = 512 if self.for_sync else 128 mof = 10000
path = os.path.join(self.db_dir, name) path = os.path.join(self.db_dir, name)
# Use snappy compression (the default) # Use snappy compression (the default)
self.db = self.module.DB(path, create_if_missing=create, self.db = self.module.DB(path, create_if_missing=create, max_open_files=mof)
max_open_files=mof)
self.close = self.db.close self.close = self.db.close
self.get = self.db.get self.get = self.db.get
self.put = self.db.put self.put = self.db.put
self.iterator = self.db.iterator self.iterator = self.db.iterator
self.write_batch = partial(self.db.write_batch, transaction=True, self.write_batch = partial(self.db.write_batch, transaction=True, sync=True)
sync=True)
class RocksDB(Storage): class RocksDB(Storage):