diff --git a/requirements.txt b/requirements.txt index 43ac0c7..554d610 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,8 @@ +aiohttp==3.6.2 aioice==0.6.18 aiortc==0.9.27 +async-timeout==3.0.1 +attrs==19.3.0 av==7.0.1 blinker==1.4 certifi==2020.4.5.1 @@ -11,6 +14,7 @@ idna==2.9 JACK-Client==0.5.2 jedi==0.15.2 jsonpickle==1.3 +multidict==4.7.5 mypy==0.770 mypy-extensions==0.4.3 netifaces==0.10.9 @@ -31,3 +35,4 @@ typing-extensions==3.7.4.2 ujson==1.35 urllib3==1.25.8 websockets==8.1 +yarl==1.4.2 diff --git a/server.py b/server.py index aef0cd5..737e90a 100644 --- a/server.py +++ b/server.py @@ -2,29 +2,38 @@ import asyncio import websockets import json import uuid -import av # type: ignore +import av # type: ignore import struct -from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription # type: ignore -from aiortc.contrib.media import MediaBlackhole, MediaPlayer # type: ignore -import jack as Jack # type: ignore +from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription # type: ignore +from aiortc.contrib.media import MediaBlackhole, MediaPlayer # type: ignore +import jack as Jack # type: ignore import os import re from datetime import datetime -from typing import Optional, Any, Type +from typing import Optional, Any, Type, Dict from types import TracebackType import sys -from raygun4py import raygunprovider # type: ignore +import aiohttp +from raygun4py import raygunprovider # type: ignore import configparser config = configparser.ConfigParser() config.read("shittyserver.ini") -def handle_exception(exc_type: Type[BaseException], exc_value: BaseException, exc_traceback: TracebackType) -> None: - cl = raygunprovider.RaygunSender(config["raygun"]["key"]) - cl.send_exception(exc_info=(exc_type, exc_value, exc_traceback)) +ENABLE_EXCEPTION_LOGGING = False -sys.excepthook = handle_exception +if ENABLE_EXCEPTION_LOGGING: + + def handle_exception( + exc_type: Type[BaseException], + exc_value: BaseException, + exc_traceback: TracebackType, + ) -> None: + cl = raygunprovider.RaygunSender(config["raygun"]["key"]) + cl.send_exception(exc_info=(exc_type, exc_value, exc_traceback)) + + sys.excepthook = handle_exception file_contents_ex = re.compile(r"^ws=\d$") @@ -47,12 +56,12 @@ def write_ob_status(status: bool) -> None: fd.truncate() -@Jack.set_error_function # type: ignore +@Jack.set_error_function # type: ignore def error(msg: str) -> None: print("Error:", msg) -@Jack.set_info_function # type: ignore +@Jack.set_info_function # type: ignore def info(msg: str) -> None: print("Info:", msg) @@ -64,6 +73,7 @@ out2 = jack.outports.register("out_1") transfer_buffer1: Any = None transfer_buffer2: Any = None + def init_buffers() -> None: global transfer_buffer1, transfer_buffer2 transfer_buffer1 = Jack.RingBuffer(jack.samplerate * 10) @@ -73,7 +83,7 @@ def init_buffers() -> None: init_buffers() -@jack.set_process_callback # type: ignore +@jack.set_process_callback # type: ignore def process(frames: int) -> None: buf1 = out1.get_buffer() piece1 = transfer_buffer1.read(len(buf1)) @@ -85,6 +95,7 @@ def process(frames: int) -> None: class JackSender(object): resampler: Any + def __init__(self, track: MediaStreamTrack) -> None: self.track = track self.resampler = None @@ -113,7 +124,20 @@ class JackSender(object): transfer_buffer2.write(new_frame.planes[1]) -current_session = None +active_sessions: Dict[str, "Session"] = {} +live_session: Optional['Session'] = None + + +async def notify_mattserver_about_sessions() -> None: + async with aiohttp.ClientSession() as session: + data: Dict[str, Dict[str, str]] = {} + for sid, sess in active_sessions.items(): + data[sid] = sess.to_dict() + await session.post(config["mattserver"]["notify_url"], json=data) + + +class NotReadyException(BaseException): + pass class Session(object): @@ -121,29 +145,63 @@ class Session(object): sender: Optional[JackSender] connection_state: Optional[str] pc: Optional[Any] + connection_id: str def __init__(self) -> None: self.websocket = None self.sender = None self.pc = None self.connection_state = None + self.connection_id = str(uuid.uuid4()) + self.ended = False + + def to_dict(self) -> Dict[str, str]: + return {"connection_id": self.connection_id} + + async def activate(self) -> None: + print(self.connection_id, "Activating") + if self.sender is None: + print(self.connection_id, "... but we don't have a sender!") + raise NotReadyException() + else: + await self.sender.process() async def end(self) -> None: - print(self.connection_id, "going away") - if self.sender is not None: - self.sender.end() - if self.pc is not None: - await self.pc.close() - init_buffers() - write_ob_status(False) - if self.websocket is not None: - await self.websocket.send(json.dumps({"kind": "REPLACED"})) + global active_sessions + + if self.ended: + print(self.connection_id, "already over") + else: + print(self.connection_id, "going away") + + if self.sender is not None: + self.sender.end() + + if self.pc is not None: + await self.pc.close() + + init_buffers() + + if self.websocket is not None and self.websocket.state == websockets.protocol.State.OPEN: + await self.websocket.send(json.dumps({"kind": "REPLACED"})) + await self.websocket.close(1008) + + if self.connection_id in active_sessions: + del active_sessions[self.connection_id] + if len(active_sessions) == 0: + write_ob_status(False) + else: + print(self.connection_id, "wasn't in active_sessions!") + + await notify_mattserver_about_sessions() + print(self.connection_id, "bye bye") + self.ended = True def create_peerconnection(self) -> None: self.pc = RTCPeerConnection() assert self.pc is not None - @self.pc.on("signalingstatechange") # type: ignore + @self.pc.on("signalingstatechange") # type: ignore async def on_signalingstatechange() -> None: assert self.pc is not None print( @@ -151,41 +209,38 @@ class Session(object): "Signaling state is {}".format(self.pc.signalingState), ) - @self.pc.on("iceconnectionstatechange") # type: ignore + @self.pc.on("iceconnectionstatechange") # type: ignore async def on_iceconnectionstatechange() -> None: if self.pc is None: - print(self.connection_id, "ICE connection state change, but the PC is None!") + print( + self.connection_id, + "ICE connection state change, but the PC is None!", + ) else: print( self.connection_id, "ICE connection state is {}".format(self.pc.iceConnectionState), ) if self.pc.iceConnectionState == "failed": - await self.pc.close() - self.pc = None - if self.websocket is not None: - await self.websocket.close(1008) - return + await self.end() - @self.pc.on("track") # type: ignore + @self.pc.on("track") # type: ignore async def on_track(track: MediaStreamTrack) -> None: - global current_session + global active_sessions print(self.connection_id, "Received track") if track.kind == "audio": print(self.connection_id, "Adding to Jack.") - @track.on("ended") # type: ignore + await notify_mattserver_about_sessions() + + @track.on("ended") # type: ignore async def on_ended() -> None: print(self.connection_id, "Track {} ended".format(track.kind)) # TODO: this doesn't exactly handle reconnecting gracefully - self.end() + await self.end() self.sender = JackSender(track) - if current_session is not None: - await current_session.end() - current_session = self write_ob_status(True) - await self.sender.process() async def process_ice(self, message: Any) -> None: if self.connection_state == "HELLO" and message["kind"] == "OFFER": @@ -221,27 +276,35 @@ class Session(object): ) async def connect(self, websocket: websockets.WebSocketServerProtocol) -> None: + global active_sessions + + active_sessions[self.connection_id] = self + self.websocket = websocket - self.connection_id = uuid.uuid4() self.connection_state = "HELLO" print(self.connection_id, "Connected") # TODO Raygun user ID await websocket.send( - json.dumps({"kind": "HELLO", "connectionId": str(self.connection_id)}) + json.dumps({"kind": "HELLO", "connectionId": self.connection_id}) ) - async for msg in websocket: - data = json.loads(msg) - if data["kind"] == "OFFER": - await self.process_ice(data) - elif data["kind"] == "TIME": - time = datetime.now().time() - await websocket.send(json.dumps({"kind": "TIME", "time": str(time)})) - else: - print(self.connection_id, "Unknown kind {}".format(data["kind"])) - await websocket.send( - json.dumps({"kind": "ERROR", "error": "unknown_kind"}) - ) + try: + async for msg in websocket: + data = json.loads(msg) + if data["kind"] == "OFFER": + await self.process_ice(data) + elif data["kind"] == "TIME": + time = datetime.now().time() + await websocket.send(json.dumps({"kind": "TIME", "time": str(time)})) + else: + print(self.connection_id, "Unknown kind {}".format(data["kind"])) + await websocket.send( + json.dumps({"kind": "ERROR", "error": "unknown_kind"}) + ) + + except websockets.exceptions.ConnectionClosedError: + print(self.connection_id, "WebSocket closed") + await self.end() async def serve(websocket: websockets.WebSocketServerProtocol, path: str) -> None: @@ -252,12 +315,64 @@ async def serve(websocket: websockets.WebSocketServerProtocol, path: str) -> Non pass -WS_PORT = 8079 +start_server = websockets.serve(serve, "localhost", int(config["ports"]["websocket"])) + +print("Shittyserver WS starting on port {}.".format(config["ports"]["websocket"])) + + +async def telnet_server( + reader: asyncio.StreamReader, writer: asyncio.StreamWriter +) -> None: + global active_sessions, live_session + while True: + data = await reader.read(128) + if not data: + break + data_str = data.decode("utf-8") + parts = data_str.rstrip().split(" ") + print(parts) + + if parts[0] == "Q": + data: Dict[str, Dict[str, str]] = {} + for sid, sess in active_sessions.items(): + data[sid] = sess.to_dict() + writer.write((json.dumps(data) + "\r\n").encode("utf-8")) + + elif parts[0] == "SEL": + sid = parts[1] + if sid == "NUL": + if live_session is not None: + await live_session.end() + writer.write("OKAY\r\n".encode("utf-8")) + else: + writer.write("WONT\r\n".encode("utf-8")) + else: + session = active_sessions[sid] + if session is None: + writer.write("FAIL\r\n".encode("utf-8")) + else: + if live_session is not None: + await live_session.end() + asyncio.ensure_future(session.activate()) + live_session = session + writer.write("OKAY\r\n".encode("utf-8")) + else: + writer.write("WHAT\r\n".encode("utf-8")) + await writer.drain() + writer.close() + + +async def run_telnet_server() -> None: + server = await asyncio.start_server( + telnet_server, "localhost", int(config["ports"]["telnet"]) + ) + await server.serve_forever() + jack.activate() -start_server = websockets.serve(serve, "localhost", WS_PORT) -print("Shittyserver starting on port {}.".format(WS_PORT)) +print("Shittyserver TELNET starting on port {}".format(config["ports"]["telnet"])) +asyncio.get_event_loop().run_until_complete(notify_mattserver_about_sessions()) -asyncio.get_event_loop().run_until_complete(start_server) +asyncio.get_event_loop().run_until_complete(asyncio.gather(start_server, run_telnet_server())) asyncio.get_event_loop().run_forever() diff --git a/shittyserver.ini.example b/shittyserver.ini.example index 573ea08..5200a17 100644 --- a/shittyserver.ini.example +++ b/shittyserver.ini.example @@ -1,2 +1,5 @@ [raygun] -key = CHANGEME \ No newline at end of file +key = CHANGEME + +[mattserver] +notify_url = https://ent9s2r5u77vj.x.pipedream.net \ No newline at end of file