fix startup/shutdown and file_list --full-status argument

This commit is contained in:
Lex Berezhny 2019-01-23 23:46:11 -05:00
parent 5249d2a3ca
commit 0ee279bfd7
4 changed files with 51 additions and 70 deletions

View file

@ -8,6 +8,7 @@ from docopt import docopt
from textwrap import dedent from textwrap import dedent
import aiohttp import aiohttp
from aiohttp.web import GracefulExit
from lbrynet import __name__ as lbrynet_name, __version__ as lbrynet_version from lbrynet import __name__ as lbrynet_name, __version__ as lbrynet_version
from lbrynet.extras.daemon.loggly_handler import get_loggly_handler from lbrynet.extras.daemon.loggly_handler import get_loggly_handler
@ -136,11 +137,6 @@ def get_argument_parser():
return main return main
async def run_daemon(daemon: Daemon):
await daemon.start()
await daemon.server.wait_closed()
def main(argv=None): def main(argv=None):
argv = argv or sys.argv[1:] argv = argv or sys.argv[1:]
parser = get_argument_parser() parser = get_argument_parser()
@ -168,8 +164,7 @@ def main(argv=None):
handler = logging.StreamHandler() handler = logging.StreamHandler()
handler.setFormatter(default_formatter) handler.setFormatter(default_formatter)
log.addHandler(handler) log.addHandler(handler)
# mostly disable third part logging
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
logging.getLogger('aioupnp').setLevel(logging.WARNING) logging.getLogger('aioupnp').setLevel(logging.WARNING)
logging.getLogger('aiohttp').setLevel(logging.CRITICAL) logging.getLogger('aiohttp').setLevel(logging.CRITICAL)
@ -182,11 +177,17 @@ def main(argv=None):
loggly_handler.setLevel(logging.ERROR) loggly_handler.setLevel(logging.ERROR)
log.addHandler(loggly_handler) log.addHandler(loggly_handler)
log.debug('Final Settings: %s', conf.settings_dict)
log.info("Starting lbrynet-daemon from command line")
daemon = Daemon(conf) daemon = Daemon(conf)
asyncio.run(run_daemon(daemon)) loop = asyncio.get_event_loop()
loop.run_until_complete(daemon.start())
try:
loop.run_forever()
except (GracefulExit, KeyboardInterrupt):
pass
finally:
loop.run_until_complete(daemon.stop())
if hasattr(loop, 'shutdown_asyncgens'):
loop.run_until_complete(loop.shutdown_asyncgens())
elif args.command is not None: elif args.command is not None:

View file

