diff --git a/helpers/myradio_api.py b/helpers/myradio_api.py index 6e8168e..a57c315 100644 --- a/helpers/myradio_api.py +++ b/helpers/myradio_api.py @@ -192,7 +192,7 @@ class MyRadioAPI: def get_playlist_aux_items(self, library_id: str): # Sometimes they have "aux-", we only need the index. if library_id.index("-") > -1: - library_id = library_id[library_id.index("-") + 1 :] + library_id = library_id[library_id.index("-") + 1:] url = "/nipswebPlaylist/{}/items".format(library_id) request = self.get_apiv2_call(url) diff --git a/helpers/state_manager.py b/helpers/state_manager.py index d11a823..43a52bc 100644 --- a/helpers/state_manager.py +++ b/helpers/state_manager.py @@ -162,7 +162,7 @@ class StateManager: for callback in self.callbacks: try: callback() - except Exception as e : + except Exception as e: self.logger.log.critical( "Failed to execute status callback: {}".format(e) ) diff --git a/tests/test_player.py b/tests/test_player.py index 0725aca..e09d59f 100644 --- a/tests/test_player.py +++ b/tests/test_player.py @@ -104,9 +104,9 @@ class TestPlayer(unittest.TestCase): source = response[: response.index(":")] if source in sources_filter: return response[ - len(source + ":" + msg) + 1 : + len(source + ":" + msg) + 1: ] # +1 to remove trailing : on source. - except Empty : + except Empty: pass finally: time.sleep(0.01) diff --git a/websocket_server.py b/websocket_server.py index 35679d4..c22d921 100644 --- a/websocket_server.py +++ b/websocket_server.py @@ -62,170 +62,8 @@ class WebsocketServer: for channel in self.channel_to_q: channel.put("WEBSOCKET:STATUS") - async def handle_from_webstudio(): - try: - async for message in websocket: - data = json.loads(message) - if "channel" not in data: - # Didn't specify a channel, send to all. - for channel in range(len(self.channel_to_q)): - sendCommand(channel, data) - else: - channel = int(data["channel"]) - sendCommand(channel, data) - - async def send(conn, message): - conn.send(message) - - await asyncio.wait( - [send(conn, message) for conn in self.baps_clients] - ) - - except websockets.exceptions.ConnectionClosedError as e: - self.logger.log.error( - "Client Disconncted {}, {}".format(websocket, e)) - - except Exception as e: - self.logger.log.exception( - "Exception handling messages from Websocket.\n{}".format(e) - ) - - finally: - self.logger.log.info("Removing client: {}".format(websocket)) - self.baps_clients.remove(websocket) - - def sendCommand(channel, data): - if channel not in range(len(self.channel_to_q)): - self.logger.log.exception( - "Received channel number larger than server supported channels." - ) - return - - if "command" in data.keys(): - command = data["command"] - - # Handle the general case - # Message format: - # SOURCE:COMMAND:EXTRADATA - - message = "WEBSOCKET:" + command - - # If we just want PLAY, PAUSE etc, we're all done. - # Else, let's pipe in some extra info. - extra = "" - - try: - if command == "SEEK": - extra += str(data["time"]) - elif command == "LOAD": - extra += str(data["weight"]) - elif command == "AUTOADVANCE": - extra += str(data["enabled"]) - elif command == "PLAYONLOAD": - extra += str(data["enabled"]) - elif command == "REPEAT": - extra += str(data["mode"]).lower() - elif command == "ADD": - extra += json.dumps(data["newItem"]) - elif command == "REMOVE": - extra += str(data["weight"]) - elif command == "GET_PLAN": - extra += str(data["timeslotId"]) - - # SPECIAL CASE ALERT! We need to talk to two channels here. - elif command == "MOVE": - # TODO Should we trust the client with the item info? - - # Tell the old channel to remove "weight" - extra += str(data["weight"]) - - # Now modify the item with the weight in the new channel - new_channel = int(data["new_channel"]) - item = data["item"] - item["weight"] = int(data["new_weight"]) - # Now send the special case. - self.channel_to_q[new_channel].put( - "ADD:" + json.dumps(item)) - - except ValueError as e: - self.logger.log.exception( - "Error decoding extra data {} for command {} ".format( - e, command - ) - ) - pass - - # Stick the message together and send! - if extra != "": - message += ":" + extra - - try: - self.channel_to_q[channel].put(message) - except Exception as e: - self.logger.log.exception( - "Failed to send message {} to channel {}: {}".format( - message, channel, e - ) - ) - - else: - self.logger.log.error( - "Command missing from message. Data: {}".format(data) - ) - - async def handle_to_webstudio(): - while True: - for channel in range(len(self.webstudio_to_q)): - try: - message = self.webstudio_to_q[channel].get_nowait() - source = message.split(":")[0] - - # TODO ENUM - if source not in ["WEBSOCKET", "ALL"]: - print( - "ERROR: Message received from invalid source to websocket_handler. Ignored.", - source, - message, - ) - continue - - command = message.split(":")[1] - # print("Websocket Out:", command) - if command == "STATUS": - try: - message = message.split("OKAY:")[1] - message = json.loads(message) - except Exception: - continue # TODO more logging - elif command == "POS": - try: - message = message.split(":", 2)[2] - except Exception: - continue - elif command == "QUIT": - self.quit() - else: - continue - - data = json.dumps( - {"command": command, "data": message, "channel": channel} - ) - await asyncio.wait( - [conn.send(data) for conn in self.baps_clients] - ) - except queue.Empty: - continue - except ValueError: - # Typically a "Set of coroutines/Futures is empty." when sending to a dead client. - continue - except Exception as e: - self.logger.log.exception( - "Exception trying to send to websocket:", e - ) - await asyncio.sleep(0.02) - - self.from_webstudio = asyncio.create_task(handle_from_webstudio()) - self.to_webstudio = asyncio.create_task(handle_to_webstudio()) + self.from_webstudio = asyncio.create_task(self.handle_from_webstudio(websocket)) + self.to_webstudio = asyncio.create_task(self.handle_to_webstudio()) try: self.threads = await shield( @@ -235,6 +73,166 @@ class WebsocketServer: self.from_webstudio.cancel() self.to_webstudio.cancel() + async def handle_from_webstudio(self, websocket): + try: + async for message in websocket: + data = json.loads(message) + if "channel" not in data: + # Didn't specify a channel, send to all. + for channel in range(len(self.channel_to_q)): + self.sendCommand(channel, data) + else: + channel = int(data["channel"]) + self.sendCommand(channel, data) + + await asyncio.wait( + [conn.send(message) for conn in self.baps_clients] + ) + + except websockets.exceptions.ConnectionClosedError as e: + self.logger.log.error( + "Client Disconncted {}, {}".format(websocket, e)) + + except Exception as e: + self.logger.log.exception( + "Exception handling messages from Websocket.\n{}".format(e) + ) + + finally: + self.logger.log.info("Removing client: {}".format(websocket)) + self.baps_clients.remove(websocket) + + def sendCommand(self, channel, data): + if channel not in range(len(self.channel_to_q)): + self.logger.log.exception( + "Received channel number larger than server supported channels." + ) + return + + if "command" in data.keys(): + command = data["command"] + + # Handle the general case + # Message format: + # SOURCE:COMMAND:EXTRADATA + + message = "WEBSOCKET:" + command + + # If we just want PLAY, PAUSE etc, we're all done. + # Else, let's pipe in some extra info. + extra = "" + + try: + if command == "SEEK": + extra += str(data["time"]) + elif command == "LOAD": + extra += str(data["weight"]) + elif command == "AUTOADVANCE": + extra += str(data["enabled"]) + elif command == "PLAYONLOAD": + extra += str(data["enabled"]) + elif command == "REPEAT": + extra += str(data["mode"]).lower() + elif command == "ADD": + extra += json.dumps(data["newItem"]) + elif command == "REMOVE": + extra += str(data["weight"]) + elif command == "GET_PLAN": + extra += str(data["timeslotId"]) + + # SPECIAL CASE ALERT! We need to talk to two channels here. + elif command == "MOVE": + # TODO Should we trust the client with the item info? + + # Tell the old channel to remove "weight" + extra += str(data["weight"]) + + # Now modify the item with the weight in the new channel + new_channel = int(data["new_channel"]) + item = data["item"] + item["weight"] = int(data["new_weight"]) + # Now send the special case. + self.channel_to_q[new_channel].put( + "ADD:" + json.dumps(item)) + + except ValueError as e: + self.logger.log.exception( + "Error decoding extra data {} for command {} ".format( + e, command + ) + ) + pass + + # Stick the message together and send! + if extra != "": + message += ":" + extra + + try: + self.channel_to_q[channel].put(message) + except Exception as e: + self.logger.log.exception( + "Failed to send message {} to channel {}: {}".format( + message, channel, e + ) + ) + + else: + self.logger.log.error( + "Command missing from message. Data: {}".format(data) + ) + + async def handle_to_webstudio(self): + while True: + for channel in range(len(self.webstudio_to_q)): + try: + message = self.webstudio_to_q[channel].get_nowait() + source = message.split(":")[0] + + # TODO ENUM + if source not in ["WEBSOCKET", "ALL"]: + print( + "ERROR: Message received from invalid source to websocket_handler. Ignored.", + source, + message, + ) + continue + + command = message.split(":")[1] + # print("Websocket Out:", command) + if command == "STATUS": + try: + message = message.split("OKAY:")[1] + message = json.loads(message) + except Exception: + continue # TODO more logging + elif command == "POS": + try: + message = message.split(":", 2)[2] + except Exception: + continue + elif command == "QUIT": + self.quit() + else: + continue + + data = json.dumps( + {"command": command, "data": message, "channel": channel} + ) + await asyncio.wait( + [conn.send(data) for conn in self.baps_clients] + ) + except queue.Empty: + continue + except ValueError: + # Typically a "Set of coroutines/Futures is empty." when sending to a dead client. + continue + except Exception as e: + self.logger.log.exception( + "Exception trying to send to websocket:", e + ) + await asyncio.sleep(0.02) + + if __name__ == "__main__": print("Don't do this")