Skip to content

Commit d931168

Browse files
authored
Add chunk_size option to client/server (#19)
1 parent e8c9705 commit d931168

6 files changed

Lines changed: 72 additions & 34 deletions

File tree

examples/publish_mp4.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ defmodule Publisher do
100100
%ExVideoData{
101101
frame_type: :keyframe,
102102
packet_type: :sequence_start,
103-
fourcc: :hvc1,
103+
codec_id: :hvc1,
104104
data: dcr
105105
}
106106
end
@@ -125,7 +125,7 @@ defmodule Publisher do
125125
%ExVideoData{
126126
frame_type: if(keyframe?, do: :keyframe, else: :interframe),
127127
packet_type: :coded_frames,
128-
fourcc: :hvc1,
128+
codec_id: :hvc1,
129129
composition_time_offset: ct,
130130
data: sample.payload
131131
}

lib/ex_rtmp/client.ex

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,25 +42,27 @@ defmodule ExRTMP.Client do
4242
alias ExRTMP.Message.UserControl.Event
4343

4444
@default_buffer_size 2_000_000
45+
@default_chunk_size 1024
4546
@audio_msg_type 8
4647
@video_msg_type 9
4748

4849
@type start_options :: [
4950
{:uri, String.t()},
5051
{:stream_key, String.t()},
5152
{:name, GenServer.name()},
52-
{:receiver, Process.dest()}
53+
{:receiver, Process.dest()},
54+
{:chunk_size, non_neg_integer()}
5355
]
5456

5557
@doc """
5658
Starts and links a new RTMP client.
5759
5860
## Options
5961
* `:uri` - The RTMP server URI to connect to. This option is required.
60-
6162
* `:stream_key` - The stream key. This option is required.
62-
6363
* `:name` - The name to register the client process. This option is optional.
64+
* `:receiver` - The process that will receive the media data when playing a stream. This option is optional and defaults to the calling process.
65+
* `:chunk_size` - The RTMP chunk size to use for data sent to the server. This option is optional.
6466
"""
6567
@spec start_link(start_options()) :: GenServer.on_start()
6668
def start_link(opts) do
@@ -134,7 +136,15 @@ defmodule ExRTMP.Client do
134136
@impl true
135137
def init(opts) do
136138
opts = Config.validate!(opts)
137-
state = %State{uri: opts[:uri], stream_key: opts[:stream_key], receiver: opts[:receiver]}
139+
140+
state =
141+
%State{
142+
uri: opts[:uri],
143+
stream_key: opts[:stream_key],
144+
receiver: opts[:receiver],
145+
chunk_size: opts[:chunk_size] || @default_chunk_size
146+
}
147+
138148
{:ok, state}
139149
end
140150

@@ -162,7 +172,7 @@ defmodule ExRTMP.Client do
162172
state.stream_key
163173
|> Play.new()
164174
|> Message.command(state.stream_id)
165-
|> send_message(state.socket)
175+
|> send_message(state)
166176

167177
{:noreply, %{state | pending_action: :play, pending_peer: from}}
168178
end
@@ -177,7 +187,7 @@ defmodule ExRTMP.Client do
177187
state.stream_key
178188
|> Publish.new("live")
179189
|> Message.command(state.stream_id)
180-
|> send_message(state.socket)
190+
|> send_message(state)
181191

182192
{:noreply, %{state | pending_action: :publish, pending_peer: from}}
183193
end
@@ -264,7 +274,8 @@ defmodule ExRTMP.Client do
264274
}
265275
}
266276

267-
send_message(Message.command(connect), state.socket)
277+
send_message(Message.chunk_size(state.chunk_size), state)
278+
send_message(Message.command(connect), state)
268279
{:noreply, %{state | pending_peer: from, pending_action: :connect}}
269280

270281
error ->
@@ -292,7 +303,7 @@ defmodule ExRTMP.Client do
292303

293304
defp create_stream(state) do
294305
ts_id = state.next_ts_id
295-
%CreateStream{transaction_id: ts_id} |> Message.command() |> send_message(state.socket)
306+
%CreateStream{transaction_id: ts_id} |> Message.command() |> send_message(state)
296307
%{state | pending_action: :create_stream, next_ts_id: ts_id + 1}
297308
end
298309

@@ -303,7 +314,7 @@ defmodule ExRTMP.Client do
303314
%Event{type: :ping_request, data: timestamp} ->
304315
timestamp
305316
|> Message.ping_response()
306-
|> send_message(state.socket)
317+
|> send_message(state)
307318

308319
state
309320

@@ -424,7 +435,7 @@ defmodule ExRTMP.Client do
424435

425436
defp do_delete_stream(stream_id, state) do
426437
if stream_id do
427-
DeleteStream.new(stream_id) |> Message.command(stream_id) |> send_message(state.socket)
438+
DeleteStream.new(stream_id) |> Message.command(stream_id) |> send_message(state)
428439
end
429440

