Merge pull request #3103 from lbryio/fix-mempool-bloat

fix mempool notification bloat
This commit is contained in:
Jack Robison 2020-12-07 15:09:13 -05:00 committed by GitHub
commit 34ed058c97
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -640,6 +640,7 @@ class SessionManager:
height_changed = height != self.notified_height height_changed = height != self.notified_height
if height_changed: if height_changed:
await self._refresh_hsub_results(height) await self._refresh_hsub_results(height)
if not self.sessions: if not self.sessions:
return return
@ -649,25 +650,22 @@ class SessionManager:
for session in self.sessions.values() if session.subscribe_headers for session in self.sessions.values() if session.subscribe_headers
] ]
if header_tasks: if header_tasks:
self.logger.info(f'notify {len(header_tasks)} sessions of new header')
await asyncio.wait(header_tasks) await asyncio.wait(header_tasks)
for hashX in touched.intersection(self.mempool_statuses.keys()):
self.mempool_statuses.pop(hashX, None)
touched = touched.intersection(self.hashx_subscriptions_by_session.keys()) touched.intersection_update(self.hashx_subscriptions_by_session.keys())
if touched or (height_changed and self.mempool_statuses): if touched or (height_changed and self.mempool_statuses):
mempool_hashxs = set(self.mempool_statuses.keys()) notified_hashxs = 0
notified = set() notified_sessions = 0
for hashX in touched: for hashX in touched.union(self.mempool_statuses.keys()):
for session_id in self.hashx_subscriptions_by_session[hashX]: for session_id in self.hashx_subscriptions_by_session[hashX]:
asyncio.create_task(self.sessions[session_id].send_history_notification(hashX)) asyncio.create_task(self.sessions[session_id].send_history_notification(hashX))
notified.add(hashX) notified_sessions += 1
for hashX in mempool_hashxs.difference(touched): notified_hashxs += 1
for session_id in self.hashx_subscriptions_by_session[hashX]: self.logger.info(f'notified {notified_sessions} sessions/{notified_hashxs:,d} touched addresses')
asyncio.create_task(self.sessions[session_id].send_history_notification(hashX))
notified.add(hashX)
if touched:
es = '' if len(touched) == 1 else 'es'
self.logger.info(f'notified {len(notified)} mempool/{len(touched):,d} touched address{es}')
def add_session(self, session): def add_session(self, session):
self.sessions[id(session)] = session self.sessions[id(session)] = session
@ -1231,7 +1229,9 @@ class LBRYElectrumX(SessionBase):
return await self.address_status(hashX) return await self.address_status(hashX)
async def hashX_unsubscribe(self, hashX, alias): async def hashX_unsubscribe(self, hashX, alias):
self.session_mgr.hashx_subscriptions_by_session[hashX].remove(id(self)) sessions = self.session_mgr.hashx_subscriptions_by_session[hashX]
sessions.remove(id(self))
if not sessions:
self.hashX_subs.pop(hashX, None) self.hashX_subs.pop(hashX, None)
def address_to_hashX(self, address): def address_to_hashX(self, address):