failover support for elastic-sync-notifier and elasticsearch

Replaces the herald options `elastic_host`, `elastic_port`, `elastic_notifier_host`, and `elastic_notifier_port` with the single new `elastic_services` option.
This commit is contained in:
Jack Robison 2022-09-18 14:58:46 -04:00
parent 4586b344ce
commit 04d747ff99
4 changed files with 94 additions and 63 deletions

View file

@ -1,29 +1,38 @@
import re import re
from collections import deque
from hub.env import Env from hub.env import Env
# A comma separated list of "<es host>:<es port>/<notifier host>:<notifier port>"
# items. Hosts are non-empty and may not contain ',', ':' or '/'; ports are
# non-empty digit runs. A single trailing comma is tolerated.
ELASTIC_SERVICES_REGEX = re.compile(
    r"[^,:/]+:\d+/[^,:/]+:\d+(?:,[^,:/]+:\d+/[^,:/]+:\d+)*,?"
)


def parse_es_services(elastic_services_arg: str):
    """Parse the `elastic_services` option into (elasticsearch, notifier) address pairs.

    :param elastic_services_arg: comma separated items, each in the form
        ``<elastic host>:<elastic port>/<notifier host>:<notifier port>``
    :return: a list of ``((es_host, es_port), (notifier_host, notifier_port))``
        tuples in the order given, or an empty list if the argument as a whole
        is not well formed
    """
    # Validate the entire string: a prefix-only match would silently drop
    # malformed trailing items, and an empty port would crash in int().
    if not ELASTIC_SERVICES_REGEX.fullmatch(elastic_services_arg):
        return []
    services = []
    for item in elastic_services_arg.split(','):
        if not item:  # produced by the tolerated trailing comma
            continue
        es, notifier = item.split('/')
        es_host, es_port = es.split(':')
        notifier_host, notifier_port = notifier.split(':')
        services.append(((es_host, int(es_port)), (notifier_host, int(notifier_port))))
    return services
class ServerEnv(Env): class ServerEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None, daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None, tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None, session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, database_query_timeout=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None, peer_announce=None, index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None, merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
history_tx_cache_size=None, largest_address_history_cache_size=None): history_tx_cache_size=None, largest_address_history_cache_size=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost') self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
'ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
'ELASTIC_NOTIFIER_PORT', 19080)
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
# Server stuff # Server stuff
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None) self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
@ -93,15 +102,13 @@ class ServerEnv(Env):
help="Regex used for blocking clients") help="Regex used for blocking clients")
parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600), parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
help="Session inactivity timeout") help="Session inactivity timeout")
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, parser.add_argument('--elastic_services',
help="Hostname or ip address of the elasticsearch instance to connect to. " default=cls.default('ELASTIC_SERVICES', 'localhost:9200/localhost:19080'), type=str,
"Can be set in env with 'ELASTIC_HOST'") help="Hosts and ports for elastic search and the scribe elastic sync notifier. "
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, "Given as a comma separated list without spaces of items in the format "
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'") "<elastic host>:<elastic port>/<notifier host>:<notifier port> . "
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), "Defaults to 'localhost:9200/localhost:19080'. "
type=str, help='elasticsearch sync notifier host, defaults to localhost') "Can be set in env with 'ELASTIC_SERVICES'")
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--allow_lan_udp', action='store_true', parser.add_argument('--allow_lan_udp', action='store_true',
help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False)) help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
@ -141,8 +148,8 @@ class ServerEnv(Env):
@classmethod @classmethod
def from_arg_parser(cls, args): def from_arg_parser(cls, args):
return cls( return cls(
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host, db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_services=args.elastic_services,
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file, udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes, allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
@ -151,8 +158,7 @@ class ServerEnv(Env):
max_sessions=args.max_sessions, session_timeout=args.session_timeout, max_sessions=args.max_sessions, session_timeout=args.session_timeout,
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids, database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, filtering_channel_ids=args.filtering_channel_ids, index_address_status=args.index_address_statuses,
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses,
address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path, address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path,
merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size, merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size,
tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size, tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size,

View file

@ -3,7 +3,7 @@ import asyncio
from bisect import bisect_right from bisect import bisect_right
from collections import Counter, deque from collections import Counter, deque
from operator import itemgetter from operator import itemgetter
from typing import Optional, List, TYPE_CHECKING from typing import Optional, List, TYPE_CHECKING, Deque, Tuple
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
from hub.schema.result import Censor, Outputs from hub.schema.result import Censor, Outputs
@ -29,8 +29,9 @@ class StreamResolution(str):
class SearchIndex: class SearchIndex:
VERSION = 1 VERSION = 1
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost', def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0,
elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None): elastic_services: Optional[Deque[Tuple[Tuple[str, int], Tuple[str, int]]]] = None,
timeout_counter: Optional['PrometheusCounter'] = None):
self.hub_db = hub_db self.hub_db = hub_db
self.search_timeout = search_timeout self.search_timeout = search_timeout
self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
@ -41,8 +42,8 @@ class SearchIndex:
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.claim_cache = LRUCache(2 ** 15) self.claim_cache = LRUCache(2 ** 15)
self.search_cache = LRUCache(2 ** 17) self.search_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host self._elastic_services = elastic_services
self._elastic_port = elastic_port self.lost_connection = asyncio.Event()
async def get_index_version(self) -> int: async def get_index_version(self) -> int:
try: try:
@ -59,7 +60,7 @@ class SearchIndex:
async def start(self) -> bool: async def start(self) -> bool:
if self.sync_client: if self.sync_client:
return False return False
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}] hosts = [{'host': self._elastic_services[0][0][0], 'port': self._elastic_services[0][0][1]}]
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout) self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1) self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
while True: while True:

View file

@ -28,9 +28,10 @@ class HubServerService(BlockchainReaderService):
self.status_server = StatusServer() self.status_server = StatusServer()
self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
self.mempool = HubMemPool(self.env.coin, self.db) self.mempool = HubMemPool(self.env.coin, self.db)
self.search_index = SearchIndex( self.search_index = SearchIndex(
self.db, self.env.es_index_prefix, self.env.database_query_timeout, self.db, self.env.es_index_prefix, self.env.database_query_timeout,
elastic_host=env.elastic_host, elastic_port=env.elastic_port, elastic_services=self.env.elastic_services,
timeout_counter=self.interrupt_count_metric timeout_counter=self.interrupt_count_metric
) )
@ -43,7 +44,7 @@ class HubServerService(BlockchainReaderService):
self.mempool.session_manager = self.session_manager self.mempool.session_manager = self.session_manager
self.es_notifications = asyncio.Queue() self.es_notifications = asyncio.Queue()
self.es_notification_client = ElasticNotifierClientProtocol( self.es_notification_client = ElasticNotifierClientProtocol(
self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port self.es_notifications, self.env.elastic_services
) )
self.synchronized = asyncio.Event() self.synchronized = asyncio.Event()
self._es_height = None self._es_height = None
@ -129,8 +130,44 @@ class HubServerService(BlockchainReaderService):
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height, self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
self.db.db_height) self.db.db_height)
finally: finally:
self.log.warning("closing es sync notification loop at %s", self._es_height)
self.es_notification_client.close() self.es_notification_client.close()
async def failover_elastic_services(self, synchronized: asyncio.Event):
    """Keep the ES notifier connected, failing over between the configured services.

    Each time the notifier client reports a lost connection this (re)connects
    it and starts the search index. If that fails, the search index is stopped
    and its caches are cleared; with more than one entry in
    `self.env.elastic_services` the deque is rotated and the next attempt is
    made after 1 second, otherwise the attempt is repeated after 30 seconds.

    :param synchronized: event that is set once the notifier client is connected
    :raises asyncio.CancelledError: re-raised when the task is cancelled
    """
    first_connect = True
    if not self.es_notification_client.lost_connection.is_set():
        synchronized.set()

    while True:
        try:
            await self.es_notification_client.lost_connection.wait()
            if not first_connect:
                self.log.warning("lost connection to scribe-elastic-sync notifier (%s:%i)",
                                 self.es_notification_client.host, self.es_notification_client.port)
            await self.es_notification_client.connect()
            first_connect = False
            synchronized.set()
            self.log.info("connected to es notifier on %s:%i", self.es_notification_client.host,
                          self.es_notification_client.port)
            await self.search_index.start()
        except asyncio.CancelledError:
            # Handled in its own clause: CancelledError is a BaseException on
            # Python >= 3.8, so an `except Exception` clause never sees it there.
            self.log.info("stopping the notifier loop")
            raise
        except Exception:
            self.log.warning("lost connection to scribe-elastic-sync notifier")
            await self.search_index.stop()
            self.search_index.clear_caches()
            # If the notifier connected but starting the search index failed, the
            # connection is still open and `lost_connection` is cleared. Close it
            # so the event gets set again and the next iteration reconnects
            # instead of waiting forever. This is a no-op when nothing is connected.
            self.es_notification_client.close()
            if len(self.env.elastic_services) > 1:
                # rotating changes which entry is first, i.e. the next one tried
                self.env.elastic_services.rotate(-1)
                self.log.warning("attempting to failover to %s:%i", self.es_notification_client.host,
                                 self.es_notification_client.port)
                await asyncio.sleep(1)
            else:
                self.log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)",
                                 self.es_notification_client.host, self.es_notification_client.port)
                await asyncio.sleep(30)
