Elasticsearch and elastic sync notifier failover (WIP) #95

Closed
jackrobison wants to merge 3 commits from es-failover into master
6 changed files with 115 additions and 74 deletions

View file

@ -73,7 +73,10 @@ class ElasticSyncService(BlockchainReaderService):
info = {} info = {}
if os.path.exists(self._es_info_path): if os.path.exists(self._es_info_path):
with open(self._es_info_path, 'r') as f: with open(self._es_info_path, 'r') as f:
info.update(json.loads(f.read())) try:
info.update(json.loads(f.read()))
except json.decoder.JSONDecodeError:
self.log.warning('failed to parse es sync status file')
self._last_wrote_height = int(info.get('height', 0)) self._last_wrote_height = int(info.get('height', 0))
self._last_wrote_block_hash = info.get('block_hash', None) self._last_wrote_block_hash = info.get('block_hash', None)

View file

@ -1,29 +1,38 @@
import re import re
from collections import deque
from hub.env import Env from hub.env import Env
ELASTIC_SERVICES_REGEX = re.compile("(([\d|\.]|[^,:\/])*:\d*\/([\d|\.]|[^,:\/])*:\d*,?)*")
github-advanced-security[bot] commented 2022-09-19 16:22:32 +02:00 (Migrated from github.com)
Review

Inefficient regular expression

This part of the regular expression may cause exponential backtracking on strings containing many repetitions of '.'.

Show more details

## Inefficient regular expression This part of the regular expression may cause exponential backtracking on strings containing many repetitions of '.'. [Show more details](https://github.com/lbryio/hub/security/code-scanning/2)
github-advanced-security[bot] commented 2022-09-19 16:22:32 +02:00 (Migrated from github.com)
Review

Inefficient regular expression

This part of the regular expression may cause exponential backtracking on strings starting with ':/' and containing many repetitions of '.'.

Show more details

## Inefficient regular expression This part of the regular expression may cause exponential backtracking on strings starting with ':/' and containing many repetitions of '.'. [Show more details](https://github.com/lbryio/hub/security/code-scanning/3)
def parse_es_services(elastic_services_arg: str):
match = ELASTIC_SERVICES_REGEX.match(elastic_services_arg)
if not match:
return []
matching = match.group()
services = [item.split('/') for item in matching.split(',') if item]
return [
((es.split(':')[0], int(es.split(':')[1])), (notifier.split(':')[0], int(notifier.split(':')[1])))
for (es, notifier) in services
]
class ServerEnv(Env): class ServerEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None, def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None, daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None, tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None, session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, database_query_timeout=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None, peer_announce=None, index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None, merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
history_tx_cache_size=None, largest_address_history_cache_size=None): history_tx_cache_size=None, largest_address_history_cache_size=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost') self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
'ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
'ELASTIC_NOTIFIER_PORT', 19080)
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
# Server stuff # Server stuff
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None) self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
@ -93,15 +102,13 @@ class ServerEnv(Env):
help="Regex used for blocking clients") help="Regex used for blocking clients")
parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600), parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
help="Session inactivity timeout") help="Session inactivity timeout")
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, parser.add_argument('--elastic_services',
help="Hostname or ip address of the elasticsearch instance to connect to. " default=cls.default('ELASTIC_SERVICES', 'localhost:9200/localhost:19080'), type=str,
"Can be set in env with 'ELASTIC_HOST'") help="Hosts and ports for elastic search and the scribe elastic sync notifier. "
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, "Given as a comma separated list without spaces of items in the format "
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'") "<elastic host>:<elastic port>/<notifier host>:<notifier port> . "
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'), "Defaults to 'localhost:9200/localhost:19080'. "
type=str, help='elasticsearch sync notifier host, defaults to localhost') "Can be set in env with 'ELASTIC_SERVICES'")
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--allow_lan_udp', action='store_true', parser.add_argument('--allow_lan_udp', action='store_true',
help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False)) help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
@ -141,8 +148,8 @@ class ServerEnv(Env):
@classmethod @classmethod
def from_arg_parser(cls, args): def from_arg_parser(cls, args):
return cls( return cls(
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host, db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_services=args.elastic_services,
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain, max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file, udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes, allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
@ -151,8 +158,7 @@ class ServerEnv(Env):
max_sessions=args.max_sessions, session_timeout=args.session_timeout, max_sessions=args.max_sessions, session_timeout=args.session_timeout,
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee, drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids, database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, filtering_channel_ids=args.filtering_channel_ids, index_address_status=args.index_address_statuses,
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses,
address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path, address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path,
merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size, merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size,
tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size, tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size,

View file

@ -3,7 +3,7 @@ import asyncio
from bisect import bisect_right from bisect import bisect_right
from collections import Counter, deque from collections import Counter, deque
from operator import itemgetter from operator import itemgetter
from typing import Optional, List, TYPE_CHECKING from typing import Optional, List, TYPE_CHECKING, Deque, Tuple
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
from hub.schema.result import Censor, Outputs from hub.schema.result import Censor, Outputs
@ -29,8 +29,9 @@ class StreamResolution(str):
class SearchIndex: class SearchIndex:
VERSION = 1 VERSION = 1
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost', def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0,
elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None): elastic_services: Optional[Deque[Tuple[Tuple[str, int], Tuple[str, int]]]] = None,
timeout_counter: Optional['PrometheusCounter'] = None):
self.hub_db = hub_db self.hub_db = hub_db
self.search_timeout = search_timeout self.search_timeout = search_timeout
self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
@ -41,8 +42,8 @@ class SearchIndex:
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.claim_cache = LRUCache(2 ** 15) self.claim_cache = LRUCache(2 ** 15)
self.search_cache = LRUCache(2 ** 17) self.search_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host self._elastic_services = elastic_services
self._elastic_port = elastic_port self.lost_connection = asyncio.Event()
async def get_index_version(self) -> int: async def get_index_version(self) -> int:
try: try:
@ -59,7 +60,7 @@ class SearchIndex:
async def start(self) -> bool: async def start(self) -> bool:
if self.sync_client: if self.sync_client:
return False return False
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}] hosts = [{'host': self._elastic_services[0][0][0], 'port': self._elastic_services[0][0][1]}]
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout) self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1) self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
while True: while True:

View file

@ -1,18 +1,25 @@
import time import time
import typing import typing
import asyncio import asyncio
from prometheus_client import Counter
from hub import PROMETHEUS_NAMESPACE
from hub.scribe.daemon import LBCDaemon from hub.scribe.daemon import LBCDaemon
from hub.herald.session import SessionManager from hub.herald.session import SessionManager
from hub.herald.mempool import HubMemPool from hub.herald.mempool import HubMemPool
from hub.herald.udp import StatusServer from hub.herald.udp import StatusServer
from hub.herald.db import HeraldDB from hub.herald.db import HeraldDB
from hub.herald.search import SearchIndex
from hub.service import BlockchainReaderService from hub.service import BlockchainReaderService
from hub.notifier_protocol import ElasticNotifierClientProtocol from hub.notifier_protocol import ElasticNotifierClientProtocol
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from hub.herald.env import ServerEnv from hub.herald.env import ServerEnv
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub"
class HubServerService(BlockchainReaderService): class HubServerService(BlockchainReaderService):
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
def __init__(self, env: 'ServerEnv'): def __init__(self, env: 'ServerEnv'):
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
self.env = env self.env = env
@ -21,8 +28,15 @@ class HubServerService(BlockchainReaderService):
self.status_server = StatusServer() self.status_server = StatusServer()
self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
self.mempool = HubMemPool(self.env.coin, self.db) self.mempool = HubMemPool(self.env.coin, self.db)
self.search_index = SearchIndex(
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
elastic_services=self.env.elastic_services,
timeout_counter=self.interrupt_count_metric
)
self.session_manager = SessionManager( self.session_manager = SessionManager(
env, self.db, self.mempool, self.daemon, env, self.db, self.mempool, self.daemon, self.search_index,
self.shutdown_event, self.shutdown_event,
on_available_callback=self.status_server.set_available, on_available_callback=self.status_server.set_available,
on_unavailable_callback=self.status_server.set_unavailable on_unavailable_callback=self.status_server.set_unavailable
@ -30,7 +44,7 @@ class HubServerService(BlockchainReaderService):
self.mempool.session_manager = self.session_manager self.mempool.session_manager = self.session_manager
self.es_notifications = asyncio.Queue() self.es_notifications = asyncio.Queue()
self.es_notification_client = ElasticNotifierClientProtocol( self.es_notification_client = ElasticNotifierClientProtocol(
self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port self.es_notifications, self.env.elastic_services
) )
self.synchronized = asyncio.Event() self.synchronized = asyncio.Event()
self._es_height = None self._es_height = None
@ -52,7 +66,7 @@ class HubServerService(BlockchainReaderService):
# self.mempool.notified_mempool_txs.clear() # self.mempool.notified_mempool_txs.clear()
def clear_search_cache(self): def clear_search_cache(self):
self.session_manager.search_index.clear_caches() self.search_index.clear_caches()
def advance(self, height: int): def advance(self, height: int):
super().advance(height) super().advance(height)
@ -116,8 +130,44 @@ class HubServerService(BlockchainReaderService):
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height, self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
self.db.db_height) self.db.db_height)
finally: finally:
self.log.warning("closing es sync notification loop at %s", self._es_height)
self.es_notification_client.close() self.es_notification_client.close()
async def failover_elastic_services(self, synchronized: asyncio.Event):
first_connect = True
if not self.es_notification_client.lost_connection.is_set():
synchronized.set()
while True:
try:
await self.es_notification_client.lost_connection.wait()
if not first_connect:
self.log.warning("lost connection to scribe-elastic-sync notifier (%s:%i)",
self.es_notification_client.host, self.es_notification_client.port)
await self.es_notification_client.connect()
first_connect = False
synchronized.set()
self.log.info("connected to es notifier on %s:%i", self.es_notification_client.host,
self.es_notification_client.port)
await self.search_index.start()
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.warning("lost connection to scribe-elastic-sync notifier")
await self.search_index.stop()
self.search_index.clear_caches()
if len(self.env.elastic_services) > 1:
self.env.elastic_services.rotate(-1)
self.log.warning("attempting to failover to %s:%i", self.es_notification_client.host,
self.es_notification_client.port)
await asyncio.sleep(1)
else:
self.log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)",
self.es_notification_client.host, self.es_notification_client.port)
await asyncio.sleep(30)
else:
self.log.info("stopping the notifier loop")
raise e
async def start_status_server(self): async def start_status_server(self):
if self.env.udp_port and int(self.env.udp_port): if self.env.udp_port and int(self.env.udp_port):
await self.status_server.start( await self.status_server.start(
@ -127,14 +177,13 @@ class HubServerService(BlockchainReaderService):
def _iter_start_tasks(self): def _iter_start_tasks(self):
yield self.start_status_server() yield self.start_status_server()
yield self.start_cancellable(self.es_notification_client.maintain_connection) yield self.start_cancellable(self.receive_es_notifications)
yield self.start_cancellable(self.failover_elastic_services)
yield self.start_cancellable(self.mempool.send_notifications_forever) yield self.start_cancellable(self.mempool.send_notifications_forever)
yield self.start_cancellable(self.refresh_blocks_forever) yield self.start_cancellable(self.refresh_blocks_forever)
yield self.finished_initial_catch_up.wait() yield self.finished_initial_catch_up.wait()
self.block_count_metric.set(self.last_state.height) self.block_count_metric.set(self.last_state.height)
yield self.start_prometheus() yield self.start_prometheus()
yield self.start_cancellable(self.receive_es_notifications)
yield self.session_manager.search_index.start()
yield self.start_cancellable(self.session_manager.serve, self.mempool) yield self.start_cancellable(self.session_manager.serve, self.mempool)
def _iter_stop_tasks(self): def _iter_stop_tasks(self):

View file

@ -141,7 +141,6 @@ class SessionManager:
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE) tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE) urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE) resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
db_operational_error_metric = Counter( db_operational_error_metric = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
) )
@ -180,7 +179,7 @@ class SessionManager:
) )
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool', def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
daemon: 'LBCDaemon', shutdown_event: asyncio.Event, daemon: 'LBCDaemon', search_index: 'SearchIndex', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.env = env self.env = env
@ -189,6 +188,7 @@ class SessionManager:
self.on_unavailable_callback = on_unavailable_callback self.on_unavailable_callback = on_unavailable_callback
self.daemon = daemon self.daemon = daemon
self.mempool = mempool self.mempool = mempool
self.search_index = search_index
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {} self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@ -207,12 +207,6 @@ class SessionManager:
self.protocol_class = LBRYElectrumX self.protocol_class = LBRYElectrumX
self.session_event = Event() self.session_event = Event()
# Search index
self.search_index = SearchIndex(
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
timeout_counter=self.interrupt_count_metric
)
self.running = False self.running = False
# hashX: List[int] # hashX: List[int]
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE) self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
@ -1270,7 +1264,7 @@ class LBRYElectrumX(asyncio.Protocol):
kwargs['channel_id'] = channel_claim.claim_hash.hex() kwargs['channel_id'] = channel_claim.claim_hash.hex()
return await self.session_manager.search_index.cached_search(kwargs) return await self.session_manager.search_index.cached_search(kwargs)
except ConnectionTimeout: except ConnectionTimeout:
self.session_manager.interrupt_count_metric.inc() self.session_manager.search_index.timeout_counter.inc()
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out') raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
except TooManyClaimSearchParametersError as err: except TooManyClaimSearchParametersError as err:
await asyncio.sleep(2) await asyncio.sleep(2)

