lib/membrane/timestamp_queue.ex

defmodule Membrane.TimestampQueue do
  @moduledoc """
  Implementation of a queue, that accepts:
   - Membrane buffers
   - events
   - stream formats
   - end of streams
  from various pads. Items in queue are sorted according to their timestamps.

  Moreover, #{inspect(__MODULE__)} is able to manage demand of pads, based on the amount of buffers
  from each pad currently stored in the queue.
  """

  use Bunch.Access

  alias Membrane.{Buffer, Event, Pad, StreamFormat}
  alias Membrane.Element.Action

  # Per-pad state stored under the pad's reference in `:pad_queues`:
  #   - `:timestamp_offset` - value subtracted from the timestamps of buffers from this pad; `nil`
  #     until the first buffer from the pad is pushed
  #   - `:qex` - queue of items (buffers, events, stream formats, end of stream) pushed from the pad
  #   - `:buffers_size` - amount of queued buffers, expressed in the queue's `:metric_unit`
  #   - `:buffers_number` - number of buffers currently queued
  #   - `:end_of_stream?` - set when end of stream is pushed for the pad or the queue is flushed
  #   - `:use_pts?` - `true` if the first buffer from the pad had `nil` dts (pts is used then);
  #     `nil` until the first buffer is pushed
  #   - `:max_timestamp_on_qex` - raw (offset not applied) timestamp of the last pushed buffer
  #   - `:timestamps_qex` - buffer times used to compute `:buffers_size` for the `:time` metric;
  #     stays `nil` for the other metrics
  @typep pad_queue :: %{
           timestamp_offset: integer(),
           qex: Qex.t(),
           buffers_size: non_neg_integer(),
           buffers_number: non_neg_integer(),
           end_of_stream?: boolean(),
           use_pts?: boolean() | nil,
           max_timestamp_on_qex: Membrane.Time.t() | nil,
           timestamps_qex: Qex.t() | nil
         }

  @typedoc """
  A queue, that accepts buffers, stream formats and events from various pads and sorts them based on
  their timestamps.
  """
  # Field notes:
  #   - `:current_queue_time` - time (timestamp minus pad offset) of the most recently popped buffer
  #   - `:pads_heap` - heap of `{priority, pad_ref}`; priority is the negated time of the pad's first
  #     queued buffer, or `:infinity` when the pad's first queued item is not a buffer
  #   - `:blocking_registered_pads` - pads registered with `wait_on_buffers?: true`, that haven't
  #     pushed a buffer or end of stream yet
  #   - `:registered_pads_offsets` - offsets passed to `register_pad/3`, kept until the pad pushes
  #     its first buffer or end of stream
  #   - `:chunk_full?` - set during a chunked pop when the next buffer lies beyond the chunk boundary
  #   - `:known_pads` - pads returned by `pads/1`
  @opaque t :: %__MODULE__{
            current_queue_time: Membrane.Time.t(),
            pause_demand_boundary: pos_integer() | :infinity,
            metric_unit: :buffers | :bytes | :time,
            pad_queues: %{optional(Pad.ref()) => pad_queue()},
            pads_heap: Heap.t(),
            blocking_registered_pads: MapSet.t(Pad.ref()),
            registered_pads_offsets: %{optional(Pad.ref()) => integer()},
            # :awaiting_pads contains at most one element at a time: the pad whose last queued
            # buffer has been popped while its end of stream hasn't been pushed yet
            awaiting_pads: [Pad.ref()],
            closed?: boolean(),
            chunk_duration: nil | Membrane.Time.t(),
            chunk_full?: boolean(),
            next_chunk_boundary: nil | Membrane.Time.t(),
            synchronization_strategy: :synchronize_on_arrival | :explicit_offsets,
            known_pads: MapSet.t(Pad.ref())
          }

  # Defaults of a bare struct. Note that `new/1` overrides `:pause_demand_boundary` with 1000
  # buffers unless the option is passed explicitly, while the bare struct default is `:infinity`.
  # `:pads_heap` starts as `Heap.max()` - presumably an empty max-heap (see the Heap library docs).
  defstruct current_queue_time: Membrane.Time.seconds(0),
            pause_demand_boundary: :infinity,
            metric_unit: :buffers,
            pad_queues: %{},
            pads_heap: Heap.max(),
            blocking_registered_pads: MapSet.new(),
            registered_pads_offsets: %{},
            awaiting_pads: [],
            closed?: false,
            chunk_duration: nil,
            chunk_full?: false,
            next_chunk_boundary: nil,
            synchronization_strategy: :synchronize_on_arrival,
            known_pads: MapSet.new()

  @typedoc """
  Value passed to `:pause_demand_boundary` option in `new/1`.

  Specifies what amount of buffers associated with a specific pad must be stored in the queue to pause auto demand.

  It is a two-element tuple, in which
    - the first element specifies the metric in which the boundary is expressed (defaults to `:buffers`)
    - the second element is the boundary (defaults to `1000`).
  """
  @type pause_demand_boundary ::
          {:buffers | :bytes, pos_integer() | :infinity} | {:time, Membrane.Time.t()}

  @typedoc """
  Options passed to `#{inspect(__MODULE__)}.new/1`.

  Following options are allowed:
    - `:pause_demand_boundary` - `t:pause_demand_boundary/0`. Defaults to `{:buffers, 1000}`.
    - `:chunk_duration` - `t:Membrane.Time.t/0`. Specifies approximately how long the fragments returned by
      `#{inspect(__MODULE__)}.pop_chunked/1` will be. If not set, popping chunks will not be available.
    - `:synchronization_strategy` - `:synchronize_on_arrival` or `:explicit_offsets` (defaults to `:synchronize_on_arrival`).
      Specifies how items from different pads will be synchronized with each other. If it is set to:
      * `:synchronize_on_arrival` - at the moment of the arrival of the first buffer from a specific pad, a timestamp
        offset will be calculated for this pad. These offsets are subtracted from the buffers' timestamps to determine
        from which pad items should be returned first. Every offset is calculated in such a way that the first
        buffer from a new pad will be returned as the next item.
      * `:explicit_offsets` - buffers from various pads will be sorted based on their timestamps and pads offsets. Pads
        offsets can be set using `#{inspect(__MODULE__)}.register_pad/3` function. If a pad offset is not explicitly set
        before the first buffer from this pad, it will be equal to 0.
  """
  @type options :: [
          pause_demand_boundary: pause_demand_boundary(),
          chunk_duration: Membrane.Time.t(),
          synchronization_strategy: :synchronize_on_arrival | :explicit_offsets
        ]

  @doc """
  Creates a new, empty queue.

  See `t:options/0` for the description of the accepted options.

  Raises `ArgumentError` if an unknown option is passed, if the metric in `:pause_demand_boundary`
  is not one of `:buffers`, `:bytes` or `:time`, or if `:synchronization_strategy` is neither
  `:synchronize_on_arrival` nor `:explicit_offsets`.
  """
  @spec new(options) :: t()
  def new(options \\ []) do
    options =
      Keyword.validate!(options,
        chunk_duration: nil,
        pause_demand_boundary: {:buffers, 1000},
        synchronization_strategy: :synchronize_on_arrival
      )

    # Fetching by key (instead of matching on the sorted keyword list) keeps this function
    # independent of the options order and of duplicated keys.
    {unit, boundary} = Keyword.fetch!(options, :pause_demand_boundary)
    synchronization_strategy = Keyword.fetch!(options, :synchronization_strategy)

    # Fail early with a readable message; otherwise an invalid value would only surface as a
    # clause error on the first pushed buffer.
    if unit not in [:buffers, :bytes, :time] do
      raise ArgumentError,
            "metric in :pause_demand_boundary option must be one of :buffers, :bytes or :time, " <>
              "got: #{inspect(unit)}"
    end

    if synchronization_strategy not in [:synchronize_on_arrival, :explicit_offsets] do
      raise ArgumentError,
            ":synchronization_strategy option must be either :synchronize_on_arrival or " <>
              ":explicit_offsets, got: #{inspect(synchronization_strategy)}"
    end

    %__MODULE__{
      pause_demand_boundary: boundary,
      metric_unit: unit,
      chunk_duration: Keyword.fetch!(options, :chunk_duration),
      synchronization_strategy: synchronization_strategy
    }
  end

  @typedoc """
  Options passed to `#{inspect(__MODULE__)}.register_pad/3`.

  Following options are allowed:
    - `:wait_on_buffers?` - `boolean()`, defaults to `true`. Specifies if the queue will wait with returning buffers
      in `pop_*` functions until it receives the first buffer from the pad passed as the second argument to the function.
    - `:timestamp_offset` - integer. Specifies what the timestamp offset of the pad passed as the second argument
      to the function will be. Allowed only if `#{inspect(__MODULE__)}` synchronization strategy is `:explicit_offsets`.
  """
  @type register_pad_options :: [
          timestamp_offset: integer(),
          wait_on_buffers?: boolean()
        ]

  @doc """
  Makes the queue aware of an input pad before anything is pushed from it.

  When the pad is registered with `wait_on_buffers?: true` (the default), `pop_available_items/1`
  will not return any buffers until a buffer or an end of stream from that pad is pushed on the queue.

  Registration is optional - pushing a buffer from an unregistered pad registers it automatically.

  Raises if `:timestamp_offset` is passed while the synchronization strategy is
  `:synchronize_on_arrival`, or if a buffer from the pad has already been pushed on the queue.
  """
  @spec register_pad(t(), Pad.ref(), register_pad_options()) :: t()
  def register_pad(%__MODULE__{} = timestamp_queue, pad_ref, opts \\ []) do
    validated_opts = Keyword.validate!(opts, timestamp_offset: nil, wait_on_buffers?: true)
    [timestamp_offset: offset, wait_on_buffers?: wait?] = Enum.sort(validated_opts)

    if timestamp_queue.synchronization_strategy == :synchronize_on_arrival and offset != nil do
      raise """
      Option :timestamp_offset in #{inspect(__MODULE__)}.register_pad/3 cannot be set if #{inspect(__MODULE__)} \
      synchronization strategy is :synchronize_on_arrival (default).
      """
    end

    # A non-nil offset in the pad queue means that the first buffer has already been handled.
    case Map.get(timestamp_queue.pad_queues, pad_ref) do
      %{timestamp_offset: existing_offset} when existing_offset != nil ->
        raise """
        Cannot register pad `#{inspect(pad_ref)}, because buffers from it are already in `#{inspect(__MODULE__)}. \
        Every pad can be registered only before pushing the first buffer from it on the queue.
        """

      _no_buffers_yet ->
        :ok
    end

    blocking_pads =
      if wait?,
        do: MapSet.put(timestamp_queue.blocking_registered_pads, pad_ref),
        else: timestamp_queue.blocking_registered_pads

    timestamp_queue = %{
      timestamp_queue
      | blocking_registered_pads: blocking_pads,
        known_pads: MapSet.put(timestamp_queue.known_pads, pad_ref)
    }

    case offset do
      nil ->
        timestamp_queue

      _explicit_offset ->
        offsets = Map.put(timestamp_queue.registered_pads_offsets, pad_ref, offset)
        %{timestamp_queue | registered_pads_offsets: offsets}
    end
  end

  @doc """
  Pushes a buffer associated with a specified pad to the queue.

  Returns a suggested actions list and the updated queue.

  If the amount of buffers associated with the specified pad in the queue just reached
  `pause_demand_boundary`, the suggested actions list contains a
  `t:Membrane.Element.Action.pause_auto_demand/0` action, otherwise it is an empty list.

  Buffers pushed to the queue must have a non-`nil` `dts` or `pts`.

  Raises if the buffer has both `dts` and `pts` equal to `nil`, if its timestamps are inconsistent
  with the previous buffers from the same pad, or if the queue has already been closed.
  """
  @spec push_buffer(t(), Pad.ref(), Buffer.t()) :: {[Action.pause_auto_demand()], t()}
  def push_buffer(_timestamp_queue, pad_ref, %Buffer{dts: nil, pts: nil} = buffer) do
    raise """
    #{inspect(__MODULE__)} accepts only buffers whose dts or pts is not nil, but it received\n#{inspect(buffer, pretty: true)}
    from pad #{inspect(pad_ref)}
    """
  end

  def push_buffer(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
    {actions, timestamp_queue} =
      timestamp_queue
      |> Map.update!(:pad_queues, &Map.put_new_lazy(&1, pad_ref, fn -> new_pad_queue() end))
      |> get_and_update_in([:pad_queues, pad_ref], fn pad_queue ->
        old_buffers_size = pad_queue.buffers_size

        # The consistency check has to run before the size accounting: for the :time metric the
        # accounting computes the buffer time, which would fail with a bare ArithmeticError on a
        # nil timestamp, hiding the descriptive error raised by the check.
        pad_queue =
          pad_queue
          |> Map.update!(:buffers_number, &(&1 + 1))
          |> maybe_handle_first_buffer(pad_ref, buffer, timestamp_queue)
          |> check_timestamps_consistency!(buffer, pad_ref)
          |> increase_buffers_size(buffer, timestamp_queue.metric_unit)

        boundary = timestamp_queue.pause_demand_boundary

        # Pause only at the moment of crossing the boundary, not on every push above it.
        actions =
          if pad_queue.buffers_size >= boundary and old_buffers_size < boundary,
            do: [pause_auto_demand: pad_ref],
            else: []

        {actions, pad_queue}
      end)

    pad_queue = timestamp_queue.pad_queues |> Map.get(pad_ref)
    buff_time = buffer_time(buffer, pad_queue)

    timestamp_queue =
      timestamp_queue
      |> Map.update!(:next_chunk_boundary, fn
        # The first buffer ever pushed determines where the first chunk ends.
        nil when timestamp_queue.chunk_duration != nil ->
          buff_time + timestamp_queue.chunk_duration

        other ->
          other
      end)
      |> Map.update!(:known_pads, &MapSet.put(&1, pad_ref))
      |> remove_pad_from_registered_and_awaiting_pads(pad_ref)
      # `pad_queue` still holds the qex from before the push, so the pad lands on the heap only
      # if this buffer becomes the first item in its qex.
      |> push_pad_on_heap_if_qex_empty(pad_ref, -buff_time, pad_queue)
      |> push_item_on_qex(pad_ref, {:buffer, buffer})

    {actions, timestamp_queue}
  end

  # Initializes the pad queue when the first buffer from the pad arrives: picks the timestamp
  # offset according to the synchronization strategy and decides whether pts or dts will be used.
  # For every later buffer the pad queue is returned unchanged.
  defp maybe_handle_first_buffer(
         %{timestamp_offset: nil} = pad_queue,
         pad_ref,
         first_buffer,
         timestamp_queue
       ) do
    initial_offset =
      case timestamp_queue.synchronization_strategy do
        :explicit_offsets ->
          Map.get(timestamp_queue.registered_pads_offsets, pad_ref, 0)

        :synchronize_on_arrival ->
          # Makes the time of this buffer equal to the current queue time.
          (first_buffer.dts || first_buffer.pts) - timestamp_queue.current_queue_time
      end

    pad_queue
    |> Map.replace!(:timestamp_offset, initial_offset)
    |> Map.replace!(:use_pts?, first_buffer.dts == nil)
  end

  defp maybe_handle_first_buffer(pad_queue, _pad_ref, _buffer, _timestamp_queue), do: pad_queue

  # Verifies that the buffer carries the timestamp kind (dts or pts) chosen for its pad and that
  # the timestamp is not lower than the one of the previously pushed buffer from the same pad.
  # Returns the pad queue with `:max_timestamp_on_qex` set to the timestamp of the buffer.
  # Raises if any of these conditions is violated.
  defp check_timestamps_consistency!(pad_queue, buffer, pad_ref) do
    if not pad_queue.use_pts? and buffer.dts == nil do
      raise """
      Buffer #{inspect(buffer, pretty: true)} from pad #{inspect(pad_ref)} has nil dts, while \
      the first buffer from this pad had valid integer there. If the first buffer from a pad has \
      dts different from nil, all later buffers from this pad must meet this property.
      """
    end

    # Counterpart of the check above for pads that use pts. Without it a nil pts would pass the
    # comparison below and only fail later with an ArithmeticError when the buffer time is computed.
    if pad_queue.use_pts? and buffer.pts == nil do
      raise """
      Buffer #{inspect(buffer, pretty: true)} from pad #{inspect(pad_ref)} has nil pts, while \
      the first buffer from this pad had nil dts and valid integer pts. If the first buffer from a \
      pad has nil dts, all later buffers from this pad must have pts different from nil.
      """
    end

    buffer_timestamp = if pad_queue.use_pts?, do: buffer.pts, else: buffer.dts
    max_timestamp = pad_queue.max_timestamp_on_qex

    # `max_timestamp` is nil for the first buffer from the pad, so there is nothing to compare.
    if is_integer(max_timestamp) and max_timestamp > buffer_timestamp do
      timestamp_field = if pad_queue.use_pts?, do: "pts", else: "dts"

      raise """
      Buffer #{inspect(buffer, pretty: true)} from pad #{inspect(pad_ref)} has #{timestamp_field} equal \
      #{inspect(buffer_timestamp)}, but previous buffer pushed on queue from this pad had #{timestamp_field} \
      equal #{inspect(max_timestamp)}. Buffers from a single pad must have non-decreasing timestamps.
      """
    end

    %{pad_queue | max_timestamp_on_qex: buffer_timestamp}
  end

  @doc """
  Enqueues a stream format received on the given pad.

  Returns the updated queue.
  """
  @spec push_stream_format(t(), Pad.ref(), StreamFormat.t()) :: t()
  def push_stream_format(%__MODULE__{} = timestamp_queue, pad_ref, stream_format) do
    item = {:stream_format, stream_format}
    push_item(timestamp_queue, pad_ref, item)
  end

  @doc """
  Enqueues an event received on the given pad.

  Returns the updated queue.
  """
  @spec push_event(t(), Pad.ref(), Event.t()) :: t()
  def push_event(%__MODULE__{} = timestamp_queue, pad_ref, event) do
    item = {:event, event}
    push_item(timestamp_queue, pad_ref, item)
  end

  @doc """
  Enqueues the end of stream of the given pad.

  From now on the pad no longer blocks returning buffers from other pads.

  Returns the updated queue.
  """
  @spec push_end_of_stream(t(), Pad.ref()) :: t()
  def push_end_of_stream(%__MODULE__{} = timestamp_queue, pad_ref) do
    timestamp_queue = push_item(timestamp_queue, pad_ref, :end_of_stream)

    # `push_item/3` guarantees that the pad queue exists at this point.
    pad_queues =
      Map.update!(timestamp_queue.pad_queues, pad_ref, &Map.put(&1, :end_of_stream?, true))

    remove_pad_from_registered_and_awaiting_pads(
      %{timestamp_queue | pad_queues: pad_queues},
      pad_ref
    )
  end

  # Forgets that the queue is waiting for the pad: drops it from the blocking registered pads,
  # from the registered offsets and from the awaiting pads.
  defp remove_pad_from_registered_and_awaiting_pads(timestamp_queue, pad_ref) do
    %{
      blocking_registered_pads: blocking_pads,
      registered_pads_offsets: offsets,
      awaiting_pads: awaiting_pads
    } = timestamp_queue

    %{
      timestamp_queue
      | blocking_registered_pads: MapSet.delete(blocking_pads, pad_ref),
        registered_pads_offsets: Map.delete(offsets, pad_ref),
        awaiting_pads: List.delete(awaiting_pads, pad_ref)
    }
  end

  # Pushes a non-buffer item (stream format, event or end of stream) from the pad, creating the
  # pad queue when needed. If the item becomes the first one in the pad's qex, the pad is put on
  # the heap with the `:infinity` priority.
  defp push_item(%__MODULE__{} = timestamp_queue, pad_ref, item) do
    pad_queues = Map.put_new_lazy(timestamp_queue.pad_queues, pad_ref, &new_pad_queue/0)
    known_pads = MapSet.put(timestamp_queue.known_pads, pad_ref)

    %{timestamp_queue | pad_queues: pad_queues, known_pads: known_pads}
    |> push_pad_on_heap_if_qex_empty(pad_ref, :infinity)
    |> push_item_on_qex(pad_ref, item)
  end

  # Builds the state of a pad from which nothing has been pushed yet. The `nil` fields are filled
  # in when the first buffer arrives (`:timestamps_qex` only for the `:time` metric).
  defp new_pad_queue() do
    %{
      timestamp_offset: nil,
      qex: Qex.new(),
      buffers_size: 0,
      buffers_number: 0,
      end_of_stream?: false,
      use_pts?: nil,
      max_timestamp_on_qex: nil,
      timestamps_qex: nil
    }
  end

  # Accounts for a pushed buffer in `:buffers_size`, according to the metric unit.
  #
  # For the `:time` metric the size grows by the interval between the previously pushed buffer and
  # the new one; the time of every queued buffer is remembered in `:timestamps_qex`.
  defp increase_buffers_size(pad_queue, %Buffer{} = buffer, :time) do
    pushed_time = buffer_time(buffer, pad_queue)

    timestamps =
      case pad_queue.timestamps_qex do
        nil -> Qex.new()
        existing -> existing
      end

    interval =
      case Qex.last(timestamps) do
        {:value, previous_time} -> pushed_time - previous_time
        :empty -> 0
      end

    %{
      pad_queue
      | buffers_size: pad_queue.buffers_size + interval,
        timestamps_qex: Qex.push(timestamps, pushed_time)
    }
  end

  defp increase_buffers_size(pad_queue, _buffer, :buffers),
    do: Map.update!(pad_queue, :buffers_size, &(&1 + 1))

  defp increase_buffers_size(pad_queue, %Buffer{payload: payload}, :bytes),
    do: Map.update!(pad_queue, :buffers_size, &(&1 + byte_size(payload)))

  # Accounts for a popped buffer in `:buffers_size`, according to the metric unit.
  #
  # For the `:time` metric the size shrinks by the interval between the popped buffer and the
  # next one remaining in the queue, if there is any.
  defp decrease_buffers_size(pad_queue, _buffer, :time) do
    {popped_time, remaining_timestamps} = Qex.pop!(pad_queue.timestamps_qex)

    interval =
      case Qex.first(remaining_timestamps) do
        {:value, next_time} -> next_time - popped_time
        :empty -> 0
      end

    %{
      pad_queue
      | timestamps_qex: remaining_timestamps,
        buffers_size: pad_queue.buffers_size - interval
    }
  end

  defp decrease_buffers_size(pad_queue, _buffer, :buffers),
    do: Map.update!(pad_queue, :buffers_size, &(&1 - 1))

  defp decrease_buffers_size(pad_queue, %Buffer{payload: payload}, :bytes),
    do: Map.update!(pad_queue, :buffers_size, &(&1 - byte_size(payload)))

  # Returns the time of the buffer in the queue's timeline: its dts (or pts, for pads that use
  # pts) with the pad's timestamp offset subtracted.
  defp buffer_time(%Buffer{dts: dts}, %{use_pts?: false, timestamp_offset: offset}) do
    dts - offset
  end

  defp buffer_time(%Buffer{pts: pts}, %{use_pts?: true, timestamp_offset: offset}) do
    pts - offset
  end

  @typedoc """
  An item that can be stored in the queue and returned from the `pop_*` functions.
  """
  @type item ::
          {:stream_format, StreamFormat.t()}
          | {:buffer, Buffer.t()}
          | {:event, Event.t()}
          | :end_of_stream

  @typedoc """
  An item returned from the queue, together with the reference of the pad it has been pushed from.
  """
  @type popped_value :: {Pad.ref(), item()}

  @doc """
  Pops items from the queue for as long as they are available.

  A buffer `b` from pad `p` is available, if every pad other than `p`
    - either has a buffer in the queue, that is older than `b`
    - or hasn't ever had any buffer on the queue
    - or has end of stream pushed on the queue.

  An item other than a buffer is considered available if all newer buffers on the same pad are
  available.

  Returns a three-element tuple: a suggested actions list, a list of popped items and the
  updated queue.

  The suggested actions list contains a `t:Membrane.Element.Action.resume_auto_demand/0` action
  for every pad whose amount of buffers in the queue has fallen below the `pause_demand_boundary`;
  it is empty if there is no such pad.
  """
  @spec pop_available_items(t()) :: {[Action.resume_auto_demand()], [popped_value()], t()}
  def pop_available_items(%__MODULE__{} = timestamp_queue) do
    {actions, items, timestamp_queue} = do_pop(timestamp_queue, [], [], false)

    # Keep the chunk boundary in sync with the queue time, in case chunks are popped later.
    timestamp_queue =
      case timestamp_queue.chunk_duration do
        nil ->
          timestamp_queue

        chunk_duration ->
          next_boundary = timestamp_queue.current_queue_time + chunk_duration
          %{timestamp_queue | next_chunk_boundary: next_boundary}
      end

    {actions, items, timestamp_queue}
  end

  @doc """
  Pushes the buffer like `push_buffer/3` and then pops like `pop_available_items/1`.

  The suggested actions of both steps are returned in a single list.
  """
  @spec push_buffer_and_pop_available_items(t(), Pad.ref(), Buffer.t()) ::
          {[Action.pause_auto_demand() | Action.resume_auto_demand()], [popped_value()], t()}
  def push_buffer_and_pop_available_items(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
    pop_fun = &pop_available_items/1
    push_buffer_and_pop(timestamp_queue, pad_ref, buffer, pop_fun)
  end

  @typedoc """
  A list of popped values, that make up a single chunk returned by `pop_chunked/1`.
  """
  @type chunk :: [popped_value()]

  @doc """
  Works like `pop_available_items/1`, but the returned items are arranged in chunks of duration `chunk_duration`.

  `chunk_duration` must be passed as an option to `new/1`. The duration of each chunk may not be exactly the
  `chunk_duration`, but the average duration will converge to it. With that exception, only full chunks are
  returned.

  Pads that haven't pushed any buffer yet (e.g. only a stream format or an event) and pads with
  end of stream pushed do not delay returning chunks.

  Raises if the queue has been created without the `:chunk_duration` option.

  See `pop_available_items/1` for details.
  """
  @spec pop_chunked(t()) :: {[Action.resume_auto_demand()], [chunk()], t()}
  def pop_chunked(%__MODULE__{chunk_duration: nil}) do
    raise """
    Cannot invoke function #{inspect(__MODULE__)}.pop_chunked/1 on a queue, that has :chunk_duration field \
    set to nil. You can set this field by passing {:chunk_duration, some_membrane_time} option to \
    #{inspect(__MODULE__)}.new/1.
    """
  end

  def pop_chunked(%__MODULE__{} = timestamp_queue) do
    # The lowest, among all pads that can still limit the output, time of the newest pushed
    # buffer. Chunks ending before this time cannot receive any more buffers.
    min_max_time =
      timestamp_queue.pad_queues
      |> Enum.reduce(:infinity, fn
        {_pad_ref, %{end_of_stream?: true}}, min_max_time ->
          min_max_time

        # No buffer has been pushed from this pad yet, so both its offset and max timestamp are
        # nil. Such a pad must not limit the chunks (and subtracting nils would raise).
        {_pad_ref, %{timestamp_offset: nil}}, min_max_time ->
          min_max_time

        {_pad_ref, %{max_timestamp_on_qex: max_ts, timestamp_offset: offset}}, min_max_time ->
          min(min_max_time, max_ts - offset)
      end)

    do_pop_chunked(timestamp_queue, min_max_time)
  end

  # Pops consecutive chunks for as long as `min_max_time` has reached the boundary of the next
  # chunk. Returns the accumulated actions, the list of chunks and the updated queue.
  defp do_pop_chunked(timestamp_queue, min_max_time) do
    boundary_reached? = min_max_time >= timestamp_queue.next_chunk_boundary
    anything_to_pop? = min_max_time != :infinity or timestamp_queue.pad_queues != %{}

    if boundary_reached? and anything_to_pop? do
      {chunk_actions, chunk, timestamp_queue} = do_pop(timestamp_queue, [], [], true)

      # Move on to the next chunk before trying to pop again.
      advanced_queue = %{
        timestamp_queue
        | chunk_full?: false,
          next_chunk_boundary:
            timestamp_queue.next_chunk_boundary + timestamp_queue.chunk_duration
      }

      {rest_actions, rest_chunks, timestamp_queue} =
        do_pop_chunked(advanced_queue, min_max_time)

      {chunk_actions ++ rest_actions, [chunk | rest_chunks], timestamp_queue}
    else
      {[], [], timestamp_queue}
    end
  end

  @doc """
  Pushes the buffer like `push_buffer/3` and then pops like `pop_chunked/1`.

  The suggested actions of both steps are returned in a single list.
  """
  @spec push_buffer_and_pop_chunked(t(), Pad.ref(), Buffer.t()) ::
          {[Action.pause_auto_demand() | Action.resume_auto_demand()], [chunk()], t()}
  def push_buffer_and_pop_chunked(%__MODULE__{} = timestamp_queue, pad_ref, buffer) do
    pop_fun = &pop_chunked/1
    push_buffer_and_pop(timestamp_queue, pad_ref, buffer, pop_fun)
  end

  # Pops from the pad at the root of the heap for as long as it is allowed. Buffers may be
  # returned only if no pad is blocking or awaited and the current chunk is not full; items with
  # the `:infinity` priority (non-buffers) are returned regardless.
  # Items are accumulated in reversed order and reversed once at the end.
  defp do_pop(%__MODULE__{} = timestamp_queue, actions_acc, items_acc, pop_chunk?) do
    %__MODULE__{
      blocking_registered_pads: blocking_pads,
      awaiting_pads: awaiting_pads,
      chunk_full?: chunk_full?,
      pads_heap: pads_heap
    } = timestamp_queue

    buffers_allowed? =
      MapSet.size(blocking_pads) == 0 and awaiting_pads == [] and not chunk_full?

    case Heap.root(pads_heap) do
      {priority, pad_ref} when priority == :infinity or buffers_allowed? ->
        queue_without_root = %{timestamp_queue | pads_heap: Heap.pop(pads_heap)}

        {actions, items, timestamp_queue} =
          pop_buffer_and_following_items(queue_without_root, pad_ref, pop_chunk?)

        do_pop(timestamp_queue, actions ++ actions_acc, items ++ items_acc, pop_chunk?)

      _nothing_to_pop ->
        {actions_acc, Enum.reverse(items_acc), timestamp_queue}
    end
  end

  # Pops the buffer from the front of the pad's qex (if there is one and, when popping a chunk,
  # it fits before the chunk boundary) and then all non-buffer items that directly follow it.
  # Returns the suggested actions, the popped items (most recently popped first) and the updated queue.
  @spec pop_buffer_and_following_items(t(), Pad.ref(), boolean()) ::
          {[Action.resume_auto_demand()], [popped_value()], t()}
  defp pop_buffer_and_following_items(%__MODULE__{} = timestamp_queue, pad_ref, pop_chunk?) do
    pad_queue = timestamp_queue.pad_queues |> Map.get(pad_ref)

    {actions, items, timestamp_queue} =
      # The second clause rejects a buffer lying at or beyond the boundary of the chunk being popped.
      with {{:value, {:buffer, buffer}}, qex} <- Qex.pop(pad_queue.qex),
           buffer_time when not pop_chunk? or buffer_time < timestamp_queue.next_chunk_boundary <-
             buffer_time(buffer, pad_queue) do
        old_buffers_size = pad_queue.buffers_size

        pad_queue =
          %{pad_queue | qex: qex, buffers_number: pad_queue.buffers_number - 1}
          |> decrease_buffers_size(buffer, timestamp_queue.metric_unit)

        # A pad that ran out of buffers without end of stream blocks further buffers, until
        # it pushes a new buffer or end of stream.
        timestamp_queue =
          with %{buffers_number: 0, end_of_stream?: false} <- pad_queue do
            Map.update!(timestamp_queue, :awaiting_pads, &[pad_ref | &1])
          else
            _pad_queue -> timestamp_queue
          end

        timestamp_queue =
          %{timestamp_queue | current_queue_time: buffer_time}
          |> put_in([:pad_queues, pad_ref], pad_queue)

        boundary = timestamp_queue.pause_demand_boundary

        # Resume only at the moment of falling below the boundary.
        actions =
          if pad_queue.buffers_size < boundary and old_buffers_size >= boundary,
            do: [resume_auto_demand: pad_ref],
            else: []

        items = [{pad_ref, {:buffer, buffer}}]

        {actions, items, timestamp_queue}
      else
        # The `else` receives either the rejected buffer time (second clause) or the result of
        # `Qex.pop/1` (first clause). An integer here means the buffer didn't fit in the chunk.
        buffer_time when is_integer(buffer_time) and pop_chunk? ->
          {[], [], %{timestamp_queue | chunk_full?: true}}

        # The first item in the qex is not a buffer (or the qex is empty) - nothing to pop here.
        _non_buffer_pop_result ->
          {[], [], timestamp_queue}
      end

    pop_following_items(timestamp_queue, pad_ref, actions, items)
  end

  # Pops non-buffer items from the front of the pad's qex until a buffer or the end of the qex is
  # reached. On a buffer, the pad goes back on the heap with the priority of that buffer. On an
  # empty qex, the pad queue is removed, unless the pad is the awaited one.
  # Popped items are prepended to `items_acc`.
  @spec pop_following_items(t(), Pad.ref(), [Action.resume_auto_demand()], [popped_value()]) ::
          {[Action.resume_auto_demand()], [popped_value()], t()}
  defp pop_following_items(%__MODULE__{} = timestamp_queue, pad_ref, actions_acc, items_acc) do
    pad_queue = Map.get(timestamp_queue.pad_queues, pad_ref)

    case Qex.pop(pad_queue.qex) do
      {:empty, _empty_qex} ->
        if timestamp_queue.awaiting_pads == [pad_ref] do
          # The state of an awaited pad has to survive until its next buffer arrives.
          {actions_acc, items_acc, timestamp_queue}
        else
          known_pads =
            if pad_queue.end_of_stream?,
              do: MapSet.delete(timestamp_queue.known_pads, pad_ref),
              else: timestamp_queue.known_pads

          timestamp_queue = %{
            timestamp_queue
            | pad_queues: Map.delete(timestamp_queue.pad_queues, pad_ref),
              known_pads: known_pads
          }

          {actions_acc, items_acc, timestamp_queue}
        end

      {{:value, {:buffer, next_buffer}}, _qex} ->
        # The buffer stays in the qex; only the pad's position on the heap is refreshed.
        priority = -buffer_time(next_buffer, pad_queue)
        {actions_acc, items_acc, push_pad_on_heap(timestamp_queue, pad_ref, priority)}

      {{:value, item}, remaining_qex} ->
        pad_queues =
          Map.put(timestamp_queue.pad_queues, pad_ref, %{pad_queue | qex: remaining_qex})

        pop_following_items(
          %{timestamp_queue | pad_queues: pad_queues},
          pad_ref,
          actions_acc,
          [{pad_ref, item} | items_acc]
        )
    end
  end

  # Pushes the buffer and pops with `pop_fun`. If the push suggested pausing auto demand on a pad
  # and the pop suggested resuming it on the same pad, both actions cancel out and are dropped.
  defp push_buffer_and_pop(timestamp_queue, pad_ref, buffer, pop_fun) do
    {pause_actions, timestamp_queue} = push_buffer(timestamp_queue, pad_ref, buffer)
    {resume_actions, items, timestamp_queue} = pop_fun.(timestamp_queue)

    all_actions = pause_actions ++ resume_actions

    actions =
      case pause_actions do
        [pause_auto_demand: paused_pad] ->
          resume_action = {:resume_auto_demand, paused_pad}

          case Enum.find_index(resume_actions, &(&1 == resume_action)) do
            nil -> all_actions
            index -> List.delete_at(resume_actions, index)
          end

        _no_pause ->
          all_actions
      end

    {actions, items, timestamp_queue}
  end

  # Appends the item to the qex of the pad. Raises if the queue has already been closed.
  defp push_item_on_qex(timestamp_queue, pad_ref, item) do
    :ok = ensure_queue_not_closed!(timestamp_queue, pad_ref, item)

    pad_queues =
      Map.update!(timestamp_queue.pad_queues, pad_ref, fn pad_queue ->
        %{pad_queue | qex: Qex.push(pad_queue.qex, item)}
      end)

    %{timestamp_queue | pad_queues: pad_queues}
  end

  # Puts the pad on the heap with the given priority, but only if its qex is empty, i.e. the
  # item about to be pushed will be the first one in it. The qex is taken from `pad_queue` when
  # it is passed, otherwise from the pad queue stored in `timestamp_queue`.
  defp push_pad_on_heap_if_qex_empty(timestamp_queue, pad_ref, priority, pad_queue \\ nil) do
    checked_pad_queue = pad_queue || Map.get(timestamp_queue.pad_queues, pad_ref)

    if Map.get(checked_pad_queue, :qex) == Qex.new() do
      push_pad_on_heap(timestamp_queue, pad_ref, priority)
    else
      timestamp_queue
    end
  end

  # Adds the `{priority, pad_ref}` entry to the heap of pads.
  defp push_pad_on_heap(timestamp_queue, pad_ref, priority) do
    pads_heap = Heap.push(timestamp_queue.pads_heap, {priority, pad_ref})
    %{timestamp_queue | pads_heap: pads_heap}
  end

  @doc """
  Pops all items in the proper order and closes the queue.

  A closed queue doesn't accept any new items - a new queue has to be created if needed.

  Returns a three-element tuple: a suggested actions list, a list of popped items and the
  updated queue.

  The suggested actions list contains a `t:Membrane.Element.Action.resume_auto_demand/0` action
  for every pad, that had paused auto demand before the flush.
  """
  @spec flush_and_close(t()) :: {[Action.resume_auto_demand()], [popped_value()], t()}
  def flush_and_close(%__MODULE__{} = timestamp_queue) do
    # Marking every pad as finished and clearing the blocking and awaited pads makes every
    # queued item available.
    finished_pad_queues =
      for {pad_ref, pad_queue} <- timestamp_queue.pad_queues, into: %{} do
        {pad_ref, %{pad_queue | end_of_stream?: true}}
      end

    pop_available_items(%{
      timestamp_queue
      | closed?: true,
        blocking_registered_pads: MapSet.new(),
        awaiting_pads: [],
        pad_queues: finished_pad_queues
    })
  end

  @doc """
  Tells whether the pad is known to the queue.

  A pad is known if it has been registered or an item from it has been pushed to the queue, and
  its end of stream hasn't been popped from the queue yet.
  """
  @spec has_pad?(t(), Pad.ref()) :: boolean()
  def has_pad?(%__MODULE__{known_pads: known_pads}, pad_ref) do
    pad_ref in known_pads
  end

  @doc """
  Returns a `t:MapSet.t/0` of all pads known to the queue, i.e. pads that:
    1) have been registered in the queue or have had an item pushed to the queue
    2) haven't had their end of stream popped from the queue yet.
  """
  @spec pads(t()) :: MapSet.t(Pad.ref())
  def pads(%__MODULE__{known_pads: known_pads}) do
    known_pads
  end

  # Returns `:ok` for an open queue; raises with a description of the rejected item if the queue
  # has been closed with `flush_and_close/1`.
  defp ensure_queue_not_closed!(%__MODULE__{closed?: true}, pad_ref, item) do
    item_description =
      case item do
        {:stream_format, stream_format} -> "stream format " <> inspect(stream_format)
        {type, value} -> "#{type} " <> inspect(value)
        :end_of_stream -> "end of stream"
      end

    raise """
    Unable to push #{item_description} from pad #{inspect(pad_ref)} on the already closed #{inspect(__MODULE__)}. \
    After calling #{inspect(__MODULE__)}.flush_and_close/1 queue is not capable to handle new items and new \
    queue has to be created.
    """
  end

  defp ensure_queue_not_closed!(_timestamp_queue, _pad_ref, _item), do: :ok
end