add --daemon_ca_path arg to use ssl with lbcd

fixes https://github.com/lbryio/hub/issues/41
This commit is contained in:
Jack Robison 2022-06-14 15:25:28 -04:00
parent 6fd718f353
commit 9dbd8cab4b
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 20 additions and 8 deletions

View file

@ -11,7 +11,7 @@ class ServerEnv(Env):
session_timeout=None, drop_client=None, description=None, daily_fee=None, session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None,
index_address_status=None, address_history_cache_size=None): index_address_status=None, address_history_cache_size=None, daemon_ca_path=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
@ -52,6 +52,7 @@ class ServerEnv(Env):
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0) (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
self.hashX_history_cache_size = address_history_cache_size if address_history_cache_size is not None \ self.hashX_history_cache_size = address_history_cache_size if address_history_cache_size is not None \
else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000) else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000)
self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None
@classmethod @classmethod
def contribute_to_arg_parser(cls, parser): def contribute_to_arg_parser(cls, parser):
@ -61,6 +62,8 @@ class ServerEnv(Env):
help="URL for rpc from lbrycrd or lbcd, " help="URL for rpc from lbrycrd or lbcd, "
"<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.", "<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.",
default=env_daemon_url) default=env_daemon_url)
parser.add_argument('--daemon_ca_path', type=str, default='',
help='Path to the lbcd ca file, used for lbcd with ssl')
parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
help="Interface for hub server to listen on, use 0.0.0.0 to listen on the external " help="Interface for hub server to listen on, use 0.0.0.0 to listen on the external "
"interface. Can be set in env with 'HOST'") "interface. Can be set in env with 'HOST'")
@ -118,5 +121,5 @@ class ServerEnv(Env):
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids, database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses, elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses,
address_history_cache_size=args.address_history_cache_size address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path
) )

View file

@ -19,7 +19,7 @@ class HubServerService(BlockchainReaderService):
self.notifications_to_send = [] self.notifications_to_send = []
self.mempool_notifications = set() self.mempool_notifications = set()
self.status_server = StatusServer() self.status_server = StatusServer()
self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs
self.mempool = HubMemPool(self.env.coin, self.db) self.mempool = HubMemPool(self.env.coin, self.db)
self.session_manager = SessionManager( self.session_manager = SessionManager(
env, self.db, self.mempool, self.daemon, env, self.db, self.mempool, self.daemon,

View file

@ -3,6 +3,7 @@ import itertools
import json import json
import time import time
import logging import logging
import ssl
from functools import wraps from functools import wraps
import aiohttp import aiohttp
@ -43,7 +44,7 @@ class LBCDaemon:
) )
def __init__(self, coin, url, max_workqueue=10, init_retry=0.25, def __init__(self, coin, url, max_workqueue=10, init_retry=0.25,
max_retry=4.0): max_retry=4.0, daemon_ca_path=None):
self.coin = coin self.coin = coin
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.set_url(url) self.set_url(url)
@ -54,7 +55,10 @@ class LBCDaemon:
self.max_retry = max_retry self.max_retry = max_retry
self._height = None self._height = None
self.available_rpcs = {} self.available_rpcs = {}
self.connector = aiohttp.TCPConnector(ssl=False) ssl_context = None if not daemon_ca_path else ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH, capath=daemon_ca_path
)
self.connector = aiohttp.TCPConnector(ssl=ssl_context is not None, ssl_context=ssl_context)
self._block_hash_cache = LRUCacheWithMetrics(1024) self._block_hash_cache = LRUCacheWithMetrics(1024)
self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE) self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE)

View file

@ -6,7 +6,8 @@ class BlockchainEnv(Env):
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None, blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
index_address_status=None, rebuild_address_status_from_height=None): index_address_status=None, rebuild_address_status_from_height=None,
daemon_ca_path=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.db_max_open_files = db_max_open_files self.db_max_open_files = db_max_open_files
@ -15,6 +16,7 @@ class BlockchainEnv(Env):
else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000) else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000)
self.rebuild_address_status_from_height = rebuild_address_status_from_height \ self.rebuild_address_status_from_height = rebuild_address_status_from_height \
if isinstance(rebuild_address_status_from_height, int) else -1 if isinstance(rebuild_address_status_from_height, int) else -1
self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None
@classmethod @classmethod
def contribute_to_arg_parser(cls, parser): def contribute_to_arg_parser(cls, parser):
@ -24,6 +26,8 @@ class BlockchainEnv(Env):
help="URL for rpc from lbrycrd or lbcd, " help="URL for rpc from lbrycrd or lbcd, "
"<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.", "<rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>.",
default=env_daemon_url) default=env_daemon_url)
parser.add_argument('--daemon_ca_path', type=str, default='',
help='Path to the lbcd ca file, used for lbcd with ssl')
parser.add_argument('--db_max_open_files', type=int, default=64, parser.add_argument('--db_max_open_files', type=int, default=64,
help='This setting translates into the max_open_files option given to rocksdb. ' help='This setting translates into the max_open_files option given to rocksdb. '
'A higher number will use more memory. Defaults to 64.') 'A higher number will use more memory. Defaults to 64.')
@ -44,5 +48,6 @@ class BlockchainEnv(Env):
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses, cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
hashX_history_cache_size=args.address_history_cache_size, hashX_history_cache_size=args.address_history_cache_size,
rebuild_address_status_from_height=args.rebuild_address_status_from_height rebuild_address_status_from_height=args.rebuild_address_status_from_height,
daemon_ca_path=args.daemon_ca_path
) )

View file

@ -47,7 +47,7 @@ class BlockchainProcessorService(BlockchainService):
def __init__(self, env: 'BlockchainEnv'): def __init__(self, env: 'BlockchainEnv'):
super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
self.env = env self.env = env
self.daemon = LBCDaemon(env.coin, env.daemon_url) self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path)
self.mempool = MemPool(env.coin, self.db) self.mempool = MemPool(env.coin, self.db)
self.coin = env.coin self.coin = env.coin
self.wait_for_blocks_duration = 0.1 self.wait_for_blocks_duration = 0.1