2020-11-15 17:40:18 +00:00
import asyncio
2021-04-04 21:34:46 +00:00
from asyncio . futures import Future
from asyncio . tasks import Task , shield
2020-12-20 01:10:19 +00:00
import multiprocessing
2020-12-20 18:52:27 +00:00
import queue
2021-04-08 21:05:25 +00:00
from typing import List
2020-11-15 17:40:18 +00:00
import websockets
import json
2021-04-04 21:34:46 +00:00
from os import _exit
2021-04-17 21:51:43 +00:00
from websockets . server import Serve
2021-04-18 20:17:41 +00:00
from setproctitle import setproctitle
from multiprocessing import current_process
2020-11-15 17:40:18 +00:00
2021-04-08 21:05:25 +00:00
from helpers . logging_manager import LoggingManager
2021-04-17 21:51:43 +00:00
from helpers . the_terminator import Terminator
2021-04-08 19:53:51 +00:00
2021-04-08 21:32:16 +00:00
2021-04-04 21:34:46 +00:00
class WebsocketServer :
2020-11-15 17:40:18 +00:00
2021-04-04 21:34:46 +00:00
threads = Future
baps_clients = set ( )
channel_to_q : List [ multiprocessing . Queue ]
webstudio_to_q : List [ multiprocessing . Queue ]
server_name : str
logger : LoggingManager
to_webstudio : Task
from_webstudio : Task
websocket_server : Serve
2020-11-15 17:40:18 +00:00
2021-04-04 21:34:46 +00:00
def __init__ ( self , in_q , out_q , state ) :
2020-12-20 01:10:19 +00:00
2021-04-04 21:34:46 +00:00
self . channel_to_q = in_q
self . webstudio_to_q = out_q
2020-11-15 17:40:18 +00:00
2021-04-18 20:17:41 +00:00
process_title = " Websockets Servr "
setproctitle ( process_title )
current_process ( ) . name = process_title
2021-04-04 21:34:46 +00:00
self . logger = LoggingManager ( " Websockets " )
2021-04-18 19:27:54 +00:00
self . server_name = state . get ( ) [ " server_name " ]
2021-03-21 20:15:42 +00:00
2021-04-08 19:53:51 +00:00
self . websocket_server = websockets . serve (
2021-04-18 19:27:54 +00:00
self . websocket_handler , state . get ( ) [ " host " ] , state . get ( ) [ " ws_port " ]
2021-04-08 19:53:51 +00:00
)
2020-12-20 01:10:19 +00:00
2021-04-04 21:34:46 +00:00
asyncio . get_event_loop ( ) . run_until_complete ( self . websocket_server )
2021-04-11 19:29:16 +00:00
asyncio . get_event_loop ( ) . run_until_complete ( self . handle_to_webstudio ( ) )
2020-12-20 01:10:19 +00:00
2021-04-04 21:34:46 +00:00
try :
asyncio . get_event_loop ( ) . run_forever ( )
2021-04-08 21:32:16 +00:00
except Exception :
2021-04-08 21:05:25 +00:00
# Sever died somehow, just quit out.
2021-04-04 21:34:46 +00:00
self . quit ( )
def quit ( self ) :
2021-04-11 19:29:16 +00:00
self . logger . log . info ( " Quitting. " )
2021-04-04 21:34:46 +00:00
del self . websocket_server
del self . logger
_exit ( 0 )
def __del__ ( self ) :
print ( " Deleting websocket server " )
self . quit ( )
2021-04-08 21:32:16 +00:00
async def websocket_handler ( self , websocket , path ) :
2021-04-04 21:34:46 +00:00
self . baps_clients . add ( websocket )
2021-04-08 19:53:51 +00:00
await websocket . send (
json . dumps ( { " message " : " Hello " , " serverName " : self . server_name } )
)
2021-04-04 21:34:46 +00:00
self . logger . log . info ( " New Client: {} " . format ( websocket ) )
for channel in self . channel_to_q :
channel . put ( " WEBSOCKET:STATUS " )
2021-04-08 21:48:38 +00:00
self . from_webstudio = asyncio . create_task ( self . handle_from_webstudio ( websocket ) )
2020-12-20 01:10:19 +00:00
2021-04-08 21:48:38 +00:00
try :
self . threads = await shield (
2021-04-11 19:29:16 +00:00
asyncio . gather ( self . from_webstudio )
2021-04-08 21:48:38 +00:00
)
finally :
self . from_webstudio . cancel ( )
2020-12-20 01:10:19 +00:00
2021-04-08 21:48:38 +00:00
async def handle_from_webstudio ( self , websocket ) :
try :
async for message in websocket :
data = json . loads ( message )
if " channel " not in data :
# Didn't specify a channel, send to all.
for channel in range ( len ( self . channel_to_q ) ) :
self . sendCommand ( channel , data )
else :
channel = int ( data [ " channel " ] )
self . sendCommand ( channel , data )
await asyncio . wait (
[ conn . send ( message ) for conn in self . baps_clients ]
)
2020-12-20 01:10:19 +00:00
2021-04-08 21:48:38 +00:00
except websockets . exceptions . ConnectionClosedError as e :
self . logger . log . error (
" Client Disconncted {} , {} " . format ( websocket , e ) )
2021-03-22 00:33:14 +00:00
2021-04-08 21:48:38 +00:00
except Exception as e :
self . logger . log . exception (
" Exception handling messages from Websocket. \n {} " . format ( e )
)
2021-03-22 00:33:14 +00:00
2021-04-08 21:48:38 +00:00
finally :
self . logger . log . info ( " Removing client: {} " . format ( websocket ) )
self . baps_clients . remove ( websocket )
2021-03-22 00:33:14 +00:00
2021-04-08 21:48:38 +00:00
def sendCommand ( self , channel , data ) :
if channel not in range ( len ( self . channel_to_q ) ) :
self . logger . log . exception (
" Received channel number larger than server supported channels. "
)
return
2021-03-22 00:33:14 +00:00
2021-04-08 21:48:38 +00:00
if " command " in data . keys ( ) :
command = data [ " command " ]
2021-03-22 00:33:14 +00:00
2021-04-08 21:48:38 +00:00
# Handle the general case
# Message format:
# SOURCE:COMMAND:EXTRADATA
2021-03-22 00:33:14 +00:00
2021-04-24 20:33:36 +00:00
message = " WEBSOCKET: "
2021-03-22 00:33:14 +00:00
2021-04-08 21:48:38 +00:00
# If we just want PLAY, PAUSE etc, we're all done.
# Else, let's pipe in some extra info.
extra = " "
2021-03-21 20:15:42 +00:00
2021-04-08 21:48:38 +00:00
try :
if command == " SEEK " :
extra + = str ( data [ " time " ] )
elif command == " LOAD " :
extra + = str ( data [ " weight " ] )
elif command == " AUTOADVANCE " :
extra + = str ( data [ " enabled " ] )
elif command == " PLAYONLOAD " :
extra + = str ( data [ " enabled " ] )
elif command == " REPEAT " :
extra + = str ( data [ " mode " ] ) . lower ( )
elif command == " ADD " :
extra + = json . dumps ( data [ " newItem " ] )
elif command == " REMOVE " :
extra + = str ( data [ " weight " ] )
elif command == " GET_PLAN " :
extra + = str ( data [ " timeslotId " ] )
2021-04-17 17:27:36 +00:00
elif command == " SETMARKER " :
extra + = " {} : {} " . format (
data [ " timeslotitemid " ] ,
json . dumps ( data [ " marker " ] )
)
2021-04-08 21:48:38 +00:00
# SPECIAL CASE ALERT! We need to talk to two channels here.
elif command == " MOVE " :
2021-04-24 23:46:01 +00:00
# remove the exiting item first
self . channel_to_q [ channel ] . put (
" {} REMOVE: {} " . format ( message , data [ " weight " ] )
)
# Now hijack to send the new add on the new channel.
2021-04-08 21:48:38 +00:00
# Now modify the item with the weight in the new channel
new_channel = int ( data [ " new_channel " ] )
item = data [ " item " ]
item [ " weight " ] = int ( data [ " new_weight " ] )
2021-04-24 20:33:36 +00:00
# If we're moving within the same channel, add 1 to the weight, since we're adding the new item before we remove the old one, UI gave us the weight expected after removing.
if channel == new_channel and data [ " new_weight " ] > data [ " weight " ] :
item [ " weight " ] + = 1
2021-04-08 21:48:38 +00:00
# Now send the special case.
self . channel_to_q [ new_channel ] . put (
2021-04-24 20:33:36 +00:00
" WEBSOCKET:ADD: " + json . dumps ( item ) )
2021-04-08 21:48:38 +00:00
2021-04-24 23:46:01 +00:00
# Don't bother, we should be done.
return
2021-04-08 21:48:38 +00:00
except ValueError as e :
self . logger . log . exception (
" Error decoding extra data {} for command {} " . format (
e , command
2021-04-08 19:53:51 +00:00
)
2021-04-08 21:48:38 +00:00
)
pass
2021-04-04 21:34:46 +00:00
2021-04-08 21:48:38 +00:00
# Stick the message together and send!
2021-04-24 20:33:36 +00:00
message + = command # Put the command in at the end, in case MOVE etc changed it.
2021-04-08 21:48:38 +00:00
if extra != " " :
message + = " : " + extra
2021-03-22 00:33:14 +00:00
2021-04-08 21:48:38 +00:00
try :
self . channel_to_q [ channel ] . put ( message )
except Exception as e :
self . logger . log . exception (
" Failed to send message {} to channel {} : {} " . format (
message , channel , e
2021-04-08 19:53:51 +00:00
)
)
2020-11-15 17:40:18 +00:00
2021-04-08 21:48:38 +00:00
else :
self . logger . log . error (
" Command missing from message. Data: {} " . format ( data )
)
2020-11-15 17:40:18 +00:00
2021-04-08 21:48:38 +00:00
async def handle_to_webstudio ( self ) :
2021-04-17 21:51:43 +00:00
terminator = Terminator ( )
while not terminator . terminate :
2021-04-08 21:48:38 +00:00
for channel in range ( len ( self . webstudio_to_q ) ) :
try :
message = self . webstudio_to_q [ channel ] . get_nowait ( )
source = message . split ( " : " ) [ 0 ]
# TODO ENUM
if source not in [ " WEBSOCKET " , " ALL " ] :
2021-04-11 19:29:16 +00:00
self . logger . log . error (
2021-04-08 21:48:38 +00:00
" ERROR: Message received from invalid source to websocket_handler. Ignored. " ,
source ,
message ,
2021-04-08 19:53:51 +00:00
)
2021-04-04 21:34:46 +00:00
continue
2021-04-08 21:48:38 +00:00
command = message . split ( " : " ) [ 1 ]
# print("Websocket Out:", command)
if command == " STATUS " :
try :
message = message . split ( " OKAY: " ) [ 1 ]
message = json . loads ( message )
except Exception :
continue # TODO more logging
elif command == " POS " :
try :
message = message . split ( " : " , 2 ) [ 2 ]
except Exception :
continue
elif command == " QUIT " :
self . quit ( )
else :
2021-04-04 21:34:46 +00:00
continue
2021-04-08 21:48:38 +00:00
data = json . dumps (
{ " command " : command , " data " : message , " channel " : channel }
)
await asyncio . wait (
[ conn . send ( data ) for conn in self . baps_clients ]
)
except queue . Empty :
continue
except ValueError :
# Typically a "Set of coroutines/Futures is empty." when sending to a dead client.
continue
except Exception as e :
self . logger . log . exception (
" Exception trying to send to websocket: " , e
)
await asyncio . sleep ( 0.02 )
2020-11-15 17:40:18 +00:00
2021-04-17 21:51:43 +00:00
self . quit ( )
2020-11-15 17:48:05 +00:00
2020-11-15 17:40:18 +00:00
if __name__ == " __main__ " :
print ( " Don ' t do this " )