forked from LBRYCommunity/lbry-sdk
fix reorg notifications
This commit is contained in:
parent
16bfb8589b
commit
6b2d4175be
3 changed files with 6 additions and 46 deletions
|
@ -107,7 +107,7 @@ class BlockchainReaderServer(BlockchainReader):
|
||||||
self.resolve_outputs_cache = {}
|
self.resolve_outputs_cache = {}
|
||||||
self.resolve_cache = {}
|
self.resolve_cache = {}
|
||||||
self.notifications_to_send = []
|
self.notifications_to_send = []
|
||||||
self.mempool_notifications = []
|
self.mempool_notifications = set()
|
||||||
self.status_server = StatusServer()
|
self.status_server = StatusServer()
|
||||||
self.daemon = env.coin.DAEMON(env.coin, env.daemon_url) # only needed for broadcasting txs
|
self.daemon = env.coin.DAEMON(env.coin, env.daemon_url) # only needed for broadcasting txs
|
||||||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
||||||
|
@ -143,7 +143,7 @@ class BlockchainReaderServer(BlockchainReader):
|
||||||
|
|
||||||
def _detect_changes(self):
|
def _detect_changes(self):
|
||||||
super()._detect_changes()
|
super()._detect_changes()
|
||||||
self.mempool_notifications.append((self.db.fs_height, self.mempool.refresh()))
|
self.mempool_notifications.update(self.mempool.refresh())
|
||||||
|
|
||||||
async def poll_for_changes(self):
|
async def poll_for_changes(self):
|
||||||
await super().poll_for_changes()
|
await super().poll_for_changes()
|
||||||
|
@ -157,8 +157,9 @@ class BlockchainReaderServer(BlockchainReader):
|
||||||
if self._es_height == self.db.db_height:
|
if self._es_height == self.db.db_height:
|
||||||
self.synchronized.set()
|
self.synchronized.set()
|
||||||
if self.mempool_notifications:
|
if self.mempool_notifications:
|
||||||
for (height, touched) in self.mempool_notifications:
|
await self.mempool.on_mempool(
|
||||||
await self.mempool.on_mempool(set(self.mempool.touched_hashXs), touched, height)
|
set(self.mempool.touched_hashXs), self.mempool_notifications, self.db.db_height
|
||||||
|
)
|
||||||
self.mempool_notifications.clear()
|
self.mempool_notifications.clear()
|
||||||
self.notifications_to_send.clear()
|
self.notifications_to_send.clear()
|
||||||
|
|
||||||
|
|
|
@ -181,7 +181,7 @@ class MemPool:
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
async def start(self, height, session_manager: 'LBRYSessionManager'):
|
async def start(self, height, session_manager: 'LBRYSessionManager'):
|
||||||
self.notify_sessions = session_manager._notify_sessions
|
self.session_manager = session_manager
|
||||||
await self._notify_sessions(height, set(), set())
|
await self._notify_sessions(height, set(), set())
|
||||||
|
|
||||||
async def on_mempool(self, touched, new_touched, height):
|
async def on_mempool(self, touched, new_touched, height):
|
||||||
|
|
|
@ -622,47 +622,6 @@ class SessionManager:
|
||||||
self.logger.info(f'notify {len(notify_tasks)} sessions of new peers')
|
self.logger.info(f'notify {len(notify_tasks)} sessions of new peers')
|
||||||
asyncio.create_task(asyncio.wait(notify_tasks))
|
asyncio.create_task(asyncio.wait(notify_tasks))
|
||||||
|
|
||||||
async def _notify_sessions(self, height, touched, new_touched):
|
|
||||||
"""Notify sessions about height changes and touched addresses."""
|
|
||||||
height_changed = height != self.notified_height
|
|
||||||
if height_changed:
|
|
||||||
await self._refresh_hsub_results(height)
|
|
||||||
|
|
||||||
if not self.sessions:
|
|
||||||
return
|
|
||||||
|
|
||||||
if height_changed:
|
|
||||||
header_tasks = [
|
|
||||||
session.send_notification('blockchain.headers.subscribe', (self.hsub_results[session.subscribe_headers_raw], ))
|
|
||||||
for session in self.sessions.values() if session.subscribe_headers
|
|
||||||
]
|
|
||||||
if header_tasks:
|
|
||||||
self.logger.info(f'notify {len(header_tasks)} sessions of new header')
|
|
||||||
asyncio.create_task(asyncio.wait(header_tasks))
|
|
||||||
for hashX in touched.intersection(self.mempool_statuses.keys()):
|
|
||||||
self.mempool_statuses.pop(hashX, None)
|
|
||||||
|
|
||||||
# self.bp._chain_executor
|
|
||||||
await asyncio.get_event_loop().run_in_executor(
|
|
||||||
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
|
|
||||||
)
|
|
||||||
|
|
||||||
if touched or new_touched or (height_changed and self.mempool_statuses):
|
|
||||||
notified_hashxs = 0
|
|
||||||
session_hashxes_to_notify = defaultdict(list)
|
|
||||||
to_notify = touched if height_changed else new_touched
|
|
||||||
|
|
||||||
for hashX in to_notify:
|
|
||||||
if hashX not in self.hashx_subscriptions_by_session:
|
|
||||||
continue
|
|
||||||
for session_id in self.hashx_subscriptions_by_session[hashX]:
|
|
||||||
session_hashxes_to_notify[session_id].append(hashX)
|
|
||||||
notified_hashxs += 1
|
|
||||||
for session_id, hashXes in session_hashxes_to_notify.items():
|
|
||||||
asyncio.create_task(self.sessions[session_id].send_history_notifications(*hashXes))
|
|
||||||
if session_hashxes_to_notify:
|
|
||||||
self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')
|
|
||||||
|
|
||||||
def add_session(self, session):
|
def add_session(self, session):
|
||||||
self.sessions[id(session)] = session
|
self.sessions[id(session)] = session
|
||||||
self.session_event.set()
|
self.session_event.set()
|
||||||
|
|
Loading…
Add table
Reference in a new issue