get transactions from leveldb instead of lbrycrd rpc

Jack Robison 2020-11-09 15:34:42 -05:00
parent 83bcab9cd2
commit 1694af8b5e
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 161 additions and 45 deletions
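The heart of this change is a new tx_db key layout that lets the wallet server answer transaction and block-hash queries from LevelDB instead of calling lbrycrd over RPC. A minimal sketch of that schema, assuming the prefix constants from the leveldb.py hunks below (the dict stands in for LevelDB; the helper names are illustrative, not lbry code):

    import struct
    from bisect import bisect_right

    # Prefix constants as defined in leveldb.py; numeric keys are big-endian uint64s.
    TX_HASH_PREFIX = b'X'     # X + tx_num  -> 32-byte tx hash
    TX_NUM_PREFIX = b'N'      # N + tx_hash -> tx_num            (new in this commit)
    TX_PREFIX = b'B'          # B + tx_hash -> raw transaction   (was keyed by tx_num)
    BLOCK_HASH_PREFIX = b'C'  # C + height  -> block hash        (new in this commit)

    def pack_be_uint64(n):
        return struct.pack('>Q', n)

    db = {}  # stand-in for the plyvel tx_db

    def put_tx(tx_num, tx_hash, raw_tx):
        # Mirrors flush_dbs(): one forward and one reverse mapping per transaction.
        db[TX_HASH_PREFIX + pack_be_uint64(tx_num)] = tx_hash
        db[TX_NUM_PREFIX + tx_hash] = pack_be_uint64(tx_num)
        db[TX_PREFIX + tx_hash] = raw_tx

    def get_tx(tx_hash, tx_counts):
        # Mirrors _fs_transactions(): hash -> tx_num -> height via bisect, raw tx by hash.
        tx_num, = struct.unpack('>Q', db[TX_NUM_PREFIX + tx_hash])
        return db[TX_PREFIX + tx_hash], bisect_right(tx_counts, tx_num)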

View file

@@ -722,7 +722,7 @@ class Ledger(metaclass=LedgerRegistry):
             if cache_item is None:
                 cache_item = TransactionCacheItem()
                 self._tx_cache[txid] = cache_item
-            tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height)
+            tx = cache_item.tx or Transaction(bytes.fromhex(raw), height=remote_height)
             tx.height = remote_height
             cache_item.tx = tx
             if 'merkle' in merkle and remote_heights[txid] > 0:
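The ledger change above swaps binascii.unhexlify for the equivalent built-in bytes.fromhex. A quick check of the equivalence (bytes.fromhex additionally tolerates spaces between byte pairs):

    from binascii import unhexlify

    raw = "deadbeef"
    assert unhexlify(raw) == bytes.fromhex(raw) == b'\xde\xad\xbe\xef'
    assert bytes.fromhex("de ad be ef") == b'\xde\xad\xbe\xef'  # fromhex skips spaces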

View file

@@ -171,6 +171,7 @@ class BlockProcessor:
         # Caches of unflushed items.
         self.headers = []
+        self.block_hashes = []
         self.block_txs = []
         self.undo_infos = []
@@ -336,7 +337,7 @@ class BlockProcessor:
     def flush_data(self):
        """The data for a flush. The lock must be taken."""
         assert self.state_lock.locked()
-        return FlushData(self.height, self.tx_count, self.headers,
+        return FlushData(self.height, self.tx_count, self.headers, self.block_hashes,
                          self.block_txs, self.undo_infos, self.utxo_cache,
                          self.db_deletes, self.tip)
@@ -392,7 +393,8 @@ class BlockProcessor:
             for block in blocks:
                 height += 1
                 undo_info = self.advance_txs(
-                    height, block.transactions, self.coin.electrum_header(block.header, height)
+                    height, block.transactions, self.coin.electrum_header(block.header, height),
+                    self.coin.header_hash(block.header)
                 )
                 if height >= min_height:
                     self.undo_infos.append((undo_info, height))
@@ -403,14 +405,22 @@ class BlockProcessor:
         self.headers.extend(headers)
         self.tip = self.coin.header_hash(headers[-1])

-    def advance_txs(self, height, txs, header):
+    def advance_txs(self, height, txs, header, block_hash):
+        self.block_hashes.append(block_hash)
         self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))
-        # Use local vars for speed in the loops
         undo_info = []
         tx_num = self.tx_count
         hashXs_by_tx = []

+        # Use local vars for speed in the loops
+        put_utxo = self.utxo_cache.__setitem__
+        spend_utxo = self.spend_utxo
+        undo_info_append = undo_info.append
+        update_touched = self.touched.update
+        append_hashX_by_tx = hashXs_by_tx.append
+        hashX_from_script = self.coin.hashX_from_script
+
         for tx, tx_hash in txs:
             hashXs = []
             append_hashX = hashXs.append
@@ -420,24 +430,23 @@ class BlockProcessor:
             for txin in tx.inputs:
                 if txin.is_generation():
                     continue
-                cache_value = self.spend_utxo(txin.prev_hash, txin.prev_idx)
-                undo_info.append(cache_value)
+                cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
+                undo_info_append(cache_value)
                 append_hashX(cache_value[:-12])

             # Add the new UTXOs
             for idx, txout in enumerate(tx.outputs):
                 # Get the hashX. Ignore unspendable outputs
-                hashX = self.coin.hashX_from_script(txout.pk_script)
+                hashX = hashX_from_script(txout.pk_script)
                 if hashX:
                     append_hashX(hashX)
-                    self.utxo_cache[tx_hash + pack('<H', idx)] = hashX + tx_numb + pack('<Q', txout.value)
+                    put_utxo(tx_hash + pack('<H', idx), hashX + tx_numb + pack('<Q', txout.value))

-            hashXs_by_tx.append(hashXs)
-            self.touched.update(hashXs)
+            append_hashX_by_tx(hashXs)
+            update_touched(hashXs)
             tx_num += 1

         self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
         self.tx_count = tx_num
         self.db.tx_counts.append(tx_num)
@@ -757,9 +766,9 @@ class LBRYBlockProcessor(BlockProcessor):
         self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES')
         self.timer.run(self.sql.execute, self.sql.LANGUAGE_INDEXES, timer_name='executing LANGUAGE_INDEXES')

-    def advance_txs(self, height, txs, header):
+    def advance_txs(self, height, txs, header, block_hash):
         timer = self.timer.sub_timers['advance_blocks']
-        undo = timer.run(super().advance_txs, height, txs, header, timer_name='super().advance_txs')
+        undo = timer.run(super().advance_txs, height, txs, header, block_hash, timer_name='super().advance_txs')
         timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
         if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10):
             self.timer.show(height=height)
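advance_txs now binds the hot attribute lookups (put_utxo, spend_utxo, and friends) to locals before the per-transaction loops. A rough illustration of why that pattern pays off in tight loops, with made-up names rather than lbry code:

    import timeit

    class UtxoCache:
        def __init__(self):
            self.cache = {}

    items = [(i.to_bytes(8, 'big'), b'v') for i in range(100_000)]

    def with_attribute_lookup(c):
        for k, v in items:
            c.cache.__setitem__(k, v)   # attribute lookups on every iteration

    def with_local_binding(c):
        put = c.cache.__setitem__       # bind once, as the diff does with put_utxo
        for k, v in items:
            put(k, v)

    print(timeit.timeit(lambda: with_attribute_lookup(UtxoCache()), number=20))
    print(timeit.timeit(lambda: with_local_binding(UtxoCache()), number=20))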

View file

@@ -18,6 +18,7 @@ from asyncio import sleep
 from bisect import bisect_right
 from collections import namedtuple
 from functools import partial
+from binascii import unhexlify, hexlify
 from glob import glob
 from struct import pack, unpack
 from concurrent.futures.thread import ThreadPoolExecutor
@@ -36,6 +37,9 @@ HEADER_PREFIX = b'H'
 TX_COUNT_PREFIX = b'T'
 TX_HASH_PREFIX = b'X'
 TX_PREFIX = b'B'
+TX_NUM_PREFIX = b'N'
+BLOCK_HASH_PREFIX = b'C'

 @attr.s(slots=True)
@@ -43,6 +47,7 @@ class FlushData:
     height = attr.ib()
     tx_count = attr.ib()
     headers = attr.ib()
+    block_hashes = attr.ib()
     block_txs = attr.ib()
     # The following are flushed to the UTXO DB if undo_infos is not None
     undo_infos = attr.ib()
@@ -279,19 +284,24 @@ class LevelDB:
         height_start = self.fs_height + 1
         tx_num = prior_tx_count

-        for header, (tx_hashes, txs) in zip(flush_data.headers, flush_data.block_txs):
+        for header, block_hash, (tx_hashes, txs) in zip(
+                flush_data.headers, flush_data.block_hashes, flush_data.block_txs):
             tx_count = self.tx_counts[height_start]
             self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
+            self.tx_db.put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
             self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
             height_start += 1
             offset = 0
             while offset < len(tx_hashes):
                 self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
-                self.tx_db.put(TX_PREFIX + util.pack_be_uint64(tx_num), txs[offset // 32])
+                self.tx_db.put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
+                self.tx_db.put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
                 tx_num += 1
                 offset += 32

         flush_data.block_txs.clear()
+        flush_data.block_hashes.clear()
         self.fs_height = flush_data.height
         self.fs_tx_count = flush_data.tx_count
         flush_data.headers.clear()
@@ -416,6 +426,19 @@ class LevelDB:
         return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)

+    async def fs_block_tx_hashes(self, height):
+        def _get_tx_hashes():
+            return self.tx_db.get(BLOCK_HASH_PREFIX + util.pack_be_uint64(height)).hex(), list(map(
+                lambda tx_hash: hash_to_hex_str(tx_hash),
+                self.tx_db.iterator(
+                    start=TX_HASH_PREFIX + util.pack_be_uint64(self.tx_counts[height - 1]),
+                    stop=None if height + 1 == len(self.tx_counts) else TX_HASH_PREFIX + util.pack_be_uint64(self.tx_counts[height]),
+                    include_key=False
+                )
+            ))
+        return await asyncio.get_event_loop().run_in_executor(self.executor, _get_tx_hashes)
+
     def fs_tx_hash(self, tx_num):
         """Return a pair (tx_hash, tx_height) for the given tx number.
@@ -427,6 +450,51 @@ class LevelDB:
         tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
         return tx_hash, tx_height

+    def _fs_transactions(self, txids):
+        def _iter_transactions():
+            block_txs = {}
+            branch_and_root = self.merkle.branch_and_root
+            tx_iterator = self.tx_db.iterator
+            tx_counts = self.tx_counts
+            tx_db_get = self.tx_db.get
+            unpack_be_uint64 = util.unpack_be_uint64
+            pack_be_uint64 = util.pack_be_uint64
+            for tx_hash in txids:
+                tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
+                tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
+                if tx_num is not None:
+                    tx_num = unpack_be_uint64(tx_num)
+                    tx_height = bisect_right(tx_counts, tx_num)
+                else:
+                    yield tx_hash, (None, {'block_height': -1})
+                    continue
+                if tx_height >= self.db_height:
+                    yield tx_hash, (None, {'block_height': -1})
+                    continue
+                tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
+                if tx_height not in block_txs:
+                    block_txs[tx_height] = list(tx_iterator(
+                        start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]),
+                        stop=None if tx_height + 1 == len(tx_counts) else
+                        TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height]),
+                        include_key=False
+                    ))
+                tx_pos = tx_counts[tx_height] - tx_num
+                branch, root = branch_and_root(block_txs[tx_height], tx_pos)
+                merkle = {
+                    'block_height': tx_height,
+                    'merkle': [hash_to_hex_str(hash) for hash in branch],
+                    'pos': tx_pos
+                }
+                yield tx_hash, (None if not tx else tx.hex(), merkle)
+        return {
+            _tx_hash: _val for (_tx_hash, _val) in _iter_transactions()
+        }
+
+    async def fs_transactions(self, txids):
+        return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
+
     async def fs_block_hashes(self, height, count):
         headers_concat, headers_count = await self.read_headers(height, count)
         if headers_count != count:
@@ -446,25 +514,54 @@ class LevelDB:
         transactions. By default returns at most 1000 entries. Set
         limit to None to get them all.
         """
-        def read_history():
-            hashx_history = []
-            for key, hist in self.history.db.iterator(prefix=hashX):
-                a = array.array('I')
-                a.frombytes(hist)
-                for tx_num in a:
-                    tx_height = bisect_right(self.tx_counts, tx_num)
-                    if tx_height > self.db_height:
-                        tx_hash = None
-                    else:
-                        tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
-                    hashx_history.append((tx_hash, tx_height))
-                    if limit and len(hashx_history) >= limit:
-                        return hashx_history
-            return hashx_history
+        # def read_history():
+        #     hashx_history = []
+        #     for key, hist in self.history.db.iterator(prefix=hashX):
+        #         a = array.array('I')
+        #         a.frombytes(hist)
+        #         for tx_num in a:
+        #             tx_height = bisect_right(self.tx_counts, tx_num)
+        #             if tx_height > self.db_height:
+        #                 tx_hash = None
+        #             else:
+        #                 tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
+        #
+        #             hashx_history.append((tx_hash, tx_height))
+        #             if limit and len(hashx_history) >= limit:
+        #                 return hashx_history
+        #     return hashx_history
+
+        def iter_tx_heights():
+            db_height = self.db_height
+            tx_counts = self.tx_counts
+            tx_db_get = self.tx_db.get
+            pack_be_uint64 = util.pack_be_uint64
+
+            cnt = 0
+            for key, hist in self.history.db.iterator(prefix=hashX):
+                a = array.array('I')
+                a.frombytes(hist)
+                for tx_num in a:
+                    tx_height = bisect_right(tx_counts, tx_num)
+                    if tx_height > db_height:
+                        yield None, tx_height
+                        return
+                    yield tx_db_get(TX_HASH_PREFIX + pack_be_uint64(tx_num)), tx_height
+                    cnt += 1
+                    if limit and cnt >= limit:
+                        return
+                if limit and cnt >= limit:
+                    return
+
+        def read_history():
+            return [
+                (tx_num, tx_height) for (tx_num, tx_height) in iter_tx_heights()
+            ]
+
         while True:
             history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
-            if all(hash is not None for hash, height in history):
+            if not history or history[-1][0] is not None:
                 return history
             self.logger.warning(f'limited_history: tx hash '
                                 f'not found (reorg?), retrying...')
@@ -622,13 +719,14 @@ class LevelDB:
             utxos = []
             utxos_append = utxos.append
             s_unpack = unpack
+            fs_tx_hash = self.fs_tx_hash
             # Key: b'u' + address_hashX + tx_idx + tx_num
             # Value: the UTXO value as a 64-bit unsigned integer
             prefix = b'u' + hashX
             for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
                 tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
                 value, = unpack('<Q', db_value)
-                tx_hash, height = self.fs_tx_hash(tx_num)
+                tx_hash, height = fs_tx_hash(tx_num)
                 utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value))
             return utxos
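Both _fs_transactions and iter_tx_heights above recover a block height from a global transaction number with bisect_right over the cumulative tx_counts list. A small worked example with made-up counts:

    from bisect import bisect_right

    # tx_counts[h] = total transactions in blocks 0..h, as in LevelDB.tx_counts.
    tx_counts = [1, 3, 7, 12]

    def tx_height(tx_num):
        return bisect_right(tx_counts, tx_num)

    assert tx_height(0) == 0   # the genesis coinbase
    assert tx_height(1) == 1   # first tx of block 1
    assert tx_height(2) == 1   # last tx of block 1 (tx_counts[1] == 3)
    assert tx_height(7) == 3   # first tx of block 3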

View file

@@ -63,7 +63,7 @@ class Merkle:
             raise TypeError('index must be an integer')
         # This also asserts hashes is not empty
         if not 0 <= index < len(hashes):
-            raise ValueError('index out of range')
+            raise ValueError(f"index '{index}/{len(hashes)}' out of range")
         natural_length = self.branch_length(len(hashes))
         if length is None:
             length = natural_length

View file

@@ -17,7 +17,7 @@ from asyncio import Event, sleep
 from collections import defaultdict
 from functools import partial
-from binascii import hexlify
+from binascii import hexlify, unhexlify
 from pylru import lrucache
 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from prometheus_client import Counter, Info, Histogram, Gauge
@@ -141,7 +141,11 @@ class SessionManager:
     session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
                                  labelnames=("version",))
     request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
                                    labelnames=("method", "version"))
+    tx_request_count_metric = Counter("requested_transaction", "Number of transactions requested", namespace=NAMESPACE)
+    tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
+    urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
+    resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)

     interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
     db_operational_error_metric = Counter(
@@ -1045,7 +1049,12 @@ class LBRYElectrumX(SessionBase):
     async def claimtrie_resolve(self, *urls):
         if urls:
-            return await self.run_and_cache_query('resolve', reader.resolve_to_bytes, urls)
+            count = len(urls)
+            try:
+                self.session_mgr.urls_to_resolve_count_metric.inc(count)
+                return await self.run_and_cache_query('resolve', reader.resolve_to_bytes, urls)
+            finally:
+                self.session_mgr.resolved_url_count_metric.inc(count)

     async def get_server_height(self):
         return self.bp.height
@@ -1524,6 +1533,7 @@ class LBRYElectrumX(SessionBase):
     async def transaction_info(self, tx_hash: str):
         assert_tx_hash(tx_hash)
+        self.session_mgr.tx_request_count_metric.inc()
         tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
         raw_tx = tx_info['hex']
         block_hash = tx_info.get('blockhash')
@@ -1531,15 +1541,19 @@ class LBRYElectrumX(SessionBase):
             return raw_tx, {'block_height': -1}
         merkle_height = (await self.daemon.deserialised_block(block_hash))['height']
         merkle = await self.transaction_merkle(tx_hash, merkle_height)
+        self.session_mgr.tx_replied_count_metric.inc()
         return raw_tx, merkle

     async def transaction_get_batch(self, *tx_hashes):
+        self.session_mgr.tx_request_count_metric.inc(len(tx_hashes))
         if len(tx_hashes) > 100:
             raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
         for tx_hash in tx_hashes:
             assert_tx_hash(tx_hash)
-        batch_result = {}
+        batch_result = await self.db.fs_transactions(tx_hashes)
         for tx_hash in tx_hashes:
+            if tx_hash in batch_result and batch_result[tx_hash][0]:
+                continue
             tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
             raw_tx = tx_info['hex']
             block_hash = tx_info.get('blockhash')
@@ -1558,6 +1572,7 @@ class LBRYElectrumX(SessionBase):
                 height = -1
             merkle['block_height'] = height
             batch_result[tx_hash] = [raw_tx, merkle]
+        self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
         return batch_result

     async def transaction_get(self, tx_hash, verbose=False):
@@ -1579,11 +1594,7 @@ class LBRYElectrumX(SessionBase):
         block_hash is a hexadecimal string, and tx_hashes is an
         ordered list of hexadecimal strings.
         """
-        height = non_negative_integer(height)
-        hex_hashes = await self.daemon_request('block_hex_hashes', height, 1)
-        block_hash = hex_hashes[0]
-        block = await self.daemon.deserialised_block(block_hash)
-        return block_hash, block['tx']
+        return await self.db.fs_block_tx_hashes(height)

     def _get_merkle_branch(self, tx_hashes, tx_pos):
         """Return a merkle branch to a transaction.

View file

@@ -77,18 +77,16 @@ class LevelDB(Storage):
         import plyvel
         cls.module = plyvel

-    def open(self, name, create):
-        mof = 512 if self.for_sync else 128
+    def open(self, name, create, lru_cache_size=None):
+        mof = 10000
         path = os.path.join(self.db_dir, name)
         # Use snappy compression (the default)
-        self.db = self.module.DB(path, create_if_missing=create,
-                                 max_open_files=mof)
+        self.db = self.module.DB(path, create_if_missing=create, max_open_files=mof)
         self.close = self.db.close
         self.get = self.db.get
         self.put = self.db.put
         self.iterator = self.db.iterator
-        self.write_batch = partial(self.db.write_batch, transaction=True,
-                                   sync=True)
+        self.write_batch = partial(self.db.write_batch, transaction=True, sync=True)

 class RocksDB(Storage):
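The storage change drops the for_sync split in favor of a flat max_open_files of 10000, while write batches stay transactional and synced. A standalone sketch of the resulting plyvel usage (the path is illustrative):

    import os
    import tempfile
    from functools import partial

    import plyvel

    path = os.path.join(tempfile.mkdtemp(), 'example-db')
    db = plyvel.DB(path, create_if_missing=True, max_open_files=10000)
    write_batch = partial(db.write_batch, transaction=True, sync=True)

    with write_batch() as batch:   # atomic and fsync'd when the block exits
        batch.put(b'N' + b'\x00' * 32, b'\x00' * 8)
    db.close()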