Av1an/utils/vmaf.py

176 lines
5.5 KiB
Python
Executable file

#! /bin/env python
import subprocess
import sys
from math import isnan
from pathlib import Path
from subprocess import PIPE, STDOUT
import matplotlib
import numpy as np
from matplotlib import pyplot as plt
from scipy import interpolate
from utils.utils import terminate
def read_vmaf_xml(file, percentile):
with open(file, 'r') as f:
file = f.readlines()
file = [x.strip() for x in file if 'vmaf="' in x]
vmafs = []
for i in file:
vmf = i[i.rfind('="') + 2: i.rfind('"')]
vmafs.append(float(vmf))
vmafs = [float(x) for x in vmafs if isinstance(x, float)]
calc = [x for x in vmafs if isinstance(x, float) and not isnan(x)]
perc = round(np.percentile(calc, percentile), 2)
return perc
def call_vmaf(source: Path, encoded: Path, model, n_threads, return_file=False):
if model:
mod = f":model_path={model}"
else:
mod = ''
if n_threads:
n_threads = f':n_threads={n_threads}'
else:
n_threads = ''
# For vmaf calculation both source and encoded segment scaled to 1080
# for proper vmaf calculation
fl = source.with_name(encoded.stem).with_suffix('.xml').as_posix()
cmd = f'ffmpeg -loglevel error -hide_banner -r 60 -i {source.as_posix()} -r 60 -i {encoded.as_posix()} ' \
f'-filter_complex "[0:v]scale=1920:1080:flags=spline:force_original_aspect_ratio=decrease[scaled1];' \
f'[1:v]scale=1920:1080:flags=spline:force_original_aspect_ratio=decrease[scaled2];' \
f'[scaled2][scaled1]libvmaf=log_path={fl}{mod}{n_threads}" -f null - '
c = subprocess.run(cmd, shell=True, stdout=PIPE, stderr=STDOUT)
call = c.stdout
# print(c.stdout.decode())
if 'error' in call.decode().lower():
print('\n\nERROR IN VMAF CALCULATION\n\n',call.decode())
terminate()
if return_file:
return fl
call = call.decode().strip()
vmf = call.split()[-1]
try:
vmf = float(vmf)
except ValueError:
vmf = 0
return vmf
def plot_vmaf(inp: Path, out: Path, model=None):
print('Calculating Vmaf...\r', end='')
xml = call_vmaf(inp, out, model=model, return_file=True)
if not Path(xml).exists():
print(f'Vmaf calculation failed for files:\n {inp.stem} {out.stem}')
sys.exit()
with open(xml, 'r') as fl:
f = fl.readlines()
f = [x.strip() for x in f if 'vmaf="' in x]
vmafs = []
for i in f:
vmf = i[i.rfind('="') + 2: i.rfind('"')]
vmafs.append(float(vmf))
vmafs = [round(float(x), 3) for x in vmafs if isinstance(x, float)]
perc_1 = read_vmaf_xml(xml, 1)
perc_25 = read_vmaf_xml(xml, 25)
perc_75 = read_vmaf_xml(xml, 75)
mean = round(sum(vmafs) / len(vmafs), 3)
# Plot
plt.figure(figsize=(15, 4))
for i in range(0, 100):
plt.axhline(i, color='grey', linewidth=0.4)
if i % 5 == 0:
plt.axhline(i, color='black', linewidth=0.6)
plt.plot(range(len(vmafs)), vmafs,
label=f'Frames: {len(vmafs)}\nMean:{mean}\n'
f'1%: {perc_1} \n25%: {perc_25} \n75%: {perc_75}', linewidth=0.7)
plt.ylabel('VMAF')
plt.legend(loc="lower right", markerscale=0, handlelength=0, fancybox=True, )
plt.ylim(int(perc_1), 100)
plt.tight_layout()
plt.margins(0)
# Save
file_name = str(out.stem) + '_plot.png'
plt.savefig(file_name, dpi=500)
def x264_probes(video: Path, ffmpeg: str):
cmd = f' ffmpeg -y -hide_banner -loglevel error -i {video.as_posix()} ' \
f'-r 4 -an {ffmpeg} -c:v libx264 -crf 0 {video.with_suffix(".mp4")}'
subprocess.run(cmd, shell=True)
def encoding_fork(min_cq, max_cq, steps):
# Make encoding fork
q = list(np.unique(np.linspace(min_cq, max_cq, num=steps, dtype=int, endpoint=True)))
# Moving highest cq to first check, for early skips
# checking highest first, lowers second, for early skips
q.insert(0, q.pop(-1))
return q
def vmaf_probes(probe, fork, ffmpeg):
params = " aomenc -q --passes=1 --threads=8 --end-usage=q --cpu-used=6 --cq-level="
cmd = [[f'ffmpeg -y -hide_banner -loglevel error -i {probe} {ffmpeg}'
f'{params}{x} -o {probe.with_name(f"v_{x}{probe.stem}")}.ivf - ',
probe, probe.with_name(f'v_{x}{probe.stem}').with_suffix('.ivf'), x] for x in fork]
return cmd
def interpolate_data(vmaf_cq: list, vmaf_target):
x = [x[1] for x in sorted(vmaf_cq)]
y = [float(x[0]) for x in sorted(vmaf_cq)]
# Interpolate data
f = interpolate.interp1d(x, y, kind='cubic')
xnew = np.linspace(min(x), max(x), max(x) - min(x))
# Getting value closest to target
tl = list(zip(xnew, f(xnew)))
vmaf_target_cq = min(tl, key=lambda x: abs(x[1] - vmaf_target))
return vmaf_target_cq, tl, f, xnew
def plot_probes(x, y, f, tl, min_cq, max_cq, probe, xnew, vmaf_target_cq, frames, temp):
# Saving plot of vmaf calculation
matplotlib.use('agg')
plt.ioff()
plt.plot(x, y, 'x', color='tab:blue', alpha=1)
plt.plot(xnew, f(xnew), color='tab:blue', alpha=1)
plt.plot(vmaf_target_cq[0], vmaf_target_cq[1], 'o', color='red', alpha=1)
plt.grid(True)
plt.xlim(min_cq, max_cq)
vmafs = [int(x[1]) for x in tl if isinstance(x[1], float) and not isnan(x[1])]
plt.ylim(min(vmafs), max(vmafs) + 1)
plt.ylabel('VMAF')
plt.xlabel('CQ')
plt.title(f'Chunk: {probe.stem}, Frames: {frames}')
# plt.tight_layout()
temp = temp / probe.stem
plt.tight_layout()
plt.savefig(temp, dpi=300, format='png',transparent=True)
plt.close()