added lrucache to search/resolve

This commit is contained in:
Lex Berezhny 2019-07-15 02:11:12 -04:00
parent f43e370e89
commit 03a58cf50a
3 changed files with 22 additions and 7 deletions

View file

@ -68,6 +68,7 @@ class LBRYBlockProcessor(BlockProcessor):
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}") self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
self.sql: SQLDB = self.db.sql self.sql: SQLDB = self.db.sql
self.timer = Timer('BlockProcessor') self.timer = Timer('BlockProcessor')
self.search_cache = {}
def advance_blocks(self, blocks): def advance_blocks(self, blocks):
self.sql.begin() self.sql.begin()
@ -78,6 +79,8 @@ class LBRYBlockProcessor(BlockProcessor):
raise raise
finally: finally:
self.sql.commit() self.sql.commit()
for cache in self.search_cache.values():
cache.clear()
def advance_txs(self, height, txs, header): def advance_txs(self, height, txs, header):
timer = self.timer.sub_timers['advance_blocks'] timer = self.timer.sub_timers['advance_blocks']

View file

@ -5,6 +5,7 @@ import base64
import asyncio import asyncio
from binascii import hexlify from binascii import hexlify
from weakref import WeakSet from weakref import WeakSet
from pylru import lrucache
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite
@ -69,7 +70,7 @@ class AdminWebSocket:
@staticmethod @staticmethod
async def on_shutdown(app): async def on_shutdown(app):
print('disconnecting websockets') print('disconnecting websockets')
for web_socket in app['websockets']: for web_socket in set(app['websockets']):
await web_socket.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown') await web_socket.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')
@ -85,6 +86,9 @@ class LBRYSessionManager(SessionManager):
self.running = False self.running = False
if self.env.websocket_host is not None and self.env.websocket_port is not None: if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self) self.websocket = AdminWebSocket(self)
self.search_cache = self.bp.search_cache
self.search_cache['search'] = lrucache(1000)
self.search_cache['resolve'] = lrucache(1000)
def get_command_tracking_info(self, command): def get_command_tracking_info(self, command):
if command not in self.command_metrics: if command not in self.command_metrics:
@ -191,19 +195,26 @@ class LBRYElectrumX(ElectrumX):
elapsed = int((time.perf_counter() - start) * 1000) elapsed = int((time.perf_counter() - start) * 1000)
(result, metrics) = result (result, metrics) = result
self.session_mgr.finish_command_tracking(name, elapsed, metrics) self.session_mgr.finish_command_tracking(name, elapsed, metrics)
result = base64.b64encode(result)
if self.env.cache_search:
self.session_mgr.search_cache[name][str(kwargs)] = result
return result return result
async def claimtrie_search(self, **kwargs): async def claimtrie_search(self, **kwargs):
if 'claim_id' in kwargs: if 'claim_id' in kwargs:
self.assert_claim_id(kwargs['claim_id']) self.assert_claim_id(kwargs['claim_id'])
return base64.b64encode( if self.env.cache_search:
await self.run_in_executor('search', reader.search_to_bytes, kwargs) key = str(kwargs)
).decode() if key in self.session_mgr.search_cache['search']:
return self.session_mgr.search_cache['search'][key]
return await self.run_in_executor('search', reader.search_to_bytes, kwargs)
async def claimtrie_resolve(self, *urls): async def claimtrie_resolve(self, *urls):
return base64.b64encode( if self.env.cache_search:
await self.run_in_executor('resolve', reader.resolve_to_bytes, urls) key = str(urls)
).decode() if key in self.session_mgr.search_cache['resolve']:
return self.session_mgr.search_cache['resolve'][key]
return await self.run_in_executor('resolve', reader.resolve_to_bytes, urls)
async def get_server_height(self): async def get_server_height(self):
return self.bp.height return self.bp.height

View file

@ -38,6 +38,7 @@ class Env:
self.db_dir = self.required('DB_DIRECTORY') self.db_dir = self.required('DB_DIRECTORY')
self.db_engine = self.default('DB_ENGINE', 'leveldb') self.db_engine = self.default('DB_ENGINE', 'leveldb')
self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None) self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None)
self.cache_search = self.boolean('CACHE_SEARCH', False)
self.track_metrics = self.boolean('TRACK_METRICS', False) self.track_metrics = self.boolean('TRACK_METRICS', False)
self.websocket_host = self.default('WEBSOCKET_HOST', None) self.websocket_host = self.default('WEBSOCKET_HOST', None)
self.websocket_port = self.integer('WEBSOCKET_PORT', None) self.websocket_port = self.integer('WEBSOCKET_PORT', None)