# Av1an/Encoders/vvc.py
# 2020-10-31 16:29:18 +02:00
#
# 152 lines
# 5.3 KiB
# Python

import os
import subprocess
from distutils.spawn import find_executable
from pathlib import Path
from subprocess import PIPE, STDOUT
from typing import Tuple, Optional
import re
from Av1an.arg_parse import Args
from Chunks.chunk import Chunk
from Av1an.commandtypes import MPCommands, CommandPair, Command
from Encoders.encoder import Encoder
from Av1an.logger import log
from Av1an.utils import list_index_of_regex
class Vvc(Encoder):
    """
    Encoder wrapper that drives the external ``vvc_encoder`` binary.

    The encoder is fed a raw yuv file per chunk (created in
    ``on_before_chunk`` and removed in ``on_after_chunk``) rather than a pipe.

    Redo after VVenC default and expert app have concatenation
    """

    def __init__(self):
        super().__init__(
            encoder_bin='vvc_encoder',
            encoder_help='vvc_encoder --help',
            default_args=None,
            default_passes=1,
            default_q_range=(20, 50),
            output_extension='h266'
        )

    def compose_1_pass(self, a: Args, c: Chunk, output: str) -> MPCommands:
        """
        Build the single-pass encode command for a chunk.

        :param a: the Args (``vvc_conf`` and ``video_params`` are read)
        :param c: the Chunk (its yuv path and frame count are read)
        :param output: path of the bitstream the encoder should write
        :return: a one-element list holding a CommandPair whose filter
                 command is empty and whose encoder command runs vvc_encoder
        """
        yuv_file: str = Vvc.get_yuv_file_path(c).as_posix()
        return [
            CommandPair(
                [],
                ['vvc_encoder', '-c', a.vvc_conf, '-i', yuv_file, *a.video_params, '-f', str(c.frames),
                 '--InputBitDepth=10', '--OutputBitDepth=10', '-b', output]
            )
        ]

    def compose_2_pass(self, a: Args, c: Chunk, output: str) -> MPCommands:
        """
        Two-pass encoding is not available for this encoder.

        :raises ValueError: always
        """
        raise ValueError('VVC does not support 2 pass encoding')

    def man_q(self, command: Command, q: int) -> Command:
        """Return command with new cq value

        The input command is not modified; a copy is returned in which the
        element following the ``-q`` flag is replaced.

        :param command: old command
        :param q: new cq value
        :return: command with new cq value"""
        adjusted_command = command.copy()
        # NOTE(review): behaviour when no element matches "-q" depends on
        # list_index_of_regex, which is defined outside this file - confirm.
        i = list_index_of_regex(adjusted_command, r"-q")
        adjusted_command[i + 1] = f'{q}'
        return adjusted_command

    def match_line(self, line: str):
        """Extract number of encoded frames from line.

        :param line: one line of text output from the encoder
        :return: match object from re.search whose group 1 is the first
                 space-delimited token after "POC", or None if the line
                 does not match"""
        # The capture group must be greedy: a trailing lazy "+?" is satisfied
        # by a single character, which truncated e.g. "12" to "1".
        return re.search(r"POC.*? ([^ ]+)", line)

    def make_pipes(self, a: Args, c: Chunk, passes: int, current_pass: int, output: str, man_q: int = None):
        """
        Creates a pipe for the given chunk with the given args

        :param a: the Args
        :param c: the Chunk
        :param passes: the total number of passes (1 or 2)
        :param current_pass: the current_pass
        :param output: path of the bitstream the encoder should write
        :param man_q: use a different quality
        :return: a Pipe attached to the encoders stdout
        :raises ValueError: when passes is not 1 (via compose_2_pass)
        """
        # The filter half of the pair is unused here: the encoder reads the
        # pre-generated yuv file directly instead of a piped stream.
        if passes == 1:
            _filter_cmd, enc_cmd = self.compose_1_pass(a, c, output)[0]
        else:
            _filter_cmd, enc_cmd = self.compose_2_pass(a, c, output)[current_pass - 1]

        # An explicit man_q takes precedence over the chunk's vmaf target.
        if man_q:
            enc_cmd = self.man_q(enc_cmd, man_q)
        elif c.vmaf_target_cq:
            enc_cmd = self.man_q(enc_cmd, c.vmaf_target_cq)

        pipe = subprocess.Popen(enc_cmd, stdout=PIPE,
                                stderr=STDOUT,
                                universal_newlines=True)
        return pipe

    def is_valid(self, args: Args) -> Tuple[bool, Optional[str]]:
        """
        Check that everything needed for a vvc encode is present.

        :param args: the Args
        :return: (False, reason) on the first failed check, otherwise the
                 result of the base class check
        """
        # vvc requires a special concat executable
        if not find_executable('vvc_concat'):
            return False, 'vvc concatenation executable "vvc_concat" not found'

        # make sure there's a vvc config file
        if args.vvc_conf is None:
            return False, 'Conf file for vvc required'

        # vvc requires video information that av1an can't provide
        if args.video_params is None:
            return False, 'VVC requires:\n' \
                          ' -wdt X - video width\n' \
                          ' -hgt X - video height\n' \
                          ' -fr X - framerate\n' \
                          ' -q X - quantizer\n' \
                          'Example: -wdt 640 -hgt 360 -fr 23.98 -q 30'

        return super().is_valid(args)

    def on_before_chunk(self, args: Args, chunk: Chunk) -> None:
        """
        Create the chunk's yuv input file, then run the base class hook.

        :param args: the Args
        :param chunk: the Chunk about to be encoded
        """
        # vvc requires a yuv files as input, make it here
        log(f'Creating yuv for chunk {chunk.name}\n')
        Vvc.to_yuv(chunk)
        log(f'Created yuv for chunk {chunk.name}\n')
        super().on_before_chunk(args, chunk)

    def on_after_chunk(self, args: Args, chunk: Chunk) -> None:
        """
        Remove the chunk's yuv input file, then run the base class hook.

        :param args: the Args
        :param chunk: the Chunk that was just encoded
        """
        # delete the yuv file for this chunk
        yuv_path = Vvc.get_yuv_file_path(chunk)
        os.remove(yuv_path)
        super().on_after_chunk(args, chunk)

    @staticmethod
    def get_yuv_file_path(chunk: Chunk) -> Path:
        """
        Gets the yuv path to be used for a given chunk

        :param chunk: the Chunk
        :return: a yuv file path for the chunk
        """
        return (chunk.temp / 'split') / f'{chunk.name}.yuv'

    @staticmethod
    def to_yuv(chunk: Chunk) -> None:
        """
        Generates a yuv file for a given chunk

        Runs ``chunk.ffmpeg_gen_cmd`` and pipes its output into a second
        ffmpeg process that writes 10-bit 4:2:0 raw video to the chunk's
        yuv path. Blocks until both processes have exited.

        :param chunk: the Chunk
        :return: None
        """
        yuv_path = Vvc.get_yuv_file_path(chunk)

        ffmpeg_gen_pipe = subprocess.Popen(chunk.ffmpeg_gen_cmd, stdout=PIPE, stderr=STDOUT)

        # TODO: apply ffmpeg filter to the yuv file
        cmd = ['ffmpeg', '-y', '-loglevel', 'error', '-i', '-', '-f', 'rawvideo', '-vf', 'format=yuv420p10le',
               yuv_path.as_posix()]
        pipe = subprocess.Popen(cmd, stdin=ffmpeg_gen_pipe.stdout, stdout=PIPE, stderr=STDOUT,
                                universal_newlines=True)

        # Drop the parent's copy of the read end so the generator is not left
        # blocked on a full pipe if the converter exits early.
        ffmpeg_gen_pipe.stdout.close()

        # communicate() drains the converter's output while waiting; a bare
        # wait() on a child with stdout=PIPE can deadlock once the OS pipe
        # buffer fills.
        converter_output, _ = pipe.communicate()

        # Reap the generator so it does not linger as a zombie process.
        ffmpeg_gen_pipe.wait()

        if pipe.returncode != 0:
            log(f'Failed to create yuv for chunk {chunk.name}:\n{converter_output}\n')