fix tests
This commit is contained in:
parent
8b592ba277
commit
9dfe41bd33
4 changed files with 25 additions and 14 deletions
|
@ -20,8 +20,9 @@ class BlockchainReader:
|
|||
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
|
||||
self.shutdown_event = asyncio.Event()
|
||||
self.cancellable_tasks = []
|
||||
self._thread_workers = thread_workers
|
||||
self._thread_prefix = thread_prefix
|
||||
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
|
||||
|
||||
self.db = HubDB(
|
||||
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
|
||||
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
|
||||
|
@ -93,6 +94,11 @@ class BlockchainReader:
|
|||
self.db.tx_counts.pop()
|
||||
self.db.headers.pop()
|
||||
|
||||
async def start(self):
|
||||
if not self._executor:
|
||||
self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix)
|
||||
self.db._executor = self._executor
|
||||
|
||||
|
||||
class BlockchainReaderServer(BlockchainReader):
|
||||
def __init__(self, env):
|
||||
|
@ -173,6 +179,7 @@ class BlockchainReaderServer(BlockchainReader):
|
|||
self.es_notification_client.close()
|
||||
|
||||
async def start(self):
|
||||
await super().start()
|
||||
env = self.env
|
||||
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
|
||||
self.log.info(f'software version: {lbry.__version__}')
|
||||
|
@ -192,7 +199,7 @@ class BlockchainReaderServer(BlockchainReader):
|
|||
self.last_state = self.db.read_db_state()
|
||||
|
||||
await self.start_prometheus()
|
||||
if self.env.udp_port:
|
||||
if self.env.udp_port and int(self.env.udp_port):
|
||||
await self.status_server.start(
|
||||
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
|
||||
self.env.host, self.env.udp_port, self.env.allow_lan_udp
|
||||
|
@ -215,6 +222,7 @@ class BlockchainReaderServer(BlockchainReader):
|
|||
self.prometheus_server = None
|
||||
await self.daemon.close()
|
||||
self._executor.shutdown(wait=True)
|
||||
self._executor = None
|
||||
self.shutdown_event.set()
|
||||
|
||||
def run(self):
|
||||
|
|
|
@ -122,8 +122,9 @@ class ElasticWriter(BlockchainReader):
|
|||
await self.sync_client.close()
|
||||
self.sync_client = None
|
||||
|
||||
def delete_index(self):
|
||||
return self.sync_client.indices.delete(self.index, ignore_unavailable=True)
|
||||
async def delete_index(self):
|
||||
if self.sync_client:
|
||||
return await self.sync_client.indices.delete(self.index, ignore_unavailable=True)
|
||||
|
||||
def update_filter_query(self, censor_type, blockdict, channels=False):
|
||||
blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()}
|
||||
|
@ -275,8 +276,7 @@ class ElasticWriter(BlockchainReader):
|
|||
return self._last_wrote_height
|
||||
|
||||
async def start(self):
|
||||
env = self.env
|
||||
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
|
||||
await super().start()
|
||||
|
||||
def _start_cancellable(run, *args):
|
||||
_flag = asyncio.Event()
|
||||
|
@ -301,6 +301,7 @@ class ElasticWriter(BlockchainReader):
|
|||
await self.delete_index()
|
||||
await self.stop_index()
|
||||
self._executor.shutdown(wait=True)
|
||||
self._executor = None
|
||||
|
||||
def run(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
|
|
|
@ -37,7 +37,7 @@ class Env:
|
|||
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
||||
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
||||
database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None):
|
||||
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None):
|
||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||
|
||||
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
|
||||
|
@ -91,8 +91,11 @@ class Env:
|
|||
self.country = country if country is not None else self.default('COUNTRY', 'US')
|
||||
# Peer discovery
|
||||
self.peer_discovery = self.peer_discovery_enum()
|
||||
self.peer_announce = self.boolean('PEER_ANNOUNCE', True)
|
||||
self.peer_hubs = self.extract_peer_hubs()
|
||||
self.peer_announce = peer_announce if peer_announce is not None else self.boolean('PEER_ANNOUNCE', True)
|
||||
if peer_hubs is not None:
|
||||
self.peer_hubs = [p.strip("") for p in peer_hubs.split(",")]
|
||||
else:
|
||||
self.peer_hubs = self.extract_peer_hubs()
|
||||
# self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost')
|
||||
# self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)
|
||||
# The electrum client takes the empty string as unspecified
|
||||
|
|
|
@ -5,7 +5,6 @@ import lbry.wallet
|
|||
from lbry.error import ServerPaymentFeeAboveMaxAllowedError
|
||||
from lbry.wallet.network import ClientSession
|
||||
from lbry.wallet.rpc import RPCError
|
||||
from lbry.wallet.server.db.elasticsearch.sync import make_es_index_and_run_sync
|
||||
from lbry.wallet.server.session import LBRYElectrumX
|
||||
from lbry.testcase import IntegrationTestCase, CommandTestCase
|
||||
from lbry.wallet.orchstr8.node import SPVNode
|
||||
|
@ -127,19 +126,19 @@ class TestHubDiscovery(CommandTestCase):
|
|||
|
||||
async def test_hub_discovery(self):
|
||||
us_final_node = SPVNode(self.conductor.spv_module, node_number=2)
|
||||
await us_final_node.start(self.blockchain, extraconf={"COUNTRY": "US"})
|
||||
await us_final_node.start(self.blockchain, extraconf={"country": "US"})
|
||||
self.addCleanup(us_final_node.stop)
|
||||
final_node_host = f"{us_final_node.hostname}:{us_final_node.port}"
|
||||
|
||||
kp_final_node = SPVNode(self.conductor.spv_module, node_number=3)
|
||||
await kp_final_node.start(self.blockchain, extraconf={"COUNTRY": "KP"})
|
||||
await kp_final_node.start(self.blockchain, extraconf={"country": "KP"})
|
||||
self.addCleanup(kp_final_node.stop)
|
||||
kp_final_node_host = f"{kp_final_node.hostname}:{kp_final_node.port}"
|
||||
|
||||
relay_node = SPVNode(self.conductor.spv_module, node_number=4)
|
||||
await relay_node.start(self.blockchain, extraconf={
|
||||
"COUNTRY": "FR",
|
||||
"PEER_HUBS": ",".join([kp_final_node_host, final_node_host])
|
||||
"country": "FR",
|
||||
"peer_hubs": ",".join([kp_final_node_host, final_node_host])
|
||||
})
|
||||
relay_node_host = f"{relay_node.hostname}:{relay_node.port}"
|
||||
self.addCleanup(relay_node.stop)
|
||||
|
|
Loading…
Reference in a new issue