// src/aws/s3/transfer.gleam

//// S3 multipart-upload helper. Splits a buffered body into parts,
//// runs `CreateMultipartUpload` → `UploadPart` × N →
//// `CompleteMultipartUpload`, and best-effort aborts the upload on
//// any failure so dangling uploads don't accumulate in the bucket
//// (S3 charges storage for incomplete multipart uploads until you
//// abort them, and large numbers of orphaned uploads slow down
//// `ListObjects`).
////
//// Two entry points: `upload` for callers that already have the
//// bytes in a `BitArray`, and `upload_from_stream` for callers
//// holding a `StreamingBody`. The streaming variant rechunks across
//// chunk boundaries so wire-side part sizes follow `part_size_bytes`
//// rather than the source's chunking. Both today hold the full body
//// in memory; bounded-memory streaming arrives when `StreamingBody`
//// grows a lazy `Source(...)` variant (file handles, generators).
////
//// By default the upload-coordination logic is sequential — parts
//// upload one at a time. Setting `UploadOptions.max_concurrency`
//// (see `with_max_concurrency`) switches to a batched fan-out that
//// uploads up to that many parts at once, one process per in-flight
//// part, and waits for each batch to finish before starting the
//// next.

import aws/services/s3
import aws/streaming.{type StreamingBody}
import gleam/bit_array
import gleam/dict.{type Dict}
import gleam/erlang/process
import gleam/int
import gleam/list
import gleam/option.{type Option, None, Some}
import gleam/result

/// Errors a multipart upload surfaces.
///
/// - `CreateFailed` / `UploadPartFailed` / `CompleteFailed` wrap the
///   underlying typed S3 error so callers can pattern-match on the
///   wire-side cause (NoSuchBucket, AccessDenied, etc.).
///   `UploadPartFailed` also records which part number failed so
///   callers know what to retry.
/// - `MissingUploadId` fires if S3's `CreateMultipartUpload` response
///   arrives without an `upload_id` (should never happen in
///   production, but the wire-type is `Option(String)` so we surface
///   it explicitly rather than `assert`ing).
/// - `EmptyBody` is returned when the body holds no bytes (or the
///   stream yields no parts); it is reported before the
///   `CreateMultipartUpload` request is sent.
/// - `InvalidPartSize` rejects caller-supplied part sizes outside
///   S3's documented bounds before any multipart-upload request
///   reaches the bucket, and echoes the rejected value back.
pub type Error {
  CreateFailed(cause: s3.CreateMultipartUploadError)
  UploadPartFailed(part_number: Int, cause: s3.UploadPartError)
  CompleteFailed(cause: s3.CompleteMultipartUploadError)
  MissingUploadId
  EmptyBody
  InvalidPartSize(part_size_bytes: Int)
}

/// Result of a successful multipart upload.
///
/// - `bucket` / `key` echo the destination the caller passed in.
/// - `upload_id` is the id S3 returned from `CreateMultipartUpload`;
///   it is exposed so callers can correlate with S3 access logs or
///   with their own audit trail.
/// - `parts_uploaded` is the number of parts listed in the
///   `CompleteMultipartUpload` request.
pub type UploadResult {
  UploadResult(
    bucket: String,
    key: String,
    upload_id: String,
    parts_uploaded: Int,
  )
}

