lib/mixpanel/queue.ex

defmodule Mixpanel.Queue do
  @callback new(pos_integer) :: any
  @callback push(any, any) :: {:ok, any} | :discarded
  @callback take(any, non_neg_integer) :: {list, any}
  @callback length(any) :: non_neg_integer
end

defmodule Mixpanel.Queue.Simple do
  @moduledoc """
  A simple queue implementation that discards elements when it's full.
  """

  @type t :: %__MODULE__{
          length: non_neg_integer,
          max_size: non_neg_integer,
          head: nil | nonempty_maybe_improper_list,
          tail: nil | nonempty_maybe_improper_list
        }

  @enforce_keys [:length, :max_size, :head, :tail]
  defstruct @enforce_keys

  @spec new(pos_integer) :: t
  def new(limit) when limit > 0 do
    %__MODULE__{
      length: 0,
      max_size: limit,
      head: [],
      tail: []
    }
  end

  def new(limit), do: raise(ArgumentError, "limit must be greater than 0, got #{inspect(limit)}")

  @spec push(t, any) :: {:ok, t} | :discarded
  def push(%__MODULE__{length: length, max_size: max_size}, _element)
      when length >= max_size,
      do: :discarded

  def push(%__MODULE__{} = queue, element),
    do: {:ok, %__MODULE__{queue | length: queue.length + 1, tail: [element | queue.tail]}}

  @spec take(t, non_neg_integer) :: {list, t}
  def take(%__MODULE__{tail: []} = queue, amount) do
    case Enum.split(queue.head, amount) do
      {result, []} ->
        {result, %__MODULE__{queue | length: 0, head: [], tail: []}}

      {result, new_head} ->
        {result, %__MODULE__{queue | length: queue.length - amount, head: new_head}}
    end
  end

  def take(%__MODULE__{} = queue, amount) do
    take(%__MODULE__{queue | head: queue.head ++ Enum.reverse(queue.tail), tail: []}, amount)
  end

  @spec length(t) :: non_neg_integer
  def length(%__MODULE__{length: length}), do: length
end