diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py index f3c308eb9..2fefb48d8 100644 --- a/lbry/wallet/server/chain_reader.py +++ b/lbry/wallet/server/chain_reader.py @@ -107,7 +107,7 @@ class BlockchainReaderServer(BlockchainReader): self.resolve_outputs_cache = {} self.resolve_cache = {} self.notifications_to_send = [] - self.mempool_notifications = [] + self.mempool_notifications = set() self.status_server = StatusServer() self.daemon = env.coin.DAEMON(env.coin, env.daemon_url) # only needed for broadcasting txs self.prometheus_server: typing.Optional[PrometheusServer] = None @@ -143,7 +143,7 @@ class BlockchainReaderServer(BlockchainReader): def _detect_changes(self): super()._detect_changes() - self.mempool_notifications.append((self.db.fs_height, self.mempool.refresh())) + self.mempool_notifications.update(self.mempool.refresh()) async def poll_for_changes(self): await super().poll_for_changes() @@ -157,8 +157,9 @@ class BlockchainReaderServer(BlockchainReader): if self._es_height == self.db.db_height: self.synchronized.set() if self.mempool_notifications: - for (height, touched) in self.mempool_notifications: - await self.mempool.on_mempool(set(self.mempool.touched_hashXs), touched, height) + await self.mempool.on_mempool( + set(self.mempool.touched_hashXs), self.mempool_notifications, self.db.db_height + ) self.mempool_notifications.clear() self.notifications_to_send.clear() diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 8b2eb21c4..cbafe6097 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -181,7 +181,7 @@ class MemPool: return 0 async def start(self, height, session_manager: 'LBRYSessionManager'): - self.notify_sessions = session_manager._notify_sessions + self.session_manager = session_manager await self._notify_sessions(height, set(), set()) async def on_mempool(self, touched, new_touched, height): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 0a95f8e69..4fea3668f 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -622,47 +622,6 @@ class SessionManager: self.logger.info(f'notify {len(notify_tasks)} sessions of new peers') asyncio.create_task(asyncio.wait(notify_tasks)) - async def _notify_sessions(self, height, touched, new_touched): - """Notify sessions about height changes and touched addresses.""" - height_changed = height != self.notified_height - if height_changed: - await self._refresh_hsub_results(height) - - if not self.sessions: - return - - if height_changed: - header_tasks = [ - session.send_notification('blockchain.headers.subscribe', (self.hsub_results[session.subscribe_headers_raw], )) - for session in self.sessions.values() if session.subscribe_headers - ] - if header_tasks: - self.logger.info(f'notify {len(header_tasks)} sessions of new header') - asyncio.create_task(asyncio.wait(header_tasks)) - for hashX in touched.intersection(self.mempool_statuses.keys()): - self.mempool_statuses.pop(hashX, None) - - # self.bp._chain_executor - await asyncio.get_event_loop().run_in_executor( - None, touched.intersection_update, self.hashx_subscriptions_by_session.keys() - ) - - if touched or new_touched or (height_changed and self.mempool_statuses): - notified_hashxs = 0 - session_hashxes_to_notify = defaultdict(list) - to_notify = touched if height_changed else new_touched - - for hashX in to_notify: - if hashX not in self.hashx_subscriptions_by_session: - continue - for session_id in self.hashx_subscriptions_by_session[hashX]: - session_hashxes_to_notify[session_id].append(hashX) - notified_hashxs += 1 - for session_id, hashXes in session_hashxes_to_notify.items(): - asyncio.create_task(self.sessions[session_id].send_history_notifications(*hashXes)) - if session_hashxes_to_notify: - self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses') - def add_session(self, session): self.sessions[id(session)] = session self.session_event.set()