/// Per-object metadata + access-control options applied at
/// `CreateMultipartUpload` time, plus the client-side
/// `max_concurrency` knob that selects how parts are uploaded.
///
/// On the wire, only `content_type`, `metadata`, `acl`,
/// `cache_control`, `content_encoding`, `content_disposition`,
/// `storage_class`, and `server_side_encryption` are surfaced
/// today — they cover the 90% of S3 PutObject use cases
/// (HTTP-content-metadata, ACL, storage tier, SSE). Callers needing
/// more exotic options (object lock, grants, request payer, SSE-C,
/// etc.) should call `s3.create_multipart_upload` /
/// `s3.upload_part` / `s3.complete_multipart_upload` directly
/// until / unless those fields land on `UploadOptions`.
///
/// Construct with `default_options()` and override individual
/// fields via record-update syntax:
///
/// ```gleam
/// let opts = transfer.UploadOptions(
///   ..transfer.default_options(),
///   content_type: option.Some("application/json"),
///   cache_control: option.Some("max-age=3600"),
/// )
/// ```
///
/// Prefer `with_max_concurrency` for setting `max_concurrency`: it
/// coerces values below 1 to `None`, which the record constructor
/// does not do.
pub type UploadOptions {
  UploadOptions(
    content_type: Option(String),
    content_encoding: Option(String),
    content_disposition: Option(String),
    cache_control: Option(String),
    metadata: Option(Dict(String, String)),
    acl: Option(s3.ObjectCannedACL),
    storage_class: Option(s3.StorageClass),
    server_side_encryption: Option(s3.ServerSideEncryption),
    /// Upper bound on outstanding `UploadPart` calls. `None`
    /// (default) keeps the sequential one-part-at-a-time
    /// coordinator. `Some(n)` fans out via OTP processes, capping
    /// in-flight parts to `n` — typical values 4-16 saturate a
    /// single Lambda's bandwidth without overwhelming S3's per-
    /// bucket rate limits. Errors short-circuit the whole batch
    /// (one failed part aborts the multipart upload).
    max_concurrency: Option(Int),
  )
}

/// Baseline `UploadOptions` with every field set to `None`. This is
/// what `upload` / `upload_from_stream` pass on the caller's behalf,
/// so S3 falls back to its bucket-level defaults for all object
/// metadata. `max_concurrency: None` selects the sequential
/// coordinator; use `with_max_concurrency` to opt in to the
/// parallel path.
pub fn default_options() -> UploadOptions {
  UploadOptions(
    // HTTP content metadata.
    content_type: None,
    content_encoding: None,
    content_disposition: None,
    cache_control: None,
    // User metadata, access control, storage tier, encryption.
    metadata: None,
    acl: None,
    storage_class: None,
    server_side_encryption: None,
    // Client-side behaviour: sequential uploads.
    max_concurrency: None,
  )
}

/// Return `opts` with its parallel-upload concurrency cap replaced.
/// A cap of 1 or more is stored as `Some(n)`; anything below 1 is
/// stored as `None` (sequential), so callers can pass a derived
/// count without guarding it first.
pub fn with_max_concurrency(opts: UploadOptions, n: Int) -> UploadOptions {
  case n < 1 {
    True -> UploadOptions(..opts, max_concurrency: None)
    False -> UploadOptions(..opts, max_concurrency: Some(n))
  }
}

/// S3's documented minimum part size (5 MiB = 5 × 1024 × 1024
/// bytes) for all parts except the last. Smaller part sizes are
/// rejected with `EntityTooSmall` at `CompleteMultipartUpload`
/// time; larger sizes cut down on per-part round trips but raise
/// outstanding-request memory. Also the lower bound accepted for
/// `part_size_bytes` by `upload` / `upload_from_stream`.
pub const default_part_size_bytes: Int = 5_242_880

/// S3's maximum size for any individual multipart-upload part
/// (5 GiB = 5 × 1024³ bytes). `upload` / `upload_from_stream`
/// reject larger `part_size_bytes` values up front so callers get a
/// typed SDK error instead of a later S3 service error.
pub const max_part_size_bytes: Int = 5_368_709_120

/// S3's hard cap on parts per multipart upload. Past 10,000 the
/// `Complete` call returns `InvalidArgument` regardless of total
/// size, so `part_size_for` scales `part_size_bytes` up for large
/// totals to stay inside this limit.
pub const max_parts_per_upload: Int = 10_000

