forked from LBRYCommunity/lbry-sdk
update Env to accept parameters from cli args
This commit is contained in:
parent
27cc83c03b
commit
3cd2227c29
3 changed files with 167 additions and 62 deletions
|
@ -1,7 +1,6 @@
|
|||
import logging
|
||||
import traceback
|
||||
import argparse
|
||||
import importlib
|
||||
from lbry.wallet.server.env import Env
|
||||
from lbry.wallet.server.server import Server
|
||||
|
||||
|
@ -10,27 +9,22 @@ def get_argument_parser():
|
|||
parser = argparse.ArgumentParser(
|
||||
prog="lbry-hub"
|
||||
)
|
||||
parser.add_argument("spvserver", type=str, help="Python class path to SPV server implementation.",
|
||||
nargs="?", default="lbry.wallet.server.coin.LBC")
|
||||
Env.contribute_to_arg_parser(parser)
|
||||
sub = parser.add_subparsers(metavar='COMMAND')
|
||||
start = sub.add_parser('start', help='Start LBRY Network interface.')
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def get_coin_class(spvserver):
|
||||
spvserver_path, coin_class_name = spvserver.rsplit('.', 1)
|
||||
spvserver_module = importlib.import_module(spvserver_path)
|
||||
return getattr(spvserver_module, coin_class_name)
|
||||
|
||||
|
||||
def main():
|
||||
parser = get_argument_parser()
|
||||
args = parser.parse_args()
|
||||
coin_class = get_coin_class(args.spvserver)
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
||||
logging.info('lbry.server starting')
|
||||
logging.getLogger('aiohttp').setLevel(logging.WARNING)
|
||||
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
|
||||
try:
|
||||
server = Server(Env(coin_class))
|
||||
server = Server(Env.from_arg_parser(args))
|
||||
server.run()
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
|
|
|
@ -13,7 +13,7 @@ from collections import namedtuple
|
|||
from ipaddress import ip_address
|
||||
|
||||
from lbry.wallet.server.util import class_logger
|
||||
from lbry.wallet.server.coin import Coin
|
||||
from lbry.wallet.server.coin import Coin, LBC, LBCTestNet, LBCRegTest
|
||||
import lbry.wallet.server.util as lib_util
|
||||
|
||||
|
||||
|
@ -28,49 +28,65 @@ class Env:
|
|||
class Error(Exception):
|
||||
pass
|
||||
|
||||
def __init__(self, coin=None):
|
||||
def __init__(self, coin=None, db_dir=None, daemon_url=None, host=None, rpc_host=None, elastic_host=None,
|
||||
elastic_port=None, loop_policy=None, max_query_workers=None, websocket_host=None, websocket_port=None,
|
||||
chain=None, es_index_prefix=None, es_mode=None, cache_MB=None, reorg_limit=None, tcp_port=None,
|
||||
udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, rpc_port=None,
|
||||
prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None,
|
||||
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
|
||||
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
||||
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
||||
database_query_timeout=None, db_max_open_files=512):
|
||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||
self.host = self.default('HOST', 'localhost')
|
||||
self.rpc_host = self.default('RPC_HOST', 'localhost')
|
||||
self.elastic_host = self.default('ELASTIC_HOST', 'localhost')
|
||||
self.elastic_port = self.integer('ELASTIC_PORT', 9200)
|
||||
self.loop_policy = self.set_event_loop_policy()
|
||||
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
|
||||
self.db_dir = self.required('DB_DIRECTORY')
|
||||
|
||||
self.max_query_workers = self.integer('MAX_QUERY_WORKERS', 4)
|
||||
self.websocket_host = self.default('WEBSOCKET_HOST', self.host)
|
||||
self.websocket_port = self.integer('WEBSOCKET_PORT', None)
|
||||
self.daemon_url = self.required('DAEMON_URL')
|
||||
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
|
||||
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
||||
self.db_max_open_files = db_max_open_files
|
||||
|
||||
self.host = host if host is not None else self.default('HOST', 'localhost')
|
||||
self.rpc_host = rpc_host if rpc_host is not None else self.default('RPC_HOST', 'localhost')
|
||||
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
|
||||
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
|
||||
self.loop_policy = self.set_event_loop_policy(
|
||||
loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)
|
||||
)
|
||||
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
|
||||
self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4)
|
||||
self.websocket_host = websocket_host if websocket_host is not None else self.default('WEBSOCKET_HOST', self.host)
|
||||
self.websocket_port = websocket_port if websocket_port is not None else self.integer('WEBSOCKET_PORT', None)
|
||||
if coin is not None:
|
||||
assert issubclass(coin, Coin)
|
||||
self.coin = coin
|
||||
else:
|
||||
coin_name = self.required('COIN').strip()
|
||||
network = self.default('NET', 'mainnet').strip()
|
||||
self.coin = Coin.lookup_coin_class(coin_name, network)
|
||||
self.es_index_prefix = self.default('ES_INDEX_PREFIX', '')
|
||||
self.es_mode = self.default('ES_MODE', 'writer')
|
||||
self.cache_MB = self.integer('CACHE_MB', 1024)
|
||||
self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
|
||||
chain = chain if chain is not None else self.default('NET', 'mainnet').strip().lower()
|
||||
if chain == 'mainnet':
|
||||
self.coin = LBC
|
||||
elif chain == 'testnet':
|
||||
self.coin = LBCTestNet
|
||||
else:
|
||||
self.coin = LBCRegTest
|
||||
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
|
||||
self.es_mode = es_mode if es_mode is not None else self.default('ES_MODE', 'writer')
|
||||
self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024)
|
||||
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
|
||||
# Server stuff
|
||||
self.tcp_port = self.integer('TCP_PORT', None)
|
||||
self.udp_port = self.integer('UDP_PORT', self.tcp_port)
|
||||
self.ssl_port = self.integer('SSL_PORT', None)
|
||||
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
|
||||
self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port)
|
||||
self.ssl_port = ssl_port if ssl_port is not None else self.integer('SSL_PORT', None)
|
||||
if self.ssl_port:
|
||||
self.ssl_certfile = self.required('SSL_CERTFILE')
|
||||
self.ssl_keyfile = self.required('SSL_KEYFILE')
|
||||
self.rpc_port = self.integer('RPC_PORT', 8000)
|
||||
self.prometheus_port = self.integer('PROMETHEUS_PORT', 0)
|
||||
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
|
||||
self.banner_file = self.default('BANNER_FILE', None)
|
||||
self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
|
||||
self.anon_logs = self.boolean('ANON_LOGS', False)
|
||||
self.log_sessions = self.integer('LOG_SESSIONS', 3600)
|
||||
self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False)
|
||||
self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False)
|
||||
self.cache_all_claim_txos = self.boolean('CACHE_ALL_CLAIM_TXOS', False)
|
||||
self.country = self.default('COUNTRY', 'US')
|
||||
self.ssl_certfile = ssl_certfile if ssl_certfile is not None else self.required('SSL_CERTFILE')
|
||||
self.ssl_keyfile = ssl_keyfile if ssl_keyfile is not None else self.required('SSL_KEYFILE')
|
||||
self.rpc_port = rpc_port if rpc_port is not None else self.integer('RPC_PORT', 8000)
|
||||
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
|
||||
self.max_subscriptions = max_subscriptions if max_subscriptions is not None else self.integer('MAX_SUBSCRIPTIONS', 10000)
|
||||
self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None)
|
||||
# self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
|
||||
self.anon_logs = anon_logs if anon_logs is not None else self.boolean('ANON_LOGS', False)
|
||||
self.log_sessions = log_sessions if log_sessions is not None else self.integer('LOG_SESSIONS', 3600)
|
||||
self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False)
|
||||
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
|
||||
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
|
||||
self.country = country if country is not None else self.default('COUNTRY', 'US')
|
||||
# Peer discovery
|
||||
self.peer_discovery = self.peer_discovery_enum()
|
||||
self.peer_announce = self.boolean('PEER_ANNOUNCE', True)
|
||||
|
@ -78,18 +94,18 @@ class Env:
|
|||
# self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost')
|
||||
# self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)
|
||||
# The electrum client takes the empty string as unspecified
|
||||
self.payment_address = self.default('PAYMENT_ADDRESS', '')
|
||||
self.donation_address = self.default('DONATION_ADDRESS', '')
|
||||
self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '')
|
||||
self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', '')
|
||||
# Server limits to help prevent DoS
|
||||
self.max_send = self.integer('MAX_SEND', 1000000)
|
||||
self.max_receive = self.integer('MAX_RECEIVE', 1000000)
|
||||
self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000)
|
||||
self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000)
|
||||
# self.max_subs = self.integer('MAX_SUBS', 250000)
|
||||
self.max_sessions = self.sane_max_sessions()
|
||||
self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions()
|
||||
# self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
|
||||
self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
|
||||
self.drop_client = self.custom("DROP_CLIENT", None, re.compile)
|
||||
self.description = self.default('DESCRIPTION', '')
|
||||
self.daily_fee = self.string_amount('DAILY_FEE', '0')
|
||||
self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600)
|
||||
self.drop_client = drop_client if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile)
|
||||
self.description = description if description is not None else self.default('DESCRIPTION', '')
|
||||
self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0')
|
||||
|
||||
# Identities
|
||||
clearnet_identity = self.clearnet_identity()
|
||||
|
@ -97,7 +113,8 @@ class Env:
|
|||
self.identities = [identity
|
||||
for identity in (clearnet_identity, tor_identity)
|
||||
if identity is not None]
|
||||
self.database_query_timeout = float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0
|
||||
self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \
|
||||
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
|
||||
|
||||
@classmethod
|
||||
def default(cls, envvar, default):
|
||||
|
@ -149,9 +166,9 @@ class Env:
|
|||
if bad:
|
||||
raise cls.Error(f'remove obsolete environment variables {bad}')
|
||||
|
||||
def set_event_loop_policy(self):
|
||||
policy_name = self.default('EVENT_LOOP_POLICY', None)
|
||||
if not policy_name:
|
||||
@classmethod
|
||||
def set_event_loop_policy(cls, policy_name: str = None):
|
||||
if not policy_name or policy_name == 'default':
|
||||
import asyncio
|
||||
return asyncio.get_event_loop_policy()
|
||||
elif policy_name == 'uvloop':
|
||||
|
@ -160,7 +177,7 @@ class Env:
|
|||
loop_policy = uvloop.EventLoopPolicy()
|
||||
asyncio.set_event_loop_policy(loop_policy)
|
||||
return loop_policy
|
||||
raise self.Error(f'unknown event loop policy "{policy_name}"')
|
||||
raise cls.Error(f'unknown event loop policy "{policy_name}"')
|
||||
|
||||
def cs_host(self, *, for_rpc):
|
||||
"""Returns the 'host' argument to pass to asyncio's create_server
|
||||
|
@ -269,3 +286,97 @@ class Env:
|
|||
|
||||
def extract_peer_hubs(self):
|
||||
return [hub.strip() for hub in self.default('PEER_HUBS', '').split(',') if hub.strip()]
|
||||
|
||||
@classmethod
|
||||
def contribute_to_arg_parser(cls, parser):
|
||||
parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb')
|
||||
parser.add_argument('--daemon_url',
|
||||
help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>')
|
||||
parser.add_argument('--db_max_open_files', type=int, default=512,
|
||||
help='number of files leveldb can have open at a time')
|
||||
parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
|
||||
help='Interface for hub server to listen on')
|
||||
parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),
|
||||
help='TCP port to listen on for hub server')
|
||||
parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001),
|
||||
help='UDP port to listen on for hub server')
|
||||
parser.add_argument('--rpc_host', default=cls.default('RPC_HOST', 'localhost'), type=str,
|
||||
help='Listening interface for admin rpc')
|
||||
parser.add_argument('--rpc_port', default=cls.integer('RPC_PORT', 8000), type=int,
|
||||
help='Listening port for admin rpc')
|
||||
parser.add_argument('--websocket_host', default=cls.default('WEBSOCKET_HOST', 'localhost'), type=str,
|
||||
help='Listening interface for websocket')
|
||||
parser.add_argument('--websocket_port', default=cls.integer('WEBSOCKET_PORT', None), type=int,
|
||||
help='Listening port for websocket')
|
||||
|
||||
parser.add_argument('--ssl_port', default=cls.integer('SSL_PORT', None), type=int,
|
||||
help='SSL port to listen on for hub server')
|
||||
parser.add_argument('--ssl_certfile', default=cls.default('SSL_CERTFILE', None), type=str,
|
||||
help='Path to SSL cert file')
|
||||
parser.add_argument('--ssl_keyfile', default=cls.default('SSL_KEYFILE', None), type=str,
|
||||
help='Path to SSL key file')
|
||||
parser.add_argument('--reorg_limit', default=cls.integer('REORG_LIMIT', 200), type=int, help='Max reorg depth')
|
||||
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
|
||||
help='elasticsearch host')
|
||||
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
|
||||
help='elasticsearch port')
|
||||
parser.add_argument('--es_mode', default=cls.default('ES_MODE', 'writer'), type=str,
|
||||
choices=['reader', 'writer'])
|
||||
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
|
||||
parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str,
|
||||
choices=['default', 'uvloop'])
|
||||
parser.add_argument('--max_query_workers', type=int, default=cls.integer('MAX_QUERY_WORKERS', 4),
|
||||
help='number of threads used by the request handler to read the database')
|
||||
parser.add_argument('--cache_MB', type=int, default=cls.integer('CACHE_MB', 1024),
|
||||
help='size of the leveldb lru cache, in megabytes')
|
||||
parser.add_argument('--cache_all_tx_hashes', type=bool,
|
||||
help='Load all tx hashes into memory. This will make address subscriptions and sync, '
|
||||
'resolve, transaction fetching, and block sync all faster at the expense of higher '
|
||||
'memory usage')
|
||||
parser.add_argument('--cache_all_claim_txos', type=bool,
|
||||
help='Load all claim txos into memory. This will make address subscriptions and sync, '
|
||||
'resolve, transaction fetching, and block sync all faster at the expense of higher '
|
||||
'memory usage')
|
||||
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
|
||||
help='port for hub prometheus metrics to listen on, disabled by default')
|
||||
parser.add_argument('--max_subscriptions', type=int, default=cls.integer('MAX_SUBSCRIPTIONS', 10000),
|
||||
help='max subscriptions per connection')
|
||||
parser.add_argument('--banner_file', type=str, default=cls.default('BANNER_FILE', None),
|
||||
help='path to file containing banner text')
|
||||
parser.add_argument('--anon_logs', type=bool, default=cls.boolean('ANON_LOGS', False),
|
||||
help="don't log ip addresses")
|
||||
parser.add_argument('--allow_lan_udp', type=bool, default=cls.boolean('ALLOW_LAN_UDP', False),
|
||||
help='reply to hub UDP ping messages from LAN ip addresses')
|
||||
parser.add_argument('--country', type=str, default=cls.default('COUNTRY', 'US'), help='')
|
||||
parser.add_argument('--max_send', type=int, default=cls.default('MAX_SEND', 1000000), help='')
|
||||
parser.add_argument('--max_receive', type=int, default=cls.default('MAX_RECEIVE', 1000000), help='')
|
||||
parser.add_argument('--max_sessions', type=int, default=cls.default('MAX_SESSIONS', 1000), help='')
|
||||
parser.add_argument('--session_timeout', type=int, default=cls.default('SESSION_TIMEOUT', 600), help='')
|
||||
parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None), help='')
|
||||
parser.add_argument('--description', type=str, default=cls.default('DESCRIPTION', ''), help='')
|
||||
parser.add_argument('--daily_fee', type=float, default=cls.default('DAILY_FEE', 0.0), help='')
|
||||
parser.add_argument('--payment_address', type=str, default=cls.default('PAYMENT_ADDRESS', ''), help='')
|
||||
parser.add_argument('--donation_address', type=str, default=cls.default('DONATION_ADDRESS', ''), help='')
|
||||
parser.add_argument('--chain', type=str, default=cls.default('NET', 'mainnet'),
|
||||
help="Which chain to use, default is mainnet")
|
||||
parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000),
|
||||
help="elasticsearch query timeout")
|
||||
|
||||
@classmethod
|
||||
def from_arg_parser(cls, args):
|
||||
return cls(
|
||||
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
|
||||
host=args.host, rpc_host=args.rpc_host, elastic_host=args.elastic_host, elastic_port=args.elastic_port,
|
||||
loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, websocket_host=args.websocket_host,
|
||||
websocket_port=args.websocket_port, chain=args.chain, es_index_prefix=args.es_index_prefix,
|
||||
es_mode=args.es_mode, cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
|
||||
udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile,
|
||||
ssl_keyfile=args.ssl_keyfile, rpc_port=args.rpc_port, prometheus_port=args.prometheus_port,
|
||||
max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs,
|
||||
log_sessions=None, allow_lan_udp=args.allow_lan_udp,
|
||||
cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos,
|
||||
country=args.country, payment_address=args.payment_address, donation_address=args.donation_address,
|
||||
max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions,
|
||||
session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description,
|
||||
daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000)
|
||||
)
|
||||
|
|
|
@ -846,7 +846,7 @@ class LevelDB:
|
|||
|
||||
self.prefix_db = HubDB(
|
||||
os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB,
|
||||
reorg_limit=self.env.reorg_limit, max_open_files=512
|
||||
reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files
|
||||
)
|
||||
self.logger.info(f'opened db: lbry-leveldb')
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue