Skip to main content

lib/marqeta/http/rate_limiter.ex

defmodule Marqeta.RateLimiter do
  @moduledoc """
  Token-bucket rate limiter for the Marqeta API.

  Maintains per-category token buckets. When a bucket is empty, callers
  block until tokens refill on the next 100 ms tick.

  ## Buckets

  | Bucket        | Default RPS | Burst |
  |---------------|-------------|-------|
  | `:default`    | 50          | 100   |
  | `:diva`       | 20          | 50    |
  | `:simulations`| 10          | 20    |

  The limiter is started automatically by Marqeta.Application and is
  called by `Marqeta.Client` before every outbound request.
  In tests the GenServer is not started so the call is a no-op.
  """

  use GenServer

  require Logger

  @refill_ms 100

  @initial_buckets %{
    default: %{max: 100, rps: 50, tokens: 100},
    diva: %{max: 50, rps: 20, tokens: 50},
    simulations: %{max: 20, rps: 10, tokens: 20}
  }

  # ---------------------------------------------------------------------------
  # Public API
  # ---------------------------------------------------------------------------

  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Acquires a rate-limit token for the given `path`.

  Blocks (up to 10 s) if the relevant bucket is empty.
  Returns `:ok` once a token has been acquired.
  """
  @spec check_rate_limit(String.t()) :: :ok
  def check_rate_limit(path) do
    bucket = bucket_for(path)
    GenServer.call(__MODULE__, {:acquire, bucket}, 10_000)
  end

  # ---------------------------------------------------------------------------
  # GenServer callbacks
  # ---------------------------------------------------------------------------

  @impl GenServer
  def init(_opts) do
    schedule_refill()
    {:ok, %{buckets: @initial_buckets, waiters: %{}}}
  end

  @impl GenServer
  def handle_call({:acquire, bucket}, from, state) do
    case get_in(state, [:buckets, bucket, :tokens]) do
      tokens when is_number(tokens) and tokens >= 1 ->
        new_state = update_in(state, [:buckets, bucket, :tokens], &(&1 - 1))
        {:reply, :ok, new_state}

      _ ->
        # Park the caller; reply on the next refill tick.
        waiters = Map.update(state.waiters, bucket, [from], &[from | &1])
        {:noreply, %{state | waiters: waiters}}
    end
  end

  @impl GenServer
  def handle_info(:refill, state) do
    new_buckets =
      Map.new(state.buckets, fn {name, bucket} ->
        refill = bucket.rps * @refill_ms / 1_000
        new_tokens = min(bucket.tokens + refill, bucket.max)
        {name, %{bucket | tokens: new_tokens}}
      end)

    {new_waiters, final_buckets} =
      Enum.reduce(
        state.waiters,
        {%{}, new_buckets},
        fn {bucket, callers}, {wacc, bacc} ->
          {remaining, updated_bacc} = release_waiters(callers, bucket, bacc)
          {Map.put(wacc, bucket, remaining), updated_bacc}
        end
      )

    schedule_refill()
    {:noreply, %{state | buckets: final_buckets, waiters: new_waiters}}
  end

  # ---------------------------------------------------------------------------
  # Private
  # ---------------------------------------------------------------------------

  @spec release_waiters([GenServer.from()], atom(), map()) ::
          {[GenServer.from()], map()}
  defp release_waiters([], _bucket, buckets), do: {[], buckets}

  defp release_waiters([caller | rest], bucket, buckets) do
    tokens = get_in(buckets, [bucket, :tokens])

    if tokens >= 1 do
      GenServer.reply(caller, :ok)
      updated = update_in(buckets, [bucket, :tokens], &(&1 - 1))
      release_waiters(rest, bucket, updated)
    else
      {[caller | rest], buckets}
    end
  end

  defp schedule_refill, do: Process.send_after(self(), :refill, @refill_ms)

  @spec bucket_for(String.t()) :: atom()
  defp bucket_for("/simulate" <> _), do: :simulations
  defp bucket_for("/simulations" <> _), do: :simulations
  defp bucket_for("/diva" <> _), do: :diva
  defp bucket_for(_), do: :default
end