From 844e413a05e9657eb9a6c5c3151f9bb755392ba2 Mon Sep 17 00:00:00 2001 From: Zen Date: Wed, 6 Jan 2021 06:09:11 +0200 Subject: [PATCH] wip encoding queue --- av1an/manager/Queue.py | 101 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 97 insertions(+), 4 deletions(-) diff --git a/av1an/manager/Queue.py b/av1an/manager/Queue.py index 102d7b7..1a3ae1a 100644 --- a/av1an/manager/Queue.py +++ b/av1an/manager/Queue.py @@ -1,10 +1,103 @@ - - +import time +import sys +import concurrent +import concurrent.futures from collections import deque +from av1an.project.Project import Project +from typing import List +from av1an.target_quality import (per_frame_target_quality_routine, + per_shot_target_quality_routine) +from av1an.encoder import ENCODERS +from av1an.utils import frame_probe, terminate +from av1an.resume import write_progress_file +from av1an.chunk import Chunk +from av1an.logger import log, set_log +from pathlib import Path +from .Pipes import tqdm_bar + class Queue: """ Queue manager with ability to add/remove/restart jobs """ - def __init__(self, workers): - self.queue = deque(maxlen=workers) \ No newline at end of file + def __init__(self, project, chunk_queue): + self.chunk_queue = chunk_queue + self.queue = [] + self.project = project + self.thread_executor = concurrent.futures.ThreadPoolExecutor() + + def is_empty(self): + return self.queue == [] + + def enqueue(self, chunk): + self.queue.insert(0,chunk) + + def dequeue(self): + return self.queue.pop() + + def size(self): + return len(self.queue) + + def encoding_loop(self): + with concurrent.futures.ThreadPoolExecutor(max_workers=self.project.workers) as executor: + future_cmd = {executor.submit(self.encode_chunk, cmd): cmd for cmd in self.chunk_queue} + for future in concurrent.futures.as_completed(future_cmd): + try: + future.result() + except Exception as exc: + _, _, exc_tb = sys.exc_info() + print(f'Encoding error {exc}\nAt line {exc_tb.tb_lineno}') + terminate() + self.project.counter.close() + + + def encode_chunk(self, chunk: Chunk): + """ + Encodes a chunk. + + :param chunk: The chunk to encode + :param project: The cli project + :return: None + """ + st_time = time.time() + + chunk_frames = chunk.frames + + log(f'Enc: {chunk.name}, {chunk_frames} fr\n\n') + + # Target Quality Mode + if self.project.target_quality: + if self.project.target_quality_method == 'per_shot': + per_shot_target_quality_routine(self.project, chunk) + if self.project.target_quality_method == 'per_frame': + per_frame_target_quality_routine(self.project, chunk) + + ENCODERS[self.project.encoder].on_before_chunk(self.project, chunk) + + # skip first pass if reusing + start = 2 if self.project.reuse_first_pass and self.project.passes >= 2 else 1 + + # Run all passes for this chunk + for current_pass in range(start, self.project.passes + 1): + tqdm_bar(self.project, chunk, self.project.encoder, self.project.counter, chunk_frames, self.project.passes, current_pass) + + ENCODERS[self.project.encoder].on_after_chunk(self.project, chunk) + + # get the number of encoded frames, if no check assume it worked and encoded same number of frames + encoded_frames = chunk_frames if self.project.no_check else self.frame_check_output(chunk, chunk_frames) + + # write this chunk as done if it encoded correctly + if encoded_frames == chunk_frames: + write_progress_file(Path(self.project.temp / 'done.json'), chunk, encoded_frames) + + enc_time = round(time.time() - st_time, 2) + log(f'Done: {chunk.name} Fr: {encoded_frames}/{chunk_frames}\n' + f'Fps: {round(encoded_frames / enc_time, 4)} Time: {enc_time} sec.\n\n') + + def frame_check_output(self, chunk: Chunk, expected_frames: int, last_chunk=False) -> int: + actual_frames = frame_probe(chunk.output_path) + if actual_frames != expected_frames: + msg = f':: Chunk #{chunk.name}: {actual_frames}/{expected_frames} fr' + log(msg) + print(msg) + return actual_frames \ No newline at end of file