most of the pipeline to send player msgs to webstudio

This commit is contained in:
Matthew Stratford 2020-12-20 01:10:19 +00:00
parent 801d2ae40b
commit 479c952766
No known key found for this signature in database
GPG key ID: 9E53C8B3F0B57395
6 changed files with 175 additions and 80 deletions

View file

@ -44,6 +44,7 @@ class MyRadioAPI():
request = requests.get(url, timeout=10) request = requests.get(url, timeout=10)
if request.status_code != 200: if request.status_code != 200:
# TODO: Log something here
return None return None
filename: str = resolve_external_file_path("/music-tmp/{}-{}.{}".format(itemType, id, format)) filename: str = resolve_external_file_path("/music-tmp/{}-{}.{}".format(itemType, id, format))

21
plan.py
View file

@ -17,7 +17,7 @@ import os
class PlanItem: class PlanItem:
_timeslotItemId: int = 0 _timeslotItemId: int = 0
_channel_weight: int = 0 _channelWeight: int = 0
_filename: str = "" _filename: str = ""
_title: str = "" _title: str = ""
_artist: str = "" _artist: str = ""
@ -25,9 +25,10 @@ class PlanItem:
_managedId: Optional[int] = None _managedId: Optional[int] = None
@property @property
def channel_weight(self) -> int: def channelWeight(self) -> int:
return self._channel_weight return self._channelWeight
@property
def timeslotItemId(self) -> int: def timeslotItemId(self) -> int:
return self._timeslotItemId return self._timeslotItemId
@ -51,10 +52,18 @@ class PlanItem:
def managedId(self) -> Optional[int]: def managedId(self) -> Optional[int]:
return self._managedId return self._managedId
@property
def title(self) -> Optional[str]:
return self._title
@property
def artist(self) -> Optional[str]:
return self._artist
@property @property
def __dict__(self): def __dict__(self):
return { return {
"channel_weight": self.channel_weight, "channelWeight": self.channelWeight,
"timeslotItemId": self.timeslotItemId, "timeslotItemId": self.timeslotItemId,
"trackId": self._trackId, "trackId": self._trackId,
"managedId": self._managedId, "managedId": self._managedId,
@ -69,7 +78,7 @@ class PlanItem:
self._trackId = new_item["trackId"] if "trackId" in new_item else None self._trackId = new_item["trackId"] if "trackId" in new_item else None
self._managedId = new_item["managedId"] if "managedId" in new_item else None self._managedId = new_item["managedId"] if "managedId" in new_item else None
self._filename = new_item["filename"] # This could be a temp dir for API-downloaded items, or a mapped drive. self._filename = new_item["filename"] # This could be a temp dir for API-downloaded items, or a mapped drive.
self._channel_weight = new_item["channel_weight"] self._channelWeight = new_item["channelWeight"]
self._title = new_item["title"] self._title = new_item["title"]
self._artist = new_item["artist"] self._artist = new_item["artist"]

View file

@ -179,6 +179,7 @@ class Player():
self.state.update("pos_offset", 0) self.state.update("pos_offset", 0)
self.state.update("pos_true", 0) self.state.update("pos_true", 0)
self.state.update("paused", False) self.state.update("paused", False)
return True return True
# return False # return False
@ -228,10 +229,10 @@ class Player():
self.state.update("show_plan", self.state.state["show_plan"] + [PlanItem(new_item)]) self.state.update("show_plan", self.state.state["show_plan"] + [PlanItem(new_item)])
return True return True
def remove_from_plan(self, timeslotItemId: int) -> bool: def remove_from_plan(self, channel_weight: int) -> bool:
plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"]) plan_copy: List[PlanItem] = copy.copy(self.state.state["show_plan"])
for i in plan_copy: for i in plan_copy:
if i.timeslotItemId == timeslotItemId: if i.channelWeight == channel_weight:
plan_copy.remove(i) plan_copy.remove(i)
self.state.update("show_plan", plan_copy) self.state.update("show_plan", plan_copy)
return True return True
@ -241,7 +242,7 @@ class Player():
self.state.update("show_plan", []) self.state.update("show_plan", [])
return True return True
def load(self, timeslotItemId: int): def load(self, channelWeight: int):
if not self.isPlaying: if not self.isPlaying:
self.unload() self.unload()
@ -250,12 +251,12 @@ class Player():
loaded_item: Optional[PlanItem] = None loaded_item: Optional[PlanItem] = None
for i in range(len(showplan)): for i in range(len(showplan)):
if showplan[i].timeslotItemId == timeslotItemId: if showplan[i].channelWeight == channelWeight:
loaded_item = showplan[i] loaded_item = showplan[i]
break break
if loaded_item == None: if loaded_item == None:
self.logger.log.error("Failed to find timeslotItemId: {}".format(timeslotItemId)) self.logger.log.error("Failed to find channelWeight: {}".format(channelWeight))
return False return False
if (loaded_item.filename == "" or loaded_item.filename == None): if (loaded_item.filename == "" or loaded_item.filename == None):
@ -267,7 +268,7 @@ class Player():
self.state.update("loaded_item", loaded_item) self.state.update("loaded_item", loaded_item)
for i in range(len(showplan)): for i in range(len(showplan)):
if showplan[i].timeslotItemId == timeslotItemId: if showplan[i].channelWeight == channelWeight:
self.state.update("show_plan", index=i, value=loaded_item) self.state.update("show_plan", index=i, value=loaded_item)
break break
# TODO: Update the show plan filenames # TODO: Update the show plan filenames
@ -326,7 +327,7 @@ class Player():
loadedItem = self.state.state["loaded_item"] loadedItem = self.state.state["loaded_item"]
if (loadedItem): if (loadedItem):
self.load(loadedItem.timeslotItemId) self.load(loadedItem.channelWeight)
if wasPlaying: if wasPlaying:
self.unpause() self.unpause()
@ -362,14 +363,14 @@ class Player():
# Auto Advance # Auto Advance
elif self.state.state["auto_advance"]: elif self.state.state["auto_advance"]:
for i in range(len(self.state.state["show_plan"])): for i in range(len(self.state.state["show_plan"])):
if self.state.state["show_plan"][i].timeslotItemId == loaded_item.timeslotItemId: if self.state.state["show_plan"][i].channelWeight == loaded_item.channelWeight:
if len(self.state.state["show_plan"]) > i+1: if len(self.state.state["show_plan"]) > i+1:
self.load(self.state.state["show_plan"][i+1].timeslotItemId) self.load(self.state.state["show_plan"][i+1].channelWeight)
break break
# Repeat All # Repeat All
elif self.state.state["repeat"] == "ALL": elif self.state.state["repeat"] == "ALL":
self.load(self.state.state["show_plan"][0].timeslotItemId) self.load(self.state.state["show_plan"][0].channelWeight)
# Play on Load # Play on Load
if self.state.state["play_on_load"]: if self.state.state["play_on_load"]:
@ -417,7 +418,7 @@ class Player():
loaded_item = loaded_state["loaded_item"] loaded_item = loaded_state["loaded_item"]
if loaded_item: if loaded_item:
self.logger.log.info("Loading filename: " + str(loaded_item.filename)) self.logger.log.info("Loading filename: " + str(loaded_item.filename))
self.load(loaded_item.timeslotItemId) self.load(loaded_item.channelWeight)
if loaded_state["pos_true"] != 0: if loaded_state["pos_true"] != 0:
self.logger.log.info("Seeking to pos_true: " + str(loaded_state["pos_true"])) self.logger.log.info("Seeking to pos_true: " + str(loaded_state["pos_true"]))

View file

@ -12,7 +12,9 @@
Date: Date:
October, November 2020 October, November 2020
""" """
import asyncio
import multiprocessing import multiprocessing
import time
import player import player
from flask import Flask, render_template, send_from_directory, request, jsonify from flask import Flask, render_template, send_from_directory, request, jsonify
from typing import Any, Optional from typing import Any, Optional
@ -52,11 +54,23 @@ class BAPSicleServer():
setproctitle.setproctitle(process_title) setproctitle.setproctitle(process_title)
multiprocessing.current_process().name = process_title multiprocessing.current_process().name = process_title
startServer() asyncio.get_event_loop().run_until_complete(startServer())
asyncio.get_event_loop().run_forever()
def __del__(self): def __del__(self):
stopServer() stopServer()
class PlayerHandler():
def __init__(self,channel_from_q, websocket_to_q, ui_to_q):
while True:
for channel in range(len(channel_from_q)):
try:
message = channel_from_q[channel].get_nowait()
websocket_to_q[channel].put(message)
ui_to_q[channel].put(message)
except:
pass
time.sleep(0.01)
logger = LoggingManager("BAPSicleServer") logger = LoggingManager("BAPSicleServer")
@ -71,6 +85,8 @@ app.logger.disabled = True
channel_to_q = [] channel_to_q = []
channel_from_q = [] channel_from_q = []
ui_to_q = []
websocket_to_q = []
channel_p = [] channel_p = []
stopping = False stopping = False
@ -245,7 +261,7 @@ def unload(channel: int):
@app.route("/player/<int:channel>/add", methods=["POST"]) @app.route("/player/<int:channel>/add", methods=["POST"])
def add_to_plan(channel: int): def add_to_plan(channel: int):
new_item: Dict[str, any] = { new_item: Dict[str, Any] = {
"channel_weight": int(request.form["channel_weight"]), "channel_weight": int(request.form["channel_weight"]),
"filename": request.form["filename"], "filename": request.form["filename"],
"title": request.form["title"], "title": request.form["title"],
@ -256,10 +272,6 @@ def add_to_plan(channel: int):
return new_item return new_item
@app.route("/player/<int:channel>/move/<int:timeslotItemId>/<float:position>")
def move_plan(channel: int, timeslotItemId: int, position: float):
channel_to_q[channel].put("MOVE:" + json.dumps({"timeslotItemId": timeslotItemId, "position": position}))
@app.route("/player/<int:channel>/move/<int:channel_weight>/<int:position>") @app.route("/player/<int:channel>/move/<int:channel_weight>/<int:position>")
def move_plan(channel: int, channel_weight: int, position: int): def move_plan(channel: int, channel_weight: int, position: int):
channel_to_q[channel].put("MOVE:" + json.dumps({"channel_weight": channel_weight, "position": position})) channel_to_q[channel].put("MOVE:" + json.dumps({"channel_weight": channel_weight, "position": position}))
@ -267,10 +279,6 @@ def move_plan(channel: int, channel_weight: int, position: int):
# TODO Return # TODO Return
return True return True
@app.route("/player/<int:channel>/remove/<int:timeslotItemId>")
def remove_plan(channel: int, timeslotItemId: int):
channel_to_q[channel].put("REMOVE:" + str(timeslotItemId))
@app.route("/player/<int:channel>/remove/<int:channel_weight>") @app.route("/player/<int:channel>/remove/<int:channel_weight>")
def remove_plan(channel: int, channel_weight: int): def remove_plan(channel: int, channel_weight: int):
channel_to_q[channel].put("REMOVE:" + channel_weight) channel_to_q[channel].put("REMOVE:" + channel_weight)
@ -299,8 +307,9 @@ def channel_json(channel: int):
def status(channel: int): def status(channel: int):
channel_to_q[channel].put("STATUS") channel_to_q[channel].put("STATUS")
while True: while True:
response = channel_from_q[channel].get() response = ui_to_q[channel].get()
if response.startswith("STATUS:"): if response.startswith("STATUS:"):
print("Got my status message")
response = response[7:] response = response[7:]
response = response[response.index(":")+1:] response = response[response.index(":")+1:]
try: try:
@ -309,6 +318,7 @@ def status(channel: int):
pass pass
return response return response
time.sleep(0.1)
@app.route("/quit") @app.route("/quit")
@ -358,13 +368,15 @@ def send_logs(path):
return render_template('log.html', data=data) return render_template('log.html', data=data)
def startServer(): async def startServer():
if isMacOS(): if isMacOS():
multiprocessing.set_start_method("spawn", True) multiprocessing.set_start_method("spawn", True)
for channel in range(state.state["num_channels"]): for channel in range(state.state["num_channels"]):
channel_to_q.append(multiprocessing.Queue()) channel_to_q.append(multiprocessing.Queue())
channel_from_q.append(multiprocessing.Queue()) channel_from_q.append(multiprocessing.Queue())
ui_to_q.append(multiprocessing.Queue())
websocket_to_q.append(multiprocessing.Queue())
channel_p.append( channel_p.append(
multiprocessing.Process( multiprocessing.Process(
target=player.Player, target=player.Player,
@ -374,7 +386,13 @@ def startServer():
) )
channel_p[channel].start() channel_p[channel].start()
websockets_server = multiprocessing.Process(target=WebsocketServer, args=[channel_to_q, state])
player_handler = multiprocessing.Process(target=PlayerHandler, args=[channel_from_q, websocket_to_q, ui_to_q])
player_handler.start()
websockets_server = multiprocessing.Process(target=WebsocketServer, args=[channel_to_q, channel_from_q, state])
websockets_server.start() websockets_server.start()
if not isMacOS(): if not isMacOS():
@ -402,13 +420,16 @@ def startServer():
"artist": "University Radio York", "artist": "University Radio York",
} }
channel_to_q[0].put("ADD:" + json.dumps(new_item)) #channel_to_q[0].put("ADD:" + json.dumps(new_item))
# channel_to_q[0].put("LOAD:0") # channel_to_q[0].put("LOAD:0")
# channel_to_q[0].put("PLAY") # channel_to_q[0].put("PLAY")
# Don't use reloader, it causes Nested Processes! # Don't use reloader, it causes Nested Processes!
app.run(host=state.state["host"], port=state.state["port"], debug=True, use_reloader=False) app.run(host=state.state["host"], port=state.state["port"], debug=True, use_reloader=False)
async def player_message_handler():
print("Handling")
pass
def stopServer(restart=False): def stopServer(restart=False):
global channel_p global channel_p

View file

@ -4,18 +4,40 @@
{% endblock %} {% endblock %}
{% block content_inner %} {% block content_inner %}
{% if data %} {% if data %}
<code> <div class="row">
{% for player in data.channels %} {% for player in data.channels %}
<a href="/player/{{player.channel}}/play">Play</a> <div class="col-4">
{% if player.paused %} <h3 class="h5">Player {{player.channel}}</h3>
<a href="/player/{{player.channel}}/unpause">UnPause</a> <a href="/player/{{player.channel}}/play">Play</a>
{% else %} {% if player.paused %}
<a href="/player/{{player.channel}}/pause">Pause</a> <a href="/player/{{player.channel}}/unpause">UnPause</a>
{% endif %} {% else %}
<a href="/player/{{player.channel}}/stop">Stop</a> <a href="/player/{{player.channel}}/pause">Pause</a>
<a href="/player/{{player.channel}}/seek/50.0">Seek 50</a> {% endif %}
{{player}}<br> <a href="/player/{{player.channel}}/stop">Stop</a>
<a href="/player/{{player.channel}}/seek/50.0">Seek 50</a>
<br>
<label for="file">{{ player.pos_true | int }} / {{ player.length | int }}s ({{ player.remaining | int }}s)</label>
<br>
<progress id="file" value="{{player.pos_true / player.length * 100}}" max="100" style="width: 100%"></progress>
<hr />
<h4 class="h6">Loaded Item</h3>
{{ player.loaded_item }}
<hr />
<h4 class="h6">Plan Items</h3>
{% for planitem in player.show_plan %}
<small>
{{ planitem }}
</small>
<hr />
{% endfor %}
<br>
</div>
{% endfor %} {% endfor %}
</code> </div>
{% endif %} {% endif %}
{% endblock %} {% endblock %}

View file

@ -1,63 +1,104 @@
import asyncio import asyncio
import multiprocessing
from typing import List
import websockets import websockets
import json import json
baps_clients = set() baps_clients = set()
channel_to_q = None channel_to_q = None
channel_from_q: List[multiprocessing.Queue]
server_name = None server_name = None
async def websocket_handler(websocket, path): async def websocket_handler(websocket, path):
baps_clients.add(websocket) baps_clients.add(websocket)
await websocket.send(json.dumps({"message": "Hello", "serverName": server_name})) await websocket.send(json.dumps({"message": "Hello", "serverName": server_name}))
print("New Client: {}".format(websocket)) print("New Client: {}".format(websocket))
async def handle_from_webstudio():
try:
async for message in websocket:
data = json.loads(message)
channel = int(data["channel"])
if "command" in data.keys():
if data["command"] == "PLAY":
channel_to_q[channel].put("PLAY")
elif data["command"] == "PAUSE":
channel_to_q[channel].put("PAUSE")
elif data["command"] == "UNPAUSE":
channel_to_q[channel].put("UNPAUSE")
elif data["command"] == "STOP":
channel_to_q[channel].put("STOP")
elif data["command"] == "SEEK":
channel_to_q[channel].put("SEEK:" + str(data["time"]))
elif data["command"] == "LOAD":
channel_to_q[channel].put("LOAD:" + str(data["weight"]))
elif data["command"] == "ADD":
print(data)
if "managedId" in data["newItem"].keys() and isinstance(data["newItem"]["managedId"], str):
if data["newItem"]["managedId"].startswith("managed"):
managed_id = int(data["newItem"]["managedId"].split(":")[1])
else:
managed_id = int(data["newItem"]["managedId"])
else:
managed_id = None
new_item: Dict[str, any] = {
"channelWeight": int(data["newItem"]["weight"]),
"filename": None,
"title": data["newItem"]["title"],
"artist": data["newItem"]["artist"] if "artist" in data["newItem"].keys() else None,
"timeslotItemId": int(data["newItem"]["timeslotItemId"]) if "timeslotItemId" in data["newItem"].keys() and data["newItem"]["timeslotItemId"] != None else None,
"trackId": int(data["newItem"]["trackId"]) if "trackId" in data["newItem"].keys() and data["newItem"]["trackId"] != None else None,
"managedId": managed_id
}
channel_to_q[channel].put("ADD:" + json.dumps(new_item))
elif data["command"] == "REMOVE":
channel_to_q[channel].put("REMOVE:" + str(data["weight"]))
await asyncio.wait([conn.send(message) for conn in baps_clients])
except websockets.exceptions.ConnectionClosedError as e:
print("RIP {}, {}".format(websocket, e))
except Exception as e:
print("Exception", e)
finally:
baps_clients.remove(websocket)
async def handle_to_webstudio():
global channel_from_q
while True:
for channel in range(len(channel_from_q)):
try:
message = channel_from_q[channel].get_nowait()
data = json.dumps({
"message": message,
"channel:": channel
})
await asyncio.wait([conn.send(data) for conn in baps_clients])
except:
pass
await asyncio.sleep(0.01)
from_webstudio = asyncio.create_task(handle_from_webstudio())
#to_webstudio = asyncio.create_task(handle_to_webstudio())
try: try:
async for message in websocket: await asyncio.gather(from_webstudio)#, to_webstudio)
data = json.loads(message)
channel = int(data["channel"])
if "command" in data.keys():
if data["command"] == "PLAY":
channel_to_q[channel].put("PLAY")
elif data["command"] == "PAUSE":
channel_to_q[channel].put("PAUSE")
elif data["command"] == "UNPAUSE":
channel_to_q[channel].put("UNPAUSE")
elif data["command"] == "STOP":
channel_to_q[channel].put("STOP")
elif data["command"] == "SEEK":
channel_to_q[channel].put("SEEK:" + str(data["time"]))
elif data["command"] == "LOAD":
channel_to_q[channel].put("LOAD:" + str(data["weight"]))
elif data["command"] == "ADD":
print(data)
new_item: Dict[str, any] = {
"channel_weight": int(data["newItem"]["weight"]),
"filename": "dev\\test.mp3",
"title": data["newItem"]["title"],
"artist": data["newItem"]["artist"] if "artist" in data["newItem"].keys() else None
}
channel_to_q[channel].put("ADD:" + json.dumps(new_item))
elif data["command"] == "REMOVE":
channel_to_q[channel].put("REMOVE:" + str(data["weight"]))
await asyncio.wait([conn.send(message) for conn in baps_clients])
except websockets.exceptions.ConnectionClosedError:
print("RIP {}".format(websocket))
except Exception as e:
print("Exception", e)
finally: finally:
baps_clients.remove(websocket) from_webstudio.cancel()
#to_webstudio.cancel()
class WebsocketServer: class WebsocketServer:
def __init__(self, in_q, state): def __init__(self, in_q, out_q, state):
global channel_to_q global channel_to_q
global channel_from_q
channel_to_q = in_q channel_to_q = in_q
channel_from_q = out_q
global server_name global server_name
server_name = state.state["server_name"] server_name = state.state["server_name"]