Skip to content

Commit 55706ad

Browse files
committed
Add webrtc sink
1 parent 366404f commit 55706ad

5 files changed

Lines changed: 246 additions & 28 deletions

File tree

lib/plug/templates/webrtc.html.eex

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,38 @@
33
<head>
44
<meta charset="UTF-8" />
55
<title>Shinkai WebRTC</title>
6+
<style>
7+
html,
8+
body {
9+
margin: 0;
10+
padding: 0;
11+
width: 100%;
12+
height: 100%;
13+
overflow: hidden;
14+
background: black;
15+
}
16+
17+
video {
18+
width: 100vw;
19+
height: 100vh;
20+
}
21+
</style>
622
</head>
723
<body>
8-
<video id="video" autoplay muted playsinline></video>
24+
<video id="video" controls autoplay muted></video>
925
<script>
1026
(async () => {
27+
const avStream = new MediaStream();
1128
const pc = new RTCPeerConnection();
1229
let response = await fetch("/webrtc/<%= @source_id %>/whep", {
1330
method: "POST",
1431
headers: { "Content-Type": "application/sdp" },
1532
body: "",
1633
});
1734

18-
pc.addEventListener("track", (track) => {
35+
pc.addEventListener("track", (e) => {
1936
const video = document.getElementById("video");
20-
video.srcObject = track.streams[0];
37+
video.srcObject = e.streams[0];
2138
});
2239

2340
if (response.status == 416) {

lib/shinkai/sink/webrtc.ex

Lines changed: 166 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,17 @@ defmodule Shinkai.Sink.WebRTC do
33

44
use GenServer
55

6+
require Logger
7+
8+
import Shinkai.Utils
9+
610
alias __MODULE__.PeerManager
11+
alias ExWebRTC.RTPCodecParameters
12+
alias Phoenix.PubSub
13+
alias RTSP.RTP.Encoder, as: RTPEncoder
14+
15+
@supported_codecs [:h264, :h265, :pcma]
16+
@video_clock_rate 90_000
717

818
def start_link(opts) do
919
GenServer.start_link(__MODULE__, opts, name: opts[:name])
@@ -25,11 +35,28 @@ defmodule Shinkai.Sink.WebRTC do
2535

2636
@impl true
2737
def init(opts) do
28-
{:ok, peer_manager} = PeerManager.start_link(source_id: opts[:id])
29-
{:ok, %{peer_manager: peer_manager, source_id: opts[:id]}}
38+
source_id = opts[:id]
39+
{:ok, peer_manager} = PeerManager.start_link(source_id: source_id)
40+
41+
PubSub.subscribe(Shinkai.PubSub, tracks_topic(source_id))
42+
43+
{:ok, sock} = :gen_udp.open(0, [:binary, active: true])
44+
45+
{:ok,
46+
%{
47+
peer_manager: peer_manager,
48+
source_id: source_id,
49+
packets_topic: packets_topic(source_id),
50+
tracks: %{},
51+
sock: sock
52+
}}
3053
end
3154

3255
@impl true
56+
def handle_call(:add_new_peer, _from, %{video_tracks: [], audio_tracks: []} = state) do
57+
{:reply, {:error, :no_tracks}, state}
58+
end
59+
3360
def handle_call(:add_new_peer, from, state) do
3461
:ok = PeerManager.add_peer(state.peer_manager, from)
3562
{:noreply, state}
@@ -39,4 +66,141 @@ defmodule Shinkai.Sink.WebRTC do
3966
:ok = PeerManager.handle_peer_answer(state.peer_manager, from, session_id, sdp)
4067
{:noreply, state}
4168
end
69+
70+
@impl true
71+
def handle_info({:tracks, tracks}, state) do
72+
{tracks, unsupported_tracks} = Enum.split_with(tracks, &(&1.codec in @supported_codecs))
73+
74+
if unsupported_tracks != [] do
75+
Logger.warning(
76+
"Unsupported codecs received in WebRTC sink: #{join_codecs(unsupported_tracks)}"
77+
)
78+
end
79+
80+
video_track = Enum.find(tracks, fn t -> t.type == :video end)
81+
audio_track = Enum.find(tracks, fn t -> t.type == :audio end)
82+
83+
stream_id = ExWebRTC.MediaStreamTrack.generate_stream_id()
84+
85+
state =
86+
[video_track, audio_track]
87+
|> Enum.reject(&is_nil/1)
88+
|> Enum.reduce(state, fn track, state ->
89+
media_stream = ExWebRTC.MediaStreamTrack.new(track.type, [stream_id])
90+
webrtc_track = webrtc_track(track, stream_id)
91+
92+
payloader_mod = payloader_mod(track.codec)
93+
94+
track_ctx = %{
95+
id: media_stream.id,
96+
timescale: track.timescale,
97+
target_timescale: webrtc_track.clock_rate,
98+
payloader_mod: payloader_mod,
99+
payloader_state: payloader_mod.init([])
100+
}
101+
102+
if track.type == :video,
103+
do: PeerManager.add_video_track(state.peer_manager, {media_stream, webrtc_track}),
104+
else: PeerManager.add_audio_track(state.peer_manager, {media_stream, webrtc_track})
105+
106+
%{state | tracks: Map.put(state.tracks, track.id, track_ctx)}
107+
end)
108+
109+
:ok = PubSub.subscribe(Shinkai.PubSub, state.packets_topic)
110+
111+
{:noreply, state}
112+
end
113+
114+
@impl true
115+
def handle_info({:packet, packets}, state) when is_list(packets) do
116+
track_id = hd(packets).track_id
117+
118+
case(Map.fetch(state, track_id)) do
119+
:error ->
120+
{:noreply, state}
121+
122+
{:ok, track_ctx} ->
123+
track_ctx =
124+
Enum.reduce(packets, track_ctx, fn packet, track_ctx ->
125+
do_handle_packet(packet, state.source_id, track_ctx)
126+
end)
127+
128+
{:noreply, %{state | tracks: Map.put(state.tracks, track_id, track_ctx)}}
129+
end
130+
end
131+
132+
def handle_info({:packet, packet}, state) do
133+
case Map.fetch(state.tracks, packet.track_id) do
134+
:error ->
135+
{:noreply, state}
136+
137+
{:ok, track_ctx} ->
138+
track_ctx = do_handle_packet(packet, state.source_id, track_ctx)
139+
{:noreply, %{state | tracks: Map.put(state.tracks, packet.track_id, track_ctx)}}
140+
end
141+
end
142+
143+
def handle_info(_msg, state) do
144+
{:noreply, state}
145+
end
146+
147+
defp do_handle_packet(packet, source_id, track_ctx) do
148+
rtp_timestamp =
149+
ExMP4.Helper.timescalify(packet.pts, track_ctx.timescale, track_ctx.target_timescale)
150+
151+
{packets, payloader_state} =
152+
track_ctx.payloader_mod.handle_sample(
153+
packet.data,
154+
rtp_timestamp,
155+
track_ctx.payloader_state
156+
)
157+
158+
track_id = track_ctx.id
159+
160+
Registry.dispatch(Shinkai.Registry, {:webrtc, source_id}, fn peers ->
161+
for {_pid, {pc, _session_id}} <- peers do
162+
Enum.each(packets, fn rtp_packet ->
163+
:ok = ExWebRTC.PeerConnection.send_rtp(pc, track_id, rtp_packet)
164+
end)
165+
end
166+
end)
167+
168+
%{track_ctx | payloader_state: payloader_state}
169+
end
170+
171+
defp webrtc_track(track, stream_id) do
172+
%RTPCodecParameters{
173+
payload_type: payload_type(track.codec),
174+
mime_type: mime_type(track.codec),
175+
clock_rate: clock_rate(track),
176+
channels: if(track.type == :audio, do: 1, else: nil),
177+
sdp_fmtp_line: sdp_fmtp_line(track.codec, payload_type)
178+
}
179+
end
180+
181+
defp clock_rate(%{type: :video}), do: @video_clock_rate
182+
defp clock_rate(%{timescale: timescale}), do: timescale
183+
184+
defp payload_type(:pcma), do: 8
185+
defp payload_type(_codec), do: 96
186+
187+
defp mime_type(:h264), do: "video/H264"
188+
defp mime_type(:h265), do: "video/H265"
189+
defp mime_type(:pcma), do: "audio/PCMA"
190+
191+
defp sdp_fmtp_line(:h264, pt) do
192+
%ExSDP.Attribute.FMTP{
193+
pt: pt,
194+
level_asymmetry_allowed: true,
195+
packetization_mode: 1,
196+
profile_level_id: 0x42E01F
197+
}
198+
end
199+
200+
defp sdp_fmtp_line(:h265, _pt), do: nil
201+
defp sdp_fmtp_line(_codec, _pt), do: nil
202+
203+
defp payloader_mod(:h264), do: RTPEncoder.H264
204+
defp payloader_mod(:h265), do: RTPEncoder.H265
205+
defp payloader_mod(:pcma), do: RTPEncoder.G711
42206
end

lib/shinkai/sink/webrtc/peer_manager.ex

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,21 @@ defmodule Shinkai.Sink.WebRTC.PeerManager do
77

88
alias ExWebRTC.PeerConnection
99

10-
@h264_codec %ExWebRTC.RTPCodecParameters{
11-
payload_type: 96,
12-
mime_type: "video/H264",
13-
clock_rate: 90_000,
14-
sdp_fmtp_line: %ExSDP.Attribute.FMTP{
15-
pt: 96,
16-
level_asymmetry_allowed: true,
17-
packetization_mode: 1,
18-
profile_level_id: 0x42E01F
19-
}
20-
}
21-
2210
def start_link(opts) do
2311
GenServer.start_link(__MODULE__, opts, name: opts[:name])
2412
end
2513

26-
@spec add_peer(server :: pid() | atom(), from :: GenServer.from()) :: :ok
14+
@spec add_video_track(server :: GenServer.name() | pid(), tuple()) :: :ok
15+
def add_video_track(manager, track) do
16+
GenServer.call(manager, {:add_video_track, track})
17+
end
18+
19+
@spec add_audio_track(server :: GenServer.name() | pid(), tuple()) :: :ok
20+
def add_audio_track(manager, track) do
21+
GenServer.call(manager, {:add_audio_track, track})
22+
end
23+
24+
@spec add_peer(server :: GenServer.name() | pid(), from :: GenServer.from()) :: :ok
2725
def add_peer(manager, from) do
2826
GenServer.cast(manager, {:add_peer, from})
2927
end
@@ -40,16 +38,36 @@ defmodule Shinkai.Sink.WebRTC.PeerManager do
4038

4139
@impl true
4240
def init(opts) do
43-
{:ok, %{source_id: opts[:source_id], sessions: %{}, peers: %{}}}
41+
{:ok,
42+
%{
43+
source_id: opts[:source_id],
44+
sessions: %{},
45+
peers: %{},
46+
video_track: nil,
47+
audio_track: nil
48+
}}
49+
end
50+
51+
@impl true
52+
def handle_call({:add_video_track, track}, _from, state) do
53+
{:reply, :ok, %{state | video_track: track}}
54+
end
55+
56+
def handle_call({:add_audio_track, track}, _from, state) do
57+
{:reply, :ok, %{state | audio_track: track}}
4458
end
4559

4660
@impl true
4761
def handle_cast({:add_peer, from}, state) do
48-
stream_id = ExWebRTC.MediaStreamTrack.generate_stream_id()
49-
video_track = ExWebRTC.MediaStreamTrack.new(:video, [stream_id])
62+
video_tracks = if state.video_track, do: [elem(state.video_track, 1)], else: []
63+
audio_tracks = if state.audio_track, do: [elem(state.audio_track, 1)], else: []
64+
65+
tracks =
66+
Enum.reject([state.video_track, state.audio_track], &is_nil/1) |> Enum.map(&elem(&1, 0))
5067

51-
with {:ok, pc} <- PeerConnection.start(video_codecs: [@h264_codec]),
52-
{:ok, _} <- PeerConnection.add_track(pc, video_track),
68+
with {:ok, pc} <-
69+
PeerConnection.start(video_codecs: video_tracks, audio_codecs: audio_tracks),
70+
:ok <- add_tracks(pc, tracks),
5371
{:ok, offer} <- PeerConnection.create_offer(pc),
5472
:ok <- PeerConnection.set_local_description(pc, offer) do
5573
{:noreply, %{state | peers: Map.put(state.peers, pc, from)}}
@@ -100,14 +118,21 @@ defmodule Shinkai.Sink.WebRTC.PeerManager do
100118
{:noreply, state}
101119
end
102120

103-
def handle_info({:ex_webrtc, pid, {:ice_connection_state_change, :connected}}, state) do
121+
def handle_info({:ex_webrtc, pid, {:connection_state_change, :connected}}, state) do
104122
{session_id, pc} = Enum.find(state.sessions, fn {_session_id, p} -> p == pid end)
105123
Registry.register(Shinkai.Registry, {:webrtc, state.source_id}, {pc, session_id})
106124
{:noreply, %{state | sessions: Map.delete(state.sessions, session_id)}}
107125
end
108126

109-
def handle_info({:ex_webrtc, _pid, {:ice_connection_state_change, :failed}}, state) do
110-
# Handle failed connection
127+
def handle_info({:ex_webrtc, pid, {:connection_state_change, connection_state}}, state)
128+
when connection_state in [:failed, :closed, :disconnected] do
129+
Logger.info("WebRTC PeerConnection #{inspect(pid)} connection state: #{connection_state}")
130+
PeerConnection.stop(pid)
131+
Registry.unregister_match(Shinkai.Registry, {:webrtc, state.source_id}, {pid, :_})
132+
{:noreply, state}
133+
end
134+
135+
def handle_info({:ex_webrtc, _pid, {:rtcp, _}}, state) do
111136
{:noreply, state}
112137
end
113138

@@ -119,4 +144,13 @@ defmodule Shinkai.Sink.WebRTC.PeerManager do
119144
def handle_info(_msg, state) do
120145
{:noreply, state}
121146
end
147+
148+
defp add_tracks(pc, tracks) do
149+
Enum.reduce_while(tracks, :ok, fn track, :ok ->
150+
case PeerConnection.add_track(pc, track) do
151+
{:ok, _} -> {:cont, :ok}
152+
{:error, reason} -> {:halt, {:error, reason}}
153+
end
154+
end)
155+
end
122156
end

lib/shinkai/utils.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,7 @@ defmodule Shinkai.Utils do
1212

1313
@spec sink_topic(String.t()) :: String.t()
1414
def sink_topic(id), do: "source:sink:#{id}"
15+
16+
@spec join_codecs([Shinkai.Track.t()]) :: String.t()
17+
def join_codecs(tracks), do: Enum.map_join(tracks, ", ", & &1.codec)
1518
end

mix.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
2121
"ex_rtmp": {:hex, :ex_rtmp, "0.3.1", "78c814120ec569e757217fbce88af60e6fe24ec88ea23d878cec3e678f01b6f7", [:mix], [{:ex_flv, "~> 0.2.0", [hex: :ex_flv, repo: "hexpm", optional: false]}], "hexpm", "7c840e3dd15c7a4774bcf2463f8e5a48325a5caccea7c631b856d3087beaa4d4"},
2222
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
23-
"ex_sdp": {:hex, :ex_sdp, "1.1.1", "1a7b049491e5ec02dad9251c53d960835dc5631321ae978ec331831f3e4f6d5f", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "1b13a72ac9c5c695b8824dbdffc671be8cbb4c0d1ccb4ff76a04a6826759f233"},
23+
"ex_sdp": {:hex, :ex_sdp, "1.1.2", "7e7465cb13b557cc76ef3e854bad7626b73cc1d1f480d38b5fbcf539c7d8a45d", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "50a27c2d745924679acca32b3d5499d0b35d135a180b83422df82c289afce564"},
2424
"ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
2525
"ex_turn": {:hex, :ex_turn, "0.2.0", "4e1f9b089e9a5ee44928d12370cc9ea7a89b84b2f6256832de65271212eb80de", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "08e884f0af2c4a147e3f8cd4ffe33e3452a256389f0956e55a8c4d75bf0e74cd"},
2626
"ex_webrtc": {:hex, :ex_webrtc, "0.15.0", "c5849edcf7d035fcecf01db5be6d33a9d111999640bfc9d13a8c24e8eab7cced", [:mix], [{:crc, "~> 0.10", [hex: :crc, repo: "hexpm", optional: false]}, {:ex_dtls, "~> 0.18.0", [hex: :ex_dtls, repo: "hexpm", optional: false]}, {:ex_ice, "~> 0.13.0", [hex: :ex_ice, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.7.1", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sctp, "0.1.2", [hex: :ex_sctp, repo: "hexpm", optional: true]}, {:ex_sdp, "~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}], "hexpm", "79c21017b45a464c513f87e64ae9a20c8085d937fb5e0d639c50a8c41018172d"},
@@ -49,7 +49,7 @@
4949
"qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
5050
"ratio": {:hex, :ratio, "4.0.1", "3044166f2fc6890aa53d3aef0c336f84b2bebb889dc57d5f95cc540daa1912f8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "c60cbb3ccdff9ffa56e7d6d1654b5c70d9f90f4d753ab3a43a6bf40855b881ce"},
5151
"req": {:hex, :req, "0.5.16", "99ba6a36b014458e52a8b9a0543bfa752cb0344b2a9d756651db1281d4ba4450", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "974a7a27982b9b791df84e8f6687d21483795882a7840e8309abdbe08bb06f09"},
52-
"rtsp": {:hex, :rtsp, "0.6.1", "e12e7d18874c159db69f8e11ebdafe584ec3e9d4b082dfbbf5b9361bdd2ea1f4", [:mix], [{:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:media_codecs, "~> 0.10.0", [hex: :media_codecs, repo: "hexpm", optional: false]}, {:membrane_rtsp, "~> 0.11.0", [hex: :membrane_rtsp, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}], "hexpm", "9335e111019aaa82b66c6e34f76f81c6de291bfce4eb7e5899d95b8d91f915fc"},
52+
"rtsp": {:hex, :rtsp, "0.6.2", "d0d63aac87a96b062d30e8f26b0033abd2022c5c87ab3decac0b6937a4642b35", [:mix], [{:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:media_codecs, "~> 0.10.0", [hex: :media_codecs, repo: "hexpm", optional: false]}, {:membrane_rtsp, "~> 0.11.0", [hex: :membrane_rtsp, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}], "hexpm", "bf26e77a2ef85331ec8d43d5b14d8efa88a2b1685e8b889abffa5696696dc320"},
5353
"shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"},
5454
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
5555
"thousand_island": {:hex, :thousand_island, "1.4.3", "2158209580f633be38d43ec4e3ce0a01079592b9657afff9080d5d8ca149a3af", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6e4ce09b0fd761a58594d02814d40f77daff460c48a7354a15ab353bb998ea0b"},

0 commit comments

Comments
 (0)