diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py index 15cdffe0e..d94917de7 100644 --- a/lbry/wallet/server/chain_reader.py +++ b/lbry/wallet/server/chain_reader.py @@ -20,8 +20,9 @@ class BlockchainReader: self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) self.shutdown_event = asyncio.Event() self.cancellable_tasks = [] + self._thread_workers = thread_workers + self._thread_prefix = thread_prefix self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix) - self.db = HubDB( env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, @@ -93,6 +94,11 @@ class BlockchainReader: self.db.tx_counts.pop() self.db.headers.pop() + async def start(self): + if not self._executor: + self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix) + self.db._executor = self._executor + class BlockchainReaderServer(BlockchainReader): def __init__(self, env): @@ -173,6 +179,7 @@ class BlockchainReaderServer(BlockchainReader): self.es_notification_client.close() async def start(self): + await super().start() env = self.env min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() self.log.info(f'software version: {lbry.__version__}') @@ -192,7 +199,7 @@ class BlockchainReaderServer(BlockchainReader): self.last_state = self.db.read_db_state() await self.start_prometheus() - if self.env.udp_port: + if self.env.udp_port and int(self.env.udp_port): await self.status_server.start( 0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country, self.env.host, self.env.udp_port, self.env.allow_lan_udp @@ -215,6 +222,7 @@ class BlockchainReaderServer(BlockchainReader): self.prometheus_server = None await self.daemon.close() self._executor.shutdown(wait=True) + self._executor = None self.shutdown_event.set() def run(self): diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index d0740ea16..494293f8d 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -122,8 +122,9 @@ class ElasticWriter(BlockchainReader): await self.sync_client.close() self.sync_client = None - def delete_index(self): - return self.sync_client.indices.delete(self.index, ignore_unavailable=True) + async def delete_index(self): + if self.sync_client: + return await self.sync_client.indices.delete(self.index, ignore_unavailable=True) def update_filter_query(self, censor_type, blockdict, channels=False): blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()} @@ -275,8 +276,7 @@ class ElasticWriter(BlockchainReader): return self._last_wrote_height async def start(self): - env = self.env - min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() + await super().start() def _start_cancellable(run, *args): _flag = asyncio.Event() @@ -301,6 +301,7 @@ class ElasticWriter(BlockchainReader): await self.delete_index() await self.stop_index() self._executor.shutdown(wait=True) + self._executor = None def run(self): loop = asyncio.get_event_loop() diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 7ef01349a..f2bd8b5e8 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -37,7 +37,7 @@ class Env: payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, session_timeout=None, drop_client=None, description=None, daily_fee=None, database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None, - blocking_channel_ids=None, filtering_channel_ids=None): + blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None): self.logger = class_logger(__name__, self.__class__.__name__) self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') @@ -91,8 +91,11 @@ class Env: self.country = country if country is not None else self.default('COUNTRY', 'US') # Peer discovery self.peer_discovery = self.peer_discovery_enum() - self.peer_announce = self.boolean('PEER_ANNOUNCE', True) - self.peer_hubs = self.extract_peer_hubs() + self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True) + if peer_hubs is not None: + self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")] + else: + self.peer_hubs = self.extract_peer_hubs() # self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost') # self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) # The electrum client takes the empty string as unspecified diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 4aff4634e..64b9a43c3 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -5,7 +5,6 @@ import lbry.wallet from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.wallet.network import ClientSession from lbry.wallet.rpc import RPCError -from lbry.wallet.server.db.elasticsearch.sync import make_es_index_and_run_sync from lbry.wallet.server.session import LBRYElectrumX from lbry.testcase import IntegrationTestCase, CommandTestCase from lbry.wallet.orchstr8.node import SPVNode @@ -127,19 +126,19 @@ class TestHubDiscovery(CommandTestCase): async def test_hub_discovery(self): us_final_node = SPVNode(self.conductor.spv_module, node_number=2) - await us_final_node.start(self.blockchain, extraconf={"COUNTRY": "US"}) + await us_final_node.start(self.blockchain, extraconf={"country": "US"}) self.addCleanup(us_final_node.stop) final_node_host = f"{us_final_node.hostname}:{us_final_node.port}" kp_final_node = SPVNode(self.conductor.spv_module, node_number=3) - await kp_final_node.start(self.blockchain, extraconf={"COUNTRY": "KP"}) + await kp_final_node.start(self.blockchain, extraconf={"country": "KP"}) self.addCleanup(kp_final_node.stop) kp_final_node_host = f"{kp_final_node.hostname}:{kp_final_node.port}" relay_node = SPVNode(self.conductor.spv_module, node_number=4) await relay_node.start(self.blockchain, extraconf={ - "COUNTRY": "FR", - "PEER_HUBS": ",".join([kp_final_node_host, final_node_host]) + "country": "FR", + "peer_hubs": ",".join([kp_final_node_host, final_node_host]) }) relay_node_host = f"{relay_node.hostname}:{relay_node.port}" self.addCleanup(relay_node.stop)