Elasticsearch and elastic sync notifier failover (WIP) #95

Closed
jackrobison wants to merge 3 commits from es-failover into master
6 changed files with 115 additions and 74 deletions

View file

@@ -73,7 +73,10 @@ class ElasticSyncService(BlockchainReaderService):
info = {}
if os.path.exists(self._es_info_path):
with open(self._es_info_path, 'r') as f:
info.update(json.loads(f.read()))
try:
info.update(json.loads(f.read()))
except json.decoder.JSONDecodeError:
self.log.warning('failed to parse es sync status file')
self._last_wrote_height = int(info.get('height', 0))
self._last_wrote_block_hash = info.get('block_hash', None)
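
The new guard covers a status file left empty or truncated by an interrupted write; a minimal standalone sketch of that failure mode (the file contents below are made up, not from the PR):

```python
import json

# An empty file and a truncated write both raise JSONDecodeError;
# only the complete document parses, otherwise height falls back to 0.
for raw in ('', '{"height": 105', '{"height": 1050000, "block_hash": "deadbeef"}'):
    info = {}
    try:
        info.update(json.loads(raw))
    except json.decoder.JSONDecodeError:
        print('failed to parse es sync status file')
    print(info.get('height', 0))
```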

View file

@@ -1,29 +1,38 @@
import re
from collections import deque
from hub.env import Env
ELASTIC_SERVICES_REGEX = re.compile("(([\d|\.]|[^,:\/])*:\d*\/([\d|\.]|[^,:\/])*:\d*,?)*")
github-advanced-security[bot] commented 2022-09-19 16:22:32 +02:00 (Migrated from github.com)
Inefficient regular expression

This part of the regular expression may cause exponential backtracking on strings containing many repetitions of '.'.

[Show more details](https://github.com/lbryio/hub/security/code-scanning/2)
github-advanced-security[bot] commented 2022-09-19 16:22:32 +02:00 (Migrated from github.com)
Inefficient regular expression

This part of the regular expression may cause exponential backtracking on strings starting with ':/' and containing many repetitions of '.'.

[Show more details](https://github.com/lbryio/hub/security/code-scanning/3)
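
A backtracking-free way to parse the same format, offered as an illustrative sketch rather than a change to this PR: split on the unambiguous delimiters (`,`, `/`, and the last `:`) instead of matching one nested-quantifier regex. Note this variant raises `ValueError` on malformed input where the regex version returns an empty list.

```python
# Sketch only: parse "<es host>:<es port>/<notifier host>:<notifier port>,..."
# without a regex, avoiding the exponential backtracking flagged above.
def parse_es_services_no_regex(arg: str):
    services = []
    for item in filter(None, arg.split(',')):
        es, _, notifier = item.partition('/')
        es_host, _, es_port = es.rpartition(':')
        notifier_host, _, notifier_port = notifier.rpartition(':')
        services.append(((es_host, int(es_port)),
                         (notifier_host, int(notifier_port))))
    return services
```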
def parse_es_services(elastic_services_arg: str):
match = ELASTIC_SERVICES_REGEX.match(elastic_services_arg)
if not match:
return []
matching = match.group()
services = [item.split('/') for item in matching.split(',') if item]
return [
((es.split(':')[0], int(es.split(':')[1])), (notifier.split(':')[0], int(notifier.split(':')[1])))
for (es, notifier) in services
]
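
For reference, a quick usage sketch of the parser with made-up hostnames, showing the nested-tuple shape the rest of the PR consumes:

```python
parse_es_services('es1:9200/es1:19080,es2:9200/es2:19080')
# -> [(('es1', 9200), ('es1', 19080)), (('es2', 9200), ('es2', 19080))]
```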
class ServerEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None,
daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None,
index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
database_query_timeout=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
peer_announce=None, index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
history_tx_cache_size=None, largest_address_history_cache_size=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
'ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
'ELASTIC_NOTIFIER_PORT', 19080)
self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
# Server stuff
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
@@ -93,15 +102,13 @@ class ServerEnv(Env):
help="Regex used for blocking clients")
parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
help="Session inactivity timeout")
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help="Hostname or ip address of the elasticsearch instance to connect to. "
"Can be set in env with 'ELASTIC_HOST'")
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--elastic_services',
default=cls.default('ELASTIC_SERVICES', 'localhost:9200/localhost:19080'), type=str,
help="Hosts and ports for elastic search and the scribe elastic sync notifier. "
"Given as a comma separated list without spaces of items in the format "
"<elastic host>:<elastic port>/<notifier host>:<notifier port> . "
"Defaults to 'localhost:9200/localhost:19080'. "
"Can be set in env with 'ELASTIC_SERVICES'")
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--allow_lan_udp', action='store_true',
help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
@@ -141,8 +148,8 @@ class ServerEnv(Env):
@classmethod
def from_arg_parser(cls, args):
return cls(
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host,
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_services=args.elastic_services,
max_query_workers=args.max_query_workers, chain=args.chain,
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
@@ -151,8 +158,7 @@ class ServerEnv(Env):
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses,
filtering_channel_ids=args.filtering_channel_ids, index_address_status=args.index_address_statuses,
address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path,
merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size,
tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size,

View file

@@ -3,7 +3,7 @@ import asyncio
from bisect import bisect_right
from collections import Counter, deque
from operator import itemgetter
from typing import Optional, List, TYPE_CHECKING
from typing import Optional, List, TYPE_CHECKING, Deque, Tuple
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
from hub.schema.result import Censor, Outputs
@@ -29,8 +29,9 @@ class StreamResolution(str):
class SearchIndex:
VERSION = 1
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None):
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0,
elastic_services: Optional[Deque[Tuple[Tuple[str, int], Tuple[str, int]]]] = None,
timeout_counter: Optional['PrometheusCounter'] = None):
self.hub_db = hub_db
self.search_timeout = search_timeout
self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
@@ -41,8 +42,8 @@ class SearchIndex:
self.logger = logging.getLogger(__name__)
self.claim_cache = LRUCache(2 ** 15)
self.search_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host
self._elastic_port = elastic_port
self._elastic_services = elastic_services
self.lost_connection = asyncio.Event()
async def get_index_version(self) -> int:
try:
@@ -59,7 +60,7 @@
async def start(self) -> bool:
if self.sync_client:
return False
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
hosts = [{'host': self._elastic_services[0][0][0], 'port': self._elastic_services[0][0][1]}]
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
while True:
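
For clarity, `self._elastic_services[0]` is the head of the deque, i.e. the currently active service; the nested indexing unpacks as in this standalone sketch (hostnames are placeholders):

```python
from collections import deque

# Each entry: ((es_host, es_port), (notifier_host, notifier_port)).
elastic_services = deque([(('localhost', 9200), ('localhost', 19080))])
(es_host, es_port), (notifier_host, notifier_port) = elastic_services[0]
hosts = [{'host': es_host, 'port': es_port}]  # what start() hands to AsyncElasticsearch
```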

View file

@@ -1,18 +1,25 @@
import time
import typing
import asyncio
from prometheus_client import Counter
from hub import PROMETHEUS_NAMESPACE
from hub.scribe.daemon import LBCDaemon
from hub.herald.session import SessionManager
from hub.herald.mempool import HubMemPool
from hub.herald.udp import StatusServer
from hub.herald.db import HeraldDB
from hub.herald.search import SearchIndex
from hub.service import BlockchainReaderService
from hub.notifier_protocol import ElasticNotifierClientProtocol
if typing.TYPE_CHECKING:
from hub.herald.env import ServerEnv
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub"
class HubServerService(BlockchainReaderService):
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
def __init__(self, env: 'ServerEnv'):
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
self.env = env
@@ -21,8 +28,15 @@ class HubServerService(BlockchainReaderService):
self.status_server = StatusServer()
self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
self.mempool = HubMemPool(self.env.coin, self.db)
self.search_index = SearchIndex(
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
elastic_services=self.env.elastic_services,
timeout_counter=self.interrupt_count_metric
)
self.session_manager = SessionManager(
env, self.db, self.mempool, self.daemon,
env, self.db, self.mempool, self.daemon, self.search_index,
self.shutdown_event,
on_available_callback=self.status_server.set_available,
on_unavailable_callback=self.status_server.set_unavailable
@@ -30,7 +44,7 @@ class HubServerService(BlockchainReaderService):
self.mempool.session_manager = self.session_manager
self.es_notifications = asyncio.Queue()
self.es_notification_client = ElasticNotifierClientProtocol(
self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port
self.es_notifications, self.env.elastic_services
)
self.synchronized = asyncio.Event()
self._es_height = None
@@ -52,7 +66,7 @@ class HubServerService(BlockchainReaderService):
# self.mempool.notified_mempool_txs.clear()
def clear_search_cache(self):
self.session_manager.search_index.clear_caches()
self.search_index.clear_caches()
def advance(self, height: int):
super().advance(height)
@@ -116,8 +130,44 @@ class HubServerService(BlockchainReaderService):
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
self.db.db_height)
finally:
self.log.warning("closing es sync notification loop at %s", self._es_height)
self.es_notification_client.close()
async def failover_elastic_services(self, synchronized: asyncio.Event):
first_connect = True
if not self.es_notification_client.lost_connection.is_set():
synchronized.set()
while True:
try:
await self.es_notification_client.lost_connection.wait()
if not first_connect:
self.log.warning("lost connection to scribe-elastic-sync notifier (%s:%i)",
self.es_notification_client.host, self.es_notification_client.port)
await self.es_notification_client.connect()
first_connect = False
synchronized.set()
self.log.info("connected to es notifier on %s:%i", self.es_notification_client.host,
self.es_notification_client.port)
await self.search_index.start()
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.warning("lost connection to scribe-elastic-sync notifier")
await self.search_index.stop()
self.search_index.clear_caches()
if len(self.env.elastic_services) > 1:
self.env.elastic_services.rotate(-1)
self.log.warning("attempting to failover to %s:%i", self.es_notification_client.host,
self.es_notification_client.port)
await asyncio.sleep(1)
else:
self.log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)",
self.es_notification_client.host, self.es_notification_client.port)
await asyncio.sleep(30)
else:
self.log.info("stopping the notifier loop")
raise e
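
The failover hinges on `deque.rotate(-1)`: on an error the head of `env.elastic_services` advances to the next service, and because the notifier client's `host`/`port` properties read the deque head, the next `connect()` dials the new target. A standalone sketch of the rotation semantics:

```python
from collections import deque

services = deque(['es1', 'es2', 'es3'])
services.rotate(-1)            # head advances: deque(['es2', 'es3', 'es1'])
assert services[0] == 'es2'    # the next connect() attempt targets es2
services.rotate(-1); services.rotate(-1)
assert services[0] == 'es1'    # wraps back around to the first service
```

Since the same deque object is handed to both `SearchIndex` and `ElasticNotifierClientProtocol`, a single rotation retargets the ES clients and the notifier connection together.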
async def start_status_server(self):
if self.env.udp_port and int(self.env.udp_port):
await self.status_server.start(
@@ -127,14 +177,13 @@ class HubServerService(BlockchainReaderService):
def _iter_start_tasks(self):
yield self.start_status_server()
yield self.start_cancellable(self.es_notification_client.maintain_connection)
yield self.start_cancellable(self.receive_es_notifications)
yield self.start_cancellable(self.failover_elastic_services)
yield self.start_cancellable(self.mempool.send_notifications_forever)
yield self.start_cancellable(self.refresh_blocks_forever)
yield self.finished_initial_catch_up.wait()
self.block_count_metric.set(self.last_state.height)
yield self.start_prometheus()
yield self.start_cancellable(self.receive_es_notifications)
yield self.session_manager.search_index.start()
yield self.start_cancellable(self.session_manager.serve, self.mempool)
def _iter_stop_tasks(self):

View file

@@ -141,7 +141,6 @@ class SessionManager:
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
db_operational_error_metric = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
)
@@ -180,7 +179,7 @@
)
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
daemon: 'LBCDaemon', search_index: 'SearchIndex', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send)
self.env = env
@@ -189,6 +188,7 @@
self.on_unavailable_callback = on_unavailable_callback
self.daemon = daemon
self.mempool = mempool
self.search_index = search_index
self.shutdown_event = shutdown_event
self.logger = logging.getLogger(__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@@ -207,12 +207,6 @@
self.protocol_class = LBRYElectrumX
self.session_event = Event()
# Search index
self.search_index = SearchIndex(
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
timeout_counter=self.interrupt_count_metric
)
self.running = False
# hashX: List[int]
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
@@ -1270,7 +1264,7 @@ class LBRYElectrumX(asyncio.Protocol):
kwargs['channel_id'] = channel_claim.claim_hash.hex()
return await self.session_manager.search_index.cached_search(kwargs)
except ConnectionTimeout:
self.session_manager.interrupt_count_metric.inc()
self.session_manager.search_index.timeout_counter.inc()
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
except TooManyClaimSearchParametersError as err:
await asyncio.sleep(2)

View file

@@ -2,6 +2,7 @@ import typing
import struct
import asyncio
import logging
from typing import Deque, Tuple
log = logging.getLogger(__name__)
@@ -31,52 +32,39 @@ class ElasticNotifierProtocol(asyncio.Protocol):
class ElasticNotifierClientProtocol(asyncio.Protocol):
"""notifies the reader when ES has written updates"""
def __init__(self, notifications: asyncio.Queue, host: str, port: int):
def __init__(self, notifications: asyncio.Queue, notifier_hosts: Deque[Tuple[Tuple[str, int], Tuple[str, int]]]):
assert len(notifier_hosts) > 0, 'no elastic notifier clients given'
self.notifications = notifications
self.transport: typing.Optional[asyncio.Transport] = None
self.host = host
self.port = port
self._lost_connection = asyncio.Event()
self._lost_connection.set()
self._notifier_hosts = notifier_hosts
self.lost_connection = asyncio.Event()
self.lost_connection.set()
@property
def host(self):
return self._notifier_hosts[0][1][0]
@property
def port(self):
return self._notifier_hosts[0][1][1]
async def connect(self):
if self._lost_connection.is_set():
if self.lost_connection.is_set():
await asyncio.get_event_loop().create_connection(
lambda: self, self.host, self.port
)
async def maintain_connection(self, synchronized: asyncio.Event):
first_connect = True
if not self._lost_connection.is_set():
synchronized.set()
while True:
try:
await self._lost_connection.wait()
if not first_connect:
log.warning("lost connection to scribe-elastic-sync notifier")
await self.connect()
first_connect = False
synchronized.set()
log.info("connected to es notifier")
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
await asyncio.sleep(30)
else:
log.info("stopping the notifier loop")
raise e
def close(self):
if self.transport and not self.transport.is_closing():
self.transport.close()
def connection_made(self, transport):
self.transport = transport
self._lost_connection.clear()
self.lost_connection.clear()
def connection_lost(self, exc) -> None:
self.transport = None
self._lost_connection.set()
self.lost_connection.set()
def data_received(self, data: bytes) -> None:
try: