Skip to main content

src/aws@s3@transfer.erl

-module(aws@s3@transfer).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aws/s3/transfer.gleam").
-export([default_options/0, with_max_concurrency/2, part_size_for/1, upload_with_options/6, upload/5, upload_from_stream_with_options/6, upload_from_stream/5]).
-export_type([error/0, upload_result/0, upload_options/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " S3 multipart-upload helper. Splits a buffered body into parts,\n"
    " runs `CreateMultipartUpload` → `UploadPart` × N →\n"
    " `CompleteMultipartUpload`, and best-effort aborts the upload on\n"
    " any failure so dangling uploads don't accumulate in the bucket\n"
    " (S3 charges storage for incomplete multipart uploads until you\n"
    " abort them, and large numbers of orphaned uploads slow down\n"
    " `ListObjects`).\n"
    "\n"
    " Two entry points: `upload` for callers that already have the\n"
    " bytes in a `BitArray`, and `upload_from_stream` for callers\n"
    " holding a `StreamingBody`. The streaming variant rechunks across\n"
    " chunk boundaries so wire-side part sizes follow `part_size_bytes`\n"
    " rather than the source's chunking. Both today hold the full body\n"
    " in memory; bounded-memory streaming arrives when `StreamingBody`\n"
    " grows a lazy `Source(...)` variant (file handles, generators).\n"
    "\n"
    " The upload-coordination logic is sequential — parts upload one\n"
    " at a time. Parallel uploads (the bandwidth-saturating common\n"
    " case) want a Task-based fan-out around this helper; building\n"
    " that lives in `aws/s3/transfer_parallel.gleam` once a use case\n"
    " pins the right concurrency knob.\n"
).

-type error() :: {create_failed,
        aws@services@s3:create_multipart_upload_error()} |
    {upload_part_failed, integer(), aws@services@s3:upload_part_error()} |
    {complete_failed, aws@services@s3:complete_multipart_upload_error()} |
    missing_upload_id |
    empty_body |
    {invalid_part_size, integer()}.

-type upload_result() :: {upload_result,
        binary(),
        binary(),
        binary(),
        integer()}.

-type upload_options() :: {upload_options,
        gleam@option:option(binary()),
        gleam@option:option(binary()),
        gleam@option:option(binary()),
        gleam@option:option(binary()),
        gleam@option:option(gleam@dict:dict(binary(), binary())),
        gleam@option:option(aws@services@s3:object_canned_a_c_l()),
        gleam@option:option(aws@services@s3:storage_class()),
        gleam@option:option(aws@services@s3:server_side_encryption()),
        gleam@option:option(integer())}.

-file("src/aws/s3/transfer.gleam", 114).
?DOC(
    " All-`None` options — what `upload` / `upload_from_stream` pass\n"
    " when callers don't supply their own. Equivalent to using\n"
    " `s3.create_multipart_upload` with no metadata overrides; S3\n"
    " applies its bucket-level defaults. `max_concurrency: None`\n"
    " keeps the sequential coordinator — `with_max_concurrency` flips\n"
    " it to the parallel path.\n"
).
-spec default_options() -> upload_options().
default_options() ->
    {upload_options, none, none, none, none, none, none, none, none, none}.

-file("src/aws/s3/transfer.gleam", 132).
?DOC(
    " Override the parallel-upload concurrency cap on an existing\n"
    " `UploadOptions`. `n` must be ≥ 1 — values ≤ 0 are coerced to\n"
    " sequential (None) so callers can pass a derived count without\n"
    " guarding it.\n"
).
-spec with_max_concurrency(upload_options(), integer()) -> upload_options().
with_max_concurrency(Opts, N) ->
    Capped = case N >= 1 of
        true ->
            {some, N};

        false ->
            none
    end,
    {upload_options,
        erlang:element(2, Opts),
        erlang:element(3, Opts),
        erlang:element(4, Opts),
        erlang:element(5, Opts),
        erlang:element(6, Opts),
        erlang:element(7, Opts),
        erlang:element(8, Opts),
        erlang:element(9, Opts),
        Capped}.

-file("src/aws/s3/transfer.gleam", 169).
?DOC(
    " Pick a part size large enough to fit `total_bytes` inside S3's\n"
    " 10,000-parts-per-upload cap. Always returns at least\n"
    " `default_part_size_bytes` (5 MiB, the S3 minimum). Use this to\n"
    " drive `upload` / `upload_from_stream` when the body could be\n"
    " arbitrarily large — under 50 GB it returns the 5 MiB default,\n"
    " past 50 GB it scales up so the part count stays at or under\n"
    " 10,000.\n"
    "\n"
    " For zero or negative `total_bytes` the helper returns the\n"
    " default — callers that don't know the size up front can pass 0\n"
    " and accept the 5 MiB part size until they have a better estimate.\n"
).
-spec part_size_for(integer()) -> integer().
part_size_for(Total_bytes) ->
    case Total_bytes =< 0 of
        true ->
            5242880;

        false ->
            Needed = case 10000 of
                0 -> 0;
                Gleam@denominator -> ((Total_bytes + 10000) - 1) div Gleam@denominator
            end,
            case Needed < 5242880 of
                true ->
                    5242880;

                false ->
                    Needed
            end
    end.

-file("src/aws/s3/transfer.gleam", 380).
-spec split_into_parts(bitstring(), integer()) -> list(bitstring()).
split_into_parts(Bytes, Part_size) ->
    Total = erlang:byte_size(Bytes),
    case Total of
        0 ->
            [];

        N when N =< Part_size ->
            [Bytes];

        _ ->
            Head@1 = case gleam_stdlib:bit_array_slice(Bytes, 0, Part_size) of
                {ok, Head} -> Head;
                _assert_fail ->
                    erlang:error(#{gleam_error => let_assert,
                                message => <<"Pattern match failed, no pattern matched the value."/utf8>>,
                                file => <<?FILEPATH/utf8>>,
                                module => <<"aws/s3/transfer"/utf8>>,
                                function => <<"split_into_parts"/utf8>>,
                                line => 386,
                                value => _assert_fail,
                                start => 13765,
                                'end' => 13823,
                                pattern_start => 13776,
                                pattern_end => 13784})
            end,
            Tail@1 = case gleam_stdlib:bit_array_slice(
                Bytes,
                Part_size,
                Total - Part_size
            ) of
                {ok, Tail} -> Tail;
                _assert_fail@1 ->
                    erlang:error(#{gleam_error => let_assert,
                                message => <<"Pattern match failed, no pattern matched the value."/utf8>>,
                                file => <<?FILEPATH/utf8>>,
                                module => <<"aws/s3/transfer"/utf8>>,
                                function => <<"split_into_parts"/utf8>>,
                                line => 387,
                                value => _assert_fail@1,
                                start => 13830,
                                'end' => 13904,
                                pattern_start => 13841,
                                pattern_end => 13849})
            end,
            [Head@1 | split_into_parts(Tail@1, Part_size)]
    end.

