defmodule Mix.Tasks.ObanPowertools.Limiter.Explain do
use Mix.Task
import Ecto.Query
@shortdoc "Explain a limiter's current blocking state (read-only)"
@moduledoc """
Explains a limiter's current blocking state by querying the live limiter state
and the latest persisted blocker snapshot. Read-only — never mutates limiter state.
## Usage
# Resource-primary path (matches vocabulary in the Limiters UI):
mix oban_powertools.limiter.explain --resource my-resource
mix oban_powertools.limiter.explain --resource my-resource --partition user-42
# Worker + args secondary path (maps to Explain.explain/3):
mix oban_powertools.limiter.explain --worker MyApp.Workers.ApiWorker
mix oban_powertools.limiter.explain --worker MyApp.Workers.ApiWorker --args '{"user_id":1}'
## Exit Codes
| Code | Meaning |
|------|---------|
| 0 | Ran successfully — result is in stdout |
| 2 | Cannot run: no repo, DB unreachable, or unknown --worker module |
## Flags
--resource NAME Limiter resource name (primary path). Resolves live
state from the DB and renders via explain_snapshot/2.
--partition KEY Partition key (default: "__global__"). Use with --resource.
--worker MOD Worker module (secondary path). Maps to Explain.explain/3.
--args JSON JSON args string for --worker (default: "{}").
--repo MyApp.Repo Ecto repo module. Falls back to
`config :oban_powertools, repo: MyApp.Repo`.
--format human|json Output format. "human" degrades ANSI in CI/non-TTY.
"json" emits schema_version: 1 stability contract.
## Boot Strategy
This task starts only the Ecto repo via `Ecto.Migrator.with_repo/2`. It does **not**
start Oban or any queue/worker supervision tree. It is safe to run around deploys
without triggering job processing.
## Rate-Limit Glossary
Source: `ObanPowertools.Limits.Glossary.text/0`
**token_bucket** — The rate-limiting algorithm used by ObanPowertools limiters. Each
partition has a bucket of `bucket_capacity` tokens. Each reservation consumes `weight`
tokens. The bucket refills (resets to zero tokens used) after `bucket_span_ms`
milliseconds have elapsed since `bucket_started_at`.
**bucket_capacity** — The maximum number of tokens available per bucket window. A
reservation that would bring `tokens_used + weight` above this value is blocked with
the `limit_reached` blocker code.
**bucket_span_ms** — The duration of one bucket window in milliseconds. After this
interval elapses since `bucket_started_at`, the bucket resets and tokens are available
again. Used to compute `retry_at` for a `limit_reached` block.
**weight** — The per-reservation token cost. Defaults to the resource's
`default_weight` (usually 1). Each successful reservation consumes `weight` tokens from
the bucket.
**weight_by** — A dynamic weight resolver declared on the worker (e.g.
`weight_by: {:args, :cost}`). At enqueue time the resolved value is bound to the
reservation snapshot as the effective `weight`.
**partition** — A named isolation group within a limiter resource. Each partition
maintains its own independent token bucket. For `scope: :global` limiters there is
one partition (`__global__`); for `scope: :partitioned` there is one bucket per
resolved `partition_key`.
**partition_by** — A dynamic partition key resolver declared on the worker (e.g.
`partition_by: {:args, :user_id}`). At enqueue time the resolved value becomes the
`partition_key` used to look up the correct bucket.
**scope** — The partitioning strategy for a limiter resource. `global` means one
shared bucket across all callers. `partitioned` means one independent bucket per
resolved `partition_key`, enabling per-user, per-account, or per-tenant limits.
**cooldown** — An operator-set hold on a partition until a specific `DateTime`. While
a cooldown is active, all reservations for that partition are blocked with the
`cooldown` blocker code regardless of remaining bucket capacity. Useful for
propagating backpressure signals (e.g. HTTP 429 responses) into the limiter.
**limit_reached** — Blocker code returned when `tokens_used + weight > bucket_capacity`.
The `retry_at` field indicates when the bucket will reset
(`bucket_started_at + bucket_span_ms`, clamped to at least now).
**cooldown** (blocker code) — Blocker code returned when a resource partition is under
an active operator cooldown. The `retry_at` field is `cooldown_until` — the
`DateTime` at which the cooldown expires and reservations are permitted again.
"""
@switches [
repo: :string,
format: :string,
resource: :string,
partition: :string,
worker: :string,
args: :string
]
@impl Mix.Task
def run(argv) do
Mix.Task.run("app.config")
{opts, _args, _invalid} = OptionParser.parse(argv, strict: @switches)
repo_module = resolve_repo(opts)
result =
Ecto.Migrator.with_repo(
repo_module,
fn repo ->
format =
case Keyword.get(opts, :format, "human") do
"json" -> :json
_ -> :human
end
dispatch(repo, opts, format)
end,
pool_size: 2
)
case result do
{:ok, exit_code, _apps} ->
System.halt(exit_code)
{:error, reason} ->
Mix.shell().error(
"Oban Powertools Limiter.Explain: cannot start repo — #{inspect(reason)}\n" <>
"Configure your repo with: config :oban_powertools, repo: MyApp.Repo\n" <>
"Or pass the flag: mix oban_powertools.limiter.explain --repo MyApp.Repo"
)
System.halt(2)
end
end
# ---------------------------------------------------------------------------
# Dispatch: resource-primary, worker-secondary, or usage error
# ---------------------------------------------------------------------------
@doc false
def dispatch(repo, opts, format) do
cond do
Keyword.has_key?(opts, :resource) ->
run_resource_path(repo, opts, format)
Keyword.has_key?(opts, :worker) ->
run_worker_path(repo, opts, format)
true ->
Mix.shell().error(
"Oban Powertools Limiter.Explain: provide --resource NAME or --worker MOD\n" <>
" Resource path: mix oban_powertools.limiter.explain --resource my-limiter\n" <>
" Worker path: mix oban_powertools.limiter.explain --worker MyApp.MyWorker"
)
2
end
end
# ---------------------------------------------------------------------------
# Resource-primary path (D-03/D-04)
# ---------------------------------------------------------------------------
defp run_resource_path(repo, opts, format) do
resource_name = Keyword.get(opts, :resource)
partition_key = Keyword.get(opts, :partition, "__global__")
snapshot =
repo.one(
from(event in ObanPowertools.Explain,
where: event.scope_id == ^resource_name,
order_by: [desc: event.captured_at],
limit: 1
)
)
case snapshot do
nil ->
# Honest empty state (D-04): no snapshot recorded yet → report runnable
print_empty_state(resource_name, partition_key, format)
0
snap ->
explanation = ObanPowertools.Explain.explain_snapshot(snap, repo: repo)
normalized_status = normalize_status(explanation.status)
print_explanation(%{explanation | status: normalized_status}, resource_name, format)
0
end
end
# ---------------------------------------------------------------------------
# Worker-secondary path (D-03)
# ---------------------------------------------------------------------------
defp run_worker_path(repo, opts, format) do
# Resolve the worker's declared limiter snapshot FIRST. `Explain.explain/3` returns
# a plain map even for a no-limits worker (its `with {:ok, snapshot}` binds nil and
# falls through to status: :runnable), so we cannot distinguish "no limiter at all"
# from "runnable" downstream — we must detect the nil snapshot here (CR-01, D-02).
with {:ok, worker_mod} <- resolve_worker(opts),
{:ok, parsed_args} <- parse_args_json(opts),
{:ok, snapshot} when not is_nil(snapshot) <-
worker_limit_snapshot(worker_mod, parsed_args) do
explanation = ObanPowertools.Explain.explain(worker_mod, parsed_args, repo: repo)
normalized = normalize_status(explanation.status)
print_explanation(%{explanation | status: normalized}, inspect(worker_mod), format)
0
else
{:ok, nil} ->
Mix.shell().error("worker has no limits configured")
2
{:error, :not_loaded, mod_string} ->
Mix.shell().error("unknown --worker module: #{mod_string}")
2
{:error, :invalid_json} ->
Mix.shell().error(
"--args must be a valid JSON object string (e.g. '{\"key\":\"value\"}')"
)
2
{:error, :bad_args, message} ->
Mix.shell().error(message)
2
end
end
# `Worker.limit_snapshot/2` can raise ArgumentError for `partition_by`/`weight_by`
# workers when the required args key is missing (Pitfall 4). Rescue and map to a
# cannot-run exit 2 instead of letting the task crash.
defp worker_limit_snapshot(worker_mod, args) do
ObanPowertools.Worker.limit_snapshot(worker_mod, args)
rescue
ArgumentError ->
{:error, :bad_args,
"worker requires args to resolve its limiter (partition_by/weight_by); " <>
"pass --args with the required keys"}
end
# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------
defp print_empty_state(resource_name, partition_key, :json) do
payload = %{
schema_version: 1,
resource: resource_name,
partition: partition_key,
status: "runnable",
message: "no limiter state recorded yet",
blockers: []
}
IO.puts(Jason.encode!(payload))
end
defp print_empty_state(resource_name, partition_key, :human) do
Mix.shell().info(
colorize("Limiter: #{resource_name} (partition: #{partition_key})", IO.ANSI.cyan()) <>
"\n" <>
colorize("Status: ", IO.ANSI.bright()) <>
colorize("runnable", IO.ANSI.green()) <>
"\nMessage: no limiter state recorded yet"
)
end
defp print_explanation(explanation, resource_name, :json) do
payload = %{
schema_version: 1,
resource: resource_name,
status: to_string(explanation.status),
blockers: format_blockers_json(explanation.blockers),
live_now: format_blockers_json(explanation.live_now)
}
IO.puts(Jason.encode!(payload))
end
defp print_explanation(explanation, resource_name, :human) do
status_text =
case explanation.status do
:runnable -> colorize("runnable", IO.ANSI.green())
:blocked -> colorize("blocked", IO.ANSI.red())
_ -> colorize(to_string(explanation.status), IO.ANSI.yellow())
end
header = colorize("Limiter: #{resource_name}", IO.ANSI.cyan())
status_line = colorize("Status: ", IO.ANSI.bright()) <> status_text
blocker_lines =
case explanation.blockers do
[] ->
""
blockers ->
formatted =
Enum.map_join(blockers, "\n", fn b ->
retry_str =
if b[:retry_at],
do: " (retry at: #{b.retry_at})",
else: ""
" [#{b.code}] #{b.summary}#{retry_str}"
end)
"\nBlockers:\n" <> formatted
end
Mix.shell().info(header <> "\n" <> status_line <> blocker_lines)
end
defp format_blockers_json(blockers) when is_list(blockers) do
Enum.map(blockers, fn b ->
%{
code: b[:code] || b.code,
summary: b[:summary] || b.summary,
retry_at: format_datetime(b[:retry_at] || b[:retry_at]),
scope: b[:scope] || b.scope,
details: b[:details] || b.details
}
end)
end
defp format_blockers_json(_), do: []
defp format_datetime(nil), do: nil
defp format_datetime(%DateTime{} = dt), do: DateTime.to_iso8601(dt)
defp format_datetime(other), do: inspect(other)
defp colorize(text, color) do
if IO.ANSI.enabled?() do
[color, text, IO.ANSI.reset()] |> IO.ANSI.format() |> IO.iodata_to_binary()
else
text
end
end
# ---------------------------------------------------------------------------
# Status normalization (T-49-03 / T-48-05: never String.to_atom on raw values)
# ---------------------------------------------------------------------------
defp normalize_status(status) when is_binary(status) do
case status do
"runnable" -> :runnable
"blocked" -> :blocked
_ -> :unknown
end
end
defp normalize_status(status) when is_atom(status), do: status
# ---------------------------------------------------------------------------
# Repo resolution (T-48-05 / T-49-03)
# ---------------------------------------------------------------------------
defp resolve_repo(opts) do
case Keyword.get(opts, :repo) do
nil ->
ObanPowertools.RuntimeConfig.repo!()
repo_string ->
Module.safe_concat([repo_string])
end
end
# ---------------------------------------------------------------------------
# Worker resolution (T-49-03)
# ---------------------------------------------------------------------------
defp resolve_worker(opts) do
case Keyword.get(opts, :worker) do
nil ->
# Should not happen given the dispatch guard, but be explicit
{:error, :not_loaded, "nil"}
worker_string ->
try do
mod = Module.safe_concat([worker_string])
if Code.ensure_loaded?(mod) do
{:ok, mod}
else
{:error, :not_loaded, worker_string}
end
rescue
ArgumentError ->
# Module.safe_concat raises ArgumentError if the module atom does not
# already exist in the VM — treat as unknown module (T-49-03, D-04)
{:error, :not_loaded, worker_string}
end
end
end
# ---------------------------------------------------------------------------
# JSON args parsing (T-49-04: never atomize untrusted keys)
# ---------------------------------------------------------------------------
defp parse_args_json(opts) do
json_string = Keyword.get(opts, :args, "{}")
case Jason.decode(json_string) do
{:ok, map} when is_map(map) ->
{:ok, map}
{:ok, _other} ->
{:error, :invalid_json}
{:error, _} ->
{:error, :invalid_json}
end
end
end