server: refactor out into state machine and add time request

This commit is contained in:
Marks Polakovs 2020-04-06 14:13:18 +02:00
parent f070542c5d
commit 3666366b3c

112
server.py
View file

@ -9,6 +9,7 @@ from aiortc.contrib.media import MediaBlackhole, MediaPlayer
import jack as Jack import jack as Jack
import os import os
import re import re
from datetime import datetime
file_contents_ex = re.compile(r"^ws=\d$") file_contents_ex = re.compile(r"^ws=\d$")
@ -33,31 +34,37 @@ def write_ob_status(status):
@Jack.set_error_function @Jack.set_error_function
def error(msg): def error(msg):
print('Error:', msg) print("Error:", msg)
@Jack.set_info_function @Jack.set_info_function
def info(msg): def info(msg):
print('Info:', msg) print("Info:", msg)
jack = Jack.Client("webstudio")
out1 = jack.outports.register("out_0")
out2 = jack.outports.register("out_1")
jack = Jack.Client('webstudio')
out1 = jack.outports.register('out_0')
out2 = jack.outports.register('out_1')
def init_buffers(): def init_buffers():
global transfer_buffer1, transfer_buffer2 global transfer_buffer1, transfer_buffer2
transfer_buffer1 = Jack.RingBuffer(jack.samplerate * 10) transfer_buffer1 = Jack.RingBuffer(jack.samplerate * 10)
transfer_buffer2 = Jack.RingBuffer(jack.samplerate * 10) transfer_buffer2 = Jack.RingBuffer(jack.samplerate * 10)
init_buffers() init_buffers()
@jack.set_process_callback @jack.set_process_callback
def process(frames): def process(frames):
buf1 = out1.get_buffer() buf1 = out1.get_buffer()
piece1 = transfer_buffer1.read(len(buf1)) piece1 = transfer_buffer1.read(len(buf1))
buf1[:len(piece1)] = piece1 buf1[: len(piece1)] = piece1
buf2 = out2.get_buffer() buf2 = out2.get_buffer()
piece2 = transfer_buffer2.read(len(buf2)) piece2 = transfer_buffer2.read(len(buf2))
buf2[:len(piece2)] = piece2 buf2[: len(piece2)] = piece2
class JackSender(object): class JackSender(object):
def __init__(self, track): def __init__(self, track):
@ -79,19 +86,24 @@ class JackSender(object):
# (my testing has shown it to be signed 16-bit) # (my testing has shown it to be signed 16-bit)
# We use PyAV to resample it into the right format # We use PyAV to resample it into the right format
if self.resampler is None: if self.resampler is None:
self.resampler = av.audio.resampler.AudioResampler(format="fltp", layout="stereo", rate=jack.samplerate) self.resampler = av.audio.resampler.AudioResampler(
frame.pts = None # DIRTY HACK format="fltp", layout="stereo", rate=jack.samplerate
)
frame.pts = None # DIRTY HACK
new_frame = self.resampler.resample(frame) new_frame = self.resampler.resample(frame)
transfer_buffer1.write(new_frame.planes[0]) transfer_buffer1.write(new_frame.planes[0])
transfer_buffer2.write(new_frame.planes[1]) transfer_buffer2.write(new_frame.planes[1])
current_session = None current_session = None
class Session(object): class Session(object):
def __init__(self): def __init__(self):
self.websocket = None self.websocket = None
self.sender = None self.sender = None
self.pc = None self.pc = None
self.connection_state = None
async def end(): async def end():
print(self.connection_id, "going away") print(self.connection_id, "going away")
@ -100,29 +112,24 @@ class Session(object):
await self.pc.close() await self.pc.close()
init_buffers() init_buffers()
write_ob_status(False) write_ob_status(False)
await self.websocket.send(json.dumps({ "kind": "REPLACED" })) await self.websocket.send(json.dumps({"kind": "REPLACED"}))
async def connect(self, websocket):
self.websocket = websocket
self.connection_id = uuid.uuid4();
print(self.connection_id, "Connected")
await websocket.send(json.dumps({"kind": "HELLO", "connectionId": str(self.connection_id)}))
sdp_offer = json.loads(await websocket.recv())
if sdp_offer["kind"] != "OFFER":
await websocket.close(1008)
return
offer = RTCSessionDescription(sdp=sdp_offer["sdp"], type=sdp_offer["type"])
print(self.connection_id, "Received offer")
def create_peerconnection(self):
self.pc = RTCPeerConnection() self.pc = RTCPeerConnection()
@self.pc.on("signalingstatechange") @self.pc.on("signalingstatechange")
async def on_signalingstatechange(): async def on_signalingstatechange():
print(self.connection_id, "Signaling state is {}".format(self.pc.signalingState)) print(
self.connection_id,
"Signaling state is {}".format(self.pc.signalingState),
)
@self.pc.on("iceconnectionstatechange") @self.pc.on("iceconnectionstatechange")
async def on_iceconnectionstatechange(): async def on_iceconnectionstatechange():
print(self.connection_id, "ICE connection state is {}".format(self.pc.iceConnectionState)) print(
self.connection_id,
"ICE connection state is {}".format(self.pc.iceConnectionState),
)
if self.pc.iceConnectionState == "failed": if self.pc.iceConnectionState == "failed":
await self.pc.close() await self.pc.close()
self.pc = None self.pc = None
@ -148,26 +155,57 @@ class Session(object):
current_session = self current_session = self
write_ob_status(True) write_ob_status(True)
await self.sender.process() await self.sender.process()
await self.pc.setRemoteDescription(offer) async def process_ice(self, message):
if self.connection_state == "HELLO" and message["kind"] == "OFFER":
offer = RTCSessionDescription(sdp=message["sdp"], type=message["type"])
print(self.connection_id, "Received offer")
self.create_peerconnection()
await self.pc.setRemoteDescription(offer)
answer = await self.pc.createAnswer() answer = await self.pc.createAnswer()
await self.pc.setLocalDescription(answer) await self.pc.setLocalDescription(answer)
await websocket.send( await websocket.send(
json.dumps( json.dumps(
{ {
"kind": "ANSWER", "kind": "ANSWER",
"type": self.pc.localDescription.type, "type": self.pc.localDescription.type,
"sdp": self.pc.localDescription.sdp, "sdp": self.pc.localDescription.sdp,
} }
)
) )
self.connection_state = "ANSWER"
print(self.connection_id, "Sent answer")
else:
print(
self.connection_state,
"Incorrect kind {} for state {}".format(
message["kind"], self.connection_state
),
)
async def connect(self, websocket):
self.websocket = websocket
self.connection_id = uuid.uuid4()
self.connection_state = "HELLO"
print(self.connection_id, "Connected")
await websocket.send(
json.dumps({"kind": "HELLO", "connectionId": str(self.connection_id)})
) )
print(self.connection_id, "Sent answer")
async for msg in websocket: async for msg in websocket:
print(self.connection_id, msg) data = json.loads(msg)
if data["kind"] == "OFFER":
await self.process_ice(data)
elif data["kind"] == "TIME":
time = datetime.now().time()
await websocket.send(json.dumps({"kind": "TIME", "time": str(time)}))
else:
print(self.connection_id, "Unknown kind {}".format(data["kind"]))
await websocket.send(
json.dumps({"kind": "ERROR", "error": "unknown_kind"})
)
async def serve(websocket, path): async def serve(websocket, path):