diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 0c73dabc8..e3668f2bb 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -1,4 +1,5 @@ import os +import signal import json import shutil import asyncio @@ -73,8 +74,7 @@ class Conductor: async def start_lbcd(self): if not self.lbcd_started: - asyncio.create_task(self.lbcd_node.start()) - await self.lbcd_node.running.wait() + await self.lbcd_node.start() self.lbcd_started = True async def stop_lbcd(self, cleanup=True): @@ -84,7 +84,7 @@ class Conductor: async def start_hub(self): if not self.hub_started: - asyncio.create_task(self.hub_node.start()) + await self.hub_node.start() await self.lbcwallet_node.running.wait() self.hub_started = True @@ -115,8 +115,7 @@ class Conductor: async def start_lbcwallet(self, clean=True): if not self.lbcwallet_started: - asyncio.create_task(self.lbcwallet_node.start()) - await self.lbcwallet_node.running.wait() + await self.lbcwallet_node.start() if clean: mining_addr = await self.lbcwallet_node.get_new_address() self.lbcwallet_node.mining_addr = mining_addr @@ -134,11 +133,13 @@ class Conductor: await self.start_lbcd() await self.start_lbcwallet() await self.start_spv() + await self.start_hub() await self.start_wallet() async def stop(self): all_the_stops = [ self.stop_wallet, + self.stop_hub, self.stop_spv, self.stop_lbcwallet, self.stop_lbcd @@ -155,6 +156,7 @@ class Conductor: await self.start_lbcd() await self.start_lbcwallet(clean=False) + class WalletNode: def __init__(self, manager_class: Type[WalletManager], ledger_class: Type[Ledger], @@ -222,6 +224,7 @@ class WalletNode: class SPVNode: def __init__(self, coin_class, node_number=1): + self.node_number = node_number self.coin_class = coin_class self.controller = None self.data_path = None @@ -234,46 +237,60 @@ class SPVNode: self.elastic_notifier_port = 19080 + node_number self.rpc_port = 8000 + node_number self.session_timeout = 600 - self.stopped = False + self.stopped = True self.index_name = uuid4().hex async def start(self, lbcwallet_node: 'LBCWalletNode', extraconf=None): - self.data_path = tempfile.mkdtemp() - conf = { - 'description': '', - 'payment_address': '', - 'daily_fee': '0', - 'db_dir': self.data_path, - 'daemon_url': lbcwallet_node.rpc_url, - 'reorg_limit': 100, - 'host': self.hostname, - 'tcp_port': self.port, - 'udp_port': self.udp_port, - 'rpc_port': self.rpc_port, - 'elastic_notifier_port': self.elastic_notifier_port, - 'session_timeout': self.session_timeout, - 'max_query_workers': 0, - 'es_index_prefix': self.index_name, - } - if extraconf: - conf.update(extraconf) - env = Env(self.coin_class, **conf) - self.writer = BlockProcessor(env) - self.server = BlockchainReaderServer(env) - self.es_writer = ElasticWriter(env) - await self.writer.open() - await self.writer.start() - await self.es_writer.start() - await self.server.start() + if not self.stopped: + log.warning("spv node is already running") + return + self.stopped = False + try: + self.data_path = tempfile.mkdtemp() + conf = { + 'description': '', + 'payment_address': '', + 'daily_fee': '0', + 'db_dir': self.data_path, + 'daemon_url': lbcwallet_node.rpc_url, + 'reorg_limit': 100, + 'host': self.hostname, + 'tcp_port': self.port, + 'udp_port': self.udp_port, + 'rpc_port': self.rpc_port, + 'elastic_notifier_port': self.elastic_notifier_port, + 'session_timeout': self.session_timeout, + 'max_query_workers': 0, + 'es_index_prefix': self.index_name, + } + if extraconf: + conf.update(extraconf) + env = Env(self.coin_class, **conf) + self.writer = BlockProcessor(env) + self.server = BlockchainReaderServer(env) + self.es_writer = ElasticWriter(env) + await self.writer.open() + await self.writer.start() + await self.es_writer.start() + await self.server.start() + except Exception as e: + self.stopped = True + if not isinstance(e, asyncio.CancelledError): + log.exception("failed to start spv node") + raise e async def stop(self, cleanup=True): if self.stopped: + log.warning("spv node is already stopped") return try: await self.server.stop() await self.es_writer.stop(delete_index=True) await self.writer.stop() self.stopped = True + except Exception as e: + log.exception("failed to stop spv node") + raise e finally: cleanup and self.cleanup() @@ -339,8 +356,8 @@ class WalletProcess(asyncio.SubprocessProtocol): self.stopped.set() self.ready.set() -class LBCDNode: +class LBCDNode: def __init__(self, url, daemon, cli): self.latest_release_url = url self.project_dir = os.path.dirname(os.path.dirname(__file__)) @@ -410,44 +427,51 @@ class LBCDNode: return self.exists or self.download() async def start(self): - assert self.ensure() - loop = asyncio.get_event_loop() - asyncio.get_child_watcher().attach_loop(loop) - command = [ - self.daemon_bin, - '--notls', - f'--datadir={self.data_path}', - '--regtest', f'--listen=127.0.0.1:{self.peerport}', f'--rpclisten=127.0.0.1:{self.rpcport}', - '--txindex', f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}' - ] - self.log.info(' '.join(command)) - while self.stopped: - if self.running.is_set(): - await asyncio.sleep(1) - continue - try: - self.transport, self.protocol = await loop.subprocess_exec( - LBCDProcess, *command - ) - await self.protocol.ready.wait() - assert not self.protocol.stopped.is_set() - self.running.set() - self.stopped = False - except asyncio.CancelledError: - self.running.clear() - raise - except Exception as e: - self.running.clear() - log.exception('failed to start lbcd', exc_info=e) + if not self.stopped: + return + self.stopped = False + try: + assert self.ensure() + loop = asyncio.get_event_loop() + asyncio.get_child_watcher().attach_loop(loop) + command = [ + self.daemon_bin, + '--notls', + f'--datadir={self.data_path}', + '--regtest', f'--listen=127.0.0.1:{self.peerport}', f'--rpclisten=127.0.0.1:{self.rpcport}', + '--txindex', f'--rpcuser={self.rpcuser}', f'--rpcpass={self.rpcpassword}' + ] + self.log.info(' '.join(command)) + self.transport, self.protocol = await loop.subprocess_exec( + LBCDProcess, *command + ) + await self.protocol.ready.wait() + assert not self.protocol.stopped.is_set() + self.running.set() + except asyncio.CancelledError: + self.running.clear() + self.stopped = True + raise + except Exception as e: + self.running.clear() + self.stopped = True + log.exception('failed to start lbcd', exc_info=e) + raise async def stop(self, cleanup=True): - self.stopped = True + if self.stopped: + return try: - self.transport.terminate() - await self.protocol.stopped.wait() - self.transport.close() + if self.transport: + self.transport.terminate() + await self.protocol.stopped.wait() + self.transport.close() + except Exception as e: + log.exception('failed to stop lbcd', exc_info=e) + raise finally: self.log.info("Done shutting down " + self.daemon_bin) + self.stopped = True if cleanup: self.cleanup() self.running.clear() @@ -548,38 +572,39 @@ class LBCWalletNode: f'--username={self.rpcuser}', f'--password={self.rpcpassword}' ] self.log.info(' '.join(command)) - while self.stopped: - if self.running.is_set(): - await asyncio.sleep(1) - continue - try: - self.transport, self.protocol = await loop.subprocess_exec( - WalletProcess, *command - ) - self.protocol.transport = self.transport - await self.protocol.ready.wait() - assert not self.protocol.stopped.is_set() - self.running.set() - self.stopped = False - except asyncio.CancelledError: - self.running.clear() - raise - except Exception as e: - self.running.clear() - log.exception('failed to start lbcwallet', exc_info=e) + try: + self.transport, self.protocol = await loop.subprocess_exec( + WalletProcess, *command + ) + self.protocol.transport = self.transport + await self.protocol.ready.wait() + assert not self.protocol.stopped.is_set() + self.running.set() + self.stopped = False + except asyncio.CancelledError: + self.running.clear() + raise + except Exception as e: + self.running.clear() + log.exception('failed to start lbcwallet', exc_info=e) def cleanup(self): assert self.stopped shutil.rmtree(self.data_path, ignore_errors=True) async def stop(self, cleanup=True): - self.stopped = True + if self.stopped: + return try: self.transport.terminate() await self.protocol.stopped.wait() self.transport.close() + except Exception as e: + log.exception('failed to stop lbcwallet', exc_info=e) + raise finally: self.log.info("Done shutting down " + self.lbcwallet_bin) + self.stopped = True if cleanup: self.cleanup() self.running.clear() @@ -662,12 +687,15 @@ class LBCWalletNode: class HubProcess(asyncio.SubprocessProtocol): - def __init__(self): - self.ready = asyncio.Event() - self.stopped = asyncio.Event() + def __init__(self, ready, stopped): + self.ready = ready + self.stopped = stopped self.log = log.getChild('hub') + self.transport = None def pipe_data_received(self, fd, data): + self.stopped.clear() + self.ready.set() if self.log: self.log.info(data.decode()) if b'error' in data.lower(): @@ -681,16 +709,26 @@ class HubProcess(asyncio.SubprocessProtocol): print(line) def process_exited(self): + self.ready.clear() self.stopped.set() - self.ready.set() + + async def stop(self): + t = asyncio.create_task(self.stopped.wait()) + try: + self.transport.send_signal(signal.SIGINT) + await asyncio.wait_for(t, 3) + # log.warning("stopped go hub") + except asyncio.TimeoutError: + if not t.done(): + t.cancel() + self.transport.terminate() + await self.stopped.wait() + log.warning("terminated go hub") class HubNode: - def __init__(self, url, daemon, spv_node): self.spv_node = spv_node - self.debug = False - self.latest_release_url = url self.project_dir = os.path.dirname(os.path.dirname(__file__)) self.bin_dir = os.path.join(self.project_dir, 'bin') @@ -701,9 +739,13 @@ class HubNode: self.protocol = None self.hostname = 'localhost' self.rpcport = 50051 # avoid conflict with default rpc port - self.stopped = False + self._stopped = asyncio.Event() self.running = asyncio.Event() + @property + def stopped(self): + return not self.running.is_set() + @property def exists(self): return ( @@ -754,32 +796,24 @@ class HubNode: self.daemon_bin, 'serve', '--esindex', self.spv_node.index_name + 'claims', '--debug' ] self.log.info(' '.join(command)) - while not self.stopped: - if self.running.is_set(): - await asyncio.sleep(1) - continue - try: - if not self.debug: - self.transport, self.protocol = await loop.subprocess_exec( - HubProcess, *command - ) - await self.protocol.ready.wait() - assert not self.protocol.stopped.is_set() - self.running.set() - except asyncio.CancelledError: - self.running.clear() - raise - except Exception as e: - self.running.clear() - log.exception('failed to start hub', exc_info=e) + self.protocol = HubProcess(self.running, self._stopped) + try: + self.transport, _ = await loop.subprocess_exec( + lambda: self.protocol, *command + ) + self.protocol.transport = self.transport + except Exception as e: + log.exception('failed to start go hub', exc_info=e) + raise e + await self.protocol.ready.wait() async def stop(self, cleanup=True): - self.stopped = True try: - if not self.debug: - self.transport.terminate() - await self.protocol.stopped.wait() - self.transport.close() + if self.protocol: + await self.protocol.stop() + except Exception as e: + log.exception('failed to stop go hub', exc_info=e) + raise e finally: if cleanup: self.cleanup()