/// Choose a part size that keeps an upload of `total_bytes` within
/// S3's `max_parts_per_upload` (10,000) limit.
///
/// The result is never below `default_part_size_bytes` (5 MiB, the
/// S3 minimum): totals up to 52,428,800,000 bytes (5 MiB × 10,000)
/// get the default, larger totals get
/// `ceil(total_bytes / max_parts_per_upload)`.
///
/// Zero or negative `total_bytes` also yields the default, so
/// callers that don't know the size up front can pass 0 and accept
/// 5 MiB parts until they have a better estimate.
///
/// The result is not clamped to `max_part_size_bytes`; a total
/// large enough to need parts above 5 GiB produces a value that
/// `upload` / `upload_from_stream` will reject as `InvalidPartSize`.
pub fn part_size_for(total_bytes: Int) -> Int {
  case total_bytes > 0 {
    False -> default_part_size_bytes
    True -> {
      // Round-up integer division: the smallest part size for which
      // ceil(total_bytes / part_size) <= max_parts_per_upload.
      let per_part =
        { total_bytes + max_parts_per_upload - 1 } / max_parts_per_upload
      int.max(per_part, default_part_size_bytes)
    }
  }
}

/// Upload an in-memory `body` to `bucket/key` through S3's multipart
/// API using `default_options()`. The body is cut into
/// `part_size_bytes` parts (the final part may be shorter), each
/// part is uploaded, and the upload is finalised with
/// `CompleteMultipartUpload`.
///
/// If a part upload or the complete call fails, a best-effort
/// `AbortMultipartUpload` is issued so the bucket doesn't collect
/// dangling uploads. The abort's own outcome is deliberately
/// ignored — the caller gets the error from the step that failed.
///
/// `part_size_bytes` must lie between `default_part_size_bytes`
/// (5 MiB) and `max_part_size_bytes` (5 GiB), inclusive; anything
/// else returns `Error(InvalidPartSize(part_size_bytes))` before any
/// HTTP work. An empty body returns `Error(EmptyBody)` — S3 rejects
/// empty multipart uploads with `EntityTooSmall`, so the create
/// round trip is skipped.
pub fn upload(
  client client: s3.Client,
  bucket bucket: String,
  key key: String,
  body body: BitArray,
  part_size_bytes part_size_bytes: Int,
) -> Result(UploadResult, Error) {
  let options = default_options()
  upload_with_options(
    client: client,
    bucket: bucket,
    key: key,
    body: body,
    part_size_bytes: part_size_bytes,
    options: options,
  )
}

/// Variant of `upload` that takes caller-supplied `UploadOptions`:
/// HTTP content metadata (Content-Type, Cache-Control, etc.), the
/// optional ACL / storage class / SSE choice, and any user metadata
/// are sent at `CreateMultipartUpload` time. See `UploadOptions` for
/// the field set.
///
/// The part size is validated first (`InvalidPartSize`), then the
/// body is checked for emptiness (`EmptyBody`); only then is any
/// request issued.
pub fn upload_with_options(
  client client: s3.Client,
  bucket bucket: String,
  key key: String,
  body body: BitArray,
  part_size_bytes part_size_bytes: Int,
  options options: UploadOptions,
) -> Result(UploadResult, Error) {
  case validate_part_size(part_size_bytes) {
    Error(invalid) -> Error(invalid)
    Ok(Nil) ->
      case bit_array.byte_size(body) > 0 {
        False -> Error(EmptyBody)
        True -> {
          let parts = split_into_parts(body, part_size_bytes)
          coordinate(client, bucket, key, parts, options)
        }
      }
  }
}