430441
:ok = :gen_tcp.close(state.socket)
@@ -439,15 +450,15 @@ defmodule ExRTMP.Client do
439450
timestamp: timestamp
440451
)
441452

442-
send_message(message, state.socket)
453+
send_message(message, state)
443454
end
444455

445456
defp handle_play_resp_code("NetStream.Play.Start"), do: :ok
446457
defp handle_play_resp_code("NetStream.Play.Reset"), do: :ignore
447458
defp handle_play_resp_code("NetStream.Play.StreamNotFound"), do: {:error, "Stream not found"}
448459
defp handle_play_resp_code("NetStream.Play.Failed"), do: {:error, "Play failed"}
449460

450-
defp send_message(message, socket) do
451-
:ok = :gen_tcp.send(socket, Message.serialize(message))
461+
defp send_message(message, state) do
462+
:ok = :gen_tcp.send(state.socket, Message.serialize(message, state.chunk_size))
452463
end
453464
end

lib/ex_rtmp/client/state.ex

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ defmodule ExRTMP.Client.State do
2222
window_ack_size: non_neg_integer(),
2323
stream_id: Message.stream_id() | nil,
2424
state: state(),
25-
media_processor: MediaProcessor.t() | nil
25+
media_processor: MediaProcessor.t() | nil,
26+
chunk_size: non_neg_integer() | nil
2627
}
2728

2829
@enforce_keys [:uri, :stream_key]
@@ -34,6 +35,7 @@ defmodule ExRTMP.Client.State do
3435
:receiver,
3536
:stream_id,
3637
:media_processor,
38+
:chunk_size,
3739
state: :init,
3840
chunk_parser: ChunkParser.new(),
3941
next_ts_id: 2,
@@ -56,6 +58,11 @@ defmodule ExRTMP.Client.State do
5658
@doc false
5759
@spec reset(t()) :: t()
5860
def reset(state) do
59-
%__MODULE__{uri: state.uri, stream_key: state.stream_key, receiver: state.receiver}
61+
%__MODULE__{
62+
uri: state.uri,
63+
stream_key: state.stream_key,
64+
receiver: state.receiver,
65+
chunk_size: state.chunk_size
66+
}
6067
end
6168
end

lib/ex_rtmp/message.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ defmodule ExRTMP.Message do
1111
alias __MODULE__.UserControl.Event
1212
alias ExRTMP.{Chunk, Message}
1313

14+
@default_chunk_size 128
15+
1416
@type stream_id :: non_neg_integer()
1517

1618
@type t :: %__MODULE__{
@@ -139,12 +141,10 @@ defmodule ExRTMP.Message do
139141
140142
The following options may be provided:
141143
142-
* `:chunk_size` - The size of each chunk (default: 128)
143144
* `:chunk_stream_id` - The chunk stream id to use (default: 2)
144145
"""
145-
@spec serialize(t(), keyword()) :: binary()
146-
def serialize(message, opts \\ []) do
147-
chunk_size = Keyword.get(opts, :chunk_size, 128)
146+
@spec serialize(t(), non_neg_integer(), keyword()) :: binary()
147+
def serialize(message, chunk_size \\ @default_chunk_size, opts \\ []) do
148148
chunk_stream_id = Keyword.get(opts, :chunk_stream_id, 2)
149149

150150
payload =

lib/ex_rtmp/server.ex

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ defmodule ExRTMP.Server do
2929
{:port, :inet.port_number()},
3030
{:handler, module()},
3131
{:handler_options, any()},
32-
{:demux, boolean()}
32+
{:demux, boolean()},
33+
{:chunk_size, non_neg_integer()}
3334
]
3435

3536
@default_port 1935
@@ -59,6 +60,8 @@ defmodule ExRTMP.Server do
5960
6061
* `demux` - Whether the server will demux the incoming RTMP streams into
6162
audio and video frames. Defaults to `true`. See [Handling Media](#module-handling-media) below.
63+
64+
* `chunk_size` - The RTMP chunk size to use for data sent to the clients.
6265
"""
6366
@spec start_link(start_options()) :: GenServer.on_start()
6467
def start_link(opts) do
@@ -91,7 +94,8 @@ defmodule ExRTMP.Server do
9194
pid: self(),
9295
handler: opts[:handler] || raise("Handler module is required"),
9396
handler_options: opts[:handler_options],
94-
demux: Keyword.get(opts, :demux, true)
97+
demux: Keyword.get(opts, :demux, true),
98+
chunk_size: opts[:chunk_size]
9599
}
96100

97101
Logger.info("RTMP Server listening on port #{port}")
@@ -139,7 +143,8 @@ defmodule ExRTMP.Server do
139143
socket: client_socket,
140144
handler: state.handler,
141145
handler_options: state.handler_options,
142-
demux: state.demux
146+
demux: state.demux,
147+
chunk_size: state.chunk_size
143148
)
144149

145150
:ok = :gen_tcp.controlling_process(client_socket, pid)

lib/ex_rtmp/server/client_session.ex

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ defmodule ExRTMP.Server.ClientSession do
1616
alias ExRTMP.Message.Metadata
1717

1818
@default_acknowledgement_size 3_000_000
19+
@default_chunk_size 128
1920

2021
defmodule State do
2122
@moduledoc false
@@ -29,7 +30,8 @@ defmodule ExRTMP.Server.ClientSession do
2930
handler_state: any(),
3031
state: state(),
3132
stream_id: non_neg_integer() | nil,
32-
media_processor: MediaProcessor.t() | nil
33+
media_processor: MediaProcessor.t() | nil,
34+
chunk_size: non_neg_integer()
3335
}
3436

3537
@enforce_keys [:socket]
@@ -38,6 +40,7 @@ defmodule ExRTMP.Server.ClientSession do
3840
:handler_mod,
3941
:handler_state,
4042
:media_processor,
43+
:chunk_size,
4144
chunk_parser: ChunkParser.new(),
4245
state: :init,
4346
stream_id: nil
@@ -87,7 +90,8 @@ defmodule ExRTMP.Server.ClientSession do
8790
handler_mod: handler_mod,
8891
handler_state: handler_mod.init(options[:handler_options]),
8992
socket: options[:socket],
90-
media_processor: if(options[:demux], do: MediaProcessor.new())
93+
media_processor: if(options[:demux], do: MediaProcessor.new()),
94+
chunk_size: options[:chunk_size] || @default_chunk_size
9195
}
9296

9397
:ok =
@@ -105,6 +109,7 @@ defmodule ExRTMP.Server.ClientSession do
105109
case do_handle_handshake(state.socket) do
106110
:ok ->
107111
Logger.debug("RTMP Handshake successful")
112+
set_chunk_size(state.socket, state.chunk_size)
108113
{:ok, data} = :gen_tcp.recv(state.socket, 0)
109114
:ok = :inet.setopts(state.socket, active: true)
110115
{:noreply, do_handle_data(state, data)}
@@ -116,15 +121,15 @@ defmodule ExRTMP.Server.ClientSession do
116121

117122
@impl true
118123
def handle_cast({:video_data, timestamp, data}, state) do
119-
case send_media(:video, state.socket, state.stream_id, timestamp, data) do
124+
case send_media(:video, timestamp, data, state) do
120125
:ok -> {:noreply, state}
121126
{:error, reason} -> {:stop, reason, state}
122127
end
123128
end
124129

125130
@impl true
126131
def handle_cast({:audio_data, timestamp, data}, state) do
127-
case send_media(:audio, state.socket, state.stream_id, timestamp, data) do
132+
case send_media(:audio, timestamp, data, state) do
128133
:ok -> {:noreply, state}
129134
{:error, reason} -> {:stop, reason, state}
130135
end
@@ -133,7 +138,7 @@ defmodule ExRTMP.Server.ClientSession do
133138
@impl true
134139
def handle_cast({:metadata, data}, state) do
135140
message = Message.metadata(data, state.stream_id)
136-
:ok = :gen_tcp.send(state.socket, Message.serialize(message))
141+
:ok = :gen_tcp.send(state.socket, Message.serialize(message, state.chunk_size))
137142
{:noreply, state}
138143
end
139144

@@ -173,6 +178,13 @@ defmodule ExRTMP.Server.ClientSession do
173178
end
174179
end
175180

181+
defp set_chunk_size(_socket, @default_chunk_size), do: :ok
182+
183+
defp set_chunk_size(socket, chunk_size) do
184+
message = Message.chunk_size(chunk_size)
185+
:gen_tcp.send(socket, Message.serialize(message))
186+
end
187+
176188
defp do_handle_data(state, data) do
177189
{messages, parser} = ChunkParser.process(data, state.chunk_parser)
178190
Enum.reduce(messages, %{state | chunk_parser: parser}, &handle_message/2)
@@ -375,21 +387,24 @@ defmodule ExRTMP.Server.ClientSession do
375387
end
376388
end
377389

378-
defp send_media(media, socket, stream_id, timestamp, data) do
390+
defp send_media(media, timestamp, data, state) do
379391
{type, chunk_stream_id} =
380392
case media do
381-
:audio -> {8, stream_id * 3}
382-
:video -> {9, stream_id * 3 + 1}
393+
:audio -> {8, state.stream_id * 3}
394+
:video -> {9, state.stream_id * 3 + 1}
383395
end
384396

385397
message = %Message{
386398
type: type,
387399
timestamp: timestamp,
388-
stream_id: stream_id,
400+
stream_id: state.stream_id,
389401
payload: data
390402
}
391403

392-
:gen_tcp.send(socket, Message.serialize(message, chunk_stream_id: chunk_stream_id))
404+
:gen_tcp.send(
405+
state.socket,
406+
Message.serialize(message, state.chunk_size, chunk_stream_id: chunk_stream_id)
407+
)
393408
end
394409

395410
defp send_messages(state, []), do: state

0 commit comments

Comments
 (0)