Various reliabilty / debugging improvements
This commit is contained in:
parent
c88e223d6f
commit
03773c8dda
11 changed files with 437 additions and 316 deletions
|
@ -1,7 +1,9 @@
|
|||
import json
|
||||
from multiprocessing import Queue
|
||||
from multiprocessing import Queue #, current_process
|
||||
from helpers.logging_manager import LoggingManager
|
||||
from helpers.myradio_api import MyRadioAPI
|
||||
from setproctitle import setproctitle
|
||||
from os import _exit
|
||||
|
||||
# The API handler is needed from the main flask thread to process API requests.
|
||||
# Flask is not able to handle these during page loads, requests.get() hangs.
|
||||
|
@ -13,6 +15,11 @@ class APIHandler():
|
|||
server_from_q: Queue
|
||||
|
||||
def __init__(self, server_from_q: Queue, server_to_q: Queue):
|
||||
|
||||
process_title = "APIHandler"
|
||||
setproctitle(process_title)
|
||||
#current_process().name = process_title
|
||||
|
||||
self.server_from_q = server_from_q
|
||||
self.server_to_q = server_to_q
|
||||
self.logger = LoggingManager("APIHandler")
|
||||
|
@ -21,33 +28,43 @@ class APIHandler():
|
|||
self.handle()
|
||||
|
||||
def handle(self):
|
||||
while self.server_from_q:
|
||||
# Wait for an API request to come in.
|
||||
request = self.server_from_q.get()
|
||||
self.logger.log.info("Recieved Request: {}".format(request))
|
||||
if request == "LIST_PLANS":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_showplans()))
|
||||
elif request == "LIST_PLAYLIST_MUSIC":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_music()))
|
||||
elif request == "LIST_PLAYLIST_AUX":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_aux()))
|
||||
try:
|
||||
while self.server_from_q:
|
||||
# Wait for an API request to come in.
|
||||
request = self.server_from_q.get()
|
||||
self.logger.log.info("Received Request: {}".format(request))
|
||||
if request == "LIST_PLANS":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_showplans()))
|
||||
elif request == "LIST_PLAYLIST_MUSIC":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_music()))
|
||||
elif request == "LIST_PLAYLIST_AUX":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_aux()))
|
||||
|
||||
else:
|
||||
# Commands with params
|
||||
command = request[:request.index(":")]
|
||||
params = request[request.index(":")+1:]
|
||||
else:
|
||||
# Commands with params
|
||||
command = request[:request.index(":")]
|
||||
params = request[request.index(":")+1:]
|
||||
|
||||
|
||||
|
||||
if command == "GET_PLAYLIST_AUX":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_aux_items(str(params))))
|
||||
elif command == "GET_PLAYLIST_MUSIC":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_music_items(str(params))))
|
||||
elif command == "SEARCH_TRACK":
|
||||
try:
|
||||
params = json.loads(params)
|
||||
if command == "GET_PLAYLIST_AUX":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_aux_items(str(params))))
|
||||
elif command == "GET_PLAYLIST_MUSIC":
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_music_items(str(params))))
|
||||
elif command == "SEARCH_TRACK":
|
||||
try:
|
||||
params = json.loads(params)
|
||||
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_track_search(str(params["title"]), str(params["artist"]))))
|
||||
except Exception as e:
|
||||
self.logger.log.exception("Failed to parse params with message {}, command {}, params {}\n{}".format(request, command, params, e))
|
||||
self.server_to_q.put(request + ":" + json.dumps(self.api.get_track_search(str(params["title"]), str(params["artist"]))))
|
||||
except Exception as e:
|
||||
self.logger.log.exception("Failed to parse params with message {}, command {}, params {}\n{}".format(request, command, params, e))
|
||||
|
||||
# Catch the handler being killed externally.
|
||||
except KeyboardInterrupt:
|
||||
self.logger.log.info("Received KeyboardInterupt")
|
||||
except SystemExit:
|
||||
self.logger.log.info("Received SystemExit")
|
||||
except Exception as e:
|
||||
self.logger.log.exception("Received unexpected exception: {}".format(e))
|
||||
del self.logger
|
||||
_exit(0)
|
||||
|
|
|
@ -16,4 +16,6 @@ rm ./*.spec
|
|||
brew install platypus
|
||||
|
||||
platypus --load-profile ./BAPSicle.platypus --overwrite ./output/BAPSicle.app
|
||||
|
||||
mkdir ./output/state
|
||||
mkdir ./output/logs
|
||||
mkdir ./output/music-tmp
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
pygame==2.0.0.dev24
|
||||
pygame==2.0.1
|
||||
flask
|
||||
flask-cors
|
||||
mutagen
|
||||
|
@ -9,3 +9,4 @@ pyttsx3
|
|||
websockets
|
||||
typing_extensions
|
||||
pyserial
|
||||
requests
|
||||
|
|
|
@ -3,11 +3,16 @@ from controllers.controller import Controller
|
|||
from multiprocessing import Queue
|
||||
import serial
|
||||
import sys
|
||||
|
||||
from setproctitle import setproctitle
|
||||
class MattchBox(Controller):
|
||||
ser: serial.Serial
|
||||
|
||||
def __init__(self, player_to_q: List[Queue], player_from_q: List[Queue]):
|
||||
|
||||
process_title = "ControllerHandler"
|
||||
setproctitle(process_title)
|
||||
#current_process().name = process_title
|
||||
|
||||
# connect to serial port
|
||||
self.ser = serial.serial_for_url("/dev/cu.usbserial-210", do_not_open=True)
|
||||
self.ser.baudrate = 2400
|
||||
|
|
|
@ -29,9 +29,9 @@ class LoggingManager():
|
|||
self.logger.addHandler(fh)
|
||||
self.logger.info("** LOGGER STARTED **")
|
||||
|
||||
def __del__(self):
|
||||
self.logger.info("** LOGGER EXITING **")
|
||||
logging.shutdown()
|
||||
#def __del__(self):
|
||||
# Can't seem to close logger properly
|
||||
#self.logger.info("** LOGGER EXITING **")
|
||||
|
||||
@property
|
||||
def log(self) -> logging.Logger:
|
||||
|
|
27
launch_standalone.py
Normal file → Executable file
27
launch_standalone.py
Normal file → Executable file
|
@ -1,22 +1,31 @@
|
|||
#!/usr/bin/env python3
|
||||
import multiprocessing
|
||||
import time
|
||||
import sys
|
||||
import webbrowser
|
||||
from setproctitle import setproctitle
|
||||
|
||||
from server import BAPSicleServer
|
||||
|
||||
|
||||
def startServer():
|
||||
server = multiprocessing.Process(target=BAPSicleServer)
|
||||
server.start()
|
||||
|
||||
while True:
|
||||
time.sleep(5)
|
||||
if server and server.is_alive():
|
||||
pass
|
||||
else:
|
||||
print("Server dead. Exiting.")
|
||||
sys.exit(0)
|
||||
try:
|
||||
while True:
|
||||
time.sleep(5)
|
||||
if server and server.is_alive():
|
||||
pass
|
||||
else:
|
||||
print("Server dead. Exiting.")
|
||||
sys.exit(0)
|
||||
# Catch the handler being killed externally.
|
||||
except KeyboardInterrupt:
|
||||
print("Received KeyboardInterupt")
|
||||
except SystemExit:
|
||||
print("Received SystemExit")
|
||||
except Exception as e:
|
||||
print("Received unexpected exception: {}".format(e))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -27,6 +36,7 @@ if __name__ == '__main__':
|
|||
# If it's not here, multiprocessing just doesn't run in the package.
|
||||
# Freeze support refers to being packaged with Pyinstaller.
|
||||
multiprocessing.freeze_support()
|
||||
setproctitle("BAPSicle - Standalone Launch")
|
||||
if len(sys.argv) > 1:
|
||||
# We got an argument! It's probably Platypus's UI.
|
||||
try:
|
||||
|
@ -42,6 +52,7 @@ if __name__ == '__main__':
|
|||
webbrowser.open("http://localhost:13500/logs")
|
||||
except Exception as e:
|
||||
print("ALERT:BAPSicle failed with exception:\n", e)
|
||||
sys.exit(1)
|
||||
|
||||
sys.exit(0)
|
||||
else:
|
||||
|
|
58
player.py
58
player.py
|
@ -26,7 +26,6 @@ import setproctitle
|
|||
import copy
|
||||
import json
|
||||
import time
|
||||
import sys
|
||||
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
|
@ -47,16 +46,18 @@ PLAYBACK_END = USEREVENT + 1
|
|||
# TODO ENUM
|
||||
VALID_MESSAGE_SOURCES = ["WEBSOCKET", "UI", "CONTROLLER", "ALL"]
|
||||
class Player():
|
||||
state = None
|
||||
running = False
|
||||
out_q = None
|
||||
last_msg = ""
|
||||
last_msg_source = ""
|
||||
out_q: multiprocessing.Queue
|
||||
last_msg: str
|
||||
last_msg_source: str
|
||||
last_time_update = None
|
||||
logger = None
|
||||
api = None
|
||||
already_stopped = False
|
||||
starting = False
|
||||
|
||||
state: StateManager
|
||||
logger: LoggingManager
|
||||
api: MyRadioAPI
|
||||
|
||||
running: bool = False
|
||||
already_stopped: bool = False
|
||||
starting: bool = False
|
||||
|
||||
__default_state = {
|
||||
"initialised": False,
|
||||
|
@ -352,6 +353,7 @@ class Player():
|
|||
try:
|
||||
mixer.quit()
|
||||
self.state.update("paused", False)
|
||||
self.logger.log.info("Quit mixer.")
|
||||
except:
|
||||
self.logger.log.exception("Failed to quit mixer.")
|
||||
|
||||
|
@ -497,7 +499,7 @@ class Player():
|
|||
multiprocessing.current_process().name = process_title
|
||||
|
||||
# Init pygame, only used really for the end of playback trigger.
|
||||
init()
|
||||
#init()
|
||||
|
||||
self.running = True
|
||||
self.out_q = out_q
|
||||
|
@ -537,11 +539,11 @@ class Player():
|
|||
else:
|
||||
self.logger.log.info("No file was previously loaded.")
|
||||
|
||||
while self.running:
|
||||
time.sleep(0.1)
|
||||
self._updateState()
|
||||
self._ping_times()
|
||||
try:
|
||||
try:
|
||||
while self.running:
|
||||
time.sleep(0.01)
|
||||
self._updateState()
|
||||
self._ping_times()
|
||||
try:
|
||||
message = in_q.get_nowait()
|
||||
source = message.split(":")[0]
|
||||
|
@ -626,21 +628,19 @@ class Player():
|
|||
|
||||
|
||||
|
||||
# Catch the player being killed externally.
|
||||
except KeyboardInterrupt:
|
||||
self.logger.log.info("Received KeyboardInterupt")
|
||||
break
|
||||
except SystemExit:
|
||||
self.logger.log.info("Received SystemExit")
|
||||
break
|
||||
except Exception as e:
|
||||
self.logger.log.exception("Received unexpected exception: {}".format(e))
|
||||
break
|
||||
# Catch the player being killed externally.
|
||||
except KeyboardInterrupt:
|
||||
self.logger.log.info("Received KeyboardInterupt")
|
||||
except SystemExit:
|
||||
self.logger.log.info("Received SystemExit")
|
||||
except Exception as e:
|
||||
self.logger.log.exception("Received unexpected exception: {}".format(e))
|
||||
|
||||
self.logger.log.info("Quiting player ", channel)
|
||||
self.logger.log.info("Quiting player " + str(channel))
|
||||
self.quit()
|
||||
self._retMsg("EXIT")
|
||||
sys.exit(0)
|
||||
self._retAll("EXIT")
|
||||
del self.logger
|
||||
os._exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
46
player_handler.py
Normal file
46
player_handler.py
Normal file
|
@ -0,0 +1,46 @@
|
|||
from helpers.logging_manager import LoggingManager
|
||||
from setproctitle import setproctitle
|
||||
#from multiprocessing import current_process
|
||||
from time import sleep
|
||||
from os import _exit
|
||||
|
||||
class PlayerHandler():
|
||||
logger: LoggingManager
|
||||
|
||||
def __init__(self,channel_from_q, websocket_to_q, ui_to_q, controller_to_q):
|
||||
|
||||
self.logger = LoggingManager("PlayerHandler")
|
||||
process_title = "PlayerHandler"
|
||||
setproctitle(process_title)
|
||||
#current_process().name = process_title
|
||||
|
||||
try:
|
||||
while True:
|
||||
|
||||
for channel in range(len(channel_from_q)):
|
||||
try:
|
||||
message = channel_from_q[channel].get_nowait()
|
||||
source = message.split(":")[0]
|
||||
# TODO ENUM
|
||||
if source in ["ALL","WEBSOCKET"]:
|
||||
websocket_to_q[channel].put(message)
|
||||
if source in ["ALL","UI"]:
|
||||
if not message.split(":")[1] == "POS":
|
||||
# We don't care about position update spam
|
||||
ui_to_q[channel].put(message)
|
||||
if source in ["ALL","CONTROLLER"]:
|
||||
controller_to_q[channel].put(message)
|
||||
except:
|
||||
pass
|
||||
|
||||
sleep(0.01)
|
||||
# Catch the handler being killed externally.
|
||||
except KeyboardInterrupt:
|
||||
self.logger.log.info("Received KeyboardInterupt")
|
||||
except SystemExit:
|
||||
self.logger.log.info("Received SystemExit")
|
||||
except Exception as e:
|
||||
self.logger.log.exception("Received unexpected exception: {}".format(e))
|
||||
del self.logger
|
||||
_exit(0)
|
||||
|
145
server.py
145
server.py
|
@ -13,21 +13,20 @@
|
|||
October, November 2020
|
||||
"""
|
||||
from api_handler import APIHandler
|
||||
import asyncio
|
||||
from controllers.mattchbox_usb import MattchBox
|
||||
import copy
|
||||
import multiprocessing
|
||||
import queue
|
||||
import threading
|
||||
#import threading
|
||||
import time
|
||||
import player
|
||||
from flask import Flask, render_template, send_from_directory, request, jsonify, abort
|
||||
from flask_cors import CORS, cross_origin
|
||||
from flask_cors import CORS
|
||||
from typing import Any, Optional
|
||||
import json
|
||||
import setproctitle
|
||||
from setproctitle import setproctitle
|
||||
import logging
|
||||
import requests
|
||||
|
||||
from player_handler import PlayerHandler
|
||||
|
||||
from helpers.os_environment import isMacOS
|
||||
from helpers.device_manager import DeviceManager
|
||||
|
@ -42,7 +41,33 @@ from helpers.state_manager import StateManager
|
|||
from helpers.logging_manager import LoggingManager
|
||||
from websocket_server import WebsocketServer
|
||||
|
||||
setproctitle.setproctitle("BAPSicle - Server")
|
||||
setproctitle("BAPSicleServer.py")
|
||||
|
||||
logger: LoggingManager
|
||||
state: StateManager
|
||||
class BAPSicleServer():
|
||||
|
||||
def __init__(self):
|
||||
|
||||
process_title = "BAPSicleServer"
|
||||
setproctitle(process_title)
|
||||
#multiprocessing.current_process().name = process_title
|
||||
|
||||
global logger
|
||||
global state
|
||||
logger = LoggingManager("BAPSicleServer")
|
||||
|
||||
state = StateManager("BAPSicleServer", logger, default_state)
|
||||
state.update("server_version", config.VERSION)
|
||||
|
||||
startServer()
|
||||
#asyncio.get_event_loop().run_until_complete(startServer())
|
||||
#asyncio.get_event_loop().run_forever()
|
||||
|
||||
def __del__(self):
|
||||
stopServer()
|
||||
|
||||
|
||||
|
||||
default_state = {
|
||||
"server_version": 0,
|
||||
|
@ -53,49 +78,6 @@ default_state = {
|
|||
"num_channels": 3
|
||||
}
|
||||
|
||||
logger: LoggingManager
|
||||
state: StateManager
|
||||
|
||||
class BAPSicleServer():
|
||||
|
||||
def __init__(self):
|
||||
|
||||
process_title = "Server"
|
||||
setproctitle.setproctitle(process_title)
|
||||
multiprocessing.current_process().name = process_title
|
||||
|
||||
global logger
|
||||
global state
|
||||
logger = LoggingManager("BAPSicleServer")
|
||||
|
||||
state = StateManager("BAPSicleServer", logger, default_state)
|
||||
state.update("server_version", config.VERSION)
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(startServer())
|
||||
asyncio.get_event_loop().run_forever()
|
||||
|
||||
def __del__(self):
|
||||
stopServer()
|
||||
|
||||
class PlayerHandler():
|
||||
def __init__(self,channel_from_q, websocket_to_q, ui_to_q, controller_to_q):
|
||||
while True:
|
||||
for channel in range(len(channel_from_q)):
|
||||
try:
|
||||
message = channel_from_q[channel].get_nowait()
|
||||
source = message.split(":")[0]
|
||||
# TODO ENUM
|
||||
if source in ["ALL","WEBSOCKET"]:
|
||||
websocket_to_q[channel].put(message)
|
||||
if source in ["ALL","UI"]:
|
||||
if not message.split(":")[1] == "POS":
|
||||
# We don't care about position update spam
|
||||
ui_to_q[channel].put(message)
|
||||
if source in ["ALL","CONTROLLER"]:
|
||||
controller_to_q[channel].put(message)
|
||||
except:
|
||||
pass
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
app = Flask(__name__, static_url_path='')
|
||||
|
@ -114,10 +96,11 @@ channel_from_q: List[queue.Queue] = []
|
|||
ui_to_q: List[queue.Queue] = []
|
||||
websocket_to_q: List[queue.Queue] = []
|
||||
controller_to_q: List[queue.Queue] = []
|
||||
channel_p = []
|
||||
|
||||
stopping = False
|
||||
|
||||
channel_p: List[multiprocessing.Process] = []
|
||||
websockets_server: multiprocessing.Process
|
||||
controller_handler: multiprocessing.Process
|
||||
webserver: multiprocessing.Process
|
||||
|
||||
# General Endpoints
|
||||
|
||||
|
@ -524,9 +507,10 @@ def serve_favicon():
|
|||
def serve_static(path: str):
|
||||
return send_from_directory('ui-static', path)
|
||||
|
||||
async def startServer():
|
||||
def startServer():
|
||||
process_title="startServer"
|
||||
threading.current_thread().name = process_title
|
||||
#threading.current_thread().name = process_title
|
||||
setproctitle(process_title)
|
||||
|
||||
if isMacOS():
|
||||
multiprocessing.set_start_method("spawn", True)
|
||||
|
@ -540,13 +524,13 @@ async def startServer():
|
|||
channel_p.append(
|
||||
multiprocessing.Process(
|
||||
target=player.Player,
|
||||
args=(channel, channel_to_q[-1], channel_from_q[-1]),
|
||||
args=(channel, channel_to_q[-1], channel_from_q[-1])
|
||||
#daemon=True
|
||||
)
|
||||
)
|
||||
channel_p[channel].start()
|
||||
|
||||
global api_from_q, api_to_q
|
||||
global api_from_q, api_to_q, api_handler, player_handler, websockets_server, controller_handler
|
||||
api_to_q = multiprocessing.Queue()
|
||||
api_from_q = multiprocessing.Queue()
|
||||
api_handler = multiprocessing.Process(target=APIHandler, args=(api_to_q, api_from_q))
|
||||
|
@ -595,37 +579,48 @@ async def startServer():
|
|||
channel_to_q[0].put("PLAY")
|
||||
|
||||
# Don't use reloader, it causes Nested Processes!
|
||||
app.run(host=state.state["host"], port=state.state["port"], debug=True, use_reloader=False)
|
||||
global webserver
|
||||
webserver = multiprocessing.Process(
|
||||
app.run(host=state.state["host"], port=state.state["port"], debug=True, use_reloader=False)
|
||||
)
|
||||
webserver.start()
|
||||
|
||||
|
||||
def stopServer(restart=False):
|
||||
global channel_p
|
||||
global channel_from_q
|
||||
global channel_to_q
|
||||
global channel_p, channel_from_q, channel_to_q, websockets_server, webserver
|
||||
print("Stopping Websockets")
|
||||
websocket_to_q[0].put("WEBSOCKET:QUIT")
|
||||
websockets_server.join()
|
||||
del websockets_server
|
||||
print("Stopping server.py")
|
||||
for q in channel_to_q:
|
||||
q.put("QUIT")
|
||||
for player in channel_p:
|
||||
try:
|
||||
player.join()
|
||||
except:
|
||||
except Exception as e:
|
||||
print("*** Ignoring exception:",e)
|
||||
pass
|
||||
finally:
|
||||
channel_p = []
|
||||
channel_from_q = []
|
||||
channel_to_q = []
|
||||
del player
|
||||
del channel_from_q
|
||||
del channel_to_q
|
||||
print("Stopped all players.")
|
||||
global stopping
|
||||
if stopping == False:
|
||||
stopping = True
|
||||
shutdown = request.environ.get('werkzeug.server.shutdown')
|
||||
if shutdown is None:
|
||||
print("Shutting down Server.")
|
||||
|
||||
else:
|
||||
print("Shutting down Flask.")
|
||||
if not restart:
|
||||
shutdown()
|
||||
global webserver
|
||||
webserver.terminate()
|
||||
webserver.join()
|
||||
return
|
||||
|
||||
## Caused an outside context error, presuably because called outside of a page request.
|
||||
#shutdown = request.environ.get('werkzeug.server.shutdown')
|
||||
#if shutdown is None:
|
||||
# print("Shutting down Server.")
|
||||
|
||||
#else:
|
||||
# print("Shutting down Flask.")
|
||||
# if not restart:
|
||||
# shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -8,16 +8,27 @@
|
|||
<label for="port">Server Port:</label>
|
||||
<input type="number" id="port" name="port" class="form-control" value="{{data.state.port}}">
|
||||
<br>
|
||||
<label for="port">WebSockets Port:</label>
|
||||
<input type="number" id="ws_port" name="ws_port" class="form-control" value="{{data.state.ws_port}}">
|
||||
<br>
|
||||
<label for="name">Server Name:</label>
|
||||
<input type="text" id="name" name="name" class="form-control" value="{{data.state.server_name}}">
|
||||
<br>
|
||||
<label for="channels">Number of Channels:</label>
|
||||
<input type="number" id="channels" name="channels" class="form-control" value="{{data.state.num_channels}}">
|
||||
<br>
|
||||
<label for="port">WebSockets Port:</label>
|
||||
<input type="number" id="ws_port" name="ws_port" class="form-control" value="{{data.state.ws_port}}">
|
||||
<br>
|
||||
<input type="submit" class="btn btn-primary" value="Restart Server">
|
||||
<br>
|
||||
<label for="ser_port">BAPS Controller Serial Port:</label>
|
||||
<select class="form-control" name="ser_port">
|
||||
<label>Serial Ports</label>
|
||||
{% for port in data.ser_ports %}
|
||||
<option value="{{port.name}}">{{port.name}}</option>
|
||||
{% endfor %}
|
||||
|
||||
<option value="none">{% if not data.ser_ports %}No ports found.{% else %}None{% endif %}</option>
|
||||
|
||||
</select>
|
||||
<br>
|
||||
<input type="submit" class="btn btn-primary" value="Save & Restart Server">
|
||||
</form>
|
||||
{% endif %}
|
||||
{% endblock %}
|
||||
{% endblock %}
|
||||
|
|
|
@ -1,182 +1,215 @@
|
|||
import asyncio
|
||||
from asyncio.futures import Future
|
||||
from asyncio.tasks import Task, shield
|
||||
|
||||
from websockets.server import Serve
|
||||
from helpers.logging_manager import LoggingManager
|
||||
import multiprocessing
|
||||
import queue
|
||||
from typing import Dict, List, Optional
|
||||
from typing import Any, Dict, List, Optional
|
||||
import websockets
|
||||
import json
|
||||
|
||||
baps_clients = set()
|
||||
channel_to_q: List[multiprocessing.Queue]
|
||||
webstudio_to_q: List[multiprocessing.Queue]
|
||||
server_name: str
|
||||
|
||||
|
||||
|
||||
async def websocket_handler(websocket, path):
|
||||
baps_clients.add(websocket)
|
||||
await websocket.send(json.dumps({"message": "Hello", "serverName": server_name}))
|
||||
print("New Client: {}".format(websocket))
|
||||
for channel in channel_to_q:
|
||||
channel.put("WEBSOCKET:STATUS")
|
||||
|
||||
async def handle_from_webstudio():
|
||||
try:
|
||||
async for message in websocket:
|
||||
data = json.loads(message)
|
||||
print(data)
|
||||
if not "channel" in data:
|
||||
# Didn't specify a channel, send to all.
|
||||
for channel in range(len(channel_to_q)):
|
||||
sendCommand(channel, data)
|
||||
else:
|
||||
channel = int(data["channel"])
|
||||
sendCommand(channel, data)
|
||||
|
||||
|
||||
await asyncio.wait([conn.send(message) for conn in baps_clients])
|
||||
|
||||
except websockets.exceptions.ConnectionClosedError as e:
|
||||
print("RIP {}, {}".format(websocket, e))
|
||||
|
||||
# TODO: Proper Logging
|
||||
except Exception as e:
|
||||
print("Exception", e)
|
||||
|
||||
finally:
|
||||
baps_clients.remove(websocket)
|
||||
|
||||
def sendCommand(channel, data):
|
||||
if channel not in range(len(channel_to_q)):
|
||||
print("ERROR: Received channel number larger than server supported channels.")
|
||||
return
|
||||
|
||||
if "command" in data.keys():
|
||||
command = data["command"]
|
||||
|
||||
# Handle the general case
|
||||
# Message format:
|
||||
## SOURCE:COMMAND:EXTRADATA
|
||||
|
||||
message = "WEBSOCKET:" + command
|
||||
|
||||
# If we just want PLAY, PAUSE etc, we're all done.
|
||||
# Else, let's pipe in some extra info.
|
||||
extra = ""
|
||||
|
||||
try:
|
||||
if command == "SEEK":
|
||||
extra += str(data["time"])
|
||||
elif command == "LOAD":
|
||||
extra += str(data["weight"])
|
||||
elif command == "AUTOADVANCE":
|
||||
extra += str(data["enabled"])
|
||||
elif command == "PLAYONLOAD":
|
||||
extra += str(data["enabled"])
|
||||
elif command == "REPEAT":
|
||||
extra += str(data["mode"]).lower()
|
||||
elif command == "ADD":
|
||||
extra += json.dumps(data["newItem"])
|
||||
elif command == "REMOVE":
|
||||
extra += str(data["weight"])
|
||||
elif command == "GET_PLAN":
|
||||
extra += str(data["timeslotId"])
|
||||
|
||||
# SPECIAL CASE ALERT! We need to talk to two channels here.
|
||||
elif command == "MOVE":
|
||||
# TODO Should we trust the client with the item info?
|
||||
|
||||
# Tell the old channel to remove "weight"
|
||||
extra += str(data["weight"])
|
||||
|
||||
# Now modify the item with the weight in the new channel
|
||||
new_channel = int(data["new_channel"])
|
||||
item = data["item"]
|
||||
item["weight"] = int(data["new_weight"])
|
||||
# Now send the special case.
|
||||
channel_to_q[new_channel].put("ADD:" + json.dumps(item))
|
||||
|
||||
|
||||
except ValueError as e:
|
||||
print("ERROR decoding extra data {} for command {} ".format(e, command))
|
||||
pass
|
||||
|
||||
# Stick the message together and send!
|
||||
if extra != "":
|
||||
message += ":" + extra
|
||||
|
||||
try:
|
||||
channel_to_q[channel].put(message)
|
||||
except Exception as e:
|
||||
print("ERRORL: Failed to send message {} to channel {}: {}".format(message, channel, e))
|
||||
|
||||
else:
|
||||
print("ERROR: Command missing from message.")
|
||||
|
||||
async def handle_to_webstudio():
|
||||
while True:
|
||||
for channel in range(len(webstudio_to_q)):
|
||||
try:
|
||||
message = webstudio_to_q[channel].get_nowait()
|
||||
source = message.split(":")[0]
|
||||
|
||||
# TODO ENUM
|
||||
if source not in ["WEBSOCKET","ALL"]:
|
||||
print("ERROR: Message received from invalid source to websocket_handler. Ignored.", source, message)
|
||||
continue
|
||||
|
||||
command = message.split(":")[1]
|
||||
#print("Websocket Out:", command)
|
||||
if command == "STATUS":
|
||||
try:
|
||||
message = message.split("OKAY:")[1]
|
||||
message = json.loads(message)
|
||||
except:
|
||||
continue # TODO more logging
|
||||
elif command == "POS":
|
||||
try:
|
||||
message = message.split(":", 2)[2]
|
||||
except:
|
||||
continue
|
||||
else:
|
||||
continue
|
||||
|
||||
data = json.dumps({
|
||||
"command": command,
|
||||
"data": message,
|
||||
"channel": channel
|
||||
})
|
||||
await asyncio.wait([conn.send(data) for conn in baps_clients])
|
||||
except queue.Empty:
|
||||
continue
|
||||
except Exception as e:
|
||||
raise e
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
from_webstudio = asyncio.create_task(handle_from_webstudio())
|
||||
to_webstudio = asyncio.create_task(handle_to_webstudio())
|
||||
|
||||
try:
|
||||
await asyncio.gather(from_webstudio, to_webstudio)
|
||||
finally:
|
||||
from_webstudio.cancel()
|
||||
to_webstudio.cancel()
|
||||
|
||||
from os import _exit
|
||||
|
||||
class WebsocketServer:
|
||||
|
||||
threads = Future
|
||||
baps_clients = set()
|
||||
channel_to_q: List[multiprocessing.Queue]
|
||||
webstudio_to_q: List[multiprocessing.Queue]
|
||||
server_name: str
|
||||
logger: LoggingManager
|
||||
to_webstudio: Task
|
||||
from_webstudio: Task
|
||||
websocket_server: Serve
|
||||
|
||||
def __init__(self, in_q, out_q, state):
|
||||
global channel_to_q
|
||||
global webstudio_to_q
|
||||
channel_to_q = in_q
|
||||
webstudio_to_q = out_q
|
||||
|
||||
global server_name
|
||||
server_name = state.state["server_name"]
|
||||
self.channel_to_q = in_q
|
||||
self.webstudio_to_q = out_q
|
||||
|
||||
self.logger = LoggingManager("Websockets")
|
||||
self.server_name = state.state["server_name"]
|
||||
|
||||
self.websocket_server = websockets.serve(self.websocket_handler, state.state["host"], state.state["ws_port"])
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(self.websocket_server)
|
||||
|
||||
try:
|
||||
asyncio.get_event_loop().run_forever()
|
||||
except:
|
||||
self.quit()
|
||||
|
||||
def quit(self):
|
||||
del self.websocket_server
|
||||
del self.logger
|
||||
_exit(0)
|
||||
|
||||
def __del__(self):
|
||||
print("Deleting websocket server")
|
||||
self.quit()
|
||||
|
||||
async def websocket_handler(self, websocket, path):
|
||||
self.baps_clients.add(websocket)
|
||||
await websocket.send(json.dumps({"message": "Hello", "serverName": self.server_name}))
|
||||
self.logger.log.info("New Client: {}".format(websocket))
|
||||
for channel in self.channel_to_q:
|
||||
channel.put("WEBSOCKET:STATUS")
|
||||
|
||||
async def handle_from_webstudio():
|
||||
try:
|
||||
async for message in websocket:
|
||||
data = json.loads(message)
|
||||
if not "channel" in data:
|
||||
# Didn't specify a channel, send to all.
|
||||
for channel in range(len(self.channel_to_q)):
|
||||
sendCommand(channel, data)
|
||||
else:
|
||||
channel = int(data["channel"])
|
||||
sendCommand(channel, data)
|
||||
|
||||
async def send(conn, message):
|
||||
# TODO this doesn't actually catch.
|
||||
try:
|
||||
await conn.send(message)
|
||||
except:
|
||||
pass
|
||||
|
||||
await asyncio.wait([send(conn, message) for conn in self.baps_clients])
|
||||
|
||||
except websockets.exceptions.ConnectionClosedError as e:
|
||||
self.logger.log.error("Client Disconncted {}, {}".format(websocket, e))
|
||||
|
||||
# TODO: Proper Logging
|
||||
except Exception as e:
|
||||
self.logger.log.exception("Exception handling messages from Websocket.\n{}".format(e))
|
||||
|
||||
finally:
|
||||
self.logger.log.info("Removing client: {}".format(websocket))
|
||||
self.baps_clients.remove(websocket)
|
||||
|
||||
def sendCommand(channel, data):
|
||||
if channel not in range(len(self.channel_to_q)):
|
||||
self.logger.log.exception("Received channel number larger than server supported channels.")
|
||||
return
|
||||
|
||||
if "command" in data.keys():
|
||||
command = data["command"]
|
||||
|
||||
# Handle the general case
|
||||
# Message format:
|
||||
## SOURCE:COMMAND:EXTRADATA
|
||||
|
||||
message = "WEBSOCKET:" + command
|
||||
|
||||
# If we just want PLAY, PAUSE etc, we're all done.
|
||||
# Else, let's pipe in some extra info.
|
||||
extra = ""
|
||||
|
||||
try:
|
||||
if command == "SEEK":
|
||||
extra += str(data["time"])
|
||||
elif command == "LOAD":
|
||||
extra += str(data["weight"])
|
||||
elif command == "AUTOADVANCE":
|
||||
extra += str(data["enabled"])
|
||||
elif command == "PLAYONLOAD":
|
||||
extra += str(data["enabled"])
|
||||
elif command == "REPEAT":
|
||||
extra += str(data["mode"]).lower()
|
||||
elif command == "ADD":
|
||||
extra += json.dumps(data["newItem"])
|
||||
elif command == "REMOVE":
|
||||
extra += str(data["weight"])
|
||||
elif command == "GET_PLAN":
|
||||
extra += str(data["timeslotId"])
|
||||
|
||||
# SPECIAL CASE ALERT! We need to talk to two channels here.
|
||||
elif command == "MOVE":
|
||||
# TODO Should we trust the client with the item info?
|
||||
|
||||
# Tell the old channel to remove "weight"
|
||||
extra += str(data["weight"])
|
||||
|
||||
# Now modify the item with the weight in the new channel
|
||||
new_channel = int(data["new_channel"])
|
||||
item = data["item"]
|
||||
item["weight"] = int(data["new_weight"])
|
||||
# Now send the special case.
|
||||
self.channel_to_q[new_channel].put("ADD:" + json.dumps(item))
|
||||
|
||||
|
||||
except ValueError as e:
|
||||
self.logger.log.exception("Error decoding extra data {} for command {} ".format(e, command))
|
||||
pass
|
||||
|
||||
# Stick the message together and send!
|
||||
if extra != "":
|
||||
message += ":" + extra
|
||||
|
||||
try:
|
||||
self.channel_to_q[channel].put(message)
|
||||
except Exception as e:
|
||||
self.logger.log.exception("Failed to send message {} to channel {}: {}".format(message, channel, e))
|
||||
|
||||
else:
|
||||
self.logger.log.error("Command missing from message. Data: {}".format(data))
|
||||
|
||||
async def handle_to_webstudio():
|
||||
while True:
|
||||
for channel in range(len(self.webstudio_to_q)):
|
||||
try:
|
||||
message = self.webstudio_to_q[channel].get_nowait()
|
||||
source = message.split(":")[0]
|
||||
|
||||
# TODO ENUM
|
||||
if source not in ["WEBSOCKET","ALL"]:
|
||||
print("ERROR: Message received from invalid source to websocket_handler. Ignored.", source, message)
|
||||
continue
|
||||
|
||||
command = message.split(":")[1]
|
||||
#print("Websocket Out:", command)
|
||||
if command == "STATUS":
|
||||
try:
|
||||
message = message.split("OKAY:")[1]
|
||||
message = json.loads(message)
|
||||
except:
|
||||
continue # TODO more logging
|
||||
elif command == "POS":
|
||||
try:
|
||||
message = message.split(":", 2)[2]
|
||||
except:
|
||||
continue
|
||||
elif command == "QUIT":
|
||||
self.quit()
|
||||
else:
|
||||
continue
|
||||
|
||||
data = json.dumps({
|
||||
"command": command,
|
||||
"data": message,
|
||||
"channel": channel
|
||||
})
|
||||
await asyncio.wait([conn.send(data) for conn in self.baps_clients])
|
||||
except queue.Empty:
|
||||
continue
|
||||
except ValueError:
|
||||
# Typically a "Set of coroutines/Futures is empty." when sending to a dead client.
|
||||
continue
|
||||
except Exception as e:
|
||||
self.logger.log.exception("Exception trying to send to websocket:", e)
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
self.from_webstudio = asyncio.create_task(handle_from_webstudio())
|
||||
self.to_webstudio = asyncio.create_task(handle_to_webstudio())
|
||||
|
||||
try:
|
||||
self.threads = await shield(asyncio.gather(self.from_webstudio, self.to_webstudio))
|
||||
finally:
|
||||
self.from_webstudio.cancel()
|
||||
self.to_webstudio.cancel()
|
||||
|
||||
websocket_server = websockets.serve(websocket_handler, state.state["host"], state.state["ws_port"])
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(websocket_server)
|
||||
asyncio.get_event_loop().run_forever()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Reference in a new issue