diff --git a/helpers/myradio_api.py b/helpers/myradio_api.py index cafbf06..d7f2f08 100644 --- a/helpers/myradio_api.py +++ b/helpers/myradio_api.py @@ -44,6 +44,7 @@ class MyRadioAPI(): request = requests.get(url, timeout=10) if request.status_code != 200: + # TODO: Log something here return None filename: str = resolve_external_file_path("/music-tmp/{}-{}.{}".format(itemType, id, format)) diff --git a/plan.py b/plan.py index 10fc32d..c376ed0 100644 --- a/plan.py +++ b/plan.py @@ -17,7 +17,7 @@ import os class PlanItem: _timeslotItemId: int = 0 - _channel_weight: int = 0 + _channelWeight: int = 0 _filename: str = "" _title: str = "" _artist: str = "" @@ -25,9 +25,10 @@ class PlanItem: _managedId: Optional[int] = None @property - def channel_weight(self) -> int: - return self._channel_weight - + def channelWeight(self) -> int: + return self._channelWeight + + @property def timeslotItemId(self) -> int: return self._timeslotItemId @@ -51,10 +52,18 @@ class PlanItem: def managedId(self) -> Optional[int]: return self._managedId + @property + def title(self) -> Optional[str]: + return self._title + + @property + def artist(self) -> Optional[str]: + return self._artist + @property def __dict__(self): return { - "channel_weight": self.channel_weight, + "channelWeight": self.channelWeight, "timeslotItemId": self.timeslotItemId, "trackId": self._trackId, "managedId": self._managedId, @@ -69,7 +78,7 @@ class PlanItem: self._trackId = new_item["trackId"] if "trackId" in new_item else None self._managedId = new_item["managedId"] if "managedId" in new_item else None self._filename = new_item["filename"] # This could be a temp dir for API-downloaded items, or a mapped drive. - self._channel_weight = new_item["channel_weight"] + self._channelWeight = new_item["channelWeight"] self._title = new_item["title"] self._artist = new_item["artist"] diff --git a/player.py b/player.py index 6ac4509..8210af7 100644 --- a/player.py +++ b/player.py @@ -179,6 +179,7 @@ class Player(): self.state.update("pos_offset", 0) self.state.update("pos_true", 0) self.state.update("paused", False) + return True # return False @@ -228,10 +229,10 @@ class Player(): self.state.update("show_plan", self.state.state["show_plan"] + [PlanItem(new_item)]) return True - def remove_from_plan(self, timeslotItemId: int) -> bool: + def remove_from_plan(self, channel_weight: int) -> bool: plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"]) for i in plan_copy: - if i.timeslotItemId == timeslotItemId: + if i.channelWeight == channel_weight: plan_copy.remove(i) self.state.update("show_plan", plan_copy) return True @@ -241,7 +242,7 @@ class Player(): self.state.update("show_plan", []) return True - def load(self, timeslotItemId: int): + def load(self, channelWeight: int): if not self.isPlaying: self.unload() @@ -250,12 +251,12 @@ class Player(): loaded_item: Optional[PlanItem] = None for i in range(len(showplan)): - if showplan[i].timeslotItemId == timeslotItemId: + if showplan[i].channelWeight == channelWeight: loaded_item = showplan[i] break if loaded_item == None: - self.logger.log.error("Failed to find timeslotItemId: {}".format(timeslotItemId)) + self.logger.log.error("Failed to find channelWeight: {}".format(channelWeight)) return False if (loaded_item.filename == "" or loaded_item.filename == None): @@ -267,7 +268,7 @@ class Player(): self.state.update("loaded_item", loaded_item) for i in range(len(showplan)): - if showplan[i].timeslotItemId == timeslotItemId: + if showplan[i].channelWeight == channelWeight: self.state.update("show_plan", index=i, value=loaded_item) break # TODO: Update the show plan filenames @@ -326,7 +327,7 @@ class Player(): loadedItem = self.state.state["loaded_item"] if (loadedItem): - self.load(loadedItem.timeslotItemId) + self.load(loadedItem.channelWeight) if wasPlaying: self.unpause() @@ -362,14 +363,14 @@ class Player(): # Auto Advance elif self.state.state["auto_advance"]: for i in range(len(self.state.state["show_plan"])): - if self.state.state["show_plan"][i].timeslotItemId == loaded_item.timeslotItemId: + if self.state.state["show_plan"][i].channelWeight == loaded_item.channelWeight: if len(self.state.state["show_plan"]) > i+1: - self.load(self.state.state["show_plan"][i+1].timeslotItemId) + self.load(self.state.state["show_plan"][i+1].channelWeight) break # Repeat All elif self.state.state["repeat"] == "ALL": - self.load(self.state.state["show_plan"][0].timeslotItemId) + self.load(self.state.state["show_plan"][0].channelWeight) # Play on Load if self.state.state["play_on_load"]: @@ -417,7 +418,7 @@ class Player(): loaded_item = loaded_state["loaded_item"] if loaded_item: self.logger.log.info("Loading filename: " + str(loaded_item.filename)) - self.load(loaded_item.timeslotItemId) + self.load(loaded_item.channelWeight) if loaded_state["pos_true"] != 0: self.logger.log.info("Seeking to pos_true: " + str(loaded_state["pos_true"])) diff --git a/server.py b/server.py index 20572f7..c6da09b 100644 --- a/server.py +++ b/server.py @@ -12,7 +12,9 @@ Date: October, November 2020 """ +import asyncio import multiprocessing +import time import player from flask import Flask, render_template, send_from_directory, request, jsonify from typing import Any, Optional @@ -52,11 +54,23 @@ class BAPSicleServer(): setproctitle.setproctitle(process_title) multiprocessing.current_process().name = process_title - startServer() + asyncio.get_event_loop().run_until_complete(startServer()) + asyncio.get_event_loop().run_forever() def __del__(self): stopServer() +class PlayerHandler(): + def __init__(self,channel_from_q, websocket_to_q, ui_to_q): + while True: + for channel in range(len(channel_from_q)): + try: + message = channel_from_q[channel].get_nowait() + websocket_to_q[channel].put(message) + ui_to_q[channel].put(message) + except: + pass + time.sleep(0.01) logger = LoggingManager("BAPSicleServer") @@ -71,6 +85,8 @@ app.logger.disabled = True channel_to_q = [] channel_from_q = [] +ui_to_q = [] +websocket_to_q = [] channel_p = [] stopping = False @@ -245,7 +261,7 @@ def unload(channel: int): @app.route("/player//add", methods=["POST"]) def add_to_plan(channel: int): - new_item: Dict[str, any] = { + new_item: Dict[str, Any] = { "channel_weight": int(request.form["channel_weight"]), "filename": request.form["filename"], "title": request.form["title"], @@ -256,10 +272,6 @@ def add_to_plan(channel: int): return new_item -@app.route("/player//move//") -def move_plan(channel: int, timeslotItemId: int, position: float): - channel_to_q[channel].put("MOVE:" + json.dumps({"timeslotItemId": timeslotItemId, "position": position})) - @app.route("/player//move//") def move_plan(channel: int, channel_weight: int, position: int): channel_to_q[channel].put("MOVE:" + json.dumps({"channel_weight": channel_weight, "position": position})) @@ -267,10 +279,6 @@ def move_plan(channel: int, channel_weight: int, position: int): # TODO Return return True -@app.route("/player//remove/") -def remove_plan(channel: int, timeslotItemId: int): - channel_to_q[channel].put("REMOVE:" + str(timeslotItemId)) - @app.route("/player//remove/") def remove_plan(channel: int, channel_weight: int): channel_to_q[channel].put("REMOVE:" + channel_weight) @@ -299,8 +307,9 @@ def channel_json(channel: int): def status(channel: int): channel_to_q[channel].put("STATUS") while True: - response = channel_from_q[channel].get() + response = ui_to_q[channel].get() if response.startswith("STATUS:"): + print("Got my status message") response = response[7:] response = response[response.index(":")+1:] try: @@ -309,6 +318,7 @@ def status(channel: int): pass return response + time.sleep(0.1) @app.route("/quit") @@ -358,13 +368,15 @@ def send_logs(path): return render_template('log.html', data=data) -def startServer(): +async def startServer(): if isMacOS(): multiprocessing.set_start_method("spawn", True) for channel in range(state.state["num_channels"]): channel_to_q.append(multiprocessing.Queue()) channel_from_q.append(multiprocessing.Queue()) + ui_to_q.append(multiprocessing.Queue()) + websocket_to_q.append(multiprocessing.Queue()) channel_p.append( multiprocessing.Process( target=player.Player, @@ -374,7 +386,13 @@ def startServer(): ) channel_p[channel].start() - websockets_server = multiprocessing.Process(target=WebsocketServer, args=[channel_to_q, state]) + + + + player_handler = multiprocessing.Process(target=PlayerHandler, args=[channel_from_q, websocket_to_q, ui_to_q]) + player_handler.start() + + websockets_server = multiprocessing.Process(target=WebsocketServer, args=[channel_to_q, channel_from_q, state]) websockets_server.start() if not isMacOS(): @@ -402,13 +420,16 @@ def startServer(): "artist": "University Radio York", } - channel_to_q[0].put("ADD:" + json.dumps(new_item)) + #channel_to_q[0].put("ADD:" + json.dumps(new_item)) # channel_to_q[0].put("LOAD:0") # channel_to_q[0].put("PLAY") # Don't use reloader, it causes Nested Processes! app.run(host=state.state["host"], port=state.state["port"], debug=True, use_reloader=False) +async def player_message_handler(): + print("Handling") + pass def stopServer(restart=False): global channel_p diff --git a/templates/status.html b/templates/status.html index 1b33c01..d7644d8 100644 --- a/templates/status.html +++ b/templates/status.html @@ -4,18 +4,40 @@ {% endblock %} {% block content_inner %} {% if data %} - +
+ {% for player in data.channels %} - Play - {% if player.paused %} - UnPause - {% else %} - Pause - {% endif %} - Stop - Seek 50 - {{player}}
+
+

Player {{player.channel}}

+ Play + {% if player.paused %} + UnPause + {% else %} + Pause + {% endif %} + Stop + Seek 50 +
+ +
+ + +
+

Loaded Item

+ {{ player.loaded_item }} +
+

Plan Items

+ {% for planitem in player.show_plan %} + + + {{ planitem }} + +
+ + {% endfor %} +
+
{% endfor %} -
+
{% endif %} {% endblock %} diff --git a/websocket_server.py b/websocket_server.py index bc132a4..ba02ee5 100644 --- a/websocket_server.py +++ b/websocket_server.py @@ -1,63 +1,104 @@ import asyncio +import multiprocessing +from typing import List import websockets import json baps_clients = set() channel_to_q = None +channel_from_q: List[multiprocessing.Queue] server_name = None + async def websocket_handler(websocket, path): baps_clients.add(websocket) await websocket.send(json.dumps({"message": "Hello", "serverName": server_name})) print("New Client: {}".format(websocket)) + async def handle_from_webstudio(): + try: + async for message in websocket: + data = json.loads(message) + channel = int(data["channel"]) + if "command" in data.keys(): + if data["command"] == "PLAY": + channel_to_q[channel].put("PLAY") + elif data["command"] == "PAUSE": + channel_to_q[channel].put("PAUSE") + elif data["command"] == "UNPAUSE": + channel_to_q[channel].put("UNPAUSE") + elif data["command"] == "STOP": + channel_to_q[channel].put("STOP") + elif data["command"] == "SEEK": + channel_to_q[channel].put("SEEK:" + str(data["time"])) + elif data["command"] == "LOAD": + channel_to_q[channel].put("LOAD:" + str(data["weight"])) + elif data["command"] == "ADD": + print(data) + if "managedId" in data["newItem"].keys() and isinstance(data["newItem"]["managedId"], str): + if data["newItem"]["managedId"].startswith("managed"): + managed_id = int(data["newItem"]["managedId"].split(":")[1]) + else: + managed_id = int(data["newItem"]["managedId"]) + else: + managed_id = None + new_item: Dict[str, any] = { + "channelWeight": int(data["newItem"]["weight"]), + "filename": None, + "title": data["newItem"]["title"], + "artist": data["newItem"]["artist"] if "artist" in data["newItem"].keys() else None, + "timeslotItemId": int(data["newItem"]["timeslotItemId"]) if "timeslotItemId" in data["newItem"].keys() and data["newItem"]["timeslotItemId"] != None else None, + "trackId": int(data["newItem"]["trackId"]) if "trackId" in data["newItem"].keys() and data["newItem"]["trackId"] != None else None, + "managedId": managed_id + } + channel_to_q[channel].put("ADD:" + json.dumps(new_item)) + elif data["command"] == "REMOVE": + channel_to_q[channel].put("REMOVE:" + str(data["weight"])) + + await asyncio.wait([conn.send(message) for conn in baps_clients]) + + except websockets.exceptions.ConnectionClosedError as e: + print("RIP {}, {}".format(websocket, e)) + + except Exception as e: + print("Exception", e) + + finally: + baps_clients.remove(websocket) + + async def handle_to_webstudio(): + global channel_from_q + while True: + for channel in range(len(channel_from_q)): + try: + message = channel_from_q[channel].get_nowait() + data = json.dumps({ + "message": message, + "channel:": channel + }) + await asyncio.wait([conn.send(data) for conn in baps_clients]) + except: + pass + await asyncio.sleep(0.01) + + from_webstudio = asyncio.create_task(handle_from_webstudio()) + #to_webstudio = asyncio.create_task(handle_to_webstudio()) + try: - async for message in websocket: - data = json.loads(message) - channel = int(data["channel"]) - if "command" in data.keys(): - if data["command"] == "PLAY": - channel_to_q[channel].put("PLAY") - elif data["command"] == "PAUSE": - channel_to_q[channel].put("PAUSE") - elif data["command"] == "UNPAUSE": - channel_to_q[channel].put("UNPAUSE") - elif data["command"] == "STOP": - channel_to_q[channel].put("STOP") - elif data["command"] == "SEEK": - channel_to_q[channel].put("SEEK:" + str(data["time"])) - elif data["command"] == "LOAD": - channel_to_q[channel].put("LOAD:" + str(data["weight"])) - elif data["command"] == "ADD": - print(data) - new_item: Dict[str, any] = { - "channel_weight": int(data["newItem"]["weight"]), - "filename": "dev\\test.mp3", - "title": data["newItem"]["title"], - "artist": data["newItem"]["artist"] if "artist" in data["newItem"].keys() else None - } - channel_to_q[channel].put("ADD:" + json.dumps(new_item)) - elif data["command"] == "REMOVE": - channel_to_q[channel].put("REMOVE:" + str(data["weight"])) - - await asyncio.wait([conn.send(message) for conn in baps_clients]) - - except websockets.exceptions.ConnectionClosedError: - print("RIP {}".format(websocket)) - - except Exception as e: - print("Exception", e) - + await asyncio.gather(from_webstudio)#, to_webstudio) finally: - baps_clients.remove(websocket) + from_webstudio.cancel() + #to_webstudio.cancel() class WebsocketServer: - def __init__(self, in_q, state): + def __init__(self, in_q, out_q, state): global channel_to_q + global channel_from_q channel_to_q = in_q + channel_from_q = out_q global server_name server_name = state.state["server_name"]