forked from LBRYCommunity/lbry-sdk
Merge pull request #2890 from lbryio/faster-status
improve speed of `status`
This commit is contained in:
commit
ace90a4354
3 changed files with 64 additions and 16 deletions
|
@ -329,6 +329,9 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
|
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
|
||||||
self.metrics_runner = web.AppRunner(prom_app)
|
self.metrics_runner = web.AppRunner(prom_app)
|
||||||
|
|
||||||
|
self.need_connection_status_refresh = asyncio.Event()
|
||||||
|
self._connection_status_task: Optional[asyncio.Task] = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def dht_node(self) -> typing.Optional['Node']:
|
def dht_node(self) -> typing.Optional['Node']:
|
||||||
return self.component_manager.get_component(DHT_COMPONENT)
|
return self.component_manager.get_component(DHT_COMPONENT)
|
||||||
|
@ -441,18 +444,25 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
log.warning("detected internet connection was lost")
|
log.warning("detected internet connection was lost")
|
||||||
self._connection_status = (self.component_manager.loop.time(), connected)
|
self._connection_status = (self.component_manager.loop.time(), connected)
|
||||||
|
|
||||||
async def get_connection_status(self) -> str:
|
async def keep_connection_status_up_to_date(self):
|
||||||
if self._connection_status[0] + 300 > self.component_manager.loop.time():
|
while True:
|
||||||
if not self._connection_status[1]:
|
try:
|
||||||
|
await asyncio.wait_for(self.need_connection_status_refresh.wait(), 300)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
await self.update_connection_status()
|
await self.update_connection_status()
|
||||||
else:
|
self.need_connection_status_refresh.clear()
|
||||||
await self.update_connection_status()
|
|
||||||
return CONNECTION_STATUS_CONNECTED if self._connection_status[1] else CONNECTION_STATUS_NETWORK
|
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
log.info("Starting LBRYNet Daemon")
|
log.info("Starting LBRYNet Daemon")
|
||||||
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
|
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
|
||||||
log.info("Platform: %s", json.dumps(self.platform_info, indent=2))
|
log.info("Platform: %s", json.dumps(self.platform_info, indent=2))
|
||||||
|
|
||||||
|
self.need_connection_status_refresh.set()
|
||||||
|
self._connection_status_task = self.component_manager.loop.create_task(
|
||||||
|
self.keep_connection_status_up_to_date()
|
||||||
|
)
|
||||||
|
|
||||||
await self.analytics_manager.send_server_startup()
|
await self.analytics_manager.send_server_startup()
|
||||||
await self.rpc_runner.setup()
|
await self.rpc_runner.setup()
|
||||||
await self.streaming_runner.setup()
|
await self.streaming_runner.setup()
|
||||||
|
@ -511,6 +521,10 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
await self.component_startup_task
|
await self.component_startup_task
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
|
if self._connection_status_task:
|
||||||
|
if not self._connection_status_task.done():
|
||||||
|
self._connection_status_task.cancel()
|
||||||
|
self._connection_status_task = None
|
||||||
if self.component_startup_task is not None:
|
if self.component_startup_task is not None:
|
||||||
if self.component_startup_task.done():
|
if self.component_startup_task.done():
|
||||||
await self.component_manager.stop()
|
await self.component_manager.stop()
|
||||||
|
@ -785,7 +799,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
'analyze_audio_volume': (bool) should ffmpeg analyze audio
|
'analyze_audio_volume': (bool) should ffmpeg analyze audio
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
return await self._video_file_analyzer.status(reset=True)
|
return await self._video_file_analyzer.status(reset=True, recheck=True)
|
||||||
|
|
||||||
async def jsonrpc_status(self):
|
async def jsonrpc_status(self):
|
||||||
"""
|
"""
|
||||||
|
@ -875,14 +889,16 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
connection_code = await self.get_connection_status()
|
if not self._connection_status[1]:
|
||||||
|
self.need_connection_status_refresh.set()
|
||||||
|
connection_code = CONNECTION_STATUS_CONNECTED if self._connection_status[1] else CONNECTION_STATUS_NETWORK
|
||||||
ffmpeg_status = await self._video_file_analyzer.status()
|
ffmpeg_status = await self._video_file_analyzer.status()
|
||||||
|
running_components = self.component_manager.get_components_status()
|
||||||
response = {
|
response = {
|
||||||
'installation_id': self.installation_id,
|
'installation_id': self.installation_id,
|
||||||
'is_running': all(self.component_manager.get_components_status().values()),
|
'is_running': all(running_components.values()),
|
||||||
'skipped_components': self.component_manager.skip_components,
|
'skipped_components': self.component_manager.skip_components,
|
||||||
'startup_status': self.component_manager.get_components_status(),
|
'startup_status': running_components,
|
||||||
'connection_status': {
|
'connection_status': {
|
||||||
'code': connection_code,
|
'code': connection_code,
|
||||||
'message': CONNECTION_MESSAGES[connection_code],
|
'message': CONNECTION_MESSAGES[connection_code],
|
||||||
|
|
|
@ -30,6 +30,7 @@ class VideoFileAnalyzer:
|
||||||
self._which_ffmpeg = None
|
self._which_ffmpeg = None
|
||||||
self._which_ffprobe = None
|
self._which_ffprobe = None
|
||||||
self._env_copy = dict(os.environ)
|
self._env_copy = dict(os.environ)
|
||||||
|
self._checked_ffmpeg = False
|
||||||
if lbry.utils.is_running_from_bundle():
|
if lbry.utils.is_running_from_bundle():
|
||||||
# handle the situation where PyInstaller overrides our runtime environment:
|
# handle the situation where PyInstaller overrides our runtime environment:
|
||||||
self._replace_or_pop_env('LD_LIBRARY_PATH')
|
self._replace_or_pop_env('LD_LIBRARY_PATH')
|
||||||
|
@ -72,6 +73,10 @@ class VideoFileAnalyzer:
|
||||||
log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which_ffmpeg)
|
log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which_ffmpeg)
|
||||||
return version
|
return version
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _which_ffmpeg_and_ffmprobe(path):
|
||||||
|
return shutil.which("ffmpeg", path=path), shutil.which("ffprobe", path=path)
|
||||||
|
|
||||||
async def _verify_ffmpeg_installed(self):
|
async def _verify_ffmpeg_installed(self):
|
||||||
if self._ffmpeg_installed:
|
if self._ffmpeg_installed:
|
||||||
return
|
return
|
||||||
|
@ -80,29 +85,33 @@ class VideoFileAnalyzer:
|
||||||
if hasattr(self._conf, "data_dir"):
|
if hasattr(self._conf, "data_dir"):
|
||||||
path += os.path.pathsep + os.path.join(getattr(self._conf, "data_dir"), "ffmpeg", "bin")
|
path += os.path.pathsep + os.path.join(getattr(self._conf, "data_dir"), "ffmpeg", "bin")
|
||||||
path += os.path.pathsep + self._env_copy.get("PATH", "")
|
path += os.path.pathsep + self._env_copy.get("PATH", "")
|
||||||
|
self._which_ffmpeg, self._which_ffprobe = await asyncio.get_running_loop().run_in_executor(
|
||||||
self._which_ffmpeg = shutil.which("ffmpeg", path=path)
|
None, self._which_ffmpeg_and_ffmprobe, path
|
||||||
|
)
|
||||||
if not self._which_ffmpeg:
|
if not self._which_ffmpeg:
|
||||||
log.warning("Unable to locate ffmpeg executable. Path: %s", path)
|
log.warning("Unable to locate ffmpeg executable. Path: %s", path)
|
||||||
raise FileNotFoundError(f"Unable to locate ffmpeg executable. Path: {path}")
|
raise FileNotFoundError(f"Unable to locate ffmpeg executable. Path: {path}")
|
||||||
self._which_ffprobe = shutil.which("ffprobe", path=path)
|
|
||||||
if not self._which_ffprobe:
|
if not self._which_ffprobe:
|
||||||
log.warning("Unable to locate ffprobe executable. Path: %s", path)
|
log.warning("Unable to locate ffprobe executable. Path: %s", path)
|
||||||
raise FileNotFoundError(f"Unable to locate ffprobe executable. Path: {path}")
|
raise FileNotFoundError(f"Unable to locate ffprobe executable. Path: {path}")
|
||||||
if os.path.dirname(self._which_ffmpeg) != os.path.dirname(self._which_ffprobe):
|
if os.path.dirname(self._which_ffmpeg) != os.path.dirname(self._which_ffprobe):
|
||||||
log.warning("ffmpeg and ffprobe are in different folders!")
|
log.warning("ffmpeg and ffprobe are in different folders!")
|
||||||
|
|
||||||
await self._verify_executables()
|
await self._verify_executables()
|
||||||
self._ffmpeg_installed = True
|
self._ffmpeg_installed = True
|
||||||
|
|
||||||
async def status(self, reset=False):
|
async def status(self, reset=False, recheck=False):
|
||||||
if reset:
|
if reset:
|
||||||
self._available_encoders = ""
|
self._available_encoders = ""
|
||||||
self._ffmpeg_installed = None
|
self._ffmpeg_installed = None
|
||||||
if self._ffmpeg_installed is None:
|
if self._checked_ffmpeg and not recheck:
|
||||||
|
pass
|
||||||
|
elif self._ffmpeg_installed is None:
|
||||||
try:
|
try:
|
||||||
await self._verify_ffmpeg_installed()
|
await self._verify_ffmpeg_installed()
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
pass
|
pass
|
||||||
|
self._checked_ffmpeg = True
|
||||||
return {
|
return {
|
||||||
"available": self._ffmpeg_installed,
|
"available": self._ffmpeg_installed,
|
||||||
"which": self._which_ffmpeg,
|
"which": self._which_ffmpeg,
|
||||||
|
|
|
@ -160,3 +160,26 @@ class TranscodeValidation(ClaimTestCase):
|
||||||
await self.analyzer.status(reset=True)
|
await self.analyzer.status(reset=True)
|
||||||
with self.assertRaisesRegex(Exception, "Unable to locate"):
|
with self.assertRaisesRegex(Exception, "Unable to locate"):
|
||||||
await self.analyzer.verify_or_repair(True, False, self.video_file_name)
|
await self.analyzer.verify_or_repair(True, False, self.video_file_name)
|
||||||
|
|
||||||
|
async def test_dont_recheck_ffmpeg_installation(self):
|
||||||
|
|
||||||
|
call_count = 0
|
||||||
|
|
||||||
|
original = self.daemon._video_file_analyzer._verify_ffmpeg_installed
|
||||||
|
|
||||||
|
def _verify_ffmpeg_installed():
|
||||||
|
nonlocal call_count
|
||||||
|
call_count += 1
|
||||||
|
return original()
|
||||||
|
|
||||||
|
self.daemon._video_file_analyzer._verify_ffmpeg_installed = _verify_ffmpeg_installed
|
||||||
|
self.assertEqual(0, call_count)
|
||||||
|
await self.daemon.jsonrpc_status()
|
||||||
|
self.assertEqual(1, call_count)
|
||||||
|
# counter should not go up again
|
||||||
|
await self.daemon.jsonrpc_status()
|
||||||
|
self.assertEqual(1, call_count)
|
||||||
|
|
||||||
|
# this should force rechecking the installation
|
||||||
|
await self.daemon.jsonrpc_ffmpeg_find()
|
||||||
|
self.assertEqual(2, call_count)
|
||||||
|
|
Loading…
Reference in a new issue