defmodule Membrane.RTP.VP8.Payloader do
@moduledoc """
Payloads VP8 frames into RTP packets according to: https://tools.ietf.org/html/rfc7741
"""
use Membrane.Filter
alias Membrane.VP8
alias Membrane.{Buffer, RemoteStream, RTP}
# Mandatory first octet of the VP8 payload descriptor, spelled out field by field
# (X, R, N, S, R, PID - see RFC 7741, section 4.2).
# First fragment of a frame: S (start of partition) bit set, partition index 0.
@first_fragment_descriptor <<0::1, 0::1, 0::1, 1::1, 0::1, 0::3>>
# Every following fragment: S bit cleared, partition index still 0.
@following_fragment_descriptor <<0::8>>
def_options max_payload_size: [
              # Must be positive: the payloader divides the frame size by this value.
              spec: pos_integer(),
              default: 1400,
              description: """
              Maximal size (in bytes, greater than 0) of the VP8 data carried by a single outputted payload.
              Each payload is additionally prefixed with a VP8 payload descriptor, which can have max: 6B.
              The resulting RTP packet will also contain an RTP header (min 12B). After adding UDP header (8B)
              and IPv4 header (min 20B, max 60B) everything should fit in standard MTU size (1500B).
              """
            ],
            payload_descriptor_type: [
              spec: :simple,
              default: :simple,
              description: """
              When set to :simple payloader will generate only minimal payload descriptors required for fragmentation.
              More complex payload descriptors are not yet supported so this option should be left as default.
              """
            ]
# Output pad: declared with RTP caps (the caps sent in handle_prepared_to_playing/2 are %RTP{}).
def_output_pad :output, caps: RTP
# Input pad: accepts a RemoteStream whose content format is VP8 and whose type is
# :packetized; demand on this pad is expressed in buffers.
def_input_pad :input,
caps: {RemoteStream, content_format: VP8, type: :packetized},
demand_unit: :buffers
defmodule State do
  @moduledoc false
  # Internal state of the payloader element.
  # `max_payload_size` defaults to nil until it is filled in from the element options.
  defstruct max_payload_size: nil
end
# Builds the initial element state from the element options.
#
# The state is constructed explicitly from the only option the payloader reads.
# Merging the whole options map into the struct would silently add keys
# (such as :payload_descriptor_type) that %State{} does not define.
@impl true
def handle_init(options) do
  {:ok, %State{max_payload_size: options.max_payload_size}}
end
# Caps arriving on the input pad are accepted and ignored; the state is returned unchanged.
@impl true
def handle_caps(:input, _caps, _context, state), do: {:ok, state}
# Sends %RTP{} caps on the output pad when the element switches to playing.
@impl true
def handle_prepared_to_playing(_ctx, state) do
  actions = [caps: {:output, %RTP{}}]
  {{:ok, actions}, state}
end
# Forwards a demand from the output pad to the input pad with the same size (in buffers).
@impl true
def handle_demand(:output, size, :buffers, _ctx, state) do
  demand_action = {:demand, {:input, size}}
  {{:ok, [demand_action]}, state}
end
# Splits a single VP8 frame into one or more RTP payloads.
#
# The frame is cut into chunks no larger than `state.max_payload_size`, each chunk is
# prefixed with a one-byte VP8 payload descriptor, and every resulting buffer gets
# `metadata.rtp.marker` set to false, except the last one, which gets true.
# After sending the buffers a redemand on the output pad is requested.
@impl true
def handle_process(:input, %Buffer{payload: <<>>}, _ctx, state) do
  # An empty frame cannot be fragmented (the chunk count would be 0 and the chunk size
  # computation would divide by zero), so it is dropped. Redemand keeps the flow going,
  # since this input buffer produced no output.
  {{:ok, redemand: :output}, state}
end

def handle_process(:input, %Buffer{metadata: metadata, payload: payload}, _ctx, state) do
  payload_size = byte_size(payload)
  chunk_count = ceil(payload_size / state.max_payload_size)
  # Recompute the chunk size from the chunk count so that fragments are of similar size
  # instead of several full-size chunks followed by a tiny remainder.
  max_chunk_size = ceil(payload_size / chunk_count)

  buffers =
    payload
    |> Bunch.Binary.chunk_every_rem(max_chunk_size)
    |> add_descriptors()
    |> Enum.map(
      &%Buffer{
        metadata: Bunch.Struct.put_in(metadata, [:rtp], %{marker: false}),
        payload: &1
      }
    )
    # Only the last packet of the frame carries the marker.
    |> List.update_at(-1, &Bunch.Struct.put_in(&1, [:metadata, :rtp, :marker], true))

  {{:ok, [buffer: {:output, buffers}, redemand: :output]}, state}
end
# Takes the `{chunks, remainder}` tuple produced by chunking a frame and returns a flat
# list of fragments, each prefixed with its payload descriptor: the first fragment gets
# the "first fragment" descriptor, all the others the "following fragment" one.
# An empty remainder is skipped.
defp add_descriptors({full_chunks, remainder}) do
  fragments =
    case byte_size(remainder) do
      0 -> full_chunks
      _ -> full_chunks ++ [remainder]
    end

  [head | tail] = fragments
  tagged_tail = for fragment <- tail, do: @following_fragment_descriptor <> fragment

  [@first_fragment_descriptor <> head | tagged_tail]
end
end