-module(aws@streaming).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aws/streaming.gleam").
-export([from_bit_array/1, from_chunks/1, from_source/1, to_bit_array/1, to_chunks/1, byte_size/1, is_empty/1, empty/0, from_string/1, append/2, fold_chunks/3, try_fold_chunks/3, to_bit_array_max/2, collect_to_bit_array_max/2, collect_to_string_max/2, to_string_max/2]).
-export_type([streaming_body/0, response/0, collect_error/1]).
%% Documentation-attribute shims.
%% On OTP 27 and later, `?MODULEDOC(Str)` / `?DOC(Str)` expand to the
%% `-moduledoc` / `-doc` attributes. On older releases both expand to an
%% empty `-compile([])` attribute (no compiler options added), so every
%% `?MODULEDOC(...)` / `?DOC(...)` call site below still parses as a valid
%% attribute.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" `StreamingBody` — wrapper for HTTP request / response bodies\n"
" that may be too large to hold in memory.\n"
"\n"
" The opaque type has three representations:\n"
"\n"
" - **Buffered**: a single `BitArray` materialised up front.\n"
" Produced by `from_bit_array` and by buffered callers that\n"
" only ever hold the full payload.\n"
" - **Chunked**: an ordered list of byte chunks. The chunked\n"
" transport (`http_streaming.default_send`) produces these\n"
" directly off the wire; multipart builders construct them\n"
" via `from_chunks`. `to_chunks` exposes the chunk list to\n"
" consumers without materialising the full payload.\n"
" - **Source**: a pull-based callback `fn() -> Result(BitArray,\n"
" Nil)`. Each call yields the next chunk; `Error(Nil)` marks\n"
" end-of-stream. File-backed and generator-backed producers\n"
" construct one of these so callers can stream multi-GB\n"
" payloads without holding the full body in memory. Use\n"
" `from_source` to construct, `from_file` for the common\n"
" file-backed case. Note: a `Source` is single-pass — once\n"
" consumed it can't be replayed, and `is_empty` / `byte_size`\n"
" either consume it (`byte_size`) or conservatively assume\n"
" non-empty (`is_empty`).\n"
).
%% A body in one of three tagged representations:
%%   {buffered, Bytes}  - the whole payload as one bitstring;
%%   {chunked, Chunks}  - an ordered list of bitstring chunks;
%%   {source, Next}     - a zero-arity fun that returns `{ok, Chunk}` for
%%                        each chunk and `{error, nil}` at end-of-stream.
-opaque streaming_body() :: {buffered, bitstring()} |
{chunked, list(bitstring())} |
{source, fun(() -> {ok, bitstring()} | {error, nil})}.
%% A streamed response record. The fourth element is the body (it is what
%% `collect_to_bit_array_max/2` reads via `erlang:element(4, R)`).
%% NOTE(review): the integer and the binary pair list are presumably the
%% HTTP status code and the header name/value pairs - confirm against the
%% transport module that builds these tuples.
-type response() :: {response,
integer(),
list({binary(), binary()}),
streaming_body()}.
%% Failure reasons produced by the `collect_*` helpers in this module:
%%   {transport, E}     - the incoming result was already `{error, E}`;
%%   {too_large, Max}   - the body exceeded the `Max` byte cap passed in;
%%   invalid_utf8       - the collected bytes did not decode as UTF-8.
-type collect_error(GPS) :: {transport, GPS} |
{too_large, integer()} |
invalid_utf8.
-file("src/aws/streaming.gleam", 54).
?DOC(
" Build a `StreamingBody` from a `BitArray` already in memory.\n"
" Use this when the body bytes are buffered up front (a UTF-8\n"
" payload, the result of a buffered read, a generated XML/JSON\n"
" body) and you want to pass a `StreamingBody`-typed value through.\n"
).
%% Wraps an in-memory bitstring in the `buffered` representation.
%% The bytes are stored as given; nothing is copied or validated.
-spec from_bit_array(bitstring()) -> streaming_body().
from_bit_array(Payload) ->
    Body = {buffered, Payload},
    Body.
-file("src/aws/streaming.gleam", 62).
?DOC(
" Build a `StreamingBody` from an ordered list of byte chunks.\n"
" Chunk boundaries are preserved by `to_chunks` and by `append`\n"
" when both operands are chunked, so multipart builders and the\n"
" chunked transport see exactly the chunking the caller produced.\n"
).
%% Wraps a list of bitstrings in the `chunked` representation.
%% The list is stored untouched, so order and boundaries are kept.
-spec from_chunks(list(bitstring())) -> streaming_body().
from_chunks(Pieces) ->
    Body = {chunked, Pieces},
    Body.
