Merge pull request #3113 from lbryio/leveldb-performance

Improve performance of fetching transactions and sending address notifications
This commit is contained in:
Jack Robison 2020-12-14 16:39:50 -05:00 committed by GitHub
commit ea279111c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 92 additions and 133 deletions

View file

@@ -216,6 +216,7 @@ class BlockProcessor:
for cache in self.search_cache.values():
cache.clear()
self.history_cache.clear()
self.notifications.notified_mempool_txs.clear()
await self._maybe_flush()
processed_time = time.perf_counter() - start
self.block_count_metric.set(self.height)

View file

@@ -94,8 +94,7 @@ class LevelDB:
self.headers_db = None
self.tx_db = None
self._block_txs_cache = pylru.lrucache(50000)
self._merkle_tx_cache = pylru.lrucache(100000)
self._tx_and_merkle_cache = pylru.lrucache(100000)
self.total_transactions = None
async def _read_tx_counts(self):
@@ -147,7 +146,7 @@ class LevelDB:
async def _open_dbs(self, for_sync, compacting):
if self.executor is None:
self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
self.executor = ThreadPoolExecutor(1)
coin_path = os.path.join(self.env.db_dir, 'COIN')
if not os.path.isfile(coin_path):
with util.open_file(coin_path, create=True) as f:
@@ -470,76 +469,52 @@ class LevelDB:
return None, tx_height
return self.total_transactions[tx_num], tx_height
# NOTE(review): this is the pre-change implementation of LevelDB.tx_merkle that
# this commit deletes (per the hunk header, replaced by the merged logic in
# _fs_transactions). The diff rendering stripped leading indentation; the code
# lines below are kept byte-identical to the record.
async def tx_merkle(self, tx_num, tx_height):
# Returns an Electrum-style merkle proof dict: {'block_height', 'merkle', 'pos'}.
# A height of -1 denotes a mempool transaction, which has no block and
# therefore no merkle branch.
if tx_height == -1:
return {
'block_height': -1
}
tx_counts = self.tx_counts
# tx_num is a global (chain-wide) transaction counter; subtracting the
# cumulative count of all prior blocks gives the position within the block.
tx_pos = tx_num - tx_counts[tx_height - 1]
def _update_block_txs_cache():
# Read every tx hash in the block from the tx db, keyed by
# TX_HASH_PREFIX + big-endian uint64 tx number; the stop bound is
# omitted for the tip block (tx_height + 1 == len(tx_counts)).
block_txs = list(self.tx_db.iterator(
start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
stop=None if tx_height + 1 == len(tx_counts) else
TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False
))
# Blocks within 100 of the db tip are not cached — presumably a
# reorg-safety margin (TODO confirm); return the list to the caller
# instead of memoizing it.
if tx_height + 100 > self.db_height:
return block_txs
self._block_txs_cache[tx_height] = block_txs
uncached = None
# Fast path: proof for this (tx_num, tx_height) was already computed.
if (tx_num, tx_height) in self._merkle_tx_cache:
return self._merkle_tx_cache[(tx_num, tx_height)]
# Cache miss on the block's tx list: fetch it in the db executor so the
# LevelDB iteration does not block the event loop.
if tx_height not in self._block_txs_cache:
uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache)
# Either the helper populated the cache (deep block) or it returned the
# list directly (near-tip block) — `uncached` covers the latter case.
block_txs = self._block_txs_cache.get(tx_height, uncached)
branch, root = self.merkle.branch_and_root(block_txs, tx_pos)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
# Same 100-block margin before memoizing the finished proof.
if tx_height + 100 < self.db_height:
self._merkle_tx_cache[(tx_num, tx_height)] = merkle
return merkle
def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]:
def _fs_transactions(self, txids: Iterable[str]):
unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
tx_infos = []
tx_cache = self._tx_and_merkle_cache
tx_infos = {}
for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
tx = None
tx_height = -1
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
if tx_height < self.db_height:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height))
cached_tx = tx_cache.get(tx_hash)
if cached_tx:
tx, merkle = cached_tx
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
tx = None
tx_height = -1
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
if tx_height < self.db_height:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.total_transactions[tx_counts[tx_height - 1]:tx_counts[tx_height]], tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
tx_cache[tx_hash] = tx, merkle
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
return tx_infos
async def fs_transactions(self, txids):
txs = await asyncio.get_event_loop().run_in_executor(
self.executor, self._fs_transactions, txids
)
unsorted_result = {}
async def add_result(item):
_txid, _tx, _tx_num, _tx_height = item
unsorted_result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height))
if txs:
await asyncio.gather(*map(add_result, txs))
return {txid: unsorted_result[txid] for txid, _, _, _ in txs}
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
async def fs_block_hashes(self, height, count):
if height + count > len(self.headers):
@@ -553,28 +528,10 @@ class LevelDB:
transactions. By default returns at most 1000 entries. Set
limit to None to get them all.
"""
# def read_history():
# hashx_history = []
# for key, hist in self.history.db.iterator(prefix=hashX):
# a = array.array('I')
# a.frombytes(hist)
# for tx_num in a:
# tx_height = bisect_right(self.tx_counts, tx_num)
# if tx_height > self.db_height:
# tx_hash = None
# else:
# tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
#
# hashx_history.append((tx_hash, tx_height))
# if limit and len(hashx_history) >= limit:
# return hashx_history
# return hashx_history
def read_history():
db_height = self.db_height
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0
txs = []

View file

@@ -6,13 +6,11 @@
# and warranty status of this software.
"""Mempool handling."""
import os
import asyncio
import itertools
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from prometheus_client import Histogram
import attr
@@ -74,7 +72,7 @@ class MemPoolAPI(ABC):
"""
@abstractmethod
async def on_mempool(self, touched, height):
async def on_mempool(self, touched, new_touched, height):
"""Called each time the mempool is synchronized. touched is a set of
hashXs touched since the previous call. height is the
daemon's height at the time the mempool was obtained."""
@@ -117,8 +115,8 @@ class MemPool:
# Prevents mempool refreshes during fee histogram calculation
self.lock = asyncio.Lock()
self.wakeup = asyncio.Event()
self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1))
self.mempool_process_time_metric = mempool_process_time_metric
self.notified_mempool_txs = set()
async def _logging(self, synchronized_event):
"""Print regular logs of mempool stats."""
@@ -135,15 +133,11 @@ class MemPool:
await synchronized_event.wait()
async def _refresh_histogram(self, synchronized_event):
try:
while True:
await synchronized_event.wait()
async with self.lock:
# Threaded as can be expensive
await asyncio.get_event_loop().run_in_executor(self.executor, self._update_histogram, 100_000)
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
finally:
self.executor.shutdown(wait=True)
while True:
await synchronized_event.wait()
async with self.lock:
self._update_histogram(100_000)
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
def _update_histogram(self, bin_size):
# Build a histogram by fee rate
@@ -226,10 +220,15 @@ class MemPool:
continue
hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
async with self.lock:
new_hashes = hashes.difference(self.notified_mempool_txs)
touched = await self._process_mempool(hashes)
self.notified_mempool_txs.update(new_hashes)
new_touched = {
touched_hashx for touched_hashx, txs in self.hashXs.items() if txs.intersection(new_hashes)
}
synchronized_event.set()
synchronized_event.clear()
await self.api.on_mempool(touched, height)
await self.api.on_mempool(touched, new_touched, height)
duration = time.perf_counter() - start
self.mempool_process_time_metric.observe(duration)
try:
@@ -243,7 +242,8 @@ class MemPool:
async def _process_mempool(self, all_hashes):
# Re-sync with the new set of hashes
txs = self.txs
hashXs = self.hashXs
hashXs = self.hashXs # hashX: [tx_hash, ...]
touched = set()
# First handle txs that have disappeared
@@ -274,8 +274,8 @@ class MemPool:
# FIXME: this is not particularly efficient
while tx_map and len(tx_map) != prior_count:
prior_count = len(tx_map)
tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map,
touched)
tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, touched)
if tx_map:
self.logger.info(f'{len(tx_map)} txs dropped')
@@ -286,30 +286,25 @@ class MemPool:
hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes)
raw_txs = await self.api.raw_transactions(hex_hashes_iter)
def deserialize_txs(): # This function is pure
to_hashX = self.coin.hashX_from_script
deserializer = self.coin.DESERIALIZER
to_hashX = self.coin.hashX_from_script
deserializer = self.coin.DESERIALIZER
txs = {}
for hash, raw_tx in zip(hashes, raw_txs):
# The daemon may have evicted the tx from its
# mempool or it may have gotten in a block
if not raw_tx:
continue
tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
# Convert the inputs and outputs into (hashX, value) pairs
# Drop generation-like inputs from MemPoolTx.prevouts
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
for txin in tx.inputs
if not txin.is_generation())
txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
for txout in tx.outputs)
txs[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
0, tx_size)
return txs
# Thread this potentially slow operation so as not to block
tx_map = await asyncio.get_event_loop().run_in_executor(self.executor, deserialize_txs)
tx_map = {}
for hash, raw_tx in zip(hashes, raw_txs):
# The daemon may have evicted the tx from its
# mempool or it may have gotten in a block
if not raw_tx:
continue
tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
# Convert the inputs and outputs into (hashX, value) pairs
# Drop generation-like inputs from MemPoolTx.prevouts
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
for txin in tx.inputs
if not txin.is_generation())
txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
for txout in tx.outputs)
tx_map[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
0, tx_size)
# Determine all prevouts not in the mempool, and fetch the
# UTXO information from the database. Failed prevout lookups

