From bd284be69dd89c0be0111b33af6ffb841e688ad0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 21 Jan 2022 12:33:33 -0500 Subject: [PATCH] drop `ES_MODE` hub setting, rename hub entry points `lbry-hub-writer` - keeps an up-to-date database `lbry-hub-server` - replies to clients with data from a read only snapshot of the db `lbry-hub-elastic-sync` - watches for changes to the db and updates elasticsearch accordingly --- lbry/wallet/orchstr8/node.py | 7 ++-- lbry/wallet/server/cli.py | 64 +++++++++++++++++++++--------------- lbry/wallet/server/env.py | 7 ++-- setup.py | 7 ++-- 4 files changed, 47 insertions(+), 38 deletions(-) diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 9f54dc4a7..d922e823b 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -219,9 +219,10 @@ class SPVNode: } if extraconf: conf.update(extraconf) - self.writer = BlockProcessor(Env(self.coin_class, es_mode='writer', **conf)) - self.server = BlockchainReaderServer(Env(self.coin_class, es_mode='reader', **conf)) - self.es_writer = ElasticWriter(Env(self.coin_class, es_mode='reader', **conf)) + env = Env(self.coin_class, **conf) + self.writer = BlockProcessor(env) + self.server = BlockchainReaderServer(env) + self.es_writer = ElasticWriter(env) await self.writer.open() await self.writer.start() await asyncio.wait([self.server.start(), self.es_writer.start()]) diff --git a/lbry/wallet/server/cli.py b/lbry/wallet/server/cli.py index be9dcfef5..59f313ab4 100644 --- a/lbry/wallet/server/cli.py +++ b/lbry/wallet/server/cli.py @@ -4,43 +4,53 @@ import argparse from lbry.wallet.server.env import Env from lbry.wallet.server.block_processor import BlockProcessor from lbry.wallet.server.chain_reader import BlockchainReaderServer +from lbry.wallet.server.db.elasticsearch.sync import ElasticWriter -def get_argument_parser(): +def get_args_and_setup_logging(name): parser = argparse.ArgumentParser( - prog="lbry-hub" + prog=name ) Env.contribute_to_arg_parser(parser) - return parser - - -def main(): - parser = get_argument_parser() args = parser.parse_args() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") - logging.info('lbry.server starting') logging.getLogger('aiohttp').setLevel(logging.WARNING) logging.getLogger('elasticsearch').setLevel(logging.WARNING) + return args - if args.es_mode == 'writer': - try: - block_processor = BlockProcessor(Env.from_arg_parser(args)) - block_processor.run() - except Exception: - traceback.print_exc() - logging.critical('block processor terminated abnormally') - else: - logging.info('block processor terminated normally') + +def run_writer_forever(): + args = get_args_and_setup_logging('lbry-hub-writer') + try: + block_processor = BlockProcessor(Env.from_arg_parser(args)) + block_processor.run() + except Exception: + traceback.print_exc() + logging.critical('block processor terminated abnormally') else: - try: - server = BlockchainReaderServer(Env.from_arg_parser(args)) - server.run() - except Exception: - traceback.print_exc() - logging.critical('server terminated abnormally') - else: - logging.info('server terminated normally') + logging.info('block processor terminated normally') -if __name__ == "__main__": - main() +def run_server_forever(): + args = get_args_and_setup_logging('lbry-hub-server') + + try: + server = BlockchainReaderServer(Env.from_arg_parser(args)) + server.run() + except Exception: + traceback.print_exc() + logging.critical('server terminated abnormally') + else: + logging.info('server terminated normally') + + +def run_es_sync_forever(): + args = get_args_and_setup_logging('lbry-hub-elastic-sync') + try: + server = ElasticWriter(Env.from_arg_parser(args)) + server.run() + except Exception: + traceback.print_exc() + logging.critical('es writer terminated abnormally') + else: + logging.info('es writer terminated normally') diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 5ce9fb11f..7ef01349a 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -30,7 +30,7 @@ class Env: def __init__(self, coin=None, db_dir=None, daemon_url=None, host=None, rpc_host=None, elastic_host=None, elastic_port=None, loop_policy=None, max_query_workers=None, websocket_host=None, websocket_port=None, - chain=None, es_index_prefix=None, es_mode=None, cache_MB=None, reorg_limit=None, tcp_port=None, + chain=None, es_index_prefix=None, cache_MB=None, reorg_limit=None, tcp_port=None, udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, rpc_port=None, prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None, allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, @@ -69,7 +69,6 @@ class Env: else: self.coin = LBCRegTest self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') - self.es_mode = es_mode if es_mode is not None else self.default('ES_MODE', 'writer') self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024) self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) # Server stuff @@ -329,8 +328,6 @@ class Env: help='elasticsearch host') parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, help='elasticsearch port') - parser.add_argument('--es_mode', default=cls.default('ES_MODE', 'writer'), type=str, - choices=['reader', 'writer']) parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str, choices=['default', 'uvloop']) @@ -378,7 +375,7 @@ class Env: host=args.host, rpc_host=args.rpc_host, elastic_host=args.elastic_host, elastic_port=args.elastic_port, loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, websocket_host=args.websocket_host, websocket_port=args.websocket_port, chain=args.chain, es_index_prefix=args.es_index_prefix, - es_mode=args.es_mode, cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, + cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile, ssl_keyfile=args.ssl_keyfile, rpc_port=args.rpc_port, prometheus_port=args.prometheus_port, max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs, diff --git a/setup.py b/setup.py index e8921c45c..cd6940ecf 100644 --- a/setup.py +++ b/setup.py @@ -30,9 +30,10 @@ setup( entry_points={ 'console_scripts': [ 'lbrynet=lbry.extras.cli:main', - 'lbry-hub=lbry.wallet.server.cli:main', - 'orchstr8=lbry.wallet.orchstr8.cli:main', - 'lbry-hub-elastic-sync=lbry.wallet.server.db.elasticsearch.sync:run_elastic_sync' + 'lbry-hub-writer=lbry.wallet.server.cli:run_writer_forever', + 'lbry-hub-server=lbry.wallet.server.cli:run_server_forever', + 'lbry-hub-elastic-sync=lbry.wallet.server.db.elasticsearch.sync:run_elastic_sync', + 'orchstr8=lbry.wallet.orchstr8.cli:main' ], }, install_requires=[