lib/membrane_mp4/muxer/isom.ex

defmodule Membrane.MP4.Muxer.ISOM do
  @moduledoc """
  Puts payloaded streams into an MPEG-4 container.

  Due to the structure of MPEG-4 containers, the muxer has to be used along with
  `Membrane.File.Sink` or any other sink that can handle `Membrane.File.SeekEvent`.
  The event is used to fill in `mdat` box size after processing all incoming buffers
  and, if `fast_start` is set to `true`, to insert `moov` box at the beginning of the file.
  """
  use Membrane.Filter

  alias Membrane.{Buffer, File, MP4, RemoteStream, Time}
  alias Membrane.MP4.{Container, FileTypeBox, MediaDataBox, MovieBox, Track}

  @ftyp FileTypeBox.assemble("isom", ["isom", "iso2", "avc1", "mp41"])
  @ftyp_size @ftyp |> Container.serialize!() |> byte_size()
  @mdat_header_size 8

  def_input_pad :input,
    demand_unit: :buffers,
    caps: Membrane.MP4.Payload,
    availability: :on_request

  def_output_pad :output, caps: {RemoteStream, type: :bytestream, content_format: MP4}

  def_options fast_start: [
                spec: boolean(),
                default: false,
                description: """
                Generates a container more suitable for streaming by allowing media players to start
                playback as soon as they start to receive its media data.

                When set to `true`, the container metadata (`moov` atom) will be placed before media
                data (`mdat` atom). The equivalent of FFmpeg's `-movflags faststart` option.
                """
              ],
              chunk_duration: [
                spec: Time.t(),
                default: Time.seconds(1),
                description: """
                Expected duration of each chunk in the resulting MP4 container.

                Once the total duration of samples received on one of the input pads exceeds
                that threshold, a chunk containing these samples is flushed. Interleaving chunks
                belonging to different tracks may have positive impact on performance of media players.
                """
              ]

  @impl true
  def handle_init(options) do
    state =
      options
      |> Map.from_struct()
      |> Map.merge(%{
        mdat_size: 0,
        next_track_id: 1,
        pad_order: [],
        pad_to_track: %{}
      })

    {:ok, state}
  end

  @impl true
  def handle_pad_added(Pad.ref(:input, pad_ref), _ctx, state) do
    {track_id, state} = Map.get_and_update!(state, :next_track_id, &{&1, &1 + 1})

    state =
      state
      |> Map.update!(:pad_order, &[pad_ref | &1])
      |> put_in([:pad_to_track, pad_ref], track_id)

    {:ok, state}
  end

  @impl true
  def handle_caps(Pad.ref(:input, pad_ref) = pad, %Membrane.MP4.Payload{} = caps, ctx, state) do
    cond do
      # Handle receiving very first caps on the given pad
      is_nil(ctx.pads[pad].caps) ->
        update_in(state, [:pad_to_track, pad_ref], fn track_id ->
          caps
          |> Map.take([:width, :height, :content, :timescale])
          |> Map.put(:id, track_id)
          |> Track.new()
        end)

      # Handle receiving all but first caps on the given pad when
      # inband_parameters? are allowed or caps are duplicated - ignore
      Map.get(ctx.pads[pad].caps.content, :inband_parameters?, false) ||
          ctx.pads[pad].caps == caps ->
        state

      # otherwise we can assume that output will be corrupted
      true ->
        raise("ISOM Muxer doesn't support variable parameters")
    end
    |> then(&{:ok, &1})
  end

  @impl true
  def handle_prepared_to_playing(_ctx, state) do
    # dummy mdat header will be overwritten in `finalize_mp4/1`, when media data size is known
    header = [@ftyp, MediaDataBox.assemble(<<>>)] |> Enum.map_join(&Container.serialize!/1)

    actions = [
      caps: {:output, %RemoteStream{content_format: MP4}},
      buffer: {:output, %Buffer{payload: header}}
    ]

    {{:ok, actions}, state}
  end

  @impl true
  def handle_demand(:output, _size, :buffers, _ctx, state) do
    next_ref = hd(state.pad_order)

    {{:ok, demand: {Pad.ref(:input, next_ref), 1}}, state}
  end

  @impl true
  def handle_process(Pad.ref(:input, pad_ref), buffer, _ctx, state) do
    {maybe_buffer, state} =
      state
      |> update_in([:pad_to_track, pad_ref], &Track.store_sample(&1, buffer))
      |> maybe_flush_chunk(pad_ref)

    {{:ok, maybe_buffer ++ [redemand: :output]}, state}
  end

  @impl true
  def handle_end_of_stream(Pad.ref(:input, pad_ref), _ctx, state) do
    {buffer, state} = do_flush_chunk(state, pad_ref)
    state = Map.update!(state, :pad_order, &List.delete(&1, pad_ref))

    if state.pad_order != [] do
      {{:ok, buffer ++ [redemand: :output]}, state}
    else
      actions = finalize_mp4(state)
      {{:ok, buffer ++ actions ++ [end_of_stream: :output]}, state}
    end
  end

  defp maybe_flush_chunk(state, pad_ref) do
    use Ratio, comparison: true
    track = get_in(state, [:pad_to_track, pad_ref])

    if Track.current_chunk_duration(track) >= state.chunk_duration do
      do_flush_chunk(state, pad_ref)
    else
      {[], state}
    end
  end

  defp do_flush_chunk(state, pad_ref) do
    {chunk, track} =
      state
      |> get_in([:pad_to_track, pad_ref])
      |> Track.flush_chunk(chunk_offset(state))

    state =
      state
      |> Map.put(:mdat_size, state.mdat_size + byte_size(chunk))
      |> put_in([:pad_to_track, pad_ref], track)
      |> Map.update!(:pad_order, &shift_left/1)

    {[buffer: {:output, %Buffer{payload: chunk}}], state}
  end

  defp finalize_mp4(state) do
    movie_box = state.pad_to_track |> Map.values() |> Enum.sort_by(& &1.id) |> MovieBox.assemble()
    after_ftyp = {:bof, @ftyp_size}
    mdat_total_size = @mdat_header_size + state.mdat_size

    update_mdat_actions = [
      event: {:output, %File.SeekEvent{position: after_ftyp}},
      buffer: {:output, %Buffer{payload: <<mdat_total_size::32>>}}
    ]

    if state.fast_start do
      moov = movie_box |> prepare_for_fast_start() |> Container.serialize!()

      update_mdat_actions ++
        [
          event: {:output, %File.SeekEvent{position: after_ftyp, insert?: true}},
          buffer: {:output, %Buffer{payload: moov}}
        ]
    else
      moov = Container.serialize!(movie_box)

      [buffer: {:output, %Buffer{payload: moov}}] ++ update_mdat_actions
    end
  end

  defp prepare_for_fast_start(movie_box) do
    movie_box_size = movie_box |> Container.serialize!() |> byte_size()
    movie_box_children = get_in(movie_box, [:moov, :children])

    # updates all `trak` boxes by adding `movie_box_size` to the offset of each chunk in their sample tables
    track_boxes_with_offset =
      movie_box_children
      |> Keyword.get_values(:trak)
      |> Enum.map(fn trak ->
        Container.update_box(
          trak.children,
          [:mdia, :minf, :stbl, :stco],
          [:fields, :entry_list],
          &Enum.map(&1, fn %{chunk_offset: offset} -> %{chunk_offset: offset + movie_box_size} end)
        )
      end)
      |> Enum.map(&{:trak, %{children: &1, fields: %{}}})

    # replaces all `trak` boxes with the ones with updated chunk offsets
    movie_box_children
    |> Keyword.delete(:trak)
    |> Keyword.merge(track_boxes_with_offset)
    |> then(&[moov: %{children: &1, fields: %{}}])
  end

  defp chunk_offset(%{mdat_size: mdat_size}),
    do: @ftyp_size + @mdat_header_size + mdat_size

  defp shift_left([]), do: []

  defp shift_left([first | rest]), do: rest ++ [first]
end