lib/membrane_flv_plugin/demuxer.ex

defmodule Membrane.FLV.Demuxer do
  @moduledoc """
  Element for demuxing FLV streams into audio and video streams.
  FLV format supports only one video and audio stream.
  They are optional however, FLV without either audio or video is also possible.

  When a new FLV stream is detected, you will be notified with `Membrane.FLV.Demuxer.new_stream_notification()`.

  If you want to pre-link the pipeline and skip handling notifications, make sure use the following output pads:
  - `Pad.ref(:audio, 0)` for audio stream
  - `Pad.ref(:video, 0)` for video stream
  """
  use Membrane.Filter
  use Bunch

  require Membrane.Logger

  alias Membrane.{Buffer, FLV}
  alias Membrane.FLV.Parser
  alias Membrane.RemoteStream

  @typedoc """
  Type of notification that is sent when a new FLV stream is detected.
  """
  @type new_stream_notification_t() :: {:new_stream, Membrane.Pad.ref_t(), codec_t()}

  @typedoc """
  List of formats supported by the demuxer.

  For video, only H264 is supported
  Audio codecs other than AAC might not work correctly, although they won't throw any errors.
  """
  @type codec_t() :: FLV.audio_codec_t() | :H264

  def_input_pad :input,
    availability: :always,
    caps:
      {RemoteStream, content_format: Membrane.Caps.Matcher.one_of([nil, FLV]), type: :bytestream},
    mode: :pull,
    demand_unit: :buffers

  def_output_pad :audio,
    availability: :on_request,
    caps: [RemoteStream, Membrane.AAC.RemoteStream],
    mode: :pull

  def_output_pad :video,
    availability: :on_request,
    caps: {Membrane.H264.RemoteStream, stream_format: :byte_stream},
    mode: :pull

  @impl true
  def handle_init(_opts) do
    {:ok, %{partial: <<>>, pads_buffer: %{}, aac_asc: <<>>, header_present?: true}}
  end

  @impl true
  def handle_prepared_to_playing(_ctx, state) do
    {{:ok, demand: :input}, state}
  end

  @impl true
  def handle_demand(_pad, size, :buffers, _ctx, state) do
    {{:ok, demand: {:input, size}}, state}
  end

  @impl true
  def handle_caps(_pad, _caps, _context, state), do: {:ok, state}

  @impl true
  def handle_process(:input, %Buffer{payload: payload}, _ctx, %{header_present?: true} = state) do
    case Membrane.FLV.Parser.parse_header(state.partial <> payload) do
      {:ok, _header, rest} ->
        {{:ok, demand: :input}, %{state | partial: rest, header_present?: false}}

      {:error, :not_enough_data} ->
        {{:ok, demand: :input}, %{state | partial: state.partial <> payload}}

      {:error, :not_a_header} ->
        raise("Invalid data detected on the input. Expected FLV header")
    end
  end

  @impl true
  def handle_process(:input, %Buffer{payload: payload}, _ctx, %{header_present?: false} = state) do
    case Parser.parse_body(state.partial <> payload) do
      {:ok, frames, rest} ->
        {actions, state} = get_actions(frames, state)
        actions = Enum.concat(actions, demand: :input)
        {{:ok, actions}, %{state | partial: rest}}

      {:error, :not_enough_data} ->
        {{:ok, demand: :input}, %{state | partial: state.partial <> payload}}
    end
  end

  @impl true
  def handle_pad_added(pad, _ctx, state) do
    actions = Map.get(state.pads_buffer, pad, []) |> Enum.to_list()
    state = put_in(state, [:pads_buffer, pad], :connected)
    {{:ok, actions}, state}
  end

  @impl true
  def handle_end_of_stream(:input, _ctx, state) do
    result =
      state.pads_buffer
      |> Enum.map(fn {pad, value} ->
        if value == :connected do
          {[end_of_stream: pad], {pad, value}}
        else
          {[], {pad, Qex.push(value, {:end_of_stream, pad})}}
        end
      end)

    actions = Enum.flat_map(result, &elem(&1, 0))
    pads_buffer = Enum.map(result, &elem(&1, 1)) |> Enum.into(%{})

    {{:ok, actions}, %{state | pads_buffer: pads_buffer}}
  end

  defp get_actions(frames, original_state) do
    Enum.reduce(frames, {[], original_state}, fn %{type: type} = packet, {actions, state} ->
      pad = pad(packet)

      pts = Membrane.Time.milliseconds(packet.pts)
      dts = Membrane.Time.milliseconds(packet.dts)

      cond do
        type == :audio_config and packet.codec == :AAC ->
          Membrane.Logger.debug("Audio configuration received")
          {:caps, {pad, %Membrane.AAC.RemoteStream{audio_specific_config: packet.payload}}}

        type == :audio_config ->
          [
            caps: {pad, %RemoteStream{content_format: packet.codec}},
            buffer: {pad, %Buffer{pts: pts, dts: dts, payload: get_payload(packet, state)}}
          ]

        type == :video_config and packet.codec == :H264 ->
          Membrane.Logger.debug("Video configuration received")

          {:caps,
           {pad,
            %Membrane.H264.RemoteStream{
              decoder_configuration_record: packet.payload,
              stream_format: :byte_stream
            }}}

        true ->
          buffer = %Buffer{pts: pts, dts: dts, payload: get_payload(packet, state)}
          {:buffer, {pad, buffer}}
      end
      |> buffer_or_send(packet, state)
      |> then(fn {out_actions, state} -> {actions ++ out_actions, state} end)
    end)
  end

  defp buffer_or_send(actions, packet, state) when is_list(actions) do
    Enum.reduce(actions, {[], state}, fn action, {actions, state} ->
      {out_actions, state} = buffer_or_send(action, packet, state)
      {actions ++ out_actions, state}
    end)
  end

  defp buffer_or_send(action, packet, state) when not is_list(action) do
    pad = pad(packet)

    cond do
      match?(%{^pad => :connected}, state.pads_buffer) ->
        {Bunch.listify(action), state}

      Map.has_key?(state.pads_buffer, pad(packet)) ->
        state = update_in(state, [:pads_buffer, pad(packet)], &Qex.push(&1, action))
        {[], state}

      true ->
        state = put_in(state, [:pads_buffer, pad(packet)], Qex.new([action]))
        {notify_about_new_stream(packet), state}
    end
  end

  defp get_payload(%FLV.Packet{type: :video, codec: :H264} = packet, _state) do
    Membrane.AVC.Utils.to_annex_b(packet.payload)
  end

  defp get_payload(packet, _state), do: packet.payload

  defp notify_about_new_stream(packet) do
    [notify: {:new_stream, pad(packet), packet.codec}]
  end

  defp pad(%FLV.Packet{type: type, stream_id: stream_id}) when type in [:audio_config, :audio],
    do: Pad.ref(:audio, stream_id)

  defp pad(%FLV.Packet{type: type, stream_id: stream_id}) when type in [:video_config, :video],
    do: Pad.ref(:video, stream_id)
end