/// `upload` for callers holding a `StreamingBody` rather than a
/// buffered `BitArray`, using `default_options()`. The body's chunks
/// are walked once and re-aggregated across chunk boundaries, so
/// wire-side part sizes follow `part_size_bytes` instead of the
/// source's chunking — handy when the body comes from a chunked
/// transport or a builder that emits many small chunks (request
/// streaming, log ingestion, line-oriented producers).
///
/// Both current `StreamingBody` representations (Buffered / Chunked)
/// keep their full bytes in memory, so this variant does not yet
/// lower peak memory compared with
/// `upload(streaming.to_bit_array(body), ...)`. Once `StreamingBody`
/// gains a lazy `Source(...)` variant (file handles, generators),
/// this path picks up bounded-memory streaming.
///
/// `part_size_bytes` is validated exactly as in `upload`: it must be
/// between 5 MiB and 5 GiB, inclusive, otherwise the function
/// returns `Error(InvalidPartSize(part_size_bytes))` before reading
/// chunks or creating the multipart upload.
pub fn upload_from_stream(
  client client: s3.Client,
  bucket bucket: String,
  key key: String,
  body body: StreamingBody,
  part_size_bytes part_size_bytes: Int,
) -> Result(UploadResult, Error) {
  let options = default_options()
  upload_from_stream_with_options(
    client: client,
    bucket: bucket,
    key: key,
    body: body,
    part_size_bytes: part_size_bytes,
    options: options,
  )
}

/// Variant of `upload_from_stream` that takes caller-supplied
/// `UploadOptions` (see that type for the field set).
///
/// The part size is validated before the stream is rechunked; a
/// stream that yields no parts returns `Error(EmptyBody)` without
/// creating a multipart upload.
pub fn upload_from_stream_with_options(
  client client: s3.Client,
  bucket bucket: String,
  key key: String,
  body body: StreamingBody,
  part_size_bytes part_size_bytes: Int,
  options options: UploadOptions,
) -> Result(UploadResult, Error) {
  use _ <- result.try(validate_part_size(part_size_bytes))
  case rechunk_to_parts(body, part_size_bytes) {
    [] -> Error(EmptyBody)
    parts -> coordinate(client, bucket, key, parts, options)
  }
}

/// Accept `part_size` only when it lies within
/// `default_part_size_bytes ..= max_part_size_bytes`; otherwise
/// return `InvalidPartSize` carrying the rejected value.
fn validate_part_size(part_size: Int) -> Result(Nil, Error) {
  let too_small = part_size < default_part_size_bytes
  let too_large = part_size > max_part_size_bytes
  case too_small || too_large {
    True -> Error(InvalidPartSize(part_size_bytes: part_size))
    False -> Ok(Nil)
  }
}

/// Drive one multipart upload end to end: create it, upload every
/// entry of `parts` (sequentially, or through the parallel
/// coordinator when `options.max_concurrency` is `Some`), then
/// complete it. Failures before an upload id is known are returned
/// as-is; failures after that point trigger a best-effort abort
/// first.
fn coordinate(
  client: s3.Client,
  bucket: String,
  key: String,
  parts: List(BitArray),
  options: UploadOptions,
) -> Result(UploadResult, Error) {
  let created =
    create_request(bucket, key, options)
    |> s3.create_multipart_upload(client, _)
    |> result.map_error(CreateFailed)
  use create_out <- result.try(created)
  use upload_id <- result.try(option.to_result(
    create_out.upload_id,
    MissingUploadId,
  ))

  // An upload now exists on the S3 side. Both remaining fallible
  // steps go through `abort_on_error` so a failure doesn't leave a
  // dangling multipart upload in the bucket.
  let uploaded = case options.max_concurrency {
    Some(n) ->
      upload_all_parts_parallel(client, bucket, key, upload_id, parts, n)
    None -> upload_all_parts(client, bucket, key, upload_id, parts, 1, [])
  }
  use completed_parts <- result.try(abort_on_error(
    client,
    bucket,
    key,
    upload_id,
    uploaded,
  ))

  let completed =
    empty_complete_request(bucket, key, upload_id, completed_parts)
    |> s3.complete_multipart_upload(client, _)
    |> result.map_error(CompleteFailed)
  use _ <- result.try(abort_on_error(
    client,
    bucket,
    key,
    upload_id,
    completed,
  ))

  Ok(UploadResult(
    bucket:,
    key:,
    upload_id:,
    parts_uploaded: list.length(completed_parts),
  ))
}

