Merge pull request #2972 from lbryio/history-cache

Improve wallet server address history cache and the rate of sent notifications
This commit is contained in:
Lex Berezhny 2020-06-05 15:47:07 -04:00 committed by GitHub
commit 3b9ea2c9a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 28 deletions

View file

@ -206,7 +206,7 @@ class SessionBase(asyncio.Protocol):
""" """
return self._address return self._address
def peer_address_str(self): def peer_address_str(self, for_log=True):
"""Returns the peer's IP address and port as a human-readable """Returns the peer's IP address and port as a human-readable
string.""" string."""
if not self._address: if not self._address:
@ -483,11 +483,17 @@ class RPCSession(SessionBase):
raise result raise result
return result return result
async def send_notification(self, method, args=()): async def send_notification(self, method, args=()) -> bool:
"""Send an RPC notification over the network.""" """Send an RPC notification over the network."""
message = self.connection.send_notification(Notification(method, args)) message = self.connection.send_notification(Notification(method, args))
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
await self._send_message(message) try:
await self._send_message(message)
return True
except asyncio.TimeoutError:
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True))
self.abort()
return False
def send_batch(self, raise_errors=False): def send_batch(self, raise_errors=False):
"""Return a BatchRequest. Intended to be used like so: """Return a BatchRequest. Intended to be used like so:

View file

@ -183,6 +183,7 @@ class BlockProcessor:
self.state_lock = asyncio.Lock() self.state_lock = asyncio.Lock()
self.search_cache = {} self.search_cache = {}
self.history_cache = {}
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
@ -213,6 +214,7 @@ class BlockProcessor:
await self.run_in_thread_with_lock(self.advance_blocks, blocks) await self.run_in_thread_with_lock(self.advance_blocks, blocks)
for cache in self.search_cache.values(): for cache in self.search_cache.values():
cache.clear() cache.clear()
self.history_cache.clear()
await self._maybe_flush() await self._maybe_flush()
processed_time = time.perf_counter() - start processed_time = time.perf_counter() - start
self.block_count_metric.set(self.height) self.block_count_metric.set(self.height)

View file

@ -135,7 +135,7 @@ class SessionManager:
"docker_tag": DOCKER_TAG, "docker_tag": DOCKER_TAG,
'version': lbry.__version__, 'version': lbry.__version__,
"min_version": util.version_string(VERSION.PROTOCOL_MIN), "min_version": util.version_string(VERSION.PROTOCOL_MIN),
"cpu_count": os.cpu_count() "cpu_count": str(os.cpu_count())
}) })
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE, session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
labelnames=("version",)) labelnames=("version",))
@ -177,7 +177,7 @@ class SessionManager:
self.cur_group = SessionGroup(0) self.cur_group = SessionGroup(0)
self.txs_sent = 0 self.txs_sent = 0
self.start_time = time.time() self.start_time = time.time()
self.history_cache = pylru.lrucache(256) self.history_cache = self.bp.history_cache
self.notified_height: typing.Optional[int] = None self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
@ -608,26 +608,20 @@ class SessionManager:
async def limited_history(self, hashX): async def limited_history(self, hashX):
"""A caching layer.""" """A caching layer."""
hc = self.history_cache if hashX not in self.history_cache:
if hashX not in hc:
# History DoS limit. Each element of history is about 99 # History DoS limit. Each element of history is about 99
# bytes when encoded as JSON. This limits resource usage # bytes when encoded as JSON. This limits resource usage
# on bloated history requests, and uses a smaller divisor # on bloated history requests, and uses a smaller divisor
# so large requests are logged before refusing them. # so large requests are logged before refusing them.
limit = self.env.max_send // 97 limit = self.env.max_send // 97
hc[hashX] = await self.db.limited_history(hashX, limit=limit) self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
return hc[hashX] return self.history_cache[hashX]
async def _notify_sessions(self, height, touched): async def _notify_sessions(self, height, touched):
"""Notify sessions about height changes and touched addresses.""" """Notify sessions about height changes and touched addresses."""
height_changed = height != self.notified_height height_changed = height != self.notified_height
if height_changed: if height_changed:
await self._refresh_hsub_results(height) await self._refresh_hsub_results(height)
# Invalidate our history cache for touched hashXs
hc = self.history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
if self.sessions: if self.sessions:
await asyncio.wait([ await asyncio.wait([
session.notify(touched, height_changed) for session in self.sessions session.notify(touched, height_changed) for session in self.sessions
@ -927,13 +921,8 @@ class LBRYElectrumX(SessionBase):
""" """
if height_changed and self.subscribe_headers: if height_changed and self.subscribe_headers:
args = (await self.subscribe_headers_result(), ) args = (await self.subscribe_headers_result(), )
try: if not (await self.send_notification('blockchain.headers.subscribe', args)):
await self.send_notification('blockchain.headers.subscribe', args)
except asyncio.TimeoutError:
self.logger.info("timeout sending headers notification to %s", self.peer_address_str(for_log=True))
self.abort()
return return
touched = touched.intersection(self.hashX_subs) touched = touched.intersection(self.hashX_subs)
if touched or (height_changed and self.mempool_statuses): if touched or (height_changed and self.mempool_statuses):
changed = {} changed = {}
@ -960,14 +949,7 @@ class LBRYElectrumX(SessionBase):
method = 'blockchain.scripthash.subscribe' method = 'blockchain.scripthash.subscribe'
else: else:
method = 'blockchain.address.subscribe' method = 'blockchain.address.subscribe'
asyncio.create_task(self.send_notification(method, (alias, status)))
try:
await self.send_notification(method, (alias, status))
except asyncio.TimeoutError:
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True))
self.abort()
return
if changed: if changed:
es = '' if len(changed) == 1 else 'es' es = '' if len(changed) == 1 else 'es'
self.logger.info(f'notified of {len(changed):,d} address{es}') self.logger.info(f'notified of {len(changed):,d} address{es}')
@ -1180,6 +1162,7 @@ class LBRYElectrumX(SessionBase):
""" """
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
db_history = await self.session_mgr.limited_history(hashX) db_history = await self.session_mgr.limited_history(hashX)
mempool = await self.mempool.transaction_summaries(hashX) mempool = await self.mempool.transaction_summaries(hashX)