Merge pull request #14 from UniversityRadioYork/audio-markers
Audio Marker Support
This commit is contained in:
commit
0dcd0b4ae6
8 changed files with 455 additions and 143 deletions
0
baps_types/__init__.py
Normal file
0
baps_types/__init__.py
Normal file
68
baps_types/marker.py
Normal file
68
baps_types/marker.py
Normal file
|
@ -0,0 +1,68 @@
|
|||
import json
|
||||
from typing import Dict, Literal, Optional, Union
|
||||
|
||||
POSITIONS = ["start", "mid", "end"]
|
||||
PARAMS = ["name", "time", "position", "section"]
|
||||
|
||||
|
||||
class Marker:
|
||||
marker: Dict
|
||||
|
||||
def __init__(self, new_marker: Union[str, dict]):
|
||||
marker: dict
|
||||
try:
|
||||
if isinstance(new_marker, str):
|
||||
marker = json.loads(new_marker)
|
||||
else:
|
||||
marker = new_marker
|
||||
except Exception as e:
|
||||
raise ValueError("Failed to decode JSON for marker: {}".format(e))
|
||||
|
||||
for key in marker.keys():
|
||||
if key not in PARAMS:
|
||||
raise ValueError("Key {} is not a valid marker parameter.".format(key))
|
||||
|
||||
if not isinstance(marker["name"], str):
|
||||
raise ValueError("Name is not str.")
|
||||
|
||||
if not (isinstance(marker["time"], int) or isinstance(marker["time"], float)):
|
||||
raise ValueError("Time is not a float or int")
|
||||
|
||||
if marker["position"] not in POSITIONS:
|
||||
raise ValueError("Position is not in allowed values.")
|
||||
|
||||
if not (marker["section"] is None or isinstance(marker["section"], str)):
|
||||
raise ValueError("Section name is not str or None.")
|
||||
|
||||
marker["time"] = float(marker["time"])
|
||||
# If everything checks out, let's save it.
|
||||
self.marker = marker
|
||||
|
||||
@property
|
||||
def __str__(self) -> str:
|
||||
return json.dumps(self.marker)
|
||||
|
||||
@property
|
||||
def __dict__(self) -> dict:
|
||||
return self.marker
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.marker["name"]
|
||||
|
||||
@property
|
||||
def time(self) -> float:
|
||||
return float(self.marker["time"])
|
||||
|
||||
@property
|
||||
def position(self) -> str:
|
||||
return self.marker["position"]
|
||||
|
||||
@property
|
||||
def section(self) -> Optional[str]:
|
||||
return self.marker["section"]
|
||||
|
||||
def same_type(self, o: object) -> bool:
|
||||
if not isinstance(o, Marker):
|
||||
return False
|
||||
return o.position == self.position and o.section == self.section
|
211
baps_types/plan.py
Normal file
211
baps_types/plan.py
Normal file
|
@ -0,0 +1,211 @@
|
|||
"""
|
||||
BAPSicle Server
|
||||
Next-gen audio playout server for University Radio York playout,
|
||||
based on WebStudio interface.
|
||||
|
||||
Show Plan Items
|
||||
|
||||
Authors:
|
||||
Michael Grace
|
||||
|
||||
Date:
|
||||
November 2020
|
||||
"""
|
||||
|
||||
|
||||
import json
|
||||
from typing import Any, Dict, List, Optional
|
||||
import os
|
||||
|
||||
from baps_types.marker import Marker
|
||||
|
||||
class PlanItem:
|
||||
_timeslotitemid: int = 0
|
||||
_weight: int = 0
|
||||
_filename: Optional[str]
|
||||
_title: str
|
||||
_artist: Optional[str]
|
||||
_trackid: Optional[int]
|
||||
_managedid: Optional[int]
|
||||
_markers: List[Marker] = []
|
||||
|
||||
@property
|
||||
def weight(self) -> int:
|
||||
return self._weight
|
||||
|
||||
@weight.setter
|
||||
def weight(self, value: int):
|
||||
self._weight = value
|
||||
|
||||
@property
|
||||
def timeslotitemid(self) -> int:
|
||||
return self._timeslotitemid
|
||||
|
||||
@property
|
||||
def filename(self) -> Optional[str]:
|
||||
return self._filename
|
||||
|
||||
@filename.setter
|
||||
def filename(self, value: Optional[str]):
|
||||
self._filename = value
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return (
|
||||
"{0} - {1}".format(self._title, self._artist)
|
||||
if self._artist
|
||||
else self._title
|
||||
)
|
||||
|
||||
@property
|
||||
def trackid(self) -> Optional[int]:
|
||||
return self._trackid
|
||||
|
||||
@property
|
||||
def managedid(self) -> Optional[int]:
|
||||
return self._managedid
|
||||
|
||||
@property
|
||||
def title(self) -> Optional[str]:
|
||||
return self._title
|
||||
|
||||
@property
|
||||
def artist(self) -> Optional[str]:
|
||||
return self._artist
|
||||
|
||||
@property
|
||||
def length(self) -> Optional[str]:
|
||||
return self._length
|
||||
|
||||
@property
|
||||
def type(self) -> Optional[str]:
|
||||
return "aux" if self.managedid else "central"
|
||||
|
||||
@property
|
||||
def intro(self) -> float:
|
||||
markers = list(filter(lambda m: m.position == "start" and m.section is None, self._markers))
|
||||
# TODO: Handle multiple (shouldn't happen?)
|
||||
if len(markers) > 0:
|
||||
return markers[0].time
|
||||
return 0
|
||||
|
||||
@property
|
||||
def cue(self) -> float:
|
||||
markers = list(filter(lambda m: m.position == "mid" and m.section is None, self._markers))
|
||||
# TODO: Handle multiple (shouldn't happen?)
|
||||
if len(markers) > 0:
|
||||
return markers[0].time
|
||||
return 0
|
||||
|
||||
@property
|
||||
def outro(self) -> float:
|
||||
markers = list(filter(lambda m: m.position == "end" and m.section is None, self._markers))
|
||||
# TODO: Handle multiple (shouldn't happen?)
|
||||
if len(markers) > 0:
|
||||
return markers[0].time
|
||||
return 0
|
||||
|
||||
@property
|
||||
def markers(self) -> List[dict]:
|
||||
return [repr.__dict__ for repr in self._markers]
|
||||
|
||||
@property
|
||||
def __dict__(self):
|
||||
return {
|
||||
"weight": self.weight,
|
||||
"timeslotitemid": self.timeslotitemid,
|
||||
"trackid": self._trackid,
|
||||
"type": self.type,
|
||||
"managedid": self._managedid,
|
||||
"title": self._title,
|
||||
"artist": self._artist,
|
||||
"name": self.name,
|
||||
"filename": self.filename,
|
||||
"length": self.length,
|
||||
"intro": self.intro,
|
||||
"cue": self.cue,
|
||||
"outro": self.outro,
|
||||
"markers": self.markers
|
||||
}
|
||||
|
||||
def __init__(self, new_item: Dict[str, Any]):
|
||||
self._timeslotitemid = new_item["timeslotitemid"]
|
||||
self._managedid = new_item["managedid"] if "managedid" in new_item else None
|
||||
self._trackid = (
|
||||
int(new_item["trackid"])
|
||||
if "trackid" in new_item and not self._managedid
|
||||
else None
|
||||
)
|
||||
self._filename = (
|
||||
new_item["filename"] if "filename" in new_item else None
|
||||
) # This could be a temp dir for API-downloaded items, or a mapped drive.
|
||||
self._weight = int(new_item["weight"])
|
||||
self._title = new_item["title"]
|
||||
self._artist = new_item["artist"] if "artist" in new_item else None
|
||||
self._length = new_item["length"]
|
||||
self._markers = (
|
||||
[Marker(marker) for marker in new_item["markers"]] if "markers" in new_item else []
|
||||
)
|
||||
|
||||
# TODO: Edit this to handle markers when MyRadio supports them
|
||||
if "intro" in new_item and (isinstance(new_item["intro"], int) or isinstance(new_item["intro"], float)) and new_item["intro"] > 0:
|
||||
marker = {
|
||||
"name": "Intro",
|
||||
"time": new_item["intro"],
|
||||
"position": "start",
|
||||
"section": None
|
||||
}
|
||||
self.set_marker(Marker(json.dumps(marker)))
|
||||
if "cue" in new_item and (isinstance(new_item["cue"], int) or isinstance(new_item["cue"], float)) and new_item["cue"] > 0:
|
||||
marker = {
|
||||
"name": "Cue",
|
||||
"time": new_item["cue"],
|
||||
"position": "mid",
|
||||
"section": None
|
||||
}
|
||||
self.set_marker(Marker(json.dumps(marker)))
|
||||
# TODO: Convert / handle outro being from end of item.
|
||||
if "outro" in new_item and (isinstance(new_item["outro"], int) or isinstance(new_item["outro"], float)) and new_item["outro"] > 0:
|
||||
marker = {
|
||||
"name": "Outro",
|
||||
"time": new_item["outro"],
|
||||
"position": "end",
|
||||
"section": None
|
||||
}
|
||||
self.set_marker(Marker(json.dumps(marker)))
|
||||
|
||||
|
||||
# Fix any OS specific / or \'s
|
||||
if self.filename:
|
||||
if os.path.sep == "/":
|
||||
self._filename = self.filename.replace("\\", "/")
|
||||
else:
|
||||
self._filename = self.filename.replace("/", "\\")
|
||||
|
||||
def __eq__(self, o: object) -> bool:
|
||||
if not isinstance(o, PlanItem):
|
||||
return False
|
||||
|
||||
return o.__dict__ == self.__dict__
|
||||
|
||||
def set_marker(self, new_marker: Marker):
|
||||
if not isinstance(new_marker, Marker):
|
||||
raise ValueError("Marker provided is not of type Marker.")
|
||||
|
||||
replaced = False
|
||||
new_markers = []
|
||||
for marker in self._markers:
|
||||
if marker.same_type(new_marker):
|
||||
new_markers.append(new_marker)
|
||||
# Replace marker
|
||||
replaced = True
|
||||
else:
|
||||
new_markers.append(marker)
|
||||
|
||||
if not replaced:
|
||||
new_markers.append(new_marker)
|
||||
|
||||
self._markers = new_markers
|
||||
|
||||
# Return updated item for easy chaining.
|
||||
return self
|
|
@ -22,15 +22,15 @@ import json
|
|||
from logging import INFO
|
||||
import os
|
||||
|
||||
from plan import PlanItem
|
||||
from baps_types.plan import PlanItem
|
||||
from helpers.os_environment import resolve_external_file_path
|
||||
from helpers.logging_manager import LoggingManager
|
||||
from helpers.state_manager import StateManager
|
||||
|
||||
|
||||
class MyRadioAPI:
|
||||
# TODO Config type
|
||||
logger = None
|
||||
logger: LoggingManager
|
||||
config: StateManager
|
||||
|
||||
def __init__(self, logger: LoggingManager, config: StateManager):
|
||||
self.logger = logger
|
||||
|
|
|
@ -6,7 +6,7 @@ from datetime import datetime
|
|||
from copy import copy
|
||||
from typing import Any, List
|
||||
|
||||
from plan import PlanItem
|
||||
from baps_types.plan import PlanItem
|
||||
from helpers.logging_manager import LoggingManager
|
||||
from helpers.os_environment import resolve_external_file_path
|
||||
|
||||
|
@ -99,9 +99,6 @@ class StateManager:
|
|||
self.__state = copy(state)
|
||||
|
||||
def write_to_file(self, state):
|
||||
if self.__state_in_file == state:
|
||||
# No change to be updated.
|
||||
return
|
||||
|
||||
self.__state_in_file = state
|
||||
|
||||
|
|
116
plan.py
116
plan.py
|
@ -1,116 +0,0 @@
|
|||
"""
|
||||
BAPSicle Server
|
||||
Next-gen audio playout server for University Radio York playout,
|
||||
based on WebStudio interface.
|
||||
|
||||
Show Plan Items
|
||||
|
||||
Authors:
|
||||
Michael Grace
|
||||
|
||||
Date:
|
||||
November 2020
|
||||
"""
|
||||
|
||||
from typing import Any, Dict, Optional
|
||||
import os
|
||||
|
||||
|
||||
class PlanItem:
|
||||
_timeslotitemid: int = 0
|
||||
_weight: int = 0
|
||||
_filename: Optional[str]
|
||||
_title: str
|
||||
_artist: Optional[str]
|
||||
_trackid: Optional[int]
|
||||
_managedid: Optional[int]
|
||||
|
||||
@property
|
||||
def weight(self) -> int:
|
||||
return self._weight
|
||||
|
||||
@weight.setter
|
||||
def weight(self, value: int):
|
||||
self._weight = value
|
||||
|
||||
@property
|
||||
def timeslotitemid(self) -> int:
|
||||
return self._timeslotitemid
|
||||
|
||||
@property
|
||||
def filename(self) -> Optional[str]:
|
||||
return self._filename
|
||||
|
||||
@filename.setter
|
||||
def filename(self, value: Optional[str]):
|
||||
self._filename = value
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return (
|
||||
"{0} - {1}".format(self._title, self._artist)
|
||||
if self._artist
|
||||
else self._title
|
||||
)
|
||||
|
||||
@property
|
||||
def trackid(self) -> Optional[int]:
|
||||
return self._trackid
|
||||
|
||||
@property
|
||||
def managedid(self) -> Optional[int]:
|
||||
return self._managedid
|
||||
|
||||
@property
|
||||
def title(self) -> Optional[str]:
|
||||
return self._title
|
||||
|
||||
@property
|
||||
def artist(self) -> Optional[str]:
|
||||
return self._artist
|
||||
|
||||
@property
|
||||
def length(self) -> Optional[str]:
|
||||
return self._length
|
||||
|
||||
@property
|
||||
def type(self) -> Optional[str]:
|
||||
return "aux" if self.managedid else "central"
|
||||
|
||||
@property
|
||||
def __dict__(self):
|
||||
return {
|
||||
"weight": self.weight,
|
||||
"timeslotitemid": self.timeslotitemid,
|
||||
"trackid": self._trackid,
|
||||
"type": self.type,
|
||||
"managedid": self._managedid,
|
||||
"title": self._title,
|
||||
"artist": self._artist,
|
||||
"name": self.name,
|
||||
"filename": self.filename,
|
||||
"length": self.length,
|
||||
}
|
||||
|
||||
def __init__(self, new_item: Dict[str, Any]):
|
||||
self._timeslotitemid = new_item["timeslotitemid"]
|
||||
self._managedid = new_item["managedid"] if "managedid" in new_item else None
|
||||
self._trackid = (
|
||||
int(new_item["trackid"])
|
||||
if "trackid" in new_item and not self._managedid
|
||||
else None
|
||||
)
|
||||
self._filename = (
|
||||
new_item["filename"] if "filename" in new_item else None
|
||||
) # This could be a temp dir for API-downloaded items, or a mapped drive.
|
||||
self._weight = int(new_item["weight"])
|
||||
self._title = new_item["title"]
|
||||
self._artist = new_item["artist"] if "artist" in new_item else None
|
||||
self._length = new_item["length"]
|
||||
|
||||
# Fix any OS specific / or \'s
|
||||
if self.filename:
|
||||
if os.path.sep == "/":
|
||||
self._filename = self.filename.replace("\\", "/")
|
||||
else:
|
||||
self._filename = self.filename.replace("/", "\\")
|
108
player.py
108
player.py
|
@ -36,8 +36,8 @@ from mutagen.mp3 import MP3
|
|||
from helpers.myradio_api import MyRadioAPI
|
||||
from helpers.state_manager import StateManager
|
||||
from helpers.logging_manager import LoggingManager
|
||||
from plan import PlanItem
|
||||
|
||||
from baps_types.plan import PlanItem
|
||||
from baps_types.marker import Marker
|
||||
|
||||
# TODO ENUM
|
||||
VALID_MESSAGE_SOURCES = ["WEBSOCKET", "UI", "CONTROLLER", "TEST", "ALL"]
|
||||
|
@ -88,7 +88,7 @@ class Player:
|
|||
return True
|
||||
|
||||
@property
|
||||
def isPlaying(self):
|
||||
def isPlaying(self) -> bool:
|
||||
if self.isInit:
|
||||
return (not self.isPaused) and bool(mixer.music.get_busy())
|
||||
return False
|
||||
|
@ -99,11 +99,18 @@ class Player:
|
|||
|
||||
@property
|
||||
def isLoaded(self):
|
||||
return self._isLoaded()
|
||||
|
||||
def _isLoaded(self, short_test: bool = False):
|
||||
if not self.state.state["loaded_item"]:
|
||||
return False
|
||||
if self.isPlaying:
|
||||
return True
|
||||
|
||||
# If we don't want to do any testing if it's really loaded, fine.
|
||||
if short_test:
|
||||
return True
|
||||
|
||||
# Because Pygame/SDL is annoying
|
||||
# We're not playing now, so we can quickly test run
|
||||
# If that works, we're loaded.
|
||||
|
@ -127,6 +134,13 @@ class Player:
|
|||
mixer.music.set_volume(1)
|
||||
return True
|
||||
|
||||
@property
|
||||
def isCued(self):
|
||||
# Don't mess with playback, we only care about if it's supposed to be loaded.
|
||||
if not self._isLoaded(short_test=True):
|
||||
return False
|
||||
return (self.state.state["pos_true"] == self.state.state["loaded_item"].cue and not self.isPlaying)
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
state = copy.copy(self.state.state)
|
||||
|
@ -159,7 +173,7 @@ class Player:
|
|||
|
||||
def pause(self):
|
||||
try:
|
||||
mixer.music.pause()
|
||||
mixer.music.stop()
|
||||
except Exception:
|
||||
self.logger.log.exception("Failed to pause.")
|
||||
return False
|
||||
|
@ -183,22 +197,31 @@ class Player:
|
|||
return True
|
||||
return False
|
||||
|
||||
def stop(self):
|
||||
# if self.isPlaying or self.isPaused:
|
||||
def stop(self, user_initiated: bool = False):
|
||||
try:
|
||||
mixer.music.stop()
|
||||
except Exception:
|
||||
self.logger.log.exception("Failed to stop playing.")
|
||||
return False
|
||||
self.state.update("pos", 0)
|
||||
self.state.update("pos_offset", 0)
|
||||
self.state.update("pos_true", 0)
|
||||
self.state.update("paused", False)
|
||||
|
||||
self.stopped_manually = True
|
||||
|
||||
if not self.state.state["loaded_item"]:
|
||||
self.logger.log.warning("Tried to stop without a loaded item.")
|
||||
return True
|
||||
|
||||
# This lets users toggle (using the stop button) between cue point and 0.
|
||||
if user_initiated and not self.isCued:
|
||||
# if there's a cue point ant we're not at it, go there.
|
||||
self.seek(self.state.state["loaded_item"].cue)
|
||||
else:
|
||||
# Otherwise, let's go to 0.
|
||||
self.state.update("pos", 0)
|
||||
self.state.update("pos_offset", 0)
|
||||
self.state.update("pos_true", 0)
|
||||
|
||||
return True
|
||||
# return False
|
||||
|
||||
def seek(self, pos: float) -> bool:
|
||||
if self.isPlaying:
|
||||
|
@ -209,6 +232,7 @@ class Player:
|
|||
return False
|
||||
return True
|
||||
else:
|
||||
self.stopped_manually = True # Don't trigger _ended() on seeking.
|
||||
self.state.update("paused", True)
|
||||
self._updateState(pos=pos)
|
||||
return True
|
||||
|
@ -325,7 +349,7 @@ class Player:
|
|||
if showplan[i].weight == weight:
|
||||
self.state.update("show_plan", index=i, value=loaded_item)
|
||||
break
|
||||
# TODO: Update the show plan filenames
|
||||
# TODO: Update the show plan filenames???
|
||||
|
||||
try:
|
||||
self.logger.log.info("Loading file: " +
|
||||
|
@ -352,6 +376,9 @@ class Player:
|
|||
"Failed to update the length of item.")
|
||||
return False
|
||||
|
||||
if loaded_item.cue > 0:
|
||||
self.seek(loaded_item.cue)
|
||||
|
||||
if self.state.state["play_on_load"]:
|
||||
self.play()
|
||||
|
||||
|
@ -402,9 +429,54 @@ class Player:
|
|||
|
||||
return True
|
||||
|
||||
def ended(self):
|
||||
def set_marker(self, timeslotitemid: int, marker_str: str):
|
||||
set_loaded = False
|
||||
success = True
|
||||
try:
|
||||
marker = Marker(marker_str)
|
||||
except Exception as e:
|
||||
self.logger.log.error("Failed to create Marker instance with {} {}: {}".format(timeslotitemid, marker_str, e))
|
||||
return False
|
||||
|
||||
if timeslotitemid == -1:
|
||||
set_loaded = True
|
||||
if not self.isLoaded:
|
||||
return False
|
||||
timeslotitemid = self.state.state["loaded_item"].timeslotitemid
|
||||
|
||||
plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"])
|
||||
for i in range(len(self.state.state["show_plan"])):
|
||||
|
||||
item = plan_copy[i]
|
||||
|
||||
if item.timeslotitemid == timeslotitemid:
|
||||
try:
|
||||
new_item = item.set_marker(marker)
|
||||
self.state.update("show_plan", new_item, index=i)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.log.error(
|
||||
"Failed to set marker on item {}: {} with marker \n{}".format(timeslotitemid, e, marker))
|
||||
success = False
|
||||
|
||||
if set_loaded:
|
||||
try:
|
||||
self.state.update("loaded_item", self.state.state["loaded_item"].set_marker(marker))
|
||||
except Exception as e:
|
||||
self.logger.log.error(
|
||||
"Failed to set marker on loaded_item {}: {} with marker \n{}".format(timeslotitemid, e, marker))
|
||||
success = False
|
||||
|
||||
return success
|
||||
|
||||
# Helper functions
|
||||
|
||||
def _ended(self):
|
||||
loaded_item = self.state.state["loaded_item"]
|
||||
|
||||
if not loaded_item:
|
||||
return
|
||||
|
||||
# Track has ended
|
||||
print("Finished", loaded_item.name, loaded_item.weight)
|
||||
|
||||
|
@ -459,7 +531,7 @@ class Player:
|
|||
and not self.isPlaying
|
||||
and not self.stopped_manually
|
||||
):
|
||||
self.ended()
|
||||
self._ended()
|
||||
|
||||
self.state.update("playing", self.isPlaying)
|
||||
self.state.update("loaded", self.isLoaded)
|
||||
|
@ -560,6 +632,8 @@ class Player:
|
|||
str(loaded_item.filename))
|
||||
self.load(loaded_item.weight)
|
||||
|
||||
# Load may jump to the cue point, as it would do on a regular load.
|
||||
# If we were at a different state before, we have to override it now.
|
||||
if loaded_state["pos_true"] != 0:
|
||||
self.logger.log.info(
|
||||
"Seeking to pos_true: " + str(loaded_state["pos_true"])
|
||||
|
@ -568,7 +642,7 @@ class Player:
|
|||
|
||||
if loaded_state["playing"] is True:
|
||||
self.logger.log.info("Resuming.")
|
||||
self.unpause()
|
||||
self.unpause() # Use un-pause as we don't want to jump to a new position.
|
||||
else:
|
||||
self.logger.log.info("No file was previously loaded.")
|
||||
|
||||
|
@ -616,10 +690,11 @@ class Player:
|
|||
] = { # TODO Check Types
|
||||
"STATUS": lambda: self._retMsg(self.status, True),
|
||||
# Audio Playout
|
||||
"PLAY": lambda: self._retMsg(self.play()),
|
||||
# Unpause, so we don't jump to 0, we play from the current pos.
|
||||
"PLAY": lambda: self._retMsg(self.unpause()),
|
||||
"PAUSE": lambda: self._retMsg(self.pause()),
|
||||
"UNPAUSE": lambda: self._retMsg(self.unpause()),
|
||||
"STOP": lambda: self._retMsg(self.stop()),
|
||||
"STOP": lambda: self._retMsg(self.stop(user_initiated=True)),
|
||||
"SEEK": lambda: self._retMsg(
|
||||
self.seek(float(self.last_msg.split(":")[1]))
|
||||
),
|
||||
|
@ -656,6 +731,7 @@ class Player:
|
|||
int(self.last_msg.split(":")[1]))
|
||||
),
|
||||
"CLEAR": lambda: self._retMsg(self.clear_channel_plan()),
|
||||
"SETMARKER": lambda: self._retMsg(self.set_marker(int(self.last_msg.split(":")[1]), self.last_msg.split(":", 2)[2])),
|
||||
}
|
||||
|
||||
message_type: str = self.last_msg.split(":")[0]
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from helpers.os_environment import isMacOS
|
||||
from typing import Optional
|
||||
from queue import Empty
|
||||
import unittest
|
||||
import multiprocessing
|
||||
|
@ -8,6 +8,8 @@ import json
|
|||
|
||||
from player import Player
|
||||
from helpers.logging_manager import LoggingManager
|
||||
from helpers.state_manager import StateManager
|
||||
from helpers.os_environment import isMacOS
|
||||
|
||||
# How long to wait (by default) in secs for the player to respond.
|
||||
TIMEOUT_MSG_MAX_S = 10
|
||||
|
@ -34,7 +36,23 @@ def getPlanItem(length: int, weight: int):
|
|||
|
||||
|
||||
def getPlanItemJSON(length: int, weight: int):
|
||||
return str(json.dumps(getPlanItem(length, weight)))
|
||||
return str(json.dumps(getPlanItem(**locals())))
|
||||
|
||||
|
||||
# All because constant dicts are still mutable in python :/
|
||||
def getMarker(name: str, time: float, position: str, section: Optional[str] = None):
|
||||
# Time is not validated here, to allow tests to check server response.
|
||||
marker = {
|
||||
"name": name, # User friendly name, eg. "Hit the vocals"
|
||||
"time": time, # Position (secs) through item
|
||||
"section": section, # for linking in loops, if none, assume intro, cue, outro based on "position"
|
||||
"position": position, # start, mid, end
|
||||
}
|
||||
return marker
|
||||
|
||||
|
||||
def getMarkerJSON(name: str, time: float, position: str, section: Optional[str] = None):
|
||||
return json.dumps(getMarker(**locals()))
|
||||
|
||||
|
||||
class TestPlayer(unittest.TestCase):
|
||||
|
@ -43,12 +61,14 @@ class TestPlayer(unittest.TestCase):
|
|||
player_from_q: multiprocessing.Queue
|
||||
player_to_q: multiprocessing.Queue
|
||||
logger: LoggingManager
|
||||
server_state: StateManager
|
||||
|
||||
# initialization logic for the test suite declared in the test module
|
||||
# code that is executed before all tests in one test run
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.logger = LoggingManager("Test_Player")
|
||||
cls.server_state = StateManager("BAPSicleServer", cls.logger) # Mostly dummy here.
|
||||
|
||||
# clean up logic for the test suite declared in the test module
|
||||
# code that is executed after all tests in one test run
|
||||
|
@ -62,7 +82,7 @@ class TestPlayer(unittest.TestCase):
|
|||
self.player_from_q = multiprocessing.Queue()
|
||||
self.player_to_q = multiprocessing.Queue()
|
||||
self.player = multiprocessing.Process(
|
||||
target=Player, args=(-1, self.player_to_q, self.player_from_q)
|
||||
target=Player, args=(-1, self.player_to_q, self.player_from_q, self.server_state)
|
||||
)
|
||||
self.player.start()
|
||||
self._send_msg_wait_OKAY("CLEAR") # Empty any previous track items.
|
||||
|
@ -122,7 +142,7 @@ class TestPlayer(unittest.TestCase):
|
|||
|
||||
def _send_msg_wait_OKAY(
|
||||
self, msg: str, sources_filter=["TEST"], timeout: int = TIMEOUT_MSG_MAX_S
|
||||
):
|
||||
) -> Optional[str]:
|
||||
response = self._send_msg_and_wait(msg, sources_filter, timeout)
|
||||
|
||||
self.assertTrue(response)
|
||||
|
@ -283,6 +303,62 @@ class TestPlayer(unittest.TestCase):
|
|||
|
||||
time.sleep(5)
|
||||
|
||||
# TODO: Test validation of trying to break this.
|
||||
# TODO: Test cue behaviour.
|
||||
def test_markers(self):
|
||||
self._send_msg_wait_OKAY("ADD:" + getPlanItemJSON(5, 0))
|
||||
self._send_msg_wait_OKAY("ADD:" + getPlanItemJSON(5, 1))
|
||||
self._send_msg_wait_OKAY("ADD:" + getPlanItemJSON(5, 2))
|
||||
|
||||
self._send_msg_wait_OKAY("LOAD:2") # To test currently loaded marker sets.
|
||||
|
||||
markers = [
|
||||
# Markers are stored as float, to compare against later, these must all be floats, despite int being supported.
|
||||
getMarkerJSON("Intro Name", 2.0, "start", None),
|
||||
getMarkerJSON("Cue Name", 3.14, "mid", None),
|
||||
getMarkerJSON("Outro Name", 4.0, "end", None),
|
||||
getMarkerJSON("Start Loop", 2.0, "start", "The Best Loop 1"),
|
||||
getMarkerJSON("Mid Loop", 3.0, "mid", "The Best Loop 1"),
|
||||
getMarkerJSON("End Loop", 3.5, "end", "The Best Loop 1"),
|
||||
]
|
||||
# Command, Weight?/itemid? (-1 is loaded), marker json (Intro at 2 seconds.)
|
||||
self._send_msg_wait_OKAY("SETMARKER:0:" + markers[0])
|
||||
self._send_msg_wait_OKAY("SETMARKER:0:" + markers[1])
|
||||
self._send_msg_wait_OKAY("SETMARKER:1:" + markers[2])
|
||||
self._send_msg_wait_OKAY("SETMARKER:-1:" + markers[3])
|
||||
self._send_msg_wait_OKAY("SETMARKER:-1:" + markers[4])
|
||||
self._send_msg_wait_OKAY("SETMARKER:-1:" + markers[5])
|
||||
|
||||
# Test we didn't completely break the player
|
||||
response = self._send_msg_wait_OKAY("STATUS")
|
||||
self.assertTrue(response)
|
||||
json_obj = json.loads(response)
|
||||
self.logger.log.warning(json_obj)
|
||||
|
||||
# time.sleep(1000000)
|
||||
# Now test that all the markers we setup are present.
|
||||
item = json_obj["show_plan"][0]
|
||||
self.assertEqual(item["weight"], 0)
|
||||
self.assertEqual(item["intro"], 2.0) # Backwards compat with basic Webstudio intro/cue/outro
|
||||
self.assertEqual(item["cue"], 3.14)
|
||||
self.assertEqual([json.dumps(item) for item in item["markers"]], markers[0:2]) # Check the full marker configs match
|
||||
|
||||
item = json_obj["show_plan"][1]
|
||||
self.assertEqual(item["weight"], 1)
|
||||
self.assertEqual(item["outro"], 4.0)
|
||||
self.assertEqual([json.dumps(item) for item in item["markers"]], [markers[2]])
|
||||
|
||||
# In this case, we want to make sure both the current and loaded items are updated
|
||||
for item in [json_obj["show_plan"][2], json_obj["loaded_item"]]:
|
||||
self.assertEqual(item["weight"], 2)
|
||||
# This is a loop marker. It should not appear as a standard intro, outro or cue. Default of 0.0 should apply to all.
|
||||
self.assertEqual(item["intro"], 0.0)
|
||||
self.assertEqual(item["outro"], 0.0)
|
||||
self.assertEqual(item["cue"], 0.0)
|
||||
self.assertEqual([json.dumps(item) for item in item["markers"]], markers[3:])
|
||||
|
||||
# TODO: Now test editing/deleting them
|
||||
|
||||
|
||||
# runs the unit tests in the module
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Reference in a new issue