-file("src/aws/streaming.gleam", 76).
?DOC(
" Build a `StreamingBody` from a pull-based callback. Each call\n"
" to `next` returns the next chunk; `Error(Nil)` marks end-of-\n"
" stream. The callback is invoked once per `fold_chunks` step\n"
" (so multi-GB payloads stream without ever materialising the\n"
" full body), and once per element when `to_chunks` materialises.\n"
"\n"
" Single-pass: once a `Source` has been folded / materialised,\n"
" the callback is exhausted and subsequent calls return `Error`.\n"
" Callers that need to consume the same body twice should\n"
" reconstruct the body fresh on the second pass.\n"
).
%% Wraps a pull callback in the `source` representation.
%% The callback is only stored here; it is not invoked.
-spec from_source(fun(() -> {ok, bitstring()} | {error, nil})) -> streaming_body().
from_source(Pull) ->
    Body = {source, Pull},
    Body.
-file("src/aws/streaming.gleam", 125).
%% Pulls every remaining chunk out of `Pull` and returns them in the order
%% they were produced. `Collected` holds the chunks seen so far in reverse
%% order; it is reversed once, when the callback reports end-of-stream.
-spec drain_source(
    fun(() -> {ok, bitstring()} | {error, nil}),
    list(bitstring())
) -> list(bitstring()).
drain_source(Pull, Collected) ->
    case Pull() of
        {error, _} ->
            %% End-of-stream: restore production order.
            lists:reverse(Collected);
        {ok, Piece} ->
            drain_source(Pull, [Piece | Collected])
    end.
-file("src/aws/streaming.gleam", 85).
?DOC(
" Return the body as a `BitArray`. For `Buffered` this is a\n"
" constant-time accessor; for `Chunked` it concatenates the chunks.\n"
" For `Source` it materialises the whole stream — defeats the point\n"
" of streaming, so chunk-by-chunk consumers should prefer\n"
" `to_chunks` / `fold_chunks` instead.\n"
).
%% One clause per representation:
%%   buffered - hand back the stored bitstring;
%%   chunked  - concatenate the chunk list;
%%   source   - drain the callback, then concatenate what it produced.
-spec to_bit_array(streaming_body()) -> bitstring().
to_bit_array({buffered, Bytes}) ->
    Bytes;
to_bit_array({chunked, Chunks}) ->
    gleam_stdlib:bit_array_concat(Chunks);
to_bit_array({source, Next}) ->
    Drained = drain_source(Next, []),
    gleam_stdlib:bit_array_concat(Drained).
-file("src/aws/streaming.gleam", 100).
?DOC(
" Return the body as an ordered list of byte chunks. Buffered\n"
" bodies surface as a single-element list (or the empty list if\n"
" the buffer is empty), so consumers can write one chunk-oriented\n"
" loop and have it work uniformly across both representations.\n"
" `Source` bodies are materialised by draining the callback —\n"
" expensive for large streams; use `fold_chunks` to stream\n"
" element-by-element without materialising.\n"
).
%% One clause per representation. The empty-buffer clause must stay ahead
%% of the general buffered clause so that `<<>>` yields `[]`, not `[<<>>]`.
-spec to_chunks(streaming_body()) -> list(bitstring()).
to_chunks({buffered, <<>>}) ->
    [];
to_chunks({buffered, Bytes}) ->
    [Bytes];
to_chunks({chunked, Chunks}) ->
    Chunks;
to_chunks({source, Next}) ->
    drain_source(Next, []).
-file("src/aws/streaming.gleam", 135).
%% Left fold over the chunks produced by `Pull`. The callback is invoked
%% once per step; `Step(State, Chunk)` produces the next state. When the
%% callback returns an error tuple the current state is the result.
-spec fold_source(
    fun(() -> {ok, bitstring()} | {error, nil}),
    Acc,
    fun((Acc, bitstring()) -> Acc)
) -> Acc.
fold_source(Pull, State, Step) ->
    case Pull() of
        {error, _} ->
            State;
        {ok, Piece} ->
            Updated = Step(State, Piece),
            fold_source(Pull, Updated, Step)
    end.
