Skip to content

Commit 34d2cf3

Browse files
authored
Add option to demux incoming rtmp streams (#7)
* Add option to demux incoming rtmp streams * refactor media processor
1 parent 957564d commit 34d2cf3

4 files changed

Lines changed: 63 additions & 25 deletions

File tree

lib/ex_rtmp/client/media_processor.ex

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ defmodule ExRTMP.Client.MediaProcessor do
22
@moduledoc false
33

44
alias ExFLV.Tag.{AudioData, VideoData}
5+
alias ExRTMP.Message
56

67
@type track :: {:codec, atom(), binary()}
78
@type video_sample ::
@@ -24,20 +25,20 @@ defmodule ExRTMP.Client.MediaProcessor do
2425
@spec new() :: t()
2526
def new(), do: %__MODULE__{}
2627

27-
@spec push_video(t(), non_neg_integer(), iodata()) :: {video_return(), t()}
28-
def push_video(processor, timestamp, data) do
29-
data
28+
@spec push_video(Message.t(), t()) :: {video_return(), t()}
29+
def push_video(message, processor) do
30+
message.payload
3031
|> IO.iodata_to_binary()
3132
|> VideoData.parse!()
32-
|> handle_video_tag(timestamp, processor)
33+
|> handle_video_tag(message.timestamp, processor)
3334
end
3435

35-
@spec push_audio(t(), non_neg_integer(), iodata()) :: {audio_return(), t()}
36-
def push_audio(processor, timestamp, data) do
37-
data
36+
@spec push_audio(Message.t(), t()) :: {audio_return(), t()}
37+
def push_audio(message, processor) do
38+
message.payload
3839
|> IO.iodata_to_binary()
3940
|> AudioData.parse!()
40-
|> handle_audio_tag(timestamp, processor)
41+
|> handle_audio_tag(message.timestamp, processor)
4142
end
4243

4344
defp handle_video_tag(%VideoData{codec_id: :avc} = tag, timestamp, processor) do

lib/ex_rtmp/client/stream_context.ex

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,14 @@ defmodule ExRTMP.Client.StreamContext do
2424
@doc false
2525
@spec handle_video_data(t(), ExRTMP.Message.t()) :: {MediaProcessor.video_return(), t()}
2626
def handle_video_data(stream_ctx, message) do
27-
{data, processor} =
28-
MediaProcessor.push_video(stream_ctx.media_processor, message.timestamp, message.payload)
29-
30-
stream_ctx = %{stream_ctx | media_processor: processor}
31-
{data, stream_ctx}
27+
{data, processor} = MediaProcessor.push_video(message, stream_ctx.media_processor)
28+
{data, %{stream_ctx | media_processor: processor}}
3229
end
3330

3431
@doc false
3532
@spec handle_audio_data(t(), ExRTMP.Message.t()) :: {MediaProcessor.audio_return(), t()}
3633
def handle_audio_data(stream_ctx, message) do
37-
{data, processor} =
38-
MediaProcessor.push_audio(stream_ctx.media_processor, message.timestamp, message.payload)
39-
40-
stream_ctx = %{stream_ctx | media_processor: processor}
41-
{data, stream_ctx}
34+
{data, processor} = MediaProcessor.push_audio(message, stream_ctx.media_processor)
35+
{data, %{stream_ctx | media_processor: processor}}
4236
end
4337
end

lib/ex_rtmp/server.ex

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,19 @@ defmodule ExRTMP.Server do
44
55
The server listens for incoming RTMP client connections and spawns a new
66
`ExRTMP.Server.ClientSession` process for each connected client.
7+
8+
## Handling Media
9+
When `demux` is set to `true`, the server will demux the incoming RTMP
10+
streams into audio and video frames before passing them to the handler module.
11+
12+
The format of the data received by the handler is:
13+
* `{:codec, codec_type, init_data}` - The codec type and initialization data.
14+
* `{:sample, payload, dts, pts, keyframe?}` - A video sample, the payload depends on the codec type. In the case of `avc`,
15+
the payload is a list of NAL units, for other codecs, it is the raw video frame data.
16+
* `{:sample, payload, timestamp}` - An audio sample.
17+
18+
If `demux` is set to `false`, the handler module will receive the raw RTMP
19+
data as is.
720
"""
821

922
use GenServer
@@ -15,7 +28,8 @@ defmodule ExRTMP.Server do
1528
@type start_options :: [
1629
{:port, :inet.port_number()},
1730
{:handler, module()},
18-
{:handler_options, any()}
31+
{:handler_options, any()},
32+
{:demux, boolean()}
1933
]
2034

2135
@default_port 1935
@@ -42,6 +56,9 @@ defmodule ExRTMP.Server do
4256
4357
* `handler_options` - A keyword list of options that will be passed to the
4458
handler module when it is started. This option is optional.
59+
60+
* `demux` - Whether the server will demux the incoming RTMP streams into
61+
audio and video frames. Defaults to `true`. See [Handling Media](#module-handling-media) below.
4562
"""
4663
@spec start_link(start_options()) :: GenServer.on_start()
4764
def start_link(opts) do
@@ -76,7 +93,8 @@ defmodule ExRTMP.Server do
7693
socket: server_socket,
7794
pid: self(),
7895
handler: opts[:handler] || raise("Handler module is required"),
79-
handler_options: opts[:handler_options]
96+
handler_options: opts[:handler_options],
97+
demux: Keyword.get(opts, :demux, true)
8098
}
8199

