From 03773c8dda9841673b7f51c50248d2b1b8084602 Mon Sep 17 00:00:00 2001 From: Matthew Stratford Date: Sun, 4 Apr 2021 22:34:46 +0100 Subject: [PATCH] Various reliabilty / debugging improvements --- api_handler.py | 67 ++++--- build/build-macos.sh | 4 +- build/requirements.txt | 3 +- controllers/mattchbox_usb.py | 7 +- helpers/logging_manager.py | 6 +- launch_standalone.py | 27 ++- player.py | 58 +++--- player_handler.py | 46 +++++ server.py | 145 +++++++------- templates/server.html | 23 ++- websocket_server.py | 367 +++++++++++++++++++---------------- 11 files changed, 437 insertions(+), 316 deletions(-) mode change 100644 => 100755 launch_standalone.py create mode 100644 player_handler.py diff --git a/api_handler.py b/api_handler.py index 8936afc..c0524eb 100644 --- a/api_handler.py +++ b/api_handler.py @@ -1,7 +1,9 @@ import json -from multiprocessing import Queue +from multiprocessing import Queue #, current_process from helpers.logging_manager import LoggingManager from helpers.myradio_api import MyRadioAPI +from setproctitle import setproctitle +from os import _exit # The API handler is needed from the main flask thread to process API requests. # Flask is not able to handle these during page loads, requests.get() hangs. @@ -13,6 +15,11 @@ class APIHandler(): server_from_q: Queue def __init__(self, server_from_q: Queue, server_to_q: Queue): + + process_title = "APIHandler" + setproctitle(process_title) + #current_process().name = process_title + self.server_from_q = server_from_q self.server_to_q = server_to_q self.logger = LoggingManager("APIHandler") @@ -21,33 +28,43 @@ class APIHandler(): self.handle() def handle(self): - while self.server_from_q: - # Wait for an API request to come in. - request = self.server_from_q.get() - self.logger.log.info("Recieved Request: {}".format(request)) - if request == "LIST_PLANS": - self.server_to_q.put(request + ":" + json.dumps(self.api.get_showplans())) - elif request == "LIST_PLAYLIST_MUSIC": - self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_music())) - elif request == "LIST_PLAYLIST_AUX": - self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_aux())) + try: + while self.server_from_q: + # Wait for an API request to come in. + request = self.server_from_q.get() + self.logger.log.info("Received Request: {}".format(request)) + if request == "LIST_PLANS": + self.server_to_q.put(request + ":" + json.dumps(self.api.get_showplans())) + elif request == "LIST_PLAYLIST_MUSIC": + self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_music())) + elif request == "LIST_PLAYLIST_AUX": + self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_aux())) - else: - # Commands with params - command = request[:request.index(":")] - params = request[request.index(":")+1:] + else: + # Commands with params + command = request[:request.index(":")] + params = request[request.index(":")+1:] - if command == "GET_PLAYLIST_AUX": - self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_aux_items(str(params)))) - elif command == "GET_PLAYLIST_MUSIC": - self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_music_items(str(params)))) - elif command == "SEARCH_TRACK": - try: - params = json.loads(params) + if command == "GET_PLAYLIST_AUX": + self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_aux_items(str(params)))) + elif command == "GET_PLAYLIST_MUSIC": + self.server_to_q.put(request + ":" + json.dumps(self.api.get_playlist_music_items(str(params)))) + elif command == "SEARCH_TRACK": + try: + params = json.loads(params) - self.server_to_q.put(request + ":" + json.dumps(self.api.get_track_search(str(params["title"]), str(params["artist"])))) - except Exception as e: - self.logger.log.exception("Failed to parse params with message {}, command {}, params {}\n{}".format(request, command, params, e)) + self.server_to_q.put(request + ":" + json.dumps(self.api.get_track_search(str(params["title"]), str(params["artist"])))) + except Exception as e: + self.logger.log.exception("Failed to parse params with message {}, command {}, params {}\n{}".format(request, command, params, e)) + # Catch the handler being killed externally. + except KeyboardInterrupt: + self.logger.log.info("Received KeyboardInterupt") + except SystemExit: + self.logger.log.info("Received SystemExit") + except Exception as e: + self.logger.log.exception("Received unexpected exception: {}".format(e)) + del self.logger + _exit(0) diff --git a/build/build-macos.sh b/build/build-macos.sh index c8675b0..0aadc0a 100755 --- a/build/build-macos.sh +++ b/build/build-macos.sh @@ -16,4 +16,6 @@ rm ./*.spec brew install platypus platypus --load-profile ./BAPSicle.platypus --overwrite ./output/BAPSicle.app - +mkdir ./output/state +mkdir ./output/logs +mkdir ./output/music-tmp diff --git a/build/requirements.txt b/build/requirements.txt index 76d3404..2709887 100644 --- a/build/requirements.txt +++ b/build/requirements.txt @@ -1,4 +1,4 @@ -pygame==2.0.0.dev24 +pygame==2.0.1 flask flask-cors mutagen @@ -9,3 +9,4 @@ pyttsx3 websockets typing_extensions pyserial +requests diff --git a/controllers/mattchbox_usb.py b/controllers/mattchbox_usb.py index 0579924..76aacd3 100644 --- a/controllers/mattchbox_usb.py +++ b/controllers/mattchbox_usb.py @@ -3,11 +3,16 @@ from controllers.controller import Controller from multiprocessing import Queue import serial import sys - +from setproctitle import setproctitle class MattchBox(Controller): ser: serial.Serial def __init__(self, player_to_q: List[Queue], player_from_q: List[Queue]): + + process_title = "ControllerHandler" + setproctitle(process_title) + #current_process().name = process_title + # connect to serial port self.ser = serial.serial_for_url("/dev/cu.usbserial-210", do_not_open=True) self.ser.baudrate = 2400 diff --git a/helpers/logging_manager.py b/helpers/logging_manager.py index 9ee2bf4..dbc00b0 100644 --- a/helpers/logging_manager.py +++ b/helpers/logging_manager.py @@ -29,9 +29,9 @@ class LoggingManager(): self.logger.addHandler(fh) self.logger.info("** LOGGER STARTED **") - def __del__(self): - self.logger.info("** LOGGER EXITING **") - logging.shutdown() + #def __del__(self): + # Can't seem to close logger properly + #self.logger.info("** LOGGER EXITING **") @property def log(self) -> logging.Logger: diff --git a/launch_standalone.py b/launch_standalone.py old mode 100644 new mode 100755 index a37c247..e700518 --- a/launch_standalone.py +++ b/launch_standalone.py @@ -1,22 +1,31 @@ +#!/usr/bin/env python3 import multiprocessing import time import sys import webbrowser +from setproctitle import setproctitle from server import BAPSicleServer - def startServer(): server = multiprocessing.Process(target=BAPSicleServer) server.start() - while True: - time.sleep(5) - if server and server.is_alive(): - pass - else: - print("Server dead. Exiting.") - sys.exit(0) + try: + while True: + time.sleep(5) + if server and server.is_alive(): + pass + else: + print("Server dead. Exiting.") + sys.exit(0) + # Catch the handler being killed externally. + except KeyboardInterrupt: + print("Received KeyboardInterupt") + except SystemExit: + print("Received SystemExit") + except Exception as e: + print("Received unexpected exception: {}".format(e)) if __name__ == '__main__': @@ -27,6 +36,7 @@ if __name__ == '__main__': # If it's not here, multiprocessing just doesn't run in the package. # Freeze support refers to being packaged with Pyinstaller. multiprocessing.freeze_support() + setproctitle("BAPSicle - Standalone Launch") if len(sys.argv) > 1: # We got an argument! It's probably Platypus's UI. try: @@ -42,6 +52,7 @@ if __name__ == '__main__': webbrowser.open("http://localhost:13500/logs") except Exception as e: print("ALERT:BAPSicle failed with exception:\n", e) + sys.exit(1) sys.exit(0) else: diff --git a/player.py b/player.py index 72063bb..3a4066c 100644 --- a/player.py +++ b/player.py @@ -26,7 +26,6 @@ import setproctitle import copy import json import time -import sys from typing import Any, Callable, Dict, List, Optional @@ -47,16 +46,18 @@ PLAYBACK_END = USEREVENT + 1 # TODO ENUM VALID_MESSAGE_SOURCES = ["WEBSOCKET", "UI", "CONTROLLER", "ALL"] class Player(): - state = None - running = False - out_q = None - last_msg = "" - last_msg_source = "" + out_q: multiprocessing.Queue + last_msg: str + last_msg_source: str last_time_update = None - logger = None - api = None - already_stopped = False - starting = False + + state: StateManager + logger: LoggingManager + api: MyRadioAPI + + running: bool = False + already_stopped: bool = False + starting: bool = False __default_state = { "initialised": False, @@ -352,6 +353,7 @@ class Player(): try: mixer.quit() self.state.update("paused", False) + self.logger.log.info("Quit mixer.") except: self.logger.log.exception("Failed to quit mixer.") @@ -497,7 +499,7 @@ class Player(): multiprocessing.current_process().name = process_title # Init pygame, only used really for the end of playback trigger. - init() + #init() self.running = True self.out_q = out_q @@ -537,11 +539,11 @@ class Player(): else: self.logger.log.info("No file was previously loaded.") - while self.running: - time.sleep(0.1) - self._updateState() - self._ping_times() - try: + try: + while self.running: + time.sleep(0.01) + self._updateState() + self._ping_times() try: message = in_q.get_nowait() source = message.split(":")[0] @@ -626,21 +628,19 @@ class Player(): - # Catch the player being killed externally. - except KeyboardInterrupt: - self.logger.log.info("Received KeyboardInterupt") - break - except SystemExit: - self.logger.log.info("Received SystemExit") - break - except Exception as e: - self.logger.log.exception("Received unexpected exception: {}".format(e)) - break + # Catch the player being killed externally. + except KeyboardInterrupt: + self.logger.log.info("Received KeyboardInterupt") + except SystemExit: + self.logger.log.info("Received SystemExit") + except Exception as e: + self.logger.log.exception("Received unexpected exception: {}".format(e)) - self.logger.log.info("Quiting player ", channel) + self.logger.log.info("Quiting player " + str(channel)) self.quit() - self._retMsg("EXIT") - sys.exit(0) + self._retAll("EXIT") + del self.logger + os._exit(0) if __name__ == "__main__": diff --git a/player_handler.py b/player_handler.py new file mode 100644 index 0000000..b4aa25d --- /dev/null +++ b/player_handler.py @@ -0,0 +1,46 @@ +from helpers.logging_manager import LoggingManager +from setproctitle import setproctitle +#from multiprocessing import current_process +from time import sleep +from os import _exit + +class PlayerHandler(): + logger: LoggingManager + + def __init__(self,channel_from_q, websocket_to_q, ui_to_q, controller_to_q): + + self.logger = LoggingManager("PlayerHandler") + process_title = "PlayerHandler" + setproctitle(process_title) + #current_process().name = process_title + + try: + while True: + + for channel in range(len(channel_from_q)): + try: + message = channel_from_q[channel].get_nowait() + source = message.split(":")[0] + # TODO ENUM + if source in ["ALL","WEBSOCKET"]: + websocket_to_q[channel].put(message) + if source in ["ALL","UI"]: + if not message.split(":")[1] == "POS": + # We don't care about position update spam + ui_to_q[channel].put(message) + if source in ["ALL","CONTROLLER"]: + controller_to_q[channel].put(message) + except: + pass + + sleep(0.01) + # Catch the handler being killed externally. + except KeyboardInterrupt: + self.logger.log.info("Received KeyboardInterupt") + except SystemExit: + self.logger.log.info("Received SystemExit") + except Exception as e: + self.logger.log.exception("Received unexpected exception: {}".format(e)) + del self.logger + _exit(0) + diff --git a/server.py b/server.py index 34bf44b..500ff50 100644 --- a/server.py +++ b/server.py @@ -13,21 +13,20 @@ October, November 2020 """ from api_handler import APIHandler -import asyncio from controllers.mattchbox_usb import MattchBox -import copy import multiprocessing import queue -import threading +#import threading import time import player from flask import Flask, render_template, send_from_directory, request, jsonify, abort -from flask_cors import CORS, cross_origin +from flask_cors import CORS from typing import Any, Optional import json -import setproctitle +from setproctitle import setproctitle import logging -import requests + +from player_handler import PlayerHandler from helpers.os_environment import isMacOS from helpers.device_manager import DeviceManager @@ -42,7 +41,33 @@ from helpers.state_manager import StateManager from helpers.logging_manager import LoggingManager from websocket_server import WebsocketServer -setproctitle.setproctitle("BAPSicle - Server") +setproctitle("BAPSicleServer.py") + +logger: LoggingManager +state: StateManager +class BAPSicleServer(): + + def __init__(self): + + process_title = "BAPSicleServer" + setproctitle(process_title) + #multiprocessing.current_process().name = process_title + + global logger + global state + logger = LoggingManager("BAPSicleServer") + + state = StateManager("BAPSicleServer", logger, default_state) + state.update("server_version", config.VERSION) + + startServer() + #asyncio.get_event_loop().run_until_complete(startServer()) + #asyncio.get_event_loop().run_forever() + + def __del__(self): + stopServer() + + default_state = { "server_version": 0, @@ -53,49 +78,6 @@ default_state = { "num_channels": 3 } -logger: LoggingManager -state: StateManager - -class BAPSicleServer(): - - def __init__(self): - - process_title = "Server" - setproctitle.setproctitle(process_title) - multiprocessing.current_process().name = process_title - - global logger - global state - logger = LoggingManager("BAPSicleServer") - - state = StateManager("BAPSicleServer", logger, default_state) - state.update("server_version", config.VERSION) - - asyncio.get_event_loop().run_until_complete(startServer()) - asyncio.get_event_loop().run_forever() - - def __del__(self): - stopServer() - -class PlayerHandler(): - def __init__(self,channel_from_q, websocket_to_q, ui_to_q, controller_to_q): - while True: - for channel in range(len(channel_from_q)): - try: - message = channel_from_q[channel].get_nowait() - source = message.split(":")[0] - # TODO ENUM - if source in ["ALL","WEBSOCKET"]: - websocket_to_q[channel].put(message) - if source in ["ALL","UI"]: - if not message.split(":")[1] == "POS": - # We don't care about position update spam - ui_to_q[channel].put(message) - if source in ["ALL","CONTROLLER"]: - controller_to_q[channel].put(message) - except: - pass - time.sleep(0.1) app = Flask(__name__, static_url_path='') @@ -114,10 +96,11 @@ channel_from_q: List[queue.Queue] = [] ui_to_q: List[queue.Queue] = [] websocket_to_q: List[queue.Queue] = [] controller_to_q: List[queue.Queue] = [] -channel_p = [] - -stopping = False +channel_p: List[multiprocessing.Process] = [] +websockets_server: multiprocessing.Process +controller_handler: multiprocessing.Process +webserver: multiprocessing.Process # General Endpoints @@ -524,9 +507,10 @@ def serve_favicon(): def serve_static(path: str): return send_from_directory('ui-static', path) -async def startServer(): +def startServer(): process_title="startServer" - threading.current_thread().name = process_title + #threading.current_thread().name = process_title + setproctitle(process_title) if isMacOS(): multiprocessing.set_start_method("spawn", True) @@ -540,13 +524,13 @@ async def startServer(): channel_p.append( multiprocessing.Process( target=player.Player, - args=(channel, channel_to_q[-1], channel_from_q[-1]), + args=(channel, channel_to_q[-1], channel_from_q[-1]) #daemon=True ) ) channel_p[channel].start() - global api_from_q, api_to_q + global api_from_q, api_to_q, api_handler, player_handler, websockets_server, controller_handler api_to_q = multiprocessing.Queue() api_from_q = multiprocessing.Queue() api_handler = multiprocessing.Process(target=APIHandler, args=(api_to_q, api_from_q)) @@ -595,37 +579,48 @@ async def startServer(): channel_to_q[0].put("PLAY") # Don't use reloader, it causes Nested Processes! - app.run(host=state.state["host"], port=state.state["port"], debug=True, use_reloader=False) + global webserver + webserver = multiprocessing.Process( + app.run(host=state.state["host"], port=state.state["port"], debug=True, use_reloader=False) + ) + webserver.start() def stopServer(restart=False): - global channel_p - global channel_from_q - global channel_to_q + global channel_p, channel_from_q, channel_to_q, websockets_server, webserver + print("Stopping Websockets") + websocket_to_q[0].put("WEBSOCKET:QUIT") + websockets_server.join() + del websockets_server print("Stopping server.py") for q in channel_to_q: q.put("QUIT") for player in channel_p: try: player.join() - except: + except Exception as e: + print("*** Ignoring exception:",e) pass finally: - channel_p = [] - channel_from_q = [] - channel_to_q = [] + del player + del channel_from_q + del channel_to_q print("Stopped all players.") - global stopping - if stopping == False: - stopping = True - shutdown = request.environ.get('werkzeug.server.shutdown') - if shutdown is None: - print("Shutting down Server.") - else: - print("Shutting down Flask.") - if not restart: - shutdown() + global webserver + webserver.terminate() + webserver.join() + return + + ## Caused an outside context error, presuably because called outside of a page request. + #shutdown = request.environ.get('werkzeug.server.shutdown') + #if shutdown is None: + # print("Shutting down Server.") + + #else: + # print("Shutting down Flask.") + # if not restart: + # shutdown() if __name__ == "__main__": diff --git a/templates/server.html b/templates/server.html index 7726c92..3020c3f 100644 --- a/templates/server.html +++ b/templates/server.html @@ -8,16 +8,27 @@
+ + +

