Switch to a single queue wherever possible

This commit is contained in:
Matthew Stratford 2021-09-24 21:11:39 +01:00
parent c82b19309a
commit bc1bd45cd4
8 changed files with 139 additions and 128 deletions

View file

@ -6,9 +6,9 @@ class Controller:
# Main controller class. All implementations of controller support should inherit this.
callbacks: List[Callable] = []
player_to_q: List[Queue]
player_from_q: List[Queue]
player_from_q: Queue
def __init__(self, player_to_q: List[Queue], player_from_q: List[Queue]):
def __init__(self, player_to_q: List[Queue], player_from_q: Queue):
self.handler()
return

View file

@ -18,7 +18,7 @@ class MattchBox(Controller):
logger: LoggingManager
def __init__(
self, server_to_q: List[Queue], server_from_q: List[Queue], state: StateManager
self, player_to_q: List[Queue], player_from_q: Queue, state: StateManager
):
process_title = "ControllerHandler"
@ -39,8 +39,8 @@ class MattchBox(Controller):
self.next_port = self.server_state.get()["serial_port"]
self.logger.log.info("Server config gives port as: {}".format(self.next_port))
self.server_from_q = server_from_q
self.server_to_q = server_to_q
self.player_from_q = player_from_q
self.player_to_q = player_to_q
self.handler()
@ -59,7 +59,7 @@ class MattchBox(Controller):
def _disconnected(self):
# If we lose the controller, make sure to set channels live, so we tracklist.
for i in range(len(self.server_from_q)):
for i in range(len(self.player_to_q)):
self.sendToPlayer(i, "SETLIVE:True")
self.server_state.update("ser_connected", False)
@ -138,4 +138,4 @@ class MattchBox(Controller):
self.logger.log.info(
"Sending message to player channel {}: {}".format(channel, msg)
)
self.server_to_q[channel].put("CONTROLLER:" + msg)
self.player_to_q[channel].put("CONTROLLER:" + msg)

View file