View file

@ -2,6 +2,7 @@ import typing
import struct import struct
import asyncio import asyncio
import logging import logging
from typing import Deque, Tuple
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -31,52 +32,39 @@ class ElasticNotifierProtocol(asyncio.Protocol):
class ElasticNotifierClientProtocol(asyncio.Protocol): class ElasticNotifierClientProtocol(asyncio.Protocol):
"""notifies the reader when ES has written updates""" """notifies the reader when ES has written updates"""
def __init__(self, notifications: asyncio.Queue, host: str, port: int): def __init__(self, notifications: asyncio.Queue, notifier_hosts: Deque[Tuple[Tuple[str, int], Tuple[str, int]]]):
assert len(notifier_hosts) > 0, 'no elastic notifier clients given'
self.notifications = notifications self.notifications = notifications
self.transport: typing.Optional[asyncio.Transport] = None self.transport: typing.Optional[asyncio.Transport] = None
self.host = host self._notifier_hosts = notifier_hosts
self.port = port self.lost_connection = asyncio.Event()
self._lost_connection = asyncio.Event() self.lost_connection.set()
self._lost_connection.set()
@property
def host(self):
return self._notifier_hosts[0][1][0]
@property
def port(self):
return self._notifier_hosts[0][1][1]
async def connect(self): async def connect(self):
if self._lost_connection.is_set(): if self.lost_connection.is_set():
await asyncio.get_event_loop().create_connection( await asyncio.get_event_loop().create_connection(
lambda: self, self.host, self.port lambda: self, self.host, self.port
) )
async def maintain_connection(self, synchronized: asyncio.Event):
first_connect = True
if not self._lost_connection.is_set():
synchronized.set()
while True:
try:
await self._lost_connection.wait()
if not first_connect:
log.warning("lost connection to scribe-elastic-sync notifier")
await self.connect()
first_connect = False
synchronized.set()
log.info("connected to es notifier")
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
await asyncio.sleep(30)
else:
log.info("stopping the notifier loop")
raise e
def close(self): def close(self):
if self.transport and not self.transport.is_closing(): if self.transport and not self.transport.is_closing():
self.transport.close() self.transport.close()
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
self._lost_connection.clear() self.lost_connection.clear()
def connection_lost(self, exc) -> None: def connection_lost(self, exc) -> None:
self.transport = None self.transport = None
self._lost_connection.set() self.lost_connection.set()
def data_received(self, data: bytes) -> None: def data_received(self, data: bytes) -> None:
try: try: