Skip to content

Commit d5d29f7

Browse files
authored
Add support for sending opus to rtmp clients (#16)
* Add support for sending opus to rtmp clients * Fix tests * Refactor
1 parent 4d9666c commit d5d29f7

3 files changed

Lines changed: 63 additions & 42 deletions

File tree

lib/shinkai/sink/rtmp.ex

Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ defmodule Shinkai.Sink.RTMP do
1111

1212
import Shinkai.Utils
1313

14-
alias ExFLV.Tag.{AudioData, ExVideoData, Serializer, VideoData}
14+
alias ExFLV.Tag.{AudioData, ExAudioData, ExVideoData, Serializer, VideoData}
1515
alias ExRTMP.Server.ClientSession
1616
alias Phoenix.PubSub
1717

1818
@timescale 1000
19-
@supported_codesc [:h264, :h265, :av1, :aac, :pcma, :pcmu]
19+
@supported_codesc [:h264, :h265, :av1, :aac, :pcma, :pcmu, :opus]
2020

2121
def start_link(opts) do
2222
name = {:via, Registry, {Source.Registry, {:rtmp_sink, opts[:id]}}}
@@ -34,7 +34,7 @@ defmodule Shinkai.Sink.RTMP do
3434
:ok = PubSub.subscribe(Shinkai.PubSub, tracks_topic(id))
3535
:ok = PubSub.subscribe(Shinkai.PubSub, state_topic(id))
3636

37-
{:ok, %{source_id: id, tracks: %{}, init_tags: [], packet_topic: packets_topic(id)}}
37+
{:ok, %{source_id: id, tracks: %{}, init_tags: []}}
3838
end
3939

4040
@impl true
@@ -69,7 +69,7 @@ defmodule Shinkai.Sink.RTMP do
6969
end)
7070

7171
if supported_tracks != [] do
72-
:ok = PubSub.subscribe(Shinkai.PubSub, state.packet_topic)
72+
:ok = PubSub.subscribe(Shinkai.PubSub, packets_topic(state.source_id))
7373
end
7474

7575
{:noreply, %{state | tracks: Map.new(supported_tracks, &{&1.id, &1}), init_tags: init_tags}}
@@ -97,58 +97,65 @@ defmodule Shinkai.Sink.RTMP do
9797
Logger.warning("[#{state.source_id}] [RTMP Sink] source disconnected")
9898

9999
Registry.dispatch(Sink.Registry, {:rtmp, state.source_id}, fn entries ->
100-
for {pid, _} <- entries do
101-
send(pid, :exit)
102-
end
100+
for {pid, _} <- entries, do: send(pid, :exit)
103101
end)
104102

105103
{:noreply, state}
106104
end
107105

108106
defp dispatch_packets(entries, packets, track) do
109-
tags = Enum.map(packets, &packet_to_tag(track, &1))
107+
tags =
108+
Enum.map(packets, fn packet ->
109+
dts = div(packet.dts * @timescale, track.timescale)
110+
cts = div((packet.pts - packet.dts) * @timescale, track.timescale)
111+
112+
tag =
113+
track.codec
114+
|> packet_to_tag(packet, cts)
115+
|> Serializer.serialize()
116+
|> IO.iodata_to_binary()
117+
118+
{dts, tag}
119+
end)
110120

111121
for {pid, _} <- entries, {timestamp, data} <- tags do
112-
# credo:disable-for-next-line
113122
case track.type do
114123
:video -> ClientSession.send_video_data(pid, timestamp, data)
115124
:audio -> ClientSession.send_audio_data(pid, timestamp, data)
116125
end
117126
end
118127
end
119128

120-
defp packet_to_tag(track, packet) do
121-
dts = div(packet.dts * @timescale, track.timescale)
122-
cts = div((packet.pts - packet.dts) * @timescale, track.timescale)
123-
124-
tag =
125-
case track.codec do
126-
:h264 ->
127-
maybe_prefix_payload(:h264, packet.data)
128-
|> VideoData.AVC.new(:nalu, cts)
129-
|> VideoData.new(:h264, if(packet.sync?, do: :keyframe, else: :interframe))
130-
131-
:aac ->
132-
packet.data
133-
|> AudioData.AAC.new(:raw)
134-
|> AudioData.new(:aac, 1, 3, :stereo)
135-
136-
codec when codec in [:h265, :av1] ->
137-
packet_type = if codec == :h265 and cts != 0, do: :coded_frames, else: :coded_frames_x
138-
139-
%ExVideoData{
140-
codec_id: codec,
141-
frame_type: if(packet.sync?, do: :keyframe, else: :interframe),
142-
packet_type: packet_type,
143-
composition_time_offset: cts,
144-
data: maybe_prefix_payload(codec, packet.data)
145-
}
146-
147-
codec ->
148-
AudioData.new(packet.data, codec, 3, 1, :stereo)
149-
end
129+
defp packet_to_tag(:h264, packet, cts) do
130+
maybe_prefix_payload(:h264, packet.data)
131+
|> VideoData.AVC.new(:nalu, cts)
132+
|> VideoData.new(:h264, if(packet.sync?, do: :keyframe, else: :interframe))
133+
end
134+
135+
defp packet_to_tag(:aac, packet, _cts) do
136+
packet.data
137+
|> AudioData.AAC.new(:raw)
138+
|> AudioData.new(:aac, 1, 3, :stereo)
139+
end
140+
141+
defp packet_to_tag(:opus, packet, _cts) do
142+
%ExAudioData{codec_id: :opus, packet_type: :coded_frames, data: packet.data}
143+
end
144+
145+
defp packet_to_tag(codec, packet, cts) when codec in [:av1, :h265] do
146+
packet_type = if codec == :h265 and cts != 0, do: :coded_frames, else: :coded_frames_x
147+
148+
%ExVideoData{
149+
codec_id: codec,
150+
frame_type: if(packet.sync?, do: :keyframe, else: :interframe),
151+
packet_type: packet_type,
152+
composition_time_offset: cts,
153+
data: maybe_prefix_payload(codec, packet.data)
154+
}
155+
end
150156

151-
{dts, Serializer.serialize(tag) |> IO.iodata_to_binary()}
157+
defp packet_to_tag(codec, packet, _cts) do
158+
AudioData.new(packet.data, codec, 3, 1, :stereo)
152159
end
153160

154161
defp maybe_prefix_payload(codec, payload) when codec in [:h264, :h265] do

lib/shinkai/track.ex

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule Shinkai.Track do
33
Module describing a media track.
44
"""
55

6-
alias ExFLV.Tag.{AudioData, ExVideoData, VideoData}
6+
alias ExFLV.Tag.{AudioData, ExAudioData, ExVideoData, VideoData}
77
alias ExMP4.Box
88
alias MediaCodecs.MPEG4
99

@@ -76,6 +76,20 @@ defmodule Shinkai.Track do
7676
|> AudioData.new(:aac, 1, 3, :stereo)
7777
end
7878

79+
def to_rtmp_tag(%{codec: :opus} = track) do
80+
# priv_data is the channel count
81+
# No support for more than 2 channels for now
82+
dops = %Box.Dops{
83+
output_channel_count: track.priv_data || 2,
84+
pre_skip: 0,
85+
input_sample_rate: 48_000,
86+
output_gain: 0,
87+
channel_mapping_family: 0
88+
}
89+
90+
%ExAudioData{codec_id: :opus, packet_type: :sequence_start, data: Box.serialize(dops)}
91+
end
92+
7993
def to_rtmp_tag(%{codec: codec} = track) when codec in [:h265, :av1] do
8094
%ExVideoData{
8195
codec_id: codec,

test/shinkai/pipeline_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ defmodule Shinkai.PipelineTest do
188188
{video_codec, audio_codec} =
189189
case fixture do
190190
"test/fixtures/big_buck_avc_aac.mp4" -> {:h264, :aac}
191-
_ -> {:av1, nil}
191+
_ -> {:av1, :opus}
192192
end
193193

194194
if video_codec do

0 commit comments

Comments
 (0)