lib/mixpanel/queue.ex

defmodule Mixpanel.Queue do
  @moduledoc """
  Behaviour for queue implementations.

  The queue value is opaque to this contract (typed as `any()`), so every
  implementation is free to pick its own internal representation.
  """

  @doc """
  Builds a new queue from a positive integer.

  NOTE(review): the integer is presumably the maximum number of elements the
  queue may hold - confirm against the implementing modules.
  """
  @callback new(limit :: pos_integer()) :: any()

  @doc """
  Attempts to add `element` to `queue`.

  Returns `{:ok, term}` (presumably carrying the updated queue - confirm
  against the implementing modules) or `:discarded` when the element was not
  accepted.
  """
  @callback push(queue :: any(), element :: any()) :: {:ok, any()} | :discarded

  @doc """
  Takes elements out of `queue`, given a non-negative `amount`.

  Returns a two-element tuple whose first element is a list.

  NOTE(review): the list presumably holds at most `amount` elements and the
  second tuple element is presumably the remaining queue - confirm against
  the implementing modules.
  """
  @callback take(queue :: any(), amount :: non_neg_integer()) :: {list(), any()}

  @doc """
  Returns a non-negative integer for `queue`.

  NOTE(review): presumably the number of elements currently stored - confirm
  against the implementing modules.
  """
  @callback length(queue :: any()) :: non_neg_integer()
end

# defmodule Mixpanel.Queue.Simple do
#   @moduledoc """
#   A simple queue implementation that discards elements when it's full.
#   """

#   @behaviour Access
#   @behaviour Mixpanel.Queue

#   @type prefix :: atom
#   @type zipper :: %{
#           length: non_neg_integer,
#           head: nil | nonempty_maybe_improper_list,
#           tail: nil | nonempty_maybe_improper_list
#         }
#   @type t :: %__MODULE__{
#           max_size: non_neg_integer,
#           prefixes: %{prefix => zipper}
#         }

#   @enforce_keys [:max_size, :prefixes]
#   defstruct @enforce_keys

#   @impl Mixpanel.Queue
#   @spec new(pos_integer) :: t
#   def new(limit) when is_integer(limit) and limit > 0 do
#     %__MODULE__{
#       max_size: limit,
#       prefixes: %{}
#     }
#   end

#   def new(limit),
#     do: raise(ArgumentError, "limit must be greater than 0, got #{inspect(limit)}")

#   @impl Mixpanel.Queue
#   @spec push(t, prefix, any) :: {:ok, t} | :discarded
#   def push(%__MODULE__{max_size: max_size, prefixes: prefixes} = queue, prefix, element) do
#     case __MODULE__.length(queue) >= max_size do
#       false when is_map_key(prefixes, prefix) ->
#         prefixes =
#           queue.prefixes
#           |> update_in([prefix, :length], &(&1 + 1))
#           |> update_in([prefix, :tail], &[element | &1])

#         {:ok, %__MODULE__{queue | prefixes: prefixes}}

#       false ->
#         {:ok,
#          %__MODULE__{
#            queue
#            | prefixes:
#                Map.put(
#                  queue.prefixes,
#                  prefix,
#                  %{length: 1, head: [], tail: [element]}
#                )
#          }}

#       true ->
#         :discarded
#     end
#   end

#   @impl Mixpanel.Queue
#   @spec take(t, prefix, non_neg_integer) :: {list, t}
#   def take(%__MODULE__{prefixes: prefixes} = queue, prefix, amount)
#       when is_map_key(prefixes, prefix) do
#     case get_in(prefixes, [prefix, :tail]) do
#       [] ->
#         case Enum.split(get_in(prefixes, [prefix, :head]), amount) do
#           {result, []} ->
#             prefixes =
#               prefixes
#               |> update_in([prefix, :length], fn _ -> 0 end)
#               |> update_in([prefix, :head], fn _ -> [] end)
#               |> update_in([prefix, :tail], fn _ -> [] end)

#             {result, %__MODULE__{queue | prefixes: prefixes}}

#           {result, new_head} ->
#             prefixes =
#               prefixes
#               |> update_in([prefix, :length], &(&1 - amount))
#               |> update_in([prefix, :head], fn _ -> new_head end)

#             {result, %__MODULE__{queue | prefixes: prefixes}}
#         end

#       _ ->
#         prefixes =
#           prefixes
#           |> update_in([prefix, :head], &(&1 ++ Enum.reverse(get_in(prefixes, [prefix, :tail]))))
#           |> update_in([prefix, :tail], fn _ -> [] end)

#         take(%__MODULE__{queue | prefixes: prefixes}, prefix, amount)
#     end
#   end

#   def take(%__MODULE__{} = queue, _prefix, _amount), do: {[], queue}

#   @impl Mixpanel.Queue
#   @spec length(t) :: non_neg_integer
#   def length(%__MODULE__{prefixes: prefixes}) do
#     for {_prefix, %{length: length}} <- prefixes, reduce: 0 do
#       acc -> acc + length
#     end
#   end

#   # def at(_x, y), do: y
#   # def replace_at(x, _y, _z), do: x

#   # @impl Access
#   # @doc false
#   # def fetch(deque, index),
#   #   do: at(deque, index)

#   # @impl Access
#   # @doc false
#   # def get_and_update(deque, index, fun) do
#   #   case at(deque, index) do
#   #     {:ok, current} ->
#   #       case fun.(current) do
#   #         {get, update} ->
#   #           {:ok, deque} = replace_at(deque, index, update)
#   #           {get, deque}

#   #         :pop ->
#   #           {[item], deque} = take(deque, :default, 1)
#   #           {item, deque}
#   #       end

#   #     _error ->
#   #       raise ArgumentError, "index out of bounds"
#   #   end
#   # end

#   # @impl Access
#   # @doc false
#   # def pop(deque, index) do
#   #   cond do
#   #     index == 0 ->
#   #       {[item], deque} = take(deque, :default, 1)
#   #       {item, deque}

#   #     index == deque.length - 1 ->
#   #       # TODO should pop the last item
#   #       {[item], deque} = take(deque, :default, 1)
#   #       {item, deque}

#   #     :else ->
#   #       raise ArgumentError, "removing items not at head or tail is unsupported"
#   #   end
#   # end
# end

# defimpl Collectable, for: Mixpanel.Queue.Simple do
#   @spec into(@for.t()) :: {@for.t(), (@for.t, {:cont, any} | :done | :halt -> @for.t() | :ok)}
#   def into(orig) do
#     {orig,
#      fn
#        queue, {:cont, item} ->
#          case @for.push(queue, :default, item) do
#            {:ok, queue} -> queue
#            :discarded -> queue
#          end

#        queue, :done ->
#          queue

#        _, :halt ->
#          :ok
#      end}
#   end
# end