diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 656efdea6..62f0becc0 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -329,6 +329,9 @@ class Daemon(metaclass=JSONRPCServerType): prom_app.router.add_get('/metrics', self.handle_metrics_get_request) self.metrics_runner = web.AppRunner(prom_app) + self.need_connection_status_refresh = asyncio.Event() + self._connection_status_task: Optional[asyncio.Task] = None + @property def dht_node(self) -> typing.Optional['Node']: return self.component_manager.get_component(DHT_COMPONENT) @@ -441,18 +444,25 @@ class Daemon(metaclass=JSONRPCServerType): log.warning("detected internet connection was lost") self._connection_status = (self.component_manager.loop.time(), connected) - async def get_connection_status(self) -> str: - if self._connection_status[0] + 300 > self.component_manager.loop.time(): - if not self._connection_status[1]: - await self.update_connection_status() - else: + async def keep_connection_status_up_to_date(self): + while True: + try: + await asyncio.wait_for(self.need_connection_status_refresh.wait(), 300) + except asyncio.TimeoutError: + pass await self.update_connection_status() - return CONNECTION_STATUS_CONNECTED if self._connection_status[1] else CONNECTION_STATUS_NETWORK + self.need_connection_status_refresh.clear() async def start(self): log.info("Starting LBRYNet Daemon") log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2)) log.info("Platform: %s", json.dumps(self.platform_info, indent=2)) + + self.need_connection_status_refresh.set() + self._connection_status_task = self.component_manager.loop.create_task( + self.keep_connection_status_up_to_date() + ) + await self.analytics_manager.send_server_startup() await self.rpc_runner.setup() await self.streaming_runner.setup() @@ -511,6 +521,10 @@ class Daemon(metaclass=JSONRPCServerType): await self.component_startup_task async def stop(self): + if self._connection_status_task: + if not self._connection_status_task.done(): + self._connection_status_task.cancel() + self._connection_status_task = None if self.component_startup_task is not None: if self.component_startup_task.done(): await self.component_manager.stop() @@ -785,7 +799,7 @@ class Daemon(metaclass=JSONRPCServerType): 'analyze_audio_volume': (bool) should ffmpeg analyze audio } """ - return await self._video_file_analyzer.status(reset=True) + return await self._video_file_analyzer.status(reset=True, recheck=True) async def jsonrpc_status(self): """ @@ -875,14 +889,16 @@ class Daemon(metaclass=JSONRPCServerType): } """ - connection_code = await self.get_connection_status() + if not self._connection_status[1]: + self.need_connection_status_refresh.set() + connection_code = CONNECTION_STATUS_CONNECTED if self._connection_status[1] else CONNECTION_STATUS_NETWORK ffmpeg_status = await self._video_file_analyzer.status() - + running_components = self.component_manager.get_components_status() response = { 'installation_id': self.installation_id, - 'is_running': all(self.component_manager.get_components_status().values()), + 'is_running': all(running_components.values()), 'skipped_components': self.component_manager.skip_components, - 'startup_status': self.component_manager.get_components_status(), + 'startup_status': running_components, 'connection_status': { 'code': connection_code, 'message': CONNECTION_MESSAGES[connection_code], diff --git a/lbry/file_analysis.py b/lbry/file_analysis.py index 982a75e35..7e083991c 100644 --- a/lbry/file_analysis.py +++ b/lbry/file_analysis.py @@ -30,6 +30,7 @@ class VideoFileAnalyzer: self._which_ffmpeg = None self._which_ffprobe = None self._env_copy = dict(os.environ) + self._checked_ffmpeg = False if lbry.utils.is_running_from_bundle(): # handle the situation where PyInstaller overrides our runtime environment: self._replace_or_pop_env('LD_LIBRARY_PATH') @@ -72,6 +73,10 @@ class VideoFileAnalyzer: log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which_ffmpeg) return version + @staticmethod + def _which_ffmpeg_and_ffmprobe(path): + return shutil.which("ffmpeg", path=path), shutil.which("ffprobe", path=path) + async def _verify_ffmpeg_installed(self): if self._ffmpeg_installed: return @@ -80,29 +85,33 @@ class VideoFileAnalyzer: if hasattr(self._conf, "data_dir"): path += os.path.pathsep + os.path.join(getattr(self._conf, "data_dir"), "ffmpeg", "bin") path += os.path.pathsep + self._env_copy.get("PATH", "") - - self._which_ffmpeg = shutil.which("ffmpeg", path=path) + self._which_ffmpeg, self._which_ffprobe = await asyncio.get_running_loop().run_in_executor( + None, self._which_ffmpeg_and_ffmprobe, path + ) if not self._which_ffmpeg: log.warning("Unable to locate ffmpeg executable. Path: %s", path) raise FileNotFoundError(f"Unable to locate ffmpeg executable. Path: {path}") - self._which_ffprobe = shutil.which("ffprobe", path=path) if not self._which_ffprobe: log.warning("Unable to locate ffprobe executable. Path: %s", path) raise FileNotFoundError(f"Unable to locate ffprobe executable. Path: {path}") if os.path.dirname(self._which_ffmpeg) != os.path.dirname(self._which_ffprobe): log.warning("ffmpeg and ffprobe are in different folders!") + await self._verify_executables() self._ffmpeg_installed = True - async def status(self, reset=False): + async def status(self, reset=False, recheck=False): if reset: self._available_encoders = "" self._ffmpeg_installed = None - if self._ffmpeg_installed is None: + if self._checked_ffmpeg and not recheck: + pass + elif self._ffmpeg_installed is None: try: await self._verify_ffmpeg_installed() except FileNotFoundError: pass + self._checked_ffmpeg = True return { "available": self._ffmpeg_installed, "which": self._which_ffmpeg, diff --git a/tests/integration/other/test_transcoding.py b/tests/integration/other/test_transcoding.py index d671ace4d..9bf654825 100644 --- a/tests/integration/other/test_transcoding.py +++ b/tests/integration/other/test_transcoding.py @@ -160,3 +160,26 @@ class TranscodeValidation(ClaimTestCase): await self.analyzer.status(reset=True) with self.assertRaisesRegex(Exception, "Unable to locate"): await self.analyzer.verify_or_repair(True, False, self.video_file_name) + + async def test_dont_recheck_ffmpeg_installation(self): + + call_count = 0 + + original = self.daemon._video_file_analyzer._verify_ffmpeg_installed + + def _verify_ffmpeg_installed(): + nonlocal call_count + call_count += 1 + return original() + + self.daemon._video_file_analyzer._verify_ffmpeg_installed = _verify_ffmpeg_installed + self.assertEqual(0, call_count) + await self.daemon.jsonrpc_status() + self.assertEqual(1, call_count) + # counter should not go up again + await self.daemon.jsonrpc_status() + self.assertEqual(1, call_count) + + # this should force rechecking the installation + await self.daemon.jsonrpc_ffmpeg_find() + self.assertEqual(2, call_count)