From 292d272a94ec0d3a3b6b0a9ad591e3dc35a567b0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 16 Jul 2021 14:51:10 -0400 Subject: [PATCH] combine MemPool and Notifications classes --- lbry/wallet/server/block_processor.py | 10 +-- lbry/wallet/server/mempool.py | 121 +++++++++++--------------- lbry/wallet/server/server.py | 83 ++---------------- lbry/wallet/server/session.py | 4 +- 4 files changed, 68 insertions(+), 150 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 14c4d175e..5ac14f4ee 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -174,11 +174,11 @@ class BlockProcessor: "reorg_count", "Number of reorgs", namespace=NAMESPACE ) - def __init__(self, env, db: 'LevelDB', daemon, notifications, shutdown_event: asyncio.Event): + def __init__(self, env, db: 'LevelDB', daemon, mempool, shutdown_event: asyncio.Event): self.env = env self.db = db self.daemon = daemon - self.notifications = notifications + self.mempool = mempool self.shutdown_event = shutdown_event self.coin = env.coin @@ -327,7 +327,7 @@ class BlockProcessor: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) if self._caught_up_event.is_set(): - await self.notifications.on_block(self.touched, self.height) + await self.mempool.on_block(self.touched, self.height) self.touched.clear() elif hprevs[0] != chain[0]: min_start_height = max(self.height - self.coin.REORG_LIMIT, 0) @@ -371,7 +371,6 @@ class BlockProcessor: 'resetting the prefetcher') await self.prefetcher.reset_height(self.height) - # - Flushing def flush_data(self): """The data for a flush. The lock must be taken.""" @@ -1135,7 +1134,6 @@ class BlockProcessor: # Use local vars for speed in the loops spend_utxo = self.spend_utxo add_utxo = self.add_utxo - spend_claim_or_support_txo = self._spend_claim_or_support_txo add_claim_or_support = self._add_claim_or_support @@ -1257,7 +1255,7 @@ class BlockProcessor: self.utxo_cache.clear() self.hashXs_by_tx.clear() self.history_cache.clear() - self.notifications.notified_mempool_txs.clear() + self.mempool.notified_mempool_txs.clear() self.removed_claim_hashes.clear() self.touched_claim_hashes.clear() self.pending_reposted.clear() diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index da01cb6d8..625649ac1 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -9,15 +9,16 @@ import asyncio import itertools import time -from abc import ABC, abstractmethod +import attr +import typing +from typing import Set, Optional, Callable, Awaitable from collections import defaultdict from prometheus_client import Histogram - -import attr - from lbry.wallet.server.hash import hash_to_hex_str, hex_str_to_hash from lbry.wallet.server.util import class_logger, chunks from lbry.wallet.server.leveldb import UTXO +if typing.TYPE_CHECKING: + from lbry.wallet.server.session import LBRYSessionManager @attr.s(slots=True) @@ -37,47 +38,6 @@ class MemPoolTxSummary: has_unconfirmed_inputs = attr.ib() -class MemPoolAPI(ABC): - """A concrete instance of this class is passed to the MemPool object - and used by it to query DB and blockchain state.""" - - @abstractmethod - async def height(self): - """Query bitcoind for its height.""" - - @abstractmethod - def cached_height(self): - """Return the height of bitcoind the last time it was queried, - for any reason, without actually querying it. - """ - - @abstractmethod - async def mempool_hashes(self): - """Query bitcoind for the hashes of all transactions in its - mempool, returned as a list.""" - - @abstractmethod - async def raw_transactions(self, hex_hashes): - """Query bitcoind for the serialized raw transactions with the given - hashes. Missing transactions are returned as None. - - hex_hashes is an iterable of hexadecimal hash strings.""" - - @abstractmethod - async def lookup_utxos(self, prevouts): - """Return a list of (hashX, value) pairs each prevout if unspent, - otherwise return None if spent or not found. - - prevouts - an iterable of (hash, index) pairs - """ - - @abstractmethod - async def on_mempool(self, touched, new_touched, height): - """Called each time the mempool is synchronized. touched is a set of - hashXs touched since the previous call. height is the - daemon's height at the time the mempool was obtained.""" - - NAMESPACE = "wallet_server" HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') @@ -89,23 +49,14 @@ mempool_process_time_metric = Histogram( class MemPool: - """Representation of the daemon's mempool. - - coin - a coin class from coins.py - api - an object implementing MemPoolAPI - - Updated regularly in caught-up state. Goal is to enable efficient - response to the calls in the external interface. To that end we - maintain the following maps: - - tx: tx_hash -> MemPoolTx - hashXs: hashX -> set of all hashes of txs touching the hashX - """ - - def __init__(self, coin, api, refresh_secs=1.0, log_status_secs=120.0): - assert isinstance(api, MemPoolAPI) + def __init__(self, coin, daemon, db, refresh_secs=1.0, log_status_secs=120.0): self.coin = coin - self.api = api + self._daemon = daemon + self._db = db + self._touched_mp = {} + self._touched_bp = {} + self._highest_block = -1 + self.logger = class_logger(__name__, self.__class__.__name__) self.txs = {} self.hashXs = defaultdict(set) # None can be a key @@ -117,6 +68,7 @@ class MemPool: self.wakeup = asyncio.Event() self.mempool_process_time_metric = mempool_process_time_metric self.notified_mempool_txs = set() + self.notify_sessions: Optional[Callable[[int, Set[bytes], Set[bytes]], Awaitable[None]]] = None async def _logging(self, synchronized_event): """Print regular logs of mempool stats.""" @@ -189,9 +141,9 @@ class MemPool: """Refresh our view of the daemon's mempool.""" while True: start = time.perf_counter() - height = self.api.cached_height() - hex_hashes = await self.api.mempool_hashes() - if height != await self.api.height(): + height = self._daemon.cached_height() + hex_hashes = await self._daemon.mempool_hashes() + if height != await self._daemon.height(): continue hashes = {hex_str_to_hash(hh) for hh in hex_hashes} async with self.lock: @@ -203,7 +155,7 @@ class MemPool: } synchronized_event.set() synchronized_event.clear() - await self.api.on_mempool(touched, new_touched, height) + await self.on_mempool(touched, new_touched, height) duration = time.perf_counter() - start self.mempool_process_time_metric.observe(duration) try: @@ -258,8 +210,7 @@ class MemPool: async def _fetch_and_accept(self, hashes, all_hashes, touched): """Fetch a list of mempool transactions.""" - hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes) - raw_txs = await self.api.raw_transactions(hex_hashes_iter) + raw_txs = await self._daemon.getrawtransactions((hash_to_hex_str(hash) for hash in hashes)) to_hashX = self.coin.hashX_from_script deserializer = self.coin.DESERIALIZER @@ -289,7 +240,7 @@ class MemPool: prevouts = tuple(prevout for tx in tx_map.values() for prevout in tx.prevouts if prevout[0] not in all_hashes) - utxos = await self.api.lookup_utxos(prevouts) + utxos = await self._db.lookup_utxos(prevouts) utxo_map = dict(zip(prevouts, utxos)) return self._accept_transactions(tx_map, utxo_map, touched) @@ -373,3 +324,37 @@ class MemPool: if unspent_inputs: return -1 return 0 + + async def _maybe_notify(self, new_touched): + tmp, tbp = self._touched_mp, self._touched_bp + common = set(tmp).intersection(tbp) + if common: + height = max(common) + elif tmp and max(tmp) == self._highest_block: + height = self._highest_block + else: + # Either we are processing a block and waiting for it to + # come in, or we have not yet had a mempool update for the + # new block height + return + touched = tmp.pop(height) + for old in [h for h in tmp if h <= height]: + del tmp[old] + for old in [h for h in tbp if h <= height]: + touched.update(tbp.pop(old)) + # print("notify", height, len(touched), len(new_touched)) + await self.notify_sessions(height, touched, new_touched) + + async def start(self, height, session_manager: 'LBRYSessionManager'): + self._highest_block = height + self.notify_sessions = session_manager._notify_sessions + await self.notify_sessions(height, set(), set()) + + async def on_mempool(self, touched, new_touched, height): + self._touched_mp[height] = touched + await self._maybe_notify(new_touched) + + async def on_block(self, touched, height): + self._touched_bp[height] = touched + self._highest_block = height + await self._maybe_notify(set()) diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index bad970c78..ff7958872 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -5,66 +5,13 @@ from concurrent.futures.thread import ThreadPoolExecutor import typing import lbry -from lbry.wallet.server.mempool import MemPool, MemPoolAPI +from lbry.wallet.server.mempool import MemPool +from lbry.wallet.server.block_processor import BlockProcessor +from lbry.wallet.server.leveldb import LevelDB +from lbry.wallet.server.session import LBRYSessionManager from lbry.prometheus import PrometheusServer -class Notifications: - # hashX notifications come from two sources: new blocks and - # mempool refreshes. - # - # A user with a pending transaction is notified after the block it - # gets in is processed. Block processing can take an extended - # time, and the prefetcher might poll the daemon after the mempool - # code in any case. In such cases the transaction will not be in - # the mempool after the mempool refresh. We want to avoid - # notifying clients twice - for the mempool refresh and when the - # block is done. This object handles that logic by deferring - # notifications appropriately. - - def __init__(self): - self._touched_mp = {} - self._touched_bp = {} - self.notified_mempool_txs = set() - self._highest_block = -1 - - async def _maybe_notify(self, new_touched): - tmp, tbp = self._touched_mp, self._touched_bp - common = set(tmp).intersection(tbp) - if common: - height = max(common) - elif tmp and max(tmp) == self._highest_block: - height = self._highest_block - else: - # Either we are processing a block and waiting for it to - # come in, or we have not yet had a mempool update for the - # new block height - return - touched = tmp.pop(height) - for old in [h for h in tmp if h <= height]: - del tmp[old] - for old in [h for h in tbp if h <= height]: - touched.update(tbp.pop(old)) - await self.notify(height, touched, new_touched) - - async def notify(self, height, touched, new_touched): - pass - - async def start(self, height, notify_func): - self._highest_block = height - self.notify = notify_func - await self.notify(height, set(), set()) - - async def on_mempool(self, touched, new_touched, height): - self._touched_mp[height] = touched - await self._maybe_notify(new_touched) - - async def on_block(self, touched, height): - self._touched_bp[height] = touched - self._highest_block = height - await self._maybe_notify(set()) - - class Server: def __init__(self, env): @@ -73,25 +20,13 @@ class Server: self.shutdown_event = asyncio.Event() self.cancellable_tasks = [] - self.notifications = notifications = Notifications() self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url) - self.db = db = env.coin.DB(env) - self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications, self.shutdown_event) + self.db = db = LevelDB(env) + self.mempool = mempool = MemPool(env.coin, daemon, db) + self.bp = bp = BlockProcessor(env, db, daemon, mempool, self.shutdown_event) self.prometheus_server: typing.Optional[PrometheusServer] = None - # Set notifications up to implement the MemPoolAPI - notifications.height = daemon.height - notifications.cached_height = daemon.cached_height - notifications.mempool_hashes = daemon.mempool_hashes - notifications.raw_transactions = daemon.getrawtransactions - notifications.lookup_utxos = db.lookup_utxos - - MemPoolAPI.register(Notifications) - self.mempool = mempool = MemPool(env.coin, notifications) - - notifications.notified_mempool_txs = self.mempool.notified_mempool_txs - - self.session_mgr = env.coin.SESSION_MANAGER( + self.session_mgr = LBRYSessionManager( env, db, bp, daemon, mempool, self.shutdown_event ) self._indexer_task = None @@ -121,7 +56,7 @@ class Server: await self.db.populate_header_merkle_cache() await _start_cancellable(self.mempool.keep_synchronized) - await _start_cancellable(self.session_mgr.serve, self.notifications) + await _start_cancellable(self.session_mgr.serve, self.mempool) async def stop(self): for task in reversed(self.cancellable_tasks): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 88d07e513..a6d14c279 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -554,7 +554,7 @@ class SessionManager: # --- External Interface - async def serve(self, notifications, server_listening_event): + async def serve(self, mempool, server_listening_event): """Start the RPC server if enabled. When the event is triggered, start TCP and SSL servers.""" try: @@ -568,7 +568,7 @@ class SessionManager: if self.env.drop_client is not None: self.logger.info(f'drop clients matching: {self.env.drop_client.pattern}') # Start notifications; initialize hsub_results - await notifications.start(self.db.db_height, self._notify_sessions) + await mempool.start(self.db.db_height, self) await self.start_other() await self._start_external_servers() server_listening_event.set()