defmodule Mixpanel.Queue do
  @moduledoc """
  Behaviour for queue implementations.

  A queue is an opaque term created by `c:new/1` and threaded through the
  other callbacks. `Mixpanel.Queue.Simple` provides functions with these
  signatures.
  """

  @doc """
  Builds a new queue from a positive integer.

  `Mixpanel.Queue.Simple` treats the integer as the maximum number of elements.
  """
  @callback new(limit :: pos_integer()) :: any()

  @doc """
  Adds `element` to `queue`.

  Returns `{:ok, queue}` with the updated queue, or `:discarded` when the
  element was not stored.
  """
  @callback push(queue :: any(), element :: any()) :: {:ok, any()} | :discarded

  @doc """
  Removes elements from `queue` according to `amount`.

  Returns the removed elements together with the updated queue.
  """
  @callback take(queue :: any(), amount :: non_neg_integer()) :: {list(), any()}

  @doc """
  Returns a non-negative integer length for `queue`.
  """
  @callback length(queue :: any()) :: non_neg_integer()
end
defmodule Mixpanel.Queue.Simple do
  @moduledoc """
  A simple queue implementation that discards elements when it's full.

  Elements are returned by `take/2` in the order they were pushed. Internally
  the queue keeps two lists: `push/2` prepends to `tail` (newest first), and
  `take/2` reverses `tail` onto the end of `head` (oldest first) before
  splitting off the requested elements.
  """

  @behaviour Mixpanel.Queue

  @type t :: %__MODULE__{
          length: non_neg_integer,
          max_size: pos_integer,
          head: list,
          tail: list
        }

  @enforce_keys [:length, :max_size, :head, :tail]
  defstruct @enforce_keys

  @doc """
  Creates an empty queue that holds at most `limit` elements.

  Raises `ArgumentError` unless `limit` is a positive integer.
  """
  @impl true
  @spec new(pos_integer) :: t
  # `is_integer/1` is required: Elixir's term ordering makes every atom, binary
  # and list compare greater than any number, so a bare `limit > 0` would accept
  # e.g. `:infinity` and produce a queue that never reports itself as full.
  def new(limit) when is_integer(limit) and limit > 0 do
    %__MODULE__{length: 0, max_size: limit, head: [], tail: []}
  end

  def new(limit), do: raise(ArgumentError, "limit must be greater than 0, got #{inspect(limit)}")

  @doc """
  Appends `element` to the queue.

  Returns `{:ok, queue}` with the updated queue, or `:discarded` (leaving the
  queue untouched) when it already holds `max_size` elements.
  """
  @impl true
  @spec push(t, any) :: {:ok, t} | :discarded
  def push(%__MODULE__{length: size, max_size: max_size}, _element)
      when size >= max_size,
      do: :discarded

  def push(%__MODULE__{} = queue, element),
    do: {:ok, %{queue | length: queue.length + 1, tail: [element | queue.tail]}}

  @doc """
  Removes up to `amount` of the oldest elements from the queue.

  Returns `{elements, queue}` where `elements` are in insertion order and
  `queue` is the updated queue. Asking for more elements than are stored
  returns everything and leaves the queue empty.

  `amount` must be a non-negative integer; anything else raises
  `FunctionClauseError`.
  """
  @impl true
  @spec take(t, non_neg_integer) :: {list, t}
  # The guard matters: a negative count makes `Enum.split/2` split from the end
  # of the list and would make `length - amount` grow the stored length.
  def take(%__MODULE__{} = queue, amount) when is_integer(amount) and amount >= 0 do
    items =
      case queue.tail do
        [] -> queue.head
        # `tail` is newest-first; reverse it to restore insertion order.
        pending -> queue.head ++ Enum.reverse(pending)
      end

    case Enum.split(items, amount) do
      {taken, []} ->
        {taken, %{queue | length: 0, head: [], tail: []}}

      {taken, rest} ->
        # A non-empty remainder means exactly `amount` elements were taken.
        {taken, %{queue | length: queue.length - amount, head: rest, tail: []}}
    end
  end

  @doc """
  Returns the number of elements currently stored in the queue.
  """
  @impl true
  @spec length(t) :: non_neg_integer
  def length(%__MODULE__{length: size}), do: size
end