async def start_status_server(self): async def start_status_server(self):
if self.env.udp_port and int(self.env.udp_port): if self.env.udp_port and int(self.env.udp_port):
await self.status_server.start( await self.status_server.start(
@ -140,14 +177,13 @@ class HubServerService(BlockchainReaderService):
def _iter_start_tasks(self): def _iter_start_tasks(self):
yield self.start_status_server() yield self.start_status_server()
yield self.start_cancellable(self.es_notification_client.maintain_connection) yield self.start_cancellable(self.receive_es_notifications)
yield self.start_cancellable(self.failover_elastic_services)
yield self.start_cancellable(self.mempool.send_notifications_forever) yield self.start_cancellable(self.mempool.send_notifications_forever)
yield self.start_cancellable(self.refresh_blocks_forever) yield self.start_cancellable(self.refresh_blocks_forever)
yield self.finished_initial_catch_up.wait() yield self.finished_initial_catch_up.wait()
self.block_count_metric.set(self.last_state.height) self.block_count_metric.set(self.last_state.height)
yield self.start_prometheus() yield self.start_prometheus()
yield self.start_cancellable(self.receive_es_notifications)
yield self.search_index.start()
yield self.start_cancellable(self.session_manager.serve, self.mempool) yield self.start_cancellable(self.session_manager.serve, self.mempool)
def _iter_stop_tasks(self): def _iter_stop_tasks(self):

View file

@ -2,6 +2,7 @@ import typing
import struct import struct
import asyncio import asyncio
import logging import logging
from typing import Deque, Tuple
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -31,52 +32,39 @@ class ElasticNotifierProtocol(asyncio.Protocol):
class ElasticNotifierClientProtocol(asyncio.Protocol): class ElasticNotifierClientProtocol(asyncio.Protocol):
"""notifies the reader when ES has written updates""" """notifies the reader when ES has written updates"""
def __init__(self, notifications: asyncio.Queue, host: str, port: int): def __init__(self, notifications: asyncio.Queue, notifier_hosts: Deque[Tuple[Tuple[str, int], Tuple[str, int]]]):
assert len(notifier_hosts) > 0, 'no elastic notifier clients given'
self.notifications = notifications self.notifications = notifications
self.transport: typing.Optional[asyncio.Transport] = None self.transport: typing.Optional[asyncio.Transport] = None
self.host = host self._notifier_hosts = notifier_hosts
self.port = port self.lost_connection = asyncio.Event()
self._lost_connection = asyncio.Event() self.lost_connection.set()
self._lost_connection.set()
@property
def host(self):
    """Notifier host of the first entry in the notifier hosts deque."""
    current_service = self._notifier_hosts[0]
    # each entry is an (elasticsearch address, notifier address) pair
    notifier_address = current_service[1]
    return notifier_address[0]
@property
def port(self):
    """Notifier port of the first entry in the notifier hosts deque."""
    current_service = self._notifier_hosts[0]
    # each entry is an (elasticsearch address, notifier address) pair
    notifier_address = current_service[1]
    return notifier_address[1]
async def connect(self): async def connect(self):
if self._lost_connection.is_set(): if self.lost_connection.is_set():
await asyncio.get_event_loop().create_connection( await asyncio.get_event_loop().create_connection(
lambda: self, self.host, self.port lambda: self, self.host, self.port
) )
async def maintain_connection(self, synchronized: asyncio.Event):
    """Reconnect to the ES sync notifier whenever the connection is lost.

    Loops forever: waits for the lost-connection event, connects, and sets
    `synchronized`. A failed attempt is retried after 30 seconds.

    :param synchronized: event that is set once the client is connected
    :raises asyncio.CancelledError: re-raised when the task is cancelled
    """
    first_connect = True
    if not self._lost_connection.is_set():
        synchronized.set()

    while True:
        try:
            await self._lost_connection.wait()
            if not first_connect:
                log.warning("lost connection to scribe-elastic-sync notifier")
            await self.connect()
            first_connect = False
            synchronized.set()
            log.info("connected to es notifier")
        except asyncio.CancelledError:
            # Handled in its own clause: CancelledError is a BaseException on
            # Python >= 3.8, so an `except Exception` clause never sees it there.
            log.info("stopping the notifier loop")
            raise
        except Exception:
            log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
            await asyncio.sleep(30)
def close(self): def close(self):
if self.transport and not self.transport.is_closing(): if self.transport and not self.transport.is_closing():
self.transport.close() self.transport.close()
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
self._lost_connection.clear() self.lost_connection.clear()
def connection_lost(self, exc) -> None: def connection_lost(self, exc) -> None:
self.transport = None self.transport = None
self._lost_connection.set() self.lost_connection.set()
def data_received(self, data: bytes) -> None: def data_received(self, data: bytes) -> None:
try: try: