Elasticsearch and elastic sync notifier failover (WIP) #95
6 changed files with 115 additions and 74 deletions
|
@ -73,7 +73,10 @@ class ElasticSyncService(BlockchainReaderService):
|
|||
info = {}
|
||||
if os.path.exists(self._es_info_path):
|
||||
with open(self._es_info_path, 'r') as f:
|
||||
info.update(json.loads(f.read()))
|
||||
try:
|
||||
info.update(json.loads(f.read()))
|
||||
except json.decoder.JSONDecodeError:
|
||||
self.log.warning('failed to parse es sync status file')
|
||||
self._last_wrote_height = int(info.get('height', 0))
|
||||
self._last_wrote_block_hash = info.get('block_hash', None)
|
||||
|
||||
|
|
|
@ -1,29 +1,38 @@
|
|||
import re
|
||||
from collections import deque
|
||||
from hub.env import Env
|
||||
|
||||
ELASTIC_SERVICES_REGEX = re.compile("(([\d|\.]|[^,:\/])*:\d*\/([\d|\.]|[^,:\/])*:\d*,?)*")
|
||||
|
||||
|
||||
|
||||
def parse_es_services(elastic_services_arg: str):
|
||||
match = ELASTIC_SERVICES_REGEX.match(elastic_services_arg)
|
||||
if not match:
|
||||
return []
|
||||
matching = match.group()
|
||||
services = [item.split('/') for item in matching.split(',') if item]
|
||||
return [
|
||||
((es.split(':')[0], int(es.split(':')[1])), (notifier.split(':')[0], int(notifier.split(':')[1])))
|
||||
for (es, notifier) in services
|
||||
]
|
||||
|
||||
|
||||
class ServerEnv(Env):
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||
daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None,
|
||||
daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
|
||||
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
|
||||
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
||||
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
||||
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None,
|
||||
index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
|
||||
database_query_timeout=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
|
||||
peer_announce=None, index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
|
||||
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
|
||||
history_tx_cache_size=None, largest_address_history_cache_size=None):
|
||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
||||
self.host = host if host is not None else self.default('HOST', 'localhost')
|
||||
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
|
||||
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
|
||||
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
|
||||
'ELASTIC_NOTIFIER_HOST', 'localhost')
|
||||
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
|
||||
'ELASTIC_NOTIFIER_PORT', 19080)
|
||||
self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
|
||||
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
|
||||
# Server stuff
|
||||
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
|
||||
|
@ -93,15 +102,13 @@ class ServerEnv(Env):
|
|||
help="Regex used for blocking clients")
|
||||
parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
|
||||
help="Session inactivity timeout")
|
||||
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
|
||||
help="Hostname or ip address of the elasticsearch instance to connect to. "
|
||||
"Can be set in env with 'ELASTIC_HOST'")
|
||||
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
|
||||
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
|
||||
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
|
||||
type=str, help='elasticsearch sync notifier host, defaults to localhost')
|
||||
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
|
||||
help='elasticsearch sync notifier port')
|
||||
parser.add_argument('--elastic_services',
|
||||
default=cls.default('ELASTIC_SERVICES', 'localhost:9200/localhost:19080'), type=str,
|
||||
help="Hosts and ports for elastic search and the scribe elastic sync notifier. "
|
||||
"Given as a comma separated list without spaces of items in the format "
|
||||
"<elastic host>:<elastic port>/<notifier host>:<notifier port> . "
|
||||
"Defaults to 'localhost:9200/localhost:19080'. "
|
||||
"Can be set in env with 'ELASTIC_SERVICES'")
|
||||
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
|
||||
parser.add_argument('--allow_lan_udp', action='store_true',
|
||||
help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
|
||||
|
@ -141,8 +148,8 @@ class ServerEnv(Env):
|
|||
@classmethod
|
||||
def from_arg_parser(cls, args):
|
||||
return cls(
|
||||
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host,
|
||||
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
|
||||
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_services=args.elastic_services,
|
||||
max_query_workers=args.max_query_workers, chain=args.chain,
|
||||
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
|
||||
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
|
||||
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
||||
|
@ -151,8 +158,7 @@ class ServerEnv(Env):
|
|||
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
|
||||
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
|
||||
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
|
||||
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
|
||||
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses,
|
||||
filtering_channel_ids=args.filtering_channel_ids, index_address_status=args.index_address_statuses,
|
||||
address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path,
|
||||
merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size,
|
||||
tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size,
|
||||
|
|
|
@ -3,7 +3,7 @@ import asyncio
|
|||
from bisect import bisect_right
|
||||
from collections import Counter, deque
|
||||
from operator import itemgetter
|
||||
from typing import Optional, List, TYPE_CHECKING
|
||||
from typing import Optional, List, TYPE_CHECKING, Deque, Tuple
|
||||
|
||||
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
|
||||
from hub.schema.result import Censor, Outputs
|
||||
|
@ -29,8 +29,9 @@ class StreamResolution(str):
|
|||
class SearchIndex:
|
||||
VERSION = 1
|
||||
|
||||
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
|
||||
elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None):
|
||||
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0,
|
||||
elastic_services: Optional[Deque[Tuple[Tuple[str, int], Tuple[str, int]]]] = None,
|
||||
timeout_counter: Optional['PrometheusCounter'] = None):
|
||||
self.hub_db = hub_db
|
||||
self.search_timeout = search_timeout
|
||||
self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
|
||||
|
@ -41,8 +42,8 @@ class SearchIndex:
|
|||
self.logger = logging.getLogger(__name__)
|
||||
self.claim_cache = LRUCache(2 ** 15)
|
||||
self.search_cache = LRUCache(2 ** 17)
|
||||
self._elastic_host = elastic_host
|
||||
self._elastic_port = elastic_port
|
||||
self._elastic_services = elastic_services
|
||||
self.lost_connection = asyncio.Event()
|
||||
|
||||
async def get_index_version(self) -> int:
|
||||
try:
|
||||
|
@ -59,7 +60,7 @@ class SearchIndex:
|
|||
async def start(self) -> bool:
|
||||
if self.sync_client:
|
||||
return False
|
||||
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
|
||||
hosts = [{'host': self._elastic_services[0][0][0], 'port': self._elastic_services[0][0][1]}]
|
||||
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
|
||||
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
|
||||
while True:
|
||||
|
|
|
@ -1,18 +1,25 @@
|
|||
import time
|
||||
import typing
|
||||
import asyncio
|
||||
from prometheus_client import Counter
|
||||
from hub import PROMETHEUS_NAMESPACE
|
||||
from hub.scribe.daemon import LBCDaemon
|
||||
from hub.herald.session import SessionManager
|
||||
from hub.herald.mempool import HubMemPool
|
||||
from hub.herald.udp import StatusServer
|
||||
from hub.herald.db import HeraldDB
|
||||
from hub.herald.search import SearchIndex
|
||||
from hub.service import BlockchainReaderService
|
||||
from hub.notifier_protocol import ElasticNotifierClientProtocol
|
||||
if typing.TYPE_CHECKING:
|
||||
from hub.herald.env import ServerEnv
|
||||
|
||||
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub"
|
||||
|
||||
|
||||
class HubServerService(BlockchainReaderService):
|
||||
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
|
||||
|
||||
def __init__(self, env: 'ServerEnv'):
|
||||
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
|
||||
self.env = env
|
||||
|
@ -21,8 +28,15 @@ class HubServerService(BlockchainReaderService):
|
|||
self.status_server = StatusServer()
|
||||
self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
|
||||
self.mempool = HubMemPool(self.env.coin, self.db)
|
||||
|
||||
self.search_index = SearchIndex(
|
||||
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
|
||||
elastic_services=self.env.elastic_services,
|
||||
timeout_counter=self.interrupt_count_metric
|
||||
)
|
||||
|
||||
self.session_manager = SessionManager(
|
||||
env, self.db, self.mempool, self.daemon,
|
||||
env, self.db, self.mempool, self.daemon, self.search_index,
|
||||
self.shutdown_event,
|
||||
on_available_callback=self.status_server.set_available,
|
||||
on_unavailable_callback=self.status_server.set_unavailable
|
||||
|
@ -30,7 +44,7 @@ class HubServerService(BlockchainReaderService):
|
|||
self.mempool.session_manager = self.session_manager
|
||||
self.es_notifications = asyncio.Queue()
|
||||
self.es_notification_client = ElasticNotifierClientProtocol(
|
||||
self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port
|
||||
self.es_notifications, self.env.elastic_services
|
||||
)
|
||||
self.synchronized = asyncio.Event()
|
||||
self._es_height = None
|
||||
|
@ -52,7 +66,7 @@ class HubServerService(BlockchainReaderService):
|
|||
# self.mempool.notified_mempool_txs.clear()
|
||||
|
||||
def clear_search_cache(self):
|
||||
self.session_manager.search_index.clear_caches()
|
||||
self.search_index.clear_caches()
|
||||
|
||||
def advance(self, height: int):
|
||||
super().advance(height)
|
||||
|
@ -116,8 +130,44 @@ class HubServerService(BlockchainReaderService):
|
|||
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
|
||||
self.db.db_height)
|
||||
finally:
|
||||
self.log.warning("closing es sync notification loop at %s", self._es_height)
|
||||
self.es_notification_client.close()
|
||||
|
||||
async def failover_elastic_services(self, synchronized: asyncio.Event):
|
||||
first_connect = True
|
||||
if not self.es_notification_client.lost_connection.is_set():
|
||||
synchronized.set()
|
||||
|
||||
while True:
|
||||
try:
|
||||
await self.es_notification_client.lost_connection.wait()
|
||||
if not first_connect:
|
||||
self.log.warning("lost connection to scribe-elastic-sync notifier (%s:%i)",
|
||||
self.es_notification_client.host, self.es_notification_client.port)
|
||||
await self.es_notification_client.connect()
|
||||
first_connect = False
|
||||
synchronized.set()
|
||||
self.log.info("connected to es notifier on %s:%i", self.es_notification_client.host,
|
||||
self.es_notification_client.port)
|
||||
await self.search_index.start()
|
||||
except Exception as e:
|
||||
if not isinstance(e, asyncio.CancelledError):
|
||||
self.log.warning("lost connection to scribe-elastic-sync notifier")
|
||||
await self.search_index.stop()
|
||||
self.search_index.clear_caches()
|
||||
if len(self.env.elastic_services) > 1:
|
||||
self.env.elastic_services.rotate(-1)
|
||||
self.log.warning("attempting to failover to %s:%i", self.es_notification_client.host,
|
||||
self.es_notification_client.port)
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
self.log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)",
|
||||
self.es_notification_client.host, self.es_notification_client.port)
|
||||
await asyncio.sleep(30)
|
||||
else:
|
||||
self.log.info("stopping the notifier loop")
|
||||
raise e
|
||||
|
||||
async def start_status_server(self):
|
||||
if self.env.udp_port and int(self.env.udp_port):
|
||||
await self.status_server.start(
|
||||
|
@ -127,14 +177,13 @@ class HubServerService(BlockchainReaderService):
|
|||
|
||||
def _iter_start_tasks(self):
|
||||
yield self.start_status_server()
|
||||
yield self.start_cancellable(self.es_notification_client.maintain_connection)
|
||||
yield self.start_cancellable(self.receive_es_notifications)
|
||||
yield self.start_cancellable(self.failover_elastic_services)
|
||||
yield self.start_cancellable(self.mempool.send_notifications_forever)
|
||||
yield self.start_cancellable(self.refresh_blocks_forever)
|
||||
yield self.finished_initial_catch_up.wait()
|
||||
self.block_count_metric.set(self.last_state.height)
|
||||
yield self.start_prometheus()
|
||||
yield self.start_cancellable(self.receive_es_notifications)
|
||||
yield self.session_manager.search_index.start()
|
||||
yield self.start_cancellable(self.session_manager.serve, self.mempool)
|
||||
|
||||
def _iter_stop_tasks(self):
|
||||
|
|
|
@ -141,7 +141,6 @@ class SessionManager:
|
|||
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
|
||||
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
|
||||
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
|
||||
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
|
||||
db_operational_error_metric = Counter(
|
||||
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
|
||||
)
|
||||
|
@ -180,7 +179,7 @@ class SessionManager:
|
|||
)
|
||||
|
||||
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
|
||||
daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
|
||||
daemon: 'LBCDaemon', search_index: 'SearchIndex', shutdown_event: asyncio.Event,
|
||||
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
||||
env.max_send = max(350000, env.max_send)
|
||||
self.env = env
|
||||
|
@ -189,6 +188,7 @@ class SessionManager:
|
|||
self.on_unavailable_callback = on_unavailable_callback
|
||||
self.daemon = daemon
|
||||
self.mempool = mempool
|
||||
self.search_index = search_index
|
||||
self.shutdown_event = shutdown_event
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
||||
|
@ -207,12 +207,6 @@ class SessionManager:
|
|||
self.protocol_class = LBRYElectrumX
|
||||
self.session_event = Event()
|
||||
|
||||
# Search index
|
||||
self.search_index = SearchIndex(
|
||||
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
|
||||
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
|
||||
timeout_counter=self.interrupt_count_metric
|
||||
)
|
||||
self.running = False
|
||||
# hashX: List[int]
|
||||
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
|
||||
|
@ -1270,7 +1264,7 @@ class LBRYElectrumX(asyncio.Protocol):
|
|||
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
||||
return await self.session_manager.search_index.cached_search(kwargs)
|
||||
except ConnectionTimeout:
|
||||
self.session_manager.interrupt_count_metric.inc()
|
||||
self.session_manager.search_index.timeout_counter.inc()
|
||||
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
||||
except TooManyClaimSearchParametersError as err:
|
||||
await asyncio.sleep(2)
|
||||
|
|
|
@ -2,6 +2,7 @@ import typing
|
|||
import struct
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Deque, Tuple
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -31,52 +32,39 @@ class ElasticNotifierProtocol(asyncio.Protocol):
|
|||
class ElasticNotifierClientProtocol(asyncio.Protocol):
|
||||
"""notifies the reader when ES has written updates"""
|
||||
|
||||
def __init__(self, notifications: asyncio.Queue, host: str, port: int):
|
||||
def __init__(self, notifications: asyncio.Queue, notifier_hosts: Deque[Tuple[Tuple[str, int], Tuple[str, int]]]):
|
||||
assert len(notifier_hosts) > 0, 'no elastic notifier clients given'
|
||||
self.notifications = notifications
|
||||
self.transport: typing.Optional[asyncio.Transport] = None
|
||||
self.host = host
|
||||
self.port = port
|
||||
self._lost_connection = asyncio.Event()
|
||||
self._lost_connection.set()
|
||||
self._notifier_hosts = notifier_hosts
|
||||
self.lost_connection = asyncio.Event()
|
||||
self.lost_connection.set()
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
return self._notifier_hosts[0][1][0]
|
||||
|
||||
@property
|
||||
def port(self):
|
||||
return self._notifier_hosts[0][1][1]
|
||||
|
||||
async def connect(self):
|
||||
if self._lost_connection.is_set():
|
||||
if self.lost_connection.is_set():
|
||||
await asyncio.get_event_loop().create_connection(
|
||||
lambda: self, self.host, self.port
|
||||
)
|
||||
|
||||
async def maintain_connection(self, synchronized: asyncio.Event):
|
||||
first_connect = True
|
||||
if not self._lost_connection.is_set():
|
||||
synchronized.set()
|
||||
while True:
|
||||
try:
|
||||
await self._lost_connection.wait()
|
||||
if not first_connect:
|
||||
log.warning("lost connection to scribe-elastic-sync notifier")
|
||||
await self.connect()
|
||||
first_connect = False
|
||||
synchronized.set()
|
||||
log.info("connected to es notifier")
|
||||
except Exception as e:
|
||||
if not isinstance(e, asyncio.CancelledError):
|
||||
log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
|
||||
await asyncio.sleep(30)
|
||||
else:
|
||||
log.info("stopping the notifier loop")
|
||||
raise e
|
||||
|
||||
def close(self):
|
||||
if self.transport and not self.transport.is_closing():
|
||||
self.transport.close()
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
self._lost_connection.clear()
|
||||
self.lost_connection.clear()
|
||||
|
||||
def connection_lost(self, exc) -> None:
|
||||
self.transport = None
|
||||
self._lost_connection.set()
|
||||
self.lost_connection.set()
|
||||
|
||||
def data_received(self, data: bytes) -> None:
|
||||
try:
|
||||
|
|
Loading…
Reference in a new issue
Inefficient regular expression
This part of the regular expression may cause exponential backtracking on strings containing many repetitions of '.'.
Show more details
Inefficient regular expression
This part of the regular expression may cause exponential backtracking on strings starting with ':/' and containing many repetitions of '.'.
Show more details