lib/ark/drip.ex

defmodule Ark.Drip do
  use GenServer
  alias :queue, as: Q

  @doc false
  def __ark__(:doc) do
    """
    This module allows throttling calls to a common resource through a simple
    broker process.
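
    A minimal usage sketch (the `MyDrip` name and the 10-per-second rate are
    illustrative):

        # Allow at most 10 drips per 1_000 ms window.
        {:ok, _pid} = Ark.Drip.start_link(spec: {10, 1_000}, name: MyDrip)

        # Blocks until a drip is available in the current window.
        :ok = Ark.Drip.await(MyDrip)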
    """
  end

  @moduledoc false

  def start_link(opts) do
    {gen_opts, opts} = normalize_opts(opts)
    GenServer.start_link(__MODULE__, opts, gen_opts)
  end

  def start(opts) do
    {gen_opts, opts} = normalize_opts(opts)
    GenServer.start(__MODULE__, opts, gen_opts)
  end

  defp normalize_opts(opts) do
    {gen_opts, opts} = split_gen_opts(opts)

    # Support passing the limits as a {max_drips, range_ms} tuple under the :spec key.
    opts =
      case Keyword.pop(opts, :spec) do
        {{max_drips, range_ms}, opts} ->
          opts
          |> Keyword.put(:range_ms, range_ms)
          |> Keyword.put(:max_drips, max_drips)

        {nil, opts} ->
          opts
      end

    {gen_opts, opts}
  end
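
  # For example (illustrative values), `normalize_opts(spec: {10, 1_000}, name: MyDrip)`
  # returns `{[name: MyDrip], [max_drips: 10, range_ms: 1_000]}`, so the tuple
  # form and the explicit max_drips/range_ms options are equivalent.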

  defp split_gen_opts(opts) when is_list(opts) do
    Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt, :hibernate_after])
  end

  def stop(bucket) do
    GenServer.stop(bucket)
  end

  def await(bucket, timeout \\ :infinity)

  def await(bucket, :infinity) do
    GenServer.call(bucket, {:await, make_ref()}, :infinity)
  end

  def await(bucket, timeout) do
    ref = make_ref()

    try do
      GenServer.call(bucket, {:await, ref}, timeout)
    catch
      :exit, e ->
        cancel(bucket, ref)
        exit(e)
    end
  end

  def cancel(bucket, ref) when is_reference(ref) do
    GenServer.cast(bucket, {:cancel, ref})
  end
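
  # A sketch of awaiting a drip with a deadline: on timeout, await/2 casts a
  # :cancel for its queued ref and then re-exits, so a caller can catch the
  # exit to turn it into an error value. `bucket` and `call_the_api/0` are
  # illustrative names.
  #
  #     try do
  #       :ok = Ark.Drip.await(bucket, 5_000)
  #       call_the_api()
  #     catch
  #       :exit, _ -> {:error, :throttled}
  #     end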

  defmodule S do
    @enforce_keys [
      # maximum drips for the time window
      :max_drips,
      # width of the time window
      :range_ms,
      # dispatched drips for the current window
      :used,
      # timestamp at which the next windows will start
      :next_reset,
      # a queue to buffer demands when there is no available drip
      :clients
    ]
    defstruct @enforce_keys
  end

  @impl GenServer
  def init(opts) do
    with {:ok, opts} <- validate_opts(opts) do
      max_drips = Keyword.fetch!(opts, :max_drips)
      range_ms = Keyword.fetch!(opts, :range_ms)

      state = %S{
        max_drips: max_drips,
        range_ms: range_ms,
        used: 0,
        next_reset: now_ms() + range_ms,
        clients: Q.new()
      }

      {:ok, state, range_ms}
    end
  end

  defp validate_opts(opts) when is_list(opts) do
    with true <- Keyword.keyword?(opts),
         {:ok, range_ms} <- Keyword.fetch(opts, :range_ms),
         {:ok, max_drips} <- Keyword.fetch(opts, :max_drips) do
      cond do
        max_drips < 1 ->
          {:stop, "Minimum drip per period is 1, got: #{inspect(max_drips)}"}

        range_ms < 1 ->
          {:stop, "Minimum period is 1, got: #{inspect(range_ms)}"}

        true ->
          {:ok, opts}
      end
    else
      _ -> {:stop, {:invalid_opts, opts}}
    end
  end
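
  # For instance (illustrative values), `validate_opts(max_drips: 0, range_ms: 100)`
  # matches the first cond clause and returns a {:stop, _} tuple, so init/1
  # stops and start_link/1 returns an error instead of a pid.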

  @impl GenServer
  def handle_call({:await, _ref}, _from, %S{used: used, max_drips: max} = state)
      when used < max do
    {:reply, :ok, %S{state | used: used + 1}, next_timeout(state)}
  end

  def handle_call({:await, ref}, from, %S{} = state) do
    state = enqueue_client(state, {from, ref})
    {:noreply, state, next_timeout(state)}
  end

  @impl GenServer
  def handle_info(:timeout, %S{} = state) do
    %S{range_ms: range_ms} = state
    # What happens when the drips are used very slowly
    #
    # As long as the usage is below the max, requests are served immediately and
    # no timeout is set (:infinity). That means the :next_reset value stays in
    # the past, possibly for a long time. At some point a request comes in,
    # reaches the max threshold and is put into the queue. From there, the
    # timeout is computed from :next_reset, which is still far in the past, so
    # the timeout will be zero. A :timeout info message is then immediately
    # dispatched to this function clause, a new time window starts from
    # now_ms(), and the request is served right away.
    #
    # So the window never lags behind.
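    #
    # Illustrative timeline (assumed values: max_drips: 1, range_ms: 1_000):
    #
    #   t = 0      init, used = 0, next_reset = 1_000, timeout armed for 1_000 ms
    #   t = 1_000  :timeout fires, queue is empty, next_reset = 2_000, no timeout set
    #   t = 5_000  an :await is served immediately, used = 1 (next_reset is stale)
    #   t = 5_000  a second :await is queued, next_timeout/1 returns max(0, 2_000 - 5_000) = 0
    #   t = 5_000  :timeout fires at once, used = 0, next_reset = 6_000, the queue is served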
    next_reset = now_ms() + range_ms
    state = %S{state | used: 0, next_reset: next_reset}
    state = run_queue(state)
    {:noreply, state, next_timeout(state)}
  end

  defp run_queue(%S{clients: q, used: used, max_drips: max} = state)
       when used < max do
    case Q.out(q) do
      {:empty, _} ->
        state

      {{:value, {from, _}}, new_q} ->
        GenServer.reply(from, :ok)
        run_queue(%S{state | used: used + 1, clients: new_q})
    end
  end

  defp run_queue(%S{} = state) do
    state
  end

  @impl GenServer
  def handle_cast({:cancel, ref}, %S{} = state) do
    clients =
      Q.filter(
        fn
          {_from, ^ref} -> false
          _ -> true
        end,
        state.clients
      )

    state = %S{state | clients: clients}
    # Keep the window timeout armed so clients still queued are eventually served.
    {:noreply, state, next_timeout(state)}
  end

  defp enqueue_client(state, client) do
    q = Q.in(client, state.clients)
    %S{state | clients: q}
  end

  def now_ms do
    :erlang.system_time(:millisecond)
  end

  # Returns :infinity when no client is waiting, otherwise the number of
  # milliseconds until the current window resets (clamped at zero).
  defp next_timeout(%{clients: clients, next_reset: next}) do
    case Q.is_empty(clients) do
      true -> :infinity
      false -> max(0, next - now_ms())
    end
  end
end