mirror of
https://github.com/master-of-zen/Av1an.git
synced 2024-11-25 02:29:40 +00:00
175 lines
4.8 KiB
Python
Executable file
175 lines
4.8 KiB
Python
Executable file
#!/bin/env python
|
|
|
|
import atexit
|
|
import json
|
|
import os
|
|
import re
|
|
import statistics
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from subprocess import PIPE
|
|
from threading import Lock
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
def startup_check():
|
|
if sys.version_info < (3, 6):
|
|
print('Python 3.6+ required')
|
|
sys.exit()
|
|
if sys.platform == 'linux':
|
|
def restore_term():
|
|
os.system("stty sane")
|
|
atexit.register(restore_term)
|
|
|
|
|
|
def terminate():
|
|
sys.exit(1)
|
|
|
|
|
|
def process_inputs(inputs):
|
|
# Check input file for being valid
|
|
if not inputs:
|
|
print('No input file')
|
|
terminate()
|
|
|
|
if inputs[0].is_dir():
|
|
inputs = [x for x in inputs[0].iterdir() if x.suffix in (".mkv", ".mp4", ".mov", ".avi", ".flv", ".m2ts")]
|
|
|
|
valid = np.array([i.exists() for i in inputs])
|
|
|
|
if not all(valid):
|
|
print(f'File(s) do not exist: {", ".join([str(inputs[i]) for i in np.where(not valid)[0]])}')
|
|
terminate()
|
|
|
|
if len(inputs) > 1:
|
|
return inputs, None
|
|
else:
|
|
return None, inputs[0]
|
|
|
|
|
|
def get_keyframes(file: Path):
|
|
"""
|
|
Read file info and return list of all keyframes
|
|
|
|
:param file: Path for input file
|
|
:return: list with frame numbers of keyframes
|
|
"""
|
|
|
|
keyframes = []
|
|
|
|
ff = ["ffmpeg", "-hide_banner", "-i", file.as_posix(),
|
|
"-vf", "select=eq(pict_type\,PICT_TYPE_I)",
|
|
"-f", "null", "-loglevel", "debug", "-"]
|
|
|
|
pipe = subprocess.Popen(ff, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
|
|
while True:
|
|
line = pipe.stdout.readline().strip().decode("utf-8")
|
|
|
|
if len(line) == 0 and pipe.poll() is not None:
|
|
break
|
|
|
|
match = re.search(r"n:([0-9]+)\.[0-9]+ pts:.+key:1", line)
|
|
if match:
|
|
keyframe = int(match.group(1))
|
|
keyframes.append(keyframe)
|
|
|
|
return keyframes
|
|
|
|
|
|
def get_cq(command):
|
|
"""
|
|
Return cq values from command
|
|
:param command: string with commands for encoder
|
|
:return: list with frame numbers of keyframes
|
|
|
|
"""
|
|
matches = re.findall(r"--cq-level= *([^ ]+?) ", command)
|
|
return int(matches[-1])
|
|
|
|
|
|
def man_cq(command: str, cq: int):
|
|
"""Return command with new cq value"""
|
|
mt = '--cq-level='
|
|
cmd = command[:command.find(mt) + 11] + str(cq) + command[command.find(mt) + 13:]
|
|
return cmd
|
|
|
|
|
|
def frame_probe_fast(source: Path):
|
|
video = cv2.VideoCapture(source.as_posix())
|
|
total = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
return total
|
|
|
|
|
|
def frame_probe(source: Path):
|
|
"""Get frame count."""
|
|
cmd = ["ffmpeg", "-hide_banner", "-i", source.as_posix(), "-map", "0:v:0", "-f", "null", "-"]
|
|
r = subprocess.run(cmd, stdout=PIPE, stderr=PIPE)
|
|
matches = re.findall(r"frame=\s*([0-9]+)\s", r.stderr.decode("utf-8") + r.stdout.decode("utf-8"))
|
|
return int(matches[-1])
|
|
|
|
|
|
doneFileLock = Lock()
|
|
def frame_check(source: Path, encoded: Path, temp, check):
|
|
"""Checking is source and encoded video frame count match."""
|
|
try:
|
|
status_file = Path(temp / 'done.json')
|
|
|
|
if check:
|
|
s1 = frame_probe(source)
|
|
doneFileLock.acquire()
|
|
with status_file.open() as f:
|
|
d = json.load(f)
|
|
d['done'][source.name] = s1
|
|
with status_file.open('w') as f:
|
|
json.dump(d, f)
|
|
else:
|
|
s1, s2 = [frame_probe(i) for i in (source, encoded)]
|
|
if s1 == s2:
|
|
doneFileLock.acquire()
|
|
with status_file.open() as f:
|
|
d = json.load(f)
|
|
d['done'][source.name] = s1
|
|
with status_file.open('w') as f:
|
|
json.dump(d, f)
|
|
else:
|
|
print(f'Frame Count Differ for Source {source.name}: {s2}/{s1}')
|
|
except IndexError:
|
|
print('Encoding failed, check validity of your encoding settings/commands and start again')
|
|
terminate()
|
|
except Exception as e:
|
|
_, _, exc_tb = sys.exc_info()
|
|
print(f'\nError frame_check: {e}\nAt line: {exc_tb.tb_lineno}\n')
|
|
finally:
|
|
if doneFileLock.locked():
|
|
doneFileLock.release()
|
|
|
|
|
|
def get_brightness(video):
|
|
"""Getting average brightness value for single video."""
|
|
brightness = []
|
|
cap = cv2.VideoCapture(video)
|
|
try:
|
|
while True:
|
|
# Capture frame-by-frame
|
|
_, frame = cap.read()
|
|
|
|
# Our operations on the frame come here
|
|
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
|
|
# Display the resulting frame
|
|
mean = cv2.mean(gray)
|
|
brightness.append(mean[0])
|
|
if cv2.waitKey(1) & 0xFF == ord('q'):
|
|
break
|
|
except cv2.error:
|
|
pass
|
|
|
|
# When everything done, release the capture
|
|
cap.release()
|
|
brig_geom = round(statistics.geometric_mean([x + 1 for x in brightness]), 1)
|
|
|
|
return brig_geom
|