diff --git a/scribe/hub/session.py b/scribe/hub/session.py index 53cf782..e56dcfa 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -168,7 +168,7 @@ class SessionManager: namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) - def __init__(self, env: 'Env', db: 'HubDB', mempool: 'MemPool', history_cache, resolve_cache, resolve_outputs_cache, + def __init__(self, env: 'Env', db: 'HubDB', mempool: 'MemPool', daemon: 'LBCDaemon', shutdown_event: asyncio.Event, on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): env.max_send = max(350000, env.max_send) @@ -187,9 +187,9 @@ class SessionManager: self.cur_group = SessionGroup(0) self.txs_sent = 0 self.start_time = time.time() - self.history_cache = history_cache - self.resolve_cache = resolve_cache - self.resolve_outputs_cache = resolve_outputs_cache + self.history_cache = {} + self.resolve_outputs_cache = {} + self.resolve_cache = {} self.notified_height: typing.Optional[int] = None # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 @@ -203,6 +203,11 @@ class SessionManager: ) self.running = False + def clear_caches(self): + self.history_cache.clear() + self.resolve_outputs_cache.clear() + self.resolve_cache.clear() + async def _start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() diff --git a/scribe/reader/hub_server.py b/scribe/reader/hub_server.py index b509e7b..1bab810 100644 --- a/scribe/reader/hub_server.py +++ b/scribe/reader/hub_server.py @@ -10,17 +10,13 @@ from scribe.hub.udp import StatusServer class BlockchainReaderServer(BaseBlockchainReader): def __init__(self, env): super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') - self.history_cache = {} - self.resolve_outputs_cache = {} - self.resolve_cache = {} self.notifications_to_send = [] self.mempool_notifications = set() self.status_server = StatusServer() self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs self.mempool = MemPool(self.env.coin, self.db) self.session_manager = SessionManager( - env, self.db, self.mempool, self.history_cache, self.resolve_cache, - self.resolve_outputs_cache, self.daemon, + env, self.db, self.mempool, self.daemon, self.shutdown_event, on_available_callback=self.status_server.set_available, on_unavailable_callback=self.status_server.set_unavailable @@ -35,9 +31,7 @@ class BlockchainReaderServer(BaseBlockchainReader): self._es_block_hash = None def clear_caches(self): - self.history_cache.clear() - self.resolve_outputs_cache.clear() - self.resolve_cache.clear() + self.session_manager.clear_caches() # self.clear_search_cache() # self.mempool.notified_mempool_txs.clear()