From 3666366b3ca854d5ec61226d94379bd29a159ae4 Mon Sep 17 00:00:00 2001 From: Marks Polakovs Date: Mon, 6 Apr 2020 14:13:18 +0200 Subject: [PATCH] server: refactor out into state machine and add time request --- server.py | 112 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 37 deletions(-) diff --git a/server.py b/server.py index 2add8cf..e8a6b6a 100644 --- a/server.py +++ b/server.py @@ -9,6 +9,7 @@ from aiortc.contrib.media import MediaBlackhole, MediaPlayer import jack as Jack import os import re +from datetime import datetime file_contents_ex = re.compile(r"^ws=\d$") @@ -33,31 +34,37 @@ def write_ob_status(status): @Jack.set_error_function def error(msg): - print('Error:', msg) + print("Error:", msg) @Jack.set_info_function def info(msg): - print('Info:', msg) + print("Info:", msg) + + +jack = Jack.Client("webstudio") +out1 = jack.outports.register("out_0") +out2 = jack.outports.register("out_1") -jack = Jack.Client('webstudio') -out1 = jack.outports.register('out_0') -out2 = jack.outports.register('out_1') def init_buffers(): global transfer_buffer1, transfer_buffer2 transfer_buffer1 = Jack.RingBuffer(jack.samplerate * 10) transfer_buffer2 = Jack.RingBuffer(jack.samplerate * 10) + + init_buffers() + @jack.set_process_callback def process(frames): buf1 = out1.get_buffer() piece1 = transfer_buffer1.read(len(buf1)) - buf1[:len(piece1)] = piece1 + buf1[: len(piece1)] = piece1 buf2 = out2.get_buffer() piece2 = transfer_buffer2.read(len(buf2)) - buf2[:len(piece2)] = piece2 + buf2[: len(piece2)] = piece2 + class JackSender(object): def __init__(self, track): @@ -79,19 +86,24 @@ class JackSender(object): # (my testing has shown it to be signed 16-bit) # We use PyAV to resample it into the right format if self.resampler is None: - self.resampler = av.audio.resampler.AudioResampler(format="fltp", layout="stereo", rate=jack.samplerate) - frame.pts = None # DIRTY HACK + self.resampler = av.audio.resampler.AudioResampler( + format="fltp", layout="stereo", rate=jack.samplerate + ) + frame.pts = None # DIRTY HACK new_frame = self.resampler.resample(frame) transfer_buffer1.write(new_frame.planes[0]) transfer_buffer2.write(new_frame.planes[1]) + current_session = None + class Session(object): def __init__(self): self.websocket = None self.sender = None self.pc = None + self.connection_state = None async def end(): print(self.connection_id, "going away") @@ -100,29 +112,24 @@ class Session(object): await self.pc.close() init_buffers() write_ob_status(False) - await self.websocket.send(json.dumps({ "kind": "REPLACED" })) - - async def connect(self, websocket): - self.websocket = websocket - self.connection_id = uuid.uuid4(); - print(self.connection_id, "Connected") - await websocket.send(json.dumps({"kind": "HELLO", "connectionId": str(self.connection_id)})) - sdp_offer = json.loads(await websocket.recv()) - if sdp_offer["kind"] != "OFFER": - await websocket.close(1008) - return - offer = RTCSessionDescription(sdp=sdp_offer["sdp"], type=sdp_offer["type"]) - print(self.connection_id, "Received offer") + await self.websocket.send(json.dumps({"kind": "REPLACED"})) + def create_peerconnection(self): self.pc = RTCPeerConnection() @self.pc.on("signalingstatechange") async def on_signalingstatechange(): - print(self.connection_id, "Signaling state is {}".format(self.pc.signalingState)) + print( + self.connection_id, + "Signaling state is {}".format(self.pc.signalingState), + ) @self.pc.on("iceconnectionstatechange") async def on_iceconnectionstatechange(): - print(self.connection_id, "ICE connection state is {}".format(self.pc.iceConnectionState)) + print( + self.connection_id, + "ICE connection state is {}".format(self.pc.iceConnectionState), + ) if self.pc.iceConnectionState == "failed": await self.pc.close() self.pc = None @@ -148,26 +155,57 @@ class Session(object): current_session = self write_ob_status(True) await self.sender.process() - - await self.pc.setRemoteDescription(offer) + async def process_ice(self, message): + if self.connection_state == "HELLO" and message["kind"] == "OFFER": + offer = RTCSessionDescription(sdp=message["sdp"], type=message["type"]) + print(self.connection_id, "Received offer") + self.create_peerconnection() + await self.pc.setRemoteDescription(offer) - answer = await self.pc.createAnswer() - await self.pc.setLocalDescription(answer) + answer = await self.pc.createAnswer() + await self.pc.setLocalDescription(answer) - await websocket.send( - json.dumps( - { - "kind": "ANSWER", - "type": self.pc.localDescription.type, - "sdp": self.pc.localDescription.sdp, - } + await websocket.send( + json.dumps( + { + "kind": "ANSWER", + "type": self.pc.localDescription.type, + "sdp": self.pc.localDescription.sdp, + } + ) ) + self.connection_state = "ANSWER" + print(self.connection_id, "Sent answer") + else: + print( + self.connection_state, + "Incorrect kind {} for state {}".format( + message["kind"], self.connection_state + ), + ) + + async def connect(self, websocket): + self.websocket = websocket + self.connection_id = uuid.uuid4() + self.connection_state = "HELLO" + print(self.connection_id, "Connected") + await websocket.send( + json.dumps({"kind": "HELLO", "connectionId": str(self.connection_id)}) ) - print(self.connection_id, "Sent answer") async for msg in websocket: - print(self.connection_id, msg) + data = json.loads(msg) + if data["kind"] == "OFFER": + await self.process_ice(data) + elif data["kind"] == "TIME": + time = datetime.now().time() + await websocket.send(json.dumps({"kind": "TIME", "time": str(time)})) + else: + print(self.connection_id, "Unknown kind {}".format(data["kind"])) + await websocket.send( + json.dumps({"kind": "ERROR", "error": "unknown_kind"}) + ) async def serve(websocket, path):