From eae02909785c53a5375a55f6d89d9e19db67b3a4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 7 Dec 2020 13:16:37 -0500 Subject: [PATCH] fix mempool notification bloat --- lbry/wallet/server/session.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index cbbb2b30d..fd7477c6b 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -640,6 +640,7 @@ class SessionManager: height_changed = height != self.notified_height if height_changed: await self._refresh_hsub_results(height) + if not self.sessions: return @@ -649,25 +650,22 @@ class SessionManager: for session in self.sessions.values() if session.subscribe_headers ] if header_tasks: + self.logger.info(f'notify {len(header_tasks)} sessions of new header') await asyncio.wait(header_tasks) + for hashX in touched.intersection(self.mempool_statuses.keys()): + self.mempool_statuses.pop(hashX, None) - touched = touched.intersection(self.hashx_subscriptions_by_session.keys()) + touched.intersection_update(self.hashx_subscriptions_by_session.keys()) if touched or (height_changed and self.mempool_statuses): - mempool_hashxs = set(self.mempool_statuses.keys()) - notified = set() - for hashX in touched: + notified_hashxs = 0 + notified_sessions = 0 + for hashX in touched.union(self.mempool_statuses.keys()): for session_id in self.hashx_subscriptions_by_session[hashX]: asyncio.create_task(self.sessions[session_id].send_history_notification(hashX)) - notified.add(hashX) - for hashX in mempool_hashxs.difference(touched): - for session_id in self.hashx_subscriptions_by_session[hashX]: - asyncio.create_task(self.sessions[session_id].send_history_notification(hashX)) - notified.add(hashX) - - if touched: - es = '' if len(touched) == 1 else 'es' - self.logger.info(f'notified {len(notified)} mempool/{len(touched):,d} touched address{es}') + notified_sessions += 1 + notified_hashxs += 1 + self.logger.info(f'notified {notified_sessions} sessions/{notified_hashxs:,d} touched addresses') def add_session(self, session): self.sessions[id(session)] = session @@ -1231,8 +1229,10 @@ class LBRYElectrumX(SessionBase): return await self.address_status(hashX) async def hashX_unsubscribe(self, hashX, alias): - self.session_mgr.hashx_subscriptions_by_session[hashX].remove(id(self)) - self.hashX_subs.pop(hashX, None) + sessions = self.session_mgr.hashx_subscriptions_by_session[hashX] + sessions.remove(id(self)) + if not sessions: + self.hashX_subs.pop(hashX, None) def address_to_hashX(self, address): try: