diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index d592f74f7..3987e777a 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -223,7 +223,7 @@ class SPVNode: # TODO: don't use os.environ os.environ.update(conf) self.server = Server(Env(self.coin_class)) - self.server.mempool.refresh_secs = self.server.bp.prefetcher.polling_delay = 0.5 + self.server.bp.mempool.refresh_secs = self.server.bp.prefetcher.polling_delay = 0.5 await self.server.start() async def stop(self, cleanup=True): diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 5ac14f4ee..5fe9663a0 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -23,6 +23,7 @@ from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger from lbry.crypto.hash import hash160 from lbry.wallet.server.leveldb import FlushData +from lbry.wallet.server.mempool import MemPool from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops @@ -174,11 +175,12 @@ class BlockProcessor: "reorg_count", "Number of reorgs", namespace=NAMESPACE ) - def __init__(self, env, db: 'LevelDB', daemon, mempool, shutdown_event: asyncio.Event): + def __init__(self, env, db: 'LevelDB', daemon, shutdown_event: asyncio.Event): + self.state_lock = asyncio.Lock() self.env = env self.db = db self.daemon = daemon - self.mempool = mempool + self.mempool = MemPool(env.coin, daemon, db, self.state_lock) self.shutdown_event = shutdown_event self.coin = env.coin @@ -210,10 +212,6 @@ class BlockProcessor: # Claimtrie cache self.db_op_stack: Optional[RevertableOpStack] = None - # If the lock is successfully acquired, in-memory chain state - # is consistent with self.height - self.state_lock = asyncio.Lock() - # self.search_cache = {} self.history_cache = {} self.status_server = StatusServer() diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 625649ac1..146378233 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -49,7 +49,7 @@ mempool_process_time_metric = Histogram( class MemPool: - def __init__(self, coin, daemon, db, refresh_secs=1.0, log_status_secs=120.0): + def __init__(self, coin, daemon, db, state_lock: asyncio.Lock, refresh_secs=1.0, log_status_secs=120.0): self.coin = coin self._daemon = daemon self._db = db @@ -64,7 +64,7 @@ class MemPool: self.refresh_secs = refresh_secs self.log_status_secs = log_status_secs # Prevents mempool refreshes during fee histogram calculation - self.lock = asyncio.Lock() + self.lock = state_lock self.wakeup = asyncio.Event() self.mempool_process_time_metric = mempool_process_time_metric self.notified_mempool_txs = set() diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index ff7958872..24d8c395d 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -22,12 +22,11 @@ class Server: self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url) self.db = db = LevelDB(env) - self.mempool = mempool = MemPool(env.coin, daemon, db) - self.bp = bp = BlockProcessor(env, db, daemon, mempool, self.shutdown_event) + self.bp = bp = BlockProcessor(env, db, daemon, self.shutdown_event) self.prometheus_server: typing.Optional[PrometheusServer] = None self.session_mgr = LBRYSessionManager( - env, db, bp, daemon, mempool, self.shutdown_event + env, db, bp, daemon, self.shutdown_event ) self._indexer_task = None @@ -55,8 +54,8 @@ class Server: await _start_cancellable(self.bp.fetch_and_process_blocks) await self.db.populate_header_merkle_cache() - await _start_cancellable(self.mempool.keep_synchronized) - await _start_cancellable(self.session_mgr.serve, self.mempool) + await _start_cancellable(self.bp.mempool.keep_synchronized) + await _start_cancellable(self.session_mgr.serve, self.bp.mempool) async def stop(self): for task in reversed(self.cancellable_tasks): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index a6d14c279..cc78dd1ef 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -176,14 +176,13 @@ class SessionManager: namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) - def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', mempool: 'MemPool', - shutdown_event: asyncio.Event): + def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event): env.max_send = max(350000, env.max_send) self.env = env self.db = db self.bp = bp self.daemon = daemon - self.mempool = mempool + self.mempool = bp.mempool self.shutdown_event = shutdown_event self.logger = util.class_logger(__name__, self.__class__.__name__) self.servers: typing.Dict[str, asyncio.AbstractServer] = {}