BAPSicle/websocket_server.py

263 lines
8.9 KiB
Python
Raw Permalink Normal View History

import asyncio
from asyncio.futures import Future
from asyncio.tasks import Task, shield
import multiprocessing
import queue
2021-04-08 21:05:25 +00:00
from typing import List
import websockets
import json
from os import _exit
from websockets.legacy.server import Serve
from websockets.server import serve
2021-04-18 20:17:41 +00:00
from setproctitle import setproctitle
from multiprocessing import current_process
2021-04-08 21:05:25 +00:00
from helpers.logging_manager import LoggingManager
from helpers.the_terminator import Terminator
2021-04-08 19:53:51 +00:00
2021-04-08 21:32:16 +00:00
class WebsocketServer:
    """Relay commands between websocket clients and the player queues.

    Messages received from clients are translated into
    ``WEBSOCKET:COMMAND[:EXTRA]`` strings and put on the per-channel
    ``player_to_q`` queues. Messages read from ``player_from_q`` are
    re-encoded as JSON and sent to every connected client.

    Constructing an instance starts the server and blocks: ``__init__`` runs
    the asyncio event loop itself.
    """

    threads: Future
    baps_clients: set
    player_to_q: List[multiprocessing.Queue]
    player_from_q: multiprocessing.Queue
    server_name: str
    logger: "LoggingManager"
    to_webstudio: Task
    from_webstudio: Task
    websocket_server: "serve"

    # Commands whose extra data is a single field of the incoming message,
    # stringified as-is: command name -> message key.
    _SINGLE_FIELD_EXTRAS = {
        "SEEK": "time",
        "LOAD": "weight",
        "AUTOADVANCE": "enabled",
        "PLAYONLOAD": "enabled",
        "REMOVE": "weight",
        "RESETPLAYED": "weight",
        "SETPLAYED": "weight",
        "GETPLAN": "timeslotId",
    }

    def __init__(self, in_q, out_q, state):
        """Start the websocket server and run its event loop.

        Args:
            in_q: Sequence of queues, one per player channel, that commands
                are put onto.
            out_q: Queue that player messages are read from.
            state: Object whose ``get()`` returns a mapping with the
                ``server_name``, ``host`` and ``ws_port`` entries.
        """
        self.player_to_q = in_q
        self.player_from_q = out_q
        # Per-instance set, so connected clients are never shared through
        # the class object.
        self.baps_clients = set()

        process_title = "BAPSicle - Websockets Server"
        setproctitle(process_title)
        current_process().name = process_title

        self.logger = LoggingManager("Websockets")
        config = state.get()
        self.server_name = config["server_name"]

        self.websocket_server = serve(
            self.websocket_handler, config["host"], config["ws_port"]
        )

        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.websocket_server)
        loop.run_until_complete(self.handle_to_webstudio())

        try:
            loop.run_forever()
        except Exception:
            # Server died somehow, just quit out.
            self.quit()

    def quit(self):
        """Log, drop the server and logger references, and exit the process.

        Tolerates a partially initialised instance (for example when
        ``__init__`` failed before the logger was created).
        """
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.log.info("Quitting.")
        del logger
        self.__dict__.pop("websocket_server", None)
        self.__dict__.pop("logger", None)
        _exit(0)

    def __del__(self):
        """Log the deletion and quit the process."""
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.log.info("Deleting websocket server")
        self.quit()

    async def websocket_handler(self, websocket, path=None):
        """Serve one client connection until it closes.

        Registers the client, greets it, asks every channel for a status
        update and then processes the client's messages.

        Args:
            websocket: The client connection.
            path: Request path supplied by the websockets library; unused.
                Optional so the handler also works when only the connection
                is passed.
        """
        self.baps_clients.add(websocket)
        try:
            await websocket.send(
                json.dumps({"message": "Hello", "serverName": self.server_name})
            )
            self.logger.log.info("New Client: {}".format(websocket))

            for channel in self.player_to_q:
                channel.put("WEBSOCKET:STATUS")

            # Keep the task in a local: self.from_webstudio is overwritten by
            # every new connection, so cancelling through it would cancel the
            # most recent client's task instead of this one.
            reader = asyncio.create_task(self.handle_from_webstudio(websocket))
            self.from_webstudio = reader
            try:
                self.threads = await shield(asyncio.gather(reader))
            finally:
                reader.cancel()
        finally:
            # Also covers a failure while greeting, before the reader task
            # (which normally removes the client) was started.
            self.baps_clients.discard(websocket)

    async def handle_from_webstudio(self, websocket):
        """Forward each message from one client to the players and echo it.

        A message without a ``channel`` entry is sent to every channel.
        Messages that cannot be decoded are logged and skipped so that a
        single bad message does not end the client's session.

        Args:
            websocket: The client connection to read messages from.
        """
        try:
            async for message in websocket:
                try:
                    data = json.loads(message)
                except ValueError as e:
                    self.logger.log.error(
                        "Ignoring message that is not valid JSON: {}".format(e)
                    )
                    continue

                if not isinstance(data, dict):
                    self.logger.log.error(
                        "Ignoring message that is not a JSON object: {}".format(
                            data)
                    )
                    continue

                if "channel" not in data:
                    # Didn't specify a channel, send to all.
                    for channel in range(len(self.player_to_q)):
                        self.sendCommand(channel, data)
                else:
                    try:
                        channel = int(data["channel"])
                    except (TypeError, ValueError):
                        self.logger.log.error(
                            "Ignoring message with invalid channel: {}".format(
                                data)
                        )
                        continue
                    self.sendCommand(channel, data)

                await self._broadcast(message)

        except websockets.exceptions.ConnectionClosedError as e:
            self.logger.log.error(
                "Client Disconncted {}, {}".format(websocket, e))

        except Exception as e:
            self.logger.log.exception(
                "Exception handling messages from Websocket.\n{}".format(e)
            )

        finally:
            self.logger.log.info("Removing client: {}".format(websocket))
            # discard(): the connection handler may already have removed it.
            self.baps_clients.discard(websocket)

    async def _broadcast(self, payload):
        """Send ``payload`` to every connected client, logging failed sends."""
        clients = list(self.baps_clients)
        if not clients:
            return
        results = await asyncio.gather(
            *(conn.send(payload) for conn in clients), return_exceptions=True
        )
        for conn, result in zip(clients, results):
            if isinstance(result, BaseException):
                self.logger.log.error(
                    "Failed to send to client {}: {}".format(conn, result)
                )

    def _is_valid_channel(self, channel):
        """Return True when ``channel`` is an index into ``player_to_q``."""
        return isinstance(channel, int) and 0 <= channel < len(self.player_to_q)

    def _build_extra(self, command, data):
        """Return the extra-data string for ``command`` ("" when none).

        Raises:
            KeyError: A field required by the command is missing from data.
        """
        if command in self._SINGLE_FIELD_EXTRAS:
            return str(data[self._SINGLE_FIELD_EXTRAS[command]])
        if command == "REPEAT":
            return str(data["mode"]).lower()
        if command == "ADD":
            return json.dumps(data["newItem"])
        if command == "SETMARKER":
            return "{}:{}".format(
                data["timeslotitemid"], json.dumps(data["marker"])
            )
        # If we just want PLAY, PAUSE etc, there is no extra data.
        return ""

    def _send_move(self, channel, data):
        """Queue the REMOVE/ADD pair that moves an item between channels.

        Everything is validated before anything is queued, so a malformed
        request cannot remove the item without adding it back.

        Raises:
            KeyError, TypeError, ValueError: The request is malformed or
                names a channel that does not exist.
        """
        # TODO: Move this to player handler.
        new_channel = int(data["new_channel"])
        if not self._is_valid_channel(new_channel):
            raise ValueError(
                "new_channel {} is not a valid channel".format(new_channel))
        old_weight = data["weight"]
        # Copy so the caller's message is not modified.
        item = dict(data["item"])
        item["weight"] = int(data["new_weight"])

        # Remove the existing item first, then add it on the new channel.
        self.player_to_q[channel].put("WEBSOCKET:REMOVE:{}".format(old_weight))
        self.player_to_q[new_channel].put("WEBSOCKET:ADD:" + json.dumps(item))

    def sendCommand(self, channel, data):
        """Translate a client message into a player command and queue it.

        Message format: ``SOURCE:COMMAND:EXTRADATA``, for example
        ``WEBSOCKET:SEEK:12.5``. ``MOVE`` is special: it talks to two
        channels (REMOVE on ``channel``, ADD on ``data["new_channel"]``).

        Invalid channels, missing commands and messages lacking a field the
        command requires are logged and dropped; nothing is raised.

        Args:
            channel: Index of the target queue in ``player_to_q``.
            data: Decoded client message (a dict) holding ``command`` plus
                any command-specific fields.
        """
        if not self._is_valid_channel(channel):
            # Not inside an exception handler, so error() rather than
            # exception().
            self.logger.log.error(
                "Received channel number larger than server supported channels."
            )
            return

        command = data.get("command")
        if not isinstance(command, str):
            self.logger.log.error(
                "Command missing from message. Data: {}".format(data))
            return

        try:
            if command == "MOVE":
                self._send_move(channel, data)
                # Don't bother, we should be done.
                return
            extra = self._build_extra(command, data)
        except (KeyError, TypeError, ValueError) as e:
            self.logger.log.exception(
                "Error decoding extra data {} for command {} ".format(
                    e, command)
            )
            return

        # Stick the message together and send!
        message = "WEBSOCKET:" + command
        if extra != "":
            message += ":" + extra

        try:
            self.player_to_q[channel].put(message)
        except Exception as e:
            self.logger.log.exception(
                "Failed to send message {} to channel {}: {}".format(
                    message, channel, e
                )
            )

    async def handle_to_webstudio(self):
        """Poll ``player_from_q`` and relay player messages to all clients.

        Messages are ``CHANNEL:SOURCE:COMMAND[:...]`` strings. Only the
        STATUS, POS and QUIT commands from the ``WEBSOCKET`` or ``ALL``
        sources are handled; everything else is skipped. Runs until the
        terminator reports termination, then quits.
        """
        terminator = Terminator()
        while not terminator.terminate:
            await asyncio.sleep(0.02)

            try:
                message = self.player_from_q.get_nowait()
                split = message.split(":")
                channel = int(split[0])
                source = split[1]
                # TODO ENUM
                if source not in ["WEBSOCKET", "ALL"]:
                    self.logger.log.error(
                        "ERROR: Message received from invalid source to "
                        "websocket_handler. Ignored. Source: %s, message: %s",
                        source,
                        message,
                    )
                    continue

                command = split[2]
                if command == "STATUS":
                    try:
                        # Split once: the JSON payload may contain "OKAY:".
                        message = json.loads(message.split("OKAY:", 1)[1])
                    except Exception:
                        continue  # TODO more logging
                elif command == "POS":
                    try:
                        message = split[3]
                    except Exception:
                        continue
                elif command == "QUIT":
                    self.quit()
                else:
                    continue

                data = json.dumps(
                    {"command": command, "data": message, "channel": channel}
                )
                await self._broadcast(data)
            except queue.Empty:
                continue
            except ValueError:
                # The channel prefix was not an integer.
                self.logger.log.error(
                    "Ignoring malformed message from player: %s", message
                )
                continue
            except Exception as e:
                self.logger.log.exception(
                    "Exception trying to send to websocket: %s", e
                )

        self.quit()
2020-11-15 17:48:05 +00:00
if __name__ == "__main__":
    # Guard against direct execution of this module.
    raise Exception("Don't run this file standalone.")