fix test teardown and setup

This commit is contained in:
Jack Robison 2022-02-20 12:48:39 -05:00
parent 8d86b0754c
commit 28abd9c449
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -1,4 +1,5 @@
import os import os
import signal
import json import json
import shutil import shutil
import asyncio import asyncio
@ -73,8 +74,7 @@ class Conductor:
async def start_lbcd(self): async def start_lbcd(self):
if not self.lbcd_started: if not self.lbcd_started:
asyncio.create_task(self.lbcd_node.start()) await self.lbcd_node.start()
await self.lbcd_node.running.wait()
self.lbcd_started = True self.lbcd_started = True
async def stop_lbcd(self, cleanup=True): async def stop_lbcd(self, cleanup=True):
@ -84,7 +84,7 @@ class Conductor:
async def start_hub(self): async def start_hub(self):
if not self.hub_started: if not self.hub_started:
asyncio.create_task(self.hub_node.start()) await self.hub_node.start()
await self.lbcwallet_node.running.wait() await self.lbcwallet_node.running.wait()
self.hub_started = True self.hub_started = True
@ -115,8 +115,7 @@ class Conductor:
async def start_lbcwallet(self, clean=True): async def start_lbcwallet(self, clean=True):
if not self.lbcwallet_started: if not self.lbcwallet_started:
asyncio.create_task(self.lbcwallet_node.start()) await self.lbcwallet_node.start()
await self.lbcwallet_node.running.wait()
if clean: if clean:
mining_addr = await self.lbcwallet_node.get_new_address() mining_addr = await self.lbcwallet_node.get_new_address()
self.lbcwallet_node.mining_addr = mining_addr self.lbcwallet_node.mining_addr = mining_addr
@ -134,11 +133,13 @@ class Conductor:
await self.start_lbcd() await self.start_lbcd()
await self.start_lbcwallet() await self.start_lbcwallet()
await self.start_spv() await self.start_spv()
await self.start_hub()
await self.start_wallet() await self.start_wallet()
async def stop(self): async def stop(self):
all_the_stops = [ all_the_stops = [
self.stop_wallet, self.stop_wallet,
self.stop_hub,
self.stop_spv, self.stop_spv,
self.stop_lbcwallet, self.stop_lbcwallet,
self.stop_lbcd self.stop_lbcd
@ -155,6 +156,7 @@ class Conductor:
await self.start_lbcd() await self.start_lbcd()
await self.start_lbcwallet(clean=False) await self.start_lbcwallet(clean=False)
class WalletNode: class WalletNode:
def __init__(self, manager_class: Type[WalletManager], ledger_class: Type[Ledger], def __init__(self, manager_class: Type[WalletManager], ledger_class: Type[Ledger],
@ -222,6 +224,7 @@ class WalletNode:
class SPVNode: class SPVNode:
def __init__(self, coin_class, node_number=1): def __init__(self, coin_class, node_number=1):
self.node_number = node_number
self.coin_class = coin_class self.coin_class = coin_class
self.controller = None self.controller = None
self.data_path = None self.data_path = None
@ -234,46 +237,60 @@ class SPVNode:
self.elastic_notifier_port = 19080 + node_number self.elastic_notifier_port = 19080 + node_number
self.rpc_port = 8000 + node_number self.rpc_port = 8000 + node_number
self.session_timeout = 600 self.session_timeout = 600
self.stopped = False self.stopped = True
self.index_name = uuid4().hex self.index_name = uuid4().hex
async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None): async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None):
self.data_path = tempfile.mkdtemp() if not self.stopped:
conf = { log.warning("spv node is already running")
'description': '', return
'payment_address': '', self.stopped = False
'daily_fee': '0', try:
'db_dir': self.data_path, self.data_path = tempfile.mkdtemp()
'daemon_url': lbcwallet_node.rpc_url, conf = {
'reorg_limit': 100, 'description': '',
'host': self.hostname, 'payment_address': '',
'tcp_port': self.port, 'daily_fee': '0',
'udp_port': self.udp_port, 'db_dir': self.data_path,
'rpc_port': self.rpc_port, 'daemon_url': lbcwallet_node.rpc_url,
'elastic_notifier_port': self.elastic_notifier_port, 'reorg_limit': 100,
'session_timeout': self.session_timeout, 'host': self.hostname,
'max_query_workers': 0, 'tcp_port': self.port,
'es_index_prefix': self.index_name, 'udp_port': self.udp_port,
} 'rpc_port': self.rpc_port,
if extraconf: 'elastic_notifier_port': self.elastic_notifier_port,
conf.update(extraconf) 'session_timeout': self.session_timeout,
env = Env(self.coin_class, **conf) 'max_query_workers': 0,
self.writer = BlockProcessor(env) 'es_index_prefix': self.index_name,
self.server = BlockchainReaderServer(env) }
self.es_writer = ElasticWriter(env) if extraconf:
await self.writer.open() conf.update(extraconf)
await self.writer.start() env = Env(self.coin_class, **conf)
await self.es_writer.start() self.writer = BlockProcessor(env)
await self.server.start() self.server = BlockchainReaderServer(env)
self.es_writer = ElasticWriter(env)
await self.writer.open()
await self.writer.start()
await self.es_writer.start()
await self.server.start()
except Exception as e:
self.stopped = True
if not isinstance(e, asyncio.CancelledError):
log.exception("failed to start spv node")
raise e
async def stop(self, cleanup=True): async def stop(self, cleanup=True):
if self.stopped: if self.stopped:
log.warning("spv node is already stopped")
return return
try: try:
await self.server.stop() await self.server.stop()
await self.es_writer.stop(delete_index=True) await self.es_writer.stop(delete_index=True)
await self.writer.stop() await self.writer.stop()
self.stopped = True self.stopped = True
except Exception as e:
log.exception("failed to stop spv node")
raise e
finally: finally:
cleanup and self.cleanup() cleanup and self.cleanup()
@ -339,8 +356,8 @@ class WalletProcess(asyncio.SubprocessProtocol):
self.stopped.set() self.stopped.set()
self.ready.set() self.ready.set()
class LBCDNode:
class LBCDNode:
def __init__(self, url, daemon, cli): def __init__(self, url, daemon, cli):
self.latest_release_url = url self.latest_release_url = url
self.project_dir = os.path.dirname(os.path.dirname(__file__)) self.project_dir = os.path.dirname(os.path.dirname(__file__))
@ -410,44 +427,51 @@ class LBCDNode:
return self.exists or self.download() return self.exists or self.download()
async def start(self): async def start(self):
assert self.ensure() if not self.stopped:
loop = asyncio.get_event_loop() return
asyncio.get_child_watcher().attach_loop(loop) self.stopped = False
command = [ try:
self.daemon_bin, assert self.ensure()
'--notls', loop = asyncio.get_event_loop()
f'--datadir={self.data_path}', asyncio.get_child_watcher().attach_loop(loop)
'--regtest', f'--listen=127.0.0.1:{self.peerport}', f'--rpclisten=127.0.0.1:{self.rpcport}', command = [
'--txindex', f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}' self.daemon_bin,
] '--notls',
self.log.info(' '.join(command)) f'--datadir={self.data_path}',
while self.stopped: '--regtest', f'--listen=127.0.0.1:{self.peerport}', f'--rpclisten=127.0.0.1:{self.rpcport}',
if self.running.is_set(): '--txindex', f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}'
await asyncio.sleep(1) ]
continue self.log.info(' '.join(command))
try: self.transport, self.protocol = await loop.subprocess_exec(
self.transport, self.protocol = await loop.subprocess_exec( LBCDProcess, *command
LBCDProcess, *command )
) await self.protocol.ready.wait()
await self.protocol.ready.wait() assert not self.protocol.stopped.is_set()
assert not self.protocol.stopped.is_set() self.running.set()
self.running.set() except asyncio.CancelledError:
self.stopped = False self.running.clear()
except asyncio.CancelledError: self.stopped = True
self.running.clear() raise
raise except Exception as e:
except Exception as e: self.running.clear()
self.running.clear() self.stopped = True
log.exception('failed to start lbcd', exc_info=e) log.exception('failed to start lbcd', exc_info=e)
raise
async def stop(self, cleanup=True): async def stop(self, cleanup=True):
self.stopped = True if self.stopped:
return
try: try:
self.transport.terminate() if self.transport:
await self.protocol.stopped.wait() self.transport.terminate()
self.transport.close() await self.protocol.stopped.wait()
self.transport.close()
except Exception as e:
log.exception('failed to stop lbcd', exc_info=e)
raise
finally: finally:
self.log.info("Done shutting down " + self.daemon_bin) self.log.info("Done shutting down " + self.daemon_bin)
self.stopped = True
if cleanup: if cleanup:
self.cleanup() self.cleanup()
self.running.clear() self.running.clear()
@ -548,38 +572,39 @@ class LBCWalletNode:
f'--username={self.rpcuser}', f'--password={self.rpcpassword}' f'--username={self.rpcuser}', f'--password={self.rpcpassword}'
] ]
self.log.info(' '.join(command)) self.log.info(' '.join(command))
while self.stopped: try:
if self.running.is_set(): self.transport, self.protocol = await loop.subprocess_exec(
await asyncio.sleep(1) WalletProcess, *command
continue )
try: self.protocol.transport = self.transport
self.transport, self.protocol = await loop.subprocess_exec( await self.protocol.ready.wait()
WalletProcess, *command assert not self.protocol.stopped.is_set()
) self.running.set()
self.protocol.transport = self.transport self.stopped = False
await self.protocol.ready.wait() except asyncio.CancelledError:
assert not self.protocol.stopped.is_set() self.running.clear()
self.running.set() raise
self.stopped = False except Exception as e:
except asyncio.CancelledError: self.running.clear()
self.running.clear() log.exception('failed to start lbcwallet', exc_info=e)
raise
except Exception as e:
self.running.clear()
log.exception('failed to start lbcwallet', exc_info=e)
def cleanup(self): def cleanup(self):
assert self.stopped assert self.stopped
shutil.rmtree(self.data_path, ignore_errors=True) shutil.rmtree(self.data_path, ignore_errors=True)
async def stop(self, cleanup=True): async def stop(self, cleanup=True):
self.stopped = True if self.stopped:
return
try: try:
self.transport.terminate() self.transport.terminate()
await self.protocol.stopped.wait() await self.protocol.stopped.wait()
self.transport.close() self.transport.close()
except Exception as e:
log.exception('failed to stop lbcwallet', exc_info=e)
raise
finally: finally:
self.log.info("Done shutting down " + self.lbcwallet_bin) self.log.info("Done shutting down " + self.lbcwallet_bin)
self.stopped = True
if cleanup: if cleanup:
self.cleanup() self.cleanup()
self.running.clear() self.running.clear()
@ -662,12 +687,15 @@ class LBCWalletNode:
class HubProcess(asyncio.SubprocessProtocol): class HubProcess(asyncio.SubprocessProtocol):
def __init__(self): def __init__(self, ready, stopped):
self.ready = asyncio.Event() self.ready = ready
self.stopped = asyncio.Event() self.stopped = stopped
self.log = log.getChild('hub') self.log = log.getChild('hub')
self.transport = None
def pipe_data_received(self, fd, data): def pipe_data_received(self, fd, data):
self.stopped.clear()
self.ready.set()
if self.log: if self.log:
self.log.info(data.decode()) self.log.info(data.decode())
if b'error' in data.lower(): if b'error' in data.lower():
@ -681,16 +709,26 @@ class HubProcess(asyncio.SubprocessProtocol):
print(line) print(line)
def process_exited(self): def process_exited(self):
self.ready.clear()
self.stopped.set() self.stopped.set()
self.ready.set()
async def stop(self):
t = asyncio.create_task(self.stopped.wait())
try:
self.transport.send_signal(signal.SIGINT)
await asyncio.wait_for(t, 3)
# log.warning("stopped go hub")
except asyncio.TimeoutError:
if not t.done():
t.cancel()
self.transport.terminate()
await self.stopped.wait()
log.warning("terminated go hub")
class HubNode: class HubNode:
def __init__(self, url, daemon, spv_node): def __init__(self, url, daemon, spv_node):
self.spv_node = spv_node self.spv_node = spv_node
self.debug = False
self.latest_release_url = url self.latest_release_url = url
self.project_dir = os.path.dirname(os.path.dirname(__file__)) self.project_dir = os.path.dirname(os.path.dirname(__file__))
self.bin_dir = os.path.join(self.project_dir, 'bin') self.bin_dir = os.path.join(self.project_dir, 'bin')
@ -701,9 +739,13 @@ class HubNode:
self.protocol = None self.protocol = None
self.hostname = 'localhost' self.hostname = 'localhost'
self.rpcport = 50051 # avoid conflict with default rpc port self.rpcport = 50051 # avoid conflict with default rpc port
self.stopped = False self._stopped = asyncio.Event()
self.running = asyncio.Event() self.running = asyncio.Event()
@property
def stopped(self):
return not self.running.is_set()
@property @property
def exists(self): def exists(self):
return ( return (
@ -754,32 +796,24 @@ class HubNode:
self.daemon_bin, 'serve', '--esindex', self.spv_node.index_name + 'claims', '--debug' self.daemon_bin, 'serve', '--esindex', self.spv_node.index_name + 'claims', '--debug'
] ]
self.log.info(' '.join(command)) self.log.info(' '.join(command))
while not self.stopped: self.protocol = HubProcess(self.running, self._stopped)
if self.running.is_set(): try:
await asyncio.sleep(1) self.transport, _ = await loop.subprocess_exec(
continue lambda: self.protocol, *command
try: )
if not self.debug: self.protocol.transport = self.transport
self.transport, self.protocol = await loop.subprocess_exec( except Exception as e:
HubProcess, *command log.exception('failed to start go hub', exc_info=e)
) raise e
await self.protocol.ready.wait() await self.protocol.ready.wait()
assert not self.protocol.stopped.is_set()
self.running.set()
except asyncio.CancelledError:
self.running.clear()
raise
except Exception as e:
self.running.clear()
log.exception('failed to start hub', exc_info=e)
async def stop(self, cleanup=True): async def stop(self, cleanup=True):
self.stopped = True
try: try:
if not self.debug: if self.protocol:
self.transport.terminate() await self.protocol.stop()
await self.protocol.stopped.wait() except Exception as e:
self.transport.close() log.exception('failed to stop go hub', exc_info=e)
raise e
finally: finally:
if cleanup: if cleanup:
self.cleanup() self.cleanup()