-file("src/aws/streaming.gleam", 115).
?DOC(
" Byte size of the body. Constant-time for `Buffered`; walks the\n"
" chunk list (summing `bit_array.byte_size`) for `Chunked`. For\n"
" `Source` it drains the callback to count — single-pass, so the\n"
" stream is consumed by the call. Callers who need to know the\n"
" size without consuming the body should track it themselves\n"
" alongside the `StreamingBody` value.\n"
).
%% One clause per representation. The module is compiled with
%% `no_auto_import`, so the BIF must be called as `erlang:byte_size/1`;
%% an unqualified `byte_size(...)` here would recurse into this function.
-spec byte_size(streaming_body()) -> integer().
byte_size({buffered, Bytes}) ->
    erlang:byte_size(Bytes);
byte_size({chunked, Chunks}) ->
    lists:sum([erlang:byte_size(Chunk) || Chunk <- Chunks]);
byte_size({source, Next}) ->
    %% Counting a source consumes it: every chunk is pulled and measured.
    Count = fun(Total, Chunk) -> Total + erlang:byte_size(Chunk) end,
    fold_source(Next, 0, Count).
-file("src/aws/streaming.gleam", 152).
?DOC(
" `True` iff the body is empty. For `Buffered` / `Chunked` it's a\n"
" direct check; for `Source` it returns `False` conservatively\n"
" without consuming the callback — a `Source` could yield zero\n"
" chunks, but checking would require calling `next()` which is\n"
" destructive (single-pass). Callers who need a definitive answer\n"
" should call `byte_size` instead, accepting the consumption cost.\n"
).
%% One clause per representation. A chunked body counts as empty when every
%% chunk has zero bytes (which includes the empty chunk list). A source is
%% never pulled here, so it is always reported as non-empty.
-spec is_empty(streaming_body()) -> boolean().
is_empty({buffered, Bytes}) ->
    0 =:= erlang:byte_size(Bytes);
is_empty({chunked, Chunks}) ->
    lists:all(fun(Chunk) -> 0 =:= erlang:byte_size(Chunk) end, Chunks);
is_empty({source, _Next}) ->
    false.
-file("src/aws/streaming.gleam", 163).
?DOC(
" Buffered empty body. Used by request builders when no body\n"
" is present (the SDK threads a `StreamingBody` end-to-end even\n"
" for GET-style operations).\n"
).
%% The empty body is the buffered representation holding a zero-length
%% bitstring.
-spec empty() -> streaming_body().
empty() ->
    NoBytes = <<>>,
    {buffered, NoBytes}.
-file("src/aws/streaming.gleam", 170).
?DOC(
" Build a `StreamingBody` from a UTF-8 `String`. Common path\n"
" for caller-supplied text payloads (JSON bodies, XML bodies);\n"
" equivalent to `from_bit_array(bit_array.from_string(s))`.\n"
).
%% Passes the string through `gleam_stdlib:identity/1` and stores the result
%% in the `buffered` representation.
-spec from_string(binary()) -> streaming_body().
from_string(Text) ->
    Bytes = gleam_stdlib:identity(Text),
    {buffered, Bytes}.
-file("src/aws/streaming.gleam", 180).
?DOC(
" Concatenate two streaming bodies. When both sides are chunked\n"
" the result preserves chunk boundaries from each operand —\n"
" useful for multipart builders that already chose their chunk\n"
" shape. Mixing buffered and chunked merges through `to_chunks`\n"
" so the result is still walkable chunk-by-chunk by downstream\n"
" consumers.\n"
).
%% Two buffered bodies merge into one buffered body. Every other pairing is
%% converted with `to_chunks/1` and joined into a chunked body; note that
%% `to_chunks/1` drains a `source` operand, so sources are consumed here.
%% The left operand is converted before the right one.
-spec append(streaming_body(), streaming_body()) -> streaming_body().
append({buffered, Left}, {buffered, Right}) ->
    {buffered, gleam@bit_array:append(Left, Right)};
append(A, B) ->
    Front = to_chunks(A),
    Back = to_chunks(B),
    {chunked, Front ++ Back}.
