add elastic_notifier_host cli arg and env setting

fixes https://github.com/lbryio/scribe/issues/21
This commit is contained in:
Jack Robison 2022-04-14 11:50:40 -04:00
parent 023fca5780
commit 4574680d9b
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 16 additions and 8 deletions

View file

@ -44,9 +44,10 @@ class ElasticSyncService(BlockchainReaderService):
async def run_es_notifier(self, synchronized: asyncio.Event): async def run_es_notifier(self, synchronized: asyncio.Event):
server = await asyncio.get_event_loop().create_server( server = await asyncio.get_event_loop().create_server(
lambda: ElasticNotifierProtocol(self._listeners), '127.0.0.1', self.env.elastic_notifier_port lambda: ElasticNotifierProtocol(self._listeners), self.env.elastic_notifier_host, self.env.elastic_notifier_port
) )
self.log.info("ES notifier server listening on TCP localhost:%i", self.env.elastic_notifier_port) self.log.info("ES notifier server listening on TCP %s:%i", self.env.elastic_notifier_host,
self.env.elastic_notifier_port)
synchronized.set() synchronized.set()
async with server: async with server:
await server.serve_forever() await server.serve_forever()

View file

@ -38,8 +38,9 @@ class Env:
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None, session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, db_max_open_files=64, elastic_notifier_port=None, database_query_timeout=None, db_max_open_files=64, elastic_notifier_host=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None): elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None,
peer_announce=None):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
@ -48,6 +49,7 @@ class Env:
self.host = host if host is not None else self.default('HOST', 'localhost') self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_host = elastic_notifier_host if elastic_notifier_host is not None else self.default('ELASTIC_NOTIFIER_HOST', 'localhost')
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080) self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080)
self.loop_policy = self.set_event_loop_policy( self.loop_policy = self.set_event_loop_policy(
@ -309,9 +311,13 @@ class Env:
help='Path to SSL key file') help='Path to SSL key file')
parser.add_argument('--reorg_limit', default=cls.integer('REORG_LIMIT', 200), type=int, help='Max reorg depth') parser.add_argument('--reorg_limit', default=cls.integer('REORG_LIMIT', 200), type=int, help='Max reorg depth')
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str, parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help='elasticsearch host') help='elasticsearch host, defaults to localhost')
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help='elasticsearch port') help='elasticsearch port, defaults to 9200')
parser.add_argument('--elastic_notifier_host', default=cls.default('ELASTIC_NOTIFIER_HOST', 'localhost'),
type=str, help='elasticsearch sync notifier host, defaults to localhost')
parser.add_argument('--elastic_notifier_port', default=cls.integer('ELASTIC_NOTIFIER_PORT', 19080), type=int,
help='elasticsearch sync notifier port')
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str, parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str,
choices=['default', 'uvloop']) choices=['default', 'uvloop'])
@ -374,5 +380,6 @@ class Env:
max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions, max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions,
session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description, session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description,
daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000), daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000),
blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids,
elastic_notifier_host=args.elastic_notifier_host, elastic_notifier_port=args.elastic_notifier_port
) )

View file

@ -26,7 +26,7 @@ class HubServerService(BlockchainReaderService):
self.mempool.session_manager = self.session_manager self.mempool.session_manager = self.session_manager
self.es_notifications = asyncio.Queue() self.es_notifications = asyncio.Queue()
self.es_notification_client = ElasticNotifierClientProtocol( self.es_notification_client = ElasticNotifierClientProtocol(
self.es_notifications, '127.0.0.1', self.env.elastic_notifier_port self.es_notifications, self.env.elastic_notifier_host, self.env.elastic_notifier_port
) )
self.synchronized = asyncio.Event() self.synchronized = asyncio.Event()
self._es_height = None self._es_height = None