combine MemPool and Notifications classes

This commit is contained in:
Jack Robison 2021-07-16 14:51:10 -04:00 committed by Victor Shyba
parent 6e221fc7d9
commit d4194954d3
4 changed files with 68 additions and 150 deletions

View file

@ -174,11 +174,11 @@ class BlockProcessor:
"reorg_count", "Number of reorgs", namespace=NAMESPACE "reorg_count", "Number of reorgs", namespace=NAMESPACE
) )
def __init__(self, env, db: 'LevelDB', daemon, notifications, shutdown_event: asyncio.Event): def __init__(self, env, db: 'LevelDB', daemon, mempool, shutdown_event: asyncio.Event):
self.env = env self.env = env
self.db = db self.db = db
self.daemon = daemon self.daemon = daemon
self.notifications = notifications self.mempool = mempool
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.coin = env.coin self.coin = env.coin
@ -327,7 +327,7 @@ class BlockProcessor:
s = '' if len(blocks) == 1 else 's' s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
if self._caught_up_event.is_set(): if self._caught_up_event.is_set():
await self.notifications.on_block(self.touched, self.height) await self.mempool.on_block(self.touched, self.height)
self.touched.clear() self.touched.clear()
elif hprevs[0] != chain[0]: elif hprevs[0] != chain[0]:
min_start_height = max(self.height - self.coin.REORG_LIMIT, 0) min_start_height = max(self.height - self.coin.REORG_LIMIT, 0)
@ -371,7 +371,6 @@ class BlockProcessor:
'resetting the prefetcher') 'resetting the prefetcher')
await self.prefetcher.reset_height(self.height) await self.prefetcher.reset_height(self.height)
# - Flushing # - Flushing
def flush_data(self): def flush_data(self):
"""The data for a flush. The lock must be taken.""" """The data for a flush. The lock must be taken."""
@ -1135,7 +1134,6 @@ class BlockProcessor:
# Use local vars for speed in the loops # Use local vars for speed in the loops
spend_utxo = self.spend_utxo spend_utxo = self.spend_utxo
add_utxo = self.add_utxo add_utxo = self.add_utxo
spend_claim_or_support_txo = self._spend_claim_or_support_txo spend_claim_or_support_txo = self._spend_claim_or_support_txo
add_claim_or_support = self._add_claim_or_support add_claim_or_support = self._add_claim_or_support
@ -1257,7 +1255,7 @@ class BlockProcessor:
self.utxo_cache.clear() self.utxo_cache.clear()
self.hashXs_by_tx.clear() self.hashXs_by_tx.clear()
self.history_cache.clear() self.history_cache.clear()
self.notifications.notified_mempool_txs.clear() self.mempool.notified_mempool_txs.clear()
self.removed_claim_hashes.clear() self.removed_claim_hashes.clear()
self.touched_claim_hashes.clear() self.touched_claim_hashes.clear()
self.pending_reposted.clear() self.pending_reposted.clear()

View file

@ -9,15 +9,16 @@
import asyncio import asyncio
import itertools import itertools
import time import time
from abc import ABC, abstractmethod import attr
import typing
from typing import Set, Optional, Callable, Awaitable
from collections import defaultdict from collections import defaultdict
from prometheus_client import Histogram from prometheus_client import Histogram
import attr
from lbry.wallet.server.hash import hash_to_hex_str, hex_str_to_hash from lbry.wallet.server.hash import hash_to_hex_str, hex_str_to_hash
from lbry.wallet.server.util import class_logger, chunks from lbry.wallet.server.util import class_logger, chunks
from lbry.wallet.server.leveldb import UTXO from lbry.wallet.server.leveldb import UTXO
if typing.TYPE_CHECKING:
from lbry.wallet.server.session import LBRYSessionManager
@attr.s(slots=True) @attr.s(slots=True)
@ -37,47 +38,6 @@ class MemPoolTxSummary:
has_unconfirmed_inputs = attr.ib() has_unconfirmed_inputs = attr.ib()
class MemPoolAPI(ABC):
"""A concrete instance of this class is passed to the MemPool object
and used by it to query DB and blockchain state."""
@abstractmethod
async def height(self):
"""Query bitcoind for its height."""
@abstractmethod
def cached_height(self):
"""Return the height of bitcoind the last time it was queried,
for any reason, without actually querying it.
"""
@abstractmethod
async def mempool_hashes(self):
"""Query bitcoind for the hashes of all transactions in its
mempool, returned as a list."""
@abstractmethod
async def raw_transactions(self, hex_hashes):
"""Query bitcoind for the serialized raw transactions with the given
hashes. Missing transactions are returned as None.
hex_hashes is an iterable of hexadecimal hash strings."""
@abstractmethod
async def lookup_utxos(self, prevouts):
"""Return a list of (hashX, value) pairs each prevout if unspent,
otherwise return None if spent or not found.
prevouts - an iterable of (hash, index) pairs
"""
@abstractmethod
async def on_mempool(self, touched, new_touched, height):
"""Called each time the mempool is synchronized. touched is a set of
hashXs touched since the previous call. height is the
daemon's height at the time the mempool was obtained."""
NAMESPACE = "wallet_server" NAMESPACE = "wallet_server"
HISTOGRAM_BUCKETS = ( HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
@ -89,23 +49,14 @@ mempool_process_time_metric = Histogram(
class MemPool: class MemPool:
"""Representation of the daemon's mempool. def __init__(self, coin, daemon, db, refresh_secs=1.0, log_status_secs=120.0):
coin - a coin class from coins.py
api - an object implementing MemPoolAPI
Updated regularly in caught-up state. Goal is to enable efficient
response to the calls in the external interface. To that end we
maintain the following maps:
tx: tx_hash -> MemPoolTx
hashXs: hashX -> set of all hashes of txs touching the hashX
"""
def __init__(self, coin, api, refresh_secs=1.0, log_status_secs=120.0):
assert isinstance(api, MemPoolAPI)
self.coin = coin self.coin = coin
self.api = api self._daemon = daemon
self._db = db
self._touched_mp = {}
self._touched_bp = {}
self._highest_block = -1
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.txs = {} self.txs = {}
self.hashXs = defaultdict(set) # None can be a key self.hashXs = defaultdict(set) # None can be a key
@ -117,6 +68,7 @@ class MemPool:
self.wakeup = asyncio.Event() self.wakeup = asyncio.Event()
self.mempool_process_time_metric = mempool_process_time_metric self.mempool_process_time_metric = mempool_process_time_metric
self.notified_mempool_txs = set() self.notified_mempool_txs = set()
self.notify_sessions: Optional[Callable[[int, Set[bytes], Set[bytes]], Awaitable[None]]] = None
async def _logging(self, synchronized_event): async def _logging(self, synchronized_event):
"""Print regular logs of mempool stats.""" """Print regular logs of mempool stats."""
@ -189,9 +141,9 @@ class MemPool:
"""Refresh our view of the daemon's mempool.""" """Refresh our view of the daemon's mempool."""
while True: while True:
start = time.perf_counter() start = time.perf_counter()
height = self.api.cached_height() height = self._daemon.cached_height()
hex_hashes = await self.api.mempool_hashes() hex_hashes = await self._daemon.mempool_hashes()
if height != await self.api.height(): if height != await self._daemon.height():
continue continue
hashes = {hex_str_to_hash(hh) for hh in hex_hashes} hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
async with self.lock: async with self.lock:
@ -203,7 +155,7 @@ class MemPool:
} }
synchronized_event.set() synchronized_event.set()
synchronized_event.clear() synchronized_event.clear()
await self.api.on_mempool(touched, new_touched, height) await self.on_mempool(touched, new_touched, height)
duration = time.perf_counter() - start duration = time.perf_counter() - start
self.mempool_process_time_metric.observe(duration) self.mempool_process_time_metric.observe(duration)
try: try:
@ -258,8 +210,7 @@ class MemPool:
async def _fetch_and_accept(self, hashes, all_hashes, touched): async def _fetch_and_accept(self, hashes, all_hashes, touched):
"""Fetch a list of mempool transactions.""" """Fetch a list of mempool transactions."""
hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes) raw_txs = await self._daemon.getrawtransactions((hash_to_hex_str(hash) for hash in hashes))
raw_txs = await self.api.raw_transactions(hex_hashes_iter)
to_hashX = self.coin.hashX_from_script to_hashX = self.coin.hashX_from_script
deserializer = self.coin.DESERIALIZER deserializer = self.coin.DESERIALIZER
@ -289,7 +240,7 @@ class MemPool:
prevouts = tuple(prevout for tx in tx_map.values() prevouts = tuple(prevout for tx in tx_map.values()
for prevout in tx.prevouts for prevout in tx.prevouts
if prevout[0] not in all_hashes) if prevout[0] not in all_hashes)
utxos = await self.api.lookup_utxos(prevouts) utxos = await self._db.lookup_utxos(prevouts)
utxo_map = dict(zip(prevouts, utxos)) utxo_map = dict(zip(prevouts, utxos))
return self._accept_transactions(tx_map, utxo_map, touched) return self._accept_transactions(tx_map, utxo_map, touched)
@ -373,3 +324,37 @@ class MemPool:
if unspent_inputs: if unspent_inputs:
return -1 return -1
return 0 return 0
async def _maybe_notify(self, new_touched):
tmp, tbp = self._touched_mp, self._touched_bp
common = set(tmp).intersection(tbp)
if common:
height = max(common)
elif tmp and max(tmp) == self._highest_block:
height = self._highest_block
else:
# Either we are processing a block and waiting for it to
# come in, or we have not yet had a mempool update for the
# new block height
return
touched = tmp.pop(height)
for old in [h for h in tmp if h <= height]:
del tmp[old]
for old in [h for h in tbp if h <= height]:
touched.update(tbp.pop(old))
# print("notify", height, len(touched), len(new_touched))
await self.notify_sessions(height, touched, new_touched)
async def start(self, height, session_manager: 'LBRYSessionManager'):
self._highest_block = height
self.notify_sessions = session_manager._notify_sessions
await self.notify_sessions(height, set(), set())
async def on_mempool(self, touched, new_touched, height):
self._touched_mp[height] = touched
await self._maybe_notify(new_touched)
async def on_block(self, touched, height):
self._touched_bp[height] = touched
self._highest_block = height
await self._maybe_notify(set())

View file

@ -5,66 +5,13 @@ from concurrent.futures.thread import ThreadPoolExecutor
import typing import typing
import lbry import lbry
from lbry.wallet.server.mempool import MemPool, MemPoolAPI from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYSessionManager
from lbry.prometheus import PrometheusServer from lbry.prometheus import PrometheusServer
class Notifications:
# hashX notifications come from two sources: new blocks and
# mempool refreshes.
#
# A user with a pending transaction is notified after the block it
# gets in is processed. Block processing can take an extended
# time, and the prefetcher might poll the daemon after the mempool
# code in any case. In such cases the transaction will not be in
# the mempool after the mempool refresh. We want to avoid
# notifying clients twice - for the mempool refresh and when the
# block is done. This object handles that logic by deferring
# notifications appropriately.
def __init__(self):
self._touched_mp = {}
self._touched_bp = {}
self.notified_mempool_txs = set()
self._highest_block = -1
async def _maybe_notify(self, new_touched):
tmp, tbp = self._touched_mp, self._touched_bp
common = set(tmp).intersection(tbp)
if common:
height = max(common)
elif tmp and max(tmp) == self._highest_block:
height = self._highest_block
else:
# Either we are processing a block and waiting for it to
# come in, or we have not yet had a mempool update for the
# new block height
return
touched = tmp.pop(height)
for old in [h for h in tmp if h <= height]:
del tmp[old]
for old in [h for h in tbp if h <= height]:
touched.update(tbp.pop(old))
await self.notify(height, touched, new_touched)
async def notify(self, height, touched, new_touched):
pass
async def start(self, height, notify_func):
self._highest_block = height
self.notify = notify_func
await self.notify(height, set(), set())
async def on_mempool(self, touched, new_touched, height):
self._touched_mp[height] = touched
await self._maybe_notify(new_touched)
async def on_block(self, touched, height):
self._touched_bp[height] = touched
self._highest_block = height
await self._maybe_notify(set())
class Server: class Server:
def __init__(self, env): def __init__(self, env):
@ -73,25 +20,13 @@ class Server:
self.shutdown_event = asyncio.Event() self.shutdown_event = asyncio.Event()
self.cancellable_tasks = [] self.cancellable_tasks = []
self.notifications = notifications = Notifications()
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url) self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
self.db = db = env.coin.DB(env) self.db = db = LevelDB(env)
self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications, self.shutdown_event) self.mempool = mempool = MemPool(env.coin, daemon, db)
self.bp = bp = BlockProcessor(env, db, daemon, mempool, self.shutdown_event)
self.prometheus_server: typing.Optional[PrometheusServer] = None self.prometheus_server: typing.Optional[PrometheusServer] = None
# Set notifications up to implement the MemPoolAPI self.session_mgr = LBRYSessionManager(
notifications.height = daemon.height
notifications.cached_height = daemon.cached_height
notifications.mempool_hashes = daemon.mempool_hashes
notifications.raw_transactions = daemon.getrawtransactions
notifications.lookup_utxos = db.lookup_utxos
MemPoolAPI.register(Notifications)
self.mempool = mempool = MemPool(env.coin, notifications)
notifications.notified_mempool_txs = self.mempool.notified_mempool_txs
self.session_mgr = env.coin.SESSION_MANAGER(
env, db, bp, daemon, mempool, self.shutdown_event env, db, bp, daemon, mempool, self.shutdown_event
) )
self._indexer_task = None self._indexer_task = None
@ -121,7 +56,7 @@ class Server:
await self.db.populate_header_merkle_cache() await self.db.populate_header_merkle_cache()
await _start_cancellable(self.mempool.keep_synchronized) await _start_cancellable(self.mempool.keep_synchronized)
await _start_cancellable(self.session_mgr.serve, self.notifications) await _start_cancellable(self.session_mgr.serve, self.mempool)
async def stop(self): async def stop(self):
for task in reversed(self.cancellable_tasks): for task in reversed(self.cancellable_tasks):

View file

@ -554,7 +554,7 @@ class SessionManager:
# --- External Interface # --- External Interface
async def serve(self, notifications, server_listening_event): async def serve(self, mempool, server_listening_event):
"""Start the RPC server if enabled. When the event is triggered, """Start the RPC server if enabled. When the event is triggered,
start TCP and SSL servers.""" start TCP and SSL servers."""
try: try:
@ -568,7 +568,7 @@ class SessionManager:
if self.env.drop_client is not None: if self.env.drop_client is not None:
self.logger.info(f'drop clients matching: {self.env.drop_client.pattern}') self.logger.info(f'drop clients matching: {self.env.drop_client.pattern}')
# Start notifications; initialize hsub_results # Start notifications; initialize hsub_results
await notifications.start(self.db.db_height, self._notify_sessions) await mempool.start(self.db.db_height, self)
await self.start_other() await self.start_other()
await self._start_external_servers() await self._start_external_servers()
server_listening_event.set() server_listening_event.set()