From 5233aacdf9dfb5da71c25f5a4919e78cb4b6ac4c Mon Sep 17 00:00:00 2001 From: Matthew Stratford Date: Mon, 22 Mar 2021 00:33:14 +0000 Subject: [PATCH] Add proper message source routing, fixes a lot of instabity of updating. --- controllers/controller.py | 4 +- controllers/mattchbox_usb.py | 30 ++++---- player.py | 69 +++++++---------- server.py | 140 ++++++++++++++++++----------------- templates/status.html | 54 +++++++------- websocket_server.py | 110 ++++++++++++++++++--------- 6 files changed, 221 insertions(+), 186 deletions(-) diff --git a/controllers/controller.py b/controllers/controller.py index ad4be67..2d51750 100644 --- a/controllers/controller.py +++ b/controllers/controller.py @@ -8,7 +8,7 @@ class Controller(): player_from_q: List[Queue] def __init__(self, player_to_q: List[Queue], player_from_q: List[Queue]): - self.receive() + self.handler() return # Registers a function for the controller class to call to tell BAPSicle to do something. @@ -17,7 +17,7 @@ class Controller(): return # Loop etc in here to process the data from your controller and call the callbacks. - def receive(self): + def handler(self): return diff --git a/controllers/mattchbox_usb.py b/controllers/mattchbox_usb.py index e21b5d1..0579924 100644 --- a/controllers/mattchbox_usb.py +++ b/controllers/mattchbox_usb.py @@ -9,7 +9,7 @@ class MattchBox(Controller): def __init__(self, player_to_q: List[Queue], player_from_q: List[Queue]): # connect to serial port - self.ser = serial.serial_for_url("/dev/cu.usbserial-310", do_not_open=True) + self.ser = serial.serial_for_url("/dev/cu.usbserial-210", do_not_open=True) self.ser.baudrate = 2400 # TOOD: These need to be split in the player handler. @@ -22,22 +22,26 @@ class MattchBox(Controller): sys.stderr.write('Could not open serial port {}: {}\n'.format(self.ser.name, e)) return - self.receive() + self.handler() - def receive(self): - while self.ser.is_open: + def handler(self): + while True: try: - line = int.from_bytes(self.ser.read(1), "big") # Endianness doesn't matter for 1 byte. - print("Controller got:", line) - if (line == 255): - print("Sending back KeepAlive") - self.ser.write(b'\xff') # Send 255 back. - elif (line in [1,3,5]): - self.player_to_q[int(line / 2)].put("PLAY") - elif (line in [2,4,6]): - self.player_to_q[int(line / 2)-1].put("STOP") + if self.ser.is_open: + line = int.from_bytes(self.ser.read(1), "big") # Endianness doesn't matter for 1 byte. + print("Controller got:", line) + if (line == 255): + print("Sending back KeepAlive") + self.ser.write(b'\xff') # Send 255 back. + elif (line in [1,3,5]): + self.sendToPlayer(int(line / 2), "PLAY") + elif (line in [2,4,6]): + self.sendToPlayer(int(line / 2)-1, "STOP") except: continue + def sendToPlayer(self, channel: int, msg:str): + self.player_to_q[channel].put("CONTROLLER:" + msg) + diff --git a/player.py b/player.py index 11ee9f5..72063bb 100644 --- a/player.py +++ b/player.py @@ -44,11 +44,14 @@ from helpers.state_manager import StateManager from helpers.logging_manager import LoggingManager PLAYBACK_END = USEREVENT + 1 +# TODO ENUM +VALID_MESSAGE_SOURCES = ["WEBSOCKET", "UI", "CONTROLLER", "ALL"] class Player(): state = None running = False out_q = None last_msg = "" + last_msg_source = "" last_time_update = None logger = None api = None @@ -431,7 +434,7 @@ class Player(): if stopping: self.stop() if self.out_q: - self.out_q.put("STOPPED") # Tell clients that we've stopped playing. + self._retAll("STOPPED") # Tell clients that we've stopped playing. def _updateState(self, pos: Optional[float] = None): @@ -457,12 +460,18 @@ class Player(): UPDATES_FREQ_SECS = 0.2 if self.last_time_update == None or self.last_time_update + UPDATES_FREQ_SECS < time.time(): self.last_time_update = time.time() - self.out_q.put("POS:" + str(int(self.state.state["pos_true"]))) + self._retAll("POS:" + str(int(self.state.state["pos_true"]))) + def _retAll(self, msg): + self.out_q.put("ALL:" + msg) - def _retMsg(self, msg: Any, okay_str: Any = False): - response = self.last_msg + ":" + def _retMsg(self, msg: Any, okay_str: bool = False, custom_prefix: Optional[str] = None): + # Make sure to add the message source back, so that it can be sent to the correct destination in the main server. + if custom_prefix: + response = custom_prefix + else: + response = "{}:{}:".format(self.last_msg_source, self.last_msg) if msg == True: response += "OKAY" elif isinstance(msg, str): @@ -478,8 +487,8 @@ class Player(): self.out_q.put(response) def _send_status(self): - self.last_msg = "STATUS" - self._retMsg(self.status, True) + # TODO This is hacky + self._retMsg(str(self.status),okay_str=True,custom_prefix="ALL:STATUS:") def __init__(self, channel: int, in_q: multiprocessing.Queue, out_q: multiprocessing.Queue): @@ -534,8 +543,18 @@ class Player(): self._ping_times() try: try: - self.last_msg = in_q.get_nowait() - self.logger.log.info("Recieved message: {}".format(self.last_msg)) + message = in_q.get_nowait() + source = message.split(":")[0] + if source not in VALID_MESSAGE_SOURCES: + self.last_msg_source = "" + self.last_msg = "" + self.logger.log.warn("Message from unknown sender source: {}".format(source)) + continue + + self.last_msg_source = source + self.last_msg = message.split(":", 1)[1] + + self.logger.log.info("Recieved message from source {}: {}".format(self.last_msg_source, self.last_msg)) except Empty: # The incomming message queue was empty, # skip message processing @@ -624,37 +643,5 @@ class Player(): sys.exit(0) -def showOutput(in_q: multiprocessing.Queue, out_q: multiprocessing.Queue): - print("Starting showOutput().") - while True: - time.sleep(0.01) - last_msg = out_q.get() - print(last_msg) - - if __name__ == "__main__": - if isMacOS(): - multiprocessing.set_start_method("spawn", True) - - in_q: multiprocessing.Queue[Any] = multiprocessing.Queue() - out_q: multiprocessing.Queue[Any] = multiprocessing.Queue() - - outputProcess = multiprocessing.Process( - target=showOutput, - args=(in_q, out_q), - ).start() - - playerProcess = multiprocessing.Process( - target=Player, - args=(-1, in_q, out_q), - ).start() - - # Do some testing - in_q.put("LOADED?") - in_q.put("PLAY") - in_q.put("LOAD:dev/test.mp3") - in_q.put("LOADED?") - in_q.put("PLAY") - print("Entering infinite loop.") - while True: - pass + raise Exception("This BAPSicle Player is a subcomponenet, it will not run individually.") diff --git a/server.py b/server.py index 2f378fc..a9f6858 100644 --- a/server.py +++ b/server.py @@ -77,14 +77,21 @@ class BAPSicleServer(): stopServer() class PlayerHandler(): - def __init__(self,channel_from_q, websocket_to_q, ui_to_q): + def __init__(self,channel_from_q, websocket_to_q, ui_to_q, controller_to_q): while True: for channel in range(len(channel_from_q)): try: message = channel_from_q[channel].get_nowait() - websocket_to_q[channel].put(message) - #print("Player Handler saw:", message.split(":")[0]) - ui_to_q[channel].put(message) + source = message.split(":")[0] + # TODO ENUM + if source in ["ALL","WEBSOCKET"]: + websocket_to_q[channel].put(message) + if source in ["ALL","UI"]: + if not message.split(":")[1] == "POS": + # We don't care about position update spam + ui_to_q[channel].put(message) + if source in ["ALL","CONTROLLER"]: + controller_to_q[channel].put(message) except: pass time.sleep(0.1) @@ -105,6 +112,7 @@ channel_to_q: List[queue.Queue] = [] channel_from_q: List[queue.Queue] = [] ui_to_q: List[queue.Queue] = [] websocket_to_q: List[queue.Queue] = [] +controller_to_q: List[queue.Queue] = [] channel_p = [] stopping = False @@ -185,14 +193,14 @@ def server_config(): @app.route("/restart", methods=["POST"]) -def restart_server(): +async def restart_server(): state.update("server_name", request.form["name"]) state.update("host", request.form["host"]) state.update("port", int(request.form["port"])) state.update("num_channels", int(request.form["channels"])) state.update("ws_port", int(request.form["ws_port"])) stopServer(restart=True) - startServer() + await startServer() # Get audio for UI to generate waveforms. @@ -200,7 +208,6 @@ def restart_server(): @app.route("/audiofile//") #@cross_origin() def audio_file(type: str, id: int): - print("Hit!") if type not in ["managed", "track"]: abort(404) return send_from_directory('music-tmp', type + "-" + str(id) + ".mp3") @@ -213,7 +220,7 @@ def audio_file(type: str, id: int): @app.route("/player//play") def play(channel: int): - channel_to_q[channel].put("PLAY") + channel_to_q[channel].put("UI:PLAY") return ui_status() @@ -221,7 +228,7 @@ def play(channel: int): @app.route("/player//pause") def pause(channel: int): - channel_to_q[channel].put("PAUSE") + channel_to_q[channel].put("UI:PAUSE") return ui_status() @@ -229,7 +236,7 @@ def pause(channel: int): @app.route("/player//unpause") def unPause(channel: int): - channel_to_q[channel].put("UNPAUSE") + channel_to_q[channel].put("UI:UNPAUSE") return ui_status() @@ -237,7 +244,7 @@ def unPause(channel: int): @app.route("/player//stop") def stop(channel: int): - channel_to_q[channel].put("STOP") + channel_to_q[channel].put("UI:STOP") return ui_status() @@ -245,32 +252,32 @@ def stop(channel: int): @app.route("/player//seek/") def seek(channel: int, pos: float): - channel_to_q[channel].put("SEEK:" + str(pos)) + channel_to_q[channel].put("UI:SEEK:" + str(pos)) return ui_status() @app.route("/player//output/") def output(channel: int, name: Optional[str]): - channel_to_q[channel].put("OUTPUT:" + str(name)) + channel_to_q[channel].put("UI:OUTPUT:" + str(name)) return ui_status() @app.route("/player//autoadvance/") def autoadvance(channel: int, state: int): - channel_to_q[channel].put("AUTOADVANCE:" + str(state)) + channel_to_q[channel].put("UI:AUTOADVANCE:" + str(state)) return ui_status() @app.route("/player//repeat/") def repeat(channel: int, state: str): - channel_to_q[channel].put("REPEAT:" + state.upper()) + channel_to_q[channel].put("UI:REPEAT:" + state.upper()) return ui_status() @app.route("/player//playonload/") def playonload(channel: int, state: int): - channel_to_q[channel].put("PLAYONLOAD:" + str(state)) + channel_to_q[channel].put("UI:PLAYONLOAD:" + str(state)) return ui_status() # Channel Items @@ -278,14 +285,14 @@ def playonload(channel: int, state: int): @app.route("/player//load/") def load(channel: int, channel_weight: int): - channel_to_q[channel].put("LOAD:" + str(channel_weight)) + channel_to_q[channel].put("UI:LOAD:" + str(channel_weight)) return ui_status() @app.route("/player//unload") def unload(channel: int): - channel_to_q[channel].put("UNLOAD") + channel_to_q[channel].put("UI:UNLOAD") return ui_status() @@ -299,20 +306,14 @@ def add_to_plan(channel: int): "artist": request.form["artist"], } - channel_to_q[channel].put("ADD:" + json.dumps(new_item)) + channel_to_q[channel].put("UI:ADD:" + json.dumps(new_item)) return new_item -#@app.route("/player//move//") -#def move_plan(channel: int, channel_weight: int, position: int): -# channel_to_q[channel].put("MOVE:" + json.dumps({"channel_weight": channel_weight, "position": position}))# - - # TODO Return -# return True #@app.route("/player//remove/") def remove_plan(channel: int, channel_weight: int): - channel_to_q[channel].put("REMOVE:" + str(channel_weight)) + channel_to_q[channel].put("UI:REMOVE:" + str(channel_weight)) # TODO Return return True @@ -320,7 +321,7 @@ def remove_plan(channel: int, channel_weight: int): #@app.route("/player//clear") def clear_channel_plan(channel: int): - channel_to_q[channel].put("CLEAR") + channel_to_q[channel].put("UI:CLEAR") # TODO Return return True @@ -379,11 +380,7 @@ def search_library(type: str): try: response = api_from_q.get_nowait() if response.startswith("SEARCH_TRACK:"): - response = response[response.index(":")+1:] - #try: - # response = json.loads(response) - #except Exception as e: - # raise e + response = response.split(":", 1)[1] return response except queue.Empty: @@ -395,7 +392,7 @@ def search_library(type: str): def load_showplan(timeslotid: int): for channel in channel_to_q: - channel.put("GET_PLAN:" + str(timeslotid)) + channel.put("UI:GET_PLAN:" + str(timeslotid)) return ui_status() @@ -403,12 +400,14 @@ def status(channel: int): while (not ui_to_q[channel].empty()): ui_to_q[channel].get() # Just waste any previous status responses. - channel_to_q[channel].put("STATUS") - while True: + channel_to_q[channel].put("UI:STATUS") + retries = 0 + while retries < 40: try: response = ui_to_q[channel].get_nowait() - if response.startswith("STATUS:"): - response = response[7:] + if response.startswith("UI:STATUS:"): + response = response.split(":",2)[2] + # TODO: Handle OKAY / FAIL response = response[response.index(":")+1:] try: response = json.loads(response) @@ -419,6 +418,8 @@ def status(channel: int): except queue.Empty: pass + retries += 1 + time.sleep(0.1) @@ -431,14 +432,14 @@ def quit(): @app.route("/player/all/stop") def all_stop(): for channel in channel_to_q: - channel.put("STOP") + channel.put("UI:STOP") return ui_status() @app.route("/player/all/clear") def clear_all_channels(): for channel in channel_to_q: - channel.put("CLEAR") + channel.put("UI:CLEAR") return ui_status() @@ -480,7 +481,8 @@ async def startServer(): channel_to_q.append(multiprocessing.Queue()) channel_from_q.append(multiprocessing.Queue()) ui_to_q.append(multiprocessing.Queue()) - websocket_to_q.append(multiprocessing.Manager().Queue()) + websocket_to_q.append(multiprocessing.Queue()) + controller_to_q.append(multiprocessing.Queue()) channel_p.append( multiprocessing.Process( target=player.Player, @@ -496,51 +498,51 @@ async def startServer(): api_handler = multiprocessing.Process(target=APIHandler, args=(api_to_q, api_from_q)) api_handler.start() - player_handler = multiprocessing.Process(target=PlayerHandler, args=(channel_from_q, websocket_to_q, ui_to_q)) + player_handler = multiprocessing.Process(target=PlayerHandler, args=(channel_from_q, websocket_to_q, ui_to_q, controller_to_q)) player_handler.start() - websockets_server = multiprocessing.Process(target=WebsocketServer, args=(channel_to_q, channel_from_q, state)) + websockets_server = multiprocessing.Process(target=WebsocketServer, args=(channel_to_q, websocket_to_q, state)) websockets_server.start() - controller_handler = multiprocessing.Process(target=MattchBox, args=(channel_to_q, channel_from_q)) + controller_handler = multiprocessing.Process(target=MattchBox, args=(channel_to_q, controller_to_q)) controller_handler.start() - if not isMacOS(): + # TODO Move this to player or installer. + if False: + if not isMacOS(): - # Temporary RIP. + # Temporary RIP. - # Welcome Speech + # Welcome Speech - text_to_speach = pyttsx3.init() - text_to_speach.save_to_file( - """Thank-you for installing BAPSicle - the play-out server from the broadcasting and presenting suite. - By default, this server is accepting connections on port 13500 - The version of the server service is {} - Please refer to the documentation included with this application for further assistance.""".format( - config.VERSION - ), - "dev/welcome.mp3" - ) - text_to_speach.runAndWait() + text_to_speach = pyttsx3.init() + text_to_speach.save_to_file( + """Thank-you for installing BAPSicle - the play-out server from the broadcasting and presenting suite. + By default, this server is accepting connections on port 13500 + The version of the server service is {} + Please refer to the documentation included with this application for further assistance.""".format( + config.VERSION + ), + "dev/welcome.mp3" + ) + text_to_speach.runAndWait() - new_item: Dict[str,Any] = { - "channel_weight": 0, - "filename": "dev/welcome.mp3", - "title": "Welcome to BAPSicle", - "artist": "University Radio York", - } - #channel_to_q[0].put("ADD:" + json.dumps(new_item)) - # channel_to_q[0].put("LOAD:0") - # channel_to_q[0].put("PLAY") + new_item: Dict[str,Any] = { + "channel_weight": 0, + "filename": "dev/welcome.mp3", + "title": "Welcome to BAPSicle", + "artist": "University Radio York", + } + + channel_to_q[0].put("ADD:" + json.dumps(new_item)) + channel_to_q[0].put("LOAD:0") + channel_to_q[0].put("PLAY") # Don't use reloader, it causes Nested Processes! app.run(host=state.state["host"], port=state.state["port"], debug=True, use_reloader=False) -async def player_message_handler(): - print("Handling") - pass def stopServer(restart=False): global channel_p diff --git a/templates/status.html b/templates/status.html index 1ffbb9f..94cc3da 100644 --- a/templates/status.html +++ b/templates/status.html @@ -8,34 +8,38 @@ {% for player in data.channels %}
-

Player {{player.channel}}

- Play - {% if player.paused %} - UnPause - {% else %} - Pause - {% endif %} - Stop - Seek 50 -
- -
- + {% if player %} +

Player {{player.channel}}

+ Play + {% if player.paused %} + UnPause + {% else %} + Pause + {% endif %} + Stop + Seek 50 +
+ +
+ -
-

Loaded Item

- {{ player.loaded_item }} -
-

Plan Items

- {% for planitem in player.show_plan %} - - - Load {{ planitem }} -
+

Loaded Item

+ {{ player.loaded_item }} +
+

Plan Items

+ {% for planitem in player.show_plan %} - {% endfor %} -
+ + Load {{ planitem }} + +
+ + {% endfor %} +
+ {% else %} +

RIP. Failed to get this.

+ {% endif %}
{% endfor %} diff --git a/websocket_server.py b/websocket_server.py index 5b1ae0e..35986b6 100644 --- a/websocket_server.py +++ b/websocket_server.py @@ -17,7 +17,7 @@ async def websocket_handler(websocket, path): await websocket.send(json.dumps({"message": "Hello", "serverName": server_name})) print("New Client: {}".format(websocket)) for channel in channel_to_q: - channel.put("STATUS") + channel.put("WEBSOCKET:STATUS") async def handle_from_webstudio(): try: @@ -46,59 +46,97 @@ async def websocket_handler(websocket, path): baps_clients.remove(websocket) def sendCommand(channel, data): + if channel not in range(len(channel_to_q)): + print("ERROR: Received channel number larger than server supported channels.") + return + if "command" in data.keys(): - if data["command"] == "PLAY": - channel_to_q[channel].put("PLAY") - elif data["command"] == "PAUSE": - channel_to_q[channel].put("PAUSE") - elif data["command"] == "UNPAUSE": - channel_to_q[channel].put("UNPAUSE") - elif data["command"] == "STOP": - channel_to_q[channel].put("STOP") - elif data["command"] == "SEEK": - channel_to_q[channel].put("SEEK:" + str(data["time"])) - elif data["command"] == "LOAD": - channel_to_q[channel].put("LOAD:" + str(data["weight"])) + command = data["command"] - elif data["command"] == "AUTOADVANCE": - channel_to_q[channel].put("AUTOADVANCE:" + str(data["enabled"])) + # Handle the general case + # Message format: + ## SOURCE:COMMAND:EXTRADATA - elif data["command"] == "PLAYONLOAD": - channel_to_q[channel].put("PLAYONLOAD:" + str(data["enabled"])) + message = "WEBSOCKET:" + command - elif data["command"] == "REPEAT": - channel_to_q[channel].put("REPEAT:" + str(data["mode"]).lower()) + # If we just want PLAY, PAUSE etc, we're all done. + # Else, let's pipe in some extra info. + extra = "" - elif data["command"] == "MOVE": - # Should we trust the client with the item info? - new_channel = int(data["new_channel"]) - channel_to_q[channel].put("REMOVE:" + str(data["weight"])) - item = data["item"] - item["weight"] = int(data["new_weight"]) - channel_to_q[new_channel].put("ADD:" + json.dumps(item)) + try: + if command == "SEEK": + extra += str(data["time"]) + elif command == "LOAD": + extra += str(data["weight"]) + elif command == "AUTOADVANCE": + extra += str(data["enabled"]) + elif command == "PLAYONLOAD": + extra += str(data["enabled"]) + elif command == "REPEAT": + extra += str(data["mode"]).lower() + elif command == "ADD": + extra += json.dumps(data["newItem"]) + elif command == "REMOVE": + extra += str(data["weight"]) + elif command == "GET_PLAN": + extra += str(data["timeslotId"]) - elif data["command"] == "ADD": - channel_to_q[channel].put("ADD:" + json.dumps(data["newItem"])) - elif data["command"] == "REMOVE": - channel_to_q[channel].put("REMOVE:" + str(data["weight"])) - elif data["command"] == "GET_PLAN": - channel_to_q[channel].put("GET_PLAN:"+ str(data["timeslotId"])) + # SPECIAL CASE ALERT! We need to talk to two channels here. + elif command == "MOVE": + # TODO Should we trust the client with the item info? + + # Tell the old channel to remove "weight" + extra += str(data["weight"]) + + # Now modify the item with the weight in the new channel + new_channel = int(data["new_channel"]) + item = data["item"] + item["weight"] = int(data["new_weight"]) + # Now send the special case. + channel_to_q[new_channel].put("ADD:" + json.dumps(item)) + + + except ValueError as e: + print("ERROR decoding extra data {} for command {} ".format(e, command)) + pass + + # Stick the message together and send! + if extra != "": + message += ":" + extra + + try: + channel_to_q[channel].put(message) + except Exception as e: + print("ERRORL: Failed to send message {} to channel {}: {}".format(message, channel, e)) + + else: + print("ERROR: Command missing from message.") async def handle_to_webstudio(): while True: for channel in range(len(webstudio_to_q)): try: message = webstudio_to_q[channel].get_nowait() - command = message.split(":")[0] + source = message.split(":")[0] + + # TODO ENUM + if source not in ["WEBSOCKET","ALL"]: + print("ERROR: Message received from invalid source to websocket_handler. Ignored.", source, message) + continue + + command = message.split(":")[1] #print("Websocket Out:", command) if command == "STATUS": try: message = message.split("OKAY:")[1] message = json.loads(message) except: - continue + continue # TODO more logging elif command == "POS": - message = message.split(":")[1] + try: + message = message.split(":", 2)[2] + except: + continue else: continue @@ -109,7 +147,7 @@ async def websocket_handler(websocket, path): }) await asyncio.wait([conn.send(data) for conn in baps_clients]) except queue.Empty: - pass + continue except Exception as e: raise e await asyncio.sleep(0.01)