lbry-sdk/lbry/file_analysis.py
2020-03-05 23:49:49 -05:00

422 lines
17 KiB
Python

import platform
import asyncio
import json
import logging
import os
import pathlib
import re
import shlex
import shutil
from concurrent.futures.thread import ThreadPoolExecutor
from lbry.conf import TranscodeConfig
# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)


class VideoFileAnalyzer:
    """Check media files for streamability with ffprobe/ffmpeg and optionally transcode them.

    The analyzer reads the following attributes from the supplied configuration object:
    ``ffmpeg_folder``, ``video_encoder``, ``video_scaler``, ``video_bitrate_maximum``,
    ``audio_encoder``, ``volume_filter`` and ``volume_analysis_time``.

    Public entry points are :meth:`status` and :meth:`verify_or_repair`. Both hand the
    subprocess work to a worker thread that runs its own private event loop (a
    ``ProactorEventLoop`` on Windows).
    """

    def __init__(self, conf: "TranscodeConfig"):
        self._conf = conf
        # cached text output of ``ffmpeg -encoders``; empty until first needed
        self._available_encoders = ""
        # True once both ffprobe and ffmpeg have been run successfully
        self._ffmpeg_installed = False
        # result of ``shutil.which`` for ffmpeg, filled in by a successful verification
        self._which = None
        # True once ``status`` has attempted a verification (successful or not)
        self._checked_ffmpeg = False
        # private event loop currently driving subprocess work, if any
        self._loop = None

    async def _execute(self, command, arguments):
        """Run ``command`` (looked up in ``conf.ffmpeg_folder``) and capture its output.

        :param command: executable name; joined onto ``conf.ffmpeg_folder``.
        :param arguments: a single argument string, tokenized with ``shlex.split``.
        :return: tuple ``(output, returncode)`` where ``output`` is the decoded stdout
            followed by the decoded stderr.
        """
        args = shlex.split(arguments)
        # The subprocess is attached to the loop that is running this coroutine, so no
        # explicit ``loop=`` is passed (that keyword was removed from asyncio in Python 3.10).
        process = await asyncio.create_subprocess_exec(
            os.path.join(self._conf.ffmpeg_folder, command), *args,
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await process.communicate()  # returns when the streams are closed
        return stdout.decode(errors='replace') + stderr.decode(errors='replace'), process.returncode

    async def _verify_executable(self, name):
        """Run ``<name> -version`` and return its output.

        :raises FileNotFoundError: if the program cannot be started, exits non-zero, or
            its output does not begin with ``name``.
        """
        try:
            version, code = await self._execute(name, "-version")
        except Exception as e:
            log.warning("Unable to run %s, but it was requested. Message: %s", name, str(e))
            code = -1
            version = ""
        if code != 0 or not version.startswith(name):
            raise FileNotFoundError(f"Unable to locate or run {name}. Please install FFmpeg "
                                    f"and ensure that it is callable via PATH or conf.ffmpeg_folder")
        return version

    async def _locate_ffmpeg(self):
        """Verify ffprobe and ffmpeg, then record where ffmpeg lives and mark it installed."""
        await self._verify_executable("ffprobe")
        version = await self._verify_executable("ffmpeg")
        self._which = shutil.which(os.path.join(self._conf.ffmpeg_folder, "ffmpeg"))
        self._ffmpeg_installed = True
        log.debug("Using %s at %s", version.splitlines()[0].split(" Copyright")[0], self._which)

    def _run_in_private_loop(self, coro_func, *args):
        """Blocking helper: run ``coro_func(*args)`` to completion on a fresh event loop.

        Intended to be called from a worker thread. The loop is published as
        ``self._loop`` while it runs so that nested calls can tell they are already on it.
        """
        if platform.system() == 'Windows':
            loop = asyncio.ProactorEventLoop()
        else:
            loop = asyncio.new_event_loop()
        self._loop = loop
        try:
            return loop.run_until_complete(coro_func(*args))
        finally:
            loop.close()
            # only clear the attribute if nobody replaced it in the meantime
            if self._loop is loop:
                self._loop = None

    async def _verify_ffmpeg_installed(self):
        """Ensure ffprobe and ffmpeg are runnable; a no-op once that has been established.

        Always awaitable, including when ffmpeg was already verified.

        :raises FileNotFoundError: if either executable cannot be run.
        """
        if self._ffmpeg_installed:
            return
        current_loop = asyncio.get_event_loop()
        if self._loop is not None and current_loop is self._loop:
            # Already running on the private loop: check in place rather than trying to
            # re-enter (and then close) the loop we are executing on.
            await self._locate_ffmpeg()
            return
        with ThreadPoolExecutor(1) as pool:
            await current_loop.run_in_executor(pool, self._run_in_private_loop, self._locate_ffmpeg)

    async def status(self, reset=False, recheck=False):
        """Report whether ffmpeg is usable.

        :param reset: forget all cached detection state (encoders, install flag, path)
            so that this call verifies again.
        :param recheck: attempt verification even if a previous attempt was recorded.
        :return: dict with keys ``available``, ``which`` and ``analyze_audio_volume``.
        """
        if reset:
            self._available_encoders = ""
            self._ffmpeg_installed = False
            self._which = None
            # without this a bare reset would report "unavailable" without ever checking
            self._checked_ffmpeg = False
        if self._checked_ffmpeg and not recheck:
            installed = self._ffmpeg_installed
        else:
            installed = True
            try:
                await self._verify_ffmpeg_installed()
            except FileNotFoundError:
                installed = False
            self._checked_ffmpeg = True
        return {
            "available": installed,
            "which": self._which,
            "analyze_audio_volume": int(self._conf.volume_analysis_time) > 0
        }

    @staticmethod
    def _verify_container(scan_data: dict):
        """Return an error message if the container is not an accepted one, else ``""``."""
        container = scan_data["format"]["format_name"]
        log.debug(" Detected container is %s", container)
        if not {"webm", "mp4", "3gp", "ogg"}.intersection(container.split(",")):
            return "Container format is not in the approved list of WebM, MP4. " \
                   f"Actual: {container} [{scan_data['format']['format_long_name']}]"
        return ""

    @staticmethod
    def _verify_video_encoding(scan_data: dict):
        """Return an error message for the first unacceptable video stream, else ``""``."""
        for stream in scan_data["streams"]:
            if stream["codec_type"] != "video":
                continue
            codec = stream["codec_name"]
            log.debug(" Detected video codec is %s, format is %s", codec, stream["pix_fmt"])
            if not {"h264", "vp8", "vp9", "av1", "theora"}.intersection(codec.split(",")):
                return "Video codec is not in the approved list of H264, VP8, VP9, AV1, Theora. " \
                       f"Actual: {codec} [{stream['codec_long_name']}]"
            if "h264" in codec.split(",") and stream["pix_fmt"] != "yuv420p":
                return "Video codec is H264, but its pixel format does not match the approved yuv420p. " \
                       f"Actual: {stream['pix_fmt']}"
        return ""

    def _verify_bitrate(self, scan_data: dict, file_path):
        """Return an error message if the overall bit rate exceeds the configured maximum.

        A maximum of zero or less disables the check. When ffprobe reported no
        ``bit_rate`` the rate is estimated from the file size and duration.
        """
        bit_rate_max = float(self._conf.video_bitrate_maximum)
        if bit_rate_max <= 0:
            return ""
        if "bit_rate" in scan_data["format"]:
            bit_rate = float(scan_data["format"]["bit_rate"])
        else:
            # st_size is in bytes; convert to bits so the estimate uses the same unit
            # as ffprobe's bit_rate and the configured maximum
            bit_rate = os.stat(file_path).st_size * 8 / float(scan_data["format"]["duration"])
        log.debug(" Detected bitrate is %s Mbps. Allowed is %s Mbps",
                  str(bit_rate / 1000000.0), str(bit_rate_max / 1000000.0))
        if bit_rate > bit_rate_max:
            return "The bit rate is above the configured maximum. Actual: " \
                   f"{bit_rate / 1000000.0} Mbps; Allowed: {bit_rate_max / 1000000.0} Mbps"
        return ""

    async def _verify_fast_start(self, scan_data: dict, video_file):
        """Return an error message if ffprobe had to seek before finding the stream info.

        WebM and Ogg containers are skipped.
        """
        container = scan_data["format"]["format_name"]
        if {"webm", "ogg"}.intersection(container.split(",")):
            return ""
        # shlex.quote keeps the path a single token even if it contains quotes or spaces
        result, _ = await self._execute("ffprobe", f'-v debug {shlex.quote(str(video_file))}')
        match = re.search(r"Before avformat_find_stream_info.+?\s+seeks:(\d+)\s+", result)
        if match and int(match.group(1)) != 0:
            return "Video stream descriptors are not at the start of the file (the faststart flag was not used)."
        return ""

    @staticmethod
    def _verify_audio_encoding(scan_data: dict):
        """Return an error message for the first unacceptable audio stream, else ``""``."""
        for stream in scan_data["streams"]:
            if stream["codec_type"] != "audio":
                continue
            codec = stream["codec_name"]
            log.debug(" Detected audio codec is %s", codec)
            if not {"aac", "mp3", "flac", "vorbis", "opus"}.intersection(codec.split(",")):
                return "Audio codec is not in the approved list of AAC, FLAC, MP3, Vorbis, and Opus. " \
                       f"Actual: {codec} [{stream['codec_long_name']}]"
        return ""

    async def _verify_audio_volume(self, seconds, video_file):
        """Run ffmpeg's ``volumedetect`` over the first ``seconds`` of the file.

        :return: an error message when both the max and mean volume are below the
            thresholds; ``""`` when the check is disabled, cannot be parsed, or passes.
        """
        try:
            validate_volume = int(seconds) > 0
        except (TypeError, ValueError):
            validate_volume = False
        if not validate_volume:
            return ""
        result, _ = await self._execute(
            "ffmpeg",
            f'-i {shlex.quote(str(video_file))} -t {seconds} '
            f'-af volumedetect -vn -sn -dn -f null {shlex.quote(os.devnull)}'
        )
        try:
            mean_volume = float(re.search(r"mean_volume:\s+([-+]?\d*\.\d+|\d+)", result).group(1))
            max_volume = float(re.search(r"max_volume:\s+([-+]?\d*\.\d+|\d+)", result).group(1))
        except (AttributeError, ValueError) as e:
            # AttributeError: pattern not found in the output; ValueError: unparsable number
            log.debug(" Failure in volume analysis. Message: %s", str(e))
            return ""
        if max_volume < -5.0 and mean_volume < -22.0:
            return "Audio is at least five dB lower than prime. " \
                   f"Actual max: {max_volume}, mean: {mean_volume}"
        log.debug(" Detected audio volume has mean, max of %f, %f dB", mean_volume, max_volume)
        return ""

    @staticmethod
    def _compute_crf(scan_data):
        """Derive a CRF value from the tallest video stream (treated as at least 240 high)."""
        height = 240.0
        for stream in scan_data["streams"]:
            if stream["codec_type"] == "video":
                height = max(height, float(stream["height"]))
        # https://developers.google.com/media/vp9/settings/vod/
        return int(-0.011 * height + 40)

    def _get_video_scaler(self):
        """Return the configured video scaler argument string."""
        return self._conf.video_scaler

    async def _get_video_encoder(self, scan_data):
        """Choose the video encoder argument string for a transcode.

        :raises Exception: if none of the candidate encoders is available.
        """
        # use what the user said if it's there:
        # if it's not there, use h264 if we can because it's way faster than the others
        # if we don't have h264 use vp9; it's fairly compatible even though it's slow
        if not self._available_encoders:
            self._available_encoders, _ = await self._execute("ffmpeg", "-encoders -v quiet")
        encoder = self._conf.video_encoder.split(" ", 1)[0]
        # the name comes from configuration, so match it literally rather than as a pattern
        if re.search(fr"^\s*V..... {re.escape(encoder)} ", self._available_encoders, re.MULTILINE):
            return self._conf.video_encoder
        if re.search(r"^\s*V..... libx264 ", self._available_encoders, re.MULTILINE):
            if encoder:
                log.warning(" Using libx264 since the requested encoder was unavailable. Requested: %s", encoder)
            return 'libx264 -crf 19 -vf "format=yuv420p"'
        if not encoder:
            encoder = "libx264"
        if re.search(r"^\s*V..... libvpx-vp9 ", self._available_encoders, re.MULTILINE):
            log.warning(" Using libvpx-vp9 since the requested encoder was unavailable. Requested: %s", encoder)
            crf = self._compute_crf(scan_data)
            return f"libvpx-vp9 -crf {crf} -b:v 0"
        if re.search(r"^\s*V..... libtheora", self._available_encoders, re.MULTILINE):
            log.warning(" Using libtheora since the requested encoder was unavailable. Requested: %s", encoder)
            return "libtheora -q:v 7"
        raise Exception(f"The video encoder is not available. Requested: {encoder}")

    async def _get_audio_encoder(self, extension):
        """Choose the audio encoder argument string for the given output extension.

        :raises Exception: if none of the candidate encoders is available.
        """
        # if the video encoding is theora or av1/vp8/vp9 use opus (or fallback to vorbis)
        # or we don't have a video encoding but we have an ogg or webm container use opus
        # if we need to use opus/vorbis see if the conf file has it else use our own params
        # else use the user-set value if it exists
        # else use aac
        wants_opus = extension != "mp4"
        if not self._available_encoders:
            self._available_encoders, _ = await self._execute("ffmpeg", "-encoders -v quiet")
        encoder = self._conf.audio_encoder.split(" ", 1)[0]
        if wants_opus and 'opus' in encoder:
            return self._conf.audio_encoder
        if wants_opus and re.search(r"^\s*A..... libopus ", self._available_encoders, re.MULTILINE):
            return "libopus -b:a 160k"
        if wants_opus and 'vorbis' in encoder:
            return self._conf.audio_encoder
        if wants_opus and re.search(r"^\s*A..... libvorbis ", self._available_encoders, re.MULTILINE):
            return "libvorbis -q:a 6"
        # the name comes from configuration, so match it literally rather than as a pattern
        if re.search(fr"^\s*A..... {re.escape(encoder)} ", self._available_encoders, re.MULTILINE):
            return self._conf.audio_encoder
        if re.search(r"^\s*A..... aac ", self._available_encoders, re.MULTILINE):
            return "aac -b:a 192k"
        raise Exception(f"The audio encoder is not available. Requested: {encoder or 'aac'}")

    async def _get_volume_filter(self):
        """Return the configured volume filter, defaulting to ``-af loudnorm``."""
        return self._conf.volume_filter if self._conf.volume_filter else "-af loudnorm"

    @staticmethod
    def _get_best_container_extension(scan_data, video_encoder):
        """Pick the output file extension (``ogv``, ``webm`` or ``mp4``).

        :param video_encoder: encoder argument string, or ``""`` when the video stream
            is copied; in that case the existing video codec decides.
        """
        # the container is chosen by the video format
        # if we are theora-encoded, we want ogg
        # if we are vp8/vp9/av1 we want webm
        # use mp4 for anything else
        if not video_encoder:  # not re-encoding video
            for stream in scan_data["streams"]:
                if stream["codec_type"] != "video":
                    continue
                codec = stream["codec_name"].split(",")
                if "theora" in codec:
                    return "ogv"
                if {"vp8", "vp9", "av1"}.intersection(codec):
                    return "webm"
        if "theora" in video_encoder:
            return "ogv"
        if re.search(r"vp[89x]|av1", video_encoder.split(" ", 1)[0]):
            return "webm"
        return "mp4"

    async def _get_scan_data(self, validate, file_path):
        """Run ffprobe on ``file_path`` and return its parsed JSON description.

        :param validate: accepted for interface compatibility; not consulted here.
        :raises ValueError: if the output is not JSON, lacks format/duration data, or
            the duration is under 0.1 (treated as an image).
        """
        result, _ = await self._execute(
            "ffprobe",
            f'-v quiet -print_format json -show_format -show_streams {shlex.quote(str(file_path))}'
        )
        try:
            scan_data = json.loads(result)
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            log.debug("Failure in JSON parsing ffprobe results. Message: %s", str(e))
            raise ValueError(f'Absent or unreadable video file: {file_path}') from e
        if "format" not in scan_data or "duration" not in scan_data["format"]:
            log.debug("Format data is missing from ffprobe results for: %s", file_path)
            raise ValueError(f'Media file does not appear to contain video content at: {file_path}')
        if float(scan_data["format"]["duration"]) < 0.1:
            log.debug("Media file appears to be an image: %s", file_path)
            raise ValueError(f'Assuming image file at: {file_path}')
        return scan_data

    async def _verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
        """Check ``file_path`` for streamability and, if asked, transcode it.

        :param validate: when true, problems that are not repaired raise instead of
            being tolerated.
        :param repair: when true, attempt an ffmpeg transcode to fix detected problems.
        :param ignore_non_video: when true, files ffprobe cannot describe as video are
            returned unchanged instead of raising.
        :return: the path to use: the original path, or the ``*_fixed.*`` output of a
            successful transcode.
        :raises FileNotFoundError: if ffmpeg/ffprobe cannot be run.
        :raises ValueError: for non-video input unless ``ignore_non_video`` is set.
        :raises Exception: when verification fails and ``repair`` is false, or when the
            transcode fails and ``validate`` is true.
        """
        if not validate and not repair:
            return file_path
        await self._verify_ffmpeg_installed()
        try:
            scan_data = await self._get_scan_data(validate, file_path)
        except ValueError:
            if ignore_non_video:
                return file_path
            raise
        fast_start_msg = await self._verify_fast_start(scan_data, file_path)
        log.debug("Analyzing %s:", file_path)
        log.debug(" Detected faststart is %s", "false" if fast_start_msg else "true")
        container_msg = self._verify_container(scan_data)
        bitrate_msg = self._verify_bitrate(scan_data, file_path)
        video_msg = self._verify_video_encoding(scan_data)
        audio_msg = self._verify_audio_encoding(scan_data)
        volume_msg = await self._verify_audio_volume(self._conf.volume_analysis_time, file_path)
        messages = [container_msg, bitrate_msg, fast_start_msg, video_msg, audio_msg, volume_msg]
        if not any(messages):
            return file_path
        if not repair:
            errors = ["Streamability verification failed:"]
            errors.extend(filter(None, messages))
            raise Exception("\n ".join(errors))
        # the plan for transcoding:
        # we have to re-encode the video if it is in a nonstandard format
        # we also re-encode if we are h264 but not yuv420p (both errors caught in video_msg)
        # we also re-encode if our bitrate is too high
        try:
            # paths are shell-quoted because _execute tokenizes the command with shlex.split
            transcode_command = [f'-i {shlex.quote(str(file_path))} -y -c:s copy -c:d copy -c:v']
            video_encoder = ""
            if video_msg or bitrate_msg:
                video_encoder = await self._get_video_encoder(scan_data)
                transcode_command.append(video_encoder)
                # could do the scaling only if bitrate_msg, but if we're going to the effort to re-encode anyway...
                transcode_command.append(self._get_video_scaler())
            else:
                transcode_command.append("copy")
            transcode_command.append("-movflags +faststart -c:a")
            path = pathlib.Path(file_path)
            extension = self._get_best_container_extension(scan_data, video_encoder)
            if audio_msg or volume_msg:
                audio_encoder = await self._get_audio_encoder(extension)
                transcode_command.append(audio_encoder)
                if volume_msg:
                    volume_filter = await self._get_volume_filter()
                    transcode_command.append(volume_filter)
            else:
                transcode_command.append("copy")
            # TODO: put it in a temp folder and delete it after we upload?
            output = path.parent / f"{path.stem}_fixed.{extension}"
            transcode_command.append(shlex.quote(str(output)))
            ffmpeg_command = " ".join(transcode_command)
            log.info("Proceeding on transcode via: ffmpeg %s", ffmpeg_command)
            result, code = await self._execute("ffmpeg", ffmpeg_command)
            if code != 0:
                raise Exception(f"Failure to complete the transcode command. Output: {result}")
        except Exception as e:
            if validate:
                raise
            log.info("Unable to transcode %s . Message: %s", file_path, str(e))
            # TODO: delete partial output file here if it exists?
            return file_path
        return str(output)

    def _blocking_verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
        """Blocking wrapper: run :meth:`_verify_or_repair` on a private event loop."""
        return self._run_in_private_loop(
            self._verify_or_repair, validate, repair, file_path, ignore_non_video
        )

    async def verify_or_repair(self, validate, repair, file_path, ignore_non_video=False):
        """Run :meth:`_verify_or_repair` in a worker thread and return its result.

        Parameters, return value and exceptions are those of :meth:`_verify_or_repair`.
        """
        with ThreadPoolExecutor(1) as pool:
            return await asyncio.get_event_loop().run_in_executor(
                pool, self._blocking_verify_or_repair, validate, repair, file_path, ignore_non_video
            )