// Cleanup wrapper for the steps that run after the multipart upload
// has been created. An `Ok` passes through untouched; an `Error`
// first fires a best-effort `AbortMultipartUpload` and is then
// returned unchanged.
fn abort_on_error(
  client: s3.Client,
  bucket: String,
  key: String,
  upload_id: String,
  outcome: Result(a, Error),
) -> Result(a, Error) {
  case outcome {
    Error(failure) -> {
      abort_quietly(client, bucket, key, upload_id)
      Error(failure)
    }
    Ok(_) -> outcome
  }
}

/// Cut `bytes` into consecutive slices of `part_size` bytes. The
/// last slice holds whatever is left and may be shorter; an empty
/// input yields an empty list.
fn split_into_parts(bytes: BitArray, part_size: Int) -> List(BitArray) {
  let remaining = bit_array.byte_size(bytes)
  case remaining == 0, remaining <= part_size {
    True, _ -> []
    False, True -> [bytes]
    False, False -> {
      // Both slices stay inside `bytes` because remaining > part_size.
      let assert Ok(part) = bit_array.slice(bytes, 0, part_size)
      let assert Ok(rest) =
        bit_array.slice(bytes, part_size, remaining - part_size)
      [part, ..split_into_parts(rest, part_size)]
    }
  }
}

/// Regroup the chunks of a `StreamingBody` into parts of `part_size`
/// bytes. Each chunk is appended to a running buffer; every time the
/// buffer holds at least `part_size` bytes, full parts are flushed
/// from its front. Whatever remains after the last chunk becomes the
/// final (possibly undersized) part. A body with no bytes yields an
/// empty list.
fn rechunk_to_parts(body: StreamingBody, part_size: Int) -> List(BitArray) {
  let #(full_parts_rev, remainder) =
    body
    |> streaming.to_chunks
    |> list.fold(#([], <<>>), fn(carry, chunk) {
      let #(done, pending) = carry
      flush_full_parts(bit_array.append(pending, chunk), part_size, done)
    })
  // Parts were accumulated newest-first; restore upload order.
  case bit_array.byte_size(remainder) > 0 {
    True -> list.reverse([remainder, ..full_parts_rev])
    False -> list.reverse(full_parts_rev)
  }
}

/// Peel as many `part_size`-byte parts as possible off the front of
/// `buf`, prepending each to `acc` (so `acc` ends up newest-first).
/// Returns the grown accumulator together with the leftover bytes,
/// which are always shorter than `part_size`.
fn flush_full_parts(
  buf: BitArray,
  part_size: Int,
  acc: List(BitArray),
) -> #(List(BitArray), BitArray) {
  let buffered = bit_array.byte_size(buf)
  case buffered < part_size {
    True -> #(acc, buf)
    False -> {
      let assert Ok(part) = bit_array.slice(buf, 0, part_size)
      let assert Ok(rest) =
        bit_array.slice(buf, part_size, buffered - part_size)
      flush_full_parts(rest, part_size, [part, ..acc])
    }
  }
}

/// Sequential coordinator. Uploads `parts` one at a time, numbering
/// them consecutively from `next_part_number`, and returns the
/// completed parts in upload order. Stops at the first failing part
/// and reports it as `UploadPartFailed` with that part's number.
/// `acc` holds the parts completed so far, newest first.
fn upload_all_parts(
  client: s3.Client,
  bucket: String,
  key: String,
  upload_id: String,
  parts: List(BitArray),
  next_part_number: Int,
  acc: List(s3.CompletedPart),
) -> Result(List(s3.CompletedPart), Error) {
  case parts {
    [] -> Ok(list.reverse(acc))
    [part, ..rest] -> {
      let sent =
        empty_upload_part_request(
          bucket,
          key,
          upload_id,
          next_part_number,
          part,
        )
        |> s3.upload_part(client, _)
      case sent {
        Error(cause) ->
          Error(UploadPartFailed(part_number: next_part_number, cause:))
        Ok(out) ->
          upload_all_parts(
            client,
            bucket,
            key,
            upload_id,
            rest,
            next_part_number + 1,
            [empty_completed_part(next_part_number, out.e_tag), ..acc],
          )
      }
    }
  }
}

