From 5176feed81027dbd2b986aaba20c155477f22bc4 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Sun, 7 Jul 2019 21:03:05 -0400 Subject: [PATCH] let daemon be shut down before it has fully started. fixes #2231 --- lbry/lbry/conf.py | 5 +- lbry/lbry/extras/cli.py | 121 ++++++++++++-------------- lbry/lbry/extras/daemon/Component.py | 6 +- lbry/lbry/extras/daemon/Components.py | 2 +- lbry/lbry/extras/daemon/Daemon.py | 11 ++- torba/torba/client/basenetwork.py | 3 + 6 files changed, 77 insertions(+), 71 deletions(-) diff --git a/lbry/lbry/conf.py b/lbry/lbry/conf.py index f1f2ed3e8..42fb68389 100644 --- a/lbry/lbry/conf.py +++ b/lbry/lbry/conf.py @@ -362,6 +362,9 @@ class ConfigFileAccess: del self.data[key] +TBC = typing.TypeVar('TBC', bound='BaseConfig') + + class BaseConfig: config = Path("Path to configuration file.", metavar='FILE') @@ -418,7 +421,7 @@ class BaseConfig: } @classmethod - def create_from_arguments(cls, args): + def create_from_arguments(cls, args) -> TBC: conf = cls() conf.set_arguments(args) conf.set_environment() diff --git a/lbry/lbry/extras/cli.py b/lbry/lbry/extras/cli.py index 1c9710728..602ce30a6 100644 --- a/lbry/lbry/extras/cli.py +++ b/lbry/lbry/extras/cli.py @@ -221,6 +221,61 @@ def ensure_directory_exists(path: str): pathlib.Path(path).mkdir(parents=True, exist_ok=True) +def run_daemon(args: list, conf: Config): + default_formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s:%(lineno)d: %(message)s") + file_handler = logging.handlers.RotatingFileHandler( + conf.log_file_path, maxBytes=2097152, backupCount=5 + ) + file_handler.setFormatter(default_formatter) + log.addHandler(file_handler) + logging.getLogger('torba').addHandler(file_handler) + + if not args.quiet: + handler = logging.StreamHandler() + handler.setFormatter(default_formatter) + log.addHandler(handler) + logging.getLogger('torba').addHandler(handler) + logging.getLogger('torba').setLevel(logging.INFO) + + logging.getLogger('aioupnp').setLevel(logging.WARNING) + logging.getLogger('aiohttp').setLevel(logging.CRITICAL) + + loop = asyncio.get_event_loop() + + if args.verbose: + log.setLevel(logging.DEBUG) + loop.set_debug(True) + else: + log.setLevel(logging.INFO) + + if conf.share_usage_data: + loggly_handler = get_loggly_handler() + loggly_handler.setLevel(logging.ERROR) + log.addHandler(loggly_handler) + + daemon = Daemon(conf) + + def __exit(): + raise GracefulExit() + + try: + loop.add_signal_handler(signal.SIGINT, __exit) + loop.add_signal_handler(signal.SIGTERM, __exit) + except NotImplementedError: + pass # Not implemented on Windows + + try: + loop.run_until_complete(daemon.start()) + loop.run_forever() + except (GracefulExit, KeyboardInterrupt, asyncio.CancelledError): + pass + finally: + loop.run_until_complete(daemon.stop()) + + if hasattr(loop, 'shutdown_asyncgens'): + loop.run_until_complete(loop.shutdown_asyncgens()) + + def main(argv=None): argv = argv or sys.argv[1:] parser = get_argument_parser() @@ -232,88 +287,26 @@ def main(argv=None): if args.cli_version: print(f"lbrynet {lbrynet_version}") - return 0 - elif args.command == 'start': - if args.help: args.start_parser.print_help() - return 0 - - default_formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s:%(lineno)d: %(message)s") - file_handler = logging.handlers.RotatingFileHandler( - conf.log_file_path, maxBytes=2097152, backupCount=5 - ) - file_handler.setFormatter(default_formatter) - log.addHandler(file_handler) - logging.getLogger('torba').addHandler(file_handler) - - if not args.quiet: - handler = logging.StreamHandler() - handler.setFormatter(default_formatter) - log.addHandler(handler) - logging.getLogger('torba').addHandler(handler) - logging.getLogger('torba').setLevel(logging.INFO) - - logging.getLogger('aioupnp').setLevel(logging.WARNING) - logging.getLogger('aiohttp').setLevel(logging.CRITICAL) - - loop = asyncio.get_event_loop() - - if args.verbose: - log.setLevel(logging.DEBUG) - loop.set_debug(True) else: - log.setLevel(logging.INFO) - - if conf.share_usage_data: - loggly_handler = get_loggly_handler() - loggly_handler.setLevel(logging.ERROR) - log.addHandler(loggly_handler) - - daemon = Daemon(conf) - started = False - def __exit(): - if started: - daemon.stop_event.set() - else: - raise GracefulExit() - try: - loop.add_signal_handler(signal.SIGINT, __exit) - loop.add_signal_handler(signal.SIGTERM, __exit) - except NotImplementedError: - pass # Not implemented on Windows - try: - loop.run_until_complete(daemon.start()) - started = True - loop.run_until_complete(daemon.stop_event.wait()) - except (GracefulExit, KeyboardInterrupt): - pass - finally: - loop.run_until_complete(daemon.stop()) - if hasattr(loop, 'shutdown_asyncgens'): - loop.run_until_complete(loop.shutdown_asyncgens()) - + run_daemon(args, conf) elif args.command is not None: - doc = args.doc api_method_name = args.api_method_name if args.replaced_by: print(f"{args.api_method_name} is deprecated, using {args.replaced_by['api_method_name']}.") doc = args.replaced_by['doc'] api_method_name = args.replaced_by['api_method_name'] - if args.help: print(doc) else: parsed = docopt(doc, command_args) params = set_kwargs(parsed) - loop = asyncio.get_event_loop() - loop.run_until_complete(execute_command(conf, api_method_name, params)) - + asyncio.get_event_loop().run_until_complete(execute_command(conf, api_method_name, params)) elif args.group is not None: args.group_parser.print_help() - else: parser.print_help() diff --git a/lbry/lbry/extras/daemon/Component.py b/lbry/lbry/extras/daemon/Component.py index b31d99871..46da5f3b2 100644 --- a/lbry/lbry/extras/daemon/Component.py +++ b/lbry/lbry/extras/daemon/Component.py @@ -56,7 +56,8 @@ class Component(metaclass=ComponentType): self._running = True return result except asyncio.CancelledError: - pass + log.info("Cancelled setup of %s component", self.__class__.__name__) + raise except Exception as err: log.exception("Error setting up %s", self.component_name or self.__class__.__name__) raise err @@ -67,7 +68,8 @@ class Component(metaclass=ComponentType): self._running = False return result except asyncio.CancelledError: - pass + log.info("Cancelled stop of %s component", self.__class__.__name__) + raise except Exception as err: log.exception("Error stopping %s", self.__class__.__name__) raise err diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py index b76eaa5b8..1c32a12cf 100644 --- a/lbry/lbry/extras/daemon/Components.py +++ b/lbry/lbry/extras/daemon/Components.py @@ -168,7 +168,7 @@ class HeadersComponent(Component): } net = Network(ledger) first_connection = net.on_connected.first - asyncio.ensure_future(net.start()) + asyncio.ensure_future(net.start()) # TODO: SKETCHY! it might be trapping a CancelledError and not raising it await first_connection remote_height = await net.get_server_height() await net.stop() diff --git a/lbry/lbry/extras/daemon/Daemon.py b/lbry/lbry/extras/daemon/Daemon.py index f19d62d28..d3e4690bb 100644 --- a/lbry/lbry/extras/daemon/Daemon.py +++ b/lbry/lbry/extras/daemon/Daemon.py @@ -272,7 +272,6 @@ class Daemon(metaclass=JSONRPCServerType): ) self.component_startup_task = None self._connection_status: typing.Tuple[float, bool] = [self.component_manager.loop.time(), False] - self.stop_event = asyncio.Event() logging.getLogger('aiohttp.access').setLevel(logging.WARN) rpc_app = web.Application() @@ -436,7 +435,7 @@ class Daemon(metaclass=JSONRPCServerType): except asyncio.CancelledError: log.info("shutting down before finished starting") await self.analytics_manager.send_server_startup_error("shutting down before finished starting") - await self.stop() + raise except Exception as e: await self.analytics_manager.send_server_startup_error(str(e)) log.exception('Failed to start lbrynet') @@ -516,6 +515,8 @@ class Daemon(metaclass=JSONRPCServerType): log.warning("http code during /stream range request: %s", err) raise err except asyncio.CancelledError: + # if not excepted here, it would bubble up the error to the console. every time you closed + # a running tab, you'd get this error in the console log.debug("/stream range request cancelled") except Exception: log.exception("error handling /stream range request") @@ -696,8 +697,12 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (string) Shutdown message """ + + def shutdown(): + raise web.GracefulExit() + log.info("Shutting down lbrynet daemon") - self.stop_event.set() + asyncio.get_event_loop().call_later(0, shutdown) return "Shutting down" async def jsonrpc_status(self): diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 0b3c8aeca..1ed55a51f 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -3,6 +3,7 @@ import asyncio from asyncio import CancelledError from time import time from typing import List +import socket from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError @@ -217,6 +218,8 @@ class SessionPool: log.warning("Timeout connecting to %s:%d", *session.server) except asyncio.CancelledError: # pylint: disable=try-except-raise raise + except socket.gaierror: + log.warning("Could not resolve IP for %s", session.server[0]) except Exception as err: # pylint: disable=broad-except if 'Connect call failed' in str(err): log.warning("Could not connect to %s:%d", *session.server)