%% src/aion@child.erl

-module(aion@child).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/child.gleam").
-export([spawn/6, error_codec/1, output_codec/1, child_id/1, await/1, spawn_and_wait/6]).
-export_type([child_handle/2]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Typed child-workflow handles and await wrappers.\n").

%% Typed handle for a spawned child workflow. As constructed by spawn/6 and
%% read back by the accessors in this module, the tuple holds:
%%   element 2 - the child id returned by the engine (child_id/1)
%%   element 3 - the codec for the child's output (output_codec/1)
%%   element 4 - the codec for the child's workflow error (error_codec/1)
-opaque child_handle(DYH, DYI) :: {child_handle,
        binary(),
        aion@codec:codec(DYH),
        aion@codec:codec(DYI)}.

-file("src/aion/child.gleam", 156).
%% Spawn configuration handed to the engine FFI: a JSON object built from an
%% empty field list, serialised to its string form.
-spec spawn_config() -> binary().
spawn_config() ->
    gleam@json:to_string(gleam@json:object([])).

-file("src/aion/child.gleam", 30).
?DOC(
    " Start a linked child workflow and return its typed handle.\n"
    "\n"
    " The `workflow_fn` is accepted as a type anchor for the child workflow's\n"
    " `fn(input) -> Result(output, workflow_error)` contract. The SDK does not call\n"
    " it here; lifecycle, linking, recording, and replay/no-respawn behavior are\n"
    " owned by AT/AD behind the FFI boundary.\n"
).
-spec spawn(
    binary(),
    fun((DYJ) -> {ok, DYK} | {error, DYL}),
    DYJ,
    aion@codec:codec(DYJ),
    aion@codec:codec(DYK),
    aion@codec:codec(DYL)
) -> {ok, child_handle(DYK, DYL)} | {error, aion@error:engine_error()}.
%% The workflow fun is never invoked, hence the underscore-prefixed parameter.
%% Element 2 of the input codec is applied to the input to produce the payload
%% passed to the FFI; the output and error codecs are stored on the handle.
spawn(Name, _Workflow_fn, Input, Input_codec, Output_codec, Error_codec) ->
    Encode = erlang:element(2, Input_codec),
    Payload = Encode(Input),
    case aion_flow_ffi:spawn_child(Name, Payload, spawn_config()) of
        {error, Reason} ->
            {error, {engine_failure, Reason}};

        {ok, Child_id} ->
            {ok, {child_handle, Child_id, Output_codec, Error_codec}}
    end.

-file("src/aion/child.gleam", 114).
?DOC(" Return the workflow-error codec carried by this child handle.\n").
-spec error_codec(child_handle(any(), DZX)) -> aion@codec:codec(DZX).
%% The workflow-error codec is the fourth element of the handle tuple.
error_codec(Handle) ->
    Error_codec = erlang:element(4, Handle),
    Error_codec.

-file("src/aion/child.gleam", 145).
-spec decode_error_payload(binary(), child_handle(EAP, EAQ)) -> {ok, EAP} |
    {error, aion@error:child_error(EAQ)}.
%% Decode the payload of a failed child with the handle's error codec
%% (element 3 of the codec is applied to the payload). Every outcome is an
%% error: either the decoded workflow error or the decode failure itself.
decode_error_payload(Payload, Handle) ->
    Decode = erlang:element(3, error_codec(Handle)),
    {error,
        case Decode(Payload) of
            {ok, Workflow_error} ->
                {child_workflow_failed, Workflow_error};

            {error, Decode_error} ->
                {child_error_decode_failed, Decode_error}
        end}.

-file("src/aion/child.gleam", 107).
?DOC(" Return the output codec carried by this child handle.\n").
-spec output_codec(child_handle(DZR, any())) -> aion@codec:codec(DZR).
%% The output codec is the third element of the handle tuple.
output_codec(Handle) ->
    Output_codec = erlang:element(3, Handle),
    Output_codec.

-file("src/aion/child.gleam", 134).
-spec decode_output(binary(), child_handle(EAI, EAJ)) -> {ok, EAI} |
    {error, aion@error:child_error(EAJ)}.
%% Decode a successful child's payload with the handle's output codec
%% (element 3 of the codec is applied to the payload). A decode failure is
%% reported as `child_output_decode_failed`; a success is passed through.
decode_output(Payload, Handle) ->
    Decode = erlang:element(3, output_codec(Handle)),
    case Decode(Payload) of
        {error, Decode_error} ->
            {error, {child_output_decode_failed, Decode_error}};

        {ok, _Output} = Decoded ->
            Decoded
    end.

-file("src/aion/child.gleam", 120).
-spec decode_child_result(binary(), child_handle(EAB, EAC)) -> {ok, EAB} |
    {error, aion@error:child_error(EAC)}.
%% Split the raw result envelope of a child workflow and decode its payload.
%%
%%   <<"ok:", Payload/binary>>    -> Payload decoded via decode_output/2
%%   <<"error:", Payload/binary>> -> Payload decoded via decode_error_payload/2
%%   anything else                -> {error, {child_engine_failure, Raw_result}}
%%
%% The prefix is stripped with a binary pattern rather than by dropping a
%% number of graphemes: a payload that begins with a combining/extending code
%% point would otherwise cluster with the trailing ":" of the prefix and lose
%% its first characters. Byte-exact matching also avoids walking the string.
decode_child_result(<<"ok:"/utf8, Payload/binary>>, Handle) ->
    decode_output(Payload, Handle);
decode_child_result(<<"error:"/utf8, Payload/binary>>, Handle) ->
    decode_error_payload(Payload, Handle);
decode_child_result(Raw_result, _Handle) ->
    {error, {child_engine_failure, Raw_result}}.

-file("src/aion/child.gleam", 102).
?DOC(" Return the engine child/correlation id carried by this handle.\n").
-spec child_id(child_handle(any(), any())) -> binary().
%% The child id is the second element of the handle tuple.
child_id(Handle) ->
    Child_id = erlang:element(2, Handle),
    Child_id.

-file("src/aion/child.gleam", 63).
?DOC(
    " Await a child workflow's recorded completion or failure.\n"
    "\n"
    " AT/AD own blocking, replay resolution, and event recording. This wrapper\n"
    " decodes the raw recorded envelope with the codecs carried on the handle and\n"
    " returns decode/engine failures as typed data.\n"
    "\n"
    " The await is a yield point: pending workflow queries are serviced by the\n"
    " query pump before the child terminal resolves, exactly as activity awaits,\n"
    " signal receives, and timers do. Without the pump, a query arriving while\n"
    " the workflow is parked here would surface its sentinel as a bogus child\n"
    " failure and leave the engine refusing every later await in the run.\n"
).
-spec await(child_handle(DYV, DYW)) -> {ok, DYV} |
    {error, aion@error:child_error(DYW)}.
%% The FFI await is wrapped in pump:shield inside a thunk, and the thunk is
%% handed to pump:run. An `{ok, _}` result is decoded with the handle's
%% codecs; an `{error, _}` result is reported as `child_engine_failure`.
await(Handle) ->
    Child = child_id(Handle),
    Await_once = fun() ->
        aion@internal@pump:shield(aion_flow_ffi:await_child(Child))
    end,
    case aion@internal@pump:run(Await_once) of
        {error, Reason} ->
            {error, {child_engine_failure, Reason}};

        {ok, Envelope} ->
            decode_child_result(Envelope, Handle)
    end.

-file("src/aion/child.gleam", 86).
?DOC(
    " Start a linked child workflow and await its recorded result.\n"
    "\n"
    " This is the spawn-then-await convenience kept in the child logic module so\n"
    " `aion/workflow` can remain a forwarding authoring surface.\n"
).
-spec spawn_and_wait(
    binary(),
    fun((DZC) -> {ok, DZD} | {error, DZE}),
    DZC,
    aion@codec:codec(DZC),
    aion@codec:codec(DZD),
    aion@codec:codec(DZE)
) -> {ok, DZD} | {error, aion@error:child_error(DZE)}.
%% A spawn failure (`engine_failure`) is re-tagged as `child_engine_failure`
%% so both phases report errors in the child_error shape; on success the
%% fresh handle is awaited immediately.
spawn_and_wait(Name, Workflow_fn, Input, Input_codec, Output_codec, Error_codec) ->
    Spawned = spawn(
        Name, Workflow_fn, Input, Input_codec, Output_codec, Error_codec
    ),
    case Spawned of
        {error, {engine_failure, Message}} ->
            {error, {child_engine_failure, Message}};

        {ok, Handle} ->
            await(Handle)
    end.