Skip to content

Commit 06c2526

Browse files
authored
Add server (support publish only) (#1)
* Add basic server * Add server handler behaviour * Build messages timestamp * Refactor message completion check * Add amf0 tests * test chunk stream
1 parent 336e7b5 commit 06c2526

18 files changed

Lines changed: 1243 additions & 0 deletions

File tree

lib/ex_rtmp/amf0.ex

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
defmodule ExRTMP.AMF0 do
2+
@moduledoc """
3+
AMF0 parser.
4+
"""
5+
6+
@spec parse(binary()) :: list()
7+
def parse(data), do: do_parse(data, [])
8+
9+
@spec serialize(any()) :: iodata()
10+
def serialize(true), do: <<0x01, 1>>
11+
def serialize(false), do: <<0x01, 0>>
12+
def serialize(nil), do: <<0x05>>
13+
def serialize(value) when is_number(value), do: <<0x00::8, value::float-64>>
14+
def serialize(atom) when is_atom(atom), do: serialize(to_string(atom))
15+
def serialize(str) when is_binary(str), do: <<0x02::8, byte_size(str)::16, str::binary>>
16+
17+
def serialize(map) when is_map(map) do
18+
pairs =
19+
Enum.map(map, fn {key, value} ->
20+
[<<byte_size(key)::16, key::binary>>, serialize(value)]
21+
end)
22+
23+
[0x03, pairs, 0, 0, 9]
24+
end
25+
26+
defp do_parse(<<>>, acc), do: Enum.reverse(acc)
27+
28+
defp do_parse(data, acc) do
29+
case parse_value(data) do
30+
{:object_start, rest} ->
31+
{obj, rest} = parse_object(rest, %{})
32+
do_parse(rest, [obj | acc])
33+
34+
{value, rest} ->
35+
do_parse(rest, [value | acc])
36+
end
37+
end
38+
39+
defp parse_value(<<0x00::8, number::float-64, rest::binary>>), do: {number, rest}
40+
defp parse_value(<<0x01::8, boolean::8, rest::binary>>), do: {boolean == 1, rest}
41+
defp parse_value(<<0x03::8, rest::binary>>), do: {:object_start, rest}
42+
defp parse_value(<<0x05::8, rest::binary>>), do: {nil, rest}
43+
44+
defp parse_value(<<0x02::8, str_len::16, str::binary-size(str_len), rest::binary>>),
45+
do: {str, rest}
46+
47+
defp parse_value(<<0x08::8, count::32, rest::binary>>) do
48+
{list, rest} =
49+
Enum.map_reduce(1..count, rest, fn _idx, rest ->
50+
{key, rest} = parse_object_key(rest)
51+
{value, rest} = parse_value(rest)
52+
{{key, value}, rest}
53+
end)
54+
55+
<<0, 0, 9, rest::binary>> = rest
56+
{list, rest}
57+
end
58+
59+
defp parse_value(<<0x0A::8, count::32, rest::binary>>) do
60+
Enum.map_reduce(1..count, rest, fn _idx, rest ->
61+
parse_value(rest)
62+
end)
63+
end
64+
65+
defp parse_object(rest, obj) do
66+
case parse_object_key(rest) do
67+
{:object_end, rest} ->
68+
{obj, rest}
69+
70+
{key, rest} ->
71+
{value, rest} = parse_value(rest)
72+
parse_object(rest, Map.put(obj, key, value))
73+
end
74+
end
75+
76+
defp parse_object_key(<<0::16, 0x09, rest::binary>>), do: {:object_end, rest}
77+
defp parse_object_key(<<str_len::16, str::binary-size(str_len), rest::binary>>), do: {str, rest}
78+
defp parse_object_key(_data), do: :error
79+
end

lib/ex_rtmp/chunk.ex

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
defmodule ExRTMP.Chunk do
2+
@moduledoc """
3+
RTMP Chunk structure
4+
"""
5+
6+
@type payload_size :: non_neg_integer()
7+
@type stream_id :: non_neg_integer()
8+
9+
@type t :: %__MODULE__{
10+
fmt: 0..3,
11+
stream_id: stream_id(),
12+
timestamp: non_neg_integer() | nil,
13+
message_length: non_neg_integer() | nil,
14+
message_type_id: non_neg_integer() | nil,
15+
message_stream_id: non_neg_integer() | nil,
16+
payload: binary() | nil
17+
}
18+
19+
defstruct [
20+
:fmt,
21+
:stream_id,
22+
:timestamp,
23+
:message_length,
24+
:message_type_id,
25+
:message_stream_id,
26+
:payload
27+
]
28+
29+
@spec parse_header(binary()) :: {:ok, t(), binary()} | :more
30+
def parse_header(data) do
31+
with {:ok, chunk, rest} <- parse_stream_id(data),
32+
{:ok, chunk, rest} <- parse_message_header(chunk, rest),
33+
{:ok, chunk, rest} <- parse_extended_timestamp(chunk, rest) do
34+
{:ok, chunk, rest}
35+
end
36+
end
37+
38+
@spec serialize(t()) :: iodata()
39+
def serialize(%__MODULE__{} = chunk) do
40+
timestamp = if chunk.timestamp, do: <<chunk.timestamp::24>>, else: <<>>
41+
length = if chunk.message_length, do: <<chunk.message_length::24>>, else: <<>>
42+
type_id = if chunk.message_type_id, do: <<chunk.message_type_id::8>>, else: <<>>
43+
44+
msg_stream_id =
45+
if chunk.message_stream_id, do: <<chunk.message_stream_id::32-little>>, else: <<>>
46+
47+
[
48+
<<chunk.fmt::2, encode_stream_id(chunk.stream_id)::bitstring>>,
49+
timestamp,
50+
length,
51+
type_id,
52+
msg_stream_id,
53+
chunk.payload
54+
]
55+
end
56+
57+
defp parse_stream_id(data) do
58+
case data do
59+
<<fmt::2, 0::6, cs_id::8, rest::binary>> ->
60+
{:ok, %__MODULE__{fmt: fmt, stream_id: cs_id + 64}, rest}
61+
62+
<<fmt::2, 1::6, cs_id::16, rest::binary>> ->
63+
{:ok, %__MODULE__{fmt: fmt, stream_id: cs_id + 64}, rest}
64+
65+
<<fmt::2, cs_id::6, rest::binary>> ->
66+
{:ok, %__MODULE__{fmt: fmt, stream_id: cs_id}, rest}
67+
68+
_ ->
69+
:more
70+
end
71+
end
72+
73+
defp parse_message_header(chunk, data) do
74+
case {chunk.fmt, data} do
75+
{0,
76+
<<timestamp::24, length::24, type_id::8, msg_stream_id::integer-32-little, rest::binary>>} ->
77+
chunk = %{
78+
chunk
79+
| timestamp: timestamp,
80+
message_length: length,
81+
message_type_id: type_id,
82+
message_stream_id: msg_stream_id
83+
}
84+
85+
{:ok, chunk, rest}
86+
87+
{1, <<timestamp::24, length::24, type_id::8, rest::binary>>} ->
88+
chunk = %{chunk | timestamp: timestamp, message_length: length, message_type_id: type_id}
89+
{:ok, chunk, rest}
90+
91+
{2, <<timestamp::24, rest::binary>>} ->
92+
{:ok, %{chunk | timestamp: timestamp}, rest}
93+
94+
{3, rest} ->
95+
{:ok, chunk, rest}
96+
97+
_ ->
98+
:more
99+
end
100+
end
101+
102+
defp parse_extended_timestamp(%{timestamp: 0xFFFFFF} = chunk, <<ts::32, rest::binary>>) do
103+
{:ok, %{chunk | timestamp: ts}, rest}
104+
end
105+
106+
defp parse_extended_timestamp(%{timestamp: 0xFFFFFF}, _rest), do: :more
107+
defp parse_extended_timestamp(chunk, data), do: {:ok, chunk, data}
108+
109+
defp encode_stream_id(stream_id) when stream_id in 2..63, do: <<stream_id::6>>
110+
defp encode_stream_id(stream_id) when stream_id in 64..319, do: <<0::6, stream_id - 64::8>>
111+
defp encode_stream_id(stream_id) when stream_id >= 320, do: <<1::6, stream_id - 64::16>>
112+
end

lib/ex_rtmp/chunk_parser.ex

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
defmodule ExRTMP.ChunkParser do
2+
@moduledoc false
3+
4+
alias ExRTMP.{Chunk, Message}
5+
6+
@type t :: %__MODULE__{
7+
unprocessed_data: binary(),
8+
messages: %{Chunk.stream_id() => Message.t()},
9+
chunk_size: non_neg_integer(),
10+
message_first_chunk: %{Chunk.stream_id() => tuple()}
11+
}
12+
13+
defstruct [:unprocessed_data, :messages, :message_first_chunk, chunk_size: 128]
14+
15+
@spec new() :: t()
16+
def new() do
17+
%__MODULE__{
18+
unprocessed_data: <<>>,
19+
messages: %{},
20+
message_first_chunk: %{}
21+
}
22+
end
23+
24+
@spec process(binary(), t()) :: {[Message.t()], t()}
25+
def process(data, parser) do
26+
do_process(parser.unprocessed_data <> data, parser)
27+
end
28+
29+
defp do_process(data, parser, acc \\ []) do
30+
with {:ok, chunk, rest} <- Chunk.parse_header(data),
31+
{chunk, parser} <- set_missing_fields(chunk, parser),
32+
payload_size <- get_chunk_payload_size(parser, chunk),
33+
<<payload::binary-size(payload_size), rest::binary>> <- rest do
34+
message = Map.get(parser.messages, chunk.stream_id, Message.new(chunk))
35+
36+
{parser, acc} =
37+
case Message.append(message, payload) do
38+
{:ok, message} ->
39+
parser = %{parser | messages: Map.delete(parser.messages, chunk.stream_id)}
40+
{parser, [message | acc]}
41+
42+
{:more, message} ->
43+
parser = %{parser | messages: Map.put(parser.messages, chunk.stream_id, message)}
44+
{parser, acc}
45+
end
46+
47+
do_process(rest, parser, acc)
48+
else
49+
_ ->
50+
{Enum.reverse(acc), %{parser | unprocessed_data: data}}
51+
end
52+
end
53+
54+
defp set_missing_fields(chunk, parser) when is_map_key(parser.messages, chunk.stream_id) do
55+
{chunk, parser}
56+
end
57+
58+
defp set_missing_fields(%{fmt: 0} = chunk, parser) do
59+
tuple =
60+
{chunk.timestamp, 0, chunk.message_length, chunk.message_type_id, chunk.message_stream_id}
61+
62+
message_first_chunk = Map.put(parser.message_first_chunk, chunk.stream_id, tuple)
63+
{chunk, %{parser | message_first_chunk: message_first_chunk}}
64+
end
65+
66+
defp set_missing_fields(chunk, parser) do
67+
{timestamp, timestamp_delta, message_size, message_type, message_stream_id} =
68+
Map.fetch!(parser.message_first_chunk, chunk.stream_id)
69+
70+
timestamp_delta = chunk.timestamp || timestamp_delta
71+
72+
chunk = %{
73+
chunk
74+
| timestamp: timestamp_delta + timestamp,
75+
message_length: chunk.message_length || message_size,
76+
message_type_id: chunk.message_type_id || message_type,
77+
message_stream_id: chunk.message_stream_id || message_stream_id
78+
}
79+
80+
message_first_chunk =
81+
Map.put(
82+
parser.message_first_chunk,
83+
chunk.stream_id,
84+
{chunk.timestamp, timestamp_delta, chunk.message_length, chunk.message_type_id,
85+
chunk.message_stream_id}
86+
)
87+
88+
{chunk, %{parser | message_first_chunk: message_first_chunk}}
89+
end
90+
91+
defp get_chunk_payload_size(parser, chunk) do
92+
remaining_bytes =
93+
if message = Map.get(parser.messages, chunk.stream_id),
94+
do: message.size - message.current_size,
95+
else: chunk.message_length
96+
97+
min(parser.chunk_size, remaining_bytes)
98+
end
99+
end

0 commit comments

Comments
 (0)