Skip to main content

lib/mix/tasks/oban_powertools.limiter.simulate.ex

defmodule Mix.Tasks.ObanPowertools.Limiter.Simulate do
  use Mix.Task

  @shortdoc "Preview limiter behavior for a config without mutating state"

  @moduledoc """
  Previews the per-request reserved/blocked verdicts for a worker's declared
  limiter config without touching the database, emitting telemetry, or writing
  any audit rows. Runs `--count N` sequential reservations against a fresh
  empty bucket through the pure `Limits.compute_reservation/4` core.

  ## Exit Codes

  | Code | Meaning |
  |------|---------|
  | 0    | Simulation complete — per-request verdicts in stdout |
  | 2    | Bad input: missing or unknown --worker module, worker has no limits, a non-positive numeric override, or the repo could not be started |

  ## Flags

      --worker MOD             Worker module to read declared :limits from.
                               (required)
      --bucket-capacity N      Override declared bucket_capacity.
      --bucket-span-ms N       Override declared bucket_span_ms (milliseconds).
      --weight N               Override declared default_weight per reservation.
      --count N                Number of sequential reservations to simulate
                               (default: 1).
      --partition KEY          Override resolved partition_key (default:
                               "__global__").
      --repo MyApp.Repo        Ecto repo module. Falls back to
                               `config :oban_powertools, repo: MyApp.Repo`.
      --format human|json      Output format. "human" (default) renders a
                               readable per-request sequence with ANSI color
                               that auto-degrades in CI/non-TTY. "json" emits
                               a machine-readable payload with a
                               `schema_version: 1` stability contract.

  ## Boot Strategy

  This task starts only the Ecto repo via `Ecto.Migrator.with_repo/2` for
  CLI-family consistency. The simulation loop itself is pure and writes zero
  rows to `oban_powertools_limit_states` or `oban_powertools_limit_resources`.

  ## Rate-Limit Glossary

  Source: `ObanPowertools.Limits.Glossary.text/0`

  **token_bucket** — The rate-limiting algorithm used by ObanPowertools limiters. Each
  partition has a bucket of `bucket_capacity` tokens. Each reservation consumes `weight`
  tokens. The bucket refills (resets to zero tokens used) after `bucket_span_ms`
  milliseconds have elapsed since `bucket_started_at`.

  **bucket_capacity** — The maximum number of tokens available per bucket window. A
  reservation that would bring `tokens_used + weight` above this value is blocked with
  the `limit_reached` blocker code.

  **bucket_span_ms** — The duration of one bucket window in milliseconds. After this
  interval elapses since `bucket_started_at`, the bucket resets and tokens are available
  again. Used to compute `retry_at` for a `limit_reached` block.

  **weight** — The per-reservation token cost. Defaults to the resource's
  `default_weight` (usually 1). Each successful reservation consumes `weight` tokens from
  the bucket.

  **weight_by** — A dynamic weight resolver declared on the worker (e.g.
  `weight_by: {:args, :cost}`). At enqueue time the resolved value is bound to the
  reservation snapshot as the effective `weight`.

  **partition** — A named isolation group within a limiter resource. Each partition
  maintains its own independent token bucket. For `scope: :global` limiters there is
  one partition (`__global__`); for `scope: :partitioned` there is one bucket per
  resolved `partition_key`.

  **partition_by** — A dynamic partition key resolver declared on the worker (e.g.
  `partition_by: {:args, :user_id}`). At enqueue time the resolved value becomes the
  `partition_key` used to look up the correct bucket.

  **scope** — The partitioning strategy for a limiter resource. `global` means one
  shared bucket across all callers. `partitioned` means one independent bucket per
  resolved `partition_key`, enabling per-user, per-account, or per-tenant limits.

  **cooldown** — An operator-set hold on a partition until a specific `DateTime`. While
  a cooldown is active, all reservations for that partition are blocked with the
  `cooldown` blocker code regardless of remaining bucket capacity. Useful for
  propagating backpressure signals (e.g. HTTP 429 responses) into the limiter.

  **limit_reached** — Blocker code returned when `tokens_used + weight > bucket_capacity`.
  The `retry_at` field indicates when the bucket will reset
  (`bucket_started_at + bucket_span_ms`, clamped to at least now).

  **cooldown** (blocker code) — Blocker code returned when a resource partition is under
  an active operator cooldown. The `retry_at` field is `cooldown_until` — the
  `DateTime` at which the cooldown expires and reservations are permitted again.
  """

  alias ObanPowertools.Limits
  alias ObanPowertools.Limits.{Resource, State}

  # Strict OptionParser switch definitions, listed in the same order as the
  # "Flags" section of the moduledoc: the required worker first, then the
  # simulation overrides, then the repo/output controls.
  @switches [
    worker: :string,
    bucket_capacity: :integer,
    bucket_span_ms: :integer,
    weight: :integer,
    count: :integer,
    partition: :string,
    repo: :string,
    format: :string
  ]

  # Task entry point: parses `argv`, starts only the Ecto repo, runs the
  # simulation, and halts the VM with the resulting exit code.
  #
  # Halts with 2 (before the repo is started) when OptionParser rejects any
  # flag, and with 2 when `Ecto.Migrator.with_repo/3` cannot start the repo;
  # otherwise halts with whatever `run_simulate/2` returned.
  @impl Mix.Task
  def run(argv) do
    # Load the host application's configuration and code paths so the repo module
    # and worker module are available, WITHOUT starting any application (D-09/D-10).
    # "app.config" loads config + code paths but never *starts* apps, so Oban's
    # supervision tree stays down. Called inline (not via a module-attribute task
    # requirement) and never via "app.start", which would start Oban (Pitfall 1).
    Mix.Task.run("app.config")

    {opts, _args, invalid} = OptionParser.parse(argv, strict: @switches)

    # In strict mode, unknown flags and values that fail their declared type
    # (e.g. `--count abc`) come back in `invalid` instead of `opts`. Ignoring
    # them would run the preview with the declared defaults and print verdicts
    # for a configuration the operator never asked for, so reject them up front.
    if invalid != [] do
      rejected =
        Enum.map_join(invalid, ", ", fn
          {flag, nil} -> flag
          {flag, value} -> "#{flag} #{value}"
        end)

      Mix.shell().error("invalid option(s): #{rejected}")
      System.halt(2)
    end

    # Map the --format string flag to a known atom explicitly. Avoids
    # String.to_existing_atom (fragile: the target atom may not be
    # registered yet at runtime) and never creates atoms from arbitrary
    # CLI input (T-48-05 mitigation). Unknown values fall back to human.
    format =
      case Keyword.get(opts, :format, "human") do
        "json" -> :json
        _ -> :human
      end

    repo_module = resolve_repo(opts)

    result =
      Ecto.Migrator.with_repo(
        repo_module,
        fn _repo -> run_simulate(opts, format) end,
        pool_size: 2
      )

    case result do
      {:ok, exit_code, _apps} ->
        System.halt(exit_code)

      {:error, reason} ->
        Mix.shell().error(
          "Oban Powertools Limiter.Simulate: cannot start repo — #{inspect(reason)}\n" <>
            "Configure your repo with: config :oban_powertools, repo: MyApp.Repo\n" <>
            "Or pass the flag: mix oban_powertools.limiter.simulate --repo MyApp.Repo"
        )

        System.halt(2)
    end
  end

  # ---------------------------------------------------------------------------
  # Simulate core — calls ONLY compute_reservation/4, zero side effects
  # ---------------------------------------------------------------------------

  # Runs one simulation and returns the exit code for the task: 0 once the
  # verdicts have been printed, 2 when the worker cannot be resolved, declares
  # no limits, or an effective numeric input is not a positive integer.
  defp run_simulate(opts, format) do
    with {:ok, worker_mod} <- resolve_worker(opts),
         {:ok, declared} <- resolve_worker_config(worker_mod, opts),
         # CLI flags take precedence over the values declared on the worker.
         capacity = Keyword.get(opts, :bucket_capacity, declared.bucket_capacity),
         span_ms = Keyword.get(opts, :bucket_span_ms, declared.bucket_span_ms),
         weight = Keyword.get(opts, :weight, declared.weight),
         count = Keyword.get(opts, :count, 1),
         partition = Keyword.get(opts, :partition, declared.partition_key),
         :ok <-
           validate_positive([
             {capacity, "--bucket-capacity"},
             {span_ms, "--bucket-span-ms"},
             {weight, "--weight"},
             {count, "--count"}
           ]) do
      started_at = DateTime.utc_now()

      # In-memory stand-in for the limiter resource; built here from the
      # effective inputs rather than read from anywhere.
      limiter = %Resource{
        name: declared.resource_name,
        bucket_capacity: capacity,
        bucket_span_ms: span_ms,
        scope_kind: declared.scope_kind
      }

      # Per D-07 every simulation begins with an empty bucket and no cooldown.
      empty_bucket = %State{
        partition_key: partition,
        tokens_used: 0,
        bucket_started_at: started_at,
        cooldown_until: nil,
        cooldown_reason: nil
      }

      limiter
      |> simulate_reservations(empty_bucket, weight, count, started_at)
      |> print_simulation(limiter, weight, count, partition, format)

      0
    else
      {:error, :no_worker} ->
        fail_simulation("--worker MOD is required")

      {:error, :unknown_module, mod_string} ->
        fail_simulation("unknown --worker module: #{mod_string}")

      {:error, :no_limits} ->
        fail_simulation("worker has no :limits configured — nothing to simulate")

      {:error, {:bad_override, message}} ->
        fail_simulation(message)
    end
  end

  # Writes `message` to the Mix shell's error channel and yields exit code 2.
  defp fail_simulation(message) do
    Mix.shell().error(message)
    2
  end

  # Confirms that every `{value, flag}` pair holds a positive integer, mirroring
  # the worker macro's `validate_positive_integer!` contract (WR-03/WR-01).
  #
  # Returns `:ok` when all values pass, otherwise
  # `{:error, {:bad_override, message}}` naming the first offending flag in list
  # order. Rejecting these up front keeps the preview honest: a zero span would
  # reset the bucket on every call, and `--count 0` would iterate the descending
  # range `1..0`.
  defp validate_positive(pairs) do
    offending =
      Enum.find(pairs, fn {value, _flag} ->
        not is_integer(value) or value <= 0
      end)

    case offending do
      nil -> :ok
      {_value, flag} -> {:error, {:bad_override, "#{flag} must be a positive integer"}}
    end
  end

  # Replays `count` reservations, numbered from 1, against `initial_state` and
  # returns one verdict map per request in request order.
  #
  # Each step calls ObanPowertools.Limits.compute_reservation/4 with the same
  # `now`. A reserved outcome carries the updated `tokens_used` into the next
  # step; a blocked outcome leaves the bucket state as it was.
  defp simulate_reservations(resource, initial_state, weight, count, now) do
    {verdicts, _final_state} =
      Enum.map_reduce(1..count, initial_state, fn request_no, bucket ->
        case Limits.compute_reservation(bucket, resource, weight, now) do
          {:reserved, tokens_after} ->
            verdict = %{request: request_no, result: :reserved, tokens_used: tokens_after}
            {verdict, %{bucket | tokens_used: tokens_after}}

          {:blocked, code, retry_at, details} ->
            verdict = %{
              request: request_no,
              result: :blocked,
              blocker_code: code,
              retry_at: retry_at,
              details: details
            }

            {verdict, bucket}
        end
      end)

    verdicts
  end

  # ---------------------------------------------------------------------------
  # Formatting
  # ---------------------------------------------------------------------------

  # Human-readable renderer: prints a summary header, one line per verdict, and
  # a trailing blank line through the Mix shell. Resource name and verdict
  # labels go through `colorize/2`.
  defp print_simulation(verdicts, resource, weight, count, partition, :human) do
    shell = Mix.shell()
    capacity = resource.bucket_capacity
    resource_label = colorize(resource.name, IO.ANSI.cyan())

    shell.info(
      "\nSimulating #{count} request(s) against #{resource_label} " <>
        "(capacity: #{capacity}, span: #{resource.bucket_span_ms}ms, " <>
        "weight: #{weight}, partition: #{partition})\n"
    )

    for verdict <- verdicts do
      outcome =
        case verdict do
          %{result: :reserved, tokens_used: used} ->
            "#{colorize("reserved", IO.ANSI.green())} (tokens_used: #{used}/#{capacity})"

          %{result: :blocked, blocker_code: code, retry_at: retry_at} ->
            # retry_at is only appended when the verdict carries one.
            retry_suffix =
              if retry_at do
                " retry_at=#{DateTime.to_iso8601(retry_at)}"
              else
                ""
              end

            "#{colorize("blocked", IO.ANSI.red())} (#{code}#{retry_suffix})"
        end

      shell.info("  Request #{verdict.request}: #{outcome}")
    end

    shell.info("")
  end

  # JSON renderer: prints a single Jason-encoded object containing the
  # effective inputs, `schema_version: 1`, and the per-request verdicts.
  # Blocked verdicts have `retry_at` converted to ISO 8601 when it is set.
  defp print_simulation(verdicts, resource, weight, count, partition, :json) do
    encoded_verdicts =
      for verdict <- verdicts do
        case verdict do
          %{result: :reserved} ->
            %{
              request: verdict.request,
              result: verdict.result,
              tokens_used: verdict.tokens_used
            }

          %{result: :blocked} ->
            %{
              request: verdict.request,
              result: verdict.result,
              blocker_code: verdict.blocker_code,
              retry_at: verdict.retry_at && DateTime.to_iso8601(verdict.retry_at),
              details: verdict.details
            }
        end
      end

    json =
      Jason.encode!(%{
        schema_version: 1,
        resource: resource.name,
        bucket_capacity: resource.bucket_capacity,
        bucket_span_ms: resource.bucket_span_ms,
        weight: weight,
        count: count,
        partition: partition,
        verdicts: encoded_verdicts
      })

    shell = Mix.shell()
    shell.info(json)
  end

  # Wraps `text` in `color` followed by an ANSI reset when `IO.ANSI.enabled?/0`
  # reports true; otherwise hands `text` back untouched.
  defp colorize(text, color) do
    cond do
      IO.ANSI.enabled?() ->
        painted = IO.ANSI.format([color, text, IO.ANSI.reset()])
        IO.iodata_to_binary(painted)

      true ->
        text
    end
  end

  # ---------------------------------------------------------------------------
  # Worker config resolution (Pitfall 4 workaround — avoids limit_snapshot/2
  # which raises ArgumentError for partition_by: {:args, key} with empty args)
  # ---------------------------------------------------------------------------

  # Builds the simulation defaults from the limits declared on `worker_mod`.
  #
  # Reads `worker_mod.__powertools_limits__/0` directly and applies the
  # --partition/--weight flags (or their defaults) by hand rather than calling
  # Worker.limit_snapshot(mod, %{}), which raises for partitioned workers when
  # args is %{} (Pitfall 4 / T-49-09 mitigation).
  #
  # Returns `{:ok, config}` where `config` is a map with `:resource_name`,
  # `:scope_kind`, `:bucket_capacity`, `:bucket_span_ms`, `:weight` and
  # `:partition_key`, or `{:error, :no_limits}` when the module does not export
  # `__powertools_limits__/0` or the declared list is empty.
  defp resolve_worker_config(worker_mod, opts) do
    # function_exported?/3 never loads code: for a compiled module that simply
    # has not been loaded yet it answers false, which would misreport a valid
    # worker as having no limits. Force the load before asking.
    if Code.ensure_loaded?(worker_mod) and
         function_exported?(worker_mod, :__powertools_limits__, 0) do
      case worker_mod.__powertools_limits__() do
        [] ->
          {:error, :no_limits}

        limits ->
          partition_key = Keyword.get(opts, :partition, "__global__")
          weight = Keyword.get(opts, :weight, limits[:default_weight] || 1)

          # Nil-safe scope_kind: :scope is absent for default-scoped (global) workers,
          # so `limits[:scope]` may be nil. Default nil to :global before converting
          # to avoid ArgumentError from Atom.to_string(nil). (T-49-09 / Pitfall 4)
          scope_kind = Atom.to_string(limits[:scope] || :global)

          {:ok,
           %{
             resource_name: limits[:name],
             scope_kind: scope_kind,
             bucket_capacity: limits[:bucket_capacity],
             bucket_span_ms: limits[:bucket_span_ms],
             weight: weight,
             partition_key: partition_key
           }}
      end
    else
      {:error, :no_limits}
    end
  end

  # ---------------------------------------------------------------------------
  # Repo resolution (D-07 / T-48-05)
  # ---------------------------------------------------------------------------

  # Picks the Ecto repo module: the --repo flag when given, otherwise the
  # project's ObanPowertools.RuntimeConfig contract.
  defp resolve_repo(opts) do
    case opts[:repo] do
      repo_string when not is_nil(repo_string) ->
        # Module.safe_concat only resolves names that already exist as atoms,
        # so raw CLI input is never passed to String.to_atom/1
        # (T-48-05 mitigation).
        Module.safe_concat([repo_string])

      nil ->
        ObanPowertools.RuntimeConfig.repo!()
    end
  end

  # ---------------------------------------------------------------------------
  # Worker resolution (T-48-05)
  # ---------------------------------------------------------------------------

  # Resolves the --worker flag to a loaded module.
  #
  # Returns `{:ok, module}` on success, `{:error, :no_worker}` when the flag is
  # absent, and `{:error, :unknown_module, worker_string}` when the name does
  # not correspond to an existing atom or the module cannot be loaded.
  defp resolve_worker(opts) do
    case Keyword.get(opts, :worker) do
      nil ->
        {:error, :no_worker}

      worker_string ->
        with {:ok, worker_mod} <- existing_module(worker_string),
             {:module, ^worker_mod} <- Code.ensure_loaded(worker_mod) do
          {:ok, worker_mod}
        else
          _unresolvable -> {:error, :unknown_module, worker_string}
        end
    end
  end

  # Module.safe_concat normalises the string into a module atom without
  # invoking String.to_atom on raw CLI input (T-48-05 mitigation). It raises
  # ArgumentError when no such atom exists; that is turned into `:error` here
  # so the caller can report an unknown module instead of crashing.
  defp existing_module(name) do
    {:ok, Module.safe_concat([name])}
  rescue
    ArgumentError -> :error
  end
end