lib/membrane_opus/parser.ex

defmodule Membrane.Opus.Parser do
  @moduledoc """
  Parses a raw incoming Opus stream and adds caps information, as well as metadata.

  Adds the following metadata:

  duration :: non_neg_integer()
    Number of nanoseconds encoded in this packet
  """

  use Membrane.Filter

  alias __MODULE__.{Delimitation, FrameLengths}
  alias Membrane.{Buffer, Opus, RemoteStream}
  alias Membrane.Opus.Util

  @type delimitation_t :: :delimit | :undelimit | :keep

  def_options delimitation: [
                spec: delimitation_t(),
                default: :keep,
                description: """
                If input is delimitted? (as indicated by the `self_delimiting?`
                field in %Opus) and `:undelimit` is selected, will remove delimiting.

                If input is not delimitted? and `:delimit` is selected, will add delimiting.

                If `:keep` is selected, will not change delimiting.

                Otherwise will act like `:keep`.

                See https://tools.ietf.org/html/rfc6716#appendix-B for details
                on the self-delimiting Opus format.
                """
              ],
              input_delimitted?: [
                spec: boolean(),
                default: false,
                description: """
                If you know that the input is self-delimitted? but you're reading from
                some element that isn't sending the correct structure, you can set this
                to true to force the Parser to assume the input is self-delimitted? and
                ignore upstream caps information on self-delimitation.
                """
              ]

  def_input_pad :input,
    demand_unit: :buffers,
    demand_mode: :auto,
    caps: [Opus, {RemoteStream, content_format: one_of([Opus, nil])}]

  def_output_pad :output, caps: Opus, demand_mode: :auto

  @impl true
  def handle_init(%__MODULE__{} = options) do
    state =
      options
      |> Map.from_struct()
      |> Map.merge(%{
        pts: 0,
        buffer: <<>>
      })

    {:ok, state}
  end

  @impl true
  def handle_caps(:input, _caps, _ctx, state) do
    # ignore caps, they will be sent in handle_process
    {:ok, state}
  end

  @impl true
  def handle_process(:input, %Buffer{payload: data}, ctx, state) do
    {delimitation_processor, self_delimiting?} =
      Delimitation.get_processor(state.delimitation, state.input_delimitted?)

    case maybe_parse(
           state.buffer <> data,
           state.pts,
           state.input_delimitted?,
           delimitation_processor
         ) do
      {:ok, buffer, pts, packets, channels} ->
        caps = %Opus{
          self_delimiting?: self_delimiting?,
          channels: channels
        }

        packets_len = length(packets)

        packet_actions =
          cond do
            packets_len > 0 and caps != ctx.pads.output.caps ->
              [caps: {:output, caps}, buffer: {:output, packets}]

            packets_len > 0 ->
              [buffer: {:output, packets}]

            true ->
              []
          end

        {{:ok, packet_actions}, %{state | buffer: buffer, pts: pts}}

      :error ->
        {{:error, "An error occured in parsing"}, state}
    end
  end

  @spec maybe_parse(
          data :: binary,
          pts :: Membrane.Time.t(),
          input_delimitted? :: boolean,
          processor :: Delimitation.processor_t(),
          packets :: [Buffer.t()],
          channels :: 0..2
        ) ::
          {:ok, remaining_buffer :: binary, pts :: Membrane.Time.t(), packets :: [Buffer.t()],
           channels :: 0..2}
          | :error
  defp maybe_parse(data, pts, input_delimitted?, processor, packets \\ [], channels \\ 0)

  defp maybe_parse(data, pts, input_delimitted?, processor, packets, channels)
       when byte_size(data) > 0 do
    with {:ok, configuration_number, stereo_flag, frame_packing} <- Util.parse_toc_byte(data),
         channels <- max(channels, Util.parse_channels(stereo_flag)),
         {:ok, _mode, _bandwidth, frame_duration} <-
           Util.parse_configuration(configuration_number),
         {:ok, header_size, frame_lengths, padding_size} <-
           FrameLengths.parse(frame_packing, data, input_delimitted?),
         expected_packet_size <- header_size + Enum.sum(frame_lengths) + padding_size,
         {:ok, raw_packet, rest} <- rest_of_packet(data, expected_packet_size) do
      duration = elapsed_time(frame_lengths, frame_duration)

      packet = %Buffer{
        pts: pts,
        payload: processor.process(raw_packet, frame_lengths, header_size),
        metadata: %{
          duration: duration
        }
      }

      maybe_parse(
        rest,
        pts + duration,
        input_delimitted?,
        processor,
        [packet | packets],
        channels
      )
    else
      {:error, :cont} ->
        {:ok, data, pts, packets |> Enum.reverse(), channels}

      :error ->
        :error
    end
  end

  defp maybe_parse(data, pts, _input_delimitted?, _processor, packets, channels) do
    {:ok, data, pts, packets |> Enum.reverse(), channels}
  end

  @spec rest_of_packet(data :: binary, expected_packet_size :: pos_integer) ::
          {:ok, raw_packet :: binary, rest :: binary} | {:error, :cont}
  defp rest_of_packet(data, expected_packet_size) do
    case data do
      <<raw_packet::binary-size(expected_packet_size), rest::binary>> ->
        {:ok, raw_packet, rest}

      _otherwise ->
        {:error, :cont}
    end
  end

  @spec elapsed_time(frame_lengths :: [non_neg_integer], frame_duration :: pos_integer) ::
          elapsed_time :: Membrane.Time.non_neg_t()
  defp elapsed_time(frame_lengths, frame_duration) do
    # if a frame has length 0 it indicates a dropped frame and should not be
    # included in this calc
    present_frames = frame_lengths |> Enum.count(fn length -> length > 0 end)
    present_frames * frame_duration
  end
end