Elasticsearch and elastic sync notifier failover (WIP) #95
6 changed files with 115 additions and 74 deletions
|
@ -73,7 +73,10 @@ class ElasticSyncService(BlockchainReaderService):
|
||||||
info = {}
|
info = {}
|
||||||
if os.path.exists(self._es_info_path):
|
if os.path.exists(self._es_info_path):
|
||||||
with open(self._es_info_path, 'r') as f:
|
with open(self._es_info_path, 'r') as f:
|
||||||
info.update(json.loads(f.read()))
|
try:
|
||||||
|
info.update(json.loads(f.read()))
|
||||||
|
except json.decoder.JSONDecodeError:
|
||||||
|
self.log.warning('failed to parse es sync status file')
|
||||||
self._last_wrote_height = int(info.get('height', 0))
|
self._last_wrote_height = int(info.get('height', 0))
|
||||||
self._last_wrote_block_hash = info.get('block_hash', None)
|
self._last_wrote_block_hash = info.get('block_hash', None)
|
||||||
|
|
||||||
|
|
|
@ -1,29 +1,38 @@
|
||||||
import re
|
import re
|
||||||
|
from collections import deque
|
||||||
from hub.env import Env
|
from hub.env import Env
|
||||||
|
|
||||||
|
# One "<host>:<port>" endpoint. The host is any run of characters other than
# ',', ':' and '/', and the port is one or more digits. Each character can only
# be consumed in one way, so matching never backtracks exponentially (the
# previous pattern let digits, '.' and '|' match two alternatives at once).
_ES_ENDPOINT_PATTERN = r"[^,:/]*:\d+"
# One "<elastic endpoint>/<notifier endpoint>" pair.
_ES_SERVICE_PATTERN = _ES_ENDPOINT_PATTERN + "/" + _ES_ENDPOINT_PATTERN
# A comma separated list of pairs (a single trailing comma is tolerated),
# anchored at the start of the argument by `.match()`. The whole list is
# optional so that an unparsable argument yields an empty match.
ELASTIC_SERVICES_REGEX = re.compile(
    "(?:" + _ES_SERVICE_PATTERN + "(?:," + _ES_SERVICE_PATTERN + ")*,?)?"
)


def _parse_es_endpoint(endpoint: str):
    """Split a '<host>:<port>' string into a `(host, port)` tuple with an int port."""
    host, _, port = endpoint.partition(':')
    return host, int(port)


def parse_es_services(elastic_services_arg: str):
    """
    Parse the `--elastic_services` argument into a list of endpoint pairs.

    The argument is a comma separated list (no spaces) of items in the format
    `<elastic host>:<elastic port>/<notifier host>:<notifier port>`.

    Only the well-formed leading part of the argument is used: parsing stops at
    the first item that does not fit the format, and an argument with no
    well-formed leading item gives an empty list. Items with an empty port, or
    items that are not separated by a comma, count as malformed instead of
    raising while being converted.

    :param elastic_services_arg: the raw argument string
    :return: list of `((elastic_host, elastic_port), (notifier_host, notifier_port))`
        tuples, in the order given, with integer ports

    >>> parse_es_services('localhost:9200/localhost:19080')
    [(('localhost', 9200), ('localhost', 19080))]
    """
    # the pattern can match the empty string, so `.match()` always succeeds
    matching = ELASTIC_SERVICES_REGEX.match(elastic_services_arg).group()
    services = []
    for item in matching.split(','):
        if not item:  # empty input or the piece after a trailing comma
            continue
        # the pattern guarantees exactly one '/' per item
        es, notifier = item.split('/')
        services.append((_parse_es_endpoint(es), _parse_es_endpoint(notifier)))
    return services
