move MemPool into BlockProcessor

This commit is contained in:
Jack Robison 2021-07-16 15:12:46 -04:00 committed by Victor Shyba
parent d4194954d3
commit b4eaa5f918
5 changed files with 13 additions and 17 deletions

View file

@ -223,7 +223,7 @@ class SPVNode:
# TODO: don't use os.environ # TODO: don't use os.environ
os.environ.update(conf) os.environ.update(conf)
self.server = Server(Env(self.coin_class)) self.server = Server(Env(self.coin_class))
self.server.mempool.refresh_secs = self.server.bp.prefetcher.polling_delay = 0.5 self.server.bp.mempool.refresh_secs = self.server.bp.prefetcher.polling_delay = 0.5
await self.server.start() await self.server.start()
async def stop(self, cleanup=True): async def stop(self, cleanup=True):

View file

@ -23,6 +23,7 @@ from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger from lbry.wallet.server.util import chunks, class_logger
from lbry.crypto.hash import hash160 from lbry.crypto.hash import hash160
from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
@ -174,11 +175,12 @@ class BlockProcessor:
"reorg_count", "Number of reorgs", namespace=NAMESPACE "reorg_count", "Number of reorgs", namespace=NAMESPACE
) )
def __init__(self, env, db: 'LevelDB', daemon, mempool, shutdown_event: asyncio.Event): def __init__(self, env, db: 'LevelDB', daemon, shutdown_event: asyncio.Event):
self.state_lock = asyncio.Lock()
self.env = env self.env = env
self.db = db self.db = db
self.daemon = daemon self.daemon = daemon
self.mempool = mempool self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.coin = env.coin self.coin = env.coin
@ -210,10 +212,6 @@ class BlockProcessor:
# Claimtrie cache # Claimtrie cache
self.db_op_stack: Optional[RevertableOpStack] = None self.db_op_stack: Optional[RevertableOpStack] = None
# If the lock is successfully acquired, in-memory chain state
# is consistent with self.height
self.state_lock = asyncio.Lock()
# self.search_cache = {} # self.search_cache = {}
self.history_cache = {} self.history_cache = {}
self.status_server = StatusServer() self.status_server = StatusServer()

View file

@ -49,7 +49,7 @@ mempool_process_time_metric = Histogram(
class MemPool: class MemPool:
def __init__(self, coin, daemon, db, refresh_secs=1.0, log_status_secs=120.0): def __init__(self, coin, daemon, db, state_lock: asyncio.Lock, refresh_secs=1.0, log_status_secs=120.0):
self.coin = coin self.coin = coin
self._daemon = daemon self._daemon = daemon
self._db = db self._db = db
@ -64,7 +64,7 @@ class MemPool:
self.refresh_secs = refresh_secs self.refresh_secs = refresh_secs
self.log_status_secs = log_status_secs self.log_status_secs = log_status_secs
# Prevents mempool refreshes during fee histogram calculation # Prevents mempool refreshes during fee histogram calculation
self.lock = asyncio.Lock() self.lock = state_lock
self.wakeup = asyncio.Event() self.wakeup = asyncio.Event()
self.mempool_process_time_metric = mempool_process_time_metric self.mempool_process_time_metric = mempool_process_time_metric
self.notified_mempool_txs = set() self.notified_mempool_txs = set()

View file

@ -22,12 +22,11 @@ class Server:
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url) self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
self.db = db = LevelDB(env) self.db = db = LevelDB(env)
self.mempool = mempool = MemPool(env.coin, daemon, db) self.bp = bp = BlockProcessor(env, db, daemon, self.shutdown_event)
self.bp = bp = BlockProcessor(env, db, daemon, mempool, self.shutdown_event)
self.prometheus_server: typing.Optional[PrometheusServer] = None self.prometheus_server: typing.Optional[PrometheusServer] = None
self.session_mgr = LBRYSessionManager( self.session_mgr = LBRYSessionManager(
env, db, bp, daemon, mempool, self.shutdown_event env, db, bp, daemon, self.shutdown_event
) )
self._indexer_task = None self._indexer_task = None
@ -55,8 +54,8 @@ class Server:
await _start_cancellable(self.bp.fetch_and_process_blocks) await _start_cancellable(self.bp.fetch_and_process_blocks)
await self.db.populate_header_merkle_cache() await self.db.populate_header_merkle_cache()
await _start_cancellable(self.mempool.keep_synchronized) await _start_cancellable(self.bp.mempool.keep_synchronized)
await _start_cancellable(self.session_mgr.serve, self.mempool) await _start_cancellable(self.session_mgr.serve, self.bp.mempool)
async def stop(self): async def stop(self):
for task in reversed(self.cancellable_tasks): for task in reversed(self.cancellable_tasks):

View file

@ -176,14 +176,13 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', mempool: 'MemPool', def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event):
shutdown_event: asyncio.Event):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.env = env self.env = env
self.db = db self.db = db
self.bp = bp self.bp = bp
self.daemon = daemon self.daemon = daemon
self.mempool = mempool self.mempool = bp.mempool
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {} self.servers: typing.Dict[str, asyncio.AbstractServer] = {}