%% src/aws@internal@retry@rate_limiter.erl

%% Retry rate limiter module, compiled from the Gleam source named in
%% FILEPATH below (see also the -file attributes on each function).
-module(aws@internal@retry@rate_limiter).
%% Compiler options: no auto-imported BIFs (so BIFs are called with an
%% explicit `erlang:` prefix below), no warnings for unused variables,
%% unused functions or non-matching patterns, and inlining enabled.
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aws/internal/retry/rate_limiter.gleam").
%% Public API: permit inspection, bucket lifecycle (start/shutdown),
%% token acquisition/return, and a state read used by tests.
-export([permit_cost/1, start/2, start_default/0, try_acquire/2, release/2, reward_success/1, current/1, shutdown/1, shutdown_sync/2]).
-export_type([bucket/0, permit/0, acquire_result/0, message/0, bucket_state/0, state/0, start_error/0]).

%% Documentation macros: on OTP 27+ they expand to the native -moduledoc /
%% -doc attributes; on older releases they expand to an empty -compile([])
%% so the documentation text is simply dropped.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " Token-bucket retry rate limiter, port of the AWS Smithy runtime's\n"
    " `TokenBucket` (aws-sdk-rust/.../client/retries/token_bucket.rs).\n"
    "\n"
    " Semantics:\n"
    "\n"
    "   - Each retry attempt acquires tokens via `try_acquire`. The cost\n"
    "     depends on the error class (see retry.gleam: `retry_cost` 5 for\n"
    "     normal retryable, `timeout_retry_cost` 10 for timeouts / transient).\n"
    "     If the bucket cannot cover the cost, `try_acquire` returns `Empty`\n"
    "     and the caller must not retry.\n"
    "   - The acquired tokens are HELD as a `Permit` while the retry is in\n"
    "     flight. On success or a final non-retryable outcome the caller\n"
    "     `release`s the permit, returning its tokens (capped at capacity).\n"
    "     When a retry fails and will be retried again, the caller releases\n"
    "     the prior permit before acquiring the next one (Rust does this via\n"
    "     permit drop on `set_retry_permit`).\n"
    "   - `reward_success` additionally tops up the bucket by `success_reward`\n"
    "     tokens, capped at capacity. Rust's default is **0** — the bucket is\n"
    "     a concurrent-retry semaphore, not an AIMD-style limiter. We follow\n"
    "     Rust's default.\n"
    "   - In Rust the time-based `refill_rate` handles long-term recovery.\n"
    "     This port has no time-based refill: tokens come back only through\n"
    "     `release` and `reward_success`.\n"
    "\n"
    " Concurrency: messages are processed sequentially per `gleam_otp`\n"
    " actor semantics.\n"
).

%% Handle to a running bucket: wraps the subject used to message its actor.
-opaque bucket() :: {bucket, gleam@erlang@process:subject(message())}.

%% A held permit; the integer is the number of tokens it holds, which is
%% what `release/2` sends back to the bucket. Degrade-open permits hold 0.
-opaque permit() :: {permit, integer()}.

%% Outcome of `try_acquire/2`: a permit, or `empty` when the bucket cannot
%% cover the requested cost.
-type acquire_result() :: {acquired, permit()} | empty.

%% Actor protocol:
%%   {try_acquire, Cost, Reply} - deduct Cost if available; answer on Reply.
%%   {release, Cost}            - return Cost tokens (capped at capacity).
%%   reward_success             - add success_reward tokens (capped).
%%   {read, Reply}              - answer with a bucket_state() snapshot.
%%   stop                       - terminate the actor.
-type message() :: {try_acquire,
        integer(),
        gleam@erlang@process:subject(acquire_result())} |
    {release, integer()} |
    reward_success |
    {read, gleam@erlang@process:subject(bucket_state())} |
    stop.

%% Snapshot returned by `current/1`: {bucket_state, Available, Capacity}.
-type bucket_state() :: {bucket_state, integer(), integer()}.

%% Actor state: {state, Available, Capacity, SuccessReward}.
-type state() :: {state, integer(), integer(), integer()}.

