diff --git a/hub/herald/env.py b/hub/herald/env.py index 9090b02..52d16b6 100644 --- a/hub/herald/env.py +++ b/hub/herald/env.py @@ -1,29 +1,38 @@ import re +from collections import deque from hub.env import Env +ELASTIC_SERVICES_REGEX = re.compile("(([\d|\.]|[^,:\/])*:\d*\/([\d|\.]|[^,:\/])*:\d*,?)*") + + +def parse_es_services(elastic_services_arg: str): + match = ELASTIC_SERVICES_REGEX.match(elastic_services_arg) + if not match: + return [] + matching = match.group() + services = [item.split('/') for item in matching.split(',') if item] + return [ + ((es.split(':')[0], int(es.split(':')[1])), (notifier.split(':')[0], int(notifier.split(':')[1]))) + for (es, notifier) in services + ] + class ServerEnv(Env): def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, - daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None, + daemon_url=None, host=None, elastic_services=None, es_index_prefix=None, tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, session_timeout=None, drop_client=None, description=None, daily_fee=None, - database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, - blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None, - index_address_status=None, address_history_cache_size=None, daemon_ca_path=None, + database_query_timeout=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, + peer_announce=None, index_address_status=None, address_history_cache_size=None, daemon_ca_path=None, merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None, history_tx_cache_size=None, largest_address_history_cache_size=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.host = host if host is not None else self.default('HOST', 'localhost') - self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') - self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) - self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default( - 'ELASTIC_NOTIFIER_HOST', 'localhost') - self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer( - 'ELASTIC_NOTIFIER_PORT', 19080) + self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080')) self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') # Server stuff self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None) @@ -93,15 +102,13 @@ class ServerEnv(Env): help="Regex used for blocking clients") parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600), help="Session inactivity timeout") - parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, - help="Hostname or ip address of the elasticsearch instance to connect to. " - "Can be set in env with 'ELASTIC_HOST'") - parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, - help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'") - parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), - type=str, help='elasticsearch sync notifier host, defaults to localhost') - parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int, - help='elasticsearch sync notifier port') + parser.add_argument('--elastic_services', + default=cls.default('ELASTIC_SERVICES', 'localhost:9200/localhost:19080'), type=str, + help="Hosts and ports for elastic search and the scribe elastic sync notifier. " + "Given as a comma separated list without spaces of items in the format " + ":/: . " + "Defaults to 'localhost:9200/localhost:19080'. " + "Can be set in env with 'ELASTIC_SERVICES'") parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) parser.add_argument('--allow_lan_udp', action='store_true', help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False)) @@ -141,8 +148,8 @@ class ServerEnv(Env): @classmethod def from_arg_parser(cls, args): return cls( - db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host, - elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain, + db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_services=args.elastic_services, + max_query_workers=args.max_query_workers, chain=args.chain, es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file, allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes, @@ -151,8 +158,7 @@ class ServerEnv(Env): max_sessions=args.max_sessions, session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids, - filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, - elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses, + filtering_channel_ids=args.filtering_channel_ids, index_address_status=args.index_address_statuses, address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path, merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size, tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size, diff --git a/hub/herald/search.py b/hub/herald/search.py index 7a0e10b..b19d46b 100644 --- a/hub/herald/search.py +++ b/hub/herald/search.py @@ -3,7 +3,7 @@ import asyncio from bisect import bisect_right from collections import Counter, deque from operator import itemgetter -from typing import Optional, List, TYPE_CHECKING +from typing import Optional, List, TYPE_CHECKING, Deque, Tuple from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError from hub.schema.result import Censor, Outputs @@ -29,8 +29,9 @@ class StreamResolution(str): class SearchIndex: VERSION = 1 - def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost', - elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None): + def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, + elastic_services: Optional[Deque[Tuple[Tuple[str, int], Tuple[str, int]]]] = None, + timeout_counter: Optional['PrometheusCounter'] = None): self.hub_db = hub_db self.search_timeout = search_timeout self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter @@ -41,8 +42,8 @@ class SearchIndex: self.logger = logging.getLogger(__name__) self.claim_cache = LRUCache(2 ** 15) self.search_cache = LRUCache(2 ** 17) - self._elastic_host = elastic_host - self._elastic_port = elastic_port + self._elastic_services = elastic_services + self.lost_connection = asyncio.Event() async def get_index_version(self) -> int: try: @@ -59,7 +60,7 @@ class SearchIndex: async def start(self) -> bool: if self.sync_client: return False - hosts = [{'host': self._elastic_host, 'port': self._elastic_port}] + hosts = [{'host': self._elastic_services[0][0][0], 'port': self._elastic_services[0][0][1]}] self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout) self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1) while True: diff --git a/hub/herald/service.py b/hub/herald/service.py index e20be37..9aa53e8 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -28,9 +28,10 @@ class HubServerService(BlockchainReaderService): self.status_server = StatusServer() self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs self.mempool = HubMemPool(self.env.coin, self.db) + self.search_index = SearchIndex( self.db, self.env.es_index_prefix, self.env.database_query_timeout, - elastic_host=env.elastic_host, elastic_port=env.elastic_port, + elastic_services=self.env.elastic_services, timeout_counter=self.interrupt_count_metric ) @@ -43,7 +44,7 @@ class HubServerService(BlockchainReaderService): self.mempool.session_manager = self.session_manager self.es_notifications = asyncio.Queue() self.es_notification_client = ElasticNotifierClientProtocol( - self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port + self.es_notifications, self.env.elastic_services ) self.synchronized = asyncio.Event() self._es_height = None @@ -129,8 +130,44 @@ class HubServerService(BlockchainReaderService): self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height, self.db.db_height) finally: + self.log.warning("closing es sync notification loop at %s", self._es_height) self.es_notification_client.close() + async def failover_elastic_services(self, synchronized: asyncio.Event): + first_connect = True + if not self.es_notification_client.lost_connection.is_set(): + synchronized.set() + + while True: + try: + await self.es_notification_client.lost_connection.wait() + if not first_connect: + self.log.warning("lost connection to scribe-elastic-sync notifier (%s:%i)", + self.es_notification_client.host, self.es_notification_client.port) + await self.es_notification_client.connect() + first_connect = False + synchronized.set() + self.log.info("connected to es notifier on %s:%i", self.es_notification_client.host, + self.es_notification_client.port) + await self.search_index.start() + except Exception as e: + if not isinstance(e, asyncio.CancelledError): + self.log.warning("lost connection to scribe-elastic-sync notifier") + await self.search_index.stop() + self.search_index.clear_caches() + if len(self.env.elastic_services) > 1: + self.env.elastic_services.rotate(-1) + self.log.warning("attempting to failover to %s:%i", self.es_notification_client.host, + self.es_notification_client.port) + await asyncio.sleep(1) + else: + self.log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", + self.es_notification_client.host, self.es_notification_client.port) + await asyncio.sleep(30) + else: + self.log.info("stopping the notifier loop") + raise e + async def start_status_server(self): if self.env.udp_port and int(self.env.udp_port): await self.status_server.start( @@ -140,14 +177,13 @@ class HubServerService(BlockchainReaderService): def _iter_start_tasks(self): yield self.start_status_server() - yield self.start_cancellable(self.es_notification_client.maintain_connection) + yield self.start_cancellable(self.receive_es_notifications) + yield self.start_cancellable(self.failover_elastic_services) yield self.start_cancellable(self.mempool.send_notifications_forever) yield self.start_cancellable(self.refresh_blocks_forever) yield self.finished_initial_catch_up.wait() self.block_count_metric.set(self.last_state.height) yield self.start_prometheus() - yield self.start_cancellable(self.receive_es_notifications) - yield self.search_index.start() yield self.start_cancellable(self.session_manager.serve, self.mempool) def _iter_stop_tasks(self): diff --git a/hub/notifier_protocol.py b/hub/notifier_protocol.py index adc2f67..13f0ad1 100644 --- a/hub/notifier_protocol.py +++ b/hub/notifier_protocol.py @@ -2,6 +2,7 @@ import typing import struct import asyncio import logging +from typing import Deque, Tuple log = logging.getLogger(__name__) @@ -31,52 +32,39 @@ class ElasticNotifierProtocol(asyncio.Protocol): class ElasticNotifierClientProtocol(asyncio.Protocol): """notifies the reader when ES has written updates""" - def __init__(self, notifications: asyncio.Queue, host: str, port: int): + def __init__(self, notifications: asyncio.Queue, notifier_hosts: Deque[Tuple[Tuple[str, int], Tuple[str, int]]]): + assert len(notifier_hosts) > 0, 'no elastic notifier clients given' self.notifications = notifications self.transport: typing.Optional[asyncio.Transport] = None - self.host = host - self.port = port - self._lost_connection = asyncio.Event() - self._lost_connection.set() + self._notifier_hosts = notifier_hosts + self.lost_connection = asyncio.Event() + self.lost_connection.set() + + @property + def host(self): + return self._notifier_hosts[0][1][0] + + @property + def port(self): + return self._notifier_hosts[0][1][1] async def connect(self): - if self._lost_connection.is_set(): + if self.lost_connection.is_set(): await asyncio.get_event_loop().create_connection( lambda: self, self.host, self.port ) - async def maintain_connection(self, synchronized: asyncio.Event): - first_connect = True - if not self._lost_connection.is_set(): - synchronized.set() - while True: - try: - await self._lost_connection.wait() - if not first_connect: - log.warning("lost connection to scribe-elastic-sync notifier") - await self.connect() - first_connect = False - synchronized.set() - log.info("connected to es notifier") - except Exception as e: - if not isinstance(e, asyncio.CancelledError): - log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port) - await asyncio.sleep(30) - else: - log.info("stopping the notifier loop") - raise e - def close(self): if self.transport and not self.transport.is_closing(): self.transport.close() def connection_made(self, transport): self.transport = transport - self._lost_connection.clear() + self.lost_connection.clear() def connection_lost(self, exc) -> None: self.transport = None - self._lost_connection.set() + self.lost_connection.set() def data_received(self, data: bytes) -> None: try: