Skip to content

Commit 1259a30

Browse files
committed
Add delete session endpoint
1 parent 55706ad commit 1259a30

5 files changed

Lines changed: 59 additions & 13 deletions

File tree

lib/plug/router/webrtc.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,10 @@ if Code.ensure_loaded?(Plug) do
5353
send_resp(conn, 415, "Unsupported Media Type")
5454
end
5555
end
56+
57+
delete "/:source_id/whep/:session_id" do
58+
Shinkai.Sources.remove_webrtc_peer(source_id, session_id)
59+
send_resp(conn, 204, "")
60+
end
5661
end
5762
end

lib/shinkai/pipeline.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ defmodule Shinkai.Pipeline do
1717
Sink.WebRTC.handle_peer_answer(:"webrtc_sink_#{source_id}", session_id, sdp_answer)
1818
end
1919

20+
def remove_webrtc_peer(source_id, session_id) do
21+
Sink.WebRTC.remove_peer(:"webrtc_sink_#{source_id}", session_id)
22+
end
23+
2024
def stop(source_id) do
2125
Supervisor.stop(:"#{source_id}")
2226
end

lib/shinkai/sink/webrtc.ex

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,24 @@ defmodule Shinkai.Sink.WebRTC do
3333
GenServer.call(server, {:handle_peer_answer, session_id, sdp})
3434
end
3535

36+
@spec remove_peer(server :: pid() | atom(), session_id :: String.t()) :: :ok
37+
def remove_peer(server, session_id) do
38+
GenServer.cast(server, {:remove_peer, session_id})
39+
end
40+
3641
@impl true
3742
def init(opts) do
3843
source_id = opts[:id]
3944
{:ok, peer_manager} = PeerManager.start_link(source_id: source_id)
4045

4146
PubSub.subscribe(Shinkai.PubSub, tracks_topic(source_id))
4247

43-
{:ok, sock} = :gen_udp.open(0, [:binary, active: true])
44-
4548
{:ok,
4649
%{
4750
peer_manager: peer_manager,
4851
source_id: source_id,
4952
packets_topic: packets_topic(source_id),
50-
tracks: %{},
51-
sock: sock
53+
tracks: %{}
5254
}}
5355
end
5456

@@ -67,6 +69,12 @@ defmodule Shinkai.Sink.WebRTC do
6769
{:noreply, state}
6870
end
6971

