From 9dbd8cab4bae2440a095557dd947293054f3ac5d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 14 Jun 2022 15:25:28 -0400 Subject: [PATCH] add --daemon_ca_path arg to use ssl with lbcd fixes https://github.com/lbryio/hub/issues/41 --- hub/herald/env.py | 7 +++++-- hub/herald/service.py | 2 +- hub/scribe/daemon.py | 8 ++++++-- hub/scribe/env.py | 9 +++++++-- hub/scribe/service.py | 2 +- 5 files changed, 20 insertions(+), 8 deletions(-) diff --git a/hub/herald/env.py b/hub/herald/env.py index b412ab7..074137a 100644 --- a/hub/herald/env.py +++ b/hub/herald/env.py @@ -11,7 +11,7 @@ class ServerEnv(Env): session_timeout=None, drop_client=None, description=None, daily_fee=None, database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None, - index_address_status=None, address_history_cache_size=None): + index_address_status=None, address_history_cache_size=None, daemon_ca_path=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL') @@ -52,6 +52,7 @@ class ServerEnv(Env): (float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0) self.hashX_history_cache_size = address_history_cache_size if address_history_cache_size is not None \ else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000) + self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None @classmethod def contribute_to_arg_parser(cls, parser): 
@@ -61,6 +62,8 @@ class ServerEnv(Env): help="URL for rpc from lbrycrd or lbcd, " ":@.", default=env_daemon_url) + parser.add_argument('--daemon_ca_path', type=str, default='', + help='Path to the lbcd ca file, used for lbcd with ssl') parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), help="Interface for hub server to listen on, use 0.0.0.0 to listen on the external " "interface. Can be set in env with 'HOST'") @@ -118,5 +121,5 @@ class ServerEnv(Env): database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids, filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host, elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses, - address_history_cache_size=args.address_history_cache_size + address_history_cache_size=args.address_history_cache_size, daemon_ca_path=args.daemon_ca_path ) diff --git a/hub/herald/service.py b/hub/herald/service.py index f5a7d3d..aa4e0c4 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -19,7 +19,7 @@ class HubServerService(BlockchainReaderService): self.notifications_to_send = [] self.mempool_notifications = set() self.status_server = StatusServer() - self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs + self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) # only needed for broadcasting txs self.mempool = HubMemPool(self.env.coin, self.db) self.session_manager = SessionManager( env, self.db, self.mempool, self.daemon, diff --git a/hub/scribe/daemon.py b/hub/scribe/daemon.py index b5af795..7a76c8c 100644 --- a/hub/scribe/daemon.py +++ b/hub/scribe/daemon.py @@ -3,6 +3,7 @@ import itertools import json import time import logging +import ssl from functools import wraps import aiohttp 
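Review bot: In the `@@ -61,6 +62,8 @@` hunk of hub/herald/env.py the new `--daemon_ca_path` option has a hard-coded `default=''`, while the neighbouring `--host` option takes its default from the environment via `cls.default('HOST', 'localhost')`, so a deployment configured only through environment variables cannot enable certificate checking for the lbcd connection. I would write `default=cls.default('DAEMON_CA_PATH', '')` (the variable name is a proposal) and mention the variable in the help text, as the `--host` help does. The help text also says "ca file", but hub/scribe/daemon.py passes the value as `capath`, which is a directory, so the two should be made to agree. The same applies to the identical `add_argument` call in the `@@ -24,6 +26,8 @@` hunk of hub/scribe/env.py. `contribute_to_arg_parser` starts and ends outside both hunks.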
@@ -43,7 +44,7 @@ class LBCDaemon: ) def __init__(self, coin, url, max_workqueue=10, init_retry=0.25, - max_retry=4.0): + max_retry=4.0, daemon_ca_path=None): self.coin = coin self.logger = logging.getLogger(__name__) self.set_url(url) @@ -54,7 +55,10 @@ class LBCDaemon: self.max_retry = max_retry self._height = None self.available_rpcs = {} - self.connector = aiohttp.TCPConnector(ssl=False) + ssl_context = None if not daemon_ca_path else ssl.create_default_context( + purpose=ssl.Purpose.CLIENT_AUTH, capath=daemon_ca_path + ) + self.connector = aiohttp.TCPConnector(ssl=ssl_context is not None, ssl_context=ssl_context) self._block_hash_cache = LRUCacheWithMetrics(1024) self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE) diff --git a/hub/scribe/env.py b/hub/scribe/env.py index 6bc32c2..4a5daf8 100644 --- a/hub/scribe/env.py +++ b/hub/scribe/env.py @@ -6,7 +6,8 @@ class BlockchainEnv(Env): prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, blocking_channel_ids=None, filtering_channel_ids=None, db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None, - index_address_status=None, rebuild_address_status_from_height=None): + index_address_status=None, rebuild_address_status_from_height=None, + daemon_ca_path=None): super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes, cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status) self.db_max_open_files = db_max_open_files @@ -15,6 +16,7 @@ class BlockchainEnv(Env): else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000) self.rebuild_address_status_from_height = rebuild_address_status_from_height \ if isinstance(rebuild_address_status_from_height, int) else -1 + self.daemon_ca_path = daemon_ca_path if daemon_ca_path else None @classmethod def contribute_to_arg_parser(cls, parser): 
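Review bot: The TLS context built in the `@@ -54,7 +55,10 @@` hunk of hub/scribe/daemon.py does not verify lbcd's certificate as intended, because `ssl.Purpose.CLIENT_AUTH` is the purpose for server-side sockets and `create_default_context` does not set `CERT_REQUIRED` or hostname checking for it; a client connection needs `ssl.Purpose.SERVER_AUTH`. `capath` expects a directory of hashed certificates, whereas the option's help text describes a CA file, so `cafile=` is the matching argument. On the connector, aiohttp treats `ssl=False` as "skip certificate validation" rather than "no TLS", and as far as I recall aiohttp 3.x rejects `ssl` and the deprecated `ssl_context` being passed together, which is worth confirming against the pinned version. Suggested: `ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=daemon_ca_path) if daemon_ca_path else None` and `self.connector = aiohttp.TCPConnector(ssl=ssl_context if ssl_context is not None else False)`. `__init__` continues outside the hunk.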
@@ -24,6 +26,8 @@ class BlockchainEnv(Env): help="URL for rpc from lbrycrd or lbcd, " ":@.", default=env_daemon_url) + parser.add_argument('--daemon_ca_path', type=str, default='', + help='Path to the lbcd ca file, used for lbcd with ssl') parser.add_argument('--db_max_open_files', type=int, default=64, help='This setting translates into the max_open_files option given to rocksdb. ' 'A higher number will use more memory. Defaults to 64.') @@ -44,5 +48,6 @@ class BlockchainEnv(Env): prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses, hashX_history_cache_size=args.address_history_cache_size, - rebuild_address_status_from_height=args.rebuild_address_status_from_height + rebuild_address_status_from_height=args.rebuild_address_status_from_height, + daemon_ca_path=args.daemon_ca_path ) diff --git a/hub/scribe/service.py b/hub/scribe/service.py index ed46b0a..4234193 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -47,7 +47,7 @@ class BlockchainProcessorService(BlockchainService): def __init__(self, env: 'BlockchainEnv'): super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') self.env = env - self.daemon = LBCDaemon(env.coin, env.daemon_url) + self.daemon = LBCDaemon(env.coin, env.daemon_url, daemon_ca_path=env.daemon_ca_path) self.mempool = MemPool(env.coin, self.db) self.coin = env.coin self.wait_for_blocks_duration = 0.1