lib/membrane/realtimer.ex

defmodule Membrane.Realtimer do
  @moduledoc """
  Sends buffers to the output in real time, according to buffers' timestamps.

  If buffers come in slower than realtime, they're sent as they come in.

  It can be reset by sending `%#{inspect(__MODULE__)}.Events.Reset{}` event on its input pad.
  """
  use Membrane.Filter

  alias __MODULE__.Events
  alias Membrane.{Buffer, Time}

  # The input works in manual flow control: buffers arrive only after this element
  # requests them with a `:demand` action, with the demand expressed in buffers.
  def_input_pad :input,
    accepted_format: _any,
    flow_control: :manual,
    demand_unit: :buffers

  # The output works in push mode: buffers are emitted when their scheduled
  # timer message is handled, without waiting for downstream demand.
  def_output_pad :output,
    accepted_format: _any,
    flow_control: :push

  def_options max_latency: [
                spec: Time.non_neg(),
                default: 0,
                inspector: &Time.inspect/1,
                description: """
                This element will keep a part of the stream, of duration specified by this option,
                buffered. The purpose of this is to handle cases where the incoming stream can get
                lagged, for example when an element up the pipeline can get held up for some time
                and then produce all the "late" media at once. The buffered stream gives Realtimer some
                margin in waiting for this "late" media, so that the outgoing stream is still smooth.
                The initial accumulation of the buffer can introduce some latency, especially when
                the input stream is in realtime, but it will never exceed the amount provided via this
                option.
                """
              ]

  defmodule State do
    @moduledoc false

    # Internal state of the element. Incoming buffers (together with the actions that
    # follow them) wait in per-substream queues until their timer message arrives.

    defmodule Substream do
      @moduledoc false

      # A part of the stream that shares one pair of reference timestamps.
      #
      #   * `action_batches_queue` - batches of actions waiting to be emitted, pushed at
      #     the back and popped from the front; each batch carries the timestamp of the
      #     buffer that opened it
      #   * `reference_timestamps` - `stream` is the timestamp of the first buffer of the
      #     substream, `absolute` is the monotonic time that this timestamp is mapped to
      @type t :: %__MODULE__{
              reference_timestamps: %{
                absolute: Time.t(),
                stream: Time.t()
              },
              action_batches_queue:
                Qex.t(%{timestamp: Time.non_neg(), actions: [Membrane.Element.Action.t()]})
            }

      @enforce_keys [:reference_timestamps]

      defstruct [:reference_timestamps, action_batches_queue: Qex.new()]
    end

    # Fields:
    #
    #   * `max_latency` - value of the `max_latency` option
    #   * `offset` - `:calculating` until it gets locked; afterwards it is added to the
    #     target emission time of every batch
    #   * `newest_timestamp` - timestamp of the most recently received buffer
    #   * `currently_incoming_substream_id` - id of the substream that new buffers are
    #     pushed to, `nil` until the first buffer arrives
    #   * `start_of_stream_time` - monotonic time recorded in `handle_start_of_stream`
    #   * `initialize_new_substream?` - when `true`, the next buffer opens a new substream
    #   * `substreams` - substreams that still exist, by id
    @type t :: %__MODULE__{
            max_latency: Time.non_neg(),
            offset: Time.non_neg() | :calculating,
            newest_timestamp: Time.non_neg(),
            currently_incoming_substream_id: non_neg_integer() | nil,
            start_of_stream_time: Time.t() | nil,
            initialize_new_substream?: boolean(),
            substreams: %{non_neg_integer() => Substream.t()}
          }

    @enforce_keys [:max_latency]

    defstruct [
      :max_latency,
      offset: :calculating,
      newest_timestamp: 0,
      currently_incoming_substream_id: nil,
      start_of_stream_time: nil,
      initialize_new_substream?: true,
      substreams: %{}
    ]
  end

  # Builds the initial state; only `max_latency` is taken from the options.
  @impl true
  def handle_init(_ctx, opts) do
    initial_state = %State{max_latency: opts.max_latency}
    {[], initial_state}
  end

  # Kicks off the flow by issuing the first demand on the input pad.
  @impl true
  def handle_playing(_ctx, state), do: {[demand: :input], state}

  # Records when the stream started and arms a timer that fires after `max_latency`.
  # The `:max_latency_passed` message locks the offset if it is still being calculated.
  @impl true
  def handle_start_of_stream(:input, _ctx, %State{max_latency: max_latency} = state) do
    timeout_ms = Time.as_milliseconds(max_latency, :round)
    Process.send_after(self(), :max_latency_passed, timeout_ms)

    {[], %State{state | start_of_stream_time: Time.monotonic_time()}}
  end

  # Queues the incoming buffer as a new action batch of the current substream and,
  # depending on whether the offset is already locked, schedules its emission.
  @impl true
  def handle_buffer(:input, buffer, ctx, %State{} = state) do
    timestamp = Buffer.get_dts_or_pts(buffer)

    # A new substream must be opened before `newest_timestamp` is overwritten,
    # because its reference time is derived from the previous buffer's timestamp.
    state =
      case state do
        %State{initialize_new_substream?: true} -> initialize_new_substream?(timestamp, state)
        %State{} -> state
      end

    substream_id = state.currently_incoming_substream_id
    batch = %{timestamp: timestamp, actions: [buffer: {:output, buffer}]}

    state = %State{state | newest_timestamp: timestamp}

    state =
      update_in(
        state.substreams[substream_id].action_batches_queue,
        fn queue -> Qex.push(queue, batch) end
      )

    buffered_duration = calculate_buffered_stream_duration(state)

    state =
      if state.offset == :calculating do
        # Nothing is scheduled until enough of the stream has been accumulated;
        # once it has, everything queued so far gets scheduled at once.
        if buffered_duration >= state.max_latency do
          lock_offset_and_schedule_all_queued_actions(state)
        else
          state
        end
      else
        :ok = schedule_action_batch(timestamp, substream_id, state)
        state
      end

    {maybe_demand(buffered_duration, ctx, state), state}
  end

  # `max_latency` has elapsed since the start of the stream: if the offset has not
  # been locked by then, it is locked now and all queued batches get scheduled.
  @impl true
  def handle_info(:max_latency_passed, _ctx, %State{offset: :calculating} = state) do
    {[], lock_offset_and_schedule_all_queued_actions(state)}
  end

  def handle_info(:max_latency_passed, _ctx, %State{} = state) do
    {[], state}
  end

  # Emits the oldest queued action batch of the given substream. The message is sent
  # by `schedule_action_batch/3`, once for every scheduled batch.
  #
  # Returns the actions of the popped batch, followed by a demand on the input pad
  # when `maybe_demand/3` decides that more of the stream should be requested.
  @impl true
  def handle_info({:send_scheduled_action_batch, substream_id}, ctx, %State{} = state) do
    {oldest_action_batch, rest_of_action_batches} =
      Qex.pop!(state.substreams[substream_id].action_batches_queue)

    # Actions are prepended to a batch as they are enqueued, so they are reversed
    # here to be emitted in the order in which they arrived.
    queued_actions = Enum.reverse(oldest_action_batch.actions)

    state =
      if substream_id != state.currently_incoming_substream_id and
           Qex.first(rest_of_action_batches) == :empty do
        # A substream that no longer receives buffers is dropped once it is drained.
        update_in(state.substreams, &Map.delete(&1, substream_id))
      else
        put_in(state.substreams[substream_id].action_batches_queue, rest_of_action_batches)
      end

    # The demand decision has to be based on the queues *after* the pop. Measuring
    # the duration on the stale state would still count the batch being emitted right
    # now, so new input would be requested one batch later than necessary.
    demand_action =
      state
      |> calculate_buffered_stream_duration()
      |> maybe_demand(ctx, state)

    {queued_actions ++ demand_action, state}
  end

  # Events coming from the output pad are forwarded unchanged.
  @impl true
  def handle_event(:output, event, _ctx, state) do
    {[forward: event], state}
  end

  # A reset request is consumed here: it only marks that the next buffer has to
  # open a new substream. This clause must stay above the generic `:input` one.
  @impl true
  def handle_event(:input, %Events.Reset{}, _ctx, %State{} = state) do
    {[], %State{state | initialize_new_substream?: true}}
  end

  # Any other input event is attached to the newest queued batch, or emitted
  # right away when nothing is queued.
  @impl true
  def handle_event(:input, event, _ctx, %State{} = state) do
    forward_event_action = {:event, {:output, event}}
    enqueue_or_emit_action(forward_event_action, state)
  end

  # The stream format is attached to the newest queued batch, or emitted right
  # away when nothing is queued.
  @impl true
  def handle_stream_format(:input, stream_format, _ctx, %State{} = state) do
    forward_format_action = {:stream_format, {:output, stream_format}}
    enqueue_or_emit_action(forward_format_action, state)
  end

  # No more input will come, so a still-uncalculated offset is locked and the queued
  # batches are scheduled. The end of stream itself goes after the newest queued
  # batch, or is emitted right away when nothing is queued.
  @impl true
  def handle_end_of_stream(:input, _ctx, %State{} = state) do
    state =
      case state.offset do
        :calculating -> lock_offset_and_schedule_all_queued_actions(state)
        _locked_offset -> state
      end

    enqueue_or_emit_action({:end_of_stream, :output}, state)
  end

  # Opens a new substream starting at `buffer_timestamp` and makes it the current one.
  # Note that, despite the `?` suffix, this function returns the updated state.
  #
  # The very first substream is anchored at the current monotonic time. Every following
  # one is anchored at the point where the previous substream ended, that is at the
  # previous absolute reference shifted by the previous substream's total duration.
  @spec initialize_new_substream?(Time.t(), State.t()) :: State.t()
  defp initialize_new_substream?(buffer_timestamp, %State{} = state) do
    {substream_id, absolute_reference} =
      case state.currently_incoming_substream_id do
        nil ->
          {0, Time.monotonic_time()}

        previous_id ->
          %{absolute: previous_absolute, stream: previous_stream} =
            state.substreams[previous_id].reference_timestamps

          # `newest_timestamp` still holds the timestamp of the previous buffer here.
          previous_duration = state.newest_timestamp - previous_stream

          {previous_id + 1, previous_absolute + previous_duration}
      end

    new_substream = %State.Substream{
      reference_timestamps: %{absolute: absolute_reference, stream: buffer_timestamp}
    }

    %State{
      state
      | substreams: Map.put(state.substreams, substream_id, new_substream),
        currently_incoming_substream_id: substream_id,
        initialize_new_substream?: false
    }
  end

  # Sums, over all substreams, the span between the timestamps of the oldest and the
  # newest queued batch. A substream with an empty queue contributes nothing.
  @spec calculate_buffered_stream_duration(State.t()) :: Membrane.Time.t()
  defp calculate_buffered_stream_duration(state) do
    for {_substream_id, %{action_batches_queue: queue}} <- state.substreams, reduce: 0 do
      total ->
        case {Qex.first(queue), Qex.last(queue)} do
          {{:value, oldest}, {:value, newest}} ->
            total + (newest.timestamp - oldest.timestamp)

          {:empty, :empty} ->
            total + 0
        end
    end
  end

  # Returns a demand action for the input pad, or no actions at all when the input
  # has already ended, a demand is still pending, or more than `max_latency` of the
  # stream is already buffered.
  @spec maybe_demand(Time.t(), Membrane.Element.CallbackContext.t(), State.t()) ::
          [Membrane.Element.Action.demand()]
  defp maybe_demand(buffered_stream_duration, ctx, state) do
    input = ctx.pads.input

    cond do
      input.end_of_stream? -> []
      input.manual_demand_size > 0 -> []
      buffered_stream_duration > state.max_latency -> []
      true -> [demand: :input]
    end
  end

  # Arms a timer that will make the element emit one batch of the given substream.
  #
  # The target time is the substream's absolute reference, moved forward by the
  # buffer's position within the substream and by the locked offset. A target that
  # already lies in the past results in a zero delay.
  @spec schedule_action_batch(Time.t(), non_neg_integer(), State.t()) :: :ok
  defp schedule_action_batch(buffer_timestamp, substream_id, state) do
    %{absolute: absolute_reference, stream: stream_reference} =
      state.substreams[substream_id].reference_timestamps

    position_in_substream = buffer_timestamp - stream_reference
    target_time = absolute_reference + position_in_substream + state.offset

    remaining = max(target_time - Time.monotonic_time(), 0)
    delay_ms = Time.as_milliseconds(remaining, :round)

    Process.send_after(self(), {:send_scheduled_action_batch, substream_id}, delay_ms)

    :ok
  end

  # Fixes the offset to the time that has passed since the start of the stream and
  # schedules every batch that has been queued while the offset was being calculated.
  @spec lock_offset_and_schedule_all_queued_actions(State.t()) :: State.t()
  defp lock_offset_and_schedule_all_queued_actions(%State{} = state) do
    elapsed_since_start = Time.monotonic_time() - state.start_of_stream_time
    state = %State{state | offset: elapsed_since_start}

    for {substream_id, substream} <- state.substreams,
        batch <- substream.action_batches_queue do
      schedule_action_batch(batch.timestamp, substream_id, state)
    end

    state
  end

  # Keeps a non-buffer action in order with the queued buffers: when the current
  # substream has anything queued, the action joins its newest batch and is emitted
  # together with it; otherwise the action is returned to be executed immediately.
  @spec enqueue_or_emit_action(Membrane.Element.Action.t(), State.t()) ::
          {[Membrane.Element.Action.t()], State.t()}
  defp enqueue_or_emit_action(action, state) do
    substream_id = state.currently_incoming_substream_id

    case Map.get(state.substreams, substream_id) do
      nil ->
        {[action], state}

      substream ->
        case Qex.pop_back(substream.action_batches_queue) do
          {{:value, newest_batch}, older_batches} ->
            # Actions are kept newest-first within a batch.
            newest_batch = %{newest_batch | actions: [action | newest_batch.actions]}
            queue = Qex.push(older_batches, newest_batch)

            {[], put_in(state.substreams[substream_id].action_batches_queue, queue)}

          _nothing_buffered ->
            {[action], state}
        end
    end
  end
end