lib/limited_queue/limited_queue.ex

defmodule LimitedQueue do
  @moduledoc """
  An elixir wrapper for erlang's `:queue`, with a constant-time `size/1` and a maximum capacity.

  When items pushed on to the `LimitedQueue` put it over its maximum capacity,
  it will drop events according to its `drop_strategy/0`.
  """

  @typedoc """
  The opaque internal state of the `LimitedQueue`.
  """
  @opaque t(value) :: %__MODULE__{
            queue: :queue.queue(value),
            size: non_neg_integer(),
            capacity: pos_integer(),
            drop_strategy: drop_strategy()
          }

  @typedoc """
  The `drop_strategy/0` determines how the queue handles dropping events when overloaded.

  `:drop_newest` (default) drops incoming events and is the most efficient
    because it will avoid touching the state when the queue is overloaded.

  `:drop_oldest` drops the oldest events from the queue,
    which may be better behavior where newer events are more relevant to process than older ones.
  """
  @type drop_strategy :: :drop_newest | :drop_oldest

  @enforce_keys [:queue, :size, :capacity, :drop_strategy]
  defstruct [:queue, :size, :capacity, :drop_strategy]

  @doc """
  Create a new `LimitedQueue` with the given maximum capacity.

  The `drop_strategy` determines how the queue handles dropping events when overloaded.
  See `drop_strategy/0` for more information.
  """
  @spec new(capacity :: pos_integer()) :: t(value) when value: any()
  @spec new(capacity :: pos_integer(), drop_strategy()) :: t(value) when value: any()
  def new(capacity, drop_strategy \\ :drop_newest)
      when capacity > 0 and drop_strategy in [:drop_newest, :drop_oldest] do
    %__MODULE__{queue: :queue.new(), size: 0, capacity: capacity, drop_strategy: drop_strategy}
  end

  @doc """
  Push a value to the back of the `LimitedQueue`.

  If the `LimitedQueue` is full, it will drop an event according to the `LimitedQueue`'s `drop_strategy/0`.
  """
  @spec push(t(value), value) :: t(value) when value: any()
  def push(
        %__MODULE__{capacity: capacity, size: capacity, drop_strategy: :drop_newest} = state,
        _element
      ) do
    state
  end

  def push(
        %__MODULE__{capacity: capacity, size: capacity, drop_strategy: :drop_oldest} = state,
        element
      ) do
    queue = :queue.drop(state.queue)
    queue = :queue.in(element, queue)
    %__MODULE__{state | queue: queue}
  end

  def push(%__MODULE__{} = state, element) do
    queue = :queue.in(element, state.queue)

    %__MODULE__{
      state
      | queue: queue,
        size: state.size + 1
    }
  end

  @doc """
  Push multiple values to the back of the `LimitedQueue`.

  Returns the number of values that were dropped if the `LimitedQueue` reaches its capacity.
  """
  @spec append(t(value), [value]) :: {t(value), dropped :: non_neg_integer()} when value: any()
  def append(
        %__MODULE__{capacity: capacity, size: capacity, drop_strategy: :drop_newest} = state,
        events
      ) do
    {state, length(events)}
  end

  def append(%__MODULE__{capacity: capacity, size: capacity} = state, [event]) do
    state = push(state, event)
    {state, 1}
  end

  def append(%__MODULE__{} = state, [event]) do
    state = push(state, event)
    {state, 0}
  end

  def append(%__MODULE__{} = state, events) do
    Enum.reduce(events, {state, 0}, fn value, {state, dropped} ->
      dropped =
        if state.size == state.capacity do
          dropped + 1
        else
          dropped
        end

      state = push(state, value)
      {state, dropped}
    end)
  end

  @doc """
  Remove and return a value from the front of the `LimitedQueue`.
  If the `LimitedQueue` is empty, {:error, :empty} will be returned.
  """
  @spec pop(t(value)) :: {:ok, t(value), value} | {:error, :empty} when value: any()
  def pop(%__MODULE__{} = state) do
    case :queue.out(state.queue) do
      {{:value, value}, queue} ->
        state = %__MODULE__{
          state
          | queue: queue,
            size: state.size - 1
        }

        {:ok, state, value}

      {:empty, _queue} ->
        {:error, :empty}
    end
  end

  @doc """
  Remove and return multiple values from the front of the `LimitedQueue`.
  If the `LimitedQueue` runs out of values, fewer values than the requested amount will be returned.
  """
  @spec split(t(value), amount :: non_neg_integer()) :: {t(value), [value]} when value: any()
  def split(%__MODULE__{size: 0} = state, amount) when amount >= 0 do
    {state, []}
  end

  def split(%__MODULE__{size: size} = state, amount) when amount >= size do
    split = state.queue
    state = %__MODULE__{state | queue: :queue.new(), size: 0}
    {state, :queue.to_list(split)}
  end

  def split(%__MODULE__{} = state, amount) when amount > 0 do
    {split, queue} = :queue.split(amount, state.queue)
    state = %__MODULE__{state | queue: queue, size: state.size - amount}
    {state, :queue.to_list(split)}
  end

  def split(%__MODULE__{} = state, 0) do
    {state, []}
  end

  @doc """
  The current number of values stored in the `LimitedQueue`.
  """
  @spec size(t(value)) :: non_neg_integer() when value: any()
  def size(%__MODULE__{} = state) do
    state.size
  end

  @doc """
  The maximum capacity of the `LimitedQueue`.
  """
  @spec capacity(t(value)) :: non_neg_integer() when value: any()
  def capacity(%__MODULE__{} = state) do
    state.capacity
  end

  @doc """
  The contents of the `LimitedQueue` as a list.
  """
  @spec to_list(t(value)) :: [value] when value: any()
  def to_list(%__MODULE__{} = state) do
    :queue.to_list(state.queue)
  end
end