@ -245,28 +245,26 @@ class Daemon(metaclass=JSONRPCServerType):
""" """
LBRYnet daemon, a jsonrpc interface to lbry functions LBRYnet daemon, a jsonrpc interface to lbry functions
""" """
allowed_during_startup = []
def __init__(self, conf: Config, component_manager: typing.Optional[ComponentManager] = None): def __init__(self, conf: Config, component_manager: typing.Optional[ComponentManager] = None):
self.conf = conf self.conf = conf
self._node_id = None self._node_id = None
self._installation_id = None self._installation_id = None
self.session_id = base58.b58encode(utils.generate_id()).decode() self.session_id = base58.b58encode(utils.generate_id()).decode()
to_skip = conf.components_to_skip
self.analytics_manager = analytics.Manager(conf, self.installation_id, self.session_id) self.analytics_manager = analytics.Manager(conf, self.installation_id, self.session_id)
self.component_manager = component_manager or ComponentManager( self.component_manager = component_manager or ComponentManager(
conf, analytics_manager=self.analytics_manager, skip_components=to_skip or [] conf, analytics_manager=self.analytics_manager,
skip_components=conf.components_to_skip or []
) )
self.listening_port = None self.component_startup_task = None
self._component_setup_task = None
logging.getLogger('aiohttp.access').setLevel(logging.WARN) logging.getLogger('aiohttp.access').setLevel(logging.WARN)
self.app = web.Application() self.app = web.Application()
self.app.router.add_get('/lbryapi', self.handle_old_jsonrpc) self.app.router.add_get('/lbryapi', self.handle_old_jsonrpc)
self.app.router.add_post('/lbryapi', self.handle_old_jsonrpc) self.app.router.add_post('/lbryapi', self.handle_old_jsonrpc)
self.app.router.add_post('/', self.handle_old_jsonrpc) self.app.router.add_post('/', self.handle_old_jsonrpc)
self.handler = self.app.make_handler() self.runner = web.AppRunner(self.app)
self.server: asyncio.AbstractServer = None self.site = None
@property @property
def dht_node(self) -> typing.Optional['Node']: def dht_node(self) -> typing.Optional['Node']:
@ -361,9 +359,6 @@ class Daemon(metaclass=JSONRPCServerType):
return self._installation_id return self._installation_id
def ensure_data_dir(self): def ensure_data_dir(self):
# although there is a risk of a race condition here we don't
# expect there to be multiple processes accessing this
# directory so the risk can be ignored
if not os.path.isdir(self.conf.data_dir): if not os.path.isdir(self.conf.data_dir):
os.makedirs(self.conf.data_dir) os.makedirs(self.conf.data_dir)
if not os.path.isdir(os.path.join(self.conf.data_dir, "blobfiles")): if not os.path.isdir(os.path.join(self.conf.data_dir, "blobfiles")):
@ -379,69 +374,52 @@ class Daemon(metaclass=JSONRPCServerType):
os.makedirs(self.conf.download_dir) os.makedirs(self.conf.download_dir)
async def start(self): async def start(self):
log.info("Starting LBRYNet Daemon")
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
log.info("Platform: %s", json.dumps(system_info.get_platform(), indent=2))
await self.analytics_manager.send_server_startup()
await self.runner.setup()
self.site = web.TCPSite(self.runner, self.conf.api_host, self.conf.api_port)
try: try:
self.server = await asyncio.get_event_loop().create_server( await self.site.start()
self.handler, self.conf.api_host, self.conf.api_port log.info('lbrynet API listening on TCP %s:%i', *self.site._server.sockets[0].getsockname()[:2])
) except OSError as e:
log.info('lbrynet API listening on TCP %s:%i', *self.server.sockets[0].getsockname()[:2])
await self.setup()
if self.analytics_manager:
await self.analytics_manager.send_server_startup_success()
except OSError:
log.error('lbrynet API failed to bind TCP %s for listening. Daemon is already running or this port is ' log.error('lbrynet API failed to bind TCP %s for listening. Daemon is already running or this port is '
'already in use by another application.', self.conf.api) 'already in use by another application.', self.conf.api)
raise await self.analytics_manager.send_server_startup_error(str(e))
raise SystemExit()
try: try:
await self.setup() await self.initialize()
except asyncio.CancelledError: except asyncio.CancelledError:
log.info("shutting down before finished starting") log.info("shutting down before finished starting")
await self.shutdown() await self.analytics_manager.send_server_startup_error("shutting down before finished starting")
raise await self.stop()
except Exception as err: except Exception as e:
if self.analytics_manager: await self.analytics_manager.send_server_startup_error(str(e))
await self.analytics_manager.send_server_startup_error(str(err))
log.exception('Failed to start lbrynet-daemon') log.exception('Failed to start lbrynet-daemon')
async def setup(self): await self.analytics_manager.send_server_startup_success()
log.info("Starting lbrynet-daemon")
log.info("Platform: %s", json.dumps(system_info.get_platform()))
async def initialize(self):
self.ensure_data_dir() self.ensure_data_dir()
self.ensure_wallet_dir() self.ensure_wallet_dir()
self.ensure_download_dir() self.ensure_download_dir()
if not self.analytics_manager.is_started:
if self.analytics_manager:
self.analytics_manager.start() self.analytics_manager.start()
self.component_startup_task = asyncio.create_task(self.component_manager.start())
await self.component_startup_task
self._component_setup_task = self.component_manager.start() async def stop(self):
await self._component_setup_task if self.component_startup_task is not None:
await self.analytics_manager.send_server_startup() if self.component_startup_task.done():
log.info("Started lbrynet-daemon") await self.component_manager.stop()
else:
@staticmethod self.component_startup_task.cancel()
def _already_shutting_down(sig_num, frame): await self.runner.cleanup()
log.info("Already shutting down") if self.analytics_manager.is_started:
async def shutdown(self):
# ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down)
signal.signal(signal.SIGTERM, self._already_shutting_down)
if self.listening_port:
self.listening_port.stopListening()
if self.server is not None:
self.server.close()
await self.server.wait_closed()
await self.app.shutdown()
await self.handler.shutdown(60.0)
await self.app.cleanup()
if self.analytics_manager:
self.analytics_manager.stop() self.analytics_manager.stop()
try:
self._component_setup_task.cancel()
except (AttributeError, asyncio.CancelledError):
pass
if self.component_manager is not None:
await self.component_manager.stop()
async def handle_old_jsonrpc(self, request): async def handle_old_jsonrpc(self, request):
data = await request.json() data = await request.json()

View file

@ -47,7 +47,6 @@ class Manager:
def start(self): def start(self):
if self._enabled and self.task is None: if self._enabled and self.task is None:
self.task = asyncio.create_task(self.run()) self.task = asyncio.create_task(self.run())
log.info("Start")
async def run(self): async def run(self):
while True: while True:

View file

@ -30,6 +30,7 @@ filter_fields = [
'nout', 'nout',
'channel_claim_id', 'channel_claim_id',
'channel_name', 'channel_name',
'full_status'
] ]
comparison_operators = { comparison_operators = {
@ -246,6 +247,8 @@ class StreamManager:
streams = [] streams = []
for stream in self.streams: for stream in self.streams:
for search, val in search_by.items(): for search, val in search_by.items():
if search == 'full_status':
continue
if comparison_operators[comparison](getattr(stream, search), val): if comparison_operators[comparison](getattr(stream, search), val):
streams.append(stream) streams.append(stream)
break break