/// Parallel coordinator. Numbers `parts` from 1 and processes them
/// in batches of at most `max_concurrency` simultaneously-in-flight
/// `UploadPart` calls. Each batch spawns a worker per part, awaits
/// all results, then the outer loop moves on to the next batch.
/// Results are sorted by part-number at the end so the
/// `CompleteMultipartUpload` request sees parts in ascending order
/// (S3 requires it).
///
/// First failure in any batch short-circuits the whole upload —
/// the outer `coordinate` then issues a best-effort
/// `AbortMultipartUpload`.
///
/// A `max_concurrency` below 1 falls back to the sequential
/// coordinator, mirroring `with_max_concurrency`. `UploadOptions`
/// can be built directly with `max_concurrency: Some(0)` or a
/// negative value; without this guard a cap of 0 produces empty
/// batches forever (the upload never finishes and is never
/// aborted), and a negative cap puts every part in one unbounded
/// batch.
fn upload_all_parts_parallel(
  client: s3.Client,
  bucket: String,
  key: String,
  upload_id: String,
  parts: List(BitArray),
  max_concurrency: Int,
) -> Result(List(s3.CompletedPart), Error) {
  case max_concurrency >= 1 {
    False -> upload_all_parts(client, bucket, key, upload_id, parts, 1, [])
    True -> {
      // Part numbers are 1-based; `index_map` indices are 0-based.
      let numbered = list.index_map(parts, fn(p, i) { #(i + 1, p) })
      upload_batches(
        client,
        bucket,
        key,
        upload_id,
        numbered,
        max_concurrency,
        [],
      )
    }
  }
}

/// Outer loop of the parallel coordinator. Takes up to
/// `max_concurrency` numbered parts off the front of `remaining`,
/// uploads them as one batch, and recurses on the rest. A failed
/// batch ends the loop with that batch's error. Once nothing
/// remains, the accumulated parts are returned sorted by ascending
/// part number (a missing part number sorts as 0).
fn upload_batches(
  client: s3.Client,
  bucket: String,
  key: String,
  upload_id: String,
  remaining: List(#(Int, BitArray)),
  max_concurrency: Int,
  acc: List(s3.CompletedPart),
) -> Result(List(s3.CompletedPart), Error) {
  case remaining {
    [_, ..] -> {
      let #(batch, rest) = take_split(remaining, max_concurrency)
      case upload_one_batch(client, bucket, key, upload_id, batch) {
        Error(failure) -> Error(failure)
        Ok(batch_done) ->
          upload_batches(
            client,
            bucket,
            key,
            upload_id,
            rest,
            max_concurrency,
            list.append(batch_done, acc),
          )
      }
    }
    [] -> {
      let by_part_number = fn(a: s3.CompletedPart, b: s3.CompletedPart) {
        int.compare(
          option.unwrap(a.part_number, 0),
          option.unwrap(b.part_number, 0),
        )
      }
      Ok(list.sort(acc, by: by_part_number))
    }
  }
}

/// Upload every numbered part of `batch` concurrently. One process
/// is spawned per part; each sends its result — a completed part,
/// or `UploadPartFailed` tagged with its part number — to a shared
/// subject. The caller then waits for exactly one message per
/// spawned worker.
fn upload_one_batch(
  client: s3.Client,
  bucket: String,
  key: String,
  upload_id: String,
  batch: List(#(Int, BitArray)),
) -> Result(List(s3.CompletedPart), Error) {
  let inbox = process.new_subject()
  list.each(batch, fn(entry) {
    let #(part_number, part) = entry
    process.spawn(fn() {
      empty_upload_part_request(bucket, key, upload_id, part_number, part)
      |> s3.upload_part(client, _)
      |> result.map(fn(out) { empty_completed_part(part_number, out.e_tag) })
      |> result.map_error(fn(e) { UploadPartFailed(part_number:, cause: e) })
      |> process.send(inbox, _)
    })
  })
  collect_batch_results(inbox, list.length(batch), [])
}

/// Wait for `remaining` worker results on `inbox`, prepending each
/// success to `acc`. On the first failure, the results still
/// outstanding are received and discarded before that failure is
/// returned, so no worker message is left unread.
fn collect_batch_results(
  inbox: process.Subject(Result(s3.CompletedPart, Error)),
  remaining: Int,
  acc: List(s3.CompletedPart),
) -> Result(List(s3.CompletedPart), Error) {
  case remaining == 0 {
    True -> Ok(acc)
    False ->
      case process.receive_forever(inbox) {
        Error(first_failure) -> {
          // Keep the first error; later results are only consumed.
          drain_remaining(inbox, remaining - 1)
          Error(first_failure)
        }
        Ok(part) -> collect_batch_results(inbox, remaining - 1, [part, ..acc])
      }
  }
}

/// Receive and discard `remaining` messages from `inbox`, blocking
/// until each one arrives.
fn drain_remaining(
  inbox: process.Subject(Result(s3.CompletedPart, Error)),
  remaining: Int,
) -> Nil {
  case remaining == 0 {
    True -> Nil
    False -> {
      let _discarded = process.receive_forever(inbox)
      drain_remaining(inbox, remaining - 1)
    }
  }
}

/// Split `xs` into its first `n` elements and everything after
/// them, returned as `#(taken, rest)`. When `xs` has fewer than `n`
/// elements, `taken` is all of `xs` and `rest` is empty. Used to
/// chunk the work-list into bounded-concurrency batches.
fn take_split(xs: List(a), n: Int) -> #(List(a), List(a)) {
  do_take_split(xs, n, [])
}

/// Worker for `take_split`: moves elements from `xs` onto `acc`
/// (newest first) while counting `n` down, stopping when the count
/// reaches exactly 0 or `xs` runs out.
fn do_take_split(xs: List(a), n: Int, acc: List(a)) -> #(List(a), List(a)) {
  case n, xs {
    0, _ -> #(list.reverse(acc), xs)
    _, [] -> #(list.reverse(acc), [])
    _, [first, ..others] -> do_take_split(others, n - 1, [first, ..acc])
  }
}

/// Send `AbortMultipartUpload` for `upload_id` and ignore the
/// outcome. The abort is best-effort cleanup: the caller already
/// holds the error worth reporting.
fn abort_quietly(
  client: s3.Client,
  bucket: String,
  key: String,
  upload_id: String,
) -> Nil {
  let request = empty_abort_request(bucket, key, upload_id)
  let _ignored = s3.abort_multipart_upload(client, request)
  Nil
}

// ---------- request constructors ----------
//
// The wire types built below carry ~10–30 optional fields each.
// `create_request` threads the handful exposed on `UploadOptions`;
// everything else is left at `None`. Inlining the field-by-field
// `None` defaults keeps the call sites above terse and lets the
// codegen evolve the wire types without dragging this helper.

/// Build the `CreateMultipartUpload` request for `bucket/key`. The
/// object-level settings carried by `options` are copied across;
/// every other optional field is left at `None`.
/// `options.max_concurrency` is client-side only and is not sent.
fn create_request(
  bucket: String,
  key: String,
  options: UploadOptions,
) -> s3.CreateMultipartUploadRequest {
  let UploadOptions(
    acl:,
    cache_control:,
    content_disposition:,
    content_encoding:,
    content_type:,
    metadata:,
    server_side_encryption:,
    storage_class:,
    ..,
  ) = options
  s3.CreateMultipartUploadRequest(
    acl:,
    bucket:,
    bucket_key_enabled: None,
    cache_control:,
    checksum_algorithm: None,
    checksum_type: None,
    content_disposition:,
    content_encoding:,
    content_language: None,
    content_type:,
    expected_bucket_owner: None,
    expires: None,
    grant_full_control: None,
    grant_read: None,
    grant_read_acp: None,
    grant_write_acp: None,
    key:,
    metadata:,
    object_lock_legal_hold_status: None,
    object_lock_mode: None,
    object_lock_retain_until_date: None,
    request_payer: None,
    sse_customer_algorithm: None,
    sse_customer_key: None,
    sse_customer_key_md5: None,
    ssekms_encryption_context: None,
    ssekms_key_id: None,
    server_side_encryption:,
    storage_class:,
    tagging: None,
    website_redirect_location: None,
  )
}

/// Build the `UploadPart` request for one part: `body` wrapped as a
/// streaming body, `content_length` set to its byte size, and the
/// part addressed by `bucket` / `key` / `upload_id` / `part_number`.
/// Checksums and all other optional fields are left at `None`.
fn empty_upload_part_request(
  bucket: String,
  key: String,
  upload_id: String,
  part_number: Int,
  body: BitArray,
) -> s3.UploadPartRequest {
  let payload = streaming.from_bit_array(body)
  let payload_size = bit_array.byte_size(body)
  s3.UploadPartRequest(
    body: Some(payload),
    bucket:,
    checksum_algorithm: None,
    checksum_crc32: None,
    checksum_crc32_c: None,
    checksum_crc64_nvme: None,
    checksum_md5: None,
    checksum_sha1: None,
    checksum_sha256: None,
    checksum_sha512: None,
    checksum_xxhash128: None,
    checksum_xxhash3: None,
    checksum_xxhash64: None,
    content_length: Some(payload_size),
    content_md5: None,
    expected_bucket_owner: None,
    key:,
    part_number:,
    request_payer: None,
    sse_customer_algorithm: None,
    sse_customer_key: None,
    sse_customer_key_md5: None,
    upload_id:,
  )
}

/// Build the `CompletedPart` entry listed in
/// `CompleteMultipartUpload` for one uploaded part: its number plus
/// the `e_tag` taken from the `UploadPart` response (passed through
/// as-is, even when `None`). All checksum fields are left at `None`.
fn empty_completed_part(
  part_number: Int,
  e_tag: Option(String),
) -> s3.CompletedPart {
  s3.CompletedPart(
    checksum_crc32: None,
    checksum_crc32_c: None,
    checksum_crc64_nvme: None,
    checksum_md5: None,
    checksum_sha1: None,
    checksum_sha256: None,
    checksum_sha512: None,
    checksum_xxhash128: None,
    checksum_xxhash3: None,
    checksum_xxhash64: None,
    e_tag:,
    part_number: Some(part_number),
  )
}

/// Build the `CompleteMultipartUpload` request for `upload_id`,
/// listing `parts` in the order given. Checksums, conditional
/// headers and all other optional fields are left at `None`.
fn empty_complete_request(
  bucket: String,
  key: String,
  upload_id: String,
  parts: List(s3.CompletedPart),
) -> s3.CompleteMultipartUploadRequest {
  let part_list = s3.CompletedMultipartUpload(parts: Some(parts))
  s3.CompleteMultipartUploadRequest(
    bucket:,
    checksum_crc32: None,
    checksum_crc32_c: None,
    checksum_crc64_nvme: None,
    checksum_md5: None,
    checksum_sha1: None,
    checksum_sha256: None,
    checksum_sha512: None,
    checksum_type: None,
    checksum_xxhash128: None,
    checksum_xxhash3: None,
    checksum_xxhash64: None,
    expected_bucket_owner: None,
    if_match: None,
    if_none_match: None,
    key:,
    mpu_object_size: None,
    multipart_upload: Some(part_list),
    request_payer: None,
    sse_customer_algorithm: None,
    sse_customer_key: None,
    sse_customer_key_md5: None,
    upload_id:,
  )
}

/// Build the `AbortMultipartUpload` request identifying the upload
/// by `bucket` / `key` / `upload_id`; optional fields stay `None`.
fn empty_abort_request(
  bucket: String,
  key: String,
  upload_id: String,
) -> s3.AbortMultipartUploadRequest {
  s3.AbortMultipartUploadRequest(
    bucket:,
    expected_bucket_owner: None,
    if_match_initiated_time: None,
    key:,
    request_payer: None,
    upload_id:,
  )
}