-file("src/aws/streaming.gleam", 195).
?DOC(
" Reduce a streaming body left-to-right by accumulating one chunk\n"
" at a time. Buffered bodies surface as a single chunk per\n"
" `to_chunks`, so the fold runs once; chunked bodies fold across\n"
" every chunk the transport delivered; `Source` bodies stream\n"
" chunks one-by-one without materialising — `next` is called once\n"
" per fold step. Use this for running-hash / running-length /\n"
" stream-to-disk pipelines without buffering the full body.\n"
).
%% Sources are folded directly off the callback; the other representations
%% are turned into a chunk list first. `F` is always called as
%% `F(Accumulator, Chunk)`, so the wrapper below swaps the argument order
%% that `lists:foldl/3` uses.
-spec fold_chunks(streaming_body(), Acc, fun((Acc, bitstring()) -> Acc)) -> Acc.
fold_chunks({source, Next}, Initial, F) ->
    fold_source(Next, Initial, F);
fold_chunks(Body, Initial, F) ->
    Chunks = to_chunks(Body),
    lists:foldl(fun(Chunk, State) -> F(State, Chunk) end, Initial, Chunks).
-file("src/aws/streaming.gleam", 222).
%% Short-circuiting fold over the chunks produced by `Pull`.
%% `Step(State, Chunk)` returns `{ok, NewState}` to continue or
%% `{error, Reason}` to stop; that error tuple is then the result and the
%% callback is not invoked again. End-of-stream yields `{ok, State}`.
-spec try_fold_source(
    fun(() -> {ok, bitstring()} | {error, nil}),
    Acc,
    fun((Acc, bitstring()) -> {ok, Acc} | {error, Reason})
) -> {ok, Acc} | {error, Reason}.
try_fold_source(Pull, State, Step) ->
    case Pull() of
        {error, _} ->
            {ok, State};
        {ok, Piece} ->
            case Step(State, Piece) of
                {error, Reason} ->
                    {error, Reason};
                {ok, Updated} ->
                    try_fold_source(Pull, Updated, Step)
            end
    end.
-file("src/aws/streaming.gleam", 211).
?DOC(
" `fold_chunks` variant that short-circuits on `Error`. Returns the\n"
" first error the folder produces (or `Ok(acc)` if every chunk\n"
" accepted). Useful for streaming decoders that can fail partway\n"
" (e.g. UTF-8 validation rejecting a torn multi-byte sequence at a\n"
" chunk boundary, or a JSON parser hitting a malformed token).\n"
).
%% Sources are folded directly off the callback so chunks are pulled one at
%% a time; the other representations are turned into a chunk list and
%% handed to `gleam@list:try_fold/3`.
-spec try_fold_chunks(
    streaming_body(),
    Acc,
    fun((Acc, bitstring()) -> {ok, Acc} | {error, Reason})
) -> {ok, Acc} | {error, Reason}.
try_fold_chunks({source, Next}, Initial, F) ->
    try_fold_source(Next, Initial, F);
try_fold_chunks(Body, Initial, F) ->
    Chunks = to_chunks(Body),
    gleam@list:try_fold(Chunks, Initial, F).
-file("src/aws/streaming.gleam", 248).
?DOC(
" Materialise the body as a `BitArray`, refusing to do so if the\n"
" cumulative size would exceed `max_bytes`. Walks chunks lazily\n"
" on the chunked path so the cap fires before concatenation. Use\n"
" this when a caller wants buffered access but must guard against\n"
" OOM on pathologically-large responses (event-stream control-\n"
" message buffers, downloads of unknown size, server bugs).\n"
"\n"
" `Error(Nil)` means the body exceeded the cap; the caller can\n"
" fall back to chunk-by-chunk processing or surface a typed\n"
" error to its own callers. A `max_bytes` of 0 still accepts the\n"
" empty body (`Ok(<<>>)`).\n"
).
%% Collects the body under a size cap.
%%
%% Returns `{ok, Bytes}` with all chunks concatenated in order, or
%% `{error, nil}` as soon as the running total would exceed `Max_bytes`.
%%
%% The size check happens before a chunk is kept, so an over-limit buffer is
%% never built, and the chunks are concatenated once at the end instead of
%% re-copying the accumulator on every step.
%%
%% Sizes are tracked in bits and compared against `Max_bytes * 8`. For an
%% integer cap this is the same condition as "byte size of the concatenation
%% is greater than `Max_bytes`", including for bit arrays whose length is
%% not a whole number of bytes.
-spec to_bit_array_max(streaming_body(), integer()) -> {ok, bitstring()} |
    {error, nil}.