-file("src/aws/s3/transfer.gleam", 721).
-spec empty_complete_request(
    binary(),
    binary(),
    binary(),
    list(aws@services@s3:completed_part())
) -> aws@services@s3:complete_multipart_upload_request().
empty_complete_request(Bucket, Key, Upload_id, Parts) ->
    {complete_multipart_upload_request,
        Bucket,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        Key,
        none,
        {some, {completed_multipart_upload, {some, Parts}}},
        none,
        none,
        none,
        none,
        Upload_id}.

-file("src/aws/s3/transfer.gleam", 754).
-spec empty_abort_request(binary(), binary(), binary()) -> aws@services@s3:abort_multipart_upload_request().
empty_abort_request(Bucket, Key, Upload_id) ->
    {abort_multipart_upload_request, Bucket, none, none, Key, none, Upload_id}.

-file("src/aws/s3/transfer.gleam", 606).
-spec abort_quietly(aws@services@s3:client(), binary(), binary(), binary()) -> nil.
abort_quietly(Client, Bucket, Key, Upload_id) ->
    _ = aws@services@s3:abort_multipart_upload(
        Client,
        empty_abort_request(Bucket, Key, Upload_id)
    ),
    nil.

-file("src/aws/s3/transfer.gleam", 364).
-spec abort_on_error(
    aws@services@s3:client(),
    binary(),
    binary(),
    binary(),
    {ok, AWUL} | {error, error()}
) -> {ok, AWUL} | {error, error()}.
abort_on_error(Client, Bucket, Key, Upload_id, Result) ->
    case Result of
        {ok, Value} ->
            {ok, Value};

        {error, E} ->
            abort_quietly(Client, Bucket, Key, Upload_id),
            {error, E}
    end.

