lib/membrane/rtp/header_generator.ex

defmodule Membrane.RTP.HeaderGenerator do
  @moduledoc """
  Given following RTP payloads and their minimal metadata, creates their proper header information,
  incrementing timestamps and sequence numbers for each packet. Header information then is put
  inside buffer's metadata under `:rtp` key.

  Accepts the following metadata under `:rtp` key: `:marker`, `:csrcs`, `:extensions`.
  See `Membrane.RTP.Header` for their meaning and specifications.
  """
  use Membrane.Filter

  require Bitwise
  alias Membrane.RTP

  @max_seq_num Bitwise.bsl(1, 16) - 1
  @max_timestamp Bitwise.bsl(1, 32) - 1

  def_input_pad :input, accepted_format: RTP, demand_mode: :auto

  def_output_pad :output, accepted_format: RTP, demand_mode: :auto

  def_options ssrc: [spec: RTP.ssrc_t()],
              payload_type: [spec: RTP.payload_type_t()],
              clock_rate: [spec: RTP.clock_rate_t()]

  defmodule State do
    @moduledoc false
    use Bunch.Access

    defstruct [
      :ssrc,
      :payload_type,
      :clock_rate,
      sequence_number: 0,
      init_timestamp: 0
    ]

    @type t :: %__MODULE__{
            ssrc: RTP.ssrc_t(),
            payload_type: RTP.payload_type_t(),
            clock_rate: RTP.clock_rate_t(),
            sequence_number: non_neg_integer(),
            init_timestamp: non_neg_integer()
          }
  end

  @impl true
  def handle_init(_ctx, options) do
    state =
      options
      |> Map.from_struct()
      |> Map.merge(%{
        sequence_number: Enum.random(0..@max_seq_num),
        init_timestamp: Enum.random(0..@max_timestamp)
      })

    {[], struct!(State, state)}
  end

  @impl true
  def handle_stream_format(:input, _stream_format, _ctx, state) do
    {[stream_format: {:output, %RTP{}}], state}
  end

  @impl true
  def handle_process(:input, buffer, _ctx, state) do
    {rtp_metadata, metadata} = Map.pop(buffer.metadata, :rtp, %{})

    rtp_offset =
      buffer.pts
      |> Ratio.mult(state.clock_rate)
      |> Membrane.Time.round_to_seconds()

    rtp_timestamp = rem(state.init_timestamp + rtp_offset, @max_timestamp + 1)

    state = Map.update!(state, :sequence_number, &rem(&1 + 1, @max_seq_num + 1))

    header = %{
      ssrc: state.ssrc,
      marker: Map.get(rtp_metadata, :marker, false),
      payload_type: state.payload_type,
      timestamp: rtp_timestamp,
      sequence_number: state.sequence_number,
      csrcs: Map.get(rtp_metadata, :csrcs, []),
      extensions: Map.get(rtp_metadata, :extensions, [])
    }

    buffer = %Membrane.Buffer{
      buffer
      | metadata: Map.put(metadata, :rtp, header),
        payload: buffer.payload
    }

    {[buffer: {:output, buffer}], state}
  end
end