%% src/aion@signal.erl

-module(aion@signal).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/signal.gleam").
-export([new/2, name/1, codec/1, 'receive'/1, send/3]).
-export_type([signal_ref/1]).

%% Documentation macros. On OTP 27 and later they expand to the native
%% `-moduledoc` / `-doc` attributes; on older releases they expand to an empty
%% `-compile([])` attribute so that every `?MODULEDOC(...)` / `?DOC(...)` use
%% below still parses as a well-formed module attribute.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Typed signal references plus receive/send wrappers.\n").

%% A signal reference is a tagged 3-tuple: the `signal_ref` tag, the signal
%% name as a binary, and the codec for the payload type. It is declared opaque,
%% so code outside this module should go through new/2, name/1 and codec/1
%% rather than matching on the tuple.
-opaque signal_ref(EGK) :: {signal_ref, binary(), aion@codec:codec(EGK)}.

-file("src/aion/signal.gleam", 20).
?DOC(" Construct a typed signal reference.\n").
-spec new(binary(), aion@codec:codec(EGL)) -> signal_ref(EGL).
new(Signal_name, Codec) ->
    %% The reference is a plain tagged tuple; nothing is validated or copied.
    Reference = {signal_ref, Signal_name, Codec},
    Reference.

-file("src/aion/signal.gleam", 25).
?DOC(" Return the signal name used by the engine signal router boundary.\n").
-spec name(signal_ref(any())) -> binary().
%% Destructure the reference in the function head instead of using a
%% positional erlang:element/2 lookup: the match checks the `signal_ref` tag
%% and the tuple arity, so a value that is not a signal reference fails with
%% `function_clause` rather than silently yielding some unrelated field.
name({signal_ref, Signal_name, _Payload_codec}) ->
    Signal_name.

-file("src/aion/signal.gleam", 30).
?DOC(" Return the payload codec carried by a signal reference.\n").
-spec codec(signal_ref(EGQ)) -> aion@codec:codec(EGQ).
%% Destructure the reference in the function head instead of using a
%% positional erlang:element/2 lookup: the match checks the `signal_ref` tag
%% and the tuple arity, so a value that is not a signal reference fails with
%% `function_clause` rather than silently yielding some unrelated field.
codec({signal_ref, _Signal_name, Payload_codec}) ->
    Payload_codec.

-file("src/aion/signal.gleam", 85).
-spec receive_error(binary()) -> aion@error:receive_error().
%% Classify a raw engine error string into a typed receive error.
%%
%% The raw value is dispatched on its textual prefix:
%%   "unknown:<rest>"         -> {unknown_signal, Rest}
%%   "cancelled:<rest>"       -> {receive_cancelled, {cancelled, Rest}}
%%   "non_determinism:<rest>" -> {receive_non_deterministic,
%%                                {non_determinism_violation, Rest}}
%%   anything else            -> {receive_engine_failure, Raw}
%%
%% The prefix is matched and stripped in a single binary pattern, so exactly
%% the bytes that were recognised are removed. Stripping by a character count
%% instead could also consume a combining mark that directly follows the
%% colon. The three prefixes are mutually exclusive, so clause order does not
%% affect the result.
receive_error(<<"unknown:"/utf8, Signal_name/binary>>) ->
    {unknown_signal, Signal_name};
receive_error(<<"cancelled:"/utf8, Reason/binary>>) ->
    {receive_cancelled, {cancelled, Reason}};
receive_error(<<"non_determinism:"/utf8, Detail/binary>>) ->
    {receive_non_deterministic, {non_determinism_violation, Detail}};
%% Any other binary is reported verbatim as an engine failure. The guard keeps
%% non-binary input an error instead of wrapping it.
receive_error(Raw) when erlang:is_binary(Raw) ->
    {receive_engine_failure, Raw}.

-file("src/aion/signal.gleam", 81).
-spec receive_config() -> binary().
%% Build the configuration string handed to the receive FFI call: an empty
%% JSON object, serialised with gleam@json.
receive_config() ->
    gleam@json:to_string(gleam@json:object([])).

-file("src/aion/signal.gleam", 41).
?DOC(
    " Receive the next payload for a typed signal reference.\n"
    "\n"
    " The actual selective-receive and replay behavior is owned by AT/AD. This SDK\n"
    " wrapper binds to that router through `aion/internal/ffi`, then decodes the\n"
    " recorded payload with the reference codec and returns decode failures as\n"
    " typed data. The await is a yield point: pending workflow queries are\n"
    " serviced by the query pump before the signal resolves.\n"
).
-spec 'receive'(signal_ref(EGT)) -> {ok, EGT} |
    {error, aion@error:receive_error()}.
'receive'(Reference) ->
    %% Resolve the FFI arguments up front, outside the fun handed to the pump.
    Name = name(Reference),
    Options = receive_config(),
    Await = fun() ->
        aion@internal@pump:shield(aion_flow_ffi:receive_signal(Name, Options))
    end,
    case aion@internal@pump:run(Await) of
        {error, Raw_error} ->
            %% Engine-side failure: classify the raw error text.
            {error, receive_error(Raw_error)};

        {ok, Raw_payload} ->
            %% The third element of the codec is applied to the raw payload
            %% as the decoder.
            Decode = erlang:element(3, codec(Reference)),
            case Decode(Raw_payload) of
                {error, Decode_error} ->
                    {error, {receive_decode_failed, Decode_error}};

                {ok, _Payload} = Decoded ->
                    Decoded
            end
    end.

-file("src/aion/signal.gleam", 67).
?DOC(
    " Send a typed signal payload to a workflow through the in-engine/Gleam-client\n"
    " binding.\n"
    "\n"
    " This helper encodes the payload with the `SignalRef` codec and calls AT's\n"
    " signal-delivery boundary through FFI. It is not an HTTP or network client;\n"
    " network-facing clients are provided outside `aion_flow`.\n"
).
-spec send(binary(), signal_ref(EGX), EGX) -> {ok, nil} |
    {error, aion@error:engine_error()}.
send(Workflow_id, Reference, Payload) ->
    %% The second element of the codec is applied to the payload as the
    %% encoder.
    Encode = erlang:element(2, codec(Reference)),
    Wire_payload = Encode(Payload),
    Signal_name = name(Reference),
    case aion_flow_ffi:send_signal(Workflow_id, Signal_name, Wire_payload) of
        {error, Raw_error} ->
            %% Unlike 'receive'/1, the raw error is wrapped without parsing.
            {error, {engine_failure, Raw_error}};

        {ok, _} ->
            %% Whatever the FFI returns on success is discarded.
            {ok, nil}
    end.