forked from LBRYCommunity/lbry-sdk
drop ES_MODE
hub setting, rename hub entry points
`lbry-hub-writer` - keeps an up-to-date database `lbry-hub-server` - replies to clients with data from a read only snapshot of the db `lbry-hub-elastic-sync` - watches for changes to the db and updates elasticsearch accordingly
This commit is contained in:
parent
d6035c1ead
commit
bd284be69d
4 changed files with 47 additions and 38 deletions
|
@ -219,9 +219,10 @@ class SPVNode:
|
||||||
}
|
}
|
||||||
if extraconf:
|
if extraconf:
|
||||||
conf.update(extraconf)
|
conf.update(extraconf)
|
||||||
self.writer = BlockProcessor(Env(self.coin_class, es_mode='writer', **conf))
|
env = Env(self.coin_class, **conf)
|
||||||
self.server = BlockchainReaderServer(Env(self.coin_class, es_mode='reader', **conf))
|
self.writer = BlockProcessor(env)
|
||||||
self.es_writer = ElasticWriter(Env(self.coin_class, es_mode='reader', **conf))
|
self.server = BlockchainReaderServer(env)
|
||||||
|
self.es_writer = ElasticWriter(env)
|
||||||
await self.writer.open()
|
await self.writer.open()
|
||||||
await self.writer.start()
|
await self.writer.start()
|
||||||
await asyncio.wait([self.server.start(), self.es_writer.start()])
|
await asyncio.wait([self.server.start(), self.es_writer.start()])
|
||||||
|
|
|
@ -4,43 +4,53 @@ import argparse
|
||||||
from lbry.wallet.server.env import Env
|
from lbry.wallet.server.env import Env
|
||||||
from lbry.wallet.server.block_processor import BlockProcessor
|
from lbry.wallet.server.block_processor import BlockProcessor
|
||||||
from lbry.wallet.server.chain_reader import BlockchainReaderServer
|
from lbry.wallet.server.chain_reader import BlockchainReaderServer
|
||||||
|
from lbry.wallet.server.db.elasticsearch.sync import ElasticWriter
|
||||||
|
|
||||||
|
|
||||||
def get_argument_parser():
|
def get_args_and_setup_logging(name):
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
prog="lbry-hub"
|
prog=name
|
||||||
)
|
)
|
||||||
Env.contribute_to_arg_parser(parser)
|
Env.contribute_to_arg_parser(parser)
|
||||||
return parser
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
parser = get_argument_parser()
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
||||||
logging.info('lbry.server starting')
|
|
||||||
logging.getLogger('aiohttp').setLevel(logging.WARNING)
|
logging.getLogger('aiohttp').setLevel(logging.WARNING)
|
||||||
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
|
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
|
||||||
|
return args
|
||||||
|
|
||||||
if args.es_mode == 'writer':
|
|
||||||
try:
|
def run_writer_forever():
|
||||||
block_processor = BlockProcessor(Env.from_arg_parser(args))
|
args = get_args_and_setup_logging('lbry-hub-writer')
|
||||||
block_processor.run()
|
try:
|
||||||
except Exception:
|
block_processor = BlockProcessor(Env.from_arg_parser(args))
|
||||||
traceback.print_exc()
|
block_processor.run()
|
||||||
logging.critical('block processor terminated abnormally')
|
except Exception:
|
||||||
else:
|
traceback.print_exc()
|
||||||
logging.info('block processor terminated normally')
|
logging.critical('block processor terminated abnormally')
|
||||||
else:
|
else:
|
||||||
try:
|
logging.info('block processor terminated normally')
|
||||||
server = BlockchainReaderServer(Env.from_arg_parser(args))
|
|
||||||
server.run()
|
|
||||||
except Exception:
|
|
||||||
traceback.print_exc()
|
|
||||||
logging.critical('server terminated abnormally')
|
|
||||||
else:
|
|
||||||
logging.info('server terminated normally')
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
def run_server_forever():
|
||||||
main()
|
args = get_args_and_setup_logging('lbry-hub-server')
|
||||||
|
|
||||||
|
try:
|
||||||
|
server = BlockchainReaderServer(Env.from_arg_parser(args))
|
||||||
|
server.run()
|
||||||
|
except Exception:
|
||||||
|
traceback.print_exc()
|
||||||
|
logging.critical('server terminated abnormally')
|
||||||
|
else:
|
||||||
|
logging.info('server terminated normally')
|
||||||
|
|
||||||
|
|
||||||
|
def run_es_sync_forever():
|
||||||
|
args = get_args_and_setup_logging('lbry-hub-elastic-sync')
|
||||||
|
try:
|
||||||
|
server = ElasticWriter(Env.from_arg_parser(args))
|
||||||
|
server.run()
|
||||||
|
except Exception:
|
||||||
|
traceback.print_exc()
|
||||||
|
logging.critical('es writer terminated abnormally')
|
||||||
|
else:
|
||||||
|
logging.info('es writer terminated normally')
|
||||||
|
|
|
@ -30,7 +30,7 @@ class Env:
|
||||||
|
|
||||||
def __init__(self, coin=None, db_dir=None, daemon_url=None, host=None, rpc_host=None, elastic_host=None,
|
def __init__(self, coin=None, db_dir=None, daemon_url=None, host=None, rpc_host=None, elastic_host=None,
|
||||||
elastic_port=None, loop_policy=None, max_query_workers=None, websocket_host=None, websocket_port=None,
|
elastic_port=None, loop_policy=None, max_query_workers=None, websocket_host=None, websocket_port=None,
|
||||||
chain=None, es_index_prefix=None, es_mode=None, cache_MB=None, reorg_limit=None, tcp_port=None,
|
chain=None, es_index_prefix=None, cache_MB=None, reorg_limit=None, tcp_port=None,
|
||||||
udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, rpc_port=None,
|
udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, rpc_port=None,
|
||||||
prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None,
|
prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None,
|
||||||
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
|
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
|
||||||
|
@ -69,7 +69,6 @@ class Env:
|
||||||
else:
|
else:
|
||||||
self.coin = LBCRegTest
|
self.coin = LBCRegTest
|
||||||
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
|
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
|
||||||
self.es_mode = es_mode if es_mode is not None else self.default('ES_MODE', 'writer')
|
|
||||||
self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024)
|
self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024)
|
||||||
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
|
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
|
||||||
# Server stuff
|
# Server stuff
|
||||||
|
@ -329,8 +328,6 @@ class Env:
|
||||||
help='elasticsearch host')
|
help='elasticsearch host')
|
||||||
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
|
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
|
||||||
help='elasticsearch port')
|
help='elasticsearch port')
|
||||||
parser.add_argument('--es_mode', default=cls.default('ES_MODE', 'writer'), type=str,
|
|
||||||
choices=['reader', 'writer'])
|
|
||||||
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
|
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
|
||||||
parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str,
|
parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str,
|
||||||
choices=['default', 'uvloop'])
|
choices=['default', 'uvloop'])
|
||||||
|
@ -378,7 +375,7 @@ class Env:
|
||||||
host=args.host, rpc_host=args.rpc_host, elastic_host=args.elastic_host, elastic_port=args.elastic_port,
|
host=args.host, rpc_host=args.rpc_host, elastic_host=args.elastic_host, elastic_port=args.elastic_port,
|
||||||
loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, websocket_host=args.websocket_host,
|
loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, websocket_host=args.websocket_host,
|
||||||
websocket_port=args.websocket_port, chain=args.chain, es_index_prefix=args.es_index_prefix,
|
websocket_port=args.websocket_port, chain=args.chain, es_index_prefix=args.es_index_prefix,
|
||||||
es_mode=args.es_mode, cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
|
cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
|
||||||
udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile,
|
udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile,
|
||||||
ssl_keyfile=args.ssl_keyfile, rpc_port=args.rpc_port, prometheus_port=args.prometheus_port,
|
ssl_keyfile=args.ssl_keyfile, rpc_port=args.rpc_port, prometheus_port=args.prometheus_port,
|
||||||
max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs,
|
max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs,
|
||||||
|
|
7
setup.py
7
setup.py
|
@ -30,9 +30,10 @@ setup(
|
||||||
entry_points={
|
entry_points={
|
||||||
'console_scripts': [
|
'console_scripts': [
|
||||||
'lbrynet=lbry.extras.cli:main',
|
'lbrynet=lbry.extras.cli:main',
|
||||||
'lbry-hub=lbry.wallet.server.cli:main',
|
'lbry-hub-writer=lbry.wallet.server.cli:run_writer_forever',
|
||||||
'orchstr8=lbry.wallet.orchstr8.cli:main',
|
'lbry-hub-server=lbry.wallet.server.cli:run_server_forever',
|
||||||
'lbry-hub-elastic-sync=lbry.wallet.server.db.elasticsearch.sync:run_elastic_sync'
|
'lbry-hub-elastic-sync=lbry.wallet.server.db.elasticsearch.sync:run_elastic_sync',
|
||||||
|
'orchstr8=lbry.wallet.orchstr8.cli:main'
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
install_requires=[
|
install_requires=[
|
||||||
|
|
Loading…
Add table
Reference in a new issue