From bc1bd45cd467ec8c1184c836dab00e0f71ad1310 Mon Sep 17 00:00:00 2001 From: Matthew Stratford Date: Fri, 24 Sep 2021 21:11:39 +0100 Subject: [PATCH] Switch to a single queue wherever possible --- controllers/controller.py | 4 +- controllers/mattchbox_usb.py | 10 ++-- file_manager.py | 42 +++++++------ player.py | 7 ++- player_handler.py | 43 +++++++------- server.py | 33 +++++------ web_server.py | 17 ++++-- websocket_server.py | 111 ++++++++++++++++++----------------- 8 files changed, 139 insertions(+), 128 deletions(-) diff --git a/controllers/controller.py b/controllers/controller.py index 7582441..25aae87 100644 --- a/controllers/controller.py +++ b/controllers/controller.py @@ -6,9 +6,9 @@ class Controller: # Main controller class. All implementations of controller support should inherit this. callbacks: List[Callable] = [] player_to_q: List[Queue] - player_from_q: List[Queue] + player_from_q: Queue - def __init__(self, player_to_q: List[Queue], player_from_q: List[Queue]): + def __init__(self, player_to_q: List[Queue], player_from_q: Queue): self.handler() return diff --git a/controllers/mattchbox_usb.py b/controllers/mattchbox_usb.py index c7dcc11..12552d1 100644 --- a/controllers/mattchbox_usb.py +++ b/controllers/mattchbox_usb.py @@ -18,7 +18,7 @@ class MattchBox(Controller): logger: LoggingManager def __init__( - self, server_to_q: List[Queue], server_from_q: List[Queue], state: StateManager + self, player_to_q: List[Queue], player_from_q: Queue, state: StateManager ): process_title = "ControllerHandler" @@ -39,8 +39,8 @@ class MattchBox(Controller): self.next_port = self.server_state.get()["serial_port"] self.logger.log.info("Server config gives port as: {}".format(self.next_port)) - self.server_from_q = server_from_q - self.server_to_q = server_to_q + self.player_from_q = player_from_q + self.player_to_q = player_to_q self.handler() @@ -59,7 +59,7 @@ class MattchBox(Controller): def _disconnected(self): # If we lose the controller, make sure to set channels live, so we tracklist. - for i in range(len(self.server_from_q)): + for i in range(len(self.player_to_q)): self.sendToPlayer(i, "SETLIVE:True") self.server_state.update("ser_connected", False) @@ -138,4 +138,4 @@ class MattchBox(Controller): self.logger.log.info( "Sending message to player channel {}: {}".format(channel, msg) ) - self.server_to_q[channel].put("CONTROLLER:" + msg) + self.player_to_q[channel].put("CONTROLLER:" + msg) diff --git a/file_manager.py b/file_manager.py index 06af766..fa011d1 100644 --- a/file_manager.py +++ b/file_manager.py @@ -19,7 +19,7 @@ class FileManager: logger: LoggingManager api: MyRadioAPI - def __init__(self, channel_from_q: List[Queue], server_config: StateManager): + def __init__(self, channel_from_q: Queue, server_config: StateManager): self.logger = LoggingManager("FileManager") self.api = MyRadioAPI(self.logger, server_config) @@ -29,7 +29,7 @@ class FileManager: current_process().name = process_title terminator = Terminator() - self.channel_count = len(channel_from_q) + self.channel_count = server_config.get()["num_channels"] self.channel_received = None self.last_known_show_plan = [[]] * self.channel_count self.next_channel_preload = 0 @@ -46,15 +46,25 @@ class FileManager: ): self.channel_received = [False] * self.channel_count - for channel in range(self.channel_count): - try: - message = channel_from_q[channel].get_nowait() - except Exception: - continue + try: + message = channel_from_q.get_nowait() + except Exception: + # No new messages + # Let's try preload / normalise some files now we're free of messages. + preloaded = self.do_preload() + normalised = self.do_normalise() + if not preloaded and not normalised: + # We didn't do any hard work, let's sleep. + sleep(0.2) + else: try: - # source = message.split(":")[0] - command = message.split(":", 2)[1] + + split = message.split(":", 1) + + channel = int(split[0]) + # source = split[1] + command = split[2] # If we have requested a new show plan, empty the music-tmp directory for the previous show. if command == "GETPLAN": @@ -114,12 +124,12 @@ class FileManager: ] * self.channel_count # If we receive a new status message, let's check for files which have not been pre-loaded. - if command == "STATUS": - extra = message.split(":", 3) - if extra[2] != "OKAY": + elif command == "STATUS": + extra = message.split(":", 4) + if extra[3] != "OKAY": continue - status = json.loads(extra[3]) + status = json.loads(extra[4]) show_plan = status["show_plan"] item_ids = [] for item in show_plan: @@ -140,13 +150,7 @@ class FileManager: ) ) - # Let's try preload / normalise some files now we're free of messages. - preloaded = self.do_preload() - normalised = self.do_normalise() - if not preloaded and not normalised: - # We didn't do any hard work, let's sleep. - sleep(0.2) except Exception as e: self.logger.log.exception( diff --git a/player.py b/player.py index 2120241..5e1ca31 100644 --- a/player.py +++ b/player.py @@ -923,16 +923,17 @@ class Player: def _retAll(self, msg): if self.out_q: - self.out_q.put("ALL:" + msg) + self.out_q.put("{}:ALL:{}".format(self.state.get()["channel"],msg)) def _retMsg( self, msg: Any, okay_str: bool = False, custom_prefix: Optional[str] = None ): + response = "{}:".format(self.state.get()["channel"]) # Make sure to add the message source back, so that it can be sent to the correct destination in the main server. if custom_prefix: - response = custom_prefix + response += custom_prefix else: - response = "{}:{}:".format(self.last_msg_source, self.last_msg) + response += "{}:{}:".format(self.last_msg_source, self.last_msg) if msg is True: response += "OKAY" elif isinstance(msg, str): diff --git a/player_handler.py b/player_handler.py index da17b64..ea7026e 100644 --- a/player_handler.py +++ b/player_handler.py @@ -22,28 +22,31 @@ class PlayerHandler: terminator = Terminator() try: while not terminator.terminate: + try: + # Format ::: + q_msg = channel_from_q.get_nowait() + if not isinstance(q_msg, str): + continue + split = q_msg.split(":",1) + message = split[1] + source = message.split(":")[0] + command = message.split(":")[1] - for channel in range(len(channel_from_q)): - try: - message = channel_from_q[channel].get_nowait() - source = message.split(":")[0] - command = message.split(":")[1] + # Let the file manager manage the files based on status and loading new show plan triggers. + if command == "GETPLAN" or command == "STATUS": + file_to_q.put(q_msg) - # Let the file manager manage the files based on status and loading new show plan triggers. - if command == "GET_PLAN" or command == "STATUS": - file_to_q[channel].put(message) - - # TODO ENUM - if source in ["ALL", "WEBSOCKET"]: - websocket_to_q[channel].put(message) - if source in ["ALL", "UI"]: - if not message.split(":")[1] == "POS": - # We don't care about position update spam - ui_to_q[channel].put(message) - if source in ["ALL", "CONTROLLER"]: - controller_to_q[channel].put(message) - except Exception: - pass + # TODO ENUM + if source in ["ALL", "WEBSOCKET"]: + websocket_to_q.put(q_msg) + if source in ["ALL", "UI"]: + if not message.split(":")[1] == "POS": + # We don't care about position update spam + ui_to_q.put(q_msg) + if source in ["ALL", "CONTROLLER"]: + controller_to_q.put(q_msg) + except Exception: + pass sleep(0.02) except Exception as e: diff --git a/server.py b/server.py index 83a80d0..e3376c7 100644 --- a/server.py +++ b/server.py @@ -73,13 +73,11 @@ class BAPSicleServer: } player_to_q: List[Queue] = [] - player_from_q: List[Queue] = [] - ui_to_q: List[Queue] = [] - websocket_to_q: List[Queue] = [] - controller_to_q: List[Queue] = [] - file_to_q: List[Queue] = [] - api_from_q: Queue - api_to_q: Queue + player_from_q: Queue + ui_to_q: Queue + websocket_to_q: Queue + controller_to_q: Queue + file_to_q: Queue player: List[multiprocessing.Process] = [] websockets_server: Optional[multiprocessing.Process] = None @@ -97,10 +95,8 @@ class BAPSicleServer: self.stopServer() - if self.state.get()["running_state"] == "restarting": - continue - - break + if self.state.get()["running_state"] != "restarting": + break def check_processes(self): @@ -128,7 +124,7 @@ class BAPSicleServer: args=( channel, self.player_to_q[channel], - self.player_from_q[channel], + self.player_from_q, self.state, ), ) @@ -239,11 +235,12 @@ class BAPSicleServer: for channel in range(self.state.get()["num_channels"]): self.player_to_q.append(multiprocessing.Queue()) - self.player_from_q.append(multiprocessing.Queue()) - self.ui_to_q.append(multiprocessing.Queue()) - self.websocket_to_q.append(multiprocessing.Queue()) - self.controller_to_q.append(multiprocessing.Queue()) - self.file_to_q.append(multiprocessing.Queue()) + + self.player_from_q = multiprocessing.Queue() + self.ui_to_q = multiprocessing.Queue() + self.websocket_to_q = multiprocessing.Queue() + self.controller_to_q = multiprocessing.Queue() + self.file_to_q = multiprocessing.Queue() print( "Welcome to BAPSicle Server version: {}, build: {}.".format( @@ -291,7 +288,7 @@ class BAPSicleServer: print("Stopping BASPicle Server.") print("Stopping Websocket Server") - self.websocket_to_q[0].put("WEBSOCKET:QUIT") + self.websocket_to_q.put("0:WEBSOCKET:QUIT") if self.websockets_server: self.websockets_server.join(timeout=PROCESS_KILL_TIMEOUT_S) del self.websockets_server diff --git a/web_server.py b/web_server.py index 90f6527..c044976 100644 --- a/web_server.py +++ b/web_server.py @@ -8,9 +8,10 @@ import asyncio from jinja2 import Environment, FileSystemLoader from jinja2.utils import select_autoescape from urllib.parse import unquote -from setproctitle import setproctitle from typing import Any, Optional, List +from setproctitle import setproctitle from multiprocessing.queues import Queue +from multiprocessing.process import current_process from queue import Empty from time import sleep import json @@ -99,7 +100,7 @@ server_state: StateManager api: MyRadioAPI player_to_q: List[Queue] = [] -player_from_q: List[Queue] = [] +player_from_q: Queue # General UI Endpoints @@ -416,15 +417,18 @@ app.static( def status(channel: int): - while not player_from_q[channel].empty(): + while not player_from_q.empty(): # Just waste any previous status responses. - player_from_q[channel].get() + player_from_q.get() player_to_q[channel].put("UI:STATUS") retries = 0 while retries < 40: try: - response = player_from_q[channel].get_nowait() + message = player_from_q.get_nowait() + split = message.split(":",1) + channel = int(split[0]) + response = split[1] if response.startswith("UI:STATUS:"): response = response.split(":", 2)[2] # TODO: Handle OKAY / FAIL @@ -477,7 +481,7 @@ def restart(request): # Don't use reloader, it causes Nested Processes! -def WebServer(player_to: List[Queue], player_from: List[Queue], state: StateManager): +def WebServer(player_to: List[Queue], player_from: Queue, state: StateManager): global player_to_q, player_from_q, server_state, api, app player_to_q = player_to @@ -489,6 +493,7 @@ def WebServer(player_to: List[Queue], player_from: List[Queue], state: StateMana process_title = "Web Server" setproctitle(process_title) + current_process().name = process_title CORS(app, supports_credentials=True) # Allow ALL CORS!!! terminate = Terminator() diff --git a/websocket_server.py b/websocket_server.py index e43ecb3..59f64c8 100644 --- a/websocket_server.py +++ b/websocket_server.py @@ -19,8 +19,8 @@ class WebsocketServer: threads = Future baps_clients = set() - channel_to_q: List[multiprocessing.Queue] - webstudio_to_q: List[multiprocessing.Queue] + player_to_q: List[multiprocessing.Queue] + player_from_q: multiprocessing.Queue server_name: str logger: LoggingManager to_webstudio: Task @@ -29,10 +29,10 @@ class WebsocketServer: def __init__(self, in_q, out_q, state): - self.channel_to_q = in_q - self.webstudio_to_q = out_q + self.player_to_q = in_q + self.player_from_q = out_q - process_title = "Websockets Servr" + process_title = "Websockets Server" setproctitle(process_title) current_process().name = process_title @@ -68,7 +68,7 @@ class WebsocketServer: json.dumps({"message": "Hello", "serverName": self.server_name}) ) self.logger.log.info("New Client: {}".format(websocket)) - for channel in self.channel_to_q: + for channel in self.player_to_q: channel.put("WEBSOCKET:STATUS") self.from_webstudio = asyncio.create_task( @@ -85,7 +85,7 @@ class WebsocketServer: data = json.loads(message) if "channel" not in data: # Didn't specify a channel, send to all. - for channel in range(len(self.channel_to_q)): + for channel in range(len(self.player_to_q)): self.sendCommand(channel, data) else: channel = int(data["channel"]) @@ -107,7 +107,7 @@ class WebsocketServer: self.baps_clients.remove(websocket) def sendCommand(self, channel, data): - if channel not in range(len(self.channel_to_q)): + if channel not in range(len(self.player_to_q)): self.logger.log.exception( "Received channel number larger than server supported channels." ) @@ -157,7 +157,7 @@ class WebsocketServer: elif command == "MOVE": # remove the exiting item first - self.channel_to_q[channel].put( + self.player_to_q[channel].put( "{}REMOVE:{}".format(message, data["weight"]) ) @@ -169,7 +169,7 @@ class WebsocketServer: item["weight"] = int(data["new_weight"]) # Now send the special case. - self.channel_to_q[new_channel].put( + self.player_to_q[new_channel].put( "WEBSOCKET:ADD:" + json.dumps(item) ) @@ -192,7 +192,7 @@ class WebsocketServer: message += ":" + extra try: - self.channel_to_q[channel].put(message) + self.player_to_q[channel].put(message) except Exception as e: self.logger.log.exception( "Failed to send message {} to channel {}: {}".format( @@ -208,51 +208,52 @@ class WebsocketServer: terminator = Terminator() while not terminator.terminate: - - for channel in range(len(self.webstudio_to_q)): - try: - message = self.webstudio_to_q[channel].get_nowait() - source = message.split(":")[0] - # TODO ENUM - if source not in ["WEBSOCKET", "ALL"]: - self.logger.log.error( - "ERROR: Message received from invalid source to websocket_handler. Ignored.", - source, - message, - ) - continue - - command = message.split(":")[1] - if command == "STATUS": - try: - message = message.split("OKAY:")[1] - message = json.loads(message) - except Exception: - continue # TODO more logging - elif command == "POS": - try: - message = message.split(":", 2)[2] - except Exception: - continue - elif command == "QUIT": - self.quit() - else: - continue - - data = json.dumps( - {"command": command, "data": message, "channel": channel} - ) - await asyncio.wait([conn.send(data) for conn in self.baps_clients]) - except queue.Empty: - continue - except ValueError: - # Typically a "Set of coroutines/Futures is empty." when sending to a dead client. - continue - except Exception as e: - self.logger.log.exception( - "Exception trying to send to websocket:", e - ) await asyncio.sleep(0.02) + try: + message = self.player_from_q.get_nowait() + split = message.split(":") + channel = int(split[0]) + source = split[1] + # TODO ENUM + if source not in ["WEBSOCKET", "ALL"]: + self.logger.log.error( + "ERROR: Message received from invalid source to websocket_handler. Ignored.", + source, + message, + ) + continue + + command = split[2] + if command == "STATUS": + try: + message = message.split("OKAY:")[1] + message = json.loads(message) + except Exception: + continue # TODO more logging + elif command == "POS": + try: + message = split[3] + except Exception: + continue + elif command == "QUIT": + self.quit() + else: + continue + + data = json.dumps( + {"command": command, "data": message, "channel": channel} + ) + await asyncio.wait([conn.send(data) for conn in self.baps_clients]) + except queue.Empty: + continue + except ValueError: + # Typically a "Set of coroutines/Futures is empty." when sending to a dead client. + continue + except Exception as e: + self.logger.log.exception( + "Exception trying to send to websocket:", e + ) + self.quit()