forked from LBRYCommunity/lbry-sdk
improve wallet server address history cache
This commit is contained in:
parent
48502961cf
commit
8d93dd5adc
2 changed files with 7 additions and 11 deletions
|
@ -183,6 +183,7 @@ class BlockProcessor:
|
|||
self.state_lock = asyncio.Lock()
|
||||
|
||||
self.search_cache = {}
|
||||
self.history_cache = {}
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
# Run in a thread to prevent blocking. Shielded so that
|
||||
|
@ -213,6 +214,7 @@ class BlockProcessor:
|
|||
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
||||
for cache in self.search_cache.values():
|
||||
cache.clear()
|
||||
self.history_cache.clear()
|
||||
await self._maybe_flush()
|
||||
processed_time = time.perf_counter() - start
|
||||
self.block_count_metric.set(self.height)
|
||||
|
|
|
@ -135,7 +135,7 @@ class SessionManager:
|
|||
"docker_tag": DOCKER_TAG,
|
||||
'version': lbry.__version__,
|
||||
"min_version": util.version_string(VERSION.PROTOCOL_MIN),
|
||||
"cpu_count": os.cpu_count()
|
||||
"cpu_count": str(os.cpu_count())
|
||||
})
|
||||
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
|
||||
labelnames=("version",))
|
||||
|
@ -177,7 +177,7 @@ class SessionManager:
|
|||
self.cur_group = SessionGroup(0)
|
||||
self.txs_sent = 0
|
||||
self.start_time = time.time()
|
||||
self.history_cache = pylru.lrucache(256)
|
||||
self.history_cache = self.bp.history_cache
|
||||
self.notified_height: typing.Optional[int] = None
|
||||
# Cache some idea of room to avoid recounting on each subscription
|
||||
self.subs_room = 0
|
||||
|
@ -608,26 +608,20 @@ class SessionManager:
|
|||
|
||||
async def limited_history(self, hashX):
|
||||
"""A caching layer."""
|
||||
hc = self.history_cache
|
||||
if hashX not in hc:
|
||||
if hashX not in self.history_cache:
|
||||
# History DoS limit. Each element of history is about 99
|
||||
# bytes when encoded as JSON. This limits resource usage
|
||||
# on bloated history requests, and uses a smaller divisor
|
||||
# so large requests are logged before refusing them.
|
||||
limit = self.env.max_send // 97
|
||||
hc[hashX] = await self.db.limited_history(hashX, limit=limit)
|
||||
return hc[hashX]
|
||||
self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
|
||||
return self.history_cache[hashX]
|
||||
|
||||
async def _notify_sessions(self, height, touched):
|
||||
"""Notify sessions about height changes and touched addresses."""
|
||||
height_changed = height != self.notified_height
|
||||
if height_changed:
|
||||
await self._refresh_hsub_results(height)
|
||||
# Invalidate our history cache for touched hashXs
|
||||
hc = self.history_cache
|
||||
for hashX in set(hc).intersection(touched):
|
||||
del hc[hashX]
|
||||
|
||||
if self.sessions:
|
||||
await asyncio.wait([
|
||||
session.notify(touched, height_changed) for session in self.sessions
|
||||
|
|
Loading…
Reference in a new issue