View file

@@ -25,9 +25,10 @@ class Notifications:
def __init__(self):
self._touched_mp = {}
self._touched_bp = {}
self.notified_mempool_txs = set()
self._highest_block = -1
async def _maybe_notify(self):
async def _maybe_notify(self, new_touched):
tmp, tbp = self._touched_mp, self._touched_bp
common = set(tmp).intersection(tbp)
if common:
@@ -44,24 +45,24 @@ class Notifications:
del tmp[old]
for old in [h for h in tbp if h <= height]:
touched.update(tbp.pop(old))
await self.notify(height, touched)
await self.notify(height, touched, new_touched)
async def notify(self, height, touched):
async def notify(self, height, touched, new_touched):
pass
async def start(self, height, notify_func):
self._highest_block = height
self.notify = notify_func
await self.notify(height, set())
await self.notify(height, set(), set())
async def on_mempool(self, touched, height):
async def on_mempool(self, touched, new_touched, height):
self._touched_mp[height] = touched
await self._maybe_notify()
await self._maybe_notify(new_touched)
async def on_block(self, touched, height):
self._touched_bp[height] = touched
self._highest_block = height
await self._maybe_notify()
await self._maybe_notify(set())
class Server:
@@ -84,9 +85,12 @@ class Server:
notifications.mempool_hashes = daemon.mempool_hashes
notifications.raw_transactions = daemon.getrawtransactions
notifications.lookup_utxos = db.lookup_utxos
MemPoolAPI.register(Notifications)
self.mempool = mempool = MemPool(env.coin, notifications)
notifications.notified_mempool_txs = self.mempool.notified_mempool_txs
self.session_mgr = env.coin.SESSION_MANAGER(
env, db, bp, daemon, mempool, self.shutdown_event
)

View file

@@ -635,7 +635,7 @@ class SessionManager:
self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
return self.history_cache[hashX]
async def _notify_sessions(self, height, touched):
async def _notify_sessions(self, height, touched, new_touched):
"""Notify sessions about height changes and touched addresses."""
height_changed = height != self.notified_height
if height_changed:
@@ -660,12 +660,14 @@ class SessionManager:
if touched or (height_changed and self.mempool_statuses):
notified_hashxs = 0
notified_sessions = 0
for hashX in touched.union(self.mempool_statuses.keys()):
to_notify = touched if height_changed else new_touched
for hashX in to_notify:
for session_id in self.hashx_subscriptions_by_session[hashX]:
asyncio.create_task(self.sessions[session_id].send_history_notification(hashX))
notified_sessions += 1
notified_hashxs += 1
self.logger.info(f'notified {notified_sessions} sessions/{notified_hashxs:,d} touched addresses')
if notified_sessions:
self.logger.info(f'notified {notified_sessions} sessions/{notified_hashxs:,d} touched addresses')
def add_session(self, session):
self.sessions[id(session)] = session