shittyserver: implement things needed for mattserver

This commit is contained in:
Marks Polakovs 2020-04-11 15:53:50 +02:00
parent 4dca05cf57
commit 150596920f
3 changed files with 181 additions and 58 deletions

View file

@ -1,5 +1,8 @@
aiohttp==3.6.2
aioice==0.6.18
aiortc==0.9.27
async-timeout==3.0.1
attrs==19.3.0
av==7.0.1
blinker==1.4
certifi==2020.4.5.1
@ -11,6 +14,7 @@ idna==2.9
JACK-Client==0.5.2
jedi==0.15.2
jsonpickle==1.3
multidict==4.7.5
mypy==0.770
mypy-extensions==0.4.3
netifaces==0.10.9
@ -31,3 +35,4 @@ typing-extensions==3.7.4.2
ujson==1.35
urllib3==1.25.8
websockets==8.1
yarl==1.4.2

229
server.py
View file

@ -2,29 +2,38 @@ import asyncio
import websockets
import json
import uuid
import av # type: ignore
import av # type: ignore
import struct
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription # type: ignore
from aiortc.contrib.media import MediaBlackhole, MediaPlayer # type: ignore
import jack as Jack # type: ignore
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription # type: ignore
from aiortc.contrib.media import MediaBlackhole, MediaPlayer # type: ignore
import jack as Jack # type: ignore
import os
import re
from datetime import datetime
from typing import Optional, Any, Type
from typing import Optional, Any, Type, Dict
from types import TracebackType
import sys
from raygun4py import raygunprovider # type: ignore
import aiohttp
from raygun4py import raygunprovider # type: ignore
import configparser
config = configparser.ConfigParser()
config.read("shittyserver.ini")
def handle_exception(exc_type: Type[BaseException], exc_value: BaseException, exc_traceback: TracebackType) -> None:
cl = raygunprovider.RaygunSender(config["raygun"]["key"])
cl.send_exception(exc_info=(exc_type, exc_value, exc_traceback))
ENABLE_EXCEPTION_LOGGING = False
sys.excepthook = handle_exception
if ENABLE_EXCEPTION_LOGGING:
def handle_exception(
exc_type: Type[BaseException],
exc_value: BaseException,
exc_traceback: TracebackType,
) -> None:
cl = raygunprovider.RaygunSender(config["raygun"]["key"])
cl.send_exception(exc_info=(exc_type, exc_value, exc_traceback))
sys.excepthook = handle_exception
file_contents_ex = re.compile(r"^ws=\d$")
@ -47,12 +56,12 @@ def write_ob_status(status: bool) -> None:
fd.truncate()
@Jack.set_error_function # type: ignore
@Jack.set_error_function # type: ignore
def error(msg: str) -> None:
print("Error:", msg)
@Jack.set_info_function # type: ignore
@Jack.set_info_function # type: ignore
def info(msg: str) -> None:
print("Info:", msg)
@ -64,6 +73,7 @@ out2 = jack.outports.register("out_1")
transfer_buffer1: Any = None
transfer_buffer2: Any = None
def init_buffers() -> None:
global transfer_buffer1, transfer_buffer2
transfer_buffer1 = Jack.RingBuffer(jack.samplerate * 10)
@ -73,7 +83,7 @@ def init_buffers() -> None:
init_buffers()
@jack.set_process_callback # type: ignore
@jack.set_process_callback # type: ignore
def process(frames: int) -> None:
buf1 = out1.get_buffer()
piece1 = transfer_buffer1.read(len(buf1))
@ -85,6 +95,7 @@ def process(frames: int) -> None:
class JackSender(object):
resampler: Any
def __init__(self, track: MediaStreamTrack) -> None:
self.track = track
self.resampler = None
@ -113,7 +124,20 @@ class JackSender(object):
transfer_buffer2.write(new_frame.planes[1])
current_session = None
active_sessions: Dict[str, "Session"] = {}
live_session: Optional['Session'] = None
async def notify_mattserver_about_sessions() -> None:
async with aiohttp.ClientSession() as session:
data: Dict[str, Dict[str, str]] = {}
for sid, sess in active_sessions.items():
data[sid] = sess.to_dict()
await session.post(config["mattserver"]["notify_url"], json=data)
class NotReadyException(BaseException):
pass
class Session(object):
@ -121,29 +145,63 @@ class Session(object):
sender: Optional[JackSender]
connection_state: Optional[str]
pc: Optional[Any]
connection_id: str
def __init__(self) -> None:
self.websocket = None
self.sender = None
self.pc = None
self.connection_state = None
self.connection_id = str(uuid.uuid4())
self.ended = False
def to_dict(self) -> Dict[str, str]:
return {"connection_id": self.connection_id}
async def activate(self) -> None:
print(self.connection_id, "Activating")
if self.sender is None:
print(self.connection_id, "... but we don't have a sender!")
raise NotReadyException()
else:
await self.sender.process()
async def end(self) -> None:
print(self.connection_id, "going away")
if self.sender is not None:
self.sender.end()
if self.pc is not None:
await self.pc.close()
init_buffers()
write_ob_status(False)
if self.websocket is not None:
await self.websocket.send(json.dumps({"kind": "REPLACED"}))
global active_sessions
if self.ended:
print(self.connection_id, "already over")
else:
print(self.connection_id, "going away")
if self.sender is not None:
self.sender.end()
if self.pc is not None:
await self.pc.close()
init_buffers()
if self.websocket is not None and self.websocket.state == websockets.protocol.State.OPEN:
await self.websocket.send(json.dumps({"kind": "REPLACED"}))
await self.websocket.close(1008)
if self.connection_id in active_sessions:
del active_sessions[self.connection_id]
if len(active_sessions) == 0:
write_ob_status(False)
else:
print(self.connection_id, "wasn't in active_sessions!")
await notify_mattserver_about_sessions()
print(self.connection_id, "bye bye")
self.ended = True
def create_peerconnection(self) -> None:
self.pc = RTCPeerConnection()
assert self.pc is not None
@self.pc.on("signalingstatechange") # type: ignore
@self.pc.on("signalingstatechange") # type: ignore
async def on_signalingstatechange() -> None:
assert self.pc is not None
print(
@ -151,41 +209,38 @@ class Session(object):
"Signaling state is {}".format(self.pc.signalingState),
)
@self.pc.on("iceconnectionstatechange") # type: ignore
@self.pc.on("iceconnectionstatechange") # type: ignore
async def on_iceconnectionstatechange() -> None:
if self.pc is None:
print(self.connection_id, "ICE connection state change, but the PC is None!")
print(
self.connection_id,
"ICE connection state change, but the PC is None!",
)
else:
print(
self.connection_id,
"ICE connection state is {}".format(self.pc.iceConnectionState),
)
if self.pc.iceConnectionState == "failed":
await self.pc.close()
self.pc = None
if self.websocket is not None:
await self.websocket.close(1008)
return
await self.end()
@self.pc.on("track") # type: ignore
@self.pc.on("track") # type: ignore
async def on_track(track: MediaStreamTrack) -> None:
global current_session
global active_sessions
print(self.connection_id, "Received track")
if track.kind == "audio":
print(self.connection_id, "Adding to Jack.")
@track.on("ended") # type: ignore
await notify_mattserver_about_sessions()
@track.on("ended") # type: ignore
async def on_ended() -> None:
print(self.connection_id, "Track {} ended".format(track.kind))
# TODO: this doesn't exactly handle reconnecting gracefully
self.end()
await self.end()
self.sender = JackSender(track)
if current_session is not None:
await current_session.end()
current_session = self
write_ob_status(True)
await self.sender.process()
async def process_ice(self, message: Any) -> None:
if self.connection_state == "HELLO" and message["kind"] == "OFFER":
@ -221,27 +276,35 @@ class Session(object):
)
async def connect(self, websocket: websockets.WebSocketServerProtocol) -> None:
global active_sessions
active_sessions[self.connection_id] = self
self.websocket = websocket
self.connection_id = uuid.uuid4()
self.connection_state = "HELLO"
print(self.connection_id, "Connected")
# TODO Raygun user ID
await websocket.send(
json.dumps({"kind": "HELLO", "connectionId": str(self.connection_id)})
json.dumps({"kind": "HELLO", "connectionId": self.connection_id})
)
async for msg in websocket:
data = json.loads(msg)
if data["kind"] == "OFFER":
await self.process_ice(data)
elif data["kind"] == "TIME":
time = datetime.now().time()
await websocket.send(json.dumps({"kind": "TIME", "time": str(time)}))
else:
print(self.connection_id, "Unknown kind {}".format(data["kind"]))
await websocket.send(
json.dumps({"kind": "ERROR", "error": "unknown_kind"})
)
try:
async for msg in websocket:
data = json.loads(msg)
if data["kind"] == "OFFER":
await self.process_ice(data)
elif data["kind"] == "TIME":
time = datetime.now().time()
await websocket.send(json.dumps({"kind": "TIME", "time": str(time)}))
else:
print(self.connection_id, "Unknown kind {}".format(data["kind"]))
await websocket.send(
json.dumps({"kind": "ERROR", "error": "unknown_kind"})
)
except websockets.exceptions.ConnectionClosedError:
print(self.connection_id, "WebSocket closed")
await self.end()
async def serve(websocket: websockets.WebSocketServerProtocol, path: str) -> None:
@ -252,12 +315,64 @@ async def serve(websocket: websockets.WebSocketServerProtocol, path: str) -> Non
pass
WS_PORT = 8079
start_server = websockets.serve(serve, "localhost", int(config["ports"]["websocket"]))
print("Shittyserver WS starting on port {}.".format(config["ports"]["websocket"]))
async def telnet_server(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
global active_sessions, live_session
while True:
data = await reader.read(128)
if not data:
break
data_str = data.decode("utf-8")
parts = data_str.rstrip().split(" ")
print(parts)
if parts[0] == "Q":
data: Dict[str, Dict[str, str]] = {}
for sid, sess in active_sessions.items():
data[sid] = sess.to_dict()
writer.write((json.dumps(data) + "\r\n").encode("utf-8"))
elif parts[0] == "SEL":
sid = parts[1]
if sid == "NUL":
if live_session is not None:
await live_session.end()
writer.write("OKAY\r\n".encode("utf-8"))
else:
writer.write("WONT\r\n".encode("utf-8"))
else:
session = active_sessions[sid]
if session is None:
writer.write("FAIL\r\n".encode("utf-8"))
else:
if live_session is not None:
await live_session.end()
asyncio.ensure_future(session.activate())
live_session = session
writer.write("OKAY\r\n".encode("utf-8"))
else:
writer.write("WHAT\r\n".encode("utf-8"))
await writer.drain()
writer.close()
async def run_telnet_server() -> None:
server = await asyncio.start_server(
telnet_server, "localhost", int(config["ports"]["telnet"])
)
await server.serve_forever()
jack.activate()
start_server = websockets.serve(serve, "localhost", WS_PORT)
print("Shittyserver starting on port {}.".format(WS_PORT))
print("Shittyserver TELNET starting on port {}".format(config["ports"]["telnet"]))
asyncio.get_event_loop().run_until_complete(notify_mattserver_about_sessions())
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_until_complete(asyncio.gather(start_server, run_telnet_server()))
asyncio.get_event_loop().run_forever()

View file

@ -1,2 +1,5 @@
[raygun]
key = CHANGEME
key = CHANGEME
[mattserver]
notify_url = https://ent9s2r5u77vj.x.pipedream.net