-module(aws@internal@retry@rate_limiter).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aws/internal/retry/rate_limiter.gleam").
-export([permit_cost/1, start/2, start_default/0, try_acquire/2, release/2, reward_success/1, current/1, shutdown/1, shutdown_sync/2]).
-export_type([bucket/0, permit/0, acquire_result/0, message/0, bucket_state/0, state/0, start_error/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" Token-bucket retry rate limiter, port of the AWS Smithy runtime's\n"
" `TokenBucket` (aws-sdk-rust/.../client/retries/token_bucket.rs).\n"
"\n"
" Semantics:\n"
"\n"
" - Each retry attempt `acquire`s tokens. The cost depends on the error\n"
" class (see retry.gleam: `retry_cost` 5 for normal retryable,\n"
" `timeout_retry_cost` 10 for timeouts / transient).\n"
" - The acquired tokens are HELD as a `Permit` while the retry is in\n"
" flight. On success or final non-retryable outcome the caller\n"
" `release`s the permit, returning the tokens. On\n"
" retry-failure-then-retry, the caller releases the prior permit before acquiring the\n"
" next (Rust does this via permit drop on `set_retry_permit`).\n"
" - `reward_success` additionally tops up the bucket by `success_reward`\n"
" tokens. Rust's default is **0** — the bucket is a concurrent-retry\n"
" semaphore, not an AIMD-style limiter; the time-based `refill_rate`\n"
" handles long-term recovery in Rust. We follow Rust's default.\n"
"\n"
" Concurrency: messages are processed sequentially per `gleam_otp`\n"
" actor semantics.\n"
).
-opaque bucket() :: {bucket, gleam@erlang@process:subject(message())}.
-opaque permit() :: {permit, integer()}.
-type acquire_result() :: {acquired, permit()} | empty.
-type message() :: {try_acquire,
integer(),
gleam@erlang@process:subject(acquire_result())} |
{release, integer()} |
reward_success |
{read, gleam@erlang@process:subject(bucket_state())} |
stop.
-type bucket_state() :: {bucket_state, integer(), integer()}.
-type state() :: {state, integer(), integer(), integer()}.
-type start_error() :: {start_failed, gleam@otp@actor:start_error()}.
-file("src/aws/internal/retry/rate_limiter.gleam", 38).
-spec permit_cost(permit()) -> integer().
%% Report how many tokens a permit is holding.
%% The count lives in the second slot of the `{permit, Tokens}` tuple.
permit_cost(Permit) ->
    Tokens = erlang:element(2, Permit),
    Tokens.
-file("src/aws/internal/retry/rate_limiter.gleam", 195).
-spec cap_at(integer(), integer()) -> integer().
%% Clamp `Value` from above: anything strictly greater than `Max` collapses
%% to `Max`; every other value (including `Value =:= Max`) is returned as is.
cap_at(Value, Max) when Value > Max ->
    Max;
cap_at(Value, _Max) ->
    Value.
-file("src/aws/internal/retry/rate_limiter.gleam", 162).
-spec handle(state(), message()) -> gleam@otp@actor:next(state(), message()).
%% Message handler installed on the bucket actor by start/2.
%%
%% State layout is `{state, Available, Capacity, SuccessReward}` (see the
%% initial state built in start/2). Per message:
%%   * `{try_acquire, Cost, ReplyTo}` - when at least `Cost` tokens are
%%     available, reply `{acquired, {permit, Cost}}` and deduct them;
%%     otherwise reply `empty` and leave the state alone.
%%   * `{release, Returned}` - add `Returned` tokens back, capped at capacity.
%%   * `reward_success` - add the configured success reward, capped at capacity.
%%   * `{read, ReplyTo}` - reply `{bucket_state, Available, Capacity}`.
%%   * `stop` - stop the actor.
handle(State, Message) ->
    case Message of
        {try_acquire, Cost, ReplyTo} ->
            Available = erlang:element(2, State),
            if
                Available >= Cost ->
                    gleam@erlang@process:send(ReplyTo, {acquired, {permit, Cost}}),
                    gleam@otp@actor:continue(
                        erlang:setelement(2, State, Available - Cost)
                    );
                true ->
                    %% Not enough tokens: refuse and keep the bucket as it was.
                    gleam@erlang@process:send(ReplyTo, empty),
                    gleam@otp@actor:continue(State)
            end;
        {release, Returned} ->
            Refilled = cap_at(
                erlang:element(2, State) + Returned,
                erlang:element(3, State)
            ),
            gleam@otp@actor:continue(erlang:setelement(2, State, Refilled));
        reward_success ->
            Rewarded = cap_at(
                erlang:element(2, State) + erlang:element(4, State),
                erlang:element(3, State)
            ),
            gleam@otp@actor:continue(erlang:setelement(2, State, Rewarded));
        {read, ReplyTo} ->
            Snapshot = {bucket_state,
                erlang:element(2, State),
                erlang:element(3, State)},
            gleam@erlang@process:send(ReplyTo, Snapshot),
            gleam@otp@actor:continue(State);
        stop ->
            gleam@otp@actor:stop()
    end.
-file("src/aws/internal/retry/rate_limiter.gleam", 79).
?DOC(" Start a bucket with explicit capacity + reward.\n").
-spec start(integer(), integer()) -> {ok, bucket()} | {error, start_error()}.
%% Spawn a bucket actor that starts full: the available-token count and the
%% capacity are both `Capacity`, and `Success_reward` is stored for later
%% `reward_success` messages. A start failure is wrapped as
%% `{error, {start_failed, Reason}}`.
start(Capacity, Success_reward) ->
    Builder = gleam@otp@actor:on_message(
        gleam@otp@actor:new({state, Capacity, Capacity, Success_reward}),
        fun handle/2
    ),
    case gleam@otp@actor:start(Builder) of
        {error, Reason} ->
            {error, {start_failed, Reason}};
        {ok, Started} ->
            %% The third element of the started record is wrapped as the
            %% bucket handle (presumably the actor's subject - confirm
            %% against gleam_otp's `Started` record).
            {ok, {bucket, erlang:element(3, Started)}}
    end.
-file("src/aws/internal/retry/rate_limiter.gleam", 96).
?DOC(" Start a bucket with Rust SDK defaults (500 / 0).\n").
-spec start_default() -> {ok, bucket()} | {error, start_error()}.
%% Start a bucket with the default capacity of 500 tokens and a success
%% reward of 0.
start_default() ->
    DefaultCapacity = 500,
    DefaultSuccessReward = 0,
    start(DefaultCapacity, DefaultSuccessReward).
-file("src/aws/internal/retry/rate_limiter.gleam", 113).
?DOC(
" Try to acquire `cost` tokens. Returns `Acquired(permit)` if the bucket\n"
" can pay; `Empty` if not (caller must NOT retry).\n"
"\n"
" Degrade-open on a dead/unresponsive bucket: if the actor has crashed or\n"
" doesn't reply in time, `safe_call` returns `Error` and we hand back a\n"
" zero-cost `Acquired` permit rather than `Empty`. Rationale: the limiter\n"
" is a *best-effort* concurrent-retry semaphore, not a correctness gate —\n"
" `Empty` means \"stop retrying\", so returning it on a dead limiter would\n"
" silently suppress legitimate retries and degrade availability. Treating\n"
" an unavailable limiter as \"no throttle\" keeps the request path working;\n"
" the zero-cost permit makes the later `release`/`reward_success` sends\n"
" (which Erlang drops to a dead Pid anyway) harmless no-ops. The original\n"
" fix for #30: a panicking limiter must never crash the request path.\n"
).
-spec try_acquire(bucket(), integer()) -> acquire_result().
%% Ask the bucket actor for `Cost` tokens, passing 1000 as the timeout
%% argument to safe_call. A reply from the actor is returned unchanged.
%% When safe_call reports `{error, nil}` a warning is logged and a zero-cost
%% permit is handed back so the caller proceeds unthrottled.
%%
%% NOTE(review): if `{error, nil}` can also mean "alive but slow", the queued
%% try_acquire may still be handled later and deduct tokens that the
%% zero-cost permit never gives back - confirm against safe_call.
try_acquire(Bucket, Cost) ->
    Subject = erlang:element(2, Bucket),
    Request = fun(ReplyTo) -> {try_acquire, Cost, ReplyTo} end,
    case aws@internal@actor_lifecycle:safe_call(Subject, 1000, Request) of
        {error, nil} ->
            aws@internal@log:warning(
                <<"aws retry rate limiter: actor unavailable — proceeding without throttle"/utf8>>
            ),
            {acquired, {permit, 0}};
        {ok, Outcome} ->
            Outcome
    end.
-file("src/aws/internal/retry/rate_limiter.gleam", 132).
?DOC(
" Return the permit's tokens to the bucket. Called on success, on a final\n"
" non-retryable response, and before replacing one held permit with a new\n"
" one mid-retry-loop.\n"
).
-spec release(bucket(), permit()) -> nil.
%% Send the permit's token count back to the bucket actor as a
%% `{release, Tokens}` message. Nothing is awaited in return.
release(Bucket, Permit) ->
    Subject = erlang:element(2, Bucket),
    Tokens = erlang:element(2, Permit),
    gleam@erlang@process:send(Subject, {release, Tokens}).
-file("src/aws/internal/retry/rate_limiter.gleam", 138).
?DOC(
" Top up the bucket by `success_reward` tokens, capped at `capacity`.\n"
" Called once per successful operation.\n"
).
-spec reward_success(bucket()) -> nil.
%% Send a `reward_success` message to the bucket actor; the handler adds the
%% configured success reward to the available tokens, capped at capacity.
reward_success(Bucket) ->
    Subject = erlang:element(2, Bucket),
    gleam@erlang@process:send(Subject, reward_success).
-file("src/aws/internal/retry/rate_limiter.gleam", 143).
?DOC(" Read the current bucket state. Tests only.\n").
-spec current(bucket()) -> bucket_state().
%% Ask the bucket actor for a `{bucket_state, Available, Capacity}` snapshot,
%% passing 1000 as the timeout argument to the actor call.
current(Bucket) ->
    Subject = erlang:element(2, Bucket),
    ReadRequest = fun(ReplyTo) -> {read, ReplyTo} end,
    gleam@otp@actor:call(Subject, 1000, ReadRequest).
-file("src/aws/internal/retry/rate_limiter.gleam", 151).
?DOC(
" Tell the bucket actor to exit. Fire-and-forget. Mirrors\n"
" `credentials_cache.shutdown`; both call into\n"
" `aws/internal/actor_lifecycle.shutdown_via_stop`. Safe to call\n"
" multiple times.\n"
).
-spec shutdown(bucket()) -> nil.
%% Hand the bucket's subject and the `stop` message to
%% actor_lifecycle:shutdown_via_stop/2, which performs the actual teardown.
shutdown(Bucket) ->
    Subject = erlang:element(2, Bucket),
    aws@internal@actor_lifecycle:shutdown_via_stop(Subject, stop).
-file("src/aws/internal/retry/rate_limiter.gleam", 158).
?DOC(
" Synchronous teardown — monitors the actor, sends `Stop`, waits for\n"
" `DOWN`. `Ok(Nil)` on clean exit, `Error(Nil)` on real timeout.\n"
" Idempotent for already-dead actors.\n"
).
-spec shutdown_sync(bucket(), integer()) -> {ok, nil} | {error, nil}.
%% Hand the bucket's subject, the `stop` message and `Timeout_ms` to
%% actor_lifecycle:shutdown_via_stop_sync/3 and return its result unchanged.
shutdown_sync(Bucket, Timeout_ms) ->
    Subject = erlang:element(2, Bucket),
    aws@internal@actor_lifecycle:shutdown_via_stop_sync(Subject, stop, Timeout_ms).