%% src/aws_streaming_ffi.erl

%% Streaming HTTP send via OTP's `httpc` async-self mode. The
%% caller asks for a request; we set `{sync, false}, {stream, self}`,
%% block on the resulting stream messages, and return the assembled
%% response as `{Status, Headers, [Chunk]}` so the SDK runtime can
%% wrap it as a `streaming.StreamingBody`.
%%
%% Why a dedicated FFI rather than extending `gleam_httpc`:
%% `gleam_httpc` only exposes synchronous (buffer-all-bytes) mode.
%% Streaming mode needs `{sync, false} + {stream, self}` which
%% gleam_httpc doesn't surface; the caller controls the receive
%% loop. That loop lives here so it can be unit-tested by sending
%% synthetic stream messages to a known request id, without booting
%% a real HTTP server.
%%
%% Why we collect synchronously instead of returning a lazy yielder:
%% v1 keeps the existing call-site contract (synchronous Result of
%% Response). Once the SDK gets a streaming consumer API, this loop
%% can be split so chunks reach the caller incrementally — the same
%% `collect_stream/2` becomes the per-chunk handler hook.
%%
%% Status code provenance: OTP's `{stream, self}` only streams the
%% body of a 200 or 206 response. The first message in that case is
%% `stream_start`, which carries only headers — there's no explicit
%% status field. We default Status = 200 because a streamed response
%% is guaranteed to be one of those two codes; the SDK runtime's
%% success-path code only checks `>= 200 && < 300`, so the exact
%% 200 vs 206 distinction doesn't change error handling. Every other
%% response (non-2xx, and 2xx codes other than 200/206) arrives as
%% the synchronous-mode message, which carries the full status line;
%% we surface that verbatim.

-module(aws_streaming_ffi).
-export([
    request_streaming/4, collect_stream/2,
    streaming_send/6, streaming_send/7,
    build_http_options/3
]).

%% Convenience entry point for the Gleam wrapper: performs one streaming
%% request with HTTP/2 left switched off. All work is delegated to
%% `streaming_send/7`, so the Gleam side never has to build httpc's
%% per-method request tuples itself.
%%
%% Arguments:
%%   Method    - request method as an atom (`get`, `post`, ...)
%%   Url       - target URL as a binary
%%   Headers   - list of `{Name, Value}` pairs, both binaries
%%   Body      - request body as a binary (empty for GET / HEAD)
%%   Timeout   - timeout in milliseconds (integer)
%%   VerifyTls - boolean; `false` disables TLS peer verification
%%
%% Result: `{ok, {Status, Headers, [Chunk]}} | {error, Reason}`, exactly as
%% produced by `request_streaming/4`.
streaming_send(Method, Url, Headers, Body, Timeout, VerifyTls) ->
    Http2 = false,
    streaming_send(Method, Url, Headers, Body, Timeout, VerifyTls, Http2).

%% Full form of the streaming send. The extra `Http2` boolean is forwarded to
%% `build_http_options/3`, which appends `{http_version, "HTTP/2"}` to the
%% httpc HTTP options when it is `true`.
%% NOTE(review): the original note says OTP 27.1+ httpc speaks HTTP/2 and
%% falls back to HTTP/1.1 when the server does not — confirm against the
%% targeted OTP release before relying on it.
%%
%% Steps: convert the URL binary to a charlist, convert the headers into
%% httpc's charlist pairs (adding a default user agent if needed), build the
%% HTTP options, build the method-specific request tuple, then hand all of it
%% to `request_streaming/4`. The untouched binary `Headers` are passed to
%% `build_request/5` as well so it can look up the content type.
streaming_send(Method, Url, Headers, Body, Timeout, VerifyTls, Http2) ->
    Target = unicode:characters_to_list(Url),
    HttpcHeaders = prepare_headers(Headers),
    Options = build_http_options(Timeout, VerifyTls, Http2),
    request_streaming(
        Method,
        build_request(Method, Target, HttpcHeaders, Headers, Body),
        Options,
        Timeout
    ).