72+
@impl true
73+
def handle_cast({:remove_peer, session_id}, state) do
74+
:ok = PeerManager.remove_peer(state.peer_manager, session_id)
75+
{:noreply, state}
76+
end
77+
7078
@impl true
7179
def handle_info({:tracks, tracks}, state) do
7280
{tracks, unsupported_tracks} = Enum.split_with(tracks, &(&1.codec in @supported_codecs))
@@ -87,7 +95,7 @@ defmodule Shinkai.Sink.WebRTC do
8795
|> Enum.reject(&is_nil/1)
8896
|> Enum.reduce(state, fn track, state ->
8997
media_stream = ExWebRTC.MediaStreamTrack.new(track.type, [stream_id])
90-
webrtc_track = webrtc_track(track, stream_id)
98+
webrtc_track = webrtc_track(track)
9199

92100
payloader_mod = payloader_mod(track.codec)
93101

@@ -168,13 +176,15 @@ defmodule Shinkai.Sink.WebRTC do
168176
%{track_ctx | payloader_state: payloader_state}
169177
end
170178

171-
defp webrtc_track(track, stream_id) do
179+
defp webrtc_track(track) do
180+
pt = payload_type(track.codec)
181+
172182
%RTPCodecParameters{
173-
payload_type: payload_type(track.codec),
183+
payload_type: pt,
174184
mime_type: mime_type(track.codec),
175185
clock_rate: clock_rate(track),
176186
channels: if(track.type == :audio, do: 1, else: nil),
177-
sdp_fmtp_line: sdp_fmtp_line(track.codec, payload_type)
187+
sdp_fmtp_line: sdp_fmtp_line(track.codec, pt)
178188
}
179189
end
180190

lib/shinkai/sink/webrtc/peer_manager.ex

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ defmodule Shinkai.Sink.WebRTC.PeerManager do
3636
GenServer.cast(manager, {:handle_peer_answer, from, session_id, sdp_answer})
3737
end
3838

39+
@spec remove_peer(server :: pid() | atom(), session_id :: String.t()) :: :ok
40+
def remove_peer(manager, session_id) do
41+
GenServer.call(manager, {:remove_peer, session_id})
42+
end
43+
3944
@impl true
4045
def init(opts) do
4146
{:ok,
@@ -57,6 +62,20 @@ defmodule Shinkai.Sink.WebRTC.PeerManager do
5762
{:reply, :ok, %{state | audio_track: track}}
5863
end
5964

65+
def handle_call({:remove_peer, session_id}, _from, state) do
66+
Logger.info("Removing WebRTC peer with session ID: #{session_id}")
67+
68+
case Registry.match(Shinkai.Registry, {:webrtc, state.source_id}, {:_, session_id}) do
69+
[{_pid, {pc, _session_id}}] -> unregister(state.source_id, pc)
70+
[] -> :ok
71+
end
72+
73+
{pc, sessions} = Map.pop(state.sessions, session_id)
74+
if pc, do: PeerConnection.stop(pc)
75+
76+
{:reply, :ok, %{state | sessions: sessions}}
77+
end
78+
6079
@impl true
6180
def handle_cast({:add_peer, from}, state) do
6281
video_tracks = if state.video_track, do: [elem(state.video_track, 1)], else: []
@@ -125,10 +144,12 @@ defmodule Shinkai.Sink.WebRTC.PeerManager do
125144
end
126145

127146
def handle_info({:ex_webrtc, pid, {:connection_state_change, connection_state}}, state)
128-
when connection_state in [:failed, :closed, :disconnected] do
129-
Logger.info("WebRTC PeerConnection #{inspect(pid)} connection state: #{connection_state}")
130-
PeerConnection.stop(pid)
131-
Registry.unregister_match(Shinkai.Registry, {:webrtc, state.source_id}, {pid, :_})
147+
when connection_state in [:failed, :closed] do
148+
Logger.debug(
149+
"WebRTC PeerConnection #{inspect(pid)} connection state changed to: #{connection_state}"
150+
)
151+
152+
unregister(state.source_id, pid)
132153
{:noreply, state}
133154
end
134155

@@ -137,7 +158,7 @@ defmodule Shinkai.Sink.WebRTC.PeerManager do
137158
end
138159

139160
def handle_info({:ex_webrtc, _pid, msg}, state) do
140-
Logger.info("Unhandled ExWebRTC message: #{inspect(msg)}")
161+
# Logger.info("Unhandled ExWebRTC message: #{inspect(msg)}")
141162
{:noreply, state}
142163
end
143164

@@ -153,4 +174,9 @@ defmodule Shinkai.Sink.WebRTC.PeerManager do
153174
end
154175
end)
155176
end
177+
178+
defp unregister(source_id, pc) do
179+
PeerConnection.stop(pc)
180+
Registry.unregister_match(Shinkai.Registry, {:webrtc, source_id}, {pc, :_})
181+
end
156182
end

lib/shinkai/sources.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ defmodule Shinkai.Sources do
5454

5555
defdelegate add_webrtc_peer(source_id), to: Shinkai.Pipeline
5656
defdelegate handle_webrtc_peer_answer(source_id, session_id, sdp_answer), to: Shinkai.Pipeline
57+
defdelegate remove_webrtc_peer(source_id, session_id), to: Shinkai.Pipeline
5758

5859
defp storage_impl do
5960
Application.get_env(:shinkai, :storage_impl, Shinkai.Sources.Storage.File)

0 commit comments

Comments
 (0)