move search_index object to HubServerService
This commit is contained in:
parent
9b17822229
commit
4586b344ce
2 changed files with 19 additions and 12 deletions
|
@ -1,18 +1,25 @@
|
||||||
import time
|
import time
|
||||||
import typing
|
import typing
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from prometheus_client import Counter
|
||||||
|
from hub import PROMETHEUS_NAMESPACE
|
||||||
from hub.scribe.daemon import LBCDaemon
|
from hub.scribe.daemon import LBCDaemon
|
||||||
from hub.herald.session import SessionManager
|
from hub.herald.session import SessionManager
|
||||||
from hub.herald.mempool import HubMemPool
|
from hub.herald.mempool import HubMemPool
|
||||||
from hub.herald.udp import StatusServer
|
from hub.herald.udp import StatusServer
|
||||||
from hub.herald.db import HeraldDB
|
from hub.herald.db import HeraldDB
|
||||||
|
from hub.herald.search import SearchIndex
|
||||||
from hub.service import BlockchainReaderService
|
from hub.service import BlockchainReaderService
|
||||||
from hub.notifier_protocol import ElasticNotifierClientProtocol
|
from hub.notifier_protocol import ElasticNotifierClientProtocol
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from hub.herald.env import ServerEnv
|
from hub.herald.env import ServerEnv
|
||||||
|
|
||||||
|
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub"
|
||||||
|
|
||||||
|
|
||||||
class HubServerService(BlockchainReaderService):
|
class HubServerService(BlockchainReaderService):
|
||||||
|
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
|
||||||
|
|
||||||
def __init__(self, env: 'ServerEnv'):
|
def __init__(self, env: 'ServerEnv'):
|
||||||
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
|
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
|
||||||
self.env = env
|
self.env = env
|
||||||
|
@ -21,8 +28,14 @@ class HubServerService(BlockchainReaderService):
|
||||||
self.status_server = StatusServer()
|
self.status_server = StatusServer()
|
||||||
self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
|
self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
|
||||||
self.mempool = HubMemPool(self.env.coin, self.db)
|
self.mempool = HubMemPool(self.env.coin, self.db)
|
||||||
|
self.search_index = SearchIndex(
|
||||||
|
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
|
||||||
|
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
|
||||||
|
timeout_counter=self.interrupt_count_metric
|
||||||
|
)
|
||||||
|
|
||||||
self.session_manager = SessionManager(
|
self.session_manager = SessionManager(
|
||||||
env, self.db, self.mempool, self.daemon,
|
env, self.db, self.mempool, self.daemon, self.search_index,
|
||||||
self.shutdown_event,
|
self.shutdown_event,
|
||||||
on_available_callback=self.status_server.set_available,
|
on_available_callback=self.status_server.set_available,
|
||||||
on_unavailable_callback=self.status_server.set_unavailable
|
on_unavailable_callback=self.status_server.set_unavailable
|
||||||
|
@ -52,7 +65,7 @@ class HubServerService(BlockchainReaderService):
|
||||||
# self.mempool.notified_mempool_txs.clear()
|
# self.mempool.notified_mempool_txs.clear()
|
||||||
|
|
||||||
def clear_search_cache(self):
|
def clear_search_cache(self):
|
||||||
self.session_manager.search_index.clear_caches()
|
self.search_index.clear_caches()
|
||||||
|
|
||||||
def advance(self, height: int):
|
def advance(self, height: int):
|
||||||
super().advance(height)
|
super().advance(height)
|
||||||
|
@ -134,7 +147,7 @@ class HubServerService(BlockchainReaderService):
|
||||||
self.block_count_metric.set(self.last_state.height)
|
self.block_count_metric.set(self.last_state.height)
|
||||||
yield self.start_prometheus()
|
yield self.start_prometheus()
|
||||||
yield self.start_cancellable(self.receive_es_notifications)
|
yield self.start_cancellable(self.receive_es_notifications)
|
||||||
yield self.session_manager.search_index.start()
|
yield self.search_index.start()
|
||||||
yield self.start_cancellable(self.session_manager.serve, self.mempool)
|
yield self.start_cancellable(self.session_manager.serve, self.mempool)
|
||||||
|
|
||||||
def _iter_stop_tasks(self):
|
def _iter_stop_tasks(self):
|
||||||
|
|
|
@ -142,7 +142,6 @@ class SessionManager:
|
||||||
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
|
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
|
||||||
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
|
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
|
||||||
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
|
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
|
||||||
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
|
|
||||||
db_operational_error_metric = Counter(
|
db_operational_error_metric = Counter(
|
||||||
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
|
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
|
||||||
)
|
)
|
||||||
|
@ -181,7 +180,7 @@ class SessionManager:
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
|
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
|
||||||
daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
|
daemon: 'LBCDaemon', search_index: 'SearchIndex', shutdown_event: asyncio.Event,
|
||||||
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
||||||
env.max_send = max(350000, env.max_send)
|
env.max_send = max(350000, env.max_send)
|
||||||
self.env = env
|
self.env = env
|
||||||
|
@ -190,6 +189,7 @@ class SessionManager:
|
||||||
self.on_unavailable_callback = on_unavailable_callback
|
self.on_unavailable_callback = on_unavailable_callback
|
||||||
self.daemon = daemon
|
self.daemon = daemon
|
||||||
self.mempool = mempool
|
self.mempool = mempool
|
||||||
|
self.search_index = search_index
|
||||||
self.shutdown_event = shutdown_event
|
self.shutdown_event = shutdown_event
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
||||||
|
@ -208,12 +208,6 @@ class SessionManager:
|
||||||
self.protocol_class = LBRYElectrumX
|
self.protocol_class = LBRYElectrumX
|
||||||
self.session_event = Event()
|
self.session_event = Event()
|
||||||
|
|
||||||
# Search index
|
|
||||||
self.search_index = SearchIndex(
|
|
||||||
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
|
|
||||||
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
|
|
||||||
timeout_counter=self.interrupt_count_metric
|
|
||||||
)
|
|
||||||
self.running = False
|
self.running = False
|
||||||
# hashX: List[int]
|
# hashX: List[int]
|
||||||
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
|
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
|
||||||
|
@ -1284,7 +1278,7 @@ class LBRYElectrumX(asyncio.Protocol):
|
||||||
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
||||||
return await self.session_manager.search_index.cached_search(kwargs)
|
return await self.session_manager.search_index.cached_search(kwargs)
|
||||||
except ConnectionTimeout:
|
except ConnectionTimeout:
|
||||||
self.session_manager.interrupt_count_metric.inc()
|
self.session_manager.search_index.timeout_counter.inc()
|
||||||
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
||||||
except TooManyClaimSearchParametersError as err:
|
except TooManyClaimSearchParametersError as err:
|
||||||
await asyncio.sleep(2)
|
await asyncio.sleep(2)
|
||||||
|
|
Loading…
Reference in a new issue