%% Returned by `start/2` when the underlying actor fails to start.
-type start_error() :: {start_failed, gleam@otp@actor:start_error()}.

-file("src/aws/internal/retry/rate_limiter.gleam", 38).
%% Number of tokens held by a permit, i.e. the amount `release/2` hands
%% back to the bucket. A degrade-open permit reports 0.
-spec permit_cost(permit()) -> integer().
permit_cost({permit, Held_tokens}) ->
    Held_tokens.

-file("src/aws/internal/retry/rate_limiter.gleam", 195).
%% Clamp `Value` from above so it never exceeds `Max`. Values at or below
%% `Max` pass through unchanged; there is no lower bound.
-spec cap_at(integer(), integer()) -> integer().
cap_at(Value, Max) when Value > Max ->
    Max;
cap_at(Value, _Max) ->
    Value.

-file("src/aws/internal/retry/rate_limiter.gleam", 162).
%% Actor message handler. The state is {state, Available, Capacity,
%% SuccessReward}; one clause per message kind.
-spec handle(state(), message()) -> gleam@otp@actor:next(state(), message()).
handle(_State, stop) ->
    gleam@otp@actor:stop();
%% Enough tokens: grant a permit holding exactly `Cost` and deduct it.
handle({state, Available, Capacity, Reward}, {try_acquire, Cost, Reply})
        when Available >= Cost ->
    gleam@erlang@process:send(Reply, {acquired, {permit, Cost}}),
    gleam@otp@actor:continue({state, Available - Cost, Capacity, Reward});
%% Not enough tokens: refuse and leave the state untouched.
handle(State, {try_acquire, _Cost, Reply}) ->
    gleam@erlang@process:send(Reply, empty),
    gleam@otp@actor:continue(State);
%% Returned tokens never push the bucket above its capacity.
handle({state, Available, Capacity, Reward}, {release, Cost}) ->
    Refilled = cap_at(Available + Cost, Capacity),
    gleam@otp@actor:continue({state, Refilled, Capacity, Reward});
handle({state, Available, Capacity, Reward}, reward_success) ->
    Topped_up = cap_at(Available + Reward, Capacity),
    gleam@otp@actor:continue({state, Topped_up, Capacity, Reward});
handle({state, Available, Capacity, _Reward} = State, {read, Reply}) ->
    gleam@erlang@process:send(Reply, {bucket_state, Available, Capacity}),
    gleam@otp@actor:continue(State).

-file("src/aws/internal/retry/rate_limiter.gleam", 79).
?DOC(" Start a bucket with explicit capacity + reward.\n").
-spec start(integer(), integer()) -> {ok, bucket()} | {error, start_error()}.
start(Capacity, Success_reward) ->
    %% The bucket starts full: available tokens equal the capacity.
    Builder = gleam@otp@actor:on_message(
        gleam@otp@actor:new({state, Capacity, Capacity, Success_reward}),
        fun handle/2
    ),
    case gleam@otp@actor:start(Builder) of
        {error, Reason} ->
            {error, {start_failed, Reason}};

        {ok, Started} ->
            %% The third element of the started value is the subject that
            %% `bucket()` wraps for all later messaging.
            {ok, {bucket, erlang:element(3, Started)}}
    end.

-file("src/aws/internal/retry/rate_limiter.gleam", 96).
?DOC(" Start a bucket with Rust SDK defaults (500 / 0).\n").
-spec start_default() -> {ok, bucket()} | {error, start_error()}.
start_default() ->
    %% Defaults mirror the Rust SDK: 500 tokens, no reward on success.
    Default_capacity = 500,
    Default_success_reward = 0,
    start(Default_capacity, Default_success_reward).

