fix tests

This commit is contained in:
Jack Robison 2022-01-25 14:27:20 -05:00
parent df91f4754a
commit fb4dc8342a
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 25 additions and 14 deletions

View file

@ -20,8 +20,9 @@ class BlockchainReader:
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.shutdown_event = asyncio.Event() self.shutdown_event = asyncio.Event()
self.cancellable_tasks = [] self.cancellable_tasks = []
self._thread_workers = thread_workers
self._thread_prefix = thread_prefix
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix) self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
self.db = HubDB( self.db = HubDB(
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
@ -93,6 +94,11 @@ class BlockchainReader:
self.db.tx_counts.pop() self.db.tx_counts.pop()
self.db.headers.pop() self.db.headers.pop()
async def start(self):
if not self._executor:
self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix)
self.db._executor = self._executor
class BlockchainReaderServer(BlockchainReader): class BlockchainReaderServer(BlockchainReader):
def __init__(self, env): def __init__(self, env):
@ -173,6 +179,7 @@ class BlockchainReaderServer(BlockchainReader):
self.es_notification_client.close() self.es_notification_client.close()
async def start(self): async def start(self):
await super().start()
env = self.env env = self.env
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.log.info(f'software version: {lbry.__version__}') self.log.info(f'software version: {lbry.__version__}')
@ -192,7 +199,7 @@ class BlockchainReaderServer(BlockchainReader):
self.last_state = self.db.read_db_state() self.last_state = self.db.read_db_state()
await self.start_prometheus() await self.start_prometheus()
if self.env.udp_port: if self.env.udp_port and int(self.env.udp_port):
await self.status_server.start( await self.status_server.start(
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country, 0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
self.env.host, self.env.udp_port, self.env.allow_lan_udp self.env.host, self.env.udp_port, self.env.allow_lan_udp
@ -215,6 +222,7 @@ class BlockchainReaderServer(BlockchainReader):
self.prometheus_server = None self.prometheus_server = None
await self.daemon.close() await self.daemon.close()
self._executor.shutdown(wait=True) self._executor.shutdown(wait=True)
self._executor = None
self.shutdown_event.set() self.shutdown_event.set()
def run(self): def run(self):

View file

@ -122,8 +122,9 @@ class ElasticWriter(BlockchainReader):
await self.sync_client.close() await self.sync_client.close()
self.sync_client = None self.sync_client = None
def delete_index(self): async def delete_index(self):
return self.sync_client.indices.delete(self.index, ignore_unavailable=True) if self.sync_client:
return await self.sync_client.indices.delete(self.index, ignore_unavailable=True)
def update_filter_query(self, censor_type, blockdict, channels=False): def update_filter_query(self, censor_type, blockdict, channels=False):
blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()} blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()}
@ -275,8 +276,7 @@ class ElasticWriter(BlockchainReader):
return self._last_wrote_height return self._last_wrote_height
async def start(self): async def start(self):
env = self.env await super().start()
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
def _start_cancellable(run, *args): def _start_cancellable(run, *args):
_flag = asyncio.Event() _flag = asyncio.Event()
@ -301,6 +301,7 @@ class ElasticWriter(BlockchainReader):
await self.delete_index() await self.delete_index()
await self.stop_index() await self.stop_index()
self._executor.shutdown(wait=True) self._executor.shutdown(wait=True)
self._executor = None
def run(self): def run(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()

View file

@ -37,7 +37,7 @@ class Env:
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None, session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None, database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None): blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None):
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
@ -91,7 +91,10 @@ class Env:
self.country = country if country is not None else self.default('COUNTRY', 'US') self.country = country if country is not None else self.default('COUNTRY', 'US')
# Peer discovery # Peer discovery
self.peer_discovery = self.peer_discovery_enum() self.peer_discovery = self.peer_discovery_enum()
self.peer_announce = self.boolean('PEER_ANNOUNCE', True) self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True)
if peer_hubs is not None:
self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")]
else:
self.peer_hubs = self.extract_peer_hubs() self.peer_hubs = self.extract_peer_hubs()
# self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost') # self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost')
# self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) # self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)

View file

@ -5,7 +5,6 @@ import lbry.wallet
from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.error import ServerPaymentFeeAboveMaxAllowedError
from lbry.wallet.network import ClientSession from lbry.wallet.network import ClientSession
from lbry.wallet.rpc import RPCError from lbry.wallet.rpc import RPCError
from lbry.wallet.server.db.elasticsearch.sync import make_es_index_and_run_sync
from lbry.wallet.server.session import LBRYElectrumX from lbry.wallet.server.session import LBRYElectrumX
from lbry.testcase import IntegrationTestCase, CommandTestCase from lbry.testcase import IntegrationTestCase, CommandTestCase
from lbry.wallet.orchstr8.node import SPVNode from lbry.wallet.orchstr8.node import SPVNode
@ -127,19 +126,19 @@ class TestHubDiscovery(CommandTestCase):
async def test_hub_discovery(self): async def test_hub_discovery(self):
us_final_node = SPVNode(self.conductor.spv_module, node_number=2) us_final_node = SPVNode(self.conductor.spv_module, node_number=2)
await us_final_node.start(self.blockchain, extraconf={"COUNTRY": "US"}) await us_final_node.start(self.blockchain, extraconf={"country": "US"})
self.addCleanup(us_final_node.stop) self.addCleanup(us_final_node.stop)
final_node_host = f"{us_final_node.hostname}:{us_final_node.port}" final_node_host = f"{us_final_node.hostname}:{us_final_node.port}"
kp_final_node = SPVNode(self.conductor.spv_module, node_number=3) kp_final_node = SPVNode(self.conductor.spv_module, node_number=3)
await kp_final_node.start(self.blockchain, extraconf={"COUNTRY": "KP"}) await kp_final_node.start(self.blockchain, extraconf={"country": "KP"})
self.addCleanup(kp_final_node.stop) self.addCleanup(kp_final_node.stop)
kp_final_node_host = f"{kp_final_node.hostname}:{kp_final_node.port}" kp_final_node_host = f"{kp_final_node.hostname}:{kp_final_node.port}"
relay_node = SPVNode(self.conductor.spv_module, node_number=4) relay_node = SPVNode(self.conductor.spv_module, node_number=4)
await relay_node.start(self.blockchain, extraconf={ await relay_node.start(self.blockchain, extraconf={
"COUNTRY": "FR", "country": "FR",
"PEER_HUBS": ",".join([kp_final_node_host, final_node_host]) "peer_hubs": ",".join([kp_final_node_host, final_node_host])
}) })
relay_node_host = f"{relay_node.hostname}:{relay_node.port}" relay_node_host = f"{relay_node.hostname}:{relay_node.port}"
self.addCleanup(relay_node.stop) self.addCleanup(relay_node.stop)