From 6018f5df0e850bd077b960a714c8c3f6f4715ff5 Mon Sep 17 00:00:00 2001 From: Matthew Stratford Date: Sun, 18 Apr 2021 20:27:54 +0100 Subject: [PATCH] Proxy StateManager instead of copy, allow UI restart/quit --- controllers/mattchbox_usb.py | 4 +-- helpers/myradio_api.py | 12 +++---- helpers/state_manager.py | 5 ++- player.py | 68 ++++++++++++++++++------------------ server.py | 44 ++++++++++++++++------- web_server.py | 32 ++++++++++------- websocket_server.py | 4 +-- 7 files changed, 98 insertions(+), 71 deletions(-) diff --git a/controllers/mattchbox_usb.py b/controllers/mattchbox_usb.py index 502393d..14eb4a9 100644 --- a/controllers/mattchbox_usb.py +++ b/controllers/mattchbox_usb.py @@ -37,7 +37,7 @@ class MattchBox(Controller): # Allow server config changes to trigger controller reload if required. self.port = None - self.next_port = self.server_state.state["serial_port"] + self.next_port = self.server_state.get()["serial_port"] self.server_from_q = server_from_q self.server_to_q = server_to_q @@ -47,7 +47,7 @@ class MattchBox(Controller): # This doesn't run, the callback function gets lost in StateManager. def _state_handler(self): - new_port = self.server_state.state["serial_port"] + new_port = self.server_state.get()["serial_port"] self.logger.log.info("Got server config update. New port: {}".format(new_port)) if new_port != self.port: self.logger.log.info( diff --git a/helpers/myradio_api.py b/helpers/myradio_api.py index 997072e..866f17f 100644 --- a/helpers/myradio_api.py +++ b/helpers/myradio_api.py @@ -48,12 +48,12 @@ class MyRadioAPI: async def get_non_api_call(self, url): - url = "{}{}".format(self.config.state["myradio_base_url"], url) + url = "{}{}".format(self.config.get()["myradio_base_url"], url) if "?" in url: - url += "&api_key={}".format(self.config.state["myradio_api_key"]) + url += "&api_key={}".format(self.config.get()["myradio_api_key"]) else: - url += "?api_key={}".format(self.config.state["myradio_api_key"]) + url += "?api_key={}".format(self.config.get()["myradio_api_key"]) self._log("Requesting non-API URL: " + url) request = self.get(url) @@ -63,12 +63,12 @@ class MyRadioAPI: async def get_apiv2_call(self, url): - url = "{}/v2{}".format(self.config.state["myradio_api_url"], url) + url = "{}/v2{}".format(self.config.get()["myradio_api_url"], url) if "?" in url: - url += "&api_key={}".format(self.config.state["myradio_api_key"]) + url += "&api_key={}".format(self.config.get()["myradio_api_key"]) else: - url += "?api_key={}".format(self.config.state["myradio_api_key"]) + url += "?api_key={}".format(self.config.get()["myradio_api_key"]) self._log("Requesting API V2 URL: " + url) request = self.get(url) diff --git a/helpers/state_manager.py b/helpers/state_manager.py index f3a6cd7..146b4d1 100644 --- a/helpers/state_manager.py +++ b/helpers/state_manager.py @@ -16,7 +16,6 @@ class StateManager: logger: LoggingManager callbacks: List[Any] = [] __state = {} - __state_in_file = {} # Dict of times that params can be updated after, if the time is before current time, it can be written immediately. __rate_limit_params_until = {} __rate_limit_period_s = 0 @@ -94,6 +93,10 @@ class StateManager: def state(self): return copy(self.__state) + # Useful for pipeproxy, since it can't read attributes direct. + def get(self): + return self.state + @state.setter def state(self, state): self.__state = copy(state) diff --git a/player.py b/player.py index 4a1796f..bb633bf 100644 --- a/player.py +++ b/player.py @@ -96,14 +96,14 @@ class Player: @property def isPaused(self) -> bool: - return self.state.state["paused"] + return self.state.get()["paused"] @property def isLoaded(self): return self._isLoaded() def _isLoaded(self, short_test: bool = False): - if not self.state.state["loaded_item"]: + if not self.state.get()["loaded_item"]: return False if self.isPlaying: return True @@ -116,7 +116,7 @@ class Player: # We're not playing now, so we can quickly test run # If that works, we're loaded. try: - position: float = self.state.state["pos"] + position: float = self.state.get()["pos"] mixer.music.set_volume(0) mixer.music.play(0) except Exception: @@ -140,7 +140,7 @@ class Player: # Don't mess with playback, we only care about if it's supposed to be loaded. if not self._isLoaded(short_test=True): return False - return (self.state.state["pos_true"] == self.state.state["loaded_item"].cue and not self.isPlaying) + return (self.state.get()["pos_true"] == self.state.get()["loaded_item"].cue and not self.isPlaying) @property def status(self): @@ -185,7 +185,7 @@ class Player: def unpause(self): if not self.isPlaying: - position: float = self.state.state["pos_true"] + position: float = self.state.get()["pos_true"] try: self.play(position) except Exception: @@ -208,14 +208,14 @@ class Player: self.stopped_manually = True - if not self.state.state["loaded_item"]: + if not self.state.get()["loaded_item"]: self.logger.log.warning("Tried to stop without a loaded item.") return True # This lets users toggle (using the stop button) between cue point and 0. if user_initiated and not self.isCued: # if there's a cue point ant we're not at it, go there. - self.seek(self.state.state["loaded_item"].cue) + self.seek(self.state.get()["loaded_item"].cue) else: # Otherwise, let's go to 0. self.state.update("pos", 0) @@ -257,7 +257,7 @@ class Player: def get_plan(self, message: int): plan = sync(self.api.get_showplan(message)) self.clear_channel_plan() - channel = self.state.state["channel"] + channel = self.state.get()["channel"] self.logger.log.info(plan) if len(plan) > channel: for plan_item in plan[str(channel)]: @@ -273,7 +273,7 @@ class Player: def add_to_plan(self, new_item: Dict[str, Any]) -> bool: new_item_obj = PlanItem(new_item) - plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"]) + plan_copy: List[PlanItem] = copy.copy(self.state.get()["show_plan"]) # Shift any plan items after the new position down one to make space. for item in plan_copy: if item.weight >= new_item_obj.weight: @@ -290,7 +290,7 @@ class Player: return True def remove_from_plan(self, weight: int) -> bool: - plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"]) + plan_copy: List[PlanItem] = copy.copy(self.state.get()["show_plan"]) found = False for i in plan_copy: if i.weight == weight: @@ -313,7 +313,7 @@ class Player: if not self.isPlaying: self.unload() - showplan = self.state.state["show_plan"] + showplan = self.state.get()["show_plan"] loaded_item: Optional[PlanItem] = None @@ -380,7 +380,7 @@ class Player: if loaded_item.cue > 0: self.seek(loaded_item.cue) - if self.state.state["play_on_load"]: + if self.state.get()["play_on_load"]: self.play() return True @@ -405,7 +405,7 @@ class Player: self.logger.log.exception("Failed to quit mixer.") def output(self, name: Optional[str] = None): - wasPlaying = self.state.state["playing"] + wasPlaying = self.state.get()["playing"] name = None if (not name or name.lower() == "none") else name @@ -422,7 +422,7 @@ class Player: ) return False - loadedItem = self.state.state["loaded_item"] + loadedItem = self.state.get()["loaded_item"] if loadedItem: self.load(loadedItem.weight) if wasPlaying: @@ -444,10 +444,10 @@ class Player: set_loaded = True if not self.isLoaded: return False - timeslotitemid = self.state.state["loaded_item"].timeslotitemid + timeslotitemid = self.state.get()["loaded_item"].timeslotitemid - plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"]) - for i in range(len(self.state.state["show_plan"])): + plan_copy: List[PlanItem] = copy.copy(self.state.get()["show_plan"]) + for i in range(len(self.state.get()["show_plan"])): item = plan_copy[i] @@ -463,7 +463,7 @@ class Player: if set_loaded: try: - self.state.update("loaded_item", self.state.state["loaded_item"].set_marker(marker)) + self.state.update("loaded_item", self.state.get()["loaded_item"].set_marker(marker)) except Exception as e: self.logger.log.error( "Failed to set marker on loaded_item {}: {} with marker \n{}".format(timeslotitemid, e, marker)) @@ -474,7 +474,7 @@ class Player: # Helper functions def _ended(self): - loaded_item = self.state.state["loaded_item"] + loaded_item = self.state.get()["loaded_item"] if not loaded_item: return @@ -484,29 +484,29 @@ class Player: # Repeat 1 # TODO ENUM - if self.state.state["repeat"] == "one": + if self.state.get()["repeat"] == "one": self.play() return loaded_new_item = False # Auto Advance - if self.state.state["auto_advance"]: - for i in range(len(self.state.state["show_plan"])): - if self.state.state["show_plan"][i].weight == loaded_item.weight: - if len(self.state.state["show_plan"]) > i + 1: - self.load(self.state.state["show_plan"][i + 1].weight) + if self.state.get()["auto_advance"]: + for i in range(len(self.state.get()["show_plan"])): + if self.state.get()["show_plan"][i].weight == loaded_item.weight: + if len(self.state.get()["show_plan"]) > i + 1: + self.load(self.state.get()["show_plan"][i + 1].weight) loaded_new_item = True break # Repeat All # TODO ENUM - elif self.state.state["repeat"] == "all": - self.load(self.state.state["show_plan"][0].weight) + elif self.state.get()["repeat"] == "all": + self.load(self.state.get()["show_plan"][0].weight) loaded_new_item = True break # Play on Load - if self.state.state["play_on_load"] and loaded_new_item: + if self.state.get()["play_on_load"] and loaded_new_item: self.play() return @@ -530,7 +530,7 @@ class Player: self.state.update("pos_offset", 0) if ( - self.state.state["playing"] + self.state.get()["playing"] and not self.isPlaying and not self.stopped_manually ): @@ -542,15 +542,15 @@ class Player: self.state.update( "pos_true", min( - self.state.state["length"], - self.state.state["pos"] + self.state.state["pos_offset"], + self.state.get()["length"], + self.state.get()["pos"] + self.state.get()["pos_offset"], ), ) self.state.update( "remaining", - max(0, (self.state.state["length"] - - self.state.state["pos_true"])), + max(0, (self.state.get()["length"] - + self.state.get()["pos_true"])), ) def _ping_times(self): @@ -561,7 +561,7 @@ class Player: or self.last_time_update + UPDATES_FREQ_SECS < time.time() ): self.last_time_update = time.time() - self._retAll("POS:" + str(self.state.state["pos_true"])) + self._retAll("POS:" + str(self.state.get()["pos_true"])) def _retAll(self, msg): self.out_q.put("ALL:" + msg) diff --git a/server.py b/server.py index 0ca5a0b..3b20c04 100644 --- a/server.py +++ b/server.py @@ -14,11 +14,11 @@ """ import multiprocessing from multiprocessing.queues import Queue +import multiprocessing.managers as m import time from typing import Any, Optional import json from setproctitle import setproctitle - from helpers.os_environment import isBundelled, isMacOS if not isMacOS(): @@ -41,6 +41,12 @@ import player setproctitle("server.py") +""" Proxy Manager to proxy Class Objects into multiprocessing processes, instead of making a copy. """ + + +class ProxyManager(m.BaseManager): + pass # Pass is really enough. Nothing needs to be done here. + class BAPSicleServer: @@ -56,7 +62,8 @@ class BAPSicleServer: "ser_connected": False, "myradio_api_key": None, "myradio_base_url": "https://ury.org.uk/myradio", - "myradio_api_url": "https://ury.org.uk/api" + "myradio_api_url": "https://ury.org.uk/api", + "running_state": "running" } player_to_q: List[Queue] = [] @@ -75,22 +82,26 @@ class BAPSicleServer: def __init__(self): - self.startServer() + while True: + self.startServer() - self.check_processes() + self.check_processes() - self.stopServer() + self.stopServer() + + if self.state.get()["running_state"] == "restarting": + continue + + break def check_processes(self): terminator = Terminator() log_function = self.logger.log.info - while not terminator.terminate: - # Note, state here will become a copy in the process. - # callbacks will not passthrough :/ + while not terminator.terminate and self.state.get()["running_state"] == "running": - for channel in range(self.state.state["num_channels"]): + for channel in range(self.state.get()["num_channels"]): if not self.player[channel] or not self.player[channel].is_alive(): log_function("Player {} not running, (re)starting.".format(channel)) self.player[channel] = multiprocessing.Process( @@ -142,7 +153,14 @@ class BAPSicleServer: self.logger = LoggingManager("BAPSicleServer") - self.state = StateManager("BAPSicleServer", self.logger, self.default_state) + # Since we're passing the StateManager across processes, it must be made a manager. + # PLEASE NOTE: You can't read attributes directly, use state.get()["var"] and state.update("var", "val") + ProxyManager.register("StateManager", StateManager) + manager = ProxyManager() + manager.start() + self.state: StateManager = manager.StateManager("BAPSicleServer", self.logger, self.default_state) + + self.state.update("running_state", "running") build_commit = "Dev" if isBundelled(): @@ -154,10 +172,10 @@ class BAPSicleServer: self.state.update("server_version", config.VERSION) self.state.update("server_build", build_commit) - channel_count = self.state.state["num_channels"] + channel_count = self.state.get()["num_channels"] self.player = [None] * channel_count - for channel in range(self.state.state["num_channels"]): + for channel in range(self.state.get()["num_channels"]): self.player_to_q.append(multiprocessing.Queue()) self.player_from_q.append(multiprocessing.Queue()) @@ -166,7 +184,7 @@ class BAPSicleServer: self.controller_to_q.append(multiprocessing.Queue()) print("Welcome to BAPSicle Server version: {}, build: {}.".format(config.VERSION, build_commit)) - print("The Server UI is available at http://{}:{}".format(self.state.state["host"], self.state.state["port"])) + print("The Server UI is available at http://{}:{}".format(self.state.get()["host"], self.state.get()["port"])) # TODO Move this to player or installer. if False: diff --git a/web_server.py b/web_server.py index b9b236b..9dfab79 100644 --- a/web_server.py +++ b/web_server.py @@ -56,9 +56,9 @@ def ui_index(request): data = { "ui_page": "index", "ui_title": "", - "server_version": server_state.state["server_version"], - "server_build": server_state.state["server_build"], - "server_name": server_state.state["server_name"], + "server_version": server_state.get()["server_version"], + "server_build": server_state.get()["server_build"], + "server_name": server_state.get()["server_name"], } return render_template("index.html", data=data) @@ -66,7 +66,7 @@ def ui_index(request): @app.route("/status") def ui_status(request): channel_states = [] - for i in range(server_state.state["num_channels"]): + for i in range(server_state.get()["num_channels"]): channel_states.append(status(i)) data = {"channels": channel_states, @@ -77,7 +77,7 @@ def ui_status(request): @app.route("/config/player") def ui_config_player(request): channel_states = [] - for i in range(server_state.state["num_channels"]): + for i in range(server_state.get()["num_channels"]): channel_states.append(status(i)) outputs = DeviceManager.getAudioOutputs() @@ -96,7 +96,7 @@ def ui_config_server(request): data = { "ui_page": "server", "ui_title": "Server Config", - "state": server_state.state, + "state": server_state.get(), "ser_ports": DeviceManager.getSerialPorts(), } return render_template("config_server.html", data=data) @@ -127,7 +127,7 @@ def ui_logs_list(request): "ui_page": "logs", "ui_title": "Logs", "logs": ["BAPSicleServer"] - + ["Player{}".format(x) for x in range(server_state.state["num_channels"])], + + ["Player{}".format(x) for x in range(server_state.get()["num_channels"])], } return render_template("loglist.html", data=data) @@ -281,9 +281,9 @@ async def api_get_playlist(request, type: str, library_id: str): @app.route("/status-json") def json_status(request): channel_states = [] - for i in range(server_state.state["num_channels"]): + for i in range(server_state.get()["num_channels"]): channel_states.append(status(i)) - return resp_json({"server": server_state.state, "channels": channel_states}) + return resp_json({"server": server_state.get(), "channels": channel_states}) # Get audio for UI to generate waveforms. @@ -336,8 +336,14 @@ def status(channel: int): @app.route("/quit") def quit(request): - # stopServer() - return "Shutting down..." + server_state.update("running_state", "quitting") + return text("Server quitting...") + + +@app.route("/restart") +def restart(request): + server_state.update("running_state", "restarting") + return text("Server restarting...") # Don't use reloader, it causes Nested Processes! @@ -365,8 +371,8 @@ def WebServer(player_to: List[Queue], player_from: List[Queue], state: StateMana while not terminate.terminate: try: sync(app.run( - host=server_state.state["host"], - port=server_state.state["port"], + host=server_state.get()["host"], + port=server_state.get()["port"], debug=True, # workers=10, auto_reload=False diff --git a/websocket_server.py b/websocket_server.py index b7e7100..fea6447 100644 --- a/websocket_server.py +++ b/websocket_server.py @@ -31,10 +31,10 @@ class WebsocketServer: self.webstudio_to_q = out_q self.logger = LoggingManager("Websockets") - self.server_name = state.state["server_name"] + self.server_name = state.get()["server_name"] self.websocket_server = websockets.serve( - self.websocket_handler, state.state["host"], state.state["ws_port"] + self.websocket_handler, state.get()["host"], state.get()["ws_port"] ) asyncio.get_event_loop().run_until_complete(self.websocket_server)