Proxy StateManager instead of copy, allow UI restart/quit

This commit is contained in:
Matthew Stratford 2021-04-18 20:27:54 +01:00
parent 9838425f76
commit 6018f5df0e
7 changed files with 98 additions and 71 deletions

View file

@ -37,7 +37,7 @@ class MattchBox(Controller):
# Allow server config changes to trigger controller reload if required.
self.port = None
self.next_port = self.server_state.state["serial_port"]
self.next_port = self.server_state.get()["serial_port"]
self.server_from_q = server_from_q
self.server_to_q = server_to_q
@ -47,7 +47,7 @@ class MattchBox(Controller):
# This doesn't run, the callback function gets lost in StateManager.
def _state_handler(self):
new_port = self.server_state.state["serial_port"]
new_port = self.server_state.get()["serial_port"]
self.logger.log.info("Got server config update. New port: {}".format(new_port))
if new_port != self.port:
self.logger.log.info(

View file

@ -48,12 +48,12 @@ class MyRadioAPI:
async def get_non_api_call(self, url):
url = "{}{}".format(self.config.state["myradio_base_url"], url)
url = "{}{}".format(self.config.get()["myradio_base_url"], url)
if "?" in url:
url += "&api_key={}".format(self.config.state["myradio_api_key"])
url += "&api_key={}".format(self.config.get()["myradio_api_key"])
else:
url += "?api_key={}".format(self.config.state["myradio_api_key"])
url += "?api_key={}".format(self.config.get()["myradio_api_key"])
self._log("Requesting non-API URL: " + url)
request = self.get(url)
@ -63,12 +63,12 @@ class MyRadioAPI:
async def get_apiv2_call(self, url):
url = "{}/v2{}".format(self.config.state["myradio_api_url"], url)
url = "{}/v2{}".format(self.config.get()["myradio_api_url"], url)
if "?" in url:
url += "&api_key={}".format(self.config.state["myradio_api_key"])
url += "&api_key={}".format(self.config.get()["myradio_api_key"])
else:
url += "?api_key={}".format(self.config.state["myradio_api_key"])
url += "?api_key={}".format(self.config.get()["myradio_api_key"])
self._log("Requesting API V2 URL: " + url)
request = self.get(url)

View file

@ -16,7 +16,6 @@ class StateManager:
logger: LoggingManager
callbacks: List[Any] = []
__state = {}
__state_in_file = {}
# Dict of times that params can be updated after, if the time is before current time, it can be written immediately.
__rate_limit_params_until = {}
__rate_limit_period_s = 0
@ -94,6 +93,10 @@ class StateManager:
def state(self):
return copy(self.__state)
# Useful for pipeproxy, since it can't read attributes direct.
def get(self):
return self.state
@state.setter
def state(self, state):
self.__state = copy(state)

View file

@ -96,14 +96,14 @@ class Player:
@property
def isPaused(self) -> bool:
return self.state.state["paused"]
return self.state.get()["paused"]
@property
def isLoaded(self):
return self._isLoaded()
def _isLoaded(self, short_test: bool = False):
if not self.state.state["loaded_item"]:
if not self.state.get()["loaded_item"]:
return False
if self.isPlaying:
return True
@ -116,7 +116,7 @@ class Player:
# We're not playing now, so we can quickly test run
# If that works, we're loaded.
try:
position: float = self.state.state["pos"]
position: float = self.state.get()["pos"]
mixer.music.set_volume(0)
mixer.music.play(0)
except Exception:
@ -140,7 +140,7 @@ class Player:
# Don't mess with playback, we only care about if it's supposed to be loaded.
if not self._isLoaded(short_test=True):
return False
return (self.state.state["pos_true"] == self.state.state["loaded_item"].cue and not self.isPlaying)
return (self.state.get()["pos_true"] == self.state.get()["loaded_item"].cue and not self.isPlaying)
@property
def status(self):
@ -185,7 +185,7 @@ class Player:
def unpause(self):
if not self.isPlaying:
position: float = self.state.state["pos_true"]
position: float = self.state.get()["pos_true"]
try:
self.play(position)
except Exception:
@ -208,14 +208,14 @@ class Player:
self.stopped_manually = True
if not self.state.state["loaded_item"]:
if not self.state.get()["loaded_item"]:
self.logger.log.warning("Tried to stop without a loaded item.")
return True
# This lets users toggle (using the stop button) between cue point and 0.
if user_initiated and not self.isCued:
# if there's a cue point ant we're not at it, go there.
self.seek(self.state.state["loaded_item"].cue)
self.seek(self.state.get()["loaded_item"].cue)
else:
# Otherwise, let's go to 0.
self.state.update("pos", 0)
@ -257,7 +257,7 @@ class Player:
def get_plan(self, message: int):
plan = sync(self.api.get_showplan(message))
self.clear_channel_plan()
channel = self.state.state["channel"]
channel = self.state.get()["channel"]
self.logger.log.info(plan)
if len(plan) > channel:
for plan_item in plan[str(channel)]:
@ -273,7 +273,7 @@ class Player:
def add_to_plan(self, new_item: Dict[str, Any]) -> bool:
new_item_obj = PlanItem(new_item)
plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"])
plan_copy: List[PlanItem] = copy.copy(self.state.get()["show_plan"])
# Shift any plan items after the new position down one to make space.
for item in plan_copy:
if item.weight >= new_item_obj.weight:
@ -290,7 +290,7 @@ class Player:
return True
def remove_from_plan(self, weight: int) -> bool:
plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"])
plan_copy: List[PlanItem] = copy.copy(self.state.get()["show_plan"])
found = False
for i in plan_copy:
if i.weight == weight:
@ -313,7 +313,7 @@ class Player:
if not self.isPlaying:
self.unload()
showplan = self.state.state["show_plan"]
showplan = self.state.get()["show_plan"]
loaded_item: Optional[PlanItem] = None
@ -380,7 +380,7 @@ class Player:
if loaded_item.cue > 0:
self.seek(loaded_item.cue)
if self.state.state["play_on_load"]:
if self.state.get()["play_on_load"]:
self.play()
return True
@ -405,7 +405,7 @@ class Player:
self.logger.log.exception("Failed to quit mixer.")
def output(self, name: Optional[str] = None):
wasPlaying = self.state.state["playing"]
wasPlaying = self.state.get()["playing"]
name = None if (not name or name.lower() == "none") else name
@ -422,7 +422,7 @@ class Player:
)
return False
loadedItem = self.state.state["loaded_item"]
loadedItem = self.state.get()["loaded_item"]
if loadedItem:
self.load(loadedItem.weight)
if wasPlaying:
@ -444,10 +444,10 @@ class Player:
set_loaded = True
if not self.isLoaded:
return False
timeslotitemid = self.state.state["loaded_item"].timeslotitemid
timeslotitemid = self.state.get()["loaded_item"].timeslotitemid
plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"])
for i in range(len(self.state.state["show_plan"])):
plan_copy: List[PlanItem] = copy.copy(self.state.get()["show_plan"])
for i in range(len(self.state.get()["show_plan"])):
item = plan_copy[i]
@ -463,7 +463,7 @@ class Player:
if set_loaded:
try:
self.state.update("loaded_item", self.state.state["loaded_item"].set_marker(marker))
self.state.update("loaded_item", self.state.get()["loaded_item"].set_marker(marker))
except Exception as e:
self.logger.log.error(
"Failed to set marker on loaded_item {}: {} with marker \n{}".format(timeslotitemid, e, marker))
@ -474,7 +474,7 @@ class Player:
# Helper functions
def _ended(self):
loaded_item = self.state.state["loaded_item"]
loaded_item = self.state.get()["loaded_item"]
if not loaded_item:
return
@ -484,29 +484,29 @@ class Player:
# Repeat 1
# TODO ENUM
if self.state.state["repeat"] == "one":
if self.state.get()["repeat"] == "one":
self.play()
return
loaded_new_item = False
# Auto Advance
if self.state.state["auto_advance"]:
for i in range(len(self.state.state["show_plan"])):
if self.state.state["show_plan"][i].weight == loaded_item.weight:
if len(self.state.state["show_plan"]) > i + 1:
self.load(self.state.state["show_plan"][i + 1].weight)
if self.state.get()["auto_advance"]:
for i in range(len(self.state.get()["show_plan"])):
if self.state.get()["show_plan"][i].weight == loaded_item.weight:
if len(self.state.get()["show_plan"]) > i + 1:
self.load(self.state.get()["show_plan"][i + 1].weight)
loaded_new_item = True
break
# Repeat All
# TODO ENUM
elif self.state.state["repeat"] == "all":
self.load(self.state.state["show_plan"][0].weight)
elif self.state.get()["repeat"] == "all":
self.load(self.state.get()["show_plan"][0].weight)
loaded_new_item = True
break
# Play on Load
if self.state.state["play_on_load"] and loaded_new_item:
if self.state.get()["play_on_load"] and loaded_new_item:
self.play()
return
@ -530,7 +530,7 @@ class Player:
self.state.update("pos_offset", 0)
if (
self.state.state["playing"]
self.state.get()["playing"]
and not self.isPlaying
and not self.stopped_manually
):
@ -542,15 +542,15 @@ class Player:
self.state.update(
"pos_true",
min(
self.state.state["length"],
self.state.state["pos"] + self.state.state["pos_offset"],
self.state.get()["length"],
self.state.get()["pos"] + self.state.get()["pos_offset"],
),
)
self.state.update(
"remaining",
max(0, (self.state.state["length"] -
self.state.state["pos_true"])),
max(0, (self.state.get()["length"] -
self.state.get()["pos_true"])),
)
def _ping_times(self):
@ -561,7 +561,7 @@ class Player:
or self.last_time_update + UPDATES_FREQ_SECS < time.time()
):
self.last_time_update = time.time()
self._retAll("POS:" + str(self.state.state["pos_true"]))
self._retAll("POS:" + str(self.state.get()["pos_true"]))
def _retAll(self, msg):
self.out_q.put("ALL:" + msg)