-file("src/aws/internal/retry/rate_limiter.gleam", 113).
?DOC(
    " Try to acquire `cost` tokens. Returns `Acquired(permit)` if the bucket\n"
    " can pay; `Empty` if not (caller must NOT retry).\n"
    "\n"
    " Degrade-open on a dead/unresponsive bucket: if the actor has crashed or\n"
    " doesn't reply in time, `safe_call` returns `Error` and we hand back a\n"
    " zero-cost `Acquired` permit rather than `Empty`. Rationale: the limiter\n"
    " is a *best-effort* concurrent-retry semaphore, not a correctness gate —\n"
    " `Empty` means \"stop retrying\", so returning it on a dead limiter would\n"
    " silently suppress legitimate retries and degrade availability. Treating\n"
    " an unavailable limiter as \"no throttle\" keeps the request path working;\n"
    " the zero-cost permit makes the later `release`/`reward_success` sends\n"
    " (which Erlang drops to a dead Pid anyway) harmless no-ops. The original\n"
    " fix for #30: a panicking limiter must never crash the request path.\n"
).
-spec try_acquire(bucket(), integer()) -> acquire_result().
try_acquire({bucket, Subject}, Cost) ->
    Request = fun(Reply) -> {try_acquire, Cost, Reply} end,
    case aws@internal@actor_lifecycle:safe_call(Subject, 1000, Request) of
        {error, nil} ->
            %% Degrade open: a zero-cost permit means the later release
            %% returns nothing to the bucket.
            %% NOTE(review): if safe_call's Error also covers a plain timeout
            %% on a live-but-slow actor, the queued try_acquire may still be
            %% processed later and its tokens never released; confirm
            %% safe_call's semantics.
            aws@internal@log:warning(
                <<"aws retry rate limiter: actor unavailable — proceeding without throttle"/utf8>>
            ),
            {acquired, {permit, 0}};

        {ok, Result} ->
            Result
    end.

-file("src/aws/internal/retry/rate_limiter.gleam", 132).
?DOC(
    " Return the permit's tokens to the bucket. Called on success, on a final\n"
    " non-retryable response, and before replacing one held permit with a new\n"
    " one mid-retry-loop.\n"
).
-spec release(bucket(), permit()) -> nil.
release({bucket, Subject}, {permit, Held_tokens}) ->
    %% Asynchronous send; the actor caps the refund at the bucket capacity.
    gleam@erlang@process:send(Subject, {release, Held_tokens}).

-file("src/aws/internal/retry/rate_limiter.gleam", 138).
?DOC(
    " Top up the bucket by `success_reward` tokens, capped at `capacity`.\n"
    " Called once per successful operation.\n"
).
-spec reward_success(bucket()) -> nil.
reward_success({bucket, Subject}) ->
    %% Asynchronous send; the actor applies the reward and the cap.
    gleam@erlang@process:send(Subject, reward_success).

-file("src/aws/internal/retry/rate_limiter.gleam", 143).
?DOC(" Read the current bucket state. Tests only.\n").
-spec current(bucket()) -> bucket_state().
current({bucket, Subject}) ->
    %% Unlike try_acquire, this goes straight through actor:call with a
    %% 1000 ms timeout rather than the degrade-open safe_call wrapper.
    gleam@otp@actor:call(Subject, 1000, fun(Reply_to) -> {read, Reply_to} end).

-file("src/aws/internal/retry/rate_limiter.gleam", 151).
?DOC(
    " Tell the bucket actor to exit. Fire-and-forget. Mirrors\n"
    " `credentials_cache.shutdown`; both call into\n"
    " `aws/internal/actor_lifecycle.shutdown_via_stop`. Safe to call\n"
    " multiple times.\n"
).
-spec shutdown(bucket()) -> nil.
shutdown({bucket, Subject}) ->
    %% `stop` is the protocol message the actor's handler turns into an exit.
    aws@internal@actor_lifecycle:shutdown_via_stop(Subject, stop).

-file("src/aws/internal/retry/rate_limiter.gleam", 158).
?DOC(
    " Synchronous teardown — monitors the actor, sends `Stop`, waits for\n"
    " `DOWN`. `Ok(Nil)` on clean exit, `Error(Nil)` on real timeout.\n"
    " Idempotent for already-dead actors.\n"
).
-spec shutdown_sync(bucket(), integer()) -> {ok, nil} | {error, nil}.
shutdown_sync({bucket, Subject}, Timeout_ms) ->
    %% Same `stop` message as shutdown/1, but waits up to Timeout_ms.
    aws@internal@actor_lifecycle:shutdown_via_stop_sync(Subject, stop, Timeout_ms).