%% Convert the caller's `{Name, Value}` binary header pairs into the charlist
%% pairs httpc takes, and add a default `user-agent` header when the caller
%% did not supply one.
%%
%% Returns the converted pairs in reverse input order (they are accumulated by
%% prepending); when the default user agent is added it is the first element.
prepare_headers(Headers) ->
    prepare_headers_loop(Headers, [], false).

%% Accumulator loop behind `prepare_headers/1`. The third argument records
%% whether a user-agent header has been seen so far.
prepare_headers_loop([], Acc, true) ->
    Acc;
prepare_headers_loop([], Acc, false) ->
    [{"user-agent", "aws-gleam/0.1"} | Acc];
prepare_headers_loop([{Name, Value} | Rest], Acc, UaSet) ->
    %% HTTP field names are case-insensitive, so `User-Agent` and
    %% `USER-AGENT` must count as the caller's user agent too. Comparing the
    %% raw binary would miss them and inject a second, conflicting
    %% user-agent header. `lookup_content_type/1` lowercases the same way.
    UaSet1 = UaSet orelse string:lowercase(Name) =:= <<"user-agent">>,
    Pair = {unicode:characters_to_list(Name), unicode:characters_to_list(Value)},
    prepare_headers_loop(Rest, [Pair | Acc], UaSet1).


%% Build the httpc HTTP-options list from (timeout, verify_tls, http2).
%%
%% The list always starts with `{autoredirect, false}` and
%% `{timeout, Timeout}`. When `VerifyTls` is `false`, an
%% `{ssl, [{verify, verify_none}]}` entry follows; when `Http2` is `true`,
%% `{http_version, "HTTP/2"}` is appended last. Both flags must be booleans;
%% anything else fails with `function_clause`.
%% NOTE(review): httpc's documented protocol-version option is `version`;
%% confirm that `http_version` is recognised by the targeted OTP release
%% rather than ignored.
build_http_options(Timeout, VerifyTls, Http2) when
    is_boolean(VerifyTls), is_boolean(Http2)
->
    [{autoredirect, false}, {timeout, Timeout}] ++
        [{ssl, [{verify, verify_none}]} || not VerifyTls] ++
        [{http_version, "HTTP/2"} || Http2].

%% Assemble the request tuple handed to `httpc:request/4`.
%%
%% `get`, `head` and `options` produce the two-element `{Url, Headers}`
%% form. Every other method produces `{Url, Headers, ContentType, Body}`,
%% where ContentType is looked up in the caller's original (binary) headers
%% and falls back to `application/octet-stream`, matching gleam_httpc.
build_request(Method, UrlList, HeadersList, RawHeaders, Body) ->
    case lists:member(Method, [get, head, options]) of
        true ->
            {UrlList, HeadersList};
        false ->
            {UrlList, HeadersList, lookup_content_type(RawHeaders), Body}
    end.

%% Return the value of the first header whose name equals `content-type`
%% ignoring case, converted to a charlist. When no such header is present the
%% result is "application/octet-stream".
lookup_content_type(Headers) ->
    IsContentType = fun({Name, _Value}) ->
        string:lowercase(Name) =:= <<"content-type">>
    end,
    case lists:search(IsContentType, Headers) of
        {value, {_Name, Value}} -> unicode:characters_to_list(Value);
        false -> "application/octet-stream"
    end.

%% Issue Method against the URL embedded in Request with httpc in
%% asynchronous `{stream, self}` mode, then block in `collect_stream/2`
%% until the response is complete, an error arrives, or the receive
%% times out.
%%
%% Arguments:
%%   Method      - request method atom
%%   Request     - httpc request tuple (see `build_request/5`)
%%   HttpOptions - httpc HTTP-options list (see `build_http_options/3`)
%%   Timeout     - integer milliseconds passed to `collect_stream/2`
%%
%% Returns `{ok, {Status, Headers, [Chunk]}}` on success, where Status is
%% an integer, Headers is the list of `{Name, Value}` tuples httpc
%% delivers and each Chunk is a binary; or `{error, Reason}` on transport
%% failure / timeout.
%%
%% When `collect_stream/2` gives up with `{error, timeout}` the request is
%% still in flight inside httpc. It is cancelled and any of its messages
%% already queued are discarded, so the abandoned request neither keeps
%% running nor leaves stale `{http, ...}` messages in the caller's
%% mailbox.
request_streaming(Method, Request, HttpOptions, Timeout) when
    is_atom(Method),
    is_tuple(Request),
    is_list(HttpOptions),
    is_integer(Timeout)
