From b78e3dab9abbd86a2c01556551f7a3662b2e2094 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Sun, 18 Sep 2022 12:45:53 -0400
Subject: [PATCH 1/3] move search_index object to HubServerService

---
 hub/herald/service.py | 19 ++++++++++++++++---
 hub/herald/session.py | 12 +++---------
 2 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/hub/herald/service.py b/hub/herald/service.py
index 3a2a71c..e20be37 100644
--- a/hub/herald/service.py
+++ b/hub/herald/service.py
@@ -1,18 +1,25 @@
 import time
 import typing
 import asyncio
+from prometheus_client import Counter
+from hub import PROMETHEUS_NAMESPACE
 from hub.scribe.daemon import LBCDaemon
 from hub.herald.session import SessionManager
 from hub.herald.mempool import HubMemPool
 from hub.herald.udp import StatusServer
 from hub.herald.db import HeraldDB
+from hub.herald.search import SearchIndex
 from hub.service import BlockchainReaderService
 from hub.notifier_protocol import ElasticNotifierClientProtocol
 if typing.TYPE_CHECKING:
     from hub.herald.env import ServerEnv
 
+NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub"
+
 
 class HubServerService(BlockchainReaderService):
+    interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
+
     def __init__(self, env: 'ServerEnv'):
         super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
         self.env = env
@@ -21,8 +28,14 @@ class HubServerService(BlockchainReaderService):
         self.status_server = StatusServer()
         self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path)  # only needed for broadcasting txs
         self.mempool = HubMemPool(self.env.coin, self.db)
+        self.search_index = SearchIndex(
+            self.db, self.env.es_index_prefix, self.env.database_query_timeout,
+            elastic_host=env.elastic_host, elastic_port=env.elastic_port,
+            timeout_counter=self.interrupt_count_metric
+        )
+
         self.session_manager = SessionManager(
-            env, self.db, self.mempool, self.daemon,
+            env, self.db, self.mempool, self.daemon, self.search_index,
             self.shutdown_event,
             on_available_callback=self.status_server.set_available,
             on_unavailable_callback=self.status_server.set_unavailable
@@ -52,7 +65,7 @@ class HubServerService(BlockchainReaderService):
 #        self.mempool.notified_mempool_txs.clear()
 
     def clear_search_cache(self):
-        self.session_manager.search_index.clear_caches()
+        self.search_index.clear_caches()
 
     def advance(self, height: int):
         super().advance(height)
@@ -134,7 +147,7 @@ class HubServerService(BlockchainReaderService):
         self.block_count_metric.set(self.last_state.height)
         yield self.start_prometheus()
         yield self.start_cancellable(self.receive_es_notifications)
-        yield self.session_manager.search_index.start()
+        yield self.search_index.start()
         yield self.start_cancellable(self.session_manager.serve, self.mempool)
 
     def _iter_stop_tasks(self):
diff --git a/hub/herald/session.py b/hub/herald/session.py
index 40b5085..af29760 100644
--- a/hub/herald/session.py
+++ b/hub/herald/session.py
@@ -141,7 +141,6 @@ class SessionManager:
     tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
     urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
     resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
-    interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
     db_operational_error_metric = Counter(
         "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
     )
@@ -180,7 +179,7 @@ class SessionManager:
     )
 
     def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
-                 daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
+                 daemon: 'LBCDaemon', search_index: 'SearchIndex', shutdown_event: asyncio.Event,
                  on_available_callback: typing.Callable[[], None],
                  on_unavailable_callback: typing.Callable[[], None]):
         env.max_send = max(350000, env.max_send)
         self.env = env
@@ -189,6 +188,7 @@ class SessionManager:
         self.on_unavailable_callback = on_unavailable_callback
         self.daemon = daemon
         self.mempool = mempool
+        self.search_index = search_index
         self.shutdown_event = shutdown_event
         self.logger = logging.getLogger(__name__)
         self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@@ -207,12 +207,6 @@ class SessionManager:
         self.protocol_class = LBRYElectrumX
         self.session_event = Event()
 
-        # Search index
-        self.search_index = SearchIndex(
-            self.db, self.env.es_index_prefix, self.env.database_query_timeout,
-            elastic_host=env.elastic_host, elastic_port=env.elastic_port,
-            timeout_counter=self.interrupt_count_metric
-        )
         self.running = False
         # hashX: List[int]
         self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
@@ -1270,7 +1264,7 @@ class LBRYElectrumX(asyncio.Protocol):
                     kwargs['channel_id'] = channel_claim.claim_hash.hex()
             return await self.session_manager.search_index.cached_search(kwargs)
         except ConnectionTimeout:
-            self.session_manager.interrupt_count_metric.inc()
+            self.session_manager.search_index.timeout_counter.inc()
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
         except TooManyClaimSearchParametersError as err:
             await asyncio.sleep(2)
-- 
2.45.3


From a2a595163866ba7b2ec2c9995e24f82c94be84bc Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Sun, 18 Sep 2022 14:58:46 -0400
Subject: [PATCH 2/3] failover support for elastic-sync-notifier and
 elasticsearch

deprecates herald options `elastic_host`, `elastic_port`,
`elastic_notifier_host`, and `elastic_notifier_port` in favor of the
single new `elastic_services` option
---
 hub/herald/env.py        | 52 ++++++++++++++++++++++------------
 hub/herald/search.py     | 13 +++++-----
 hub/herald/service.py    | 46 +++++++++++++++++++++++++++++++----
 hub/notifier_protocol.py | 46 +++++++++++++---------------------
 4 files changed, 94 insertions(+), 63 deletions(-)

diff --git a/hub/herald/env.py b/hub/herald/env.py
index 9090b02..52d16b6 100644
--- a/hub/herald/env.py
+++ b/hub/herald/env.py
@@ -1,29 +1,38 @@
 import re
+from collections import deque
 from hub.env import Env
 
+ELASTIC_SERVICES_REGEX = re.compile("(([\d|\.]|[^,:\/])*:\d*\/([\d|\.]|[^,:\/])*:\d*,?)*")
+
+
+def parse_es_services(elastic_services_arg: str):
+    match = ELASTIC_SERVICES_REGEX.match(elastic_services_arg)
+    if not match:
+        return []
+    matching = match.group()
+    services = [item.split('/') for item in matching.split(',') if item]
+    return [
+        ((es.split(':')[0], int(es.split(':')[1])), (notifier.split(':')[0], int(notifier.split(':')[1])))
+        for (es, notifier) in services
+    ]
+
 
 class ServerEnv(Env):
     def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
                  prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
-                 daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None,
+                 daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
                  tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
                 payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
                 session_timeout=None, drop_client=None, description=None, daily_fee=None,
-                 database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
-                 blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None,
-                 index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
+                 database_query_timeout=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
+                 peer_announce=None, index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
                  merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
                  history_tx_cache_size=None, largest_address_history_cache_size=None):
         super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
                          cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
         self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
         self.host = host if host is not None else self.default('HOST', 'localhost')
-        self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
-        self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
-        self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
-            'ELASTIC_NOTIFIER_HOST', 'localhost')
-        self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
-            'ELASTIC_NOTIFIER_PORT', 19080)
+        self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
         self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
         # Server stuff
         self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
@@ -93,15 +102,13 @@ class ServerEnv(Env):
                             help="Regex used for blocking clients")
         parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
                             help="Session inactivity timeout")
-        parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
-                            help="Hostname or ip address of the elasticsearch instance to connect to. "
-                                 "Can be set in env with 'ELASTIC_HOST'")
-        parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
-                            help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
-        parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
-                            type=str, help='elasticsearch sync notifier host, defaults to localhost')
-        parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
-                            help='elasticsearch sync notifier port')
+        parser.add_argument('--elastic_services',
+                            default=cls.default('ELASTIC_SERVICES', 'localhost:9200/localhost:19080'), type=str,
+                            help="Hosts and ports for elastic search and the scribe elastic sync notifier. "
+                                 "Given as a comma separated list without spaces of items in the format "
+                                 "<elastic host>:<elastic port>/<notifier host>:<notifier port> . "
+                                 "Defaults to 'localhost:9200/localhost:19080'. "
+                                 "Can be set in env with 'ELASTIC_SERVICES'")
         parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
         parser.add_argument('--allow_lan_udp', action='store_true',
                             help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
@@ -141,8 +148,8 @@ class ServerEnv(Env):
     @classmethod
     def from_arg_parser(cls, args):
         return cls(
-            db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host,
-            elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
+            db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_services=args.elastic_services,
+            max_query_workers=args.max_query_workers, chain=args.chain,
             es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
             udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
             allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
@@ -151,8 +158,7 @@ class ServerEnv(Env):
             max_sessions=args.max_sessions, session_timeout=args.session_timeout,
             drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
             database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
-            filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
-            elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses,
+            filtering_channel_ids=args.filtering_channel_ids, index_address_status=args.index_address_statuses,
             address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path,
             merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size,
             tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size,
diff --git a/hub/herald/search.py b/hub/herald/search.py
index 7a0e10b..b19d46b 100644
--- a/hub/herald/search.py
+++ b/hub/herald/search.py
@@ -3,7 +3,7 @@ import asyncio
 from bisect import bisect_right
 from collections import Counter, deque
 from operator import itemgetter
-from typing import Optional, List, TYPE_CHECKING
+from typing import Optional, List, TYPE_CHECKING, Deque, Tuple
 from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
 from hub.schema.result import Censor, Outputs
@@ -29,8 +29,9 @@ class StreamResolution(str):
 class SearchIndex:
     VERSION = 1
 
-    def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
-                 elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None):
+    def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0,
+                 elastic_services: Optional[Deque[Tuple[Tuple[str, int], Tuple[str, int]]]] = None,
+                 timeout_counter: Optional['PrometheusCounter'] = None):
         self.hub_db = hub_db
         self.search_timeout = search_timeout
         self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
@@ -41,8 +42,8 @@ class SearchIndex:
         self.logger = logging.getLogger(__name__)
         self.claim_cache = LRUCache(2 ** 15)
         self.search_cache = LRUCache(2 ** 17)
-        self._elastic_host = elastic_host
-        self._elastic_port = elastic_port
+        self._elastic_services = elastic_services
+        self.lost_connection = asyncio.Event()
 
     async def get_index_version(self) -> int:
         try:
@@ -59,7 +60,7 @@ class SearchIndex:
     async def start(self) -> bool:
         if self.sync_client:
             return False
-        hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
+        hosts = [{'host': self._elastic_services[0][0][0], 'port': self._elastic_services[0][0][1]}]
         self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
         self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
         while True:
diff --git a/hub/herald/service.py b/hub/herald/service.py
index e20be37..9aa53e8 100644
--- a/hub/herald/service.py
+++ b/hub/herald/service.py
@@ -28,9 +28,10 @@ class HubServerService(BlockchainReaderService):
         self.status_server = StatusServer()
         self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path)  # only needed for broadcasting txs
         self.mempool = HubMemPool(self.env.coin, self.db)
+
         self.search_index = SearchIndex(
             self.db, self.env.es_index_prefix, self.env.database_query_timeout,
-            elastic_host=env.elastic_host, elastic_port=env.elastic_port,
+            elastic_services=self.env.elastic_services,
             timeout_counter=self.interrupt_count_metric
         )
 
@@ -43,7 +44,7 @@ class HubServerService(BlockchainReaderService):
         self.mempool.session_manager = self.session_manager
         self.es_notifications = asyncio.Queue()
         self.es_notification_client = ElasticNotifierClientProtocol(
-            self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port
+            self.es_notifications, self.env.elastic_services
         )
         self.synchronized = asyncio.Event()
         self._es_height = None
@@ -129,8 +130,44 @@
                     self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
                                   self.db.db_height)
         finally:
+            self.log.warning("closing es sync notification loop at %s", self._es_height)
             self.es_notification_client.close()
 
+    async def failover_elastic_services(self, synchronized: asyncio.Event):
+        first_connect = True
+        if not self.es_notification_client.lost_connection.is_set():
+            synchronized.set()
+
+        while True:
+            try:
+                await self.es_notification_client.lost_connection.wait()
+                if not first_connect:
+                    self.log.warning("lost connection to scribe-elastic-sync notifier (%s:%i)",
+                                     self.es_notification_client.host, self.es_notification_client.port)
+                await self.es_notification_client.connect()
+                first_connect = False
+                synchronized.set()
+                self.log.info("connected to es notifier on %s:%i", self.es_notification_client.host,
+                              self.es_notification_client.port)
+                await self.search_index.start()
+            except Exception as e:
+                if not isinstance(e, asyncio.CancelledError):
+                    self.log.warning("lost connection to scribe-elastic-sync notifier")
+                    await self.search_index.stop()
+                    self.search_index.clear_caches()
+                    if len(self.env.elastic_services) > 1:
+                        self.env.elastic_services.rotate(-1)
+                        self.log.warning("attempting to failover to %s:%i", self.es_notification_client.host,
+                                         self.es_notification_client.port)
+                        await asyncio.sleep(1)
+                    else:
+                        self.log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)",
+                                         self.es_notification_client.host, self.es_notification_client.port)
+                        await asyncio.sleep(30)
+                else:
+                    self.log.info("stopping the notifier loop")
+                    raise e
+
     async def start_status_server(self):
         if self.env.udp_port and int(self.env.udp_port):
             await self.status_server.start(
@@ -140,14 +177,13 @@
 
     def _iter_start_tasks(self):
         yield self.start_status_server()
-        yield self.start_cancellable(self.es_notification_client.maintain_connection)
+        yield self.start_cancellable(self.receive_es_notifications)
+        yield self.start_cancellable(self.failover_elastic_services)
         yield self.start_cancellable(self.mempool.send_notifications_forever)
         yield self.start_cancellable(self.refresh_blocks_forever)
         yield self.finished_initial_catch_up.wait()
         self.block_count_metric.set(self.last_state.height)
         yield self.start_prometheus()
-        yield self.start_cancellable(self.receive_es_notifications)
-        yield self.search_index.start()
         yield self.start_cancellable(self.session_manager.serve, self.mempool)
 
     def _iter_stop_tasks(self):
diff --git a/hub/notifier_protocol.py b/hub/notifier_protocol.py
index adc2f67..13f0ad1 100644
--- a/hub/notifier_protocol.py
+++ b/hub/notifier_protocol.py
@@ -2,6 +2,7 @@ import typing
 import struct
 import asyncio
 import logging
+from typing import Deque, Tuple
 
 log = logging.getLogger(__name__)
 
@@ -31,52 +32,39 @@ class ElasticNotifierProtocol(asyncio.Protocol):
 
 class ElasticNotifierClientProtocol(asyncio.Protocol):
     """notifies the reader when ES has written updates"""
 
-    def __init__(self, notifications: asyncio.Queue, host: str, port: int):
+    def __init__(self, notifications: asyncio.Queue, notifier_hosts: Deque[Tuple[Tuple[str, int], Tuple[str, int]]]):
+        assert len(notifier_hosts) > 0, 'no elastic notifier clients given'
         self.notifications = notifications
         self.transport: typing.Optional[asyncio.Transport] = None
-        self.host = host
-        self.port = port
-        self._lost_connection = asyncio.Event()
-        self._lost_connection.set()
+        self._notifier_hosts = notifier_hosts
+        self.lost_connection = asyncio.Event()
+        self.lost_connection.set()
+
+    @property
+    def host(self):
+        return self._notifier_hosts[0][1][0]
+
+    @property
+    def port(self):
+        return self._notifier_hosts[0][1][1]
 
     async def connect(self):
-        if self._lost_connection.is_set():
+        if self.lost_connection.is_set():
             await asyncio.get_event_loop().create_connection(
                 lambda: self, self.host, self.port
             )
 
-    async def maintain_connection(self, synchronized: asyncio.Event):
-        first_connect = True
-        if not self._lost_connection.is_set():
-            synchronized.set()
-        while True:
-            try:
-                await self._lost_connection.wait()
-                if not first_connect:
-                    log.warning("lost connection to scribe-elastic-sync notifier")
-                await self.connect()
-                first_connect = False
-                synchronized.set()
-                log.info("connected to es notifier")
-            except Exception as e:
-                if not isinstance(e, asyncio.CancelledError):
-                    log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
-                    await asyncio.sleep(30)
-                else:
-                    log.info("stopping the notifier loop")
-                    raise e
-
     def close(self):
         if self.transport and not self.transport.is_closing():
             self.transport.close()
 
     def connection_made(self, transport):
         self.transport = transport
-        self._lost_connection.clear()
+        self.lost_connection.clear()
 
     def connection_lost(self, exc) -> None:
         self.transport = None
-        self._lost_connection.set()
+        self.lost_connection.set()
 
     def data_received(self, data: bytes) -> None:
         try:
-- 
2.45.3


From c76b7675b98c441b3919208b53cbaf2eaf7b2df5 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 20 Sep 2022 15:58:12 -0400
Subject: [PATCH 3/3] handle es_info being an empty file

---
 hub/elastic_sync/service.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/hub/elastic_sync/service.py b/hub/elastic_sync/service.py
index ee246be..fc426da 100644
--- a/hub/elastic_sync/service.py
+++ b/hub/elastic_sync/service.py
@@ -73,7 +73,10 @@ class ElasticSyncService(BlockchainReaderService):
         info = {}
         if os.path.exists(self._es_info_path):
             with open(self._es_info_path, 'r') as f:
-                info.update(json.loads(f.read()))
+                try:
+                    info.update(json.loads(f.read()))
+                except json.decoder.JSONDecodeError:
+                    self.log.warning('failed to parse es sync status file')
         self._last_wrote_height = int(info.get('height', 0))
         self._last_wrote_block_hash = info.get('block_hash', None)
 
-- 
2.45.3
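
Usage sketch for the new option in patch 2 (the addresses below are illustrative, not from the patches): `ELASTIC_SERVICES` takes comma separated `<elastic host>:<elastic port>/<notifier host>:<notifier port>` pairs. `parse_es_services` turns the string into `((es_host, es_port), (notifier_host, notifier_port))` tuples, `ServerEnv` keeps them in a deque, `SearchIndex` and the notifier client read the pair at index 0, and `failover_elastic_services` rotates the deque when the current pair becomes unreachable:

    from collections import deque
    from hub.herald.env import parse_es_services

    # two ES/notifier pairs; the second is the failover target
    services = deque(parse_es_services(
        '127.0.0.1:9200/127.0.0.1:19080,10.0.0.2:9200/10.0.0.2:19080'
    ))
    assert services[0] == (('127.0.0.1', 9200), ('127.0.0.1', 19080))

    es_host, es_port = services[0][0]              # what SearchIndex.start() connects to
    notifier_host, notifier_port = services[0][1]  # what ElasticNotifierClientProtocol connects to

    services.rotate(-1)  # what failover_elastic_services does on connection loss
    assert services[0][0] == ('10.0.0.2', 9200)

With a single pair configured there is nothing to rotate to, so the failover loop degrades to the old behavior: retry the same service every 30 seconds.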