fix ProactorEventLoop

This commit is contained in:
Jack Robison 2020-03-05 23:08:33 -05:00
parent c4905d02b9
commit 7a707f7321
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 66 additions and 13 deletions

View file

@ -3,7 +3,6 @@ import sys
import shutil import shutil
import signal import signal
import pathlib import pathlib
import platform
import json import json
import asyncio import asyncio
import argparse import argparse
@ -262,9 +261,6 @@ def setup_logging(logger: logging.Logger, args: argparse.Namespace, conf: Config
def run_daemon(args: argparse.Namespace, conf: Config): def run_daemon(args: argparse.Namespace, conf: Config):
if sys.version_info < (3, 8) and platform.system() == "Windows":
# TODO: remove after we move to requiring Python 3.8
asyncio.set_event_loop(asyncio.ProactorEventLoop())
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
if args.verbose is not None: if args.verbose is not None:
loop.set_debug(True) loop.set_debug(True)

View file

@ -1,3 +1,4 @@
import platform
import asyncio import asyncio
import json import json
import logging import logging
@ -6,7 +7,7 @@ import pathlib
import re import re
import shlex import shlex
import shutil import shutil
from concurrent.futures.thread import ThreadPoolExecutor
from lbry.conf import TranscodeConfig from lbry.conf import TranscodeConfig
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -20,12 +21,13 @@ class VideoFileAnalyzer:
self._ffmpeg_installed = False self._ffmpeg_installed = False
self._which = None self._which = None
self._checked_ffmpeg = False self._checked_ffmpeg = False
self._loop = None
async def _execute(self, command, arguments): async def _execute(self, command, arguments):
args = shlex.split(arguments) args = shlex.split(arguments)
process = await asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
os.path.join(self._conf.ffmpeg_folder, command), *args, os.path.join(self._conf.ffmpeg_folder, command), *args,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, loop=self._loop
) )
stdout, stderr = await process.communicate() # returns when the streams are closed stdout, stderr = await process.communicate() # returns when the streams are closed
return stdout.decode(errors='replace') + stderr.decode(errors='replace'), process.returncode return stdout.decode(errors='replace') + stderr.decode(errors='replace'), process.returncode
@ -42,14 +44,43 @@ class VideoFileAnalyzer:
f"and ensure that it is callable via PATH or conf.ffmpeg_folder") f"and ensure that it is callable via PATH or conf.ffmpeg_folder")
return version return version
async def _verify_ffmpeg_installed(self): def _verify_ffmpeg_installed(self):
if self._ffmpeg_installed: if self._ffmpeg_installed:
return return
await self._verify_executable("ffprobe")
version = await self._verify_executable("ffmpeg") async def _verify():
self._which = shutil.which(os.path.join(self._conf.ffmpeg_folder, "ffmpeg")) await self._verify_executable("ffprobe")
self._ffmpeg_installed = True version = await self._verify_executable("ffmpeg")
log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which) self._which = shutil.which(os.path.join(self._conf.ffmpeg_folder, "ffmpeg"))
self._ffmpeg_installed = True
log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which)
def _inner():
if not self._loop:
if platform.system() == 'Windows':
self._loop = asyncio.ProactorEventLoop()
else:
self._loop = asyncio.new_event_loop()
try:
return self._loop.run_until_complete(
asyncio.ensure_future(_verify(), loop=self._loop)
)
finally:
if self._loop:
self._loop.stop()
self._loop.close()
self._loop = None
async def wrapper():
pool = ThreadPoolExecutor(1)
try:
return await asyncio.get_event_loop().run_in_executor(
pool, _inner
)
finally:
pool.shutdown(wait=True)
return asyncio.ensure_future(wrapper())
async def status(self, reset=False, recheck=False): async def status(self, reset=False, recheck=False):
if reset: if reset:
@ -286,7 +317,7 @@ class VideoFileAnalyzer:
return scan_data return scan_data
async def verify_or_repair(self, validate, repair, file_path, ignore_non_video=False): async def _verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
if not validate and not repair: if not validate and not repair:
return file_path return file_path
@ -363,3 +394,29 @@ class VideoFileAnalyzer:
return file_path return file_path
return str(output) return str(output)
def _blocking_verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
if platform.system() == 'Windows':
self._loop = asyncio.ProactorEventLoop()
else:
self._loop = asyncio.new_event_loop()
try:
return self._loop.run_until_complete(
asyncio.ensure_future(
self._verify_or_repair(validate, repair, file_path, ignore_non_video), loop=self._loop
)
)
finally:
if self._loop:
self._loop.stop()
self._loop.close()
self._loop = None
async def verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
pool = ThreadPoolExecutor(1)
try:
return await asyncio.get_event_loop().run_in_executor(
pool, self._blocking_verify_or_repair, validate, repair, file_path, ignore_non_video
)
finally:
pool.shutdown(wait=True)