-file("src/aws/s3/transfer.gleam", 579).
-spec drain_remaining(
    gleam@erlang@process:subject({ok, aws@services@s3:completed_part()} |
        {error, error()}),
    integer()
) -> nil.
drain_remaining(Inbox, Remaining) ->
    case Remaining of
        0 ->
            nil;

        _ ->
            _ = gleam_erlang_ffi:'receive'(Inbox),
            drain_remaining(Inbox, Remaining - 1)
    end.

-file("src/aws/s3/transfer.gleam", 559).
-spec collect_batch_results(
    gleam@erlang@process:subject({ok, aws@services@s3:completed_part()} |
        {error, error()}),
    integer(),
    list(aws@services@s3:completed_part())
) -> {ok, list(aws@services@s3:completed_part())} | {error, error()}.
collect_batch_results(Inbox, Remaining, Acc) ->
    case Remaining of
        0 ->
            {ok, Acc};

        _ ->
            case gleam_erlang_ffi:'receive'(Inbox) of
                {ok, Part} ->
                    collect_batch_results(Inbox, Remaining - 1, [Part | Acc]);

                {error, E} ->
                    drain_remaining(Inbox, Remaining - 1),
                    {error, E}
            end
    end.

-file("src/aws/s3/transfer.gleam", 701).
-spec empty_completed_part(integer(), gleam@option:option(binary())) -> aws@services@s3:completed_part().
empty_completed_part(Part_number, E_tag) ->
    {completed_part,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        E_tag,
        {some, Part_number}}.

-file("src/aws/s3/transfer.gleam", 667).
-spec empty_upload_part_request(
    binary(),
    binary(),
    binary(),
    integer(),
    bitstring()
) -> aws@services@s3:upload_part_request().
empty_upload_part_request(Bucket, Key, Upload_id, Part_number, Body) ->
    {upload_part_request,
        {some, aws@streaming:from_bit_array(Body)},
        Bucket,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        {some, erlang:byte_size(Body)},
        none,
        none,
        Key,
        Part_number,
        none,
        none,
        none,
        none,
        Upload_id}.

-file("src/aws/s3/transfer.gleam", 531).
-spec upload_one_batch(
    aws@services@s3:client(),
    binary(),
    binary(),
    binary(),
    list({integer(), bitstring()})
) -> {ok, list(aws@services@s3:completed_part())} | {error, error()}.
upload_one_batch(Client, Bucket, Key, Upload_id, Batch) ->
    Inbox = gleam@erlang@process:new_subject(),
    gleam@list:each(
        Batch,
        fun(Numbered) ->
            {Part_number, Part} = Numbered,
            _ = proc_lib:spawn_link(
                fun() ->
                    Result = case aws@services@s3:upload_part(
                        Client,
                        empty_upload_part_request(
                            Bucket,
                            Key,
                            Upload_id,
                            Part_number,
                            Part
                        )
                    ) of
                        {ok, Out} ->
                            {ok,
                                empty_completed_part(
                                    Part_number,
                                    erlang:element(13, Out)
                                )};

                        {error, E} ->
                            {error, {upload_part_failed, Part_number, E}}
                    end,
                    gleam@erlang@process:send(Inbox, Result)
                end
            ),
            nil
        end
    ),
    collect_batch_results(Inbox, erlang:length(Batch), []).

