lib/ex_waiter/rate_limiting.ex

defmodule ExWaiter.RateLimiting do
  @moduledoc """
  Token-bucket rate limiting expressed as a pure function over a bucket tuple.

  This module keeps no state of its own. `limit_rate/2` takes the previous
  bucket (or `nil` for a brand new one), and returns the bucket to pass back in
  on the next call together with a `Limiter` struct describing what happened
  during this check.
  """

  alias ExWaiter.RateLimiting.Limiter

  @type options :: [
          {:refill_rate, pos_integer()}
          | {:interval, pos_integer()}
          | {:burst_limit, pos_integer()}
          | {:cost, pos_integer()}
        ]
  @type token_count :: non_neg_integer()
  @type unix_timestamp_in_ms :: pos_integer()
  @type bucket :: {token_count(), unix_timestamp_in_ms()}

  @doc """
  Checks whether `:cost` tokens can be taken from `bucket` right now.

  `bucket` is either `nil` (a new bucket, which starts full at `:burst_limit`
  tokens) or a `{token_count, unix_timestamp_in_ms}` tuple returned by a
  previous call.

  ## Options

    * `:refill_rate` - tokens gained per elapsed interval. Defaults to `1`.
    * `:interval` - length of a refill interval in milliseconds. Defaults to
      `1000`.
    * `:burst_limit` - maximum number of tokens the bucket can hold. Defaults
      to the value of `:refill_rate`.
    * `:cost` - tokens required by this call. Defaults to `1`.

  ## Return value

    * `{:ok, bucket, limiter}` when the cost was paid.
    * `{:error, bucket, limiter}` when there were not enough tokens; the
      returned bucket still reflects any refill that took place.

  ## Raises

  Raises `RuntimeError` when the bucket timestamp is later than the current OS
  time, or when `bucket` is neither `nil` nor a tuple of a non-negative integer
  and a positive integer.
  """
  @spec limit_rate(bucket() | nil, options()) ::
          {:ok, bucket(), Limiter.t()} | {:error, bucket(), Limiter.t()}
  def limit_rate(bucket, opts \\ []) do
    refill_rate = Keyword.get(opts, :refill_rate, 1)

    limiter = %Limiter{
      refill_rate: refill_rate,
      interval: Keyword.get(opts, :interval, 1000),
      burst_limit: Keyword.get(opts, :burst_limit, refill_rate),
      cost: Keyword.get(opts, :cost, 1),
      checked_at: System.os_time(:millisecond)
    }

    limiter =
      case bucket do
        {previous_tokens, previous_updated_at}
        when is_integer(previous_tokens) and
               previous_tokens >= 0 and
               is_integer(previous_updated_at) and
               previous_updated_at > 0 ->
          if previous_updated_at > limiter.checked_at do
            raise "Bucket timestamp must not be in the future"
          end

          handle_existing_bucket(limiter, previous_tokens, previous_updated_at)

        nil ->
          new_bucket(limiter)

        _ ->
          raise "Bucket must be either nil or {integer_token_count, unix_timestamp_in_ms}"
      end

    # A positive `paid_tokens` is the signal that the cost was actually deducted.
    status = if limiter.paid_tokens > 0, do: :ok, else: :error

    {status, {limiter.tokens_after_paid, limiter.updated_at}, limiter}
  end

  # Records the caller-supplied bucket on the limiter, then refills and pays.
  defp handle_existing_bucket(%Limiter{} = limiter, previous_tokens, previous_updated_at) do
    %{
      limiter
      | previous_tokens: previous_tokens,
        previous_updated_at: previous_updated_at
    }
    |> refill_tokens()
    |> pay_cost()
  end

  # Starts a bucket filled to `burst_limit` and stamped with the check time.
  #
  # The cost goes through `pay_cost/1` rather than being subtracted directly:
  # when `cost` exceeds `burst_limit` the request must be rejected and the
  # token count must stay non-negative, otherwise the returned bucket would be
  # refused as malformed on the next call.
  defp new_bucket(%Limiter{} = limiter) do
    %{
      limiter
      | updated_at: limiter.checked_at,
        previous_updated_at: nil,
        created_at: limiter.checked_at,
        previous_tokens: nil,
        tokens_after_refill: limiter.burst_limit,
        ms_until_next_refill: limiter.interval,
        next_refill_at: limiter.checked_at + limiter.interval,
        refilled_tokens: limiter.burst_limit
    }
    |> pay_cost()
  end

  # Adds the tokens earned since `previous_updated_at`, capped at `burst_limit`,
  # and decides which timestamp the returned bucket carries.
  defp refill_tokens(%Limiter{} = limiter) do
    gained_tokens = calculate_gained_tokens(limiter)

    if gained_tokens > 0 do
      tokens_after_refill = min(limiter.previous_tokens + gained_tokens, limiter.burst_limit)

      # At least one full interval has elapsed, so the refill clock restarts at
      # the check time. `updated_at` is moved forward even when the bucket was
      # already full (`refilled_tokens` of 0): keeping the old timestamp would
      # contradict `next_refill_at` and let the next call refill immediately
      # after these tokens are spent.
      %{
        limiter
        | tokens_after_refill: tokens_after_refill,
          refilled_tokens: tokens_after_refill - limiter.previous_tokens,
          ms_until_next_refill: limiter.interval,
          next_refill_at: limiter.checked_at + limiter.interval,
          updated_at: limiter.checked_at
      }
    else
      # Still inside the current interval: keep the original timestamp so the
      # time already waited counts towards the next refill.
      next_refill_at = limiter.previous_updated_at + limiter.interval

      %{
        limiter
        | tokens_after_refill: limiter.previous_tokens,
          refilled_tokens: 0,
          ms_until_next_refill: next_refill_at - limiter.checked_at,
          next_refill_at: next_refill_at,
          updated_at: limiter.previous_updated_at
      }
    end
  end

  # Whole intervals elapsed since the bucket was last updated, times the
  # refill rate. Partial intervals earn nothing (integer division).
  defp calculate_gained_tokens(%Limiter{} = limiter) do
    milliseconds_since_last_request = limiter.checked_at - limiter.previous_updated_at

    div(milliseconds_since_last_request, limiter.interval) *
      limiter.refill_rate
  end

  # Deducts `cost` when the bucket can afford it; otherwise leaves the token
  # count untouched and records that nothing was paid.
  defp pay_cost(%Limiter{} = limiter) do
    tokens_after_paid = limiter.tokens_after_refill - limiter.cost

    if tokens_after_paid >= 0 do
      %{limiter | paid_tokens: limiter.cost, tokens_after_paid: tokens_after_paid}
    else
      %{limiter | paid_tokens: 0, tokens_after_paid: limiter.tokens_after_refill}
    end
  end
end