Skip to content

Commit 4945abc

Browse files
authored
Add rtsp publishing (#14)
* Add rtsp server * Use media processor in rtsp client * Fix test * Add tests * Fix tests * Fix tests again * Make credo happy
1 parent bd9c887 commit 4945abc

15 files changed

Lines changed: 663 additions & 217 deletions

File tree

lib/shinkai/application.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ defmodule Shinkai.Application do
1616
{Sources.PublishManager, []},
1717
{Registry, name: Sink.Registry, keys: :duplicate},
1818
{Registry, name: Source.Registry, keys: :unique},
19-
{Task, fn -> Sources.start_all() end}
19+
{Task, fn -> Sources.start_all() end},
20+
{RTSP.Server, handler: Sources.RTSP.Handler, port: 8554}
2021
]
2122

2223
children =

lib/shinkai/sink/hls.ex

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,13 @@ defmodule Shinkai.Sink.Hls do
4242
File.rm_rf!(hls_config[:storage_dir])
4343

4444
:ok = Phoenix.PubSub.subscribe(Shinkai.PubSub, tracks_topic(id))
45+
:ok = Phoenix.PubSub.subscribe(Shinkai.PubSub, packets_topic(id))
4546
:ok = Phoenix.PubSub.subscribe(Shinkai.PubSub, state_topic(id))
4647

4748
{:ok, _} = RequestHolder.start_link(:"request_holder_#{id}")
4849

50+
Process.flag(:trap_exit, true)
51+
4952
{:ok,
5053
%{
5154
writer: Writer.new!(hls_config),
@@ -71,6 +74,8 @@ defmodule Shinkai.Sink.Hls do
7174
)
7275
end
7376

77+
Logger.info("[#{state.source_id}] [hls] start muxing")
78+
7479
audio_track = Enum.find(hls_tracks, fn t -> t.type == :audio end)
7580
video_track = Enum.find(hls_tracks, fn t -> t.type == :video end)
7681

@@ -89,7 +94,10 @@ defmodule Shinkai.Sink.Hls do
8994
end
9095

9196
buffer? = length(hls_tracks) > 1 and Enum.any?(hls_tracks, &(&1.type == :video))
92-
:ok = PubSub.subscribe(Shinkai.PubSub, packets_topic(state.source_id))
97+
98+
if hls_tracks == [] do
99+
:ok = PubSub.unsubscribe(Shinkai.PubSub, packets_topic(state.source_id))
100+
end
93101

94102
{:noreply,
95103
%{
@@ -100,30 +108,45 @@ defmodule Shinkai.Sink.Hls do
100108
}}
101109
end
102110

103-
def handle_info({:packet, packets}, state) when is_list(packets) do
104-
{:noreply, Enum.reduce(packets, state, &do_handle_packet/2)}
105-
end
106-
107111
@impl true
108-
def handle_info({:packet, packet}, state) do
112+
def handle_info({:packet, %Shinkai.Packet{} = packet}, state) do
109113
{:noreply, do_handle_packet(packet, state)}
110114
end
111115

116+
def handle_info({:packet, packets}, state) do
117+
{:noreply, Enum.reduce(packets, state, &do_handle_packet/2)}
118+
end
119+
112120
@impl true
113121
def handle_info(:disconnected, state) do
114122
:ok = Writer.close(state.writer)
115-
:ok = PubSub.unsubscribe(Shinkai.PubSub, packets_topic(state.source_id))
116123
:ok = PubSub.local_broadcast(Shinkai.PubSub, sink_topic(state.source_id), {:hls, :done})
117124

118125
{:noreply,
119126
%{state | writer: Writer.new!(state.config), last_sample: %{}, packets: [], buffer?: false}}
120127
end
121128

122-
defp do_handle_packet(%{track_id: id}, state) when not is_map_key(state.tracks, id) do
123-
state
129+
defp do_handle_packet(packet, %{buffer?: false} = state)
130+
when is_map_key(state.tracks, packet.track_id) do
131+
case Map.fetch(state.last_sample, packet.track_id) do
132+
:error ->
133+
last_samples = Map.put(state.last_sample, packet.track_id, packet_to_sample(packet))
134+
%{state | last_sample: last_samples}
135+
136+
{:ok, last_sample} ->
137+
variant_name = state.tracks[packet.track_id].type |> to_string()
138+
sample = packet_to_sample(packet)
139+
last_sample = %{last_sample | duration: sample.dts - last_sample.dts}
140+
141+
%{
142+
state
143+
| writer: Writer.write_sample(state.writer, variant_name, last_sample),
144+
last_sample: Map.put(state.last_sample, packet.track_id, sample)
145+
}
146+
end
124147
end
125148

126-
defp do_handle_packet(packet, %{buffer?: true} = state) do
149+
defp do_handle_packet(packet, state) when is_map_key(state.tracks, packet.track_id) do
127150
# buffer until we get a video packet
128151
# and then drop all packets with dts < dts of that video packet
129152
track = state.tracks[packet.track_id]
@@ -143,24 +166,7 @@ defmodule Shinkai.Sink.Hls do
143166
end
144167
end
145168

146-
defp do_handle_packet(packet, state) do
147-
case Map.fetch(state.last_sample, packet.track_id) do
148-
:error ->
149-
last_samples = Map.put(state.last_sample, packet.track_id, packet_to_sample(packet))
150-
%{state | last_sample: last_samples}
151-
152-
{:ok, last_sample} ->
153-
variant_name = state.tracks[packet.track_id].type |> to_string()
154-
sample = packet_to_sample(packet)
155-
last_sample = %{last_sample | duration: sample.dts - last_sample.dts}
156-
157-
%{
158-
state
159-
| writer: Writer.write_sample(state.writer, variant_name, last_sample),
160-
last_sample: Map.put(state.last_sample, packet.track_id, sample)
161-
}
162-
end
163-
end
169+
defp do_handle_packet(_packet, state), do: state
164170

165171
defp reject?(packet, state, max_dts) do
166172
track = state.tracks[packet.track_id]

lib/shinkai/sink/rtmp.ex

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ defmodule Shinkai.Sink.RTMP do
5656

5757
if unsupported_tracks != [] do
5858
Logger.warning(
59-
"[#{state.source_id}] Ignore unsupported tracks: #{Enum.map_join(unsupported_tracks, ", ", & &1.codec)}"
59+
"[#{state.source_id}] rtmp sink: ignore unsupported tracks: #{Enum.map_join(unsupported_tracks, ", ", & &1.codec)}"
6060
)
6161
end
6262

@@ -79,14 +79,13 @@ defmodule Shinkai.Sink.RTMP do
7979
def handle_info({:packet, packets}, state) do
8080
Registry.dispatch(Sink.Registry, {:rtmp, state.source_id}, fn entries ->
8181
packets = List.wrap(packets)
82-
track = state.tracks[hd(packets).track_id]
83-
tags = Enum.map(packets, &packet_to_tag(track, &1))
8482

85-
for {pid, _} <- entries, {timestamp, data} <- tags do
86-
case track.type do
87-
:video -> ClientSession.send_video_data(pid, timestamp, data)
88-
:audio -> ClientSession.send_audio_data(pid, timestamp, data)
89-
end
83+
case state.tracks[hd(packets).track_id] do
84+
nil ->
85+
:ok
86+
87+
track ->
88+
dispatch_packets(entries, packets, track)
9089
end
9190
end)
9291

@@ -106,6 +105,18 @@ defmodule Shinkai.Sink.RTMP do
106105
{:noreply, state}
107106
end
108107

108+
defp dispatch_packets(entries, packets, track) do
109+
tags = Enum.map(packets, &packet_to_tag(track, &1))
110+
111+
for {pid, _} <- entries, {timestamp, data} <- tags do
112+
# credo:disable-for-next-line
113+
case track.type do
114+
:video -> ClientSession.send_video_data(pid, timestamp, data)
115+
:audio -> ClientSession.send_audio_data(pid, timestamp, data)
116+
end
117+
end
118+
end
119+
109120
defp packet_to_tag(track, packet) do
110121
dts = div(packet.dts * @timescale, track.timescale)
111122
cts = div((packet.pts - packet.dts) * @timescale, track.timescale)

lib/shinkai/sources.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ defmodule Shinkai.Sources do
2626
[] ->
2727
if source.type == :publish do
2828
:ok = PublishManager.monitor(source, self())
29-
:ets.insert(:sources, {source.id, source})
29+
:ets.insert(:sources, {source.id, %{source | status: :streaming}})
3030
end
3131

3232
DynamicSupervisor.start_child(

lib/shinkai/sources/rtsp.ex

Lines changed: 10 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ defmodule Shinkai.Sources.RTSP do
55

66
require Logger
77

8-
import Shinkai.Utils
9-
10-
alias MediaCodecs.MPEG4
11-
alias Shinkai.{Sources, Track}
8+
alias Shinkai.Sources
9+
alias Shinkai.Sources.RTSP.MediaProcessor
1210

1311
@timeout 6_000
1412
@reconnect_timeout 5_000
@@ -33,7 +31,7 @@ defmodule Shinkai.Sources.RTSP do
3331
id: source.id,
3432
rtsp_pid: pid,
3533
tracks: %{},
36-
packets_topic: packets_topic(source.id)
34+
media_processor: nil
3735
}
3836

3937
{:ok, state, {:continue, :connect}}
@@ -46,21 +44,15 @@ defmodule Shinkai.Sources.RTSP do
4644
def handle_info(:reconnect, state), do: do_connect(state)
4745

4846
def handle_info({:rtsp, _pid, {id, sample_or_samples}}, state) do
49-
:ok =
50-
Phoenix.PubSub.broadcast(
51-
Shinkai.PubSub,
52-
state.packets_topic,
53-
{:packet, to_packets(sample_or_samples, state.tracks[id].id)}
54-
)
55-
56-
{:noreply, state}
47+
media_processor = MediaProcessor.handle_sample(id, sample_or_samples, state.media_processor)
48+
{:noreply, %{state | media_processor: media_processor}}
5749
end
5850

5951
@impl true
6052
def handle_info({:rtsp, pid, :session_closed}, %{rtsp_pid: pid} = state) do
6153
Logger.error("[#{state.id}] rtsp client disconnected")
62-
Phoenix.PubSub.broadcast!(Shinkai.PubSub, state_topic(state.id), :disconnected)
63-
Sources.update_source_status(state.id, :failed)
54+
Phoenix.PubSub.broadcast!(Shinkai.PubSub, Shinkai.Utils.state_topic(state.id), :disconnected)
55+
update_status(state, :failed)
6456
Process.send_after(self(), :reconnect, @reconnect_timeout)
6557
{:noreply, state}
6658
end
@@ -73,21 +65,10 @@ defmodule Shinkai.Sources.RTSP do
7365

7466
defp do_connect(state) do
7567
with {:ok, tracks} <- RTSP.connect(state.rtsp_pid, @timeout),
76-
tracks <- build_tracks(tracks),
7768
:ok <- RTSP.play(state.rtsp_pid) do
78-
codecs = tracks |> Map.values() |> Enum.map_join(", ", & &1.codec)
79-
Logger.info("[#{state.id}] start reading from #{map_size(tracks)} tracks (#{codecs})")
80-
8169
update_status(state, :streaming)
82-
83-
:ok =
84-
Phoenix.PubSub.broadcast(
85-
Shinkai.PubSub,
86-
tracks_topic(state.id),
87-
{:tracks, Map.values(tracks)}
88-
)
89-
90-
{:noreply, %{state | tracks: tracks}}
70+
media_processor = MediaProcessor.new(state.id, tracks)
71+
{:noreply, %{state | media_processor: media_processor}}
9172
else
9273
{:error, reason} ->
9374
Logger.error("[#{state.id}] rtsp connection failed: #{inspect(reason)}")
@@ -97,49 +78,5 @@ defmodule Shinkai.Sources.RTSP do
9778
end
9879
end
9980

100-
defp build_tracks(tracks) do
101-
tracks
102-
|> Enum.with_index(1)
103-
|> Map.new(fn {track, id} ->
104-
codec = codec(String.downcase(track.rtpmap.encoding))
105-
106-
{track.control_path,
107-
Track.new(
108-
id: id,
109-
type: track.type,
110-
codec: codec,
111-
timescale: track.rtpmap.clock_rate,
112-
priv_data: priv_data(codec, track.fmtp)
113-
)}
114-
end)
115-
end
116-
117-
defp codec("mpeg4-generic"), do: :aac
118-
defp codec(other), do: String.to_atom(other)
119-
120-
defp priv_data(:aac, fmtp), do: MPEG4.AudioSpecificConfig.parse(fmtp.config)
121-
defp priv_data(:h264, %{sprop_parameter_sets: nil}), do: nil
122-
defp priv_data(:h264, %{sprop_parameter_sets: pps}), do: {pps.sps, [pps.pps]}
123-
defp priv_data(:h265, %{sprop_vps: nil}), do: nil
124-
defp priv_data(:h265, fmtp), do: {hd(fmtp.sprop_vps), hd(fmtp.sprop_sps), fmtp.sprop_pps}
125-
defp priv_data(_codec, _fmtp), do: nil
126-
127-
defp to_packets(samples, track_id) when is_list(samples) do
128-
Enum.map(samples, &packet_from_sample(track_id, &1))
129-
end
130-
131-
defp to_packets(sample, track_id), do: packet_from_sample(track_id, sample)
132-
133-
defp packet_from_sample(track_id, {payload, pts, sync?, _timestamp}) do
134-
Shinkai.Packet.new(payload,
135-
track_id: track_id,
136-
dts: pts,
137-
pts: pts,
138-
sync?: sync?
139-
)
140-
end
141-
142-
defp update_status(state, status) do
143-
Sources.update_source_status(state.id, status)
144-
end
81+
defp update_status(state, status), do: Sources.update_source_status(state.id, status)
14582
end
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
defmodule Shinkai.Sources.RTSP.Handler do
2+
@moduledoc false
3+
4+
use RTSP.Server.ClientHandler
5+
6+
require Logger
7+
8+
alias Shinkai.Sources
9+
alias Shinkai.Sources.RTSP.MediaProcessor
10+
11+
@impl true
12+
def init(_options) do
13+
nil
14+
end
15+
16+
@impl true
17+
def handle_record(path, tracks, _state) do
18+
with {:ok, source_id} <- source_id(path),
19+
source <- %Sources.Source{id: source_id, type: :publish},
20+
{:ok, _pid} <- Sources.start(source) do
21+
Logger.info("[RTSP] is publishing to: #{path}")
22+
{:ok, MediaProcessor.new(source_id, tracks)}
23+
end
24+
end
25+
26+
@impl true
27+
def handle_media(control_path, sample, state) do
28+
MediaProcessor.handle_sample(control_path, sample, state)
29+
end
30+
31+
@impl true
32+
def handle_closed_connection(state) do
33+
MediaProcessor.close(state)
34+
end
35+
36+
defp source_id("/"), do: {:error, :missing_path}
37+
defp source_id(<<"/", path::binary>>), do: {:ok, String.replace(path, "/", "-")}
38+
end

0 commit comments

Comments
 (0)