-file("src/aws/s3/transfer.gleam", 598).
-spec do_take_split(list(AWWA), integer(), list(AWWA)) -> {list(AWWA),
    list(AWWA)}.
do_take_split(Xs, N, Acc) ->
    case {Xs, N} of
        {[], _} ->
            {lists:reverse(Acc), []};

        {_, 0} ->
            {lists:reverse(Acc), Xs};

        {[X | Rest], _} ->
            do_take_split(Rest, N - 1, [X | Acc])
    end.

-file("src/aws/s3/transfer.gleam", 594).
?DOC(
    " Take the first `n` elements; return `(taken, rest)`. Used to\n"
    " chunk the work-list into bounded-concurrency batches.\n"
).
-spec take_split(list(AWVW), integer()) -> {list(AWVW), list(AWVW)}.
take_split(Xs, N) ->
    do_take_split(Xs, N, []).

-file("src/aws/s3/transfer.gleam", 490).
-spec upload_batches(
    aws@services@s3:client(),
    binary(),
    binary(),
    binary(),
    list({integer(), bitstring()}),
    integer(),
    list(aws@services@s3:completed_part())
) -> {ok, list(aws@services@s3:completed_part())} | {error, error()}.
upload_batches(Client, Bucket, Key, Upload_id, Remaining, Max_concurrency, Acc) ->
    case Remaining of
        [] ->
            {ok,
                gleam@list:sort(
                    Acc,
                    fun(A, B) ->
                        gleam@int:compare(
                            gleam@option:unwrap(erlang:element(13, A), 0),
                            gleam@option:unwrap(erlang:element(13, B), 0)
                        )
                    end
                )};

        _ ->
            {Batch, Rest} = take_split(Remaining, Max_concurrency),
            gleam@result:'try'(
                upload_one_batch(Client, Bucket, Key, Upload_id, Batch),
                fun(Batch_done) ->
                    upload_batches(
                        Client,
                        Bucket,
                        Key,
                        Upload_id,
                        Rest,
                        Max_concurrency,
                        lists:append(Batch_done, Acc)
                    )
                end
            )
    end.

-file("src/aws/s3/transfer.gleam", 478).
?DOC(
    " Parallel coordinator. Processes `parts` in batches of at most\n"
    " `max_concurrency` simultaneously-in-flight `UploadPart` calls.\n"
    " Each batch spawns a worker per part, awaits all results, then\n"
    " the outer loop moves on to the next batch. Results are sorted\n"
    " by part-number at the end so the `CompleteMultipartUpload`\n"
    " request sees parts in ascending order (S3 requires it).\n"
    "\n"
    " First failure in any batch short-circuits the whole upload —\n"
    " the outer `coordinate` then issues a best-effort\n"
    " `AbortMultipartUpload`.\n"
).
-spec upload_all_parts_parallel(
    aws@services@s3:client(),
    binary(),
    binary(),
    binary(),
    list(bitstring()),
    integer()
) -> {ok, list(aws@services@s3:completed_part())} | {error, error()}.
upload_all_parts_parallel(
    Client,
    Bucket,
    Key,
    Upload_id,
    Parts,
    Max_concurrency
) ->
    Numbered = gleam@list:index_map(Parts, fun(P, I) -> {I + 1, P} end),
    upload_batches(
        Client,
        Bucket,
        Key,
        Upload_id,
        Numbered,
        Max_concurrency,
        []
    ).