|
||||||
|
|
||||||
|
|
||||||
class ServerEnv(Env):
|
class ServerEnv(Env):
|
||||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||||
daemon_url=None, host=None, elastic_host=None, elastic_port=None, es_index_prefix=None,
|
daemon_url=None, host=None, elastic_services=None, es_index_prefix=None,
|
||||||
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
|
tcp_port=None, udp_port=None, banner_file=None, allow_lan_udp=None, country=None,
|
||||||
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
||||||
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
||||||
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
|
database_query_timeout=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
|
||||||
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None,
|
peer_announce=None, index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
|
||||||
index_address_status=None, address_history_cache_size=None, daemon_ca_path=None,
|
|
||||||
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
|
merkle_cache_size=None, resolved_url_cache_size=None, tx_cache_size=None,
|
||||||
history_tx_cache_size=None, largest_address_history_cache_size=None):
|
history_tx_cache_size=None, largest_address_history_cache_size=None):
|
||||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
|
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||||
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
||||||
self.host = host if host is not None else self.default('HOST', 'localhost')
|
self.host = host if host is not None else self.default('HOST', 'localhost')
|
||||||
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
|
self.elastic_services = deque(parse_es_services(elastic_services or 'localhost:9200/localhost:19080'))
|
||||||
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
|
|
||||||
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default(
|
|
||||||
'ELASTIC_NOTIFIER_HOST', 'localhost')
|
|
||||||
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer(
|
|
||||||
'ELASTIC_NOTIFIER_PORT', 19080)
|
|
||||||
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
|
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
|
||||||
# Server stuff
|
# Server stuff
|
||||||
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
|
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
|
||||||
|
@ -93,15 +102,13 @@ class ServerEnv(Env):
|
||||||
help="Regex used for blocking clients")
|
help="Regex used for blocking clients")
|
||||||
parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
|
parser.add_argument('--session_timeout', type=int, default=cls.integer('SESSION_TIMEOUT', 600),
|
||||||
help="Session inactivity timeout")
|
help="Session inactivity timeout")
|
||||||
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
|
parser.add_argument('--elastic_services',
|
||||||
help="Hostname or ip address of the elasticsearch instance to connect to. "
|
default=cls.default('ELASTIC_SERVICES', 'localhost:9200/localhost:19080'), type=str,
|
||||||
"Can be set in env with 'ELASTIC_HOST'")
|
help="Hosts and ports for elastic search and the scribe elastic sync notifier. "
|
||||||
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
|
"Given as a comma separated list without spaces of items in the format "
|
||||||
help="Elasticsearch port to connect to. Can be set in env with 'ELASTIC_PORT'")
|
"<elastic host>:<elastic port>/<notifier host>:<notifier port> . "
|
||||||
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
|
"Defaults to 'localhost:9200/localhost:19080'. "
|
||||||
type=str, help='elasticsearch sync notifier host, defaults to localhost')
|
"Can be set in env with 'ELASTIC_SERVICES'")
|
||||||
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
|
|
||||||
help='elasticsearch sync notifier port')
|
|
||||||
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
|
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
|
||||||
parser.add_argument('--allow_lan_udp', action='store_true',
|
parser.add_argument('--allow_lan_udp', action='store_true',
|
||||||
help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
|
help="Reply to clients on the local network", default=cls.boolean('ALLOW_LAN_UDP', False))
|
||||||
|
@ -141,8 +148,8 @@ class ServerEnv(Env):
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_arg_parser(cls, args):
|
def from_arg_parser(cls, args):
|
||||||
return cls(
|
return cls(
|
||||||
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_host=args.elastic_host,
|
db_dir=args.db_dir, daemon_url=args.daemon_url, host=args.host, elastic_services=args.elastic_services,
|
||||||
elastic_port=args.elastic_port, max_query_workers=args.max_query_workers, chain=args.chain,
|
max_query_workers=args.max_query_workers, chain=args.chain,
|
||||||
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
|
es_index_prefix=args.es_index_prefix, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
|
||||||
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
|
udp_port=args.udp_port, prometheus_port=args.prometheus_port, banner_file=args.banner_file,
|
||||||
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
allow_lan_udp=args.allow_lan_udp, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
||||||
|
@ -151,8 +158,7 @@ class ServerEnv(Env):
|
||||||
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
|
max_sessions=args.max_sessions, session_timeout=args.session_timeout,
|
||||||
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
|
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
|
||||||
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
|
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
|
||||||
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
|
filtering_channel_ids=args.filtering_channel_ids, index_address_status=args.index_address_statuses,
|
||||||
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses,
|
|
||||||
address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path,
|
address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path,
|
||||||
merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size,
|
merkle_cache_size=args.merkle_cache_size, resolved_url_cache_size=args.resolved_url_cache_size,
|
||||||
tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size,
|
tx_cache_size=args.tx_cache_size, history_tx_cache_size=args.history_tx_cache_size,
|
||||||
|
|
|
@ -3,7 +3,7 @@ import asyncio
|
||||||
from bisect import bisect_right
|
from bisect import bisect_right
|
||||||
from collections import Counter, deque
|
from collections import Counter, deque
|
||||||
from operator import itemgetter
|
from operator import itemgetter
|
||||||
from typing import Optional, List, TYPE_CHECKING
|
from typing import Optional, List, TYPE_CHECKING, Deque, Tuple
|
||||||
|
|
||||||
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
|
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
|
||||||
from hub.schema.result import Censor, Outputs
|
from hub.schema.result import Censor, Outputs
|
||||||
|
@ -29,8 +29,9 @@ class StreamResolution(str):
|
||||||
class SearchIndex:
|
class SearchIndex:
|
||||||
VERSION = 1
|
VERSION = 1
|
||||||
|
|
||||||
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
|
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0,
|
||||||
elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None):
|
elastic_services: Optional[Deque[Tuple[Tuple[str, int], Tuple[str, int]]]] = None,
|
||||||
|
timeout_counter: Optional['PrometheusCounter'] = None):
|
||||||
self.hub_db = hub_db
|
self.hub_db = hub_db
|
||||||
self.search_timeout = search_timeout
|
self.search_timeout = search_timeout
|
||||||
self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
|
self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
|
||||||
|
@ -41,8 +42,8 @@ class SearchIndex:
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.claim_cache = LRUCache(2 ** 15)
|
self.claim_cache = LRUCache(2 ** 15)
|
||||||
self.search_cache = LRUCache(2 ** 17)
|
self.search_cache = LRUCache(2 ** 17)
|
||||||
self._elastic_host = elastic_host
|
self._elastic_services = elastic_services
|
||||||
self._elastic_port = elastic_port
|
self.lost_connection = asyncio.Event()
|
||||||
|
|
||||||
async def get_index_version(self) -> int:
|
async def get_index_version(self) -> int:
|
||||||
try:
|
try:
|
||||||
|
@ -59,7 +60,7 @@ class SearchIndex:
|
||||||
async def start(self) -> bool:
|
async def start(self) -> bool:
|
||||||
if self.sync_client:
|
if self.sync_client:
|
||||||
return False
|
return False
|
||||||
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
|
hosts = [{'host': self._elastic_services[0][0][0], 'port': self._elastic_services[0][0][1]}]
|
||||||
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
|
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
|
||||||
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
|
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
|
||||||
while True:
|
while True:
|
||||||
|
|
|
@ -1,18 +1,25 @@
|
||||||
import time
|
import time
|
||||||
import typing
|
import typing
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from prometheus_client import Counter
|
||||||
|
from hub import PROMETHEUS_NAMESPACE
|
||||||
from hub.scribe.daemon import LBCDaemon
|
from hub.scribe.daemon import LBCDaemon
|
||||||
from hub.herald.session import SessionManager
|
from hub.herald.session import SessionManager
|
||||||
from hub.herald.mempool import HubMemPool
|
from hub.herald.mempool import HubMemPool
|
||||||
from hub.herald.udp import StatusServer
|
from hub.herald.udp import StatusServer
|
||||||
from hub.herald.db import HeraldDB
|
from hub.herald.db import HeraldDB
|
||||||
|
from hub.herald.search import SearchIndex
|
||||||
from hub.service import BlockchainReaderService
|
from hub.service import BlockchainReaderService
|
||||||
from hub.notifier_protocol import ElasticNotifierClientProtocol
|
from hub.notifier_protocol import ElasticNotifierClientProtocol
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from hub.herald.env import ServerEnv
|
from hub.herald.env import ServerEnv
|
||||||
|
|
||||||
|
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_hub"
|
||||||
|
|
||||||
|
|
||||||
class HubServerService(BlockchainReaderService):
|
class HubServerService(BlockchainReaderService):
|
||||||
|
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
|
||||||
|
|
||||||
def __init__(self, env: 'ServerEnv'):
|
def __init__(self, env: 'ServerEnv'):
|
||||||
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
|
super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
|
||||||
self.env = env
|
self.env = env
|
||||||
|
@ -21,8 +28,15 @@ class HubServerService(BlockchainReaderService):
|
||||||
self.status_server = StatusServer()
|
self.status_server = StatusServer()
|
||||||
self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
|
self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
|
||||||
self.mempool = HubMemPool(self.env.coin, self.db)
|
self.mempool = HubMemPool(self.env.coin, self.db)
|
||||||
|
|
||||||
|
self.search_index = SearchIndex(
|
||||||
|
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
|
||||||
|
elastic_services=self.env.elastic_services,
|
||||||
|
timeout_counter=self.interrupt_count_metric
|
||||||
|
)
|
||||||
|
|
||||||
self.session_manager = SessionManager(
|
self.session_manager = SessionManager(
|
||||||
env, self.db, self.mempool, self.daemon,
|
env, self.db, self.mempool, self.daemon, self.search_index,
|
||||||
self.shutdown_event,
|
self.shutdown_event,
|
||||||
on_available_callback=self.status_server.set_available,
|
on_available_callback=self.status_server.set_available,
|
||||||
on_unavailable_callback=self.status_server.set_unavailable
|
on_unavailable_callback=self.status_server.set_unavailable
|
||||||
|
@ -30,7 +44,7 @@ class HubServerService(BlockchainReaderService):
|
||||||
self.mempool.session_manager = self.session_manager
|
self.mempool.session_manager = self.session_manager
|
||||||
self.es_notifications = asyncio.Queue()
|
self.es_notifications = asyncio.Queue()
|
||||||
self.es_notification_client = ElasticNotifierClientProtocol(
|
self.es_notification_client = ElasticNotifierClientProtocol(
|
||||||
self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port
|
self.es_notifications, self.env.elastic_services
|
||||||
)
|
)
|
||||||
self.synchronized = asyncio.Event()
|
self.synchronized = asyncio.Event()
|
||||||
self._es_height = None
|
self._es_height = None
|
||||||
|
@ -52,7 +66,7 @@ class HubServerService(BlockchainReaderService):
|
||||||
# self.mempool.notified_mempool_txs.clear()
|
# self.mempool.notified_mempool_txs.clear()
|
||||||
|
|
||||||
def clear_search_cache(self):
|
def clear_search_cache(self):
|
||||||
self.session_manager.search_index.clear_caches()
|
self.search_index.clear_caches()
|
||||||
|
|
||||||
def advance(self, height: int):
|
def advance(self, height: int):
|
||||||
super().advance(height)
|
super().advance(height)
|
||||||
|
@ -116,8 +130,44 @@ class HubServerService(BlockchainReaderService):
|
||||||
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
|
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
|
||||||
self.db.db_height)
|
self.db.db_height)
|
||||||
finally:
|
finally:
|
||||||
|
self.log.warning("closing es sync notification loop at %s", self._es_height)
|
||||||
self.es_notification_client.close()
|
self.es_notification_client.close()
|
||||||
|
|
||||||
|
async def failover_elastic_services(self, synchronized: asyncio.Event):
    """
    Keep the es sync notifier client and the search index connected, failing
    over to the next configured elastic service pair when a connection fails.

    Loops forever: waits until the notifier client reports a lost connection,
    reconnects it, sets `synchronized`, then starts the search index. If any
    of that raises, the search index is stopped and its caches are cleared
    before retrying. When more than one service pair is configured,
    `env.elastic_services` is rotated so the next pair is at the front and the
    retry happens after 1 second; with a single pair the retry happens after
    30 seconds.

    :param synchronized: event that is set once the notifier client is connected
    :raises asyncio.CancelledError: re-raised when the task is cancelled
    """
    notifier = self.es_notification_client
    first_connect = True
    if not notifier.lost_connection.is_set():
        synchronized.set()

    while True:
        try:
            await notifier.lost_connection.wait()
            if not first_connect:
                self.log.warning("lost connection to scribe-elastic-sync notifier (%s:%i)",
                                 notifier.host, notifier.port)
            await notifier.connect()
            first_connect = False
            synchronized.set()
            self.log.info("connected to es notifier on %s:%i", notifier.host, notifier.port)
            await self.search_index.start()
        except asyncio.CancelledError:
            # Handled before `except Exception`: since Python 3.8 CancelledError
            # derives from BaseException, so checking for it inside an
            # `except Exception` handler never fires and the shutdown was not logged.
            self.log.info("stopping the notifier loop")
            raise
        except Exception:
            self.log.warning("lost connection to scribe-elastic-sync notifier")
            await self.search_index.stop()
            self.search_index.clear_caches()
            if len(self.env.elastic_services) > 1:
                # move the next service pair to the front of the deque
                self.env.elastic_services.rotate(-1)
                self.log.warning("attempting to failover to %s:%i", notifier.host, notifier.port)
                await asyncio.sleep(1)
            else:
                self.log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)",
                                 notifier.host, notifier.port)
                await asyncio.sleep(30)
