From 500696d8cd271aed42a68a67b31bda5ab41c13f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicolas=20M=C3=A9meint?= Date: Sat, 8 Feb 2020 00:25:04 +0100 Subject: [PATCH] Use pathlib Paths + type annotations --- README.md | 2 +- av1an.py | 139 ++++++++++++++++++++++++++---------------------------- 2 files changed, 69 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index f0da00a..0093257 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ With your own parameters: 4 rav1e workers is optimal for 6/12 cpu -s --scenes Path to file with scenes timestamps. - If given `0` spliting will be ingnored + If given `0` spliting will be ignored If file not exist, new will be generated in current folder First run to generate stamps, all next reuse it. Example: `-s scenes` diff --git a/av1an.py b/av1an.py index 173e45f..9c9e6cb 100755 --- a/av1an.py +++ b/av1an.py @@ -11,11 +11,12 @@ import shutil from os.path import join from psutil import virtual_memory import argparse -from shutil import rmtree from math import ceil from multiprocessing import Pool import multiprocessing import subprocess +from pathlib import Path +from typing import Optional, Union try: import scenedetect @@ -37,7 +38,7 @@ if sys.version_info < (3, 7): class Av1an: def __init__(self): - self.here = os.getcwd() + self.temp_dir = Path('.temp') self.FFMPEG = 'ffmpeg -hide_banner -loglevel error' self.pix_format = 'yuv420p' self.encoder = 'aom' @@ -50,9 +51,10 @@ class Av1an: self.logging = None self.args = None self.encoding_params = '' - self.output_file = '' + self.output_file: Optional[Path] = None self.pyscene = '' - self.scenes = '' + self.scenes: Optional[Path] = None + self.skip_scenes = False def call_cmd(self, cmd, capture_output=False): if capture_output: @@ -68,14 +70,14 @@ class Av1an: parser = argparse.ArgumentParser() parser.add_argument('--mode', '-m', type=int, default=self.mode, help='Mode 0 - video, Mode 1 - image') parser.add_argument('--encoding_params', '-e', type=str, default=self.encoding_params, help='encoding settings') - parser.add_argument('--file_path', '-i', type=str, default='bruh.mp4', help='Input File', required=True) + parser.add_argument('--file_path', '-i', type=Path, default='bruh.mp4', help='Input File', required=True) parser.add_argument('--encoder', '-enc', type=str, default=self.encoder, help='Choosing encoder') parser.add_argument('--workers', '-t', type=int, default=0, help='Number of workers') parser.add_argument('--audio_params', '-a', type=str, default='-c:a copy', help='FFmpeg audio settings') parser.add_argument('--threshold', '-tr', type=float, default=self.threshold, help='PySceneDetect Threshold') parser.add_argument('--logging', '-log', type=str, default=self.logging, help='Enable logging') parser.add_argument('--encode_pass', '-p', type=int, default=self.encode_pass, help='Specify encoding passes') - parser.add_argument('--output_file', '-o', type=str, default='', help='Specify output file') + parser.add_argument('--output_file', '-o', type=Path, default=None, help='Specify output file') parser.add_argument('--ffmpeg_com', '-ff', type=str, default='', help='FFmpeg commands') parser.add_argument('--pix_format', '-fmt', type=str, default=self.pix_format, help='FFmpeg pixel format') parser.add_argument('--scenes', '-s', type=str, default=self.scenes, help='File location for scenes') @@ -84,7 +86,12 @@ class Av1an: self.args = parser.parse_args() # Set scenes if provided - self.scenes = self.args.scenes + if self.args.scenes: + scenes = self.args.scenes.strip() + if scenes == '0': + self.skip_scenes = True + else: + self.scenes = Path(scenes) self.threshold = self.args.threshold @@ -100,6 +107,12 @@ class Av1an: # Number of encoder passes self.encode_pass = self.args.encode_pass + # Set output file + if self.args.output_file is None: + self.output_file = Path(f'{self.args.file_path.stem}_av1.mkv') + else: + self.output_file = self.args.output_file.with_suffix('.mkv') + # Forcing FPS option if self.args.ffmpeg_com == 0: self.ffmpeg_com = '' @@ -140,48 +153,49 @@ class Av1an: if self.workers == 0: self.workers += 1 - def setup(self, input_file): - if not os.path.exists(input_file): + def setup(self, input_file: Path): + if not input_file.exists(): print("File don't exist") sys.exit() # Make temporal directories, and remove them if already presented - if os.path.isdir(join(os.getcwd(), ".temp")): - rmtree(join(self.here, ".temp")) + if self.temp_dir.is_dir(): + shutil.rmtree(self.temp_dir) - os.makedirs(join(self.here, '.temp', 'split')) - os.makedirs(join(self.here, '.temp', 'encode')) + (self.temp_dir / 'split').mkdir(parents=True) + (self.temp_dir / 'encode').mkdir() - def extract_audio(self, input_vid): + def extract_audio(self, input_vid: Path): # Extracting audio from video file # Encoding audio if needed ffprobe = 'ffprobe -hide_banner -loglevel error -show_streams -select_streams a' # Capture output to check if audio is present - check = fr'{ffprobe} -i {join(self.here,input_vid)}' + check = fr'{ffprobe} -i {input_vid}' is_audio_here = len(self.call_cmd(check, capture_output=True)) > 0 if is_audio_here: - cmd = f'{self.FFMPEG} -i {join(self.here, input_vid)} -vn ' \ - f'{self.args.audio_params} {join(os.getcwd(), ".temp", "audio.mkv")}' + cmd = f'{self.FFMPEG} -i {input_vid} -vn ' \ + f'{self.args.audio_params} {self.temp_dir / "audio.mkv"}' self.call_cmd(cmd) - def scenedetect(self, video): - # PySceneDetect used split video by scenes and pass it to encoder - # Optimal threshold settings 15-50 - video_manager = VideoManager([video]) - scene_manager = SceneManager() - scene_manager.add_detector(ContentDetector(threshold=self.threshold)) - base_timecode = video_manager.get_base_timecode() + def scenedetect(self, video: Path): + # Skip scene detection if the user choosed to + if self.skip_scenes: + return '' try: - if self.scenes.isdigit(): - if int(self.scenes) == 0: - return '' + # PySceneDetect used split video by scenes and pass it to encoder + # Optimal threshold settings 15-50 + video_manager = VideoManager([str(video)]) + scene_manager = SceneManager() + scene_manager.add_detector(ContentDetector(threshold=self.threshold)) + base_timecode = video_manager.get_base_timecode() + # If stats file exists, load it. - if self.scenes and os.path.exists(join(self.here, self.scenes)): + if self.scenes and self.scenes.exists(): # Read stats from CSV file opened in read mode: - with open(join(self.here, self.scenes), 'r') as stats_file: + with open(self.scenes(), 'r') as stats_file: stats = stats_file.read() return stats @@ -210,11 +224,9 @@ class Av1an: # We only write to the stats file if a save is required: if self.scenes: - with open(join(self.here, self.scenes), 'w') as stats_file: + with open(self.scenes, 'w') as stats_file: stats_file.write(scenes) - return scenes - else: - return scenes + return scenes except Exception: print('Error in PySceneDetect') sys.exit() @@ -225,25 +237,17 @@ class Av1an: # If video is single scene, just copy video if len(timecodes) == 0: - cmd = f'{self.FFMPEG} -i {video} -map_metadata 0 -an -c copy -avoid_negative_ts 1 .temp/split/0.mkv' + cmd = f'{self.FFMPEG} -i {video} -map_metadata 0 -an -c copy -avoid_negative_ts 1 {self.temp_dir / "split" / "0.mkv"}' else: - cmd = f'{self.FFMPEG} -i {video} -map_metadata 0 -an -f segment -segment_times {timecodes} ' \ - f'-c copy -avoid_negative_ts 1 .temp/split/%04d.mkv' + cmd = f'{self.FFMPEG} -i {video} -map_metadata 0 -an -f segment -segment_times {timecodes} ' \ + f'-c copy -avoid_negative_ts 1 {self.temp_dir / "split" / "%04d.mkv"}' self.call_cmd(cmd) - def get_video_queue(self, source_path): + def get_video_queue(self, source_path: Path): # Returns sorted list of all videos that need to be encoded. Big first - - videos = [] - for root, dirs, files in os.walk(source_path): - for file in files: - f = os.path.getsize(os.path.join(root, file)) - videos.append([file, f]) - - videos = [i[0] for i in sorted(videos, key=lambda x: -x[1])] - return videos + return sorted(source_path.iterdir(), key=lambda f: -f.stat().st_size) def svt_av1_encode(self, file_paths): @@ -293,7 +297,7 @@ class Av1an: if self.encode_pass == 2: pass_2_commands = [ (f'-i {file[0]} {self.ffmpeg_pipe}' + - f' {two_pass_1_aom} {self.encoding_params} --fpf={file[0]}.log -o /dev/null - ', + f' {two_pass_1_aom} {self.encoding_params} --fpf={file[0]}.log -o {os.devnull} - ', f'-i {file[0]} {self.ffmpeg_pipe}' + f' {two_pass_2_aom} {self.encoding_params} --fpf={file[0]}.log -o {file[1]}.ivf - ', file[2]) @@ -329,10 +333,9 @@ class Av1an: return pass_2_commands def compose_encoding_queue(self, files): - - file_paths = [(f'{join(os.getcwd(), ".temp", "split", file_name)}', - f'{join(os.getcwd(), ".temp", "encode", file_name)}', - file_name) for file_name in files] + file_paths = [(f'{self.temp_dir / "split" / file.name}', + f'{self.temp_dir / "encode" / file.name}', + str(file)) for file in files] if self.encoder == 'aom': return self.aom_encode(file_paths) @@ -357,31 +360,25 @@ class Av1an: cmd = rf'{self.FFMPEG} {i}' self.call_cmd(cmd) - def concatenate_video(self, input_video): + def concatenate_video(self): # Using FFMPEG to concatenate all encoded videos to 1 file. # Reading all files in A-Z order and saving it to concat.txt - with open(f'{join(self.here, ".temp", "concat.txt")}', 'w') as f: - - for root, firs, files in os.walk(join(self.here, '.temp', 'encode')): - for file in sorted(files): - f.write(f"file '{join(root, file)}'\n") - - concat = join(self.here, ".temp", "concat.txt") + concat = self.temp_dir / "concat.txt" + with open(f'{concat}', 'w') as f: + # Write all files that need to be concatenated + # Their path must be relative to the directory where "concat.txt" is + encode_files = sorted((self.temp_dir / 'encode').iterdir()) + f.writelines(f"file '{file.relative_to(self.temp_dir)}'\n" for file in encode_files) # Add the audio file if one was extracted from the input - audio_file = join(self.here, ".temp", "audio.mkv") - if os.path.exists(audio_file): + audio_file = self.temp_dir / "audio.mkv" + if audio_file.exists(): audio = f'-i {audio_file} -c:a copy' else: audio = '' - if self.output_file == self.args.output_file: - self.output_file = f'{input_video.split(".")[0]}_av1.mkv' - else: - self.output_file = f'{join(self.here, self.args.output_file)}.mkv' - try: cmd = f'{self.FFMPEG} -f concat -safe 0 -i {concat} {audio} -c copy -y {self.output_file}' self.call_cmd(cmd) @@ -390,11 +387,11 @@ class Av1an: print('Concatenation failed') sys.exit() - def image(self, image_path): + def image(self, image_path: Path): print('Encoding Image..', end='') image_pipe = rf'{self.FFMPEG} -i {image_path} -pix_fmt yuv420p10le -f yuv4mpegpipe -strict -1 - | ' - output = rf'{"".join(image_path.split(".")[:-1])}.ivf' + output = image_path.with_suffix('.ivf') if self.encoder == 'aom': aom = ' aomenc --passes=1 --pass=1 --end-usage=q -b 10 --input-bit-depth=10 ' cmd = (rf' {image_pipe} ' + @@ -421,7 +418,7 @@ class Av1an: # Splitting video and sorting big-first timestamps = self.scenedetect(self.args.file_path) self.split(self.args.file_path, timestamps) - files = self.get_video_queue('.temp/split') + files = self.get_video_queue(self.temp_dir / 'split') # Extracting audio self.extract_audio(self.args.file_path) @@ -444,10 +441,10 @@ class Av1an: for i, _ in enumerate(tqdm(pool.imap_unordered(self.encode, commands), total=len(files), leave=True), 1): pass - self.concatenate_video(self.args.file_path) + self.concatenate_video() # Delete temp folders - rmtree(join(self.here, ".temp")) + shutil.rmtree(self.temp_dir) elif self.mode == 1: self.image(self.args.file_path)