defmodule Membrane.RTP.VP8.Depayloader do
  @moduledoc """
  Depayloads VP8 frames from RTP packets according to: https://tools.ietf.org/html/rfc7741

  Incoming RTP buffers are fed one by one into `Membrane.RTP.VP8.Frame.parse/2`, which
  accumulates them. Whenever it reports a complete frame, a single buffer carrying that
  frame is sent on the `:output` pad. The frame is stamped with the metadata of the first
  buffer that was recorded for it.
  """
  use Membrane.Filter
  use Membrane.Log

  alias Membrane.VP8
  alias Membrane.RTP.VP8.Frame
  alias Membrane.{Buffer, RemoteStream, RTP}
  alias Membrane.Event.Discontinuity

  @type sequence_number :: 0..65_535

  def_output_pad :output, caps: {RemoteStream, content_format: VP8, type: :packetized}
  def_input_pad :input, caps: RTP, demand_unit: :buffers

  defmodule State do
    @moduledoc false
    # frame_acc             - accumulator handed to and returned by `Frame.parse/2`
    # first_buffer_metadata - metadata that will be attached to the next emitted frame;
    #                         `nil` until a buffer has been seen for that frame
    @type t :: %__MODULE__{
            frame_acc: Frame.t(),
            first_buffer_metadata: nil | map()
          }
    defstruct frame_acc: %Frame{}, first_buffer_metadata: nil
  end

  @impl true
  def handle_init(_options), do: {:ok, %State{}}

  # Whatever RTP caps arrive, the output is always announced as a packetized VP8 stream.
  @impl true
  def handle_caps(:input, _caps, _context, state) do
    caps = %RemoteStream{content_format: VP8, type: :packetized}
    {{:ok, caps: {:output, caps}}, state}
  end

  # Demand is passed upstream one-to-one; `handle_process/4` issues a redemand after every
  # buffer, because a single RTP packet does not necessarily yield an output buffer.
  @impl true
  def handle_demand(:output, size, :buffers, _ctx, state),
    do: {{:ok, demand: {:input, size}}, state}

  # A discontinuity invalidates the partially assembled frame. The whole state is dropped
  # (not only the accumulator) so that the metadata remembered for the discarded frame is
  # not attached to the first frame assembled after the gap.
  @impl true
  def handle_event(:input, %Discontinuity{} = event, _ctx, _state),
    do: {{:ok, forward: event}, %State{}}

  # Every other event falls back to the default implementation injected by
  # `use Membrane.Filter`; without this clause such events would raise FunctionClauseError.
  def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

  @impl true
  def handle_process(:input, buffer, _ctx, state) do
    # Remember the metadata of the first buffer seen since the state was last reset.
    state = %State{
      state
      | first_buffer_metadata: state.first_buffer_metadata || buffer.metadata
    }

    case parse_buffer(buffer, state) do
      {{:ok, actions}, new_state} ->
        {{:ok, actions ++ [redemand: :output]}, new_state}

      {:error, reason} ->
        # A malformed packet makes the accumulated data unusable: log and start over.
        log_malformed_buffer(buffer, reason)
        {{:ok, redemand: :output}, %State{}}
    end
  end

  # Flushes whatever is still accumulated before signalling end of stream.
  @impl true
  def handle_end_of_stream(:input, _ctx, state) do
    {{:ok, flush_actions(state) ++ [end_of_stream: :output]}, %State{}}
  end

  # Returns the actions that emit the pending frame, or no actions when the accumulator is
  # still in its initial state (no packets received, or state reset just before the end of
  # the stream). This avoids emitting a buffer built from an empty accumulator.
  defp flush_actions(%State{frame_acc: frame_acc, first_buffer_metadata: metadata}) do
    if frame_acc == %Frame{} do
      []
    else
      {frame, _acc} = Frame.flush(frame_acc)
      [buffer: {:output, %Buffer{metadata: metadata, payload: frame}}]
    end
  end

  # Feeds `buffer` into the frame accumulator.
  #
  # Returns `{{:ok, actions}, new_state}` where `actions` is empty while the frame is still
  # incomplete, or holds a single output buffer once `Frame.parse/2` returns a frame.
  # Parser errors are returned untouched as `{:error, reason}`.
  defp parse_buffer(buffer, state) do
    case Frame.parse(buffer, state.frame_acc) do
      {:ok, :incomplete, acc} ->
        {{:ok, []}, %State{state | frame_acc: acc}}

      {:ok, frame, acc} ->
        # The finished frame gets the metadata recorded for it, and the current buffer's
        # metadata is stored for the next frame.
        # NOTE(review): presumably the packet that completes a frame already belongs to
        # the next one - confirm against Frame.parse/2.
        output = %{buffer | payload: frame, metadata: state.first_buffer_metadata}

        {{:ok, [buffer: {:output, output}]},
         %State{state | frame_acc: acc, first_buffer_metadata: buffer.metadata}}

      {:error, _reason} = error ->
        error
    end
  end

  # `inspect/1` is used for the reason so that logging cannot itself crash when the reason
  # is a term without a `String.Chars` implementation (e.g. a tuple).
  defp log_malformed_buffer(packet, reason) do
    warn("""
    An error occurred while parsing RTP packet.
    Reason: #{inspect(reason)}
    Packet: #{inspect(packet, limit: :infinity)}
    """)
  end
end