->
    Options = [
        {sync, false},
        {stream, self},
        {body_format, binary},
        {receiver, self()}
    ],
    case httpc:request(Method, Request, HttpOptions, Options) of
        {ok, RequestId} ->
            case collect_stream(RequestId, Timeout) of
                {error, timeout} = TimedOut ->
                    abandon_request(RequestId),
                    TimedOut;
                Result ->
                    Result
            end;
        {error, Reason} ->
            {error, normalise_error(Reason)}
    end.

%% Cancel an in-flight asynchronous request and drop whatever it has
%% already delivered to this process. Best effort: cancellation is
%% asynchronous, so a message may still arrive after the drain.
abandon_request(RequestId) ->
    ok = httpc:cancel_request(RequestId),
    drain_stream_messages(RequestId).

%% Remove every queued httpc message for RequestId without blocking.
%% Covers both the three-element stream frames and the two-element
%% result / error frames.
drain_stream_messages(RequestId) ->
    receive
        {http, {RequestId, _Tag, _Payload}} -> drain_stream_messages(RequestId);
        {http, {RequestId, _Result}} -> drain_stream_messages(RequestId)
    after 0 ->
        ok
    end.

%% Receive loop that assembles one httpc response from the messages
%% addressed to the calling process. It is exported so tests can post
%% synthetic `{http, {RequestId, ...}}` messages to `self()` and run the
%% loop against that id without any HTTP server.
%%
%% Result shape is the same as `request_streaming/4`:
%% `{ok, {Status, Headers, [Chunk]}} | {error, Reason}`.
collect_stream(RequestId, Timeout) ->
    collect_stream(RequestId, Timeout, 200, [], []).

%% Loop state: Status (starts at 200 and is never changed by stream frames),
%% Headers (taken from `stream_start`), Chunks (body parts, newest first).
%% The `after` clause is re-armed on every iteration, so Timeout bounds the
%% wait for each individual message rather than the whole response.
collect_stream(RequestId, Timeout, Status, Headers, Chunks) ->
    receive
        %% Start of a streamed response: carries headers only, no status.
        {http, {RequestId, stream_start, StartHeaders}} ->
            collect_stream(RequestId, Timeout, Status, StartHeaders, Chunks);
        %% One body part; prepend now, reverse once at the end.
        {http, {RequestId, stream, Part}} ->
            collect_stream(RequestId, Timeout, Status, Headers, [Part | Chunks]);
        %% End of stream: the headers delivered here are placed in front of
        %% the ones recorded at stream_start.
        {http, {RequestId, stream_end, EndHeaders}} ->
            Body = lists:reverse(Chunks),
            {ok, {Status, EndHeaders ++ Headers, Body}};
        %% Fully buffered response (httpc did not stream it): take the status
        %% from the status line and expose the body as a single chunk so both
        %% paths return the same shape.
        {http, {RequestId, {{_Version, Code, _Phrase}, FullHeaders, FullBody}}} ->
            {ok, {Code, FullHeaders, [FullBody]}};
        %% Transport-level failure reported by httpc.
        {http, {RequestId, {error, Why}}} ->
            {error, normalise_error(Why)}
    after Timeout ->
        {error, timeout}
    end.

%% Reduce an httpc error reason to a single atom for the Gleam side, which
%% types the reason as `Atom`. Intended to mirror the cases in
%% `aws/internal/http_send.do_send` so both transports report the same atom
%% for the same root cause.
%%
%%   timeout              -> response_timeout
%%   {failed_connect, _}  -> failed_to_connect
%%   any other atom       -> returned unchanged
%%   anything else        -> unknown_transport_error
normalise_error(timeout) -> response_timeout;
normalise_error({failed_connect, _Details}) -> failed_to_connect;
normalise_error(Other) when is_atom(Other) -> Other;
normalise_error(_Unrecognised) -> unknown_transport_error.