Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Jack Robison
7a707f7321
fix ProactorEventLoop 2020-03-05 23:49:49 -05:00
2 changed files with 66 additions and 13 deletions

View file

@ -3,7 +3,6 @@ import sys
import shutil
import signal
import pathlib
import platform
import json
import asyncio
import argparse
@ -262,9 +261,6 @@ def setup_logging(logger: logging.Logger, args: argparse.Namespace, conf: Config
def run_daemon(args: argparse.Namespace, conf: Config):
if sys.version_info < (3, 8) and platform.system() == "Windows":
# TODO: remove after we move to requiring Python 3.8
asyncio.set_event_loop(asyncio.ProactorEventLoop())
loop = asyncio.get_event_loop()
if args.verbose is not None:
loop.set_debug(True)

View file

@ -1,3 +1,4 @@
import platform
import asyncio
import json
import logging
@ -6,7 +7,7 @@ import pathlib
import re
import shlex
import shutil
from concurrent.futures.thread import ThreadPoolExecutor
from lbry.conf import TranscodeConfig
log = logging.getLogger(__name__)
@ -20,12 +21,13 @@ class VideoFileAnalyzer:
self._ffmpeg_installed = False
self._which = None
self._checked_ffmpeg = False
self._loop = None
async def _execute(self, command, arguments):
args = shlex.split(arguments)
process = await asyncio.create_subprocess_exec(
os.path.join(self._conf.ffmpeg_folder, command), *args,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, loop=self._loop
)
stdout, stderr = await process.communicate() # returns when the streams are closed
return stdout.decode(errors='replace') + stderr.decode(errors='replace'), process.returncode
@ -42,15 +44,44 @@ class VideoFileAnalyzer:
f"and ensure that it is callable via PATH or conf.ffmpeg_folder")
return version
async def _verify_ffmpeg_installed(self):
def _verify_ffmpeg_installed(self):
if self._ffmpeg_installed:
return
async def _verify():
await self._verify_executable("ffprobe")
version = await self._verify_executable("ffmpeg")
self._which = shutil.which(os.path.join(self._conf.ffmpeg_folder, "ffmpeg"))
self._ffmpeg_installed = True
log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which)
def _inner():
if not self._loop:
if platform.system() == 'Windows':
self._loop = asyncio.ProactorEventLoop()
else:
self._loop = asyncio.new_event_loop()
try:
return self._loop.run_until_complete(
asyncio.ensure_future(_verify(), loop=self._loop)
)
finally:
if self._loop:
self._loop.stop()
self._loop.close()
self._loop = None
async def wrapper():
pool = ThreadPoolExecutor(1)
try:
return await asyncio.get_event_loop().run_in_executor(
pool, _inner
)
finally:
pool.shutdown(wait=True)
return asyncio.ensure_future(wrapper())
async def status(self, reset=False, recheck=False):
if reset:
self._available_encoders = ""
@ -286,7 +317,7 @@ class VideoFileAnalyzer:
return scan_data
async def verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
async def _verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
if not validate and not repair:
return file_path
@ -363,3 +394,29 @@ class VideoFileAnalyzer:
return file_path
return str(output)
def _blocking_verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
if platform.system() == 'Windows':
self._loop = asyncio.ProactorEventLoop()
else:
self._loop = asyncio.new_event_loop()
try:
return self._loop.run_until_complete(
asyncio.ensure_future(
self._verify_or_repair(validate, repair, file_path, ignore_non_video), loop=self._loop
)
)
finally:
if self._loop:
self._loop.stop()
self._loop.close()
self._loop = None
async def verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
pool = ThreadPoolExecutor(1)
try:
return await asyncio.get_event_loop().run_in_executor(
pool, self._blocking_verify_or_repair, validate, repair, file_path, ignore_non_video
)
finally:
pool.shutdown(wait=True)