Working streaming (using a public TURN server)

This commit is contained in:
Marks Polakovs 2020-03-28 19:16:14 +01:00
parent 0ce3e73465
commit c92d959753
3 changed files with 192 additions and 19 deletions

13
requirements.txt Normal file
View file

@ -0,0 +1,13 @@
aioice==0.6.18
aiortc==0.9.27
av==7.0.1
cffi==1.14.0
crc32c==2.0
cryptography==2.8
JACK-Client==0.5.2
netifaces==0.10.9
pycparser==2.20
pyee==7.0.1
pylibsrtp==0.6.6
six==1.14.0
websockets==8.1

130
server.py Normal file
View file

@ -0,0 +1,130 @@
import asyncio
import websockets
import json
import uuid
import av
import struct
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder
import jack as Jack
@Jack.set_error_function
def error(msg):
print('Error:', msg)
@Jack.set_info_function
def info(msg):
print('Info:', msg)
jack = Jack.Client('webstudio')
out1 = jack.outports.register('out_1')
out2 = jack.outports.register('out_2')
transfer_buffer1 = Jack.RingBuffer(jack.samplerate * 10)
transfer_buffer2 = Jack.RingBuffer(jack.samplerate * 10)
@jack.set_process_callback
def process(frames):
buf1 = out1.get_buffer()
piece1 = transfer_buffer1.read(len(buf1))
buf1[:len(piece1)] = piece1
buf2 = out2.get_buffer()
piece2 = transfer_buffer2.read(len(buf2))
buf2[:len(piece2)] = piece2
class JackSender(object):
def __init__(self, track):
self.track = track
self.resampler = None
async def process(self):
while True:
frame = await self.track.recv()
# Right, depending on the format, we may need to do some fuckery.
# Jack expects all audio to be 32 bit floating point
# while PyAV may give us audio in any format
# (my testing has shown it to be signed 16-bit)
# We use PyAV to resample it into the right format
if self.resampler is None:
self.resampler = av.audio.resampler.AudioResampler(format="fltp", layout="stereo", rate=jack.samplerate)
frame.pts = None # DIRTY HACK
new_frame = self.resampler.resample(frame)
transfer_buffer1.write(new_frame.planes[0])
transfer_buffer2.write(new_frame.planes[1])
class Session(object):
async def connect(self, websocket):
connection_id = uuid.uuid4();
print(connection_id, "Connected")
await websocket.send(json.dumps({"kind": "HELLO", "connectionId": str(connection_id)}))
sdp_offer = json.loads(await websocket.recv())
if sdp_offer["kind"] != "OFFER":
await websocket.close(1008)
return
offer = RTCSessionDescription(sdp=sdp_offer["sdp"], type=sdp_offer["type"])
print(connection_id, "Received offer")
self.pc = RTCPeerConnection()
self.recorder = MediaRecorder("/home/marks/test.opus")
@self.pc.on("signalingstatechange")
async def on_signalingstatechange():
print(connection_id, "Signaling state is {}".format(self.pc.signalingState))
@self.pc.on("iceconnectionstatechange")
async def on_iceconnectionstatechange():
print(connection_id, "ICE connection state is {}".format(self.pc.iceConnectionState))
if self.pc.iceConnectionState == "failed":
await self.pc.close()
self.pc = None
await websocket.close(1008)
return
@self.pc.on("track")
async def on_track(track):
print(connection_id, "Received track")
if track.kind == "audio":
print(connection_id, "Adding to Jack.")
sender = JackSender(track)
@track.on("ended")
async def on_ended():
print(connection_id, "Track {} ended".format(track.kind))
await sender.process()
await self.pc.setRemoteDescription(offer)
answer = await self.pc.createAnswer()
await self.pc.setLocalDescription(answer)
await websocket.send(
json.dumps(
{
"kind": "ANSWER",
"type": self.pc.localDescription.type,
"sdp": self.pc.localDescription.sdp,
}
)
)
print(connection_id, "Sent answer")
async for msg in websocket:
print(connection_id, msg)
async def serve(websocket, path):
if path == "/stream":
session = Session()
await session.connect(websocket)
else:
pass
jack.activate()
start_server = websockets.serve(serve, "localhost", 8079)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View file

@ -1,9 +1,11 @@
import { Streamer, ConnectionStateListener, ConnectionStateEnum } from "./streamer";
import {
Streamer,
ConnectionStateListener,
ConnectionStateEnum
} from "./streamer";
type StreamerState = "HELLO" | "OFFER" | "ANSWER" | "CONNECTED";
export class WebRTCStreamer extends Streamer {
pc: RTCPeerConnection;
ws: WebSocket | undefined;
@ -11,7 +13,26 @@ export class WebRTCStreamer extends Streamer {
constructor(stream: MediaStream) {
super();
this.pc = new RTCPeerConnection({});
this.pc = new RTCPeerConnection({
iceServers: [
{
urls: ["stun:eu-turn4.xirsys.com"]
},
{
username:
"h42bRBHL2GtRTiQRoXN8GCG-PFYMl4Acel6EQ9xINBWdTpoZyBEGyCcJBCtT3iINAAAAAF5_NJptYXJrc3BvbGFrb3Zz",
credential: "17e834fa-70e7-11ea-a66c-faa4ea02ad5c",
urls: [
"turn:eu-turn4.xirsys.com:80?transport=udp",
"turn:eu-turn4.xirsys.com:3478?transport=udp",
"turn:eu-turn4.xirsys.com:80?transport=tcp",
"turn:eu-turn4.xirsys.com:3478?transport=tcp",
"turns:eu-turn4.xirsys.com:443?transport=tcp",
"turns:eu-turn4.xirsys.com:5349?transport=tcp"
]
}
]
});
this.pc.onconnectionstatechange = e => {
console.log("Connection state change: " + this.pc.connectionState);
this.onStateChange(this.mapStateToConnectionState());
@ -94,17 +115,26 @@ export class WebRTCStreamer extends Streamer {
mapStateToConnectionState(): ConnectionStateEnum {
switch (this.pc.connectionState) {
case "connected": return "CONNECTED";
case "connecting": return "CONNECTING";
case "disconnected": return "CONNECTION_LOST";
case "failed": return "CONNECTION_LOST";
case "connected":
return "CONNECTED";
case "connecting":
return "CONNECTING";
case "disconnected":
return "CONNECTION_LOST";
case "failed":
return "CONNECTION_LOST";
default:
if (this.ws) {
switch (this.ws.readyState) {
case 1: return "CONNECTING";
case 2: case 3: return "CONNECTION_LOST";
case 0: return "NOT_CONNECTED";
default: throw new Error();
case 1:
return "CONNECTING";
case 2:
case 3:
return "CONNECTION_LOST";
case 0:
return "NOT_CONNECTED";
default:
throw new Error();
}
}
return "NOT_CONNECTED";