82100
listener = spawn_link(fn -> accept_client_connection(state) end)
@@ -121,7 +139,8 @@ defmodule ExRTMP.Server do
121139
ClientSession.start(
122140
socket: client_socket,
123141
handler: state.handler,
124-
handler_options: state.handler_options
142+
handler_options: state.handler_options,
143+
demux: state.demux
125144
)
126145

127146
:ok = :gen_tcp.controlling_process(client_socket, pid)

lib/ex_rtmp/server/client_session.ex

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule ExRTMP.Server.ClientSession do
88
require Logger
99

1010
alias ExRTMP.ChunkParser
11+
alias ExRTMP.Client.MediaProcessor
1112
alias ExRTMP.Message
1213
alias ExRTMP.Message.Command.NetConnection
1314
alias ExRTMP.Message.Command.NetConnection.{CreateStream, Response}
@@ -27,14 +28,16 @@ defmodule ExRTMP.Server.ClientSession do
2728
handler_mod: module(),
2829
handler_state: any(),
2930
state: state(),
30-
stream_id: non_neg_integer() | nil
31+
stream_id: non_neg_integer() | nil,
32+
media_processor: MediaProcessor.t() | nil
3133
}
3234

3335
@enforce_keys [:socket]
3436
defstruct @enforce_keys ++
3537
[
3638
:handler_mod,
3739
:handler_state,
40+
:media_processor,
3841
chunk_parser: ChunkParser.new(),
3942
state: :init,
4043
stream_id: nil
@@ -78,7 +81,8 @@ defmodule ExRTMP.Server.ClientSession do
7881
state = %State{
7982
handler_mod: handler_mod,
8083
handler_state: handler_mod.init(options[:handler_options]),
81-
socket: options[:socket]
84+
socket: options[:socket],
85+
media_processor: if(options[:demux], do: MediaProcessor.new())
8286
}
8387

8488
{:ok, state, {:continue, :handshake}}
@@ -176,7 +180,7 @@ defmodule ExRTMP.Server.ClientSession do
176180
state
177181
end
178182

179-
defp handle_message(%{type: 8} = message, %{state: :publishing} = state) do
183+
defp handle_message(%{type: 8} = message, %{state: :publishing, media_processor: nil} = state) do
180184
handler_state =
181185
state.handler_mod.handle_audio_data(
182186
message.timestamp,
@@ -187,7 +191,17 @@ defmodule ExRTMP.Server.ClientSession do
187191
%{state | handler_state: handler_state}
188192
end
189193

190-
defp handle_message(%{type: 9} = message, %{state: :publishing} = state) do
194+
defp handle_message(%{type: 8} = message, %{state: :publishing} = state) do
195+
{media, processor} = MediaProcessor.push_audio(message, state.media_processor)
196+
mod = state.handler_mod
197+
198+
handler_state =
199+
Enum.reduce(List.wrap(media), state.handler_state, &mod.handle_audio_data(0, &1, &2))
200+
201+
%{state | handler_state: handler_state, media_processor: processor}
202+
end
203+
204+
defp handle_message(%{type: 9} = message, %{state: :publishing, media_processor: nil} = state) do
191205
handler_state =
192206
state.handler_mod.handle_video_data(
193207
message.timestamp,
@@ -198,6 +212,16 @@ defmodule ExRTMP.Server.ClientSession do
198212
%{state | handler_state: handler_state}
199213
end
200214

215+
defp handle_message(%{type: 9} = message, %{state: :publishing} = state) do
216+
{media, processor} = MediaProcessor.push_video(message, state.media_processor)
217+
mod = state.handler_mod
218+
219+
handler_state =
220+
Enum.reduce(List.wrap(media), state.handler_state, &mod.handle_video_data(0, &1, &2))
221+
222+
%{state | handler_state: handler_state, media_processor: processor}
223+
end
224+
201225
defp handle_message(%{type: type}, state) when type == 8 or type == 9, do: state
202226

203227
defp handle_message(%{type: 18, payload: %Metadata{data: data}}, state) do

0 commit comments

Comments
 (0)