to_bit_array_max(Body, Max_bytes) ->
    Max_bits = Max_bytes * 8,
    Collect = fun({Seen_bits, Kept}, Chunk) ->
        Total_bits = Seen_bits + erlang:bit_size(Chunk),
        case Total_bits > Max_bits of
            true ->
                {error, nil};
            false ->
                %% Keep chunks newest-first; reversed once below.
                {ok, {Total_bits, [Chunk | Kept]}}
        end
    end,
    case try_fold_chunks(Body, {0, []}, Collect) of
        {ok, {_Total_bits, Rev_chunks}} ->
            {ok, gleam_stdlib:bit_array_concat(lists:reverse(Rev_chunks))};
        {error, nil} ->
            {error, nil}
    end.
-file("src/aws/streaming.gleam", 287).
?DOC(
" Generic capped-buffered collection: takes the `Result` produced\n"
" by a `<op>_streaming` wrapper and materialises the body as a\n"
" `BitArray`, refusing if size would exceed `max_bytes`. Works\n"
" against any service's streaming wrapper since the wrappers all\n"
" return `Result(streaming.Response, _)`.\n"
"\n"
" Typical \"download a smallish-bounded object\" case: small JSON /\n"
" config blobs / log shards where the wire bytes fit in memory\n"
" but the caller wants a hard ceiling. For multi-GB objects skip\n"
" this helper and consume chunks via `fold_chunks` directly.\n"
).
%% An incoming `{error, E}` is re-tagged as `{error, {transport, E}}` without
%% touching any body. For `{ok, Response}` the body (4th tuple element) is
%% collected under the cap; a cap failure becomes
%% `{error, {too_large, Max_bytes}}`.
-spec collect_to_bit_array_max({ok, response()} | {error, E}, integer()) -> {ok,
        bitstring()} |
    {error, collect_error(E)}.
collect_to_bit_array_max({error, Reason}, _Max_bytes) ->
    {error, {transport, Reason}};
collect_to_bit_array_max({ok, Response}, Max_bytes) ->
    Body = erlang:element(4, Response),
    case to_bit_array_max(Body, Max_bytes) of
        {ok, Bytes} ->
            {ok, Bytes};
        {error, _} ->
            {error, {too_large, Max_bytes}}
    end.
-file("src/aws/streaming.gleam", 302).
?DOC(
" Same as `collect_to_bit_array_max` but also runs the bytes\n"
" through `bit_array.to_string`, surfacing\n"
" `Error(InvalidUtf8)` when the body isn't valid UTF-8. Common\n"
" path for streaming text responses (JSON / XML / log shards)\n"
" where the caller wants the bytes both size-bounded AND\n"
" UTF-8-validated in a single hop.\n"
).
%% Collects the bytes first; any error from that step is returned unchanged.
%% On success the bytes are decoded with `gleam@bit_array:to_string/1`, and
%% a decoding failure is reported as `{error, invalid_utf8}`.
-spec collect_to_string_max({ok, response()} | {error, E}, integer()) -> {ok,
        binary()} |
    {error, collect_error(E)}.
collect_to_string_max(Resp, Max_bytes) ->
    case collect_to_bit_array_max(Resp, Max_bytes) of
        {error, Reason} ->
            {error, Reason};
        {ok, Bytes} ->
            case gleam@bit_array:to_string(Bytes) of
                {ok, Text} ->
                    {ok, Text};
                {error, _} ->
                    {error, invalid_utf8}
            end
    end.
-file("src/aws/streaming.gleam", 321).
?DOC(
" Materialise the body as a UTF-8 `String`. Returns `Error(Nil)`\n"
" if the body exceeds `max_bytes` (via `to_bit_array_max`) OR if\n"
" the bytes aren't valid UTF-8. The two-failure-modes-one-error\n"
" shape mirrors `bit_array.to_string` so callers can keep their\n"
" existing `result.try` chains.\n"
"\n"
" Common path for text response bodies — JSON-as-text, XML-as-\n"
" text, log files — where the caller wants both size-safety and\n"
" a `String` result. Equivalent to `to_bit_array_max` followed by\n"
" `bit_array.to_string`, kept here as a single entry point so\n"
" call sites read as one operation.\n"
).
%% Collects the body under the cap, then decodes it. An error from the
%% collection step is returned as-is; otherwise the result is whatever
%% `gleam@bit_array:to_string/1` returns for the collected bytes.
-spec to_string_max(streaming_body(), integer()) -> {ok, binary()} |
    {error, nil}.
to_string_max(Body, Max_bytes) ->
    case to_bit_array_max(Body, Max_bytes) of
        {error, Reason} ->
            {error, Reason};
        {ok, Bytes} ->
            gleam@bit_array:to_string(Bytes)
    end.