Merge branch 'bug-fixes' into dev

This commit is contained in:
Matthew Stratford 2021-04-27 20:05:22 +01:00
commit 8aa9d9aecf
11 changed files with 271 additions and 55 deletions

View file

@ -14,13 +14,13 @@
import json
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
import os
from baps_types.marker import Marker
class PlanItem:
_timeslotitemid: int = 0
_timeslotitemid: str = "0"
_weight: int = 0
_filename: Optional[str]
_title: str
@ -38,9 +38,13 @@ class PlanItem:
self._weight = value
@property
def timeslotitemid(self) -> int:
def timeslotitemid(self) -> str:
return self._timeslotitemid
@timeslotitemid.setter
def timeslotitemid(self, value):
self._timeslotitemid = str(value)
@property
def filename(self) -> Optional[str]:
return self._filename
@ -129,7 +133,7 @@ class PlanItem:
}
def __init__(self, new_item: Dict[str, Any]):
self._timeslotitemid = new_item["timeslotitemid"]
self._timeslotitemid = str(new_item["timeslotitemid"])
self._managedid = new_item["managedid"] if "managedid" in new_item else None
self._trackid = (
int(new_item["trackid"])
@ -196,7 +200,9 @@ class PlanItem:
new_markers = []
for marker in self._markers:
if marker.same_type(new_marker):
new_markers.append(new_marker)
# Only add new marker if the marker is > 0 (to delete markers otherwise)
if new_marker.time != 0:
new_markers.append(new_marker)
# Replace marker
replaced = True
else:

View file

@ -80,7 +80,7 @@
{
"optionDest": "datas",
"value": "/package.json;./"
}
}
],
"nonPyinstallerOptions": {
"increaseRecursionLimit": false,

View file

@ -11,9 +11,17 @@ in_file.close()
for option in config["pyinstallerOptions"]:
if option["optionDest"] in ["datas", "filenames", "icon_file"]:
# If we wanted a relative output directory, this will go missing in abspath on windows.
relative_fix = False
split = option["value"].split(";")
if len(split) > 1 and split[1] == "./":
relative_fix = True
option["value"] = os.path.abspath(parent_path + option["value"])
if not isWindows():
option["value"] = option["value"].replace(";", ":")
elif relative_fix:
option["value"] += "./" # Add the windows relative path.
out_file = open('build-exe-config.json', 'w')
out_file.write(json.dumps(config, indent=2))

View file

@ -13,3 +13,4 @@ websockets
typing_extensions
pyserial
requests
jinja2

125
file_manager.py Normal file
View file

@ -0,0 +1,125 @@
from helpers.state_manager import StateManager
from helpers.os_environment import resolve_external_file_path
from typing import List
from setproctitle import setproctitle
from multiprocessing import current_process, Queue
from time import sleep
import os
import json
from syncer import sync
from helpers.logging_manager import LoggingManager
from helpers.the_terminator import Terminator
from helpers.myradio_api import MyRadioAPI
from baps_types.plan import PlanItem
class FileManager:
logger: LoggingManager
api: MyRadioAPI
def __init__(self, channel_from_q: List[Queue], server_config: StateManager):
self.logger = LoggingManager("FileManager")
self.api = MyRadioAPI(self.logger, server_config)
process_title = "File Manager"
setproctitle(process_title)
current_process().name = process_title
terminator = Terminator()
channel_count = len(channel_from_q)
channel_received = None
last_known_show_plan = [None]*channel_count
next_channel_preload = 0
last_known_item_ids = [[]]*channel_count
try:
while not terminator.terminate:
# If all channels have received the delete command, reset for the next one.
if (channel_received == None or channel_received == [True]*channel_count):
channel_received = [False]*channel_count
for channel in range(channel_count):
try:
message = channel_from_q[channel].get_nowait()
#source = message.split(":")[0]
command = message.split(":",2)[1]
# If we have requested a new show plan, empty the music-tmp directory for the previous show.
if command == "GET_PLAN":
if channel_received != [False]*channel_count and channel_received[channel] != True:
# We've already received a delete trigger on a channel, let's not delete the folder more than once.
# If the channel was already in the process of being deleted, the user has requested it again, so allow it.
channel_received[channel] = True
continue
# Delete the previous show files!
# Note: The players load into RAM. If something is playing over the load, the source file can still be deleted.
path: str = resolve_external_file_path("/music-tmp/")
if not os.path.isdir(path):
self.logger.log.warning("Music-tmp folder is missing, not handling.")
continue
files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
for file in files:
os.remove(path+"/"+file)
channel_received[channel] = True
# If we receive a new status message, let's check for files which have not been pre-loaded.
if command == "STATUS":
extra = message.split(":",3)
if extra[2] != "OKAY":
continue
status = json.loads(extra[3])
show_plan = status["show_plan"]
item_ids = []
for item in show_plan:
item_ids += item["timeslotitemid"]
# If the new status update has a different order / list of items, let's update the show plan we know about
# This will trigger the chunk below to do the rounds again and preload any new files.
if item_ids != last_known_item_ids[channel]:
last_known_item_ids[channel] = item_ids
last_known_show_plan[channel] = show_plan
except Exception:
pass
# Right, let's have a quick check in the status for shows without filenames, to preload them.
delay = True
for i in range(len(last_known_show_plan[next_channel_preload])):
item_obj = PlanItem(last_known_show_plan[next_channel_preload][i])
if not item_obj.filename:
print("Checking pre-load on channel {}, weight {}: {}".format(next_channel_preload, item_obj.weight, item_obj.name))
# Getting the file name will only pull the new file if the file doesn't already exist, so this is not too inefficient.
item_obj.filename,did_download = sync(self.api.get_filename(item_obj, True))
# Alright, we've done one, now let's give back control to process new statuses etc.
# Given we probably took some time to download, let's not sleep in the loop.
# Save back the resulting item back in regular dict form
last_known_show_plan[next_channel_preload][i] = item_obj.__dict__
if did_download:
delay = False
break
else:
# We didn't download anything this time, file was already loaded.
# Let's try the next one.
continue
next_channel_preload += 1
if next_channel_preload >= channel_count:
next_channel_preload = 0
if delay:
sleep(0.1)
except Exception as e:
self.logger.log.exception(
"Received unexpected exception: {}".format(e))
del self.logger

View file

@ -183,11 +183,25 @@ class MyRadioAPI:
self._logException("Failed to get show plan.")
return None
return json.loads(await request)["payload"]
payload = json.loads(await request)["payload"]
plan = {}
# Account for MyRadio api being dumb depending on if it's cached or not.
if isinstance(payload, list):
for channel in range(len(payload)):
plan[str(channel)] = payload[channel]
return plan
elif isinstance(payload, dict):
return payload
self.logger.log.error("Show plan in unknown format.")
return None
# Audio Library
async def get_filename(self, item: PlanItem):
async def get_filename(self, item: PlanItem, did_download: bool = False):
format = "mp3" # TODO: Maybe we want this customisable?
if item.trackid:
itemType = "track"
@ -200,7 +214,7 @@ class MyRadioAPI:
url = "/NIPSWeb/managed_play?managedid={}".format(id)
else:
return None
return (None, False) if did_download else None
# Now check if the file already exists
path: str = resolve_external_file_path("/music-tmp/")
@ -211,29 +225,29 @@ class MyRadioAPI:
os.mkdir(path)
except Exception as e:
self._logException("Failed to create music-tmp folder: {}".format(e))
return None
return (None, False) if did_download else None
filename: str = resolve_external_file_path(
"/music-tmp/{}-{}.{}".format(itemType, id, format)
)
if os.path.isfile(filename):
return filename
return (filename, False) if did_download else filename
# File doesn't exist, download it.
request = await self.async_api_call(url, api_version="non")
if not request:
return None
return (None, False) if did_download else None
try:
with open(filename, "wb") as file:
file.write(await request)
except Exception as e:
self._logException("Failed to write music file: {}".format(e))
return None
return (None, False) if did_download else None
return filename
return (filename, True) if did_download else filename
# Gets the list of managed music playlists.
async def get_playlist_music(self):

View file

@ -226,9 +226,7 @@ class Player:
self.seek(self.state.get()["loaded_item"].cue)
else:
# Otherwise, let's go to 0.
self.state.update("pos", 0)
self.state.update("pos_offset", 0)
self.state.update("pos_true", 0)
self.seek(0)
return True
@ -242,7 +240,8 @@ class Player:
return True
else:
self.stopped_manually = True # Don't trigger _ended() on seeking.
self.state.update("paused", True)
if pos > 0:
self.state.update("paused", True)
self._updateState(pos=pos)
return True
@ -266,8 +265,10 @@ class Player:
plan = sync(self.api.get_showplan(message))
self.clear_channel_plan()
channel = self.state.get()["channel"]
self.logger.log.info(plan)
if len(plan) > channel:
self.logger.log.debug(plan)
if not isinstance(plan, dict):
return False
if str(channel) in plan.keys():
for plan_item in plan[str(channel)]:
try:
self.add_to_plan(plan_item)
@ -279,8 +280,17 @@ class Player:
return True
def _check_ghosts(self, item: PlanItem):
if isinstance(item.timeslotitemid, str) and item.timeslotitemid.startswith("I"):
# Kinda a bodge for the moment, each "Ghost" (item which is not saved in the database showplan yet) needs to have a unique temporary item.
# To do this, we'll start with the channel number the item was originally added to (to stop items somehow simultaneously added to different channels from having the same id)
# And chuck in the unix epoch in ns for good measure.
item.timeslotitemid = "GHOST-{}-{}".format(self.state.get()["channel"], time.time_ns())
return item
def add_to_plan(self, new_item: Dict[str, Any]) -> bool:
new_item_obj = PlanItem(new_item)
new_item_obj = self._check_ghosts(new_item_obj)
plan_copy: List[PlanItem] = copy.copy(self.state.get()["show_plan"])
# Shift any plan items after the new position down one to make space.
for item in plan_copy:
@ -289,10 +299,7 @@ class Player:
plan_copy += [new_item_obj] # Add the new item.
def sort_weight(e: PlanItem):
return e.weight
plan_copy.sort(key=sort_weight) # Sort into weighted order.
plan_copy = self._fix_weights(plan_copy)
self.state.update("show_plan", plan_copy)
return True
@ -304,11 +311,8 @@ class Player:
if i.weight == weight:
plan_copy.remove(i)
found = True
elif (
i.weight > weight
): # Shuffle up the weights of the items following the deleted one.
i.weight -= 1
if found:
plan_copy = self._fix_weights(plan_copy)
self.state.update("show_plan", plan_copy)
return True
return False
@ -387,9 +391,11 @@ class Player:
if loaded_item.cue > 0:
self.seek(loaded_item.cue)
else:
self.seek(0)
if self.state.get()["play_on_load"]:
self.play()
self.unpause()
return True
@ -420,6 +426,7 @@ class Player:
def output(self, name: Optional[str] = None):
wasPlaying = self.state.get()["playing"]
oldPos = self.state.get()["pos_true"]
name = None if (not name or name.lower() == "none") else name
@ -440,7 +447,7 @@ class Player:
if loadedItem:
self.load(loadedItem.weight)
if wasPlaying:
self.unpause()
self.play(oldPos)
return True
@ -459,6 +466,9 @@ class Player:
if not self.isLoaded:
return False
timeslotitemid = self.state.get()["loaded_item"].timeslotitemid
elif self.isLoaded and self.state.get()["loaded_item"].timeslotitemid == timeslotitemid:
set_loaded = True
plan_copy: List[PlanItem] = copy.copy(self.state.get()["show_plan"])
for i in range(len(self.state.get()["show_plan"])):
@ -489,7 +499,7 @@ class Player:
# This essentially allows the tracklist end API call to happen in a separate thread, to avoid hanging playout/loading.
def _potentially_tracklist(self):
mode: TracklistMode = self.state.get()["tracklist_mode"]
mode = self.state.get()["tracklist_mode"]
time: int = -1
if mode == "on":
@ -574,27 +584,19 @@ class Player:
self.play()
return
loaded_new_item = False
# Auto Advance
if self.state.get()["auto_advance"]:
for i in range(len(self.state.get()["show_plan"])):
if self.state.get()["show_plan"][i].weight == loaded_item.weight:
if len(self.state.get()["show_plan"]) > i + 1:
self.load(self.state.get()["show_plan"][i + 1].weight)
loaded_new_item = True
break
return
# Repeat All
# TODO ENUM
elif self.state.get()["repeat"] == "all":
self.load(self.state.get()["show_plan"][0].weight)
loaded_new_item = True
break
# Play on Load
if self.state.get()["play_on_load"] and loaded_new_item:
self.play()
return
return
# No automations, just stop playing.
self.stop()
@ -605,15 +607,14 @@ class Player:
self.state.update("initialised", self.isInit)
if self.isInit:
if pos:
self.state.update("pos", max(0, pos))
if pos is not None:
# Seeking sets the position like this when not playing.
self.state.update("pos", pos) # Reset back to 0 if stopped.
self.state.update("pos_offset", 0)
elif self.isPlaying:
# This is the bit that makes the time actually progress during playback.
# Get one last update in, incase we're about to pause/stop it.
self.state.update("pos", max(0, mixer.music.get_pos() / 1000))
# TODO this is wrong now we don't pause the mixer.
elif not self.isPaused:
self.state.update("pos", 0) # Reset back to 0 if stopped.
self.state.update("pos_offset", 0)
# If the state is changing from playing to not playing, and the user didn't stop it, the item must have ended.
if (
@ -680,6 +681,24 @@ class Player:
self._retMsg(str(self.status), okay_str=True,
custom_prefix="ALL:STATUS:")
def _fix_weights(self, plan):
def _sort_weight(e: PlanItem):
return e.weight
for item in plan:
self.logger.log.info("Pre weights:\n{}".format(item))
plan.sort(key=_sort_weight) # Sort into weighted order.
for item in plan:
self.logger.log.info("Post Sort:\n{}".format(item))
for i in range(len(plan)):
plan[i].weight = i # Recorrect the weights on the channel.
for item in plan:
self.logger.log.info("Post Weights:\n{}".format(item))
return plan
def __init__(
self, channel: int, in_q: multiprocessing.Queue, out_q: multiprocessing.Queue, server_state: StateManager
):
@ -707,6 +726,11 @@ class Player:
self.state.update("channel", channel)
self.state.update("tracklist_mode", server_state.get()["tracklist_mode"])
# Just in case there's any weights somehow messed up, let's fix them.
plan_copy: List[PlanItem] = copy.copy(self.state.get()["show_plan"])
plan_copy = self._fix_weights(plan_copy)
self.state.update("show_plan", plan_copy)
loaded_state = copy.copy(self.state.state)
if loaded_state["output"]:

View file

@ -10,7 +10,7 @@ from helpers.the_terminator import Terminator
class PlayerHandler:
logger: LoggingManager
def __init__(self, channel_from_q, websocket_to_q, ui_to_q, controller_to_q):
def __init__(self, channel_from_q, websocket_to_q, ui_to_q, controller_to_q, file_to_q):
self.logger = LoggingManager("PlayerHandler")
process_title = "Player Handler"
@ -25,6 +25,13 @@ class PlayerHandler:
try:
message = channel_from_q[channel].get_nowait()
source = message.split(":")[0]
command = message.split(":")[1]
# Let the file manager manage the files based on status and loading new show plan triggers.
if command == "GET_PLAN" or command == "STATUS":
file_to_q[channel].put(message)
# TODO ENUM
if source in ["ALL", "WEBSOCKET"]:
websocket_to_q[channel].put(message)

@ -1 +1 @@
Subproject commit 2b151649c9d38367529425793d9f751863eac6a0
Subproject commit 199cfc88439273ab8ede8cef8cd13868ad8f0ae9

View file

@ -12,6 +12,7 @@
Date:
October, November 2020
"""
from file_manager import FileManager
import multiprocessing
from multiprocessing.queues import Queue
import multiprocessing.managers as m
@ -75,6 +76,7 @@ class BAPSicleServer:
ui_to_q: List[Queue] = []
websocket_to_q: List[Queue] = []
controller_to_q: List[Queue] = []
file_to_q: List[Queue] = []
api_from_q: Queue
api_to_q: Queue
@ -82,6 +84,7 @@ class BAPSicleServer:
websockets_server: Optional[multiprocessing.Process] = None
controller_handler: Optional[multiprocessing.Process] = None
player_handler: Optional[multiprocessing.Process] = None
file_manager: Optional[multiprocessing.Process] = None
webserver: Optional[multiprocessing.Process] = None
def __init__(self):
@ -118,10 +121,18 @@ class BAPSicleServer:
log_function("Player Handler not running, (re)starting.")
self.player_handler = multiprocessing.Process(
target=PlayerHandler,
args=(self.player_from_q, self.websocket_to_q, self.ui_to_q, self.controller_to_q),
args=(self.player_from_q, self.websocket_to_q, self.ui_to_q, self.controller_to_q, self.file_to_q),
)
self.player_handler.start()
if not self.file_manager or not self.file_manager.is_alive():
log_function("File Manager not running, (re)starting.")
self.file_manager = multiprocessing.Process(
target=FileManager,
args=(self.file_to_q, self.state),
)
self.file_manager.start()
if not self.websockets_server or not self.websockets_server.is_alive():
log_function("Websocket Server not running, (re)starting.")
self.websockets_server = multiprocessing.Process(
@ -182,6 +193,7 @@ class BAPSicleServer:
self.ui_to_q.append(multiprocessing.Queue())
self.websocket_to_q.append(multiprocessing.Queue())
self.controller_to_q.append(multiprocessing.Queue())
self.file_to_q.append(multiprocessing.Queue())
print("Welcome to BAPSicle Server version: {}, build: {}.".format(package.VERSION, package.BUILD))
print("The Server UI is available at http://{}:{}".format(self.state.get()["host"], self.state.get()["port"]))
@ -247,11 +259,18 @@ class BAPSicleServer:
self.player_handler.join(timeout=PROCESS_KILL_TIMEOUT_S)
del self.player_handler
print("Stopping File Manager")
if self.file_manager:
self.file_manager.terminate()
self.file_manager.join(timeout=PROCESS_KILL_TIMEOUT_S)
del self.file_manager
print("Stopping Controllers")
if self.controller_handler:
self.controller_handler.terminate()
self.controller_handler.join(timeout=PROCESS_KILL_TIMEOUT_S)
del self.controller_handler
print("Stopped all processes.")
if __name__ == "__main__":

View file

@ -123,7 +123,7 @@ class WebsocketServer:
# Message format:
# SOURCE:COMMAND:EXTRADATA
message = "WEBSOCKET:" + command
message = "WEBSOCKET:"
# If we just want PLAY, PAUSE etc, we're all done.
# Else, let's pipe in some extra info.
@ -154,18 +154,29 @@ class WebsocketServer:
# SPECIAL CASE ALERT! We need to talk to two channels here.
elif command == "MOVE":
# TODO Should we trust the client with the item info?
# Tell the old channel to remove "weight"
extra += str(data["weight"])
# remove the exiting item first
self.channel_to_q[channel].put(
"{}REMOVE:{}".format(message, data["weight"])
)
# Now hijack to send the new add on the new channel.
# Now modify the item with the weight in the new channel
new_channel = int(data["new_channel"])
item = data["item"]
item["weight"] = int(data["new_weight"])
# If we're moving within the same channel, add 1 to the weight, since we're adding the new item before we remove the old one, UI gave us the weight expected after removing.
if channel == new_channel and data["new_weight"] > data["weight"]:
item["weight"] += 1
# Now send the special case.
self.channel_to_q[new_channel].put(
"ADD:" + json.dumps(item))
"WEBSOCKET:ADD:" + json.dumps(item))
# Don't bother, we should be done.
return
except ValueError as e:
self.logger.log.exception(
@ -176,6 +187,7 @@ class WebsocketServer:
pass
# Stick the message together and send!
message += command # Put the command in at the end, in case MOVE etc changed it.
if extra != "":
message += ":" + extra