Skip to content

Commit 957564d

Browse files
authored
Allow only one stream per connection in server (#6)
* Allow only one stream per connection * Fix reviews
1 parent f6be34d commit 957564d

8 files changed

Lines changed: 136 additions & 193 deletions

File tree

lib/ex_rtmp/client.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ defmodule ExRTMP.Client do
193193
with :ok <- :gen_tcp.send(socket, [3]),
194194
:ok <- :gen_tcp.send(socket, [<<0::32>>, <<0::32>>, rand_bytes]),
195195
{:ok, <<0x03::8>>} <- :gen_tcp.recv(socket, 1),
196-
{:ok, <<timestamp::32, 0::32, r::binary-size(1528)>>} <- :gen_tcp.recv(socket, 1536),
196+
{:ok, <<timestamp::32, _::32, r::binary-size(1528)>>} <- :gen_tcp.recv(socket, 1536),
197197
:ok <- :gen_tcp.send(socket, <<timestamp::32, 0::32, r::binary>>),
198198
{:ok, <<0::32, _::32, ^rand_bytes::binary>>} <- :gen_tcp.recv(socket, 1536) do
199199
:ok

lib/ex_rtmp/client/media_processor.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ defmodule ExRTMP.Client.MediaProcessor do
2828
def push_video(processor, timestamp, data) do
2929
data
3030
|> IO.iodata_to_binary()
31-
|> VideoData.parse()
31+
|> VideoData.parse!()
3232
|> handle_video_tag(timestamp, processor)
3333
end
3434

3535
@spec push_audio(t(), non_neg_integer(), iodata()) :: {audio_return(), t()}
3636
def push_audio(processor, timestamp, data) do
3737
data
3838
|> IO.iodata_to_binary()
39-
|> AudioData.parse()
39+
|> AudioData.parse!()
4040
|> handle_audio_tag(timestamp, processor)
4141
end
4242

lib/ex_rtmp/server.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ defmodule ExRTMP.Server do
7070

7171
{:ok, server_socket} = :gen_tcp.listen(port, [:binary, active: false, reuseaddr: true])
7272

73+
Logger.info("RTMP Server listening on port #{port}")
74+
7375
state = %{
7476
socket: server_socket,
7577
pid: self(),

lib/ex_rtmp/server/client_session.ex

Lines changed: 96 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,15 @@ defmodule ExRTMP.Server.ClientSession do
1919
defmodule State do
2020
@moduledoc false
2121

22-
@type state :: :init | :connected
23-
@type stream_state :: :created | :publishing | :playing
22+
@type state :: :init | :connected | :publishing | :playing
2423

2524
@type t :: %__MODULE__{
2625
socket: :inet.socket(),
2726
chunk_parser: ChunkParser.t(),
2827
handler_mod: module(),
2928
handler_state: any(),
3029
state: state(),
31-
streams_state: %{optional(non_neg_integer()) => stream_state()},
32-
next_stream_id: non_neg_integer()
30+
stream_id: non_neg_integer() | nil
3331
}
3432

3533
@enforce_keys [:socket]
@@ -39,8 +37,7 @@ defmodule ExRTMP.Server.ClientSession do
3937
:handler_state,
4038
chunk_parser: ChunkParser.new(),
4139
state: :init,
42-
streams_state: %{},
43-
next_stream_id: 1
40+
stream_id: nil
4441
]
4542
end
4643

@@ -53,25 +50,25 @@ defmodule ExRTMP.Server.ClientSession do
5350
@doc """
5451
Sends video data to the client.
5552
"""
56-
@spec send_video_data(pid(), non_neg_integer(), non_neg_integer(), iodata()) :: :ok
57-
def send_video_data(pid, stream_id, timestamp, data) do
58-
GenServer.cast(pid, {:video_data, stream_id, timestamp, data})
53+
@spec send_video_data(pid(), non_neg_integer(), iodata()) :: :ok
54+
def send_video_data(pid, timestamp, data) do
55+
GenServer.cast(pid, {:video_data, timestamp, data})
5956
end
6057

6158
@doc """
6259
Sends audio data to the client.
6360
"""
64-
@spec send_audio_data(pid(), non_neg_integer(), non_neg_integer(), iodata()) :: :ok
65-
def send_audio_data(pid, stream_id, timestamp, data) do
66-
GenServer.cast(pid, {:audio_data, stream_id, timestamp, data})
61+
@spec send_audio_data(pid(), non_neg_integer(), iodata()) :: :ok
62+
def send_audio_data(pid, timestamp, data) do
63+
GenServer.cast(pid, {:audio_data, timestamp, data})
6764
end
6865

6966
@doc """
7067
Sends metadata about the media to the client.
7168
"""
72-
@spec send_metadata(pid(), non_neg_integer(), map()) :: :ok
73-
def send_metadata(pid, stream_id, data) do
74-
GenServer.cast(pid, {:metadata, stream_id, data})
69+
@spec send_metadata(pid(), map()) :: :ok
70+
def send_metadata(pid, data) do
71+
GenServer.cast(pid, {:metadata, data})
7572
end
7673

7774
@impl true
@@ -91,7 +88,7 @@ defmodule ExRTMP.Server.ClientSession do
9188
def handle_continue(:handshake, state) do
9289
case do_handle_handshake(state.socket) do
9390
:ok ->
94-
Logger.info("RTMP Handshake successful")
91+
Logger.debug("RTMP Handshake successful")
9592
{:ok, data} = :gen_tcp.recv(state.socket, 0)
9693
:ok = :inet.setopts(state.socket, active: true)
9794
{:noreply, do_handle_data(state, data)}
@@ -102,20 +99,20 @@ defmodule ExRTMP.Server.ClientSession do
10299
end
103100

104101
@impl true
105-
def handle_cast({:video_data, stream_id, timestamp, data}, state) do
106-
send_media(:video, state.socket, stream_id, timestamp, data)
102+
def handle_cast({:video_data, timestamp, data}, state) do
103+
send_media(:video, state.socket, state.stream_id, timestamp, data)
107104
{:noreply, state}
108105
end
109106

110107
@impl true
111-
def handle_cast({:audio_data, stream_id, timestamp, data}, state) do
112-
send_media(:audio, state.socket, stream_id, timestamp, data)
108+
def handle_cast({:audio_data, timestamp, data}, state) do
109+
send_media(:audio, state.socket, state.stream_id, timestamp, data)
113110
{:noreply, state}
114111
end
115112

116113
@impl true
117-
def handle_cast({:metadata, stream_id, data}, state) do
118-
message = Message.metadata(data, stream_id)
114+
def handle_cast({:metadata, data}, state) do
115+
message = Message.metadata(data, state.stream_id)
119116
:ok = :gen_tcp.send(state.socket, Message.serialize(message))
120117
{:noreply, state}
121118
end
@@ -130,6 +127,11 @@ defmodule ExRTMP.Server.ClientSession do
130127
{:stop, :normal, state}
131128
end
132129

130+
@impl true
131+
def handle_info(:exit, state) do
132+
{:stop, :normal, state}
133+
end
134+
133135
@impl true
134136
def handle_info(msg, state) do
135137
Logger.warning("Received an unexpected message: #{inspect(msg)}")
@@ -174,52 +176,32 @@ defmodule ExRTMP.Server.ClientSession do
174176
state
175177
end
176178

177-
defp handle_message(%{type: 8} = message, state) do
178-
case state.streams_state[message.stream_id] do
179-
:publishing ->
180-
handler_state =
181-
state.handler_mod.handle_audio_data(
182-
message.stream_id,
183-
message.timestamp,
184-
message.payload,
185-
state.handler_state
186-
)
187-
188-
%{state | handler_state: handler_state}
179+
defp handle_message(%{type: 8} = message, %{state: :publishing} = state) do
180+
handler_state =
181+
state.handler_mod.handle_audio_data(
182+
message.timestamp,
183+
message.payload,
184+
state.handler_state
185+
)
189186

190-
_other ->
191-
state
192-
end
187+
%{state | handler_state: handler_state}
193188
end
194189

195-
defp handle_message(%{type: 9} = message, state) do
196-
case state.streams_state[message.stream_id] do
197-
:publishing ->
198-
handler_state =
199-
state.handler_mod.handle_video_data(
200-
message.stream_id,
201-
message.timestamp,
202-
message.payload,
203-
state.handler_state
204-
)
190+
defp handle_message(%{type: 9} = message, %{state: :publishing} = state) do
191+
handler_state =
192+
state.handler_mod.handle_video_data(
193+
message.timestamp,
194+
message.payload,
195+
state.handler_state
196+
)
205197

206-
%{state | handler_state: handler_state}
207-
208-
_other ->
209-
state
210-
end
198+
%{state | handler_state: handler_state}
211199
end
212200

213-
defp handle_message(%{type: 18, payload: %Metadata{data: data}} = message, state) do
214-
%{
215-
state
216-
| handler_state:
217-
state.handler_mod.handle_metadata(
218-
message.stream_id,
219-
data,
220-
state.handler_state
221-
)
222-
}
201+
defp handle_message(%{type: type}, state) when type == 8 or type == 9, do: state
202+
203+
defp handle_message(%{type: 18, payload: %Metadata{data: data}}, state) do
204+
%{state | handler_state: state.handler_mod.handle_metadata(data, state.handler_state)}
223205
end
224206

225207
defp handle_message(%{type: 20} = message, state) do
@@ -272,112 +254,81 @@ defmodule ExRTMP.Server.ClientSession do
272254
end
273255
end
274256

275-
defp handle_create_stream_message(create_stream, %{state: :connected} = state) do
276-
transaction_id = create_stream.transaction_id
257+
defp handle_create_stream_message(create_stream, %{state: :connected, stream_id: nil} = state) do
258+
message =
259+
create_stream.transaction_id
260+
|> Response.ok(data: 1)
261+
|> Message.command()
262+
263+
{[message], %{state | stream_id: 1}}
264+
end
265+
266+
defp handle_create_stream_message(%{transaction_id: id}, state) do
267+
reason = if state.state != :connected, do: "Not Connected", else: "Stream Already Created"
268+
{[Message.command(Response.create_stream_failed(id, reason))], state}
269+
end
270+
271+
defp handle_publish_message(
272+
publish,
273+
stream_id,
274+
%{state: :connected, stream_id: stream_id} = state
275+
) do
276+
Logger.debug("Received publish command for #{publish.name} on stream: #{stream_id}")
277277

278-
case state.handler_mod.handle_create_stream(state.handler_state) do
278+
case state.handler_mod.handle_publish(publish.name, state.handler_state) do
279279
{:ok, handler_state} ->
280-
message =
281-
transaction_id
282-
|> Response.ok(data: state.next_stream_id)
283-
|> Message.command()
280+
state = %{state | handler_state: handler_state, state: :publishing}
284281

285-
state = %{
286-
state
287-
| handler_state: handler_state,
288-
next_stream_id: state.next_stream_id + 1,
289-
streams_state: Map.put(state.streams_state, state.next_stream_id, :created)
290-
}
282+
messages = [
283+
Message.stream_begin(stream_id),
284+
Message.command(OnStatus.publish_ok(), stream_id)
285+
]
291286

292-
{[message], state}
287+
{messages, state}
293288

294289
{:error, reason} ->
295-
{[Message.command(Response.create_stream_failed(transaction_id, reason))], state}
290+
{[Message.command(OnStatus.publish_failed(reason), stream_id)], state}
296291
end
297292
end
298293

299-
defp handle_create_stream_message(create_stream, state) do
300-
transaction_id = create_stream.transaction_id
301-
{[Message.command(Response.create_stream_failed(transaction_id, "Not Connected"))], state}
294+
defp handle_publish_message(_publish, stream_id, state) do
295+
{[Message.command(OnStatus.publish_bad_stream(), stream_id)], state}
302296
end
303297

304-
defp handle_publish_message(publish, stream_id, state) do
305-
stream_state = Map.get(state.streams_state, stream_id)
306-
307-
cond do
308-
is_nil(stream_state) ->
309-
{[Message.command(OnStatus.publish_bad_stream(), stream_id)], state}
310-
311-
stream_state != :created ->
312-
message = Message.command(OnStatus.publish_failed("Stream is #{stream_state}"), stream_id)
313-
{[message], state}
298+
defp handle_play_message(play, stream_id, %{state: :connected, stream_id: stream_id} = state) do
299+
Logger.debug("Received play command for #{play.name} on stream: #{stream_id}")
314300

315-
true ->
316-
case state.handler_mod.handle_publish(stream_id, publish.name, state.handler_state) do
317-
{:ok, handler_state} ->
318-
state = %{
319-
state
320-
| handler_state: handler_state,
321-
streams_state: Map.put(state.streams_state, stream_id, :publishing)
322-
}
301+
case state.handler_mod.handle_play(play, state.handler_state) do
302+
{:ok, handler_state} ->
303+
state = %{state | handler_state: handler_state, state: :playing}
323304

324-
messages = [
325-
Message.stream_begin(stream_id),
326-
Message.command(OnStatus.publish_ok(), stream_id)
327-
]
305+
messages = [
306+
Message.stream_begin(stream_id),
307+
Message.command(OnStatus.play_ok(), stream_id)
308+
]
328309

329-
{messages, state}
310+
{messages, state}
330311

331-
{:error, reason} ->
332-
{[Message.command(OnStatus.publish_failed(reason), stream_id)], state}
333-
end
312+
{:error, reason} ->
313+
{[Message.command(OnStatus.play_failed(reason), stream_id)], state}
334314
end
335315
end
336316

337-
defp handle_play_message(play, stream_id, state) do
338-
Logger.debug("Received play command for #{play.name} on stream: #{stream_id}")
339-
stream_state = Map.get(state.streams_state, stream_id)
340-
341-
cond do
342-
is_nil(stream_state) ->
343-
{[Message.command(OnStatus.play_bad_stream(), stream_id)], state}
344-
345-
stream_state != :created ->
346-
message = Message.command(OnStatus.play_failed("Stream is #{stream_state}"), stream_id)
347-
{[message], state}
348-
349-
true ->
350-
case state.handler_mod.handle_play(stream_id, play, state.handler_state) do
351-
{:ok, handler_state} ->
352-
state = %{
353-
state
354-
| handler_state: handler_state,
355-
streams_state: Map.put(state.streams_state, stream_id, :playing)
356-
}
357-
358-
messages = [
359-
Message.stream_begin(stream_id),
360-
Message.command(OnStatus.play_ok(), stream_id)
361-
]
362-
363-
{messages, state}
364-
365-
{:error, reason} ->
366-
{[Message.command(OnStatus.play_failed(reason), stream_id)], state}
367-
end
368-
end
317+
defp handle_play_message(_play, stream_id, state) do
318+
{[Message.command(OnStatus.play_bad_stream(), stream_id)], state}
369319
end
370320

371321
defp handle_delete_stream(stream_id, state) do
372-
Logger.debug("Received delete stream commad on stream: #{stream_id}")
322+
Logger.debug("Received delete stream command on stream: #{stream_id}")
373323

374-
state = %{
375-
state
376-
| streams_state: Map.delete(state.streams_state, stream_id),
377-
handler_state: state.handler_mod.handle_delete_stream(stream_id, state.handler_state)
378-
}
324+
case state.handler_mod.handle_delete_stream(state.handler_state) do
325+
:close ->
326+
send(self(), :exit)
327+
{[], state}
379328

380-
{[], state}
329+
handler_state ->
330+
{[], %{state | stream_id: nil, handler_state: handler_state}}
331+
end
381332
end
382333

383334
defp send_media(media, socket, stream_id, timestamp, data) do

0 commit comments

Comments
 (0)