forked from LBRYCommunity/lbry-sdk
let daemon be shut down before it has fully started. fixes #2231
This commit is contained in:
parent
8c63033461
commit
5176feed81
6 changed files with 77 additions and 71 deletions
|
@ -362,6 +362,9 @@ class ConfigFileAccess:
|
|||
del self.data[key]
|
||||
|
||||
|
||||
TBC = typing.TypeVar('TBC', bound='BaseConfig')
|
||||
|
||||
|
||||
class BaseConfig:
|
||||
|
||||
config = Path("Path to configuration file.", metavar='FILE')
|
||||
|
@ -418,7 +421,7 @@ class BaseConfig:
|
|||
}
|
||||
|
||||
@classmethod
|
||||
def create_from_arguments(cls, args):
|
||||
def create_from_arguments(cls, args) -> TBC:
|
||||
conf = cls()
|
||||
conf.set_arguments(args)
|
||||
conf.set_environment()
|
||||
|
|
|
@ -221,6 +221,61 @@ def ensure_directory_exists(path: str):
|
|||
pathlib.Path(path).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def run_daemon(args: list, conf: Config):
|
||||
default_formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s:%(lineno)d: %(message)s")
|
||||
file_handler = logging.handlers.RotatingFileHandler(
|
||||
conf.log_file_path, maxBytes=2097152, backupCount=5
|
||||
)
|
||||
file_handler.setFormatter(default_formatter)
|
||||
log.addHandler(file_handler)
|
||||
logging.getLogger('torba').addHandler(file_handler)
|
||||
|
||||
if not args.quiet:
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(default_formatter)
|
||||
log.addHandler(handler)
|
||||
logging.getLogger('torba').addHandler(handler)
|
||||
logging.getLogger('torba').setLevel(logging.INFO)
|
||||
|
||||
logging.getLogger('aioupnp').setLevel(logging.WARNING)
|
||||
logging.getLogger('aiohttp').setLevel(logging.CRITICAL)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
if args.verbose:
|
||||
log.setLevel(logging.DEBUG)
|
||||
loop.set_debug(True)
|
||||
else:
|
||||
log.setLevel(logging.INFO)
|
||||
|
||||
if conf.share_usage_data:
|
||||
loggly_handler = get_loggly_handler()
|
||||
loggly_handler.setLevel(logging.ERROR)
|
||||
log.addHandler(loggly_handler)
|
||||
|
||||
daemon = Daemon(conf)
|
||||
|
||||
def __exit():
|
||||
raise GracefulExit()
|
||||
|
||||
try:
|
||||
loop.add_signal_handler(signal.SIGINT, __exit)
|
||||
loop.add_signal_handler(signal.SIGTERM, __exit)
|
||||
except NotImplementedError:
|
||||
pass # Not implemented on Windows
|
||||
|
||||
try:
|
||||
loop.run_until_complete(daemon.start())
|
||||
loop.run_forever()
|
||||
except (GracefulExit, KeyboardInterrupt, asyncio.CancelledError):
|
||||
pass
|
||||
finally:
|
||||
loop.run_until_complete(daemon.stop())
|
||||
|
||||
if hasattr(loop, 'shutdown_asyncgens'):
|
||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
argv = argv or sys.argv[1:]
|
||||
parser = get_argument_parser()
|
||||
|
@ -232,88 +287,26 @@ def main(argv=None):
|
|||
|
||||
if args.cli_version:
|
||||
print(f"lbrynet {lbrynet_version}")
|
||||
return 0
|
||||
|
||||
elif args.command == 'start':
|
||||
|
||||
if args.help:
|
||||
args.start_parser.print_help()
|
||||
return 0
|
||||
|
||||
default_formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s:%(lineno)d: %(message)s")
|
||||
file_handler = logging.handlers.RotatingFileHandler(
|
||||
conf.log_file_path, maxBytes=2097152, backupCount=5
|
||||
)
|
||||
file_handler.setFormatter(default_formatter)
|
||||
log.addHandler(file_handler)
|
||||
logging.getLogger('torba').addHandler(file_handler)
|
||||
|
||||
if not args.quiet:
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(default_formatter)
|
||||
log.addHandler(handler)
|
||||
logging.getLogger('torba').addHandler(handler)
|
||||
logging.getLogger('torba').setLevel(logging.INFO)
|
||||
|
||||
logging.getLogger('aioupnp').setLevel(logging.WARNING)
|
||||
logging.getLogger('aiohttp').setLevel(logging.CRITICAL)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
if args.verbose:
|
||||
log.setLevel(logging.DEBUG)
|
||||
loop.set_debug(True)
|
||||
else:
|
||||
log.setLevel(logging.INFO)
|
||||
|
||||
if conf.share_usage_data:
|
||||
loggly_handler = get_loggly_handler()
|
||||
loggly_handler.setLevel(logging.ERROR)
|
||||
log.addHandler(loggly_handler)
|
||||
|
||||
daemon = Daemon(conf)
|
||||
started = False
|
||||
def __exit():
|
||||
if started:
|
||||
daemon.stop_event.set()
|
||||
else:
|
||||
raise GracefulExit()
|
||||
try:
|
||||
loop.add_signal_handler(signal.SIGINT, __exit)
|
||||
loop.add_signal_handler(signal.SIGTERM, __exit)
|
||||
except NotImplementedError:
|
||||
pass # Not implemented on Windows
|
||||
try:
|
||||
loop.run_until_complete(daemon.start())
|
||||
started = True
|
||||
loop.run_until_complete(daemon.stop_event.wait())
|
||||
except (GracefulExit, KeyboardInterrupt):
|
||||
pass
|
||||
finally:
|
||||
loop.run_until_complete(daemon.stop())
|
||||
if hasattr(loop, 'shutdown_asyncgens'):
|
||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||
|
||||
run_daemon(args, conf)
|
||||
elif args.command is not None:
|
||||
|
||||
doc = args.doc
|
||||
api_method_name = args.api_method_name
|
||||
if args.replaced_by:
|
||||
print(f"{args.api_method_name} is deprecated, using {args.replaced_by['api_method_name']}.")
|
||||
doc = args.replaced_by['doc']
|
||||
api_method_name = args.replaced_by['api_method_name']
|
||||
|
||||
if args.help:
|
||||
print(doc)
|
||||
else:
|
||||
parsed = docopt(doc, command_args)
|
||||
params = set_kwargs(parsed)
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(execute_command(conf, api_method_name, params))
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(execute_command(conf, api_method_name, params))
|
||||
elif args.group is not None:
|
||||
args.group_parser.print_help()
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
|
|
@ -56,7 +56,8 @@ class Component(metaclass=ComponentType):
|
|||
self._running = True
|
||||
return result
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
log.info("Cancelled setup of %s component", self.__class__.__name__)
|
||||
raise
|
||||
except Exception as err:
|
||||
log.exception("Error setting up %s", self.component_name or self.__class__.__name__)
|
||||
raise err
|
||||
|
@ -67,7 +68,8 @@ class Component(metaclass=ComponentType):
|
|||
self._running = False
|
||||
return result
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
log.info("Cancelled stop of %s component", self.__class__.__name__)
|
||||
raise
|
||||
except Exception as err:
|
||||
log.exception("Error stopping %s", self.__class__.__name__)
|
||||
raise err
|
||||
|
|
|
@ -168,7 +168,7 @@ class HeadersComponent(Component):
|
|||
}
|
||||
net = Network(ledger)
|
||||
first_connection = net.on_connected.first
|
||||
asyncio.ensure_future(net.start())
|
||||
asyncio.ensure_future(net.start()) # TODO: SKETCHY! it might be trapping a CancelledError and not raising it
|
||||
await first_connection
|
||||
remote_height = await net.get_server_height()
|
||||
await net.stop()
|
||||
|
|
|
@ -272,7 +272,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
)
|
||||
self.component_startup_task = None
|
||||
self._connection_status: typing.Tuple[float, bool] = [self.component_manager.loop.time(), False]
|
||||
self.stop_event = asyncio.Event()
|
||||
|
||||
logging.getLogger('aiohttp.access').setLevel(logging.WARN)
|
||||
rpc_app = web.Application()
|
||||
|
@ -436,7 +435,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
except asyncio.CancelledError:
|
||||
log.info("shutting down before finished starting")
|
||||
await self.analytics_manager.send_server_startup_error("shutting down before finished starting")
|
||||
await self.stop()
|
||||
raise
|
||||
except Exception as e:
|
||||
await self.analytics_manager.send_server_startup_error(str(e))
|
||||
log.exception('Failed to start lbrynet')
|
||||
|
@ -516,6 +515,8 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
log.warning("http code during /stream range request: %s", err)
|
||||
raise err
|
||||
except asyncio.CancelledError:
|
||||
# if not excepted here, it would bubble up the error to the console. every time you closed
|
||||
# a running tab, you'd get this error in the console
|
||||
log.debug("/stream range request cancelled")
|
||||
except Exception:
|
||||
log.exception("error handling /stream range request")
|
||||
|
@ -696,8 +697,12 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
Returns:
|
||||
(string) Shutdown message
|
||||
"""
|
||||
|
||||
def shutdown():
|
||||
raise web.GracefulExit()
|
||||
|
||||
log.info("Shutting down lbrynet daemon")
|
||||
self.stop_event.set()
|
||||
asyncio.get_event_loop().call_later(0, shutdown)
|
||||
return "Shutting down"
|
||||
|
||||
async def jsonrpc_status(self):
|
||||
|
|
|
@ -3,6 +3,7 @@ import asyncio
|
|||
from asyncio import CancelledError
|
||||
from time import time
|
||||
from typing import List
|
||||
import socket
|
||||
|
||||
from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError
|
||||
|
||||
|
@ -217,6 +218,8 @@ class SessionPool:
|
|||
log.warning("Timeout connecting to %s:%d", *session.server)
|
||||
except asyncio.CancelledError: # pylint: disable=try-except-raise
|
||||
raise
|
||||
except socket.gaierror:
|
||||
log.warning("Could not resolve IP for %s", session.server[0])
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
if 'Connect call failed' in str(err):
|
||||
log.warning("Could not connect to %s:%d", *session.server)
|
||||
|
|
Loading…
Reference in a new issue