diff --git a/lbry/extras/cli.py b/lbry/extras/cli.py index e5de48616..4e2a8b527 100644 --- a/lbry/extras/cli.py +++ b/lbry/extras/cli.py @@ -3,7 +3,6 @@ import sys import shutil import signal import pathlib -import platform import json import asyncio import argparse @@ -262,9 +261,6 @@ def setup_logging(logger: logging.Logger, args: argparse.Namespace, conf: Config def run_daemon(args: argparse.Namespace, conf: Config): - if sys.version_info < (3, 8) and platform.system() == "Windows": - # TODO: remove after we move to requiring Python 3.8 - asyncio.set_event_loop(asyncio.ProactorEventLoop()) loop = asyncio.get_event_loop() if args.verbose is not None: loop.set_debug(True) diff --git a/lbry/file_analysis.py b/lbry/file_analysis.py index b5714af86..8b781ab80 100644 --- a/lbry/file_analysis.py +++ b/lbry/file_analysis.py @@ -1,3 +1,4 @@ +import platform import asyncio import json import logging @@ -6,7 +7,7 @@ import pathlib import re import shlex import shutil - +from concurrent.futures.thread import ThreadPoolExecutor from lbry.conf import TranscodeConfig log = logging.getLogger(__name__) @@ -20,12 +21,13 @@ class VideoFileAnalyzer: self._ffmpeg_installed = False self._which = None self._checked_ffmpeg = False + self._loop = None async def _execute(self, command, arguments): args = shlex.split(arguments) process = await asyncio.create_subprocess_exec( os.path.join(self._conf.ffmpeg_folder, command), *args, - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, loop=self._loop ) stdout, stderr = await process.communicate() # returns when the streams are closed return stdout.decode(errors='replace') + stderr.decode(errors='replace'), process.returncode @@ -42,14 +44,43 @@ class VideoFileAnalyzer: f"and ensure that it is callable via PATH or conf.ffmpeg_folder") return version - async def _verify_ffmpeg_installed(self): + def _verify_ffmpeg_installed(self): if self._ffmpeg_installed: return - await self._verify_executable("ffprobe") - version = await self._verify_executable("ffmpeg") - self._which = shutil.which(os.path.join(self._conf.ffmpeg_folder, "ffmpeg")) - self._ffmpeg_installed = True - log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which) + + async def _verify(): + await self._verify_executable("ffprobe") + version = await self._verify_executable("ffmpeg") + self._which = shutil.which(os.path.join(self._conf.ffmpeg_folder, "ffmpeg")) + self._ffmpeg_installed = True + log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which) + + def _inner(): + if not self._loop: + if platform.system() == 'Windows': + self._loop = asyncio.ProactorEventLoop() + else: + self._loop = asyncio.new_event_loop() + try: + return self._loop.run_until_complete( + asyncio.ensure_future(_verify(), loop=self._loop) + ) + finally: + if self._loop: + self._loop.stop() + self._loop.close() + self._loop = None + + async def wrapper(): + pool = ThreadPoolExecutor(1) + try: + return await asyncio.get_event_loop().run_in_executor( + pool, _inner + ) + finally: + pool.shutdown(wait=True) + + return asyncio.ensure_future(wrapper()) async def status(self, reset=False, recheck=False): if reset: @@ -286,7 +317,7 @@ class VideoFileAnalyzer: return scan_data - async def verify_or_repair(self, validate, repair, file_path, ignore_non_video=False): + async def _verify_or_repair(self, validate, repair, file_path, ignore_non_video=False): if not validate and not repair: return file_path @@ -363,3 +394,29 @@ class VideoFileAnalyzer: return file_path return str(output) + + def _blocking_verify_or_repair(self, validate, repair, file_path, ignore_non_video=False): + if platform.system() == 'Windows': + self._loop = asyncio.ProactorEventLoop() + else: + self._loop = asyncio.new_event_loop() + try: + return self._loop.run_until_complete( + asyncio.ensure_future( + self._verify_or_repair(validate, repair, file_path, ignore_non_video), loop=self._loop + ) + ) + finally: + if self._loop: + self._loop.stop() + self._loop.close() + self._loop = None + + async def verify_or_repair(self, validate, repair, file_path, ignore_non_video=False): + pool = ThreadPoolExecutor(1) + try: + return await asyncio.get_event_loop().run_in_executor( + pool, self._blocking_verify_or_repair, validate, repair, file_path, ignore_non_video + ) + finally: + pool.shutdown(wait=True)