Add proper message source routing, fixes a lot of instabity of updating.

This commit is contained in:
Matthew Stratford 2021-03-22 00:33:14 +00:00
parent b15b6a9875
commit 5233aacdf9
6 changed files with 221 additions and 186 deletions

View file

@ -8,7 +8,7 @@ class Controller():
player_from_q: List[Queue]
def __init__(self, player_to_q: List[Queue], player_from_q: List[Queue]):
self.receive()
self.handler()
return
# Registers a function for the controller class to call to tell BAPSicle to do something.
@ -17,7 +17,7 @@ class Controller():
return
# Loop etc in here to process the data from your controller and call the callbacks.
def receive(self):
def handler(self):
return

View file

@ -9,7 +9,7 @@ class MattchBox(Controller):
def __init__(self, player_to_q: List[Queue], player_from_q: List[Queue]):
# connect to serial port
self.ser = serial.serial_for_url("/dev/cu.usbserial-310", do_not_open=True)
self.ser = serial.serial_for_url("/dev/cu.usbserial-210", do_not_open=True)
self.ser.baudrate = 2400
# TOOD: These need to be split in the player handler.
@ -22,22 +22,26 @@ class MattchBox(Controller):
sys.stderr.write('Could not open serial port {}: {}\n'.format(self.ser.name, e))
return
self.receive()
self.handler()
def receive(self):
while self.ser.is_open:
def handler(self):
while True:
try:
if self.ser.is_open:
line = int.from_bytes(self.ser.read(1), "big") # Endianness doesn't matter for 1 byte.
print("Controller got:", line)
if (line == 255):
print("Sending back KeepAlive")
self.ser.write(b'\xff') # Send 255 back.
elif (line in [1,3,5]):
self.player_to_q[int(line / 2)].put("PLAY")
self.sendToPlayer(int(line / 2), "PLAY")
elif (line in [2,4,6]):
self.player_to_q[int(line / 2)-1].put("STOP")
self.sendToPlayer(int(line / 2)-1, "STOP")
except:
continue
def sendToPlayer(self, channel: int, msg:str):
self.player_to_q[channel].put("CONTROLLER:" + msg)

View file

@ -44,11 +44,14 @@ from helpers.state_manager import StateManager
from helpers.logging_manager import LoggingManager
PLAYBACK_END = USEREVENT + 1
# TODO ENUM
VALID_MESSAGE_SOURCES = ["WEBSOCKET", "UI", "CONTROLLER", "ALL"]
class Player():
state = None
running = False
out_q = None
last_msg = ""
last_msg_source = ""
last_time_update = None
logger = None
api = None
@ -431,7 +434,7 @@ class Player():
if stopping:
self.stop()
if self.out_q:
self.out_q.put("STOPPED") # Tell clients that we've stopped playing.
self._retAll("STOPPED") # Tell clients that we've stopped playing.
def _updateState(self, pos: Optional[float] = None):
@ -457,12 +460,18 @@ class Player():
UPDATES_FREQ_SECS = 0.2
if self.last_time_update == None or self.last_time_update + UPDATES_FREQ_SECS < time.time():
self.last_time_update = time.time()
self.out_q.put("POS:" + str(int(self.state.state["pos_true"])))
self._retAll("POS:" + str(int(self.state.state["pos_true"])))
def _retAll(self, msg):
self.out_q.put("ALL:" + msg)
def _retMsg(self, msg: Any, okay_str: Any = False):
response = self.last_msg + ":"
def _retMsg(self, msg: Any, okay_str: bool = False, custom_prefix: Optional[str] = None):
# Make sure to add the message source back, so that it can be sent to the correct destination in the main server.
if custom_prefix:
response = custom_prefix
else:
response = "{}:{}:".format(self.last_msg_source, self.last_msg)
if msg == True:
response += "OKAY"
elif isinstance(msg, str):
@ -478,8 +487,8 @@ class Player():
self.out_q.put(response)
def _send_status(self):
self.last_msg = "STATUS"
self._retMsg(self.status, True)
# TODO This is hacky
self._retMsg(str(self.status),okay_str=True,custom_prefix="ALL:STATUS:")
def __init__(self, channel: int, in_q: multiprocessing.Queue, out_q: multiprocessing.Queue):
@ -534,8 +543,18 @@ class Player():
self._ping_times()
try:
try:
self.last_msg = in_q.get_nowait()
self.logger.log.info("Recieved message: {}".format(self.last_msg))
message = in_q.get_nowait()
source = message.split(":")[0]
if source not in VALID_MESSAGE_SOURCES:
self.last_msg_source = ""
self.last_msg = ""
self.logger.log.warn("Message from unknown sender source: {}".format(source))
continue
self.last_msg_source = source
self.last_msg = message.split(":", 1)[1]
self.logger.log.info("Recieved message from source {}: {}".format(self.last_msg_source, self.last_msg))
except Empty:
# The incomming message queue was empty,
# skip message processing
@ -624,37 +643,5 @@ class Player():
sys.exit(0)
def showOutput(in_q: multiprocessing.Queue, out_q: multiprocessing.Queue):
print("Starting showOutput().")
while True:
time.sleep(0.01)
last_msg = out_q.get()
print(last_msg)
if __name__ == "__main__":
if isMacOS():
multiprocessing.set_start_method("spawn", True)
in_q: multiprocessing.Queue[Any] = multiprocessing.Queue()
out_q: multiprocessing.Queue[Any] = multiprocessing.Queue()
outputProcess = multiprocessing.Process(
target=showOutput,
args=(in_q, out_q),
).start()
playerProcess = multiprocessing.Process(
target=Player,
args=(-1, in_q, out_q),
).start()
# Do some testing
in_q.put("LOADED?")
in_q.put("PLAY")
in_q.put("LOAD:dev/test.mp3")
in_q.put("LOADED?")
in_q.put("PLAY")
print("Entering infinite loop.")
while True:
pass
raise Exception("This BAPSicle Player is a subcomponenet, it will not run individually.")

