move session caches

This commit is contained in:
Jack Robison 2022-03-10 13:47:33 -05:00
parent 0cd23d0901
commit bc09dcae90
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 11 additions and 12 deletions

View file

@ -168,7 +168,7 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
def __init__(self, env: 'Env', db: 'HubDB', mempool: 'MemPool', history_cache, resolve_cache, resolve_outputs_cache, def __init__(self, env: 'Env', db: 'HubDB', mempool: 'MemPool',
daemon: 'LBCDaemon', shutdown_event: asyncio.Event, daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
@ -187,9 +187,9 @@ class SessionManager:
self.cur_group = SessionGroup(0) self.cur_group = SessionGroup(0)
self.txs_sent = 0 self.txs_sent = 0
self.start_time = time.time() self.start_time = time.time()
self.history_cache = history_cache self.history_cache = {}
self.resolve_cache = resolve_cache self.resolve_outputs_cache = {}
self.resolve_outputs_cache = resolve_outputs_cache self.resolve_cache = {}
self.notified_height: typing.Optional[int] = None self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
@ -203,6 +203,11 @@ class SessionManager:
) )
self.running = False self.running = False
def clear_caches(self):
self.history_cache.clear()
self.resolve_outputs_cache.clear()
self.resolve_cache.clear()
async def _start_server(self, kind, *args, **kw_args): async def _start_server(self, kind, *args, **kw_args):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()

View file

@ -10,17 +10,13 @@ from scribe.hub.udp import StatusServer
class BlockchainReaderServer(BaseBlockchainReader): class BlockchainReaderServer(BaseBlockchainReader):
def __init__(self, env): def __init__(self, env):
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
self.history_cache = {}
self.resolve_outputs_cache = {}
self.resolve_cache = {}
self.notifications_to_send = [] self.notifications_to_send = []
self.mempool_notifications = set() self.mempool_notifications = set()
self.status_server = StatusServer() self.status_server = StatusServer()
self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs
self.mempool = MemPool(self.env.coin, self.db) self.mempool = MemPool(self.env.coin, self.db)
self.session_manager = SessionManager( self.session_manager = SessionManager(
env, self.db, self.mempool, self.history_cache, self.resolve_cache, env, self.db, self.mempool, self.daemon,
self.resolve_outputs_cache, self.daemon,
self.shutdown_event, self.shutdown_event,
on_available_callback=self.status_server.set_available, on_available_callback=self.status_server.set_available,
on_unavailable_callback=self.status_server.set_unavailable on_unavailable_callback=self.status_server.set_unavailable
@ -35,9 +31,7 @@ class BlockchainReaderServer(BaseBlockchainReader):
self._es_block_hash = None self._es_block_hash = None
def clear_caches(self): def clear_caches(self):
self.history_cache.clear() self.session_manager.clear_caches()
self.resolve_outputs_cache.clear()
self.resolve_cache.clear()
# self.clear_search_cache() # self.clear_search_cache()
# self.mempool.notified_mempool_txs.clear() # self.mempool.notified_mempool_txs.clear()