-file("src/aws/s3/transfer.gleam", 428).
-spec upload_all_parts(
    aws@services@s3:client(),
    binary(),
    binary(),
    binary(),
    list(bitstring()),
    integer(),
    list(aws@services@s3:completed_part())
) -> {ok, list(aws@services@s3:completed_part())} | {error, error()}.
upload_all_parts(Client, Bucket, Key, Upload_id, Parts, Next_part_number, Acc) ->
    case Parts of
        [] ->
            {ok, lists:reverse(Acc)};

        [Part | Rest] ->
            Req = empty_upload_part_request(
                Bucket,
                Key,
                Upload_id,
                Next_part_number,
                Part
            ),
            gleam@result:'try'(
                begin
                    _pipe = aws@services@s3:upload_part(Client, Req),
                    gleam@result:map_error(
                        _pipe,
                        fun(E) -> {upload_part_failed, Next_part_number, E} end
                    )
                end,
                fun(Out) ->
                    Completed = empty_completed_part(
                        Next_part_number,
                        erlang:element(13, Out)
                    ),
                    upload_all_parts(
                        Client,
                        Bucket,
                        Key,
                        Upload_id,
                        Rest,
                        Next_part_number + 1,
                        [Completed | Acc]
                    )
                end
            )
    end.

-file("src/aws/s3/transfer.gleam", 627).
-spec create_request(binary(), binary(), upload_options()) -> aws@services@s3:create_multipart_upload_request().
create_request(Bucket, Key, Options) ->
    {create_multipart_upload_request,
        erlang:element(7, Options),
        Bucket,
        none,
        erlang:element(5, Options),
        none,
        none,
        erlang:element(4, Options),
        erlang:element(3, Options),
        none,
        erlang:element(2, Options),
        none,
        none,
        none,
        none,
        none,
        none,
        Key,
        erlang:element(6, Options),
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        none,
        erlang:element(9, Options),
        erlang:element(8, Options),
        none,
        none}.

-file("src/aws/s3/transfer.gleam", 310).
-spec coordinate(
    aws@services@s3:client(),
    binary(),
    binary(),
    list(bitstring()),
    upload_options()
) -> {ok, upload_result()} | {error, error()}.
coordinate(Client, Bucket, Key, Parts, Options) ->
    gleam@result:'try'(
        begin
            _pipe = aws@services@s3:create_multipart_upload(
                Client,
                create_request(Bucket, Key, Options)
            ),
            gleam@result:map_error(
                _pipe,
                fun(Field@0) -> {create_failed, Field@0} end
            )
        end,
        fun(Create_out) ->
            gleam@result:'try'(
                gleam@option:to_result(
                    erlang:element(15, Create_out),
                    missing_upload_id
                ),
                fun(Upload_id) ->
                    gleam@result:'try'(
                        abort_on_error(
                            Client,
                            Bucket,
                            Key,
                            Upload_id,
                            case erlang:element(10, Options) of
                                none ->
                                    upload_all_parts(
                                        Client,
                                        Bucket,
                                        Key,
                                        Upload_id,
                                        Parts,
                                        1,
                                        []
                                    );

                                {some, N} ->
                                    upload_all_parts_parallel(
                                        Client,
                                        Bucket,
                                        Key,
                                        Upload_id,
                                        Parts,
                                        N
                                    )
                            end
                        ),
                        fun(Completed_parts) ->
                            gleam@result:'try'(
                                abort_on_error(
                                    Client,
                                    Bucket,
                                    Key,
                                    Upload_id,
                                    begin
                                        _pipe@1 = aws@services@s3:complete_multipart_upload(
                                            Client,
                                            empty_complete_request(
                                                Bucket,
                                                Key,
                                                Upload_id,
                                                Completed_parts
                                            )
                                        ),
                                        gleam@result:map_error(
                                            _pipe@1,
                                            fun(Field@0) -> {complete_failed, Field@0} end
                                        )
                                    end
                                ),
                                fun(_) ->
                                    {ok,
                                        {upload_result,
                                            Bucket,
                                            Key,
                                            Upload_id,
                                            erlang:length(Completed_parts)}}
                                end
                            )
                        end
                    )
                end
            )
        end
    ).

-file("src/aws/s3/transfer.gleam", 301).
-spec validate_part_size(integer()) -> {ok, nil} | {error, error()}.
validate_part_size(Part_size) ->
    case (Part_size >= 5242880) andalso (Part_size =< 5368709120) of
        true ->
            {ok, nil};

        false ->
            {error, {invalid_part_size, Part_size}}
    end.