View file

@ -77,14 +77,21 @@ class BAPSicleServer():
stopServer()
class PlayerHandler():
def __init__(self,channel_from_q, websocket_to_q, ui_to_q):
def __init__(self,channel_from_q, websocket_to_q, ui_to_q, controller_to_q):
while True:
for channel in range(len(channel_from_q)):
try:
message = channel_from_q[channel].get_nowait()
source = message.split(":")[0]
# TODO ENUM
if source in ["ALL","WEBSOCKET"]:
websocket_to_q[channel].put(message)
#print("Player Handler saw:", message.split(":")[0])
if source in ["ALL","UI"]:
if not message.split(":")[1] == "POS":
# We don't care about position update spam
ui_to_q[channel].put(message)
if source in ["ALL","CONTROLLER"]:
controller_to_q[channel].put(message)
except:
pass
time.sleep(0.1)
@ -105,6 +112,7 @@ channel_to_q: List[queue.Queue] = []
channel_from_q: List[queue.Queue] = []
ui_to_q: List[queue.Queue] = []
websocket_to_q: List[queue.Queue] = []
controller_to_q: List[queue.Queue] = []
channel_p = []
stopping = False
@ -185,14 +193,14 @@ def server_config():
@app.route("/restart", methods=["POST"])
def restart_server():
async def restart_server():
state.update("server_name", request.form["name"])
state.update("host", request.form["host"])
state.update("port", int(request.form["port"]))
state.update("num_channels", int(request.form["channels"]))
state.update("ws_port", int(request.form["ws_port"]))
stopServer(restart=True)
startServer()
await startServer()
# Get audio for UI to generate waveforms.
@ -200,7 +208,6 @@ def restart_server():
@app.route("/audiofile/<type>/<int:id>")
#@cross_origin()
def audio_file(type: str, id: int):
print("Hit!")
if type not in ["managed", "track"]:
abort(404)
return send_from_directory('music-tmp', type + "-" + str(id) + ".mp3")
@ -213,7 +220,7 @@ def audio_file(type: str, id: int):
@app.route("/player/<int:channel>/play")
def play(channel: int):
channel_to_q[channel].put("PLAY")
channel_to_q[channel].put("UI:PLAY")
return ui_status()
@ -221,7 +228,7 @@ def play(channel: int):
@app.route("/player/<int:channel>/pause")
def pause(channel: int):
channel_to_q[channel].put("PAUSE")
channel_to_q[channel].put("UI:PAUSE")
return ui_status()
@ -229,7 +236,7 @@ def pause(channel: int):
@app.route("/player/<int:channel>/unpause")
def unPause(channel: int):
channel_to_q[channel].put("UNPAUSE")
channel_to_q[channel].put("UI:UNPAUSE")
return ui_status()
@ -237,7 +244,7 @@ def unPause(channel: int):
@app.route("/player/<int:channel>/stop")
def stop(channel: int):
channel_to_q[channel].put("STOP")
channel_to_q[channel].put("UI:STOP")
return ui_status()
@ -245,32 +252,32 @@ def stop(channel: int):
@app.route("/player/<int:channel>/seek/<float:pos>")
def seek(channel: int, pos: float):
channel_to_q[channel].put("SEEK:" + str(pos))
channel_to_q[channel].put("UI:SEEK:" + str(pos))
return ui_status()
@app.route("/player/<int:channel>/output/<name>")
def output(channel: int, name: Optional[str]):
channel_to_q[channel].put("OUTPUT:" + str(name))
channel_to_q[channel].put("UI:OUTPUT:" + str(name))
return ui_status()
@app.route("/player/<int:channel>/autoadvance/<int:state>")
def autoadvance(channel: int, state: int):
channel_to_q[channel].put("AUTOADVANCE:" + str(state))
channel_to_q[channel].put("UI:AUTOADVANCE:" + str(state))
return ui_status()
@app.route("/player/<int:channel>/repeat/<state>")
def repeat(channel: int, state: str):
channel_to_q[channel].put("REPEAT:" + state.upper())
channel_to_q[channel].put("UI:REPEAT:" + state.upper())
return ui_status()
@app.route("/player/<int:channel>/playonload/<int:state>")
def playonload(channel: int, state: int):
channel_to_q[channel].put("PLAYONLOAD:" + str(state))
channel_to_q[channel].put("UI:PLAYONLOAD:" + str(state))
return ui_status()
# Channel Items
@ -278,14 +285,14 @@ def playonload(channel: int, state: int):
@app.route("/player/<int:channel>/load/<int:channel_weight>")
def load(channel: int, channel_weight: int):
channel_to_q[channel].put("LOAD:" + str(channel_weight))
channel_to_q[channel].put("UI:LOAD:" + str(channel_weight))
return ui_status()
@app.route("/player/<int:channel>/unload")
def unload(channel: int):
channel_to_q[channel].put("UNLOAD")
channel_to_q[channel].put("UI:UNLOAD")
return ui_status()
@ -299,20 +306,14 @@ def add_to_plan(channel: int):
"artist": request.form["artist"],
}
channel_to_q[channel].put("ADD:" + json.dumps(new_item))
channel_to_q[channel].put("UI:ADD:" + json.dumps(new_item))
return new_item
#@app.route("/player/<int:channel>/move/<int:channel_weight>/<int:position>")
#def move_plan(channel: int, channel_weight: int, position: int):
# channel_to_q[channel].put("MOVE:" + json.dumps({"channel_weight": channel_weight, "position": position}))#
# TODO Return
# return True
#@app.route("/player/<int:channel>/remove/<int:channel_weight>")
def remove_plan(channel: int, channel_weight: int):
channel_to_q[channel].put("REMOVE:" + str(channel_weight))
channel_to_q[channel].put("UI:REMOVE:" + str(channel_weight))
# TODO Return
return True
@ -320,7 +321,7 @@ def remove_plan(channel: int, channel_weight: int):
#@app.route("/player/<int:channel>/clear")
def clear_channel_plan(channel: int):
channel_to_q[channel].put("CLEAR")
channel_to_q[channel].put("UI:CLEAR")
# TODO Return
return True
@ -379,11 +380,7 @@ def search_library(type: str):
try:
response = api_from_q.get_nowait()
if response.startswith("SEARCH_TRACK:"):
response = response[response.index(":")+1:]
#try:
# response = json.loads(response)
#except Exception as e:
# raise e
response = response.split(":", 1)[1]
return response
except queue.Empty:
@ -395,7 +392,7 @@ def search_library(type: str):
def load_showplan(timeslotid: int):
for channel in channel_to_q:
channel.put("GET_PLAN:" + str(timeslotid))
channel.put("UI:GET_PLAN:" + str(timeslotid))
return ui_status()
@ -403,12 +400,14 @@ def status(channel: int):
while (not ui_to_q[channel].empty()):
ui_to_q[channel].get() # Just waste any previous status responses.
channel_to_q[channel].put("STATUS")
while True:
channel_to_q[channel].put("UI:STATUS")
retries = 0
while retries < 40:
try:
response = ui_to_q[channel].get_nowait()
if response.startswith("STATUS:"):
response = response[7:]
if response.startswith("UI:STATUS:"):
response = response.split(":",2)[2]
# TODO: Handle OKAY / FAIL
response = response[response.index(":")+1:]
try:
response = json.loads(response)
@ -419,6 +418,8 @@ def status(channel: int):
except queue.Empty:
pass
retries += 1
time.sleep(0.1)
@ -431,14 +432,14 @@ def quit():
@app.route("/player/all/stop")
def all_stop():
for channel in channel_to_q:
channel.put("STOP")
channel.put("UI:STOP")
return ui_status()
@app.route("/player/all/clear")
def clear_all_channels():
for channel in channel_to_q:
channel.put("CLEAR")
channel.put("UI:CLEAR")
return ui_status()
@ -480,7 +481,8 @@ async def startServer():
channel_to_q.append(multiprocessing.Queue())
channel_from_q.append(multiprocessing.Queue())
ui_to_q.append(multiprocessing.Queue())
websocket_to_q.append(multiprocessing.Manager().Queue())
websocket_to_q.append(multiprocessing.Queue())
controller_to_q.append(multiprocessing.Queue())
channel_p.append(
multiprocessing.Process(
target=player.Player,
@ -496,16 +498,18 @@ async def startServer():
api_handler = multiprocessing.Process(target=APIHandler, args=(api_to_q, api_from_q))
api_handler.start()
player_handler = multiprocessing.Process(target=PlayerHandler, args=(channel_from_q, websocket_to_q, ui_to_q))
player_handler = multiprocessing.Process(target=PlayerHandler, args=(channel_from_q, websocket_to_q, ui_to_q, controller_to_q))
player_handler.start()
websockets_server = multiprocessing.Process(target=WebsocketServer, args=(channel_to_q, channel_from_q, state))
websockets_server = multiprocessing.Process(target=WebsocketServer, args=(channel_to_q, websocket_to_q, state))
websockets_server.start()
controller_handler = multiprocessing.Process(target=MattchBox, args=(channel_to_q, channel_from_q))
controller_handler = multiprocessing.Process(target=MattchBox, args=(channel_to_q, controller_to_q))
controller_handler.start()
# TODO Move this to player or installer.
if False:
if not isMacOS():
# Temporary RIP.
@ -524,6 +528,7 @@ async def startServer():
)
text_to_speach.runAndWait()
new_item: Dict[str,Any] = {
"channel_weight": 0,
"filename": "dev/welcome.mp3",
@ -531,16 +536,13 @@ async def startServer():
"artist": "University Radio York",
}
#channel_to_q[0].put("ADD:" + json.dumps(new_item))
# channel_to_q[0].put("LOAD:0")
# channel_to_q[0].put("PLAY")
channel_to_q[0].put("ADD:" + json.dumps(new_item))
channel_to_q[0].put("LOAD:0")
channel_to_q[0].put("PLAY")
# Don't use reloader, it causes Nested Processes!
app.run(host=state.state["host"], port=state.state["port"], debug=True, use_reloader=False)
async def player_message_handler():
print("Handling")
pass
def stopServer(restart=False):
global channel_p

View file

@ -8,6 +8,7 @@
{% for player in data.channels %}
<div class="col-4">
{% if player %}
<h3 class="h5">Player {{player.channel}}</h3>
<a href="/player/{{player.channel}}/play">Play</a>
{% if player.paused %}
@ -36,6 +37,9 @@
{% endfor %}
<br>
{% else %}
<p>RIP. Failed to get this.</p>
{% endif %}
</div>
{% endfor %}
</div>

View file

@ -17,7 +17,7 @@ async def websocket_handler(websocket, path):
await websocket.send(json.dumps({"message": "Hello", "serverName": server_name}))
print("New Client: {}".format(websocket))
for channel in channel_to_q:
channel.put("STATUS")
channel.put("WEBSOCKET:STATUS")
async def handle_from_webstudio():
try:
@ -46,59 +46,97 @@ async def websocket_handler(websocket, path):
baps_clients.remove(websocket)
def sendCommand(channel, data):
if channel not in range(len(channel_to_q)):
print("ERROR: Received channel number larger than server supported channels.")
return
if "command" in data.keys():
if data["command"] == "PLAY":
channel_to_q[channel].put("PLAY")
elif data["command"] == "PAUSE":
channel_to_q[channel].put("PAUSE")
elif data["command"] == "UNPAUSE":
channel_to_q[channel].put("UNPAUSE")
elif data["command"] == "STOP":
channel_to_q[channel].put("STOP")
elif data["command"] == "SEEK":
channel_to_q[channel].put("SEEK:" + str(data["time"]))
elif data["command"] == "LOAD":
channel_to_q[channel].put("LOAD:" + str(data["weight"]))
command = data["command"]
elif data["command"] == "AUTOADVANCE":
channel_to_q[channel].put("AUTOADVANCE:" + str(data["enabled"]))
# Handle the general case
# Message format:
## SOURCE:COMMAND:EXTRADATA
elif data["command"] == "PLAYONLOAD":
channel_to_q[channel].put("PLAYONLOAD:" + str(data["enabled"]))
message = "WEBSOCKET:" + command
elif data["command"] == "REPEAT":
channel_to_q[channel].put("REPEAT:" + str(data["mode"]).lower())
# If we just want PLAY, PAUSE etc, we're all done.
# Else, let's pipe in some extra info.
extra = ""
elif data["command"] == "MOVE":
# Should we trust the client with the item info?
try:
if command == "SEEK":
extra += str(data["time"])
elif command == "LOAD":
extra += str(data["weight"])
elif command == "AUTOADVANCE":
extra += str(data["enabled"])
elif command == "PLAYONLOAD":
extra += str(data["enabled"])
elif command == "REPEAT":
extra += str(data["mode"]).lower()
elif command == "ADD":
extra += json.dumps(data["newItem"])
elif command == "REMOVE":
extra += str(data["weight"])
elif command == "GET_PLAN":
extra += str(data["timeslotId"])
# SPECIAL CASE ALERT! We need to talk to two channels here.
elif command == "MOVE":
# TODO Should we trust the client with the item info?
# Tell the old channel to remove "weight"
extra += str(data["weight"])
# Now modify the item with the weight in the new channel
new_channel = int(data["new_channel"])
channel_to_q[channel].put("REMOVE:" + str(data["weight"]))
item = data["item"]
item["weight"] = int(data["new_weight"])
# Now send the special case.
channel_to_q[new_channel].put("ADD:" + json.dumps(item))
elif data["command"] == "ADD":
channel_to_q[channel].put("ADD:" + json.dumps(data["newItem"]))
elif data["command"] == "REMOVE":
channel_to_q[channel].put("REMOVE:" + str(data["weight"]))
elif data["command"] == "GET_PLAN":
channel_to_q[channel].put("GET_PLAN:"+ str(data["timeslotId"]))
except ValueError as e:
print("ERROR decoding extra data {} for command {} ".format(e, command))
pass
# Stick the message together and send!
if extra != "":
message += ":" + extra
try:
channel_to_q[channel].put(message)
except Exception as e:
print("ERRORL: Failed to send message {} to channel {}: {}".format(message, channel, e))
else:
print("ERROR: Command missing from message.")
async def handle_to_webstudio():
while True:
for channel in range(len(webstudio_to_q)):
try:
message = webstudio_to_q[channel].get_nowait()
command = message.split(":")[0]
source = message.split(":")[0]
# TODO ENUM
if source not in ["WEBSOCKET","ALL"]:
print("ERROR: Message received from invalid source to websocket_handler. Ignored.", source, message)
continue
command = message.split(":")[1]
#print("Websocket Out:", command)
if command == "STATUS":
try:
message = message.split("OKAY:")[1]
message = json.loads(message)
except:
continue
continue # TODO more logging
elif command == "POS":
message = message.split(":")[1]
try:
message = message.split(":", 2)[2]
except:
continue
else:
continue
@ -109,7 +147,7 @@ async def websocket_handler(websocket, path):
})
await asyncio.wait([conn.send(data) for conn in baps_clients])
except queue.Empty:
pass
continue
except Exception as e:
raise e
await asyncio.sleep(0.01)