|
||||||
|
|
||||||
async def start_status_server(self):
|
async def start_status_server(self):
|
||||||
if self.env.udp_port and int(self.env.udp_port):
|
if self.env.udp_port and int(self.env.udp_port):
|
||||||
await self.status_server.start(
|
await self.status_server.start(
|
||||||
|
@ -127,14 +177,13 @@ class HubServerService(BlockchainReaderService):
|
||||||
|
|
||||||
def _iter_start_tasks(self):
|
def _iter_start_tasks(self):
|
||||||
yield self.start_status_server()
|
yield self.start_status_server()
|
||||||
yield self.start_cancellable(self.es_notification_client.maintain_connection)
|
yield self.start_cancellable(self.receive_es_notifications)
|
||||||
|
yield self.start_cancellable(self.failover_elastic_services)
|
||||||
yield self.start_cancellable(self.mempool.send_notifications_forever)
|
yield self.start_cancellable(self.mempool.send_notifications_forever)
|
||||||
yield self.start_cancellable(self.refresh_blocks_forever)
|
yield self.start_cancellable(self.refresh_blocks_forever)
|
||||||
yield self.finished_initial_catch_up.wait()
|
yield self.finished_initial_catch_up.wait()
|
||||||
self.block_count_metric.set(self.last_state.height)
|
self.block_count_metric.set(self.last_state.height)
|
||||||
yield self.start_prometheus()
|
yield self.start_prometheus()
|
||||||
yield self.start_cancellable(self.receive_es_notifications)
|
|
||||||
yield self.session_manager.search_index.start()
|
|
||||||
yield self.start_cancellable(self.session_manager.serve, self.mempool)
|
yield self.start_cancellable(self.session_manager.serve, self.mempool)
|
||||||
|
|
||||||
def _iter_stop_tasks(self):
|
def _iter_stop_tasks(self):
|
||||||
|
|
|
@ -141,7 +141,6 @@ class SessionManager:
|
||||||
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
|
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
|
||||||
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
|
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
|
||||||
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
|
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
|
||||||
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
|
|
||||||
db_operational_error_metric = Counter(
|
db_operational_error_metric = Counter(
|
||||||
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
|
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
|
||||||
)
|
)
|
||||||
|
@ -180,7 +179,7 @@ class SessionManager:
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
|
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
|
||||||
daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
|
daemon: 'LBCDaemon', search_index: 'SearchIndex', shutdown_event: asyncio.Event,
|
||||||
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
||||||
env.max_send = max(350000, env.max_send)
|
env.max_send = max(350000, env.max_send)
|
||||||
self.env = env
|
self.env = env
|
||||||
|
@ -189,6 +188,7 @@ class SessionManager:
|
||||||
self.on_unavailable_callback = on_unavailable_callback
|
self.on_unavailable_callback = on_unavailable_callback
|
||||||
self.daemon = daemon
|
self.daemon = daemon
|
||||||
self.mempool = mempool
|
self.mempool = mempool
|
||||||
|
self.search_index = search_index
|
||||||
self.shutdown_event = shutdown_event
|
self.shutdown_event = shutdown_event
|
||||||
self.logger = logging.getLogger(__name__)
|
self.logger = logging.getLogger(__name__)
|
||||||
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
||||||
|
@ -207,12 +207,6 @@ class SessionManager:
|
||||||
self.protocol_class = LBRYElectrumX
|
self.protocol_class = LBRYElectrumX
|
||||||
self.session_event = Event()
|
self.session_event = Event()
|
||||||
|
|
||||||
# Search index
|
|
||||||
self.search_index = SearchIndex(
|
|
||||||
self.db, self.env.es_index_prefix, self.env.database_query_timeout,
|
|
||||||
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
|
|
||||||
timeout_counter=self.interrupt_count_metric
|
|
||||||
)
|
|
||||||
self.running = False
|
self.running = False
|
||||||
# hashX: List[int]
|
# hashX: List[int]
|
||||||
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
|
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
|
||||||
|
@ -1270,7 +1264,7 @@ class LBRYElectrumX(asyncio.Protocol):
|
||||||
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
||||||
return await self.session_manager.search_index.cached_search(kwargs)
|
return await self.session_manager.search_index.cached_search(kwargs)
|
||||||
except ConnectionTimeout:
|
except ConnectionTimeout:
|
||||||
self.session_manager.interrupt_count_metric.inc()
|
self.session_manager.search_index.timeout_counter.inc()
|
||||||
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
||||||
except TooManyClaimSearchParametersError as err:
|
except TooManyClaimSearchParametersError as err:
|
||||||
await asyncio.sleep(2)
|
await asyncio.sleep(2)
|
||||||
|
|
|
@ -2,6 +2,7 @@ import typing
|
||||||
import struct
|
import struct
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
from typing import Deque, Tuple
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -31,52 +32,39 @@ class ElasticNotifierProtocol(asyncio.Protocol):
|
||||||
class ElasticNotifierClientProtocol(asyncio.Protocol):
|
class ElasticNotifierClientProtocol(asyncio.Protocol):
|
||||||
"""notifies the reader when ES has written updates"""
|
"""notifies the reader when ES has written updates"""
|
||||||
|
|
||||||
def __init__(self, notifications: asyncio.Queue, host: str, port: int):
|
def __init__(self, notifications: asyncio.Queue, notifier_hosts: Deque[Tuple[Tuple[str, int], Tuple[str, int]]]):
|
||||||
|
assert len(notifier_hosts) > 0, 'no elastic notifier clients given'
|
||||||
self.notifications = notifications
|
self.notifications = notifications
|
||||||
self.transport: typing.Optional[asyncio.Transport] = None
|
self.transport: typing.Optional[asyncio.Transport] = None
|
||||||
self.host = host
|
self._notifier_hosts = notifier_hosts
|
||||||
self.port = port
|
self.lost_connection = asyncio.Event()
|
||||||
self._lost_connection = asyncio.Event()
|
self.lost_connection.set()
|
||||||
self._lost_connection.set()
|
|
||||||
|
@property
def host(self):
    """Notifier hostname of the service pair at the front of `_notifier_hosts`."""
    current_service = self._notifier_hosts[0]
    notifier_endpoint = current_service[1]
    return notifier_endpoint[0]
|
||||||
|
|
||||||
|
@property
def port(self):
    """Notifier port of the service pair at the front of `_notifier_hosts`."""
    current_service = self._notifier_hosts[0]
    notifier_endpoint = current_service[1]
    return notifier_endpoint[1]
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
if self._lost_connection.is_set():
|
if self.lost_connection.is_set():
|
||||||
await asyncio.get_event_loop().create_connection(
|
await asyncio.get_event_loop().create_connection(
|
||||||
lambda: self, self.host, self.port
|
lambda: self, self.host, self.port
|
||||||
)
|
)
|
||||||
|
|
||||||
async def maintain_connection(self, synchronized: asyncio.Event):
|
|
||||||
first_connect = True
|
|
||||||
if not self._lost_connection.is_set():
|
|
||||||
synchronized.set()
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
await self._lost_connection.wait()
|
|
||||||
if not first_connect:
|
|
||||||
log.warning("lost connection to scribe-elastic-sync notifier")
|
|
||||||
await self.connect()
|
|
||||||
first_connect = False
|
|
||||||
synchronized.set()
|
|
||||||
log.info("connected to es notifier")
|
|
||||||
except Exception as e:
|
|
||||||
if not isinstance(e, asyncio.CancelledError):
|
|
||||||
log.warning("waiting 30s for scribe-elastic-sync notifier to become available (%s:%i)", self.host, self.port)
|
|
||||||
await asyncio.sleep(30)
|
|
||||||
else:
|
|
||||||
log.info("stopping the notifier loop")
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if self.transport and not self.transport.is_closing():
|
if self.transport and not self.transport.is_closing():
|
||||||
self.transport.close()
|
self.transport.close()
|
||||||
|
|
||||||
def connection_made(self, transport):
|
def connection_made(self, transport):
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
self._lost_connection.clear()
|
self.lost_connection.clear()
|
||||||
|
|
||||||
def connection_lost(self, exc) -> None:
|
def connection_lost(self, exc) -> None:
|
||||||
self.transport = None
|
self.transport = None
|
||||||
self._lost_connection.set()
|
self.lost_connection.set()
|
||||||
|
|
||||||
def data_received(self, data: bytes) -> None:
|
def data_received(self, data: bytes) -> None:
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in a new issue
Inefficient regular expression
This part of the regular expression may cause exponential backtracking on strings containing many repetitions of '.'.
Show more details
Inefficient regular expression
This part of the regular expression may cause exponential backtracking on strings starting with ':/' and containing many repetitions of '.'.
Show more details