lib/ark/drip.ex

defmodule Ark.Drip do
  use GenServer
  alias :queue, as: Q

  # Returns the human-readable description of this module when called with
  # the `:doc` key. Any other key raises a FunctionClauseError.
  @doc false
  def __ark__(key) when key == :doc do
    description = """
    This module allows to throttle calls to a shared resource with a simple
    broker process.
    """

    description
  end

  defmodule Bucket do
    # Fields of the bucket state. Every key is enforced: a bucket must be
    # built with all of them (see new_ok/1).
    @enforce_keys [
      # Maximum number of drops allowed within range_ms. It is also the
      # initial allowance, and the value the allowance is reset to.
      :max_drops,

      # Length of the period, presumably in milliseconds (see the field name),
      # during which at most max_drops drops are allowed.
      :range_ms,

      # The number of drops still available at the current time.
      :allowance,

      # Count of drops in the current time slot.
      :slot_usage,

      # The duration of a slot, in the same time unit as range_ms.
      :slot_time,

      # The absolute time at which the current slot ends, exclusive (a call
      # coming at that exact timestamp will belong to the new slot).
      :slot_end,

      # The total number of successful drops; it is not cleared on reset.
      :count,

      # A queue of allowances to come, stored as {refill_time, amount}
      # entries. An entry is added when a slot with a non-zero usage is
      # rotated: it is due range_ms after the last use and gives back the
      # usage of that slot.
      :refills,

      # Timestamp of the last use, which delimits the new time slot and the
      # refill time on rotation.
      :last_use
    ]

    # The struct has exactly the enforced keys.
    defstruct @enforce_keys

    # Builds a bucket allowing `max_drops` drops per `range_ms` period,
    # starting at the current system time and using the default slot time of
    # one tenth of the range.
    #
    # Returns `{:ok, bucket}` or `{:error, reason}`, like new_ok/1.
    def new_ok(max_drops, range_ms) do
      # new_ok/1 requires :start_time and :slot_time. Without them this
      # shortcut could only ever return a "missing option" error.
      new_ok(
        max_drops: max_drops,
        range_ms: range_ms,
        start_time: :erlang.system_time(:millisecond),
        slot_time: :one_tenth
      )
    end

    # Validates `opts` and builds a fresh bucket from them.
    #
    # Required options:
    #   * :max_drops  - positive integer
    #   * :range_ms   - positive integer
    #   * :start_time - zero or a positive integer
    #   * :slot_time  - positive integer not greater than :range_ms, or the
    #                   atom :one_tenth
    #
    # Returns `{:ok, bucket}`, or the `{:error, message}` of the first check
    # that fails.
    def new_ok(opts) do
      with {:ok, max_drops} <- validate_pos_integer(opts, :max_drops),
           {:ok, range_ms} <- validate_pos_integer(opts, :range_ms),
           {:ok, start_time} <- validate_non_neg_integer(opts, :start_time),
           {:ok, slot_time} <- validate_slot_time(opts, range_ms),
           :ok <- verify_slot_time(range_ms, slot_time) do
        # The bucket starts full, with its first slot opening at start_time.
        {:ok,
         %__MODULE__{
           max_drops: max_drops,
           range_ms: range_ms,
           slot_time: slot_time,
           slot_end: start_time + slot_time,
           last_use: start_time,
           allowance: max_drops,
           slot_usage: 0,
           count: 0,
           refills: Q.new()
         }}
      end
    end

    # Ensures that a slot is not longer than the whole range.
    defp verify_slot_time(range_ms, slot_time) when slot_time <= range_ms, do: :ok

    defp verify_slot_time(range_ms, slot_time) do
      {:error, "slot time #{slot_time} is greater than range #{range_ms}"}
    end

    # Attempts to take one drop from the bucket at time `now`.
    #
    # Returns `{:ok, bucket}` when a drop was taken and `{:reject, bucket}`
    # when it was not. The returned bucket must be kept in both cases since
    # it may have been rotated or refilled on the way.
    def drop(%__MODULE__{slot_end: slot_end} = bucket, now) when slot_end <= now do
      # The current slot is over: close it, collect the refills that are due
      # and try again within the new slot.
      rotated = rotate(bucket, now)
      refilled = refill(rotated, now)
      drop(refilled, now)
    end

    def drop(%__MODULE__{allowance: allowance} = bucket, now) when allowance > 0 do
      used = %__MODULE__{
        bucket
        | last_use: now,
          count: bucket.count + 1,
          slot_usage: bucket.slot_usage + 1,
          allowance: allowance - 1
      }

      # When this drop was the last one available the allowance is now zero
      # and the slot can be rotated right away.
      if allowance === 1 do
        {:ok, rotate(used, now)}
      else
        {:ok, used}
      end
    end

    def drop(bucket, _now) do
      # The bucket may come from a rotation done by the first clause, so it
      # is handed back to the caller along with the rejection.
      {:reject, bucket}
    end

    # Closes the current slot. A bucket whose last use is older than one full
    # range is reset, otherwise the slot is rotated.
    defp rotate(bucket, now) do
      %__MODULE__{range_ms: range_ms, last_use: last_use} = bucket
      expiry = last_use + range_ms

      cond do
        now > expiry -> reset(bucket, now)
        true -> do_rotate(bucket)
      end
    end

    # Ends the current slot and opens the next one, scheduling the give-back
    # of what was used during the finished slot.
    defp do_rotate(bucket) do
      %__MODULE__{
        slot_usage: used,
        refills: pending,
        last_use: last_use,
        slot_end: finished_slot_end,
        slot_time: slot_time,
        range_ms: range_ms
      } = bucket

      # Both the next slot end and the refill time are computed from the
      # last use.
      next_slot_end = last_use + slot_time
      refill_at = last_use + range_ms

      # A slot without any usage gives nothing back.
      pending =
        if used === 0 do
          pending
        else
          Q.in({refill_at, used}, pending)
        end

      # last_use is forced to the end of the slot that just finished, so
      # that the data advances in time.
      %__MODULE__{
        bucket
        | slot_usage: 0,
          refills: pending,
          slot_end: next_slot_end,
          last_use: finished_slot_end
      }
    end

    # Brings the bucket back to a full allowance with a slot starting at
    # `now`. Pending refills are discarded; the total count is kept.
    defp reset(bucket, now) do
      %__MODULE__{slot_time: slot_time, max_drops: full_allowance} = bucket

      %__MODULE__{
        bucket
        | last_use: now,
          slot_usage: 0,
          slot_end: now + slot_time,
          refills: Q.new(),
          allowance: full_allowance
      }
    end

    # Moves every refill that is due at `now` from the queue into the
    # allowance. Refills are taken from the front of the queue, and the first
    # one that is not due yet stops the process.
    defp refill(bucket, now) do
      %__MODULE__{allowance: allowance, refills: pending} = bucket

      with {:value, {due_at, amount}} when due_at <= now <- Q.peek(pending) do
        refilled = %__MODULE__{
          bucket
          | refills: Q.drop(pending),
            allowance: allowance + amount
        }

        refill(refilled, now)
      else
        _empty_or_not_due -> bucket
      end
    end

    # Returns the time at which the refill at the front of the queue is due.
    #
    # Raises a MatchError when there is no pending refill.
    def next_refill!(%__MODULE__{} = bucket) do
      {:value, {due_at, _amount}} = Q.peek(bucket.refills)
      due_at
    end

    # Fetches `key` from the keyword list `opts` and checks that its value is
    # an integer greater than or equal to 1.
    #
    # Returns `{:ok, value}` or `{:error, message}`.
    defp validate_pos_integer(opts, key) do
      with {:ok, val} <- Keyword.fetch(opts, key) do
        if is_integer(val) and val >= 1 do
          {:ok, val}
        else
          {:error, "option #{inspect(key)} is not a positive integer: #{inspect(val)}"}
        end
      else
        :error -> {:error, "missing option #{inspect(key)}"}
      end
    end

    # Fetches `key` from the keyword list `opts` and checks that its value is
    # an integer greater than or equal to 0.
    #
    # Returns `{:ok, value}` or `{:error, message}`.
    defp validate_non_neg_integer(opts, key) do
      with {:ok, val} <- Keyword.fetch(opts, key) do
        if is_integer(val) and val >= 0 do
          {:ok, val}
        else
          {:error,
           "option #{inspect(key)} is not zero or a positive integer: #{inspect(val)}"}
        end
      else
        :error -> {:error, "missing option #{inspect(key)}"}
      end
    end

    # Resolves the :slot_time option into a slot duration.
    #
    # The special value :one_tenth means one tenth of `range_ms`, rounded
    # down but never less than 1. Any other value must be a positive integer.
    #
    # Returns `{:ok, slot_time}` or `{:error, message}`.
    defp validate_slot_time(opts, range_ms) do
      case opts[:slot_time] do
        :one_tenth ->
          # div/2 truncates, so a range shorter than 10 would give a slot
          # time of zero. A zero-length slot always ends at or before "now",
          # which makes drop/2 rotate forever, hence the lower bound.
          {:ok, max(1, div(range_ms, 10))}

        _ ->
          validate_pos_integer(opts, :slot_time)
      end
    end
  end

  # @moduledoc false

  # Starts a drip process linked to the caller.
  #
  # `opts` is divided by split_gen_opts/1: the GenServer options are given
  # to GenServer.start_link/3 and the remaining ones to init/1.
  def start_link(opts) do
    {server_opts, bucket_opts} = split_gen_opts(opts)
    GenServer.start_link(__MODULE__, bucket_opts, server_opts)
  end

  # Starts a drip process without linking it to the caller.
  #
  # `opts` is divided by split_gen_opts/1: the GenServer options are given
  # to GenServer.start/3 and the remaining ones to init/1.
  def start(opts) do
    {server_opts, bucket_opts} = split_gen_opts(opts)
    GenServer.start(__MODULE__, bucket_opts, server_opts)
  end

  # Separates the GenServer start options from the bucket options.
  #
  # Accepts either a keyword list or a `{max_drops, range_ms}` shortcut and
  # always returns a `{gen_opts, bucket_opts}` tuple, which is the shape
  # start_link/1 and start/1 destructure.
  defp split_gen_opts(opts) when is_list(opts) do
    Keyword.split(opts, [:debug, :name, :timeout, :spawn_opt, :hibernate_after])
  end

  defp split_gen_opts({max_drops, range_ms}) do
    # The shortcut carries no GenServer options. Returning the bare keyword
    # list here made the callers fail with a MatchError.
    {[], [max_drops: max_drops, range_ms: range_ms]}
  end

  # Stops the drip process; a plain delegation to GenServer.stop/1.
  defdelegate stop(bucket), to: GenServer

  # Blocks until the drip process answers the await request and returns its
  # reply (`:ok`).
  #
  # With a finite `timeout`, a call that times out makes the caller exit
  # with the usual GenServer timeout reason, after the server has been asked
  # to forget the request.
  def await(bucket, timeout \\ :infinity)

  def await(bucket, :infinity) do
    # An infinite wait never times out, so no cancellation is needed.
    request = {:await, make_ref()}
    GenServer.call(bucket, request, :infinity)
  end

  def await(bucket, timeout) do
    ref = make_ref()

    try do
      GenServer.call(bucket, {:await, ref}, timeout)
    catch
      :exit, {:timeout, {GenServer, :call, _args}} = reason ->
        # Same message as cancel/2 sends: remove the request from the
        # waiting queue, then let the exit go through.
        GenServer.cast(bucket, {:cancel, ref})
        exit(reason)
    end
  end

  # Asks the drip process to remove the waiting request identified by `ref`.
  # This is asynchronous: the function returns as soon as the message is
  # sent.
  def cancel(bucket, ref) when is_reference(ref),
    do: GenServer.cast(bucket, {:cancel, ref})

  defmodule S do
    # Server state: the bucket and the queue of waiting `{from, ref}` clients.
    @enforce_keys [:bucket, :clients]
    defstruct [:bucket, :clients]
  end

  # Builds the initial server state from the bucket options.
  #
  # :start_time defaults to the current time and :slot_time to :one_tenth.
  # Invalid options stop the process with the validation message as reason.
  @impl GenServer
  def init(opts) do
    opts = Keyword.put_new(opts, :start_time, now_ms())
    opts = Keyword.put_new(opts, :slot_time, :one_tenth)

    with {:ok, bucket} <- Bucket.new_ok(opts) do
      {:ok, %S{bucket: bucket, clients: Q.new()}}
    else
      {:error, reason} -> {:stop, reason}
    end
  end

  # Handles an await request.
  #
  # When nobody is waiting, the drop is attempted right away: on success the
  # caller gets `:ok` immediately, otherwise it is queued and the server
  # sets a timeout to wake up for the next refill.
  #
  # When clients are already waiting, the newcomer is queued behind them and
  # the queue is served in order. Attempting the drop for the newcomer first
  # would let it overtake the waiting clients, and replying with an
  # `:infinity` timeout would discard their pending wake-up.
  @impl GenServer
  def handle_call({:await, ref}, from, %S{bucket: bucket, clients: q} = state) do
    if Q.is_empty(q) do
      now = now_ms()

      case Bucket.drop(bucket, now) do
        {:ok, bucket} ->
          # Nobody is waiting, so there is nothing to wake up for.
          {:reply, :ok, %S{state | bucket: bucket}, :infinity}

        {:reject, bucket} ->
          state = %S{state | bucket: bucket, clients: Q.in({from, ref}, q)}
          {:noreply, state, next_timeout(state, now)}
      end
    else
      # Replies are sent by run_queue/1 with GenServer.reply/2.
      state = run_queue(%S{state | clients: Q.in({from, ref}, q)})

      # A wake-up is only needed while somebody is still waiting.
      timeout =
        if Q.is_empty(state.clients) do
          :infinity
        else
          next_timeout(state, now_ms())
        end

      {:noreply, state, timeout}
    end
  end

  # The timeout set by the other callbacks has elapsed: serve the waiting
  # clients, then compute the next timeout.
  @impl GenServer
  def handle_info(:timeout, state) do
    served = run_queue(state)
    timeout = next_timeout(served, now_ms())
    {:noreply, served, timeout}
  end

  # Replies `:ok` to the waiting clients, in order, for as long as the
  # bucket accepts drops. It stops at the first rejected drop, leaving that
  # client and the following ones in the queue.
  defp run_queue(%S{clients: waiting, bucket: bucket} = state) do
    with {{:value, {from, _ref}}, rest} <- Q.out(waiting),
         {:ok, bucket} <- Bucket.drop(bucket, now_ms()) do
      GenServer.reply(from, :ok)
      run_queue(%S{state | bucket: bucket, clients: rest})
    else
      # Nobody is waiting.
      {:empty, _queue} -> state
      # The client stays in the queue, but the updated bucket is kept.
      {:reject, bucket} -> %S{state | bucket: bucket}
    end
  end

  # Removes the waiting request identified by `ref` from the clients queue.
  #
  # Receiving this message discards the timeout that was pending for the
  # waiting clients. It is therefore set again while clients remain in the
  # queue, otherwise they would only be served when another message reaches
  # the server.
  @impl GenServer
  def handle_cast({:cancel, ref}, %S{clients: clients} = state) do
    clients =
      Q.filter(
        fn
          {_from, ^ref} -> false
          _ -> true
        end,
        clients
      )

    state = %S{state | clients: clients}

    if Q.is_empty(clients) do
      # Nobody left to wake up.
      {:noreply, state}
    else
      {:noreply, state, next_timeout(state, now_ms())}
    end
  end

  # Returns the current system time in milliseconds.
  def now_ms, do: System.system_time(:millisecond)

  # Computes the GenServer timeout after which the waiting clients should be
  # served again.
  #
  #   * Nobody is waiting: `:infinity`. Returning a finite timeout here made
  #     the server wake up again and again with a timeout of 0 once the
  #     refill time had passed, since only a drop consumes the refill.
  #   * Clients are waiting and the allowance is exhausted: the time left
  #     until a drop can succeed. Refills are only collected by a drop once
  #     the current slot has ended, so this is the later of the next refill
  #     time and the end of the slot.
  #   * Clients are waiting and some allowance is left: 0, to serve them
  #     immediately.
  defp next_timeout(%{bucket: %Bucket{allowance: al} = bucket, clients: clients}, now) do
    cond do
      Q.is_empty(clients) ->
        :infinity

      al == 0 ->
        wake_at = max(Bucket.next_refill!(bucket), bucket.slot_end)
        max(0, wake_at - now)

      true ->
        0
    end
  end
end