%% src/aws@retry.erl

-module(aws@retry).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aws/retry.gleam").
-export([standard/0, standard_with/5, adaptive/1, adaptive_with/6, with_max_attempts/2, with_base_delay_ms/2, with_max_delay_ms/2, exponential_backoff/2, is_retryable_error_code/1, classify/3, with_retry/2]).
-export_type([decision/0, strategy/0, gate_outcome/0, status_kind/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " Retry middleware. Wraps any `http_send.Send` with the retry semantics\n"
    " the AWS Smithy runtime ships\n"
    " (aws-sdk-rust/.../client/retries/strategy/standard.rs).\n"
    "\n"
    " Behaviour mirrored from upstream:\n"
    "\n"
    "   - Exponential backoff with full jitter:\n"
    "     `min(initial * 2^attempt, max) * rand()`.\n"
    "     Matches `calculate_exponential_backoff` in Rust.\n"
    "   - Status-code classifier: 2xx success; 4xx (except 408/429) is\n"
    "     non-retryable; 408/429/5xx retryable; transport errors classified\n"
    "     as `TransientError` (timeout-class) and charged at the higher\n"
    "     `timeout_retry_cost`. Per AWS-Smithy `ErrorKind` mapping.\n"
    "   - `Retry-After` header (integer seconds) overrides the computed\n"
    "     backoff and is clamped to `max_delay`.\n"
    "   - Token bucket gates retries (`try_acquire(cost)`); each retry holds\n"
    "     a permit across attempts that is released on success / final\n"
    "     non-retryable outcome, or replaced before the next retry. Matches\n"
    "     Rust's `set_retry_permit` / `release_retry_permit`.\n"
    "\n"
    " **Deliberately out of scope** (explicit deltas vs Rust SDK, documented\n"
    " for the M4 audit):\n"
    "\n"
    "   - Time-based bucket refill (`refill_rate` in Rust SDK). Our bucket\n"
    "     only changes on acquire/release/reward.\n"
    "   - Pre-request rate limiter gating\n"
    "     (`should_attempt_initial_request`). Only retries are gated.\n"
    "   - The CUBIC `ClientRateLimiter` — Rust ships this as a separate\n"
    "     component for \"true\" adaptive mode. Our `adaptive` builder\n"
    "     installs the token bucket only; CUBIC is a follow-up milestone.\n"
).

%% Verdict produced by classifying one attempt:
%%   stop                       - the outcome is not retryable; the result is
%%                                handed back to the caller as-is.
%%   {retry_after, Delay, Cost} - retry after sleeping Delay milliseconds;
%%                                Cost is the amount requested from the
%%                                token bucket (when one is configured).
%%   give_up                    - the failure was retryable but the attempt
%%                                budget is exhausted.
-type decision() :: stop | {retry_after, integer(), integer()} | give_up.

%% Retry configuration. Field order (matching the constructors in this
%% module): max attempts, base delay (ms), max delay (ms), sleep function,
%% random-number function, optional token bucket.
-opaque strategy() :: {strategy,
        integer(), % max attempts per request
        integer(), % base backoff delay in ms
        integer(), % cap on any single delay in ms
        fun((integer()) -> nil), % sleep function, called with the delay in ms
        fun(() -> float()), % jitter source; presumably yields values in [0, 1) - confirm
        gleam@option:option(aws@internal@retry@rate_limiter:bucket())}.

%% Result of asking the token bucket for permission to retry:
%%   no_bucket       - the strategy has no bucket, retries are ungated.
%%   {got_permit, P} - the bucket granted permit P.
%%   bucket_empty    - the bucket refused the request.
-type gate_outcome() :: no_bucket |
    {got_permit, aws@internal@retry@rate_limiter:permit()} |
    bucket_empty.

%% Retry class derived purely from an HTTP status code.
-type status_kind() :: not_retryable | retryable.

%% Default strategy: 3 attempts, 100 ms base delay, 20000 ms delay cap,
%% the FFI sleep and random-float functions, and no token bucket.
-file("src/aws/retry.gleam", 87).
-spec standard() -> strategy().
standard() ->
    Max_attempts = 3,
    Base_delay_ms = 100,
    Max_delay_ms = 20000,
    Sleep = fun gleam_erlang_ffi:sleep/1,
    Rng = fun aws_ffi:random_float/0,
    {strategy, Max_attempts, Base_delay_ms, Max_delay_ms, Sleep, Rng, none}.

%% Build an ungated strategy (no token bucket) from explicit knobs.
%% Useful when the caller wants to inject its own sleep / rng functions.
-file("src/aws/retry.gleam", 98).
-spec standard_with(
    integer(),
    integer(),
    integer(),
    fun((integer()) -> nil),
    fun(() -> float())
) -> strategy().
standard_with(Max_attempts, Base_delay_ms, Max_delay_ms, Sleep, Rng) ->
    No_bucket = none,
    {strategy,
        Max_attempts,
        Base_delay_ms,
        Max_delay_ms,
        Sleep,
        Rng,
        No_bucket}.

-file("src/aws/retry.gleam", 117).
?DOC(
    " The default retry knobs plus a token-bucket gate on retries.\n"
    " Corresponds to the Rust SDK adaptive mode without the CUBIC client\n"
    " rate limiter (see the module docs).\n"
).
-spec adaptive(aws@internal@retry@rate_limiter:bucket()) -> strategy().
adaptive(Bucket) ->
    Max_attempts = 3,
    Base_delay_ms = 100,
    Max_delay_ms = 20000,
    Sleep = fun gleam_erlang_ffi:sleep/1,
    Rng = fun aws_ffi:random_float/0,
    {strategy,
        Max_attempts,
        Base_delay_ms,
        Max_delay_ms,
        Sleep,
        Rng,
        {some, Bucket}}.

%% Build a token-bucket-gated strategy from explicit knobs. Same as
%% standard_with/5 except that the given bucket is installed as the gate.
-file("src/aws/retry.gleam", 128).
-spec adaptive_with(
    aws@internal@retry@rate_limiter:bucket(),
    integer(),
    integer(),
    integer(),
    fun((integer()) -> nil),
    fun(() -> float())
) -> strategy().
adaptive_with(Bucket, Max_attempts, Base_delay_ms, Max_delay_ms, Sleep, Rng) ->
    Gate = {some, Bucket},
    {strategy, Max_attempts, Base_delay_ms, Max_delay_ms, Sleep, Rng, Gate}.

-file("src/aws/retry.gleam", 156).
?DOC(
    " Return a copy of `Strategy` whose per-request attempt budget is\n"
    " `Max_attempts`; the delays, sleep / rng functions and rate limiter\n"
    " are carried over unchanged.\n"
    "\n"
    " This is the usual tuning knob: keep the recommended backoff curve\n"
    " but pick a different budget (1 for fail-fast tests, 5 for\n"
    " long-running batch workloads, and so on).\n"
    "\n"
    " The setter lives in this module because `Strategy` is opaque; the\n"
    " runtime re-exports a `with_max_attempts(config, n)` convenience\n"
    " that forwards here.\n"
).
-spec with_max_attempts(strategy(), integer()) -> strategy().
with_max_attempts(Strategy, Max_attempts) ->
    Base_delay_ms = erlang:element(3, Strategy),
    Max_delay_ms = erlang:element(4, Strategy),
    Sleep = erlang:element(5, Strategy),
    Rng = erlang:element(6, Strategy),
    Bucket = erlang:element(7, Strategy),
    {strategy, Max_attempts, Base_delay_ms, Max_delay_ms, Sleep, Rng, Bucket}.

-file("src/aws/retry.gleam", 165).
?DOC(
    " Return a copy of `Strategy` with a new base backoff delay.\n"
    " The first retry sleeps for roughly `base_delay_ms` (with full\n"
    " jitter) and the bound doubles on every later attempt until it hits\n"
    " `max_delay_ms`. Tests that want zero-delay retries set this to 0\n"
    " together with `with_max_delay_ms(0)`.\n"
).
-spec with_base_delay_ms(strategy(), integer()) -> strategy().
with_base_delay_ms(Strategy, Base_delay_ms) ->
    Max_attempts = erlang:element(2, Strategy),
    Max_delay_ms = erlang:element(4, Strategy),
    Sleep = erlang:element(5, Strategy),
    Rng = erlang:element(6, Strategy),
    Bucket = erlang:element(7, Strategy),
    {strategy, Max_attempts, Base_delay_ms, Max_delay_ms, Sleep, Rng, Bucket}.

-file("src/aws/retry.gleam", 173).
?DOC(
    " Return a copy of `Strategy` with a new cap on the backoff delay.\n"
    " Usually combined with `with_base_delay_ms` in tests that need zero\n"
    " or very short delays; production callers normally keep the\n"
    " recommended default.\n"
).
-spec with_max_delay_ms(strategy(), integer()) -> strategy().
with_max_delay_ms(Strategy, Max_delay_ms) ->
    Max_attempts = erlang:element(2, Strategy),
    Base_delay_ms = erlang:element(3, Strategy),
    Sleep = erlang:element(5, Strategy),
    Rng = erlang:element(6, Strategy),
    Bucket = erlang:element(7, Strategy),
    {strategy, Max_attempts, Base_delay_ms, Max_delay_ms, Sleep, Rng, Bucket}.

%% Hand a held retry permit back to the strategy's token bucket.
%% Does nothing (returns nil) unless the strategy has a bucket AND a
%% permit is currently held.
-file("src/aws/retry.gleam", 462).
-spec release_if_held(
    strategy(),
    gleam@option:option(aws@internal@retry@rate_limiter:permit())
) -> nil.
release_if_held(Strategy, Permit) ->
    case erlang:element(7, Strategy) of
        {some, Bucket} ->
            case Permit of
                {some, Held} ->
                    aws@internal@retry@rate_limiter:release(Bucket, Held);

                _ ->
                    nil
            end;

        _ ->
            nil
    end.

-file("src/aws/retry.gleam", 240).
?DOC(
    " Log a retry at two levels: the fact that a retry fires is\n"
    " notable-but-recovered (`warning`, default-on), while the backoff\n"
    " scheduling detail is firehose output (`debug`).\n"
).
-spec log_retry(integer(), integer()) -> nil.
log_retry(Attempt, Delay) ->
    Attempt_text = erlang:integer_to_binary(Attempt),
    Delay_text = erlang:integer_to_binary(Delay),
    aws@internal@log:warning(
        <<"aws retry: attempt "/utf8,
            Attempt_text/binary,
            " failed, retrying in "/utf8,
            Delay_text/binary,
            "ms"/utf8>>
    ),
    %% The debug message is wrapped in a fun so it is only built if the
    %% logger decides to evaluate it.
    aws@internal@log:debug(
        fun() ->
            Next_attempt_text = erlang:integer_to_binary(Attempt + 1),
            <<"aws retry: scheduling attempt "/utf8,
                Next_attempt_text/binary,
                " after "/utf8,
                Delay_text/binary,
                "ms backoff"/utf8>>
        end
    ).

%% Ask the strategy's token bucket (if any) for permission to retry at
%% the given cost. Strategies without a bucket are never gated.
-file("src/aws/retry.gleam", 451).
-spec gate(strategy(), integer()) -> gate_outcome().
gate(Strategy, Cost) ->
    Bucket_slot = erlang:element(7, Strategy),
    case Bucket_slot of
        {some, Bucket} ->
            Answer = aws@internal@retry@rate_limiter:try_acquire(Bucket, Cost),
            case Answer of
                empty ->
                    bucket_empty;

                {acquired, Permit} ->
                    {got_permit, Permit}
            end;

        none ->
            no_bucket
    end.

%% Notify the token bucket of a successful outcome, when the strategy
%% carries a bucket; otherwise do nothing and return nil.
-file("src/aws/retry.gleam", 469).
-spec reward_if_present(strategy()) -> nil.
reward_if_present(Strategy) ->
    Bucket_slot = erlang:element(7, Strategy),
    case Bucket_slot of
        none ->
            nil;

        {some, Bucket} ->
            aws@internal@retry@rate_limiter:reward_success(Bucket)
    end.

%% Integer power of two: returns 2^N for N > 0 and 1 for N =< 0.
%%
%% Implemented with a single left shift instead of a recursive doubling
%% loop; the result is the same arbitrary-precision integer. The former
%% private helper do_pow2/2 was only called from here and is folded away.
-file("src/aws/retry.gleam", 437).
-spec pow2(integer()) -> integer().
pow2(N) when N > 0 ->
    1 bsl N;
pow2(_) ->
    %% Zero and negative exponents clamp to 2^0, as before.
    1.

-file("src/aws/retry.gleam", 427).
?DOC(
    " Full-jitter exponential backoff, following the Rust SDK\n"
    " `calculate_exponential_backoff`:\n"
    "\n"
    "   raw   = base * 2^(attempt-1)\n"
    "   bound = min(raw, max)\n"
    "   delay = rng() * bound\n"
    "\n"
    " `attempt` counts from 1, so the first retry is bounded by\n"
    " base * 2^0 = base. The result is rounded to an integer number of\n"
    " milliseconds and never negative.\n"
).
-spec exponential_backoff(strategy(), integer()) -> integer().
exponential_backoff(Strategy, Attempt) ->
    Uncapped = erlang:element(3, Strategy) * pow2(Attempt - 1),
    Cap = erlang:element(4, Strategy),
    Ceiling = gleam@int:min(Uncapped, Cap),
    Rng = erlang:element(6, Strategy),
    Delay = erlang:round(Rng() * erlang:float(Ceiling)),
    %% Clamp at zero in case the knobs or the rng produce a negative value.
    erlang:max(Delay, 0).

%% Classify a transport-level failure (no HTTP response was obtained).
%% Every transport error is treated as retryable while attempts remain;
%% the retry is charged a cost of 10, which per the module docs is the
%% higher timeout-class retry cost. The error value itself is not
%% inspected.
-file("src/aws/retry.gleam", 299).
-spec classify_transport_error(
    aws@internal@http_send:http_error(),
    integer(),
    strategy()
) -> decision().
classify_transport_error(_Err, Attempt, Strategy) ->
    Max_attempts = erlang:element(2, Strategy),
    if
        Attempt < Max_attempts ->
            {retry_after, exponential_backoff(Strategy, Attempt), 10};

        true ->
            give_up
    end.

%% Read the `retry-after` response header as a non-negative integer
%% number of seconds. Returns none when the header is missing, is not an
%% integer, or is negative.
-file("src/aws/retry.gleam", 404).
-spec retry_after_seconds(gleam@http@response:response(bitstring())) -> gleam@option:option(integer()).
retry_after_seconds(Resp) ->
    Header = gleam@http@response:get_header(Resp, <<"retry-after"/utf8>>),
    case Header of
        {error, _} ->
            none;

        {ok, Value} ->
            case gleam_stdlib:parse_int(Value) of
                {ok, Secs} when Secs >= 0 ->
                    {some, Secs};

                {ok, _Negative} ->
                    none;

                {error, _} ->
                    none
            end
    end.

-file("src/aws/retry.gleam", 346).
?DOC(
    " Pull the modeled error code (local Smithy shape name) out of a\n"
    " response. The `x-amzn-errortype` header is consulted first, then\n"
    " the body's `__type` / `code` (JSON) or `<Code>` (restXml). The\n"
    " actual parsing is shared with the runtime's typed-error path via\n"
    " `aws/internal/error_code`.\n"
).
-spec modeled_error_code(gleam@http@response:response(bitstring())) -> binary().
modeled_error_code(Resp) ->
    Error_type_header = gleam@http@response:get_header(
        Resp,
        <<"x-amzn-errortype"/utf8>>
    ),
    %% Fourth tuple element of the response; presumably the body field.
    Body = erlang:element(4, Resp),
    aws@internal@error_code:from_header_value_and_body(Error_type_header, Body).

-file("src/aws/retry.gleam", 386).
?DOC(
    " Return whatever follows the final occurrence of `Sep` in `Text`,\n"
    " or `Text` unchanged when `Sep` does not occur. Used to strip a\n"
    " namespace / URI prefix from a modeled error code, leaving its\n"
    " local shape name.\n"
).
-spec suffix_after(binary(), binary()) -> binary().
suffix_after(Text, Sep) ->
    %% Splitting the reversed text on the reversed separator finds the
    %% LAST occurrence of Sep in the original text.
    Reversed_text = gleam@string:reverse(Text),
    Reversed_sep = gleam@string:reverse(Sep),
    case gleam@string:split_once(Reversed_text, Reversed_sep) of
        {error, _} ->
            Text;

        {ok, {Reversed_tail, _}} ->
            gleam@string:reverse(Reversed_tail)
    end.

-file("src/aws/retry.gleam", 359).
?DOC(
    " True when `code` names a throttling / transient error AWS expects\n"
    " clients to retry. Mirrors the AWS SDK `THROTTLING_ERRORS` set; the\n"
    " match is case-sensitive on the *local* shape name.\n"
    "\n"
    " Two wire decorations are peeled off before comparison:\n"
    "\n"
    "   - a namespace prefix (`com.amazonaws...#ThrottlingException`):\n"
    "     the value is reduced to its suffix after the last `#` and the\n"
    "     last `.`;\n"
    "   - a URI suffix (`ThrottlingException:http://...`): if the value\n"
    "     as given does not match, everything from the first `:` onward\n"
    "     is dropped and the reduction is retried.\n"
).
-spec is_retryable_error_code(binary()) -> boolean().
is_retryable_error_code(Code) ->
    %% Check the value as given first so every code accepted before is
    %% still accepted. Only on a miss, retry with the URI suffix removed:
    %% dots inside the URI would otherwise win the "last `.`" reduction
    %% and hide the shape name.
    is_throttling_shape(local_shape_name(Code)) orelse
        case binary:split(Code, <<":"/utf8>>) of
            [Before_colon, _Uri_suffix] ->
                is_throttling_shape(local_shape_name(Before_colon));

            [_No_colon] ->
                false
        end.

%% Reduce a possibly namespaced error code to its local shape name:
%% the part after the last `#`, then after the last `.`.
-spec local_shape_name(binary()) -> binary().
local_shape_name(Code) ->
    suffix_after(suffix_after(Code, <<"#"/utf8>>), <<"."/utf8>>).

%% Exact (case-sensitive) membership test against the throttling /
%% transient shape names that are retried.
-spec is_throttling_shape(binary()) -> boolean().
is_throttling_shape(Local) ->
    lists:member(
        Local,
        [
            <<"Throttling"/utf8>>,
            <<"ThrottlingException"/utf8>>,
            <<"ThrottledException"/utf8>>,
            <<"RequestThrottled"/utf8>>,
            <<"RequestThrottledException"/utf8>>,
            <<"ProvisionedThroughputExceededException"/utf8>>,
            <<"TransactionInProgressException"/utf8>>,
            <<"RequestLimitExceeded"/utf8>>,
            <<"BandwidthLimitExceeded"/utf8>>,
            <<"LimitExceededException"/utf8>>,
            <<"SlowDown"/utf8>>,
            <<"EC2ThrottledException"/utf8>>,
            <<"TooManyRequestsException"/utf8>>,
            <<"PriorRequestNotComplete"/utf8>>
        ]
    ).

-file("src/aws/retry.gleam", 396).
?DOC(
    " Map an HTTP status code to a retry class. 408 (request timeout)\n"
    " and 429 (too many requests) are throttling-adjacent and 5xx is a\n"
    " server fault; all of them share one retry cost class, matching the\n"
    " Rust SDK's `acquire(ErrorKind)` switch. Everything else is not\n"
    " retryable on status alone.\n"
).
-spec classify_status(integer()) -> status_kind().
classify_status(408) ->
    retryable;
classify_status(429) ->
    retryable;
classify_status(Status) when Status >= 500, Status =< 599 ->
    retryable;
classify_status(_Other) ->
    not_retryable.

-file("src/aws/retry.gleam", 334).
?DOC(
    " Decide whether a completed response warrants a retry. That is the\n"
    " case when *either* its status code is retryable *or* its header /\n"
    " body carries a modeled throttling-class error code.\n"
    "\n"
    " The error-code path covers the common AWS pattern of a throttling /\n"
    " transient failure delivered as HTTP **400**, with the real signal\n"
    " in the modeled code (DynamoDB\n"
    " `ProvisionedThroughputExceededException`, `ThrottlingException`,\n"
    " `TransactionInProgressException`, ...) instead of the status line.\n"
    " Real AWS SDKs retry on that code, not only on the status.\n"
    "\n"
    " The modeled-code parse only runs for statuses of 300 and above, so\n"
    " a 2xx response is never retried via the error code.\n"
).
-spec is_retryable_response(gleam@http@response:response(bitstring())) -> boolean().
is_retryable_response(Resp) ->
    Status = erlang:element(2, Resp),
    (classify_status(Status) =:= retryable) orelse
        ((Status >= 300) andalso
            is_retryable_error_code(modeled_error_code(Resp))).

%% Classify a completed HTTP response:
%%   - not retryable                      -> stop
%%   - retryable, attempt budget spent    -> give_up
%%   - retryable, attempts remain         -> {retry_after, Delay, 5}
%% Delay is the `retry-after` header (seconds converted to ms, clamped
%% to the strategy's max delay) when present, otherwise the jittered
%% exponential backoff for this attempt.
-file("src/aws/retry.gleam", 277).
-spec classify_response(
    gleam@http@response:response(bitstring()),
    integer(),
    strategy()
) -> decision().
classify_response(Resp, Attempt, Strategy) ->
    Retryable = is_retryable_response(Resp),
    Has_budget = Attempt < erlang:element(2, Strategy),
    case Retryable of
        false ->
            stop;

        true when not Has_budget ->
            give_up;

        true ->
            Delay = case retry_after_seconds(Resp) of
                none ->
                    exponential_backoff(Strategy, Attempt);

                {some, Secs} ->
                    gleam@int:min(Secs * 1000, erlang:element(4, Strategy))
            end,
            {retry_after, Delay, 5}
    end.

-file("src/aws/retry.gleam", 266).
?DOC(
    " Classify the outcome of a single attempt: a received response goes\n"
    " through the response classifier, a transport failure through the\n"
    " transport-error classifier. Exposed so tests can assert on it and\n"
    " so the protocol-codec layer (M5) can override per-service error\n"
    " semantics.\n"
).
-spec classify(
    {ok, gleam@http@response:response(bitstring())} |
        {error, aws@internal@http_send:http_error()},
    integer(),
    strategy()
) -> decision().
classify({ok, Resp}, Attempt, Strategy) ->
    classify_response(Resp, Attempt, Strategy);
classify({error, Err}, Attempt, Strategy) ->
    classify_transport_error(Err, Attempt, Strategy).

%% Run one attempt and, depending on how it is classified, either return
%% the result or sleep and recurse for the next attempt.
%%
%%   Send        - the wrapped send function.
%%   Strategy    - retry configuration (attempt budget, delays, sleep fn,
%%                 optional token bucket).
%%   Req         - the request; the same value is re-sent on every attempt.
%%   Attempt     - number of the attempt about to be made (callers start at 1).
%%   Held_permit - retry permit acquired for this attempt, if any.
%%
%% Returns the result of the last attempt that was made.
-file("src/aws/retry.gleam", 182).
-spec do_attempt(
    fun((gleam@http@request:request(bitstring())) -> {ok,
            gleam@http@response:response(bitstring())} |
        {error, aws@internal@http_send:http_error()}),
    strategy(),
    gleam@http@request:request(bitstring()),
    integer(),
    gleam@option:option(aws@internal@retry@rate_limiter:permit())
) -> {ok, gleam@http@response:response(bitstring())} |
    {error, aws@internal@http_send:http_error()}.
do_attempt(Send, Strategy, Req, Attempt, Held_permit) ->
    Result = Send(Req),
    case classify(Result, Attempt, Strategy) of
        %% Not retryable (success or a non-retryable failure): give back
        %% any held permit, reward the bucket, and return the result.
        stop ->
            release_if_held(Strategy, Held_permit),
            reward_if_present(Strategy),
            Result;

        %% Retryable, but the attempt budget is used up: log, give back
        %% any held permit, and return the last result unchanged.
        give_up ->
            aws@internal@log:error(
                <<<<"aws: giving up on retryable failure after "/utf8,
                        (erlang:integer_to_binary(Attempt))/binary>>/binary,
                    " attempt(s)"/utf8>>
            ),
            release_if_held(Strategy, Held_permit),
            Result;

        %% Retryable with attempts left: the retry must first pass the
        %% token-bucket gate (a no-op when no bucket is configured).
        {retry_after, Delay, Cost} ->
            case gate(Strategy, Cost) of
                %% Ungated strategy: sleep via the strategy's sleep
                %% function (element 5) and try again with no permit.
                no_bucket ->
                    log_retry(Attempt, Delay),
                    (erlang:element(5, Strategy))(Delay),
                    do_attempt(Send, Strategy, Req, Attempt + 1, none);

                %% New permit granted: the previous permit is released
                %% only after the new one was acquired, then the next
                %% attempt runs holding the new permit.
                {got_permit, New_permit} ->
                    release_if_held(Strategy, Held_permit),
                    log_retry(Attempt, Delay),
                    (erlang:element(5, Strategy))(Delay),
                    do_attempt(
                        Send,
                        Strategy,
                        Req,
                        Attempt + 1,
                        {some, New_permit}
                    );

                %% Bucket refused the retry: stop here without sleeping
                %% and surface the last result.
                bucket_empty ->
                    aws@internal@log:warning(
                        <<<<"aws retry: rate limiter exhausted after attempt "/utf8,
                                (erlang:integer_to_binary(Attempt))/binary>>/binary,
                            ", not retrying"/utf8>>
                    ),
                    release_if_held(Strategy, Held_permit),
                    Result
            end
    end.

-file("src/aws/retry.gleam", 178).
?DOC(
    " Wrap a `Send` with retry semantics. The returned function starts\n"
    " every request at attempt 1 with no retry permit held.\n"
).
-spec with_retry(
    fun((gleam@http@request:request(bitstring())) -> {ok,
            gleam@http@response:response(bitstring())} |
        {error, aws@internal@http_send:http_error()}),
    strategy()
) -> fun((gleam@http@request:request(bitstring())) -> {ok,
        gleam@http@response:response(bitstring())} |
    {error, aws@internal@http_send:http_error()}).
with_retry(Send, Strategy) ->
    First_attempt = 1,
    No_permit = none,
    fun(Request) ->
        do_attempt(Send, Strategy, Request, First_attempt, No_permit)
    end.