View file

@ -14,11 +14,11 @@
"""
import multiprocessing
from multiprocessing.queues import Queue
import multiprocessing.managers as m
import time
from typing import Any, Optional
import json
from setproctitle import setproctitle
from helpers.os_environment import isBundelled, isMacOS
if not isMacOS():
@ -41,6 +41,12 @@ import player
setproctitle("server.py")
""" Proxy Manager to proxy Class Objects into multiprocessing processes, instead of making a copy. """
class ProxyManager(m.BaseManager):
pass # Pass is really enough. Nothing needs to be done here.
class BAPSicleServer:
@ -56,7 +62,8 @@ class BAPSicleServer:
"ser_connected": False,
"myradio_api_key": None,
"myradio_base_url": "https://ury.org.uk/myradio",
"myradio_api_url": "https://ury.org.uk/api"
"myradio_api_url": "https://ury.org.uk/api",
"running_state": "running"
}
player_to_q: List[Queue] = []
@ -75,22 +82,26 @@ class BAPSicleServer:
def __init__(self):
self.startServer()
while True:
self.startServer()
self.check_processes()
self.check_processes()
self.stopServer()
self.stopServer()
if self.state.get()["running_state"] == "restarting":
continue
break
def check_processes(self):
terminator = Terminator()
log_function = self.logger.log.info
while not terminator.terminate:
# Note, state here will become a copy in the process.
# callbacks will not passthrough :/
while not terminator.terminate and self.state.get()["running_state"] == "running":
for channel in range(self.state.state["num_channels"]):
for channel in range(self.state.get()["num_channels"]):
if not self.player[channel] or not self.player[channel].is_alive():
log_function("Player {} not running, (re)starting.".format(channel))
self.player[channel] = multiprocessing.Process(
@ -142,7 +153,14 @@ class BAPSicleServer:
self.logger = LoggingManager("BAPSicleServer")
self.state = StateManager("BAPSicleServer", self.logger, self.default_state)
# Since we're passing the StateManager across processes, it must be made a manager.
# PLEASE NOTE: You can't read attributes directly, use state.get()["var"] and state.update("var", "val")
ProxyManager.register("StateManager", StateManager)
manager = ProxyManager()
manager.start()
self.state: StateManager = manager.StateManager("BAPSicleServer", self.logger, self.default_state)
self.state.update("running_state", "running")
build_commit = "Dev"
if isBundelled():
@ -154,10 +172,10 @@ class BAPSicleServer:
self.state.update("server_version", config.VERSION)
self.state.update("server_build", build_commit)
channel_count = self.state.state["num_channels"]
channel_count = self.state.get()["num_channels"]
self.player = [None] * channel_count
for channel in range(self.state.state["num_channels"]):
for channel in range(self.state.get()["num_channels"]):
self.player_to_q.append(multiprocessing.Queue())
self.player_from_q.append(multiprocessing.Queue())
@ -166,7 +184,7 @@ class BAPSicleServer:
self.controller_to_q.append(multiprocessing.Queue())
print("Welcome to BAPSicle Server version: {}, build: {}.".format(config.VERSION, build_commit))
print("The Server UI is available at http://{}:{}".format(self.state.state["host"], self.state.state["port"]))
print("The Server UI is available at http://{}:{}".format(self.state.get()["host"], self.state.get()["port"]))
# TODO Move this to player or installer.
if False:

View file

@ -56,9 +56,9 @@ def ui_index(request):
data = {
"ui_page": "index",
"ui_title": "",
"server_version": server_state.state["server_version"],
"server_build": server_state.state["server_build"],
"server_name": server_state.state["server_name"],
"server_version": server_state.get()["server_version"],
"server_build": server_state.get()["server_build"],
"server_name": server_state.get()["server_name"],
}
return render_template("index.html", data=data)
@ -66,7 +66,7 @@ def ui_index(request):
@app.route("/status")
def ui_status(request):
channel_states = []
for i in range(server_state.state["num_channels"]):
for i in range(server_state.get()["num_channels"]):
channel_states.append(status(i))
data = {"channels": channel_states,
@ -77,7 +77,7 @@ def ui_status(request):
@app.route("/config/player")
def ui_config_player(request):
channel_states = []
for i in range(server_state.state["num_channels"]):
for i in range(server_state.get()["num_channels"]):
channel_states.append(status(i))
outputs = DeviceManager.getAudioOutputs()
@ -96,7 +96,7 @@ def ui_config_server(request):
data = {
"ui_page": "server",
"ui_title": "Server Config",
"state": server_state.state,
"state": server_state.get(),
"ser_ports": DeviceManager.getSerialPorts(),
}
return render_template("config_server.html", data=data)
@ -127,7 +127,7 @@ def ui_logs_list(request):
"ui_page": "logs",
"ui_title": "Logs",
"logs": ["BAPSicleServer"]
+ ["Player{}".format(x) for x in range(server_state.state["num_channels"])],
+ ["Player{}".format(x) for x in range(server_state.get()["num_channels"])],
}
return render_template("loglist.html", data=data)
@ -281,9 +281,9 @@ async def api_get_playlist(request, type: str, library_id: str):
@app.route("/status-json")
def json_status(request):
channel_states = []
for i in range(server_state.state["num_channels"]):
for i in range(server_state.get()["num_channels"]):
channel_states.append(status(i))
return resp_json({"server": server_state.state, "channels": channel_states})
return resp_json({"server": server_state.get(), "channels": channel_states})
# Get audio for UI to generate waveforms.
@ -336,8 +336,14 @@ def status(channel: int):
@app.route("/quit")
def quit(request):
# stopServer()
return "Shutting down..."
server_state.update("running_state", "quitting")
return text("Server quitting...")
@app.route("/restart")
def restart(request):
server_state.update("running_state", "restarting")
return text("Server restarting...")
# Don't use reloader, it causes Nested Processes!
@ -365,8 +371,8 @@ def WebServer(player_to: List[Queue], player_from: List[Queue], state: StateMana
while not terminate.terminate:
try:
sync(app.run(
host=server_state.state["host"],
port=server_state.state["port"],
host=server_state.get()["host"],
port=server_state.get()["port"],
debug=True,
# workers=10,
auto_reload=False

View file

@ -31,10 +31,10 @@ class WebsocketServer:
self.webstudio_to_q = out_q
self.logger = LoggingManager("Websockets")
self.server_name = state.state["server_name"]
self.server_name = state.get()["server_name"]
self.websocket_server = websockets.serve(
self.websocket_handler, state.state["host"], state.state["ws_port"]
self.websocket_handler, state.get()["host"], state.get()["ws_port"]
)
asyncio.get_event_loop().run_until_complete(self.websocket_server)