-file("src/aws/s3/transfer.gleam", 225).
?DOC(
    " `upload` with caller-specified per-object metadata — sets HTTP\n"
    " content metadata (Content-Type, Cache-Control, etc.), the\n"
    " optional ACL / storage class / SSE choice, and any user\n"
    " metadata at `CreateMultipartUpload` time. See `UploadOptions`\n"
    " for the field set.\n"
).
-spec upload_with_options(
    aws@services@s3:client(),
    binary(),
    binary(),
    bitstring(),
    integer(),
    upload_options()
) -> {ok, upload_result()} | {error, error()}.
upload_with_options(Client, Bucket, Key, Body, Part_size_bytes, Options) ->
    gleam@result:'try'(
        validate_part_size(Part_size_bytes),
        fun(_) -> case erlang:byte_size(Body) of
                0 ->
                    {error, empty_body};

                _ ->
                    coordinate(
                        Client,
                        Bucket,
                        Key,
                        split_into_parts(Body, Part_size_bytes),
                        Options
                    )
            end end
    ).

-file("src/aws/s3/transfer.gleam", 203).
?DOC(
    " Upload `body` as `bucket/key` via S3's multipart API. Splits the\n"
    " body into parts of `part_size_bytes` (the last part may be\n"
    " smaller), uploads each, then finalises with\n"
    " `CompleteMultipartUpload`.\n"
    "\n"
    " Any failure mid-flight triggers a best-effort\n"
    " `AbortMultipartUpload` so the bucket doesn't accumulate dangling\n"
    " uploads. The abort's own success / failure is intentionally\n"
    " silenced — the caller already has the more interesting error\n"
    " from the step that failed.\n"
    "\n"
    " `part_size_bytes` must be between\n"
    " `default_part_size_bytes` (5 MiB) and `max_part_size_bytes`\n"
    " (5 GiB), inclusive. Invalid values return\n"
    " `Error(InvalidPartSize(part_size_bytes))` before any HTTP work.\n"
    " An empty body returns `Error(EmptyBody)`; S3 rejects empty\n"
    " multipart uploads with `EntityTooSmall`, so we short-circuit\n"
    " before the create round trip.\n"
).
-spec upload(
    aws@services@s3:client(),
    binary(),
    binary(),
    bitstring(),
    integer()
) -> {ok, upload_result()} | {error, error()}.
upload(Client, Bucket, Key, Body, Part_size_bytes) ->
    upload_with_options(
        Client,
        Bucket,
        Key,
        Body,
        Part_size_bytes,
        default_options()
    ).

-file("src/aws/s3/transfer.gleam", 412).
-spec flush_full_parts(bitstring(), integer(), list(bitstring())) -> {list(bitstring()),
    bitstring()}.
flush_full_parts(Buf, Part_size, Acc) ->
    Size = erlang:byte_size(Buf),
    case Size >= Part_size of
        true ->
            Head@1 = case gleam_stdlib:bit_array_slice(Buf, 0, Part_size) of
                {ok, Head} -> Head;
                _assert_fail ->
                    erlang:error(#{gleam_error => let_assert,
                                message => <<"Pattern match failed, no pattern matched the value."/utf8>>,
                                file => <<?FILEPATH/utf8>>,
                                module => <<"aws/s3/transfer"/utf8>>,
                                function => <<"flush_full_parts"/utf8>>,
                                line => 420,
                                value => _assert_fail,
                                start => 14903,
                                'end' => 14959,
                                pattern_start => 14914,
                                pattern_end => 14922})
            end,
            Tail@1 = case gleam_stdlib:bit_array_slice(
                Buf,
                Part_size,
                Size - Part_size
            ) of
                {ok, Tail} -> Tail;
                _assert_fail@1 ->
                    erlang:error(#{gleam_error => let_assert,
                                message => <<"Pattern match failed, no pattern matched the value."/utf8>>,
                                file => <<?FILEPATH/utf8>>,
                                module => <<"aws/s3/transfer"/utf8>>,
                                function => <<"flush_full_parts"/utf8>>,
                                line => 421,
                                value => _assert_fail@1,
                                start => 14966,
                                'end' => 15037,
                                pattern_start => 14977,
                                pattern_end => 14985})
            end,
            flush_full_parts(Tail@1, Part_size, [Head@1 | Acc]);

        false ->
            {Acc, Buf}
    end.

