Skip to main content

src/aion_client@stream.erl

-module(aion_client@stream).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion_client/stream.gleam").
-export([subscribe/2, subscribe_with_stub/2, collect/1]).
-export_type([event/1, stream_item/1, event_stream/1, frame/0, stub_transport/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Event stream abstraction with sequence-based resumption.\n").

-type event(FKI) :: {event, integer(), FKI}.

-type stream_item(FKJ) :: {event_item, event(FKJ)} |
    {stream_error, aion_client@error:error()} |
    stream_end.

-type event_stream(FKK) :: {event_stream, fun(() -> list(stream_item(FKK)))}.

-type frame() :: {frame, integer(), aion_client@payload:payload()} |
    transient_disconnect |
    {terminal_failure, aion_client@error:error()} |
    end_of_stream.

-type stub_transport() :: {stub_transport,
        fun((gleam@option:option(integer())) -> list(frame()))}.

-file("src/aion_client/stream.gleam", 51).
?DOC(
    " Build a stream for a workflow handle. The concrete WebSocket adapter is an\n"
    " AW-owned transport concern; until that adapter is wired this returns an\n"
    " Unavailable item rather than silently ending.\n"
).
-spec subscribe(
    aion_client:workflow_handle(),
    gleam@dynamic@decode:decoder(FKL)
) -> event_stream(FKL).
subscribe(_, _) ->
    {event_stream, fun() -> [{stream_error, unavailable}] end}.

-file("src/aion_client/stream.gleam", 136).
-spec reverse(list(FLI)) -> list(FLI).
reverse(Items) ->
    lists:reverse(Items).

-file("src/aion_client/stream.gleam", 94).
-spec read_frames(
    stub_transport(),
    gleam@dynamic@decode:decoder(FLB),
    list(frame()),
    integer(),
    list(stream_item(FLB))
) -> list(stream_item(FLB)).
read_frames(Transport, Decoder, Frames, Last_delivered, Delivered) ->
    case Frames of
        [] ->
            reverse(Delivered);

        [First | Rest] ->
            case First of
                {frame, Sequence, Raw_payload} ->
                    case Sequence =< Last_delivered of
                        true ->
                            read_frames(
                                Transport,
                                Decoder,
                                Rest,
                                Last_delivered,
                                Delivered
                            );

                        false ->
                            case Sequence =:= (Last_delivered + 1) of
                                false ->
                                    reverse(
                                        [{stream_error, unavailable} |
                                            Delivered]
                                    );

                                true ->
                                    case aion_client@payload:decode(
                                        Raw_payload,
                                        Decoder
                                    ) of
                                        {ok, Event} ->
                                            read_frames(
                                                Transport,
                                                Decoder,
                                                Rest,
                                                Sequence,
                                                [{event_item,
                                                        {event, Sequence, Event}} |
                                                    Delivered]
                                            );

                                        {error, Error} ->
                                            reverse(
                                                [{stream_error, Error} |
                                                    Delivered]
                                            )
                                    end
                            end
                    end;

                transient_disconnect ->
                    read_from_stub(
                        Transport,
                        Decoder,
                        Last_delivered,
                        Delivered
                    );

                {terminal_failure, Error@1} ->
                    reverse([{stream_error, Error@1} | Delivered]);

                end_of_stream ->
                    reverse([stream_end | Delivered])
            end
    end.

-file("src/aion_client/stream.gleam", 76).
-spec read_from_stub(
    stub_transport(),
    gleam@dynamic@decode:decoder(FKV),
    integer(),
    list(stream_item(FKV))
) -> list(stream_item(FKV)).
read_from_stub(Transport, Decoder, Last_delivered, Delivered) ->
    {stub_transport, Open} = Transport,
    Cursor = case Last_delivered of
        0 ->
            none;

        Sequence ->
            {some, Sequence + 1}
    end,
    Frames = Open(Cursor),
    read_frames(Transport, Decoder, Frames, Last_delivered, Delivered).

-file("src/aion_client/stream.gleam", 64).
?DOC(
    " Conformance/test helper that exercises the same cursor protocol as the\n"
    " reference SDK transports: the initial open passes `None` (no resume field\n"
    " — live tail), every reconnect after a transient disconnect passes\n"
    " `Some(last delivered + 1)` (the first sequence wanted), re-sent\n"
    " duplicates are filtered, and a sequence gap surfaces as `Unavailable`\n"
    " instead of silently losing events.\n"
).
-spec subscribe_with_stub(stub_transport(), gleam@dynamic@decode:decoder(FKO)) -> event_stream(FKO).
subscribe_with_stub(Transport, Decoder) ->
    {event_stream, fun() -> read_from_stub(Transport, Decoder, 0, []) end}.

-file("src/aion_client/stream.gleam", 71).
-spec collect(event_stream(FKR)) -> list(stream_item(FKR)).
collect(Stream) ->
    {event_stream, Read_all} = Stream,
    Read_all().