diff --git a/hub/herald/service.py b/hub/herald/service.py index 3a2a71c..e20be37 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -1,18 +1,25 @@ import time import typing import asyncio +from prometheus_client import Counter +from hub import PROMETHEUS_NAMESPACE from hub.scribe.daemon import LBCDaemon from hub.herald.session import SessionManager from hub.herald.mempool import HubMemPool from hub.herald.udp import StatusServer from hub.herald.db import HeraldDB +from hub.herald.search import SearchIndex from hub.service import BlockchainReaderService from hub.notifier_protocol import ElasticNotifierClientProtocol if typing.TYPE_CHECKING: from hub.herald.env import ServerEnv +NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub" + class HubServerService(BlockchainReaderService): + interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) + def __init__(self, env: 'ServerEnv'): super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') self.env = env @@ -21,8 +28,14 @@ class HubServerService(BlockchainReaderService): self.status_server = StatusServer() self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs self.mempool = HubMemPool(self.env.coin, self.db) + self.search_index = SearchIndex( + self.db, self.env.es_index_prefix, self.env.database_query_timeout, + elastic_host=env.elastic_host, elastic_port=env.elastic_port, + timeout_counter=self.interrupt_count_metric + ) + self.session_manager = SessionManager( - env, self.db, self.mempool, self.daemon, + env, self.db, self.mempool, self.daemon, self.search_index, self.shutdown_event, on_available_callback=self.status_server.set_available, on_unavailable_callback=self.status_server.set_unavailable @@ -52,7 +65,7 @@ class HubServerService(BlockchainReaderService): # self.mempool.notified_mempool_txs.clear() def clear_search_cache(self): - self.session_manager.search_index.clear_caches() + self.search_index.clear_caches() def advance(self, height: int): super().advance(height) @@ -134,7 +147,7 @@ class HubServerService(BlockchainReaderService): self.block_count_metric.set(self.last_state.height) yield self.start_prometheus() yield self.start_cancellable(self.receive_es_notifications) - yield self.session_manager.search_index.start() + yield self.search_index.start() yield self.start_cancellable(self.session_manager.serve, self.mempool) def _iter_stop_tasks(self): diff --git a/hub/herald/session.py b/hub/herald/session.py index 7a04f0d..56b33d2 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -142,7 +142,6 @@ class SessionManager: tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE) urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE) resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE) - interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) db_operational_error_metric = Counter( "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE ) @@ -181,7 +180,7 @@ class SessionManager: ) def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool', - daemon: 'LBCDaemon', shutdown_event: asyncio.Event, + daemon: 'LBCDaemon', search_index: 'SearchIndex', shutdown_event: asyncio.Event, on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): env.max_send = max(350000, env.max_send) self.env = env @@ -190,6 +189,7 @@ class SessionManager: self.on_unavailable_callback = on_unavailable_callback self.daemon = daemon self.mempool = mempool + self.search_index = search_index self.shutdown_event = shutdown_event self.logger = logging.getLogger(__name__) self.servers: typing.Dict[str, asyncio.AbstractServer] = {} @@ -208,12 +208,6 @@ class SessionManager: self.protocol_class = LBRYElectrumX self.session_event = Event() - # Search index - self.search_index = SearchIndex( - self.db, self.env.es_index_prefix, self.env.database_query_timeout, - elastic_host=env.elastic_host, elastic_port=env.elastic_port, - timeout_counter=self.interrupt_count_metric - ) self.running = False # hashX: List[int] self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE) @@ -1284,7 +1278,7 @@ class LBRYElectrumX(asyncio.Protocol): kwargs['channel_id'] = channel_claim.claim_hash.hex() return await self.session_manager.search_index.cached_search(kwargs) except ConnectionTimeout: - self.session_manager.interrupt_count_metric.inc() + self.session_manager.search_index.timeout_counter.inc() raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out') except TooManyClaimSearchParametersError as err: await asyncio.sleep(2)