drop ES_MODE hub setting, rename hub entry points

`lbry-hub-writer` - keeps an up-to-date database
`lbry-hub-server` - replies to clients with data from a read only snapshot of the db
`lbry-hub-elastic-sync` - watches for changes to the db and updates elasticsearch accordingly
This commit is contained in:
Jack Robison 2022-01-21 12:33:33 -05:00
parent e2a75758f8
commit d7ecde7040
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 47 additions and 38 deletions

View file

@ -219,9 +219,10 @@ class SPVNode:
} }
if extraconf: if extraconf:
conf.update(extraconf) conf.update(extraconf)
self.writer = BlockProcessor(Env(self.coin_class, es_mode='writer', **conf)) env = Env(self.coin_class, **conf)
self.server = BlockchainReaderServer(Env(self.coin_class, es_mode='reader', **conf)) self.writer = BlockProcessor(env)
self.es_writer = ElasticWriter(Env(self.coin_class, es_mode='reader', **conf)) self.server = BlockchainReaderServer(env)
self.es_writer = ElasticWriter(env)
await self.writer.open() await self.writer.open()
await self.writer.start() await self.writer.start()
await asyncio.wait([self.server.start(), self.es_writer.start()]) await asyncio.wait([self.server.start(), self.es_writer.start()])

View file

@ -4,25 +4,23 @@ import argparse
from lbry.wallet.server.env import Env from lbry.wallet.server.env import Env
from lbry.wallet.server.block_processor import BlockProcessor from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.chain_reader import BlockchainReaderServer from lbry.wallet.server.chain_reader import BlockchainReaderServer
from lbry.wallet.server.db.elasticsearch.sync import ElasticWriter
def get_argument_parser(): def get_args_and_setup_logging(name):
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="lbry-hub" prog=name
) )
Env.contribute_to_arg_parser(parser) Env.contribute_to_arg_parser(parser)
return parser
def main():
parser = get_argument_parser()
args = parser.parse_args() args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
logging.info('lbry.server starting')
logging.getLogger('aiohttp').setLevel(logging.WARNING) logging.getLogger('aiohttp').setLevel(logging.WARNING)
logging.getLogger('elasticsearch').setLevel(logging.WARNING) logging.getLogger('elasticsearch').setLevel(logging.WARNING)
return args
if args.es_mode == 'writer':
def run_writer_forever():
args = get_args_and_setup_logging('lbry-hub-writer')
try: try:
block_processor = BlockProcessor(Env.from_arg_parser(args)) block_processor = BlockProcessor(Env.from_arg_parser(args))
block_processor.run() block_processor.run()
@ -31,7 +29,11 @@ def main():
logging.critical('block processor terminated abnormally') logging.critical('block processor terminated abnormally')
else: else:
logging.info('block processor terminated normally') logging.info('block processor terminated normally')
else:
def run_server_forever():
args = get_args_and_setup_logging('lbry-hub-server')
try: try:
server = BlockchainReaderServer(Env.from_arg_parser(args)) server = BlockchainReaderServer(Env.from_arg_parser(args))
server.run() server.run()
@ -42,5 +44,13 @@ def main():
logging.info('server terminated normally') logging.info('server terminated normally')
if __name__ == "__main__": def run_es_sync_forever():
main() args = get_args_and_setup_logging('lbry-hub-elastic-sync')
try:
server = ElasticWriter(Env.from_arg_parser(args))
server.run()
except Exception:
traceback.print_exc()
logging.critical('es writer terminated abnormally')
else:
logging.info('es writer terminated normally')

View file

@ -30,7 +30,7 @@ class Env:
def __init__(self, coin=None, db_dir=None, daemon_url=None, host=None, rpc_host=None, elastic_host=None, def __init__(self, coin=None, db_dir=None, daemon_url=None, host=None, rpc_host=None, elastic_host=None,
elastic_port=None, loop_policy=None, max_query_workers=None, websocket_host=None, websocket_port=None, elastic_port=None, loop_policy=None, max_query_workers=None, websocket_host=None, websocket_port=None,
chain=None, es_index_prefix=None, es_mode=None, cache_MB=None, reorg_limit=None, tcp_port=None, chain=None, es_index_prefix=None, cache_MB=None, reorg_limit=None, tcp_port=None,
udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, rpc_port=None, udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, rpc_port=None,
prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None, prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None,
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
@ -69,7 +69,6 @@ class Env:
else: else:
self.coin = LBCRegTest self.coin = LBCRegTest
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '') self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
self.es_mode = es_mode if es_mode is not None else self.default('ES_MODE', 'writer')
self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024) self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024)
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
# Server stuff # Server stuff
@ -329,8 +328,6 @@ class Env:
help='elasticsearch host') help='elasticsearch host')
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int, parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help='elasticsearch port') help='elasticsearch port')
parser.add_argument('--es_mode', default=cls.default('ES_MODE', 'writer'), type=str,
choices=['reader', 'writer'])
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str) parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str, parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str,
choices=['default', 'uvloop']) choices=['default', 'uvloop'])
@ -378,7 +375,7 @@ class Env:
host=args.host, rpc_host=args.rpc_host, elastic_host=args.elastic_host, elastic_port=args.elastic_port, host=args.host, rpc_host=args.rpc_host, elastic_host=args.elastic_host, elastic_port=args.elastic_port,
loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, websocket_host=args.websocket_host, loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, websocket_host=args.websocket_host,
websocket_port=args.websocket_port, chain=args.chain, es_index_prefix=args.es_index_prefix, websocket_port=args.websocket_port, chain=args.chain, es_index_prefix=args.es_index_prefix,
es_mode=args.es_mode, cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port, cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile, udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile,
ssl_keyfile=args.ssl_keyfile, rpc_port=args.rpc_port, prometheus_port=args.prometheus_port, ssl_keyfile=args.ssl_keyfile, rpc_port=args.rpc_port, prometheus_port=args.prometheus_port,
max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs, max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs,

View file

@ -30,9 +30,10 @@ setup(
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'lbrynet=lbry.extras.cli:main', 'lbrynet=lbry.extras.cli:main',
'lbry-hub=lbry.wallet.server.cli:main', 'lbry-hub-writer=lbry.wallet.server.cli:run_writer_forever',
'orchstr8=lbry.wallet.orchstr8.cli:main', 'lbry-hub-server=lbry.wallet.server.cli:run_server_forever',
'lbry-hub-elastic-sync=lbry.wallet.server.db.elasticsearch.sync:run_elastic_sync' 'lbry-hub-elastic-sync=lbry.wallet.server.db.elasticsearch.sync:run_elastic_sync',
'orchstr8=lbry.wallet.orchstr8.cli:main'
], ],
}, },
install_requires=[ install_requires=[