//// Token-bucket retry rate limiter, port of the AWS Smithy runtime's
//// `TokenBucket` (aws-sdk-rust/.../client/retries/token_bucket.rs).
////
//// Semantics:
////
//// - Each retry attempt `acquire`s tokens. The cost depends on the error
//// class (see retry.gleam: `retry_cost` 5 for normal retryable,
//// `timeout_retry_cost` 10 for timeouts / transient).
//// - The acquired tokens are HELD as a `Permit` while the retry is in
////   flight. On success or a final non-retryable outcome the caller
////   `release`s the permit, returning the tokens. When a retry fails and
////   another retry follows, the caller releases the prior permit before
////   acquiring the next (Rust does this via permit drop on
////   `set_retry_permit`).
//// - `reward_success` additionally tops up the bucket by `success_reward`
////   tokens. Rust's default is **0** — the bucket is a concurrent-retry
////   semaphore, not an AIMD-style limiter; in Rust, the time-based
////   `refill_rate` handles long-term recovery. This module has no
////   time-based refill: tokens come back only via `release` and
////   `reward_success`. We follow Rust's default.
////
//// Concurrency: messages are processed sequentially per `gleam_otp`
//// actor semantics.
import aws/internal/actor_lifecycle
import aws/internal/log
import gleam/erlang/process.{type Subject}
import gleam/otp/actor
/// Opaque handle to a running token-bucket actor.
///
/// Wraps the actor's `Subject(Message)`. Callers interact with the bucket
/// only through this module's functions (`try_acquire`, `release`,
/// `reward_success`, `current`, `shutdown`, `shutdown_sync`).
pub opaque type Bucket {
Bucket(subject: Subject(Message))
}
/// An acquired set of tokens. Hold this until success or final outcome,
/// then `release` it to return the tokens to the bucket.
///
/// `cost` is the number of tokens deducted when the permit was granted.
/// It is 0 for the degrade-open permit that `try_acquire` hands back when
/// the bucket actor is unavailable. The type is opaque, so code outside
/// this module cannot construct a permit or change its cost.
pub opaque type Permit {
Permit(cost: Int)
}
/// Number of tokens this permit holds, i.e. what `release` will return
/// to the bucket.
pub fn permit_cost(p: Permit) -> Int {
  let Permit(cost: held) = p
  held
}
/// Outcome of `try_acquire`.
///
/// - `Acquired(permit)`: the retry may proceed. Tokens were deducted, or a
///   zero-cost permit was issued because the bucket actor was
///   unavailable. Pass the permit to `release` when done.
/// - `Empty`: the bucket could not cover the cost; the caller must not
///   retry.
pub type AcquireResult {
Acquired(permit: Permit)
Empty
}
/// Internal protocol understood by the bucket actor (see `handle`).
type Message {
/// Deduct `cost` tokens if enough are available and reply `Acquired`;
/// otherwise reply `Empty` and leave the count unchanged.
TryAcquire(cost: Int, reply: Subject(AcquireResult))
/// Add `cost` tokens back, capped at capacity. No reply.
Release(cost: Int)
/// Add `success_reward` tokens, capped at capacity. No reply.
RewardSuccess
/// Reply with a snapshot of the available tokens and capacity.
Read(reply: Subject(BucketState))
/// Politely ask the actor to exit. Sent by `shutdown` /
/// `shutdown_sync`; the actor returns `actor.stop()` when it processes
/// this message.
Stop
}
/// Point-in-time snapshot returned by `current`. `available` is the
/// number of tokens currently in the bucket. `capacity` is the ceiling
/// set at `start`; the actor never changes it.
pub type BucketState {
BucketState(available: Int, capacity: Int)
}
/// Actor state:
/// - `available`: current token count. It is capped at `capacity`
///   whenever tokens are added back.
/// - `capacity`: fixed ceiling.
/// - `success_reward`: tokens added per `RewardSuccess`.
type State {
State(available: Int, capacity: Int, success_reward: Int)
}
/// Error returned by `start` / `start_default` when the underlying
/// `gleam_otp` actor fails to start. Wraps the original `actor.StartError`.
pub type StartError {
StartFailed(actor.StartError)
}
/// Default bucket capacity (500 tokens), matching DEFAULT_CAPACITY in the
/// Rust SDK's token_bucket.rs.
pub const default_capacity: Int = 500
/// Default success reward (0 tokens), matching DEFAULT_SUCCESS_REWARD in
/// the Rust SDK.
///
/// Normal recovery comes from callers explicitly `release`-ing their
/// permits; Rust achieves this by dropping the permit. `success_reward`
/// is only the *additional* top-up applied by `reward_success`, and it
/// defaults to 0.
pub const default_success_reward: Int = 0
/// Spawn a bucket actor with an explicit capacity and success reward.
///
/// The bucket starts full (`available == capacity`). Each `reward_success`
/// adds `success_reward` tokens, capped at `capacity`.
///
/// NOTE(review): inputs are not validated. A negative `capacity` or
/// `success_reward` is stored in the actor state as-is.
pub fn start(
  capacity capacity: Int,
  success_reward success_reward: Int,
) -> Result(Bucket, StartError) {
  let builder =
    State(
      available: capacity,
      capacity: capacity,
      success_reward: success_reward,
    )
    |> actor.new
    |> actor.on_message(handle)

  case actor.start(builder) {
    Error(reason) -> Error(StartFailed(reason))
    Ok(started) -> Ok(Bucket(subject: started.data))
  }
}
/// Spawn a bucket using the Rust SDK defaults: `default_capacity` (500)
/// and `default_success_reward` (0).
pub fn start_default() -> Result(Bucket, StartError) {
  start(success_reward: default_success_reward, capacity: default_capacity)
}
/// Try to acquire `cost` tokens. Returns `Acquired(permit)` if the bucket
/// can pay; `Empty` if not (caller must NOT retry).
///
/// A negative `cost` is treated as 0. Without this, the actor would *add*
/// tokens on acquire, pushing `available` above `capacity`, and the later
/// `release` of that permit would subtract them again.
///
/// Degrade-open on a dead/unresponsive bucket: if the actor has crashed or
/// doesn't reply in time, `safe_call` returns `Error` and we hand back a
/// zero-cost `Acquired` permit rather than `Empty`. Rationale:
/// - The limiter is a *best-effort* concurrent-retry semaphore, not a
///   correctness gate.
/// - `Empty` means "stop retrying", so returning it on a dead limiter would
///   silently suppress legitimate retries and degrade availability.
///   Treating an unavailable limiter as "no throttle" keeps the request
///   path working.
/// - The zero-cost permit makes the later `release` send harmless (it
///   returns 0 tokens).
/// This is the original fix for #30: a panicking limiter must never crash
/// the request path.
///
/// NOTE(review): the 1000 ms wait cannot tell a dead actor from a slow
/// one. If the actor is alive but slow, the `TryAcquire` message may
/// still be processed after we give up. It would then deduct `cost`
/// tokens whose permit never reaches a caller, so those tokens are never
/// released. Confirm how `actor_lifecycle.safe_call` behaves on timeout.
pub fn try_acquire(bucket: Bucket, cost cost: Int) -> AcquireResult {
  // Clamp invalid negative costs so an acquire can never mint tokens.
  let cost = case cost < 0 {
    True -> 0
    False -> cost
  }
  case
    actor_lifecycle.safe_call(bucket.subject, waiting: 1000, sending: fn(reply) {
      TryAcquire(cost: cost, reply: reply)
    })
  {
    Ok(result) -> result
    Error(Nil) -> {
      log.warning(
        "aws retry rate limiter: actor unavailable — proceeding without throttle",
      )
      Acquired(permit: Permit(cost: 0))
    }
  }
}
/// Hand a permit's tokens back to the bucket. Use this:
/// - on success,
/// - on a final non-retryable response,
/// - before swapping a held permit for a new one inside the retry loop.
///
/// Sends a message without waiting for a reply.
pub fn release(bucket: Bucket, permit permit: Permit) -> Nil {
  let Permit(cost: tokens) = permit
  bucket.subject
  |> process.send(Release(cost: tokens))
}
/// Credit the bucket with its configured `success_reward`, never going
/// past `capacity`. Intended to be called once per successful operation.
/// Sends a message without waiting for a reply.
pub fn reward_success(bucket: Bucket) -> Nil {
  let Bucket(subject: subject) = bucket
  process.send(subject, RewardSuccess)
}
/// Snapshot the bucket's available tokens and capacity. Tests only.
///
/// Unlike `try_acquire`, this uses plain `actor.call` with a 1000 ms wait
/// and no degrade-open fallback. Per gleam_otp's `actor.call` docs, the
/// caller crashes if the actor is dead or does not reply in time.
pub fn current(bucket: Bucket) -> BucketState {
  bucket.subject
  |> actor.call(waiting: 1000, sending: Read)
}
/// Ask the bucket actor to exit by sending it `Stop`, without waiting.
///
/// This is the same shape as `credentials_cache.shutdown`: both delegate
/// to `aws/internal/actor_lifecycle.shutdown_via_stop`. Calling it more
/// than once is safe.
pub fn shutdown(bucket: Bucket) -> Nil {
  let Bucket(subject: subject) = bucket
  actor_lifecycle.shutdown_via_stop(subject, Stop)
}
/// Blocking teardown: monitors the actor, sends `Stop`, and waits up to
/// `timeout_ms` for its `DOWN`.
///
/// Returns `Ok(Nil)` on a clean exit and `Error(Nil)` on a genuine
/// timeout. An actor that is already dead counts as a clean exit, so
/// repeated calls are fine.
pub fn shutdown_sync(bucket: Bucket, timeout_ms: Int) -> Result(Nil, Nil) {
  bucket.subject
  |> actor_lifecycle.shutdown_via_stop_sync(Stop, timeout_ms)
}
/// Actor message loop: applies one message to the bucket state.
///
/// - `TryAcquire`: if `available` covers `cost`, deduct it and reply
///   `Acquired`; otherwise reply `Empty` and leave state untouched.
/// - `Release` / `RewardSuccess`: add tokens back, capped at `capacity`.
/// - `Read`: reply with a snapshot.
/// - `Stop`: end the actor.
fn handle(state: State, message: Message) -> actor.Next(State, Message) {
  case message {
    Stop -> actor.stop()

    TryAcquire(cost: cost, reply: reply) -> {
      let remaining = state.available - cost
      case remaining < 0 {
        True -> {
          process.send(reply, Empty)
          actor.continue(state)
        }
        False -> {
          process.send(reply, Acquired(permit: Permit(cost: cost)))
          actor.continue(State(..state, available: remaining))
        }
      }
    }

    Release(cost: cost) -> actor.continue(credit(state, cost))

    RewardSuccess -> actor.continue(credit(state, state.success_reward))

    Read(reply: reply) -> {
      let snapshot =
        BucketState(available: state.available, capacity: state.capacity)
      process.send(reply, snapshot)
      actor.continue(state)
    }
  }
}

/// Add `tokens` to the available count without exceeding `capacity`.
fn credit(state: State, tokens: Int) -> State {
  State(..state, available: cap_at(state.available + tokens, state.capacity))
}
/// Clamp `value` so it never exceeds `max`. No lower bound is applied.
fn cap_at(value: Int, max: Int) -> Int {
  case value <= max {
    True -> value
    False -> max
  }
}