-module(aion_client@stream).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion_client/stream.gleam").
-export([subscribe/2, subscribe_with_stub/2, collect/1]).
-export_type([event/1, stream_item/1, event_stream/1, frame/0, stub_transport/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Event stream abstraction with sequence-based resumption.\n").
-type event(FKI) :: {event, integer(), FKI}.
-type stream_item(FKJ) :: {event_item, event(FKJ)} |
{stream_error, aion_client@error:error()} |
stream_end.
-type event_stream(FKK) :: {event_stream, fun(() -> list(stream_item(FKK)))}.
-type frame() :: {frame, integer(), aion_client@payload:payload()} |
transient_disconnect |
{terminal_failure, aion_client@error:error()} |
end_of_stream.
-type stub_transport() :: {stub_transport,
fun((gleam@option:option(integer())) -> list(frame()))}.
-file("src/aion_client/stream.gleam", 51).
?DOC(
" Build a stream for a workflow handle. The concrete WebSocket adapter is an\n"
" AW-owned transport concern; until that adapter is wired this returns an\n"
" Unavailable item rather than silently ending.\n"
).
%% Both arguments are ignored. The returned stream's reader yields a single
%% `{stream_error, unavailable}` item every time it is invoked.
-spec subscribe(
    aion_client:workflow_handle(),
    gleam@dynamic@decode:decoder(FKL)
) -> event_stream(FKL).
subscribe(_Handle, _Decoder) ->
    Unavailable_reader = fun() -> [{stream_error, unavailable}] end,
    {event_stream, Unavailable_reader}.
-file("src/aion_client/stream.gleam", 136).
%% Return the elements of `Accumulated` in the opposite order. Used to turn
%% the newest-first accumulator built by read_frames/5 into delivery order.
-spec reverse(list(FLI)) -> list(FLI).
reverse(Accumulated) ->
    lists:reverse(Accumulated).
-file("src/aion_client/stream.gleam", 94).
%% Consume one batch of frames, accumulating stream items newest-first in
%% `Acc` and returning them in delivery order.
%%
%% One clause per frame shape at the head of the batch:
%%   * empty batch          -> return what has been accumulated so far;
%%   * `{frame, Seq, Raw}`  -> skip it when `Seq` is not newer than the last
%%                             delivered sequence, decode and deliver it when
%%                             it is exactly the next sequence, otherwise stop
%%                             with `{stream_error, unavailable}`;
%%   * `transient_disconnect` -> drop the rest of the batch and reopen the
%%                             transport through read_from_stub/4;
%%   * `{terminal_failure, Reason}` -> stop with `{stream_error, Reason}`;
%%   * `end_of_stream`      -> stop with `stream_end`.
-spec read_frames(
    stub_transport(),
    gleam@dynamic@decode:decoder(FLB),
    list(frame()),
    integer(),
    list(stream_item(FLB))
) -> list(stream_item(FLB)).
read_frames(_Transport, _Decoder, [], _Last_seq, Acc) ->
    reverse(Acc);
read_frames(Transport, Decoder, [{frame, Seq, Raw} | Pending], Last_seq, Acc) ->
    if
        Seq =< Last_seq ->
            %% Already delivered (a re-sent duplicate): ignore and move on.
            read_frames(Transport, Decoder, Pending, Last_seq, Acc);
        Seq =:= Last_seq + 1 ->
            %% Exactly the next expected sequence: decode and deliver.
            case aion_client@payload:decode(Raw, Decoder) of
                {ok, Decoded} ->
                    Item = {event_item, {event, Seq, Decoded}},
                    read_frames(Transport, Decoder, Pending, Seq, [Item | Acc]);
                {error, Reason} ->
                    %% A decode failure ends the stream with that error.
                    reverse([{stream_error, Reason} | Acc])
            end;
        true ->
            %% Sequence gap: surface it rather than skipping events.
            reverse([{stream_error, unavailable} | Acc])
    end;
read_frames(Transport, Decoder, [transient_disconnect | _Dropped], Last_seq, Acc) ->
    read_from_stub(Transport, Decoder, Last_seq, Acc);
read_frames(_Transport, _Decoder, [{terminal_failure, Reason} | _Dropped], _Last_seq, Acc) ->
    reverse([{stream_error, Reason} | Acc]);
read_frames(_Transport, _Decoder, [end_of_stream | _Dropped], _Last_seq, Acc) ->
    reverse([stream_end | Acc]).
-file("src/aion_client/stream.gleam", 76).
%% Open the stub transport and process the frames it returns.
%%
%% The open fun receives `none` while nothing has been delivered yet
%% (`Last_delivered` is 0), and `{some, Last_delivered + 1}` otherwise. The
%% resulting frames go to read_frames/5 together with the unchanged
%% `Last_delivered` and `Delivered` accumulator.
-spec read_from_stub(
    stub_transport(),
    gleam@dynamic@decode:decoder(FKV),
    integer(),
    list(stream_item(FKV))
) -> list(stream_item(FKV)).
read_from_stub({stub_transport, Open_fun} = Transport, Decoder, Last_delivered, Delivered) ->
    Resume_from =
        case Last_delivered of
            0 -> none;
            _ -> {some, Last_delivered + 1}
        end,
    read_frames(Transport, Decoder, Open_fun(Resume_from), Last_delivered, Delivered).
-file("src/aion_client/stream.gleam", 64).
?DOC(
" Conformance/test helper that exercises the same cursor protocol as the\n"
" reference SDK transports: the initial open passes `None` (no resume field\n"
" — live tail), every reconnect after a transient disconnect passes\n"
" `Some(last delivered + 1)` (the first sequence wanted), re-sent\n"
" duplicates are filtered, and a sequence gap surfaces as `Unavailable`\n"
" instead of silently losing events.\n"
).
%% Wrap the stub transport in a lazy event stream. Nothing is read until the
%% stream's fun is invoked; each invocation starts from scratch with no
%% sequence delivered (0) and an empty accumulator.
-spec subscribe_with_stub(stub_transport(), gleam@dynamic@decode:decoder(FKO)) -> event_stream(FKO).
subscribe_with_stub(Transport, Decoder) ->
    Read_all = fun() -> read_from_stub(Transport, Decoder, 0, []) end,
    {event_stream, Read_all}.
-file("src/aion_client/stream.gleam", 71).
%% Run the stream's reader fun and return every item it produces.
-spec collect(event_stream(FKR)) -> list(stream_item(FKR)).
collect({event_stream, Reader}) ->
    Reader().