diff --git a/lbry/lbry/wallet/server/block_processor.py b/lbry/lbry/wallet/server/block_processor.py index 2b5678788..e09eb25a4 100644 --- a/lbry/lbry/wallet/server/block_processor.py +++ b/lbry/lbry/wallet/server/block_processor.py @@ -68,6 +68,7 @@ class LBRYBlockProcessor(BlockProcessor): self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}") self.sql: SQLDB = self.db.sql self.timer = Timer('BlockProcessor') + self.search_cache = {} def advance_blocks(self, blocks): self.sql.begin() @@ -78,6 +79,8 @@ class LBRYBlockProcessor(BlockProcessor): raise finally: self.sql.commit() + for cache in self.search_cache.values(): + cache.clear() def advance_txs(self, height, txs, header): timer = self.timer.sub_timers['advance_blocks'] diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py index cd0d3f0d7..7fda0c7c1 100644 --- a/lbry/lbry/wallet/server/session.py +++ b/lbry/lbry/wallet/server/session.py @@ -5,6 +5,7 @@ import base64 import asyncio from binascii import hexlify from weakref import WeakSet +from pylru import lrucache from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite @@ -69,7 +70,7 @@ class AdminWebSocket: @staticmethod async def on_shutdown(app): print('disconnecting websockets') - for web_socket in app['websockets']: + for web_socket in set(app['websockets']): await web_socket.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown') @@ -85,6 +86,9 @@ class LBRYSessionManager(SessionManager): self.running = False if self.env.websocket_host is not None and self.env.websocket_port is not None: self.websocket = AdminWebSocket(self) + self.search_cache = self.bp.search_cache + self.search_cache['search'] = lrucache(1000) + self.search_cache['resolve'] = lrucache(1000) def get_command_tracking_info(self, command): if command not in self.command_metrics: @@ -191,19 +195,26 @@ class LBRYElectrumX(ElectrumX): elapsed = int((time.perf_counter() - start) * 1000) (result, metrics) = result self.session_mgr.finish_command_tracking(name, elapsed, metrics) + result = base64.b64encode(result) + if self.env.cache_search: + self.session_mgr.search_cache[name][str(kwargs)] = result return result async def claimtrie_search(self, **kwargs): if 'claim_id' in kwargs: self.assert_claim_id(kwargs['claim_id']) - return base64.b64encode( - await self.run_in_executor('search', reader.search_to_bytes, kwargs) - ).decode() + if self.env.cache_search: + key = str(kwargs) + if key in self.session_mgr.search_cache['search']: + return self.session_mgr.search_cache['search'][key] + return await self.run_in_executor('search', reader.search_to_bytes, kwargs) async def claimtrie_resolve(self, *urls): - return base64.b64encode( - await self.run_in_executor('resolve', reader.resolve_to_bytes, urls) - ).decode() + if self.env.cache_search: + key = str(urls) + if key in self.session_mgr.search_cache['resolve']: + return self.session_mgr.search_cache['resolve'][key] + return await self.run_in_executor('resolve', reader.resolve_to_bytes, urls) async def get_server_height(self): return self.bp.height diff --git a/torba/torba/server/env.py b/torba/torba/server/env.py index 72bf263b4..0b6a8a77a 100644 --- a/torba/torba/server/env.py +++ b/torba/torba/server/env.py @@ -38,6 +38,7 @@ class Env: self.db_dir = self.required('DB_DIRECTORY') self.db_engine = self.default('DB_ENGINE', 'leveldb') self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None) + self.cache_search = self.boolean('CACHE_SEARCH', False) self.track_metrics = self.boolean('TRACK_METRICS', False) self.websocket_host = self.default('WEBSOCKET_HOST', None) self.websocket_port = self.integer('WEBSOCKET_PORT', None)