@ -19,7 +19,7 @@ class FileManager:
logger: LoggingManager
api: MyRadioAPI
def __init__(self, channel_from_q: List[Queue], server_config: StateManager):
def __init__(self, channel_from_q: Queue, server_config: StateManager):
self.logger = LoggingManager("FileManager")
self.api = MyRadioAPI(self.logger, server_config)
@ -29,7 +29,7 @@ class FileManager:
current_process().name = process_title
terminator = Terminator()
self.channel_count = len(channel_from_q)
self.channel_count = server_config.get()["num_channels"]
self.channel_received = None
self.last_known_show_plan = [[]] * self.channel_count
self.next_channel_preload = 0
@ -46,15 +46,25 @@ class FileManager:
):
self.channel_received = [False] * self.channel_count
for channel in range(self.channel_count):
try:
message = channel_from_q[channel].get_nowait()
except Exception:
continue
try:
message = channel_from_q.get_nowait()
except Exception:
# No new messages
# Let's try preload / normalise some files now we're free of messages.
preloaded = self.do_preload()
normalised = self.do_normalise()
if not preloaded and not normalised:
# We didn't do any hard work, let's sleep.
sleep(0.2)
else:
try:
# source = message.split(":")[0]
command = message.split(":", 2)[1]
split = message.split(":", 1)
channel = int(split[0])
# source = split[1]
command = split[2]
# If we have requested a new show plan, empty the music-tmp directory for the previous show.
if command == "GETPLAN":
@ -114,12 +124,12 @@ class FileManager:
] * self.channel_count
# If we receive a new status message, let's check for files which have not been pre-loaded.
if command == "STATUS":
extra = message.split(":", 3)
if extra[2] != "OKAY":
elif command == "STATUS":
extra = message.split(":", 4)
if extra[3] != "OKAY":
continue
status = json.loads(extra[3])
status = json.loads(extra[4])
show_plan = status["show_plan"]
item_ids = []
for item in show_plan:
@ -140,13 +150,7 @@ class FileManager:
)
)
# Let's try preload / normalise some files now we're free of messages.
preloaded = self.do_preload()
normalised = self.do_normalise()
if not preloaded and not normalised:
# We didn't do any hard work, let's sleep.
sleep(0.2)
except Exception as e:
self.logger.log.exception(

View file

@ -923,16 +923,17 @@ class Player:
def _retAll(self, msg):
if self.out_q:
self.out_q.put("ALL:" + msg)
self.out_q.put("{}:ALL:{}".format(self.state.get()["channel"],msg))
def _retMsg(
self, msg: Any, okay_str: bool = False, custom_prefix: Optional[str] = None
):
response = "{}:".format(self.state.get()["channel"])
# Make sure to add the message source back, so that it can be sent to the correct destination in the main server.
if custom_prefix:
response = custom_prefix
response += custom_prefix
else:
response = "{}:{}:".format(self.last_msg_source, self.last_msg)
response += "{}:{}:".format(self.last_msg_source, self.last_msg)
if msg is True:
response += "OKAY"
elif isinstance(msg, str):

View file

@ -22,28 +22,31 @@ class PlayerHandler:
terminator = Terminator()
try:
while not terminator.terminate:
try:
# Format <CHANNEL NUM>:<SOURCE>:<COMMAND>:<EXTRAS>
q_msg = channel_from_q.get_nowait()
if not isinstance(q_msg, str):
continue
split = q_msg.split(":",1)
message = split[1]
source = message.split(":")[0]
command = message.split(":")[1]
for channel in range(len(channel_from_q)):
try:
message = channel_from_q[channel].get_nowait()
source = message.split(":")[0]
command = message.split(":")[1]
# Let the file manager manage the files based on status and loading new show plan triggers.
if command == "GETPLAN" or command == "STATUS":
file_to_q.put(q_msg)
# Let the file manager manage the files based on status and loading new show plan triggers.
if command == "GET_PLAN" or command == "STATUS":
file_to_q[channel].put(message)
# TODO ENUM
if source in ["ALL", "WEBSOCKET"]:
websocket_to_q[channel].put(message)
if source in ["ALL", "UI"]:
if not message.split(":")[1] == "POS":
# We don't care about position update spam
ui_to_q[channel].put(message)
if source in ["ALL", "CONTROLLER"]:
controller_to_q[channel].put(message)
except Exception:
pass
# TODO ENUM
if source in ["ALL", "WEBSOCKET"]:
websocket_to_q.put(q_msg)
if source in ["ALL", "UI"]:
if not message.split(":")[1] == "POS":
# We don't care about position update spam
ui_to_q.put(q_msg)
if source in ["ALL", "CONTROLLER"]:
controller_to_q.put(q_msg)
except Exception:
pass
sleep(0.02)
except Exception as e:

View file

@ -73,13 +73,11 @@ class BAPSicleServer:
}
player_to_q: List[Queue] = []
player_from_q: List[Queue] = []
ui_to_q: List[Queue] = []
websocket_to_q: List[Queue] = []
controller_to_q: List[Queue] = []
file_to_q: List[Queue] = []
api_from_q: Queue
api_to_q: Queue
player_from_q: Queue
ui_to_q: Queue
websocket_to_q: Queue
controller_to_q: Queue
file_to_q: Queue
player: List[multiprocessing.Process] = []
websockets_server: Optional[multiprocessing.Process] = None
@ -97,10 +95,8 @@ class BAPSicleServer:
self.stopServer()
if self.state.get()["running_state"] == "restarting":
continue
break
if self.state.get()["running_state"] != "restarting":
break
def check_processes(self):
@ -128,7 +124,7 @@ class BAPSicleServer:
args=(
channel,
self.player_to_q[channel],
self.player_from_q[channel],
self.player_from_q,
self.state,
),
)
@ -239,11 +235,12 @@ class BAPSicleServer:
for channel in range(self.state.get()["num_channels"]):
self.player_to_q.append(multiprocessing.Queue())
self.player_from_q.append(multiprocessing.Queue())
self.ui_to_q.append(multiprocessing.Queue())
self.websocket_to_q.append(multiprocessing.Queue())
self.controller_to_q.append(multiprocessing.Queue())
self.file_to_q.append(multiprocessing.Queue())
self.player_from_q = multiprocessing.Queue()
self.ui_to_q = multiprocessing.Queue()
self.websocket_to_q = multiprocessing.Queue()
self.controller_to_q = multiprocessing.Queue()
self.file_to_q = multiprocessing.Queue()
print(
"Welcome to BAPSicle Server version: {}, build: {}.".format(
@ -291,7 +288,7 @@ class BAPSicleServer:
print("Stopping BASPicle Server.")
print("Stopping Websocket Server")
self.websocket_to_q[0].put("WEBSOCKET:QUIT")
self.websocket_to_q.put("0:WEBSOCKET:QUIT")
if self.websockets_server:
self.websockets_server.join(timeout=PROCESS_KILL_TIMEOUT_S)
del self.websockets_server

View file

@ -8,9 +8,10 @@ import asyncio
from jinja2 import Environment, FileSystemLoader
from jinja2.utils import select_autoescape
from urllib.parse import unquote
from setproctitle import setproctitle
from typing import Any, Optional, List
from setproctitle import setproctitle
from multiprocessing.queues import Queue
from multiprocessing.process import current_process
from queue import Empty
from time import sleep
import json
@ -99,7 +100,7 @@ server_state: StateManager
api: MyRadioAPI
player_to_q: List[Queue] = []
player_from_q: List[Queue] = []
player_from_q: Queue
# General UI Endpoints
@ -416,15 +417,18 @@ app.static(
def status(channel: int):
while not player_from_q[channel].empty():
while not player_from_q.empty():
# Just waste any previous status responses.
player_from_q[channel].get()
player_from_q.get()
player_to_q[channel].put("UI:STATUS")
retries = 0
while retries < 40:
try:
response = player_from_q[channel].get_nowait()
message = player_from_q.get_nowait()
split = message.split(":",1)
channel = int(split[0])
response = split[1]
if response.startswith("UI:STATUS:"):
response = response.split(":", 2)[2]
# TODO: Handle OKAY / FAIL
@ -477,7 +481,7 @@ def restart(request):
# Don't use reloader, it causes Nested Processes!
def WebServer(player_to: List[Queue], player_from: List[Queue], state: StateManager):
def WebServer(player_to: List[Queue], player_from: Queue, state: StateManager):
global player_to_q, player_from_q, server_state, api, app
player_to_q = player_to
@ -489,6 +493,7 @@ def WebServer(player_to: List[Queue], player_from: List[Queue], state: StateMana
process_title = "Web Server"
setproctitle(process_title)
current_process().name = process_title
CORS(app, supports_credentials=True) # Allow ALL CORS!!!
terminate = Terminator()

View file

@ -19,8 +19,8 @@ class WebsocketServer:
threads = Future
baps_clients = set()
channel_to_q: List[multiprocessing.Queue]
webstudio_to_q: List[multiprocessing.Queue]
player_to_q: List[multiprocessing.Queue]
player_from_q: multiprocessing.Queue
server_name: str
logger: LoggingManager
to_webstudio: Task
@ -29,10 +29,10 @@ class WebsocketServer:
def __init__(self, in_q, out_q, state):
self.channel_to_q = in_q
self.webstudio_to_q = out_q
self.player_to_q = in_q
self.player_from_q = out_q
process_title = "Websockets Servr"
process_title = "Websockets Server"
setproctitle(process_title)
current_process().name = process_title
@ -68,7 +68,7 @@ class WebsocketServer:
json.dumps({"message": "Hello", "serverName": self.server_name})
)
self.logger.log.info("New Client: {}".format(websocket))
for channel in self.channel_to_q:
for channel in self.player_to_q:
channel.put("WEBSOCKET:STATUS")
self.from_webstudio = asyncio.create_task(
@ -85,7 +85,7 @@ class WebsocketServer:
data = json.loads(message)
if "channel" not in data:
# Didn't specify a channel, send to all.
for channel in range(len(self.channel_to_q)):
for channel in range(len(self.player_to_q)):
self.sendCommand(channel, data)
else:
channel = int(data["channel"])
@ -107,7 +107,7 @@ class WebsocketServer:
self.baps_clients.remove(websocket)
def sendCommand(self, channel, data):
if channel not in range(len(self.channel_to_q)):
if channel not in range(len(self.player_to_q)):
self.logger.log.exception(
"Received channel number larger than server supported channels."
)
@ -157,7 +157,7 @@ class WebsocketServer:
elif command == "MOVE":
# remove the exiting item first
self.channel_to_q[channel].put(
self.player_to_q[channel].put(
"{}REMOVE:{}".format(message, data["weight"])
)
@ -169,7 +169,7 @@ class WebsocketServer:
item["weight"] = int(data["new_weight"])
# Now send the special case.
self.channel_to_q[new_channel].put(
self.player_to_q[new_channel].put(
"WEBSOCKET:ADD:" + json.dumps(item)
)
@ -192,7 +192,7 @@ class WebsocketServer:
message += ":" + extra
try:
self.channel_to_q[channel].put(message)
self.player_to_q[channel].put(message)
except Exception as e:
self.logger.log.exception(
"Failed to send message {} to channel {}: {}".format(
@ -208,51 +208,52 @@ class WebsocketServer:
terminator = Terminator()
while not terminator.terminate:
for channel in range(len(self.webstudio_to_q)):
try:
message = self.webstudio_to_q[channel].get_nowait()
source = message.split(":")[0]
# TODO ENUM
if source not in ["WEBSOCKET", "ALL"]:
self.logger.log.error(
"ERROR: Message received from invalid source to websocket_handler. Ignored.",
source,
message,
)
continue
command = message.split(":")[1]
if command == "STATUS":
try:
message = message.split("OKAY:")[1]
message = json.loads(message)
except Exception:
continue # TODO more logging
elif command == "POS":
try:
message = message.split(":", 2)[2]
except Exception:
continue
elif command == "QUIT":
self.quit()
else:
continue
data = json.dumps(
{"command": command, "data": message, "channel": channel}
)
await asyncio.wait([conn.send(data) for conn in self.baps_clients])
except queue.Empty:
continue
except ValueError:
# Typically a "Set of coroutines/Futures is empty." when sending to a dead client.
continue
except Exception as e:
self.logger.log.exception(
"Exception trying to send to websocket:", e
)
await asyncio.sleep(0.02)
try:
message = self.player_from_q.get_nowait()
split = message.split(":")
channel = int(split[0])
source = split[1]
# TODO ENUM
if source not in ["WEBSOCKET", "ALL"]:
self.logger.log.error(
"ERROR: Message received from invalid source to websocket_handler. Ignored.",
source,
message,
)
continue
command = split[2]
if command == "STATUS":
try:
message = message.split("OKAY:")[1]
message = json.loads(message)
except Exception:
continue # TODO more logging
elif command == "POS":
try:
message = split[3]
except Exception:
continue
elif command == "QUIT":
self.quit()
else:
continue
data = json.dumps(
{"command": command, "data": message, "channel": channel}
)
await asyncio.wait([conn.send(data) for conn in self.baps_clients])
except queue.Empty:
continue
except ValueError:
# Typically a "Set of coroutines/Futures is empty." when sending to a dead client.
continue
except Exception as e:
self.logger.log.exception(
"Exception trying to send to websocket:", e
)
self.quit()