-file("src/aws/s3/transfer.gleam", 398).
?DOC(
    " Re-aggregate a `StreamingBody`'s chunks into parts of size\n"
    " `part_size`. Walks each chunk once, appending to a running\n"
    " buffer; flushes a part every time the buffer reaches\n"
    " `part_size`, and flushes any remainder as the final (possibly\n"
    " undersized) part.\n"
).
-spec rechunk_to_parts(aws@streaming:streaming_body(), integer()) -> list(bitstring()).
rechunk_to_parts(Body, Part_size) ->
    Chunks = aws@streaming:to_chunks(Body),
    {Parts_rev, Leftover} = gleam@list:fold(
        Chunks,
        {[], <<>>},
        fun(State, Chunk) ->
            {Parts, Buf} = State,
            flush_full_parts(
                gleam@bit_array:append(Buf, Chunk),
                Part_size,
                Parts
            )
        end
    ),
    Parts_with_tail = case erlang:byte_size(Leftover) of
        0 ->
            Parts_rev;

        _ ->
            [Leftover | Parts_rev]
    end,
    lists:reverse(Parts_with_tail).

-file("src/aws/s3/transfer.gleam", 285).
?DOC(
    " `upload_from_stream` with caller-specified per-object metadata.\n"
    " See `UploadOptions`.\n"
).
-spec upload_from_stream_with_options(
    aws@services@s3:client(),
    binary(),
    binary(),
    aws@streaming:streaming_body(),
    integer(),
    upload_options()
) -> {ok, upload_result()} | {error, error()}.
upload_from_stream_with_options(
    Client,
    Bucket,
    Key,
    Body,
    Part_size_bytes,
    Options
) ->
    gleam@result:'try'(
        validate_part_size(Part_size_bytes),
        fun(_) ->
            Parts = rechunk_to_parts(Body, Part_size_bytes),
            case gleam@list:is_empty(Parts) of
                true ->
                    {error, empty_body};

                false ->
                    coordinate(Client, Bucket, Key, Parts, Options)
            end
        end
    ).

-file("src/aws/s3/transfer.gleam", 266).
?DOC(
    " Same as `upload`, but takes a `StreamingBody` instead of a buffered\n"
    " `BitArray`. Walks the body's chunks once, re-aggregating across\n"
    " chunk boundaries so the wire-side part sizes follow\n"
    " `part_size_bytes` rather than the source's chunking — useful when\n"
    " the body comes from a chunked transport or builder that emits\n"
    " frequent small chunks (request streaming, log ingestion, line-\n"
    " oriented producers).\n"
    "\n"
    " Today both `StreamingBody` representations (Buffered / Chunked)\n"
    " hold their full bytes in memory, so this variant doesn't yet\n"
    " reduce peak memory vs `upload(streaming.to_bit_array(body), ...)`.\n"
    " Once `StreamingBody` grows a lazy `Source(...)` variant (file\n"
    " handles, generators), this path picks up true bounded-memory\n"
    " streaming for free.\n"
    "\n"
    " `part_size_bytes` has the same validation as `upload`: it must\n"
    " be between 5 MiB and 5 GiB, inclusive, or the function returns\n"
    " `Error(InvalidPartSize(part_size_bytes))` before reading chunks\n"
    " or creating the multipart upload.\n"
).
-spec upload_from_stream(
    aws@services@s3:client(),
    binary(),
    binary(),
    aws@streaming:streaming_body(),
    integer()
) -> {ok, upload_result()} | {error, error()}.
upload_from_stream(Client, Bucket, Key, Body, Part_size_bytes) ->
    upload_from_stream_with_options(
        Client,
        Bucket,
        Key,
        Body,
        Part_size_bytes,
        default_options()
    ).