-
- - -
- +
+ + +
+ {% endif %} -{% endblock %} \ No newline at end of file +{% endblock %} diff --git a/websocket_server.py b/websocket_server.py index 35986b6..1a1d656 100644 --- a/websocket_server.py +++ b/websocket_server.py @@ -1,182 +1,215 @@ import asyncio +from asyncio.futures import Future +from asyncio.tasks import Task, shield + +from websockets.server import Serve +from helpers.logging_manager import LoggingManager import multiprocessing import queue -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional import websockets import json - -baps_clients = set() -channel_to_q: List[multiprocessing.Queue] -webstudio_to_q: List[multiprocessing.Queue] -server_name: str - - - -async def websocket_handler(websocket, path): - baps_clients.add(websocket) - await websocket.send(json.dumps({"message": "Hello", "serverName": server_name})) - print("New Client: {}".format(websocket)) - for channel in channel_to_q: - channel.put("WEBSOCKET:STATUS") - - async def handle_from_webstudio(): - try: - async for message in websocket: - data = json.loads(message) - print(data) - if not "channel" in data: - # Didn't specify a channel, send to all. - for channel in range(len(channel_to_q)): - sendCommand(channel, data) - else: - channel = int(data["channel"]) - sendCommand(channel, data) - - - await asyncio.wait([conn.send(message) for conn in baps_clients]) - - except websockets.exceptions.ConnectionClosedError as e: - print("RIP {}, {}".format(websocket, e)) - - # TODO: Proper Logging - except Exception as e: - print("Exception", e) - - finally: - baps_clients.remove(websocket) - - def sendCommand(channel, data): - if channel not in range(len(channel_to_q)): - print("ERROR: Received channel number larger than server supported channels.") - return - - if "command" in data.keys(): - command = data["command"] - - # Handle the general case - # Message format: - ## SOURCE:COMMAND:EXTRADATA - - message = "WEBSOCKET:" + command - - # If we just want PLAY, PAUSE etc, we're all done. - # Else, let's pipe in some extra info. - extra = "" - - try: - if command == "SEEK": - extra += str(data["time"]) - elif command == "LOAD": - extra += str(data["weight"]) - elif command == "AUTOADVANCE": - extra += str(data["enabled"]) - elif command == "PLAYONLOAD": - extra += str(data["enabled"]) - elif command == "REPEAT": - extra += str(data["mode"]).lower() - elif command == "ADD": - extra += json.dumps(data["newItem"]) - elif command == "REMOVE": - extra += str(data["weight"]) - elif command == "GET_PLAN": - extra += str(data["timeslotId"]) - - # SPECIAL CASE ALERT! We need to talk to two channels here. - elif command == "MOVE": - # TODO Should we trust the client with the item info? - - # Tell the old channel to remove "weight" - extra += str(data["weight"]) - - # Now modify the item with the weight in the new channel - new_channel = int(data["new_channel"]) - item = data["item"] - item["weight"] = int(data["new_weight"]) - # Now send the special case. - channel_to_q[new_channel].put("ADD:" + json.dumps(item)) - - - except ValueError as e: - print("ERROR decoding extra data {} for command {} ".format(e, command)) - pass - - # Stick the message together and send! - if extra != "": - message += ":" + extra - - try: - channel_to_q[channel].put(message) - except Exception as e: - print("ERRORL: Failed to send message {} to channel {}: {}".format(message, channel, e)) - - else: - print("ERROR: Command missing from message.") - - async def handle_to_webstudio(): - while True: - for channel in range(len(webstudio_to_q)): - try: - message = webstudio_to_q[channel].get_nowait() - source = message.split(":")[0] - - # TODO ENUM - if source not in ["WEBSOCKET","ALL"]: - print("ERROR: Message received from invalid source to websocket_handler. Ignored.", source, message) - continue - - command = message.split(":")[1] - #print("Websocket Out:", command) - if command == "STATUS": - try: - message = message.split("OKAY:")[1] - message = json.loads(message) - except: - continue # TODO more logging - elif command == "POS": - try: - message = message.split(":", 2)[2] - except: - continue - else: - continue - - data = json.dumps({ - "command": command, - "data": message, - "channel": channel - }) - await asyncio.wait([conn.send(data) for conn in baps_clients]) - except queue.Empty: - continue - except Exception as e: - raise e - await asyncio.sleep(0.01) - - from_webstudio = asyncio.create_task(handle_from_webstudio()) - to_webstudio = asyncio.create_task(handle_to_webstudio()) - - try: - await asyncio.gather(from_webstudio, to_webstudio) - finally: - from_webstudio.cancel() - to_webstudio.cancel() - +from os import _exit class WebsocketServer: + threads = Future + baps_clients = set() + channel_to_q: List[multiprocessing.Queue] + webstudio_to_q: List[multiprocessing.Queue] + server_name: str + logger: LoggingManager + to_webstudio: Task + from_webstudio: Task + websocket_server: Serve + def __init__(self, in_q, out_q, state): - global channel_to_q - global webstudio_to_q - channel_to_q = in_q - webstudio_to_q = out_q - global server_name - server_name = state.state["server_name"] + self.channel_to_q = in_q + self.webstudio_to_q = out_q + + self.logger = LoggingManager("Websockets") + self.server_name = state.state["server_name"] + + self.websocket_server = websockets.serve(self.websocket_handler, state.state["host"], state.state["ws_port"]) + + asyncio.get_event_loop().run_until_complete(self.websocket_server) + + try: + asyncio.get_event_loop().run_forever() + except: + self.quit() + + def quit(self): + del self.websocket_server + del self.logger + _exit(0) + + def __del__(self): + print("Deleting websocket server") + self.quit() + + async def websocket_handler(self, websocket, path): + self.baps_clients.add(websocket) + await websocket.send(json.dumps({"message": "Hello", "serverName": self.server_name})) + self.logger.log.info("New Client: {}".format(websocket)) + for channel in self.channel_to_q: + channel.put("WEBSOCKET:STATUS") + + async def handle_from_webstudio(): + try: + async for message in websocket: + data = json.loads(message) + if not "channel" in data: + # Didn't specify a channel, send to all. + for channel in range(len(self.channel_to_q)): + sendCommand(channel, data) + else: + channel = int(data["channel"]) + sendCommand(channel, data) + + async def send(conn, message): + # TODO this doesn't actually catch. + try: + await conn.send(message) + except: + pass + + await asyncio.wait([send(conn, message) for conn in self.baps_clients]) + + except websockets.exceptions.ConnectionClosedError as e: + self.logger.log.error("Client Disconncted {}, {}".format(websocket, e)) + + # TODO: Proper Logging + except Exception as e: + self.logger.log.exception("Exception handling messages from Websocket.\n{}".format(e)) + + finally: + self.logger.log.info("Removing client: {}".format(websocket)) + self.baps_clients.remove(websocket) + + def sendCommand(channel, data): + if channel not in range(len(self.channel_to_q)): + self.logger.log.exception("Received channel number larger than server supported channels.") + return + + if "command" in data.keys(): + command = data["command"] + + # Handle the general case + # Message format: + ## SOURCE:COMMAND:EXTRADATA + + message = "WEBSOCKET:" + command + + # If we just want PLAY, PAUSE etc, we're all done. + # Else, let's pipe in some extra info. + extra = "" + + try: + if command == "SEEK": + extra += str(data["time"]) + elif command == "LOAD": + extra += str(data["weight"]) + elif command == "AUTOADVANCE": + extra += str(data["enabled"]) + elif command == "PLAYONLOAD": + extra += str(data["enabled"]) + elif command == "REPEAT": + extra += str(data["mode"]).lower() + elif command == "ADD": + extra += json.dumps(data["newItem"]) + elif command == "REMOVE": + extra += str(data["weight"]) + elif command == "GET_PLAN": + extra += str(data["timeslotId"]) + + # SPECIAL CASE ALERT! We need to talk to two channels here. + elif command == "MOVE": + # TODO Should we trust the client with the item info? + + # Tell the old channel to remove "weight" + extra += str(data["weight"]) + + # Now modify the item with the weight in the new channel + new_channel = int(data["new_channel"]) + item = data["item"] + item["weight"] = int(data["new_weight"]) + # Now send the special case. + self.channel_to_q[new_channel].put("ADD:" + json.dumps(item)) + + + except ValueError as e: + self.logger.log.exception("Error decoding extra data {} for command {} ".format(e, command)) + pass + + # Stick the message together and send! + if extra != "": + message += ":" + extra + + try: + self.channel_to_q[channel].put(message) + except Exception as e: + self.logger.log.exception("Failed to send message {} to channel {}: {}".format(message, channel, e)) + + else: + self.logger.log.error("Command missing from message. Data: {}".format(data)) + + async def handle_to_webstudio(): + while True: + for channel in range(len(self.webstudio_to_q)): + try: + message = self.webstudio_to_q[channel].get_nowait() + source = message.split(":")[0] + + # TODO ENUM + if source not in ["WEBSOCKET","ALL"]: + print("ERROR: Message received from invalid source to websocket_handler. Ignored.", source, message) + continue + + command = message.split(":")[1] + #print("Websocket Out:", command) + if command == "STATUS": + try: + message = message.split("OKAY:")[1] + message = json.loads(message) + except: + continue # TODO more logging + elif command == "POS": + try: + message = message.split(":", 2)[2] + except: + continue + elif command == "QUIT": + self.quit() + else: + continue + + data = json.dumps({ + "command": command, + "data": message, + "channel": channel + }) + await asyncio.wait([conn.send(data) for conn in self.baps_clients]) + except queue.Empty: + continue + except ValueError: + # Typically a "Set of coroutines/Futures is empty." when sending to a dead client. + continue + except Exception as e: + self.logger.log.exception("Exception trying to send to websocket:", e) + await asyncio.sleep(0.01) + + self.from_webstudio = asyncio.create_task(handle_from_webstudio()) + self.to_webstudio = asyncio.create_task(handle_to_webstudio()) + + try: + self.threads = await shield(asyncio.gather(self.from_webstudio, self.to_webstudio)) + finally: + self.from_webstudio.cancel() + self.to_webstudio.cancel() - websocket_server = websockets.serve(websocket_handler, state.state["host"], state.state["ws_port"]) - asyncio.get_event_loop().run_until_complete(websocket_server) - asyncio.get_event_loop().run_forever() if __name__ == "__main__":