diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 26eaad00e..e7076a5a7 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -216,6 +216,7 @@ class BlockProcessor: for cache in self.search_cache.values(): cache.clear() self.history_cache.clear() + self.notifications.notified_mempool_txs.clear() await self._maybe_flush() processed_time = time.perf_counter() - start self.block_count_metric.set(self.height) diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 3ef6c5628..4ebc22544 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -72,7 +72,7 @@ class MemPoolAPI(ABC): """ @abstractmethod - async def on_mempool(self, touched, height): + async def on_mempool(self, touched, new_touched, height): """Called each time the mempool is synchronized. touched is a set of hashXs touched since the previous call. height is the daemon's height at the time the mempool was obtained.""" @@ -116,6 +116,7 @@ class MemPool: self.lock = asyncio.Lock() self.wakeup = asyncio.Event() self.mempool_process_time_metric = mempool_process_time_metric + self.notified_mempool_txs = set() async def _logging(self, synchronized_event): """Print regular logs of mempool stats.""" @@ -219,10 +220,15 @@ class MemPool: continue hashes = {hex_str_to_hash(hh) for hh in hex_hashes} async with self.lock: + new_hashes = hashes.difference(self.notified_mempool_txs) touched = await self._process_mempool(hashes) + self.notified_mempool_txs.update(new_hashes) + new_touched = { + touched_hashx for touched_hashx, txs in self.hashXs.items() if txs.intersection(new_hashes) + } synchronized_event.set() synchronized_event.clear() - await self.api.on_mempool(touched, height) + await self.api.on_mempool(touched, new_touched, height) duration = time.perf_counter() - start self.mempool_process_time_metric.observe(duration) try: @@ -236,7 +242,8 @@ class MemPool: async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes txs = self.txs - hashXs = self.hashXs + + hashXs = self.hashXs # hashX: [tx_hash, ...] touched = set() # First handle txs that have disappeared @@ -267,8 +274,8 @@ class MemPool: # FIXME: this is not particularly efficient while tx_map and len(tx_map) != prior_count: prior_count = len(tx_map) - tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, - touched) + tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, touched) + if tx_map: self.logger.info(f'{len(tx_map)} txs dropped') diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index cca84c852..56b8ffb9b 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -25,9 +25,10 @@ class Notifications: def __init__(self): self._touched_mp = {} self._touched_bp = {} + self.notified_mempool_txs = set() self._highest_block = -1 - async def _maybe_notify(self): + async def _maybe_notify(self, new_touched): tmp, tbp = self._touched_mp, self._touched_bp common = set(tmp).intersection(tbp) if common: @@ -44,24 +45,24 @@ class Notifications: del tmp[old] for old in [h for h in tbp if h <= height]: touched.update(tbp.pop(old)) - await self.notify(height, touched) + await self.notify(height, touched, new_touched) - async def notify(self, height, touched): + async def notify(self, height, touched, new_touched): pass async def start(self, height, notify_func): self._highest_block = height self.notify = notify_func - await self.notify(height, set()) + await self.notify(height, set(), set()) - async def on_mempool(self, touched, height): + async def on_mempool(self, touched, new_touched, height): self._touched_mp[height] = touched - await self._maybe_notify() + await self._maybe_notify(new_touched) async def on_block(self, touched, height): self._touched_bp[height] = touched self._highest_block = height - await self._maybe_notify() + await self._maybe_notify(set()) class Server: @@ -84,9 +85,12 @@ class Server: notifications.mempool_hashes = daemon.mempool_hashes notifications.raw_transactions = daemon.getrawtransactions notifications.lookup_utxos = db.lookup_utxos + MemPoolAPI.register(Notifications) self.mempool = mempool = MemPool(env.coin, notifications) + notifications.notified_mempool_txs = self.mempool.notified_mempool_txs + self.session_mgr = env.coin.SESSION_MANAGER( env, db, bp, daemon, mempool, self.shutdown_event ) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 0a996d68e..6009009b2 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -635,7 +635,7 @@ class SessionManager: self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit) return self.history_cache[hashX] - async def _notify_sessions(self, height, touched): + async def _notify_sessions(self, height, touched, new_touched): """Notify sessions about height changes and touched addresses.""" height_changed = height != self.notified_height if height_changed: @@ -660,7 +660,8 @@ class SessionManager: if touched or (height_changed and self.mempool_statuses): notified_hashxs = 0 notified_sessions = 0 - for hashX in touched.union(self.mempool_statuses.keys()): + to_notify = touched.union(self.mempool_statuses.keys() if height_changed else new_touched.intersection(self.mempool_statuses.keys())) + for hashX in to_notify: for session_id in self.hashx_subscriptions_by_session[hashX]: asyncio.create_task(self.sessions[session_id].send_history_notification(hashX)) notified_sessions += 1