-module(aion@signal).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/signal.gleam").
-export([new/2, name/1, codec/1, 'receive'/1, send/3]).
-export_type([signal_ref/1]).
%% Documentation shims: ?MODULEDOC/1 and ?DOC/1 let the rest of the file attach
%% doc strings unconditionally, whatever OTP release compiles it.
-if(?OTP_RELEASE >= 27).
%% OTP 27 and later: expand to the -moduledoc / -doc attributes.
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
%% Earlier releases: expand to an empty -compile attribute, so the doc string
%% argument is discarded.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Typed signal references plus receive/send wrappers.\n").
%% A signal reference is a tagged 3-tuple: the `signal_ref` tag, the signal
%% name as a binary, and the codec for the payload type. name/1 reads the
%% second element and codec/1 the third.
-opaque signal_ref(EGK) :: {signal_ref, binary(), aion@codec:codec(EGK)}.
-file("src/aion/signal.gleam", 20).
?DOC(" Construct a typed signal reference.\n").
-spec new(binary(), aion@codec:codec(EGL)) -> signal_ref(EGL).
%% Wraps the signal name and its payload codec in the tagged `signal_ref`
%% tuple. Neither argument is inspected or validated here.
new(SignalName, PayloadCodec) ->
    {signal_ref, SignalName, PayloadCodec}.
-file("src/aion/signal.gleam", 25).
?DOC(" Return the signal name used by the engine signal router boundary.\n").
-spec name(signal_ref(any())) -> binary().
%% The name sits in the second slot of the reference tuple, directly after
%% the `signal_ref` tag.
name(SignalRef) ->
    SignalName = erlang:element(2, SignalRef),
    SignalName.
-file("src/aion/signal.gleam", 30).
?DOC(" Return the payload codec carried by a signal reference.\n").
-spec codec(signal_ref(EGQ)) -> aion@codec:codec(EGQ).
%% The codec sits in the third (last) slot of the reference tuple.
codec(SignalRef) ->
    PayloadCodec = erlang:element(3, SignalRef),
    PayloadCodec.
-file("src/aion/signal.gleam", 85).
%% Classify a raw error string (as handed over by the error branch of
%% 'receive'/1) into a typed receive error.
%%
%% Recognised ASCII prefixes and the values they produce:
%%   "unknown:"         -> {unknown_signal, Rest}
%%   "cancelled:"       -> {receive_cancelled, {cancelled, Rest}}
%%   "non_determinism:" -> {receive_non_deterministic,
%%                          {non_determinism_violation, Rest}}
%% Anything else is returned whole as {receive_engine_failure, Raw}.
%%
%% The prefix is stripped by binary pattern matching, so exactly the prefix
%% bytes are removed. A grapheme-counting drop would also swallow a combining
%% code point that directly follows the ':' because the two form one grapheme
%% cluster; matching on bytes keeps the remainder intact.
%%
%% A non-binary argument matches no clause and crashes with `function_clause`.
-spec receive_error(binary()) -> aion@error:receive_error().
receive_error(<<"unknown:"/utf8, Signal/binary>>) ->
    {unknown_signal, Signal};
receive_error(<<"cancelled:"/utf8, Reason/binary>>) ->
    {receive_cancelled, {cancelled, Reason}};
receive_error(<<"non_determinism:"/utf8, Detail/binary>>) ->
    {receive_non_deterministic, {non_determinism_violation, Detail}};
receive_error(<<Raw/binary>>) ->
    %% No known prefix: surface the untouched string as an engine failure.
    {receive_engine_failure, Raw}.
-file("src/aion/signal.gleam", 81).
-spec receive_config() -> binary().
%% Builds the config argument that 'receive'/1 passes to the signal router:
%% a JSON object built from an empty field list, serialised to a string.
receive_config() ->
    gleam@json:to_string(gleam@json:object([])).
-file("src/aion/signal.gleam", 41).
?DOC(
" Receive the next payload for a typed signal reference.\n"
"\n"
" The actual selective-receive and replay behavior is owned by AT/AD. This SDK\n"
" wrapper binds to that router through `aion/internal/ffi`, then decodes the\n"
" recorded payload with the reference codec and returns decode failures as\n"
" typed data. The await is a yield point: pending workflow queries are\n"
" serviced by the query pump before the signal resolves.\n"
).
-spec 'receive'(signal_ref(EGT)) -> {ok, EGT} |
{error, aion@error:receive_error()}.
%% Flow: resolve the signal name and router config, run the router call
%% through the pump, then either classify the raw error or decode the raw
%% payload with the reference's codec.
'receive'(Reference) ->
    SignalName = name(Reference),
    RouterConfig = receive_config(),
    %% The router call is deferred inside a fun so that the pump decides when
    %% it runs; the FFI result is passed through shield/1 before it is returned.
    Await = fun() ->
        aion@internal@pump:shield(
            aion_flow_ffi:receive_signal(SignalName, RouterConfig)
        )
    end,
    case aion@internal@pump:run(Await) of
        {error, RawError} ->
            {error, receive_error(RawError)};
        {ok, RawPayload} ->
            %% The codec is only looked up once a payload has arrived; its
            %% third element is applied as the decoder function.
            Decode = erlang:element(3, codec(Reference)),
            case Decode(RawPayload) of
                {ok, _} = Decoded ->
                    Decoded;
                {error, DecodeError} ->
                    {error, {receive_decode_failed, DecodeError}}
            end
    end.
-file("src/aion/signal.gleam", 67).
?DOC(
" Send a typed signal payload to a workflow through the in-engine/Gleam-client\n"
" binding.\n"
"\n"
" This helper encodes the payload with the `SignalRef` codec and calls AT's\n"
" signal-delivery boundary through FFI. It is not an HTTP or network client;\n"
" network-facing clients are provided outside `aion_flow`.\n"
).
-spec send(binary(), signal_ref(EGX), EGX) -> {ok, nil} |
{error, aion@error:engine_error()}.
%% Encodes first, then delivers. Whatever value the FFI returns inside `ok`
%% is discarded and replaced by `nil`; an error is wrapped as `engine_failure`.
send(WorkflowId, Reference, Payload) ->
    %% The codec's second element is applied as the encoder function.
    Encode = erlang:element(2, codec(Reference)),
    Encoded = Encode(Payload),
    Delivery = aion_flow_ffi:send_signal(WorkflowId, name(Reference), Encoded),
    case Delivery of
        {error, RawError} ->
            {error, {engine_failure, RawError}};
        {ok, _} ->
            {ok, nil}
    end.