diff --git a/lbrynet/extras/cli.py b/lbrynet/extras/cli.py index 9a082c2c9..270b5453b 100644 --- a/lbrynet/extras/cli.py +++ b/lbrynet/extras/cli.py @@ -8,6 +8,7 @@ from docopt import docopt from textwrap import dedent import aiohttp +from aiohttp.web import GracefulExit from lbrynet import __name__ as lbrynet_name, __version__ as lbrynet_version from lbrynet.extras.daemon.loggly_handler import get_loggly_handler @@ -136,11 +137,6 @@ def get_argument_parser(): return main -async def run_daemon(daemon: Daemon): - await daemon.start() - await daemon.server.wait_closed() - - def main(argv=None): argv = argv or sys.argv[1:] parser = get_argument_parser() @@ -168,8 +164,7 @@ def main(argv=None): handler = logging.StreamHandler() handler.setFormatter(default_formatter) log.addHandler(handler) - # mostly disable third part logging - logging.getLogger('urllib3').setLevel(logging.CRITICAL) + logging.getLogger('aioupnp').setLevel(logging.WARNING) logging.getLogger('aiohttp').setLevel(logging.CRITICAL) @@ -182,11 +177,17 @@ def main(argv=None): loggly_handler.setLevel(logging.ERROR) log.addHandler(loggly_handler) - log.debug('Final Settings: %s', conf.settings_dict) - log.info("Starting lbrynet-daemon from command line") - daemon = Daemon(conf) - asyncio.run(run_daemon(daemon)) + loop = asyncio.get_event_loop() + loop.run_until_complete(daemon.start()) + try: + loop.run_forever() + except (GracefulExit, KeyboardInterrupt): + pass + finally: + loop.run_until_complete(daemon.stop()) + if hasattr(loop, 'shutdown_asyncgens'): + loop.run_until_complete(loop.shutdown_asyncgens()) elif args.command is not None: diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 4a940e2e7..cbb534803 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -245,28 +245,26 @@ class Daemon(metaclass=JSONRPCServerType): """ LBRYnet daemon, a jsonrpc interface to lbry functions """ - allowed_during_startup = [] def __init__(self, conf: Config, component_manager: typing.Optional[ComponentManager] = None): self.conf = conf self._node_id = None self._installation_id = None self.session_id = base58.b58encode(utils.generate_id()).decode() - to_skip = conf.components_to_skip self.analytics_manager = analytics.Manager(conf, self.installation_id, self.session_id) self.component_manager = component_manager or ComponentManager( - conf, analytics_manager=self.analytics_manager, skip_components=to_skip or [] + conf, analytics_manager=self.analytics_manager, + skip_components=conf.components_to_skip or [] ) - self.listening_port = None - self._component_setup_task = None + self.component_startup_task = None logging.getLogger('aiohttp.access').setLevel(logging.WARN) self.app = web.Application() self.app.router.add_get('/lbryapi', self.handle_old_jsonrpc) self.app.router.add_post('/lbryapi', self.handle_old_jsonrpc) self.app.router.add_post('/', self.handle_old_jsonrpc) - self.handler = self.app.make_handler() - self.server: asyncio.AbstractServer = None + self.runner = web.AppRunner(self.app) + self.site = None @property def dht_node(self) -> typing.Optional['Node']: @@ -361,9 +359,6 @@ class Daemon(metaclass=JSONRPCServerType): return self._installation_id def ensure_data_dir(self): - # although there is a risk of a race condition here we don't - # expect there to be multiple processes accessing this - # directory so the risk can be ignored if not os.path.isdir(self.conf.data_dir): os.makedirs(self.conf.data_dir) if not os.path.isdir(os.path.join(self.conf.data_dir, "blobfiles")): @@ -379,69 +374,52 @@ class Daemon(metaclass=JSONRPCServerType): os.makedirs(self.conf.download_dir) async def start(self): + log.info("Starting LBRYNet Daemon") + log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2)) + log.info("Platform: %s", json.dumps(system_info.get_platform(), indent=2)) + await self.analytics_manager.send_server_startup() + await self.runner.setup() + self.site = web.TCPSite(self.runner, self.conf.api_host, self.conf.api_port) + try: - self.server = await asyncio.get_event_loop().create_server( - self.handler, self.conf.api_host, self.conf.api_port - ) - log.info('lbrynet API listening on TCP %s:%i', *self.server.sockets[0].getsockname()[:2]) - await self.setup() - if self.analytics_manager: - await self.analytics_manager.send_server_startup_success() - except OSError: + await self.site.start() + log.info('lbrynet API listening on TCP %s:%i', *self.site._server.sockets[0].getsockname()[:2]) + except OSError as e: log.error('lbrynet API failed to bind TCP %s for listening. Daemon is already running or this port is ' 'already in use by another application.', self.conf.api) - raise + await self.analytics_manager.send_server_startup_error(str(e)) + raise SystemExit() + try: - await self.setup() + await self.initialize() except asyncio.CancelledError: log.info("shutting down before finished starting") - await self.shutdown() - raise - except Exception as err: - if self.analytics_manager: - await self.analytics_manager.send_server_startup_error(str(err)) + await self.analytics_manager.send_server_startup_error("shutting down before finished starting") + await self.stop() + except Exception as e: + await self.analytics_manager.send_server_startup_error(str(e)) log.exception('Failed to start lbrynet-daemon') - async def setup(self): - log.info("Starting lbrynet-daemon") - log.info("Platform: %s", json.dumps(system_info.get_platform())) + await self.analytics_manager.send_server_startup_success() + async def initialize(self): self.ensure_data_dir() self.ensure_wallet_dir() self.ensure_download_dir() - - if self.analytics_manager: + if not self.analytics_manager.is_started: self.analytics_manager.start() + self.component_startup_task = asyncio.create_task(self.component_manager.start()) + await self.component_startup_task - self._component_setup_task = self.component_manager.start() - await self._component_setup_task - await self.analytics_manager.send_server_startup() - log.info("Started lbrynet-daemon") - - @staticmethod - def _already_shutting_down(sig_num, frame): - log.info("Already shutting down") - - async def shutdown(self): - # ignore INT/TERM signals once shutdown has started - signal.signal(signal.SIGINT, self._already_shutting_down) - signal.signal(signal.SIGTERM, self._already_shutting_down) - if self.listening_port: - self.listening_port.stopListening() - if self.server is not None: - self.server.close() - await self.server.wait_closed() - await self.app.shutdown() - await self.handler.shutdown(60.0) - await self.app.cleanup() - if self.analytics_manager: + async def stop(self): + if self.component_startup_task is not None: + if self.component_startup_task.done(): + await self.component_manager.stop() + else: + self.component_startup_task.cancel() + await self.runner.cleanup() + if self.analytics_manager.is_started: self.analytics_manager.stop() - try: - self._component_setup_task.cancel() - except (AttributeError, asyncio.CancelledError): - pass - if self.component_manager is not None: - await self.component_manager.stop() async def handle_old_jsonrpc(self, request): data = await request.json() diff --git a/lbrynet/extras/daemon/analytics.py b/lbrynet/extras/daemon/analytics.py index 5ee93bd59..1e9c607d8 100644 --- a/lbrynet/extras/daemon/analytics.py +++ b/lbrynet/extras/daemon/analytics.py @@ -47,7 +47,6 @@ class Manager: def start(self): if self._enabled and self.task is None: self.task = asyncio.create_task(self.run()) - log.info("Start") async def run(self): while True: diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index fadda258f..c5e3df141 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -30,6 +30,7 @@ filter_fields = [ 'nout', 'channel_claim_id', 'channel_name', + 'full_status' ] comparison_operators = { @@ -246,6 +247,8 @@ class StreamManager: streams = [] for stream in self.streams: for search, val in search_by.items(): + if search == 'full_status': + continue if comparison_operators[comparison](getattr(stream, search), val): streams.append(stream) break