combine MemPool and Notifications classes

Jack Robison 2021-07-16 14:51:10 -04:00
parent a6ee8dc66e
commit 292d272a94
4 changed files with 68 additions and 150 deletions
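In short, MemPool absorbs the old Notifications glue object. A rough before/after of the server wiring, condensed from the Server class hunks further down (simplified, not the full constructors):

# before: Notifications implemented MemPoolAPI and sat between daemon/db and MemPool
notifications = Notifications()
bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications, shutdown_event)
mempool = MemPool(env.coin, notifications)

# after: MemPool queries the daemon and db directly and notifies sessions itself
mempool = MemPool(env.coin, daemon, db)
bp = BlockProcessor(env, db, daemon, mempool, shutdown_event)
session_mgr = LBRYSessionManager(env, db, bp, daemon, mempool, shutdown_event)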


@@ -174,11 +174,11 @@ class BlockProcessor:
"reorg_count", "Number of reorgs", namespace=NAMESPACE
)
def __init__(self, env, db: 'LevelDB', daemon, notifications, shutdown_event: asyncio.Event):
def __init__(self, env, db: 'LevelDB', daemon, mempool, shutdown_event: asyncio.Event):
self.env = env
self.db = db
self.daemon = daemon
self.notifications = notifications
self.mempool = mempool
self.shutdown_event = shutdown_event
self.coin = env.coin
@@ -327,7 +327,7 @@ class BlockProcessor:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
if self._caught_up_event.is_set():
await self.notifications.on_block(self.touched, self.height)
await self.mempool.on_block(self.touched, self.height)
self.touched.clear()
elif hprevs[0] != chain[0]:
min_start_height = max(self.height - self.coin.REORG_LIMIT, 0)
@@ -371,7 +371,6 @@ class BlockProcessor:
'resetting the prefetcher')
await self.prefetcher.reset_height(self.height)
# - Flushing
def flush_data(self):
"""The data for a flush. The lock must be taken."""
@@ -1135,7 +1134,6 @@ class BlockProcessor:
# Use local vars for speed in the loops
spend_utxo = self.spend_utxo
add_utxo = self.add_utxo
spend_claim_or_support_txo = self._spend_claim_or_support_txo
add_claim_or_support = self._add_claim_or_support
@@ -1257,7 +1255,7 @@ class BlockProcessor:
self.utxo_cache.clear()
self.hashXs_by_tx.clear()
self.history_cache.clear()
self.notifications.notified_mempool_txs.clear()
self.mempool.notified_mempool_txs.clear()
self.removed_claim_hashes.clear()
self.touched_claim_hashes.clear()
self.pending_reposted.clear()


@@ -9,15 +9,16 @@
import asyncio
import itertools
import time
from abc import ABC, abstractmethod
import attr
import typing
from typing import Set, Optional, Callable, Awaitable
from collections import defaultdict
from prometheus_client import Histogram
import attr
from lbry.wallet.server.hash import hash_to_hex_str, hex_str_to_hash
from lbry.wallet.server.util import class_logger, chunks
from lbry.wallet.server.leveldb import UTXO
if typing.TYPE_CHECKING:
from lbry.wallet.server.session import LBRYSessionManager
@attr.s(slots=True)
@@ -37,47 +38,6 @@ class MemPoolTxSummary:
has_unconfirmed_inputs = attr.ib()
class MemPoolAPI(ABC):
"""A concrete instance of this class is passed to the MemPool object
and used by it to query DB and blockchain state."""
@abstractmethod
async def height(self):
"""Query bitcoind for its height."""
@abstractmethod
def cached_height(self):
"""Return the height of bitcoind the last time it was queried,
for any reason, without actually querying it.
"""
@abstractmethod
async def mempool_hashes(self):
"""Query bitcoind for the hashes of all transactions in its
mempool, returned as a list."""
@abstractmethod
async def raw_transactions(self, hex_hashes):
"""Query bitcoind for the serialized raw transactions with the given
hashes. Missing transactions are returned as None.
hex_hashes is an iterable of hexadecimal hash strings."""
@abstractmethod
async def lookup_utxos(self, prevouts):
"""Return a list of (hashX, value) pairs each prevout if unspent,
otherwise return None if spent or not found.
prevouts - an iterable of (hash, index) pairs
"""
@abstractmethod
async def on_mempool(self, touched, new_touched, height):
"""Called each time the mempool is synchronized. touched is a set of
hashXs touched since the previous call. height is the
daemon's height at the time the mempool was obtained."""
NAMESPACE = "wallet_server"
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
@@ -89,23 +49,14 @@ mempool_process_time_metric = Histogram(
class MemPool:
"""Representation of the daemon's mempool.
coin - a coin class from coins.py
api - an object implementing MemPoolAPI
Updated regularly in caught-up state. Goal is to enable efficient
response to the calls in the external interface. To that end we
maintain the following maps:
tx: tx_hash -> MemPoolTx
hashXs: hashX -> set of all hashes of txs touching the hashX
"""
def __init__(self, coin, api, refresh_secs=1.0, log_status_secs=120.0):
assert isinstance(api, MemPoolAPI)
def __init__(self, coin, daemon, db, refresh_secs=1.0, log_status_secs=120.0):
self.coin = coin
self.api = api
self._daemon = daemon
self._db = db
self._touched_mp = {}
self._touched_bp = {}
self._highest_block = -1
self.logger = class_logger(__name__, self.__class__.__name__)
self.txs = {}
self.hashXs = defaultdict(set) # None can be a key
@@ -117,6 +68,7 @@ class MemPool:
self.wakeup = asyncio.Event()
self.mempool_process_time_metric = mempool_process_time_metric
self.notified_mempool_txs = set()
self.notify_sessions: Optional[Callable[[int, Set[bytes], Set[bytes]], Awaitable[None]]] = None
async def _logging(self, synchronized_event):
"""Print regular logs of mempool stats."""
@@ -189,9 +141,9 @@ class MemPool:
"""Refresh our view of the daemon's mempool."""
while True:
start = time.perf_counter()
height = self.api.cached_height()
hex_hashes = await self.api.mempool_hashes()
if height != await self.api.height():
height = self._daemon.cached_height()
hex_hashes = await self._daemon.mempool_hashes()
if height != await self._daemon.height():
continue
hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
async with self.lock:
@@ -203,7 +155,7 @@ class MemPool:
}
synchronized_event.set()
synchronized_event.clear()
await self.api.on_mempool(touched, new_touched, height)
await self.on_mempool(touched, new_touched, height)
duration = time.perf_counter() - start
self.mempool_process_time_metric.observe(duration)
try:
@@ -258,8 +210,7 @@ class MemPool:
async def _fetch_and_accept(self, hashes, all_hashes, touched):
"""Fetch a list of mempool transactions."""
hex_hashes_iter = (hash_to_hex_str(hash) for hash in hashes)
raw_txs = await self.api.raw_transactions(hex_hashes_iter)
raw_txs = await self._daemon.getrawtransactions((hash_to_hex_str(hash) for hash in hashes))
to_hashX = self.coin.hashX_from_script
deserializer = self.coin.DESERIALIZER
@@ -289,7 +240,7 @@ class MemPool:
prevouts = tuple(prevout for tx in tx_map.values()
for prevout in tx.prevouts
if prevout[0] not in all_hashes)
utxos = await self.api.lookup_utxos(prevouts)
utxos = await self._db.lookup_utxos(prevouts)
utxo_map = dict(zip(prevouts, utxos))
return self._accept_transactions(tx_map, utxo_map, touched)
@@ -373,3 +324,37 @@ class MemPool:
if unspent_inputs:
return -1
return 0
async def _maybe_notify(self, new_touched):
tmp, tbp = self._touched_mp, self._touched_bp
common = set(tmp).intersection(tbp)
if common:
height = max(common)
elif tmp and max(tmp) == self._highest_block:
height = self._highest_block
else:
# Either we are processing a block and waiting for it to
# come in, or we have not yet had a mempool update for the
# new block height
return
touched = tmp.pop(height)
for old in [h for h in tmp if h <= height]:
del tmp[old]
for old in [h for h in tbp if h <= height]:
touched.update(tbp.pop(old))
# print("notify", height, len(touched), len(new_touched))
await self.notify_sessions(height, touched, new_touched)
async def start(self, height, session_manager: 'LBRYSessionManager'):
self._highest_block = height
self.notify_sessions = session_manager._notify_sessions
await self.notify_sessions(height, set(), set())
async def on_mempool(self, touched, new_touched, height):
self._touched_mp[height] = touched
await self._maybe_notify(new_touched)
async def on_block(self, touched, height):
self._touched_bp[height] = touched
self._highest_block = height
await self._maybe_notify(set())
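The on_mempool/on_block pair above defers session notifications until the mempool has been refreshed at the height of the processed block. A minimal sketch of that behaviour, assuming the new constructor only stores the coin/daemon/db references it is given; the stub callback stands in for LBRYSessionManager._notify_sessions:

import asyncio
from lbry.wallet.server.mempool import MemPool

async def demo():
    mempool = MemPool(coin=None, daemon=None, db=None)  # stubs; the real server passes env.coin, daemon, db
    async def fake_notify(height, touched, new_touched):  # stand-in for LBRYSessionManager._notify_sessions
        print('notify', height, sorted(touched))
    mempool.notify_sessions = fake_notify
    mempool._highest_block = 100
    await mempool.on_block({b'hashX-a'}, 101)            # block lands first: notification is deferred
    await mempool.on_mempool({b'hashX-b'}, set(), 101)   # mempool refresh catches up: one notify for height 101

asyncio.run(demo())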


@@ -5,66 +5,13 @@ from concurrent.futures.thread import ThreadPoolExecutor
import typing
import lbry
from lbry.wallet.server.mempool import MemPool, MemPoolAPI
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYSessionManager
from lbry.prometheus import PrometheusServer
class Notifications:
# hashX notifications come from two sources: new blocks and
# mempool refreshes.
#
# A user with a pending transaction is notified after the block it
# gets in is processed. Block processing can take an extended
# time, and the prefetcher might poll the daemon after the mempool
# code in any case. In such cases the transaction will not be in
# the mempool after the mempool refresh. We want to avoid
# notifying clients twice - for the mempool refresh and when the
# block is done. This object handles that logic by deferring
# notifications appropriately.
def __init__(self):
self._touched_mp = {}
self._touched_bp = {}
self.notified_mempool_txs = set()
self._highest_block = -1
async def _maybe_notify(self, new_touched):
tmp, tbp = self._touched_mp, self._touched_bp
common = set(tmp).intersection(tbp)
if common:
height = max(common)
elif tmp and max(tmp) == self._highest_block:
height = self._highest_block
else:
# Either we are processing a block and waiting for it to
# come in, or we have not yet had a mempool update for the
# new block height
return
touched = tmp.pop(height)
for old in [h for h in tmp if h <= height]:
del tmp[old]
for old in [h for h in tbp if h <= height]:
touched.update(tbp.pop(old))
await self.notify(height, touched, new_touched)
async def notify(self, height, touched, new_touched):
pass
async def start(self, height, notify_func):
self._highest_block = height
self.notify = notify_func
await self.notify(height, set(), set())
async def on_mempool(self, touched, new_touched, height):
self._touched_mp[height] = touched
await self._maybe_notify(new_touched)
async def on_block(self, touched, height):
self._touched_bp[height] = touched
self._highest_block = height
await self._maybe_notify(set())
class Server:
def __init__(self, env):
@@ -73,25 +20,13 @@ class Server:
self.shutdown_event = asyncio.Event()
self.cancellable_tasks = []
self.notifications = notifications = Notifications()
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
self.db = db = env.coin.DB(env)
self.bp = bp = env.coin.BLOCK_PROCESSOR(env, db, daemon, notifications, self.shutdown_event)
self.db = db = LevelDB(env)
self.mempool = mempool = MemPool(env.coin, daemon, db)
self.bp = bp = BlockProcessor(env, db, daemon, mempool, self.shutdown_event)
self.prometheus_server: typing.Optional[PrometheusServer] = None
# Set notifications up to implement the MemPoolAPI
notifications.height = daemon.height
notifications.cached_height = daemon.cached_height
notifications.mempool_hashes = daemon.mempool_hashes
notifications.raw_transactions = daemon.getrawtransactions
notifications.lookup_utxos = db.lookup_utxos
MemPoolAPI.register(Notifications)
self.mempool = mempool = MemPool(env.coin, notifications)
notifications.notified_mempool_txs = self.mempool.notified_mempool_txs
self.session_mgr = env.coin.SESSION_MANAGER(
self.session_mgr = LBRYSessionManager(
env, db, bp, daemon, mempool, self.shutdown_event
)
self._indexer_task = None
@@ -121,7 +56,7 @@ class Server:
await self.db.populate_header_merkle_cache()
await _start_cancellable(self.mempool.keep_synchronized)
await _start_cancellable(self.session_mgr.serve, self.notifications)
await _start_cancellable(self.session_mgr.serve, self.mempool)
async def stop(self):
for task in reversed(self.cancellable_tasks):


@@ -554,7 +554,7 @@ class SessionManager:
# --- External Interface
async def serve(self, notifications, server_listening_event):
async def serve(self, mempool, server_listening_event):
"""Start the RPC server if enabled. When the event is triggered,
start TCP and SSL servers."""
try:
@@ -568,7 +568,7 @@ class SessionManager:
if self.env.drop_client is not None:
self.logger.info(f'drop clients matching: {self.env.drop_client.pattern}')
# Start notifications; initialize hsub_results
await notifications.start(self.db.db_height, self._notify_sessions)
await mempool.start(self.db.db_height, self)
await self.start_other()
await self._start_external_servers()
server_listening_event.set()
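For reference, the startup handshake after this change, condensed from the hunks above: SessionManager.serve now receives the mempool and hands itself over, and MemPool.start keeps the session manager's notifier as its callback.

# in SessionManager.serve
await mempool.start(self.db.db_height, self)
# in MemPool.start
#   self._highest_block = height
#   self.notify_sessions = session_manager._notify_sessions
#   await self.notify_sessions(height, set(), set())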