Skip to main content

src/aion@workflow@concurrency.erl

-module(aion@workflow@concurrency).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/workflow/concurrency.gleam").
-export([all/1, race/1, map/2]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Typed workflow concurrency combinators over homogeneous activity lists.\n").

-file("src/aion/workflow/concurrency.gleam", 137).
-spec activity_error(binary()) -> aion@error:activity_error().
activity_error(Raw) ->
    case gleam_stdlib:string_starts_with(Raw, <<"retryable:"/utf8>>) of
        true ->
            {retryable, gleam@string:drop_start(Raw, 10), <<""/utf8>>};

        false ->
            case gleam_stdlib:string_starts_with(Raw, <<"terminal:"/utf8>>) of
                true ->
                    {terminal, gleam@string:drop_start(Raw, 9), <<""/utf8>>};

                false ->
                    case gleam_stdlib:string_starts_with(
                        Raw,
                        <<"timeout:"/utf8>>
                    ) of
                        true ->
                            {activity_timed_out,
                                {timed_out, gleam@string:drop_start(Raw, 8)}};

                        false ->
                            case gleam_stdlib:string_starts_with(
                                Raw,
                                <<"cancelled:"/utf8>>
                            ) of
                                true ->
                                    {activity_cancelled,
                                        {cancelled,
                                            gleam@string:drop_start(Raw, 10)}};

                                false ->
                                    case gleam_stdlib:string_starts_with(
                                        Raw,
                                        <<"non_determinism:"/utf8>>
                                    ) of
                                        true ->
                                            {activity_non_deterministic,
                                                {non_determinism_violation,
                                                    gleam@string:drop_start(
                                                        Raw,
                                                        16
                                                    )}};

                                        false ->
                                            {activity_engine_failure, Raw}
                                    end
                            end
                    end
            end
    end.

-file("src/aion/workflow/concurrency.gleam", 123).
-spec decode_one(binary(), aion@codec:codec(EVY)) -> {ok, EVY} |
    {error, aion@error:activity_error()}.
decode_one(Payload, Output_codec) ->
    case (erlang:element(3, Output_codec))(Payload) of
        {ok, Output} ->
            {ok, Output};

        {error, Decode_error} ->
            {error, {activity_decode_failed, Decode_error}}
    end.

-file("src/aion/workflow/concurrency.gleam", 107).
-spec decode_many(binary(), aion@codec:codec(EVT)) -> {ok, list(EVT)} |
    {error, aion@error:activity_error()}.
decode_many(Payloads, Output_codec) ->
    case gleam@json:parse(
        Payloads,
        gleam@dynamic@decode:list(
            {decoder, fun gleam@dynamic@decode:decode_string/1}
        )
    ) of
        {ok, Encoded_payloads} ->
            gleam@list:try_map(
                Encoded_payloads,
                fun(Payload) -> decode_one(Payload, Output_codec) end
            );

        {error, _} ->
            {error,
                {activity_engine_failure,
                    <<"Invalid collect_all result envelope: "/utf8,
                        Payloads/binary>>}}
    end.

-file("src/aion/workflow/concurrency.gleam", 133).
-spec collection_id(binary(), list(binary())) -> binary().
collection_id(Prefix, Specs) ->
    <<<<Prefix/binary, ":"/utf8>>/binary,
        (erlang:integer_to_binary(erlang:length(Specs)))/binary>>.

-file("src/aion/workflow/concurrency.gleam", 224).
-spec duration_json(aion@duration:duration()) -> gleam@json:json().
duration_json(Value) ->
    _pipe = Value,
    _pipe@1 = aion@duration:to_milliseconds(_pipe),
    gleam@json:int(_pipe@1).

-file("src/aion/workflow/concurrency.gleam", 217).
-spec optional_duration(gleam@option:option(aion@duration:duration())) -> gleam@json:json().
optional_duration(Value) ->
    case Value of
        none ->
            gleam@json:null();

        {some, Duration} ->
            duration_json(Duration)
    end.

-file("src/aion/workflow/concurrency.gleam", 193).
-spec backoff_config(aion@activity:backoff()) -> gleam@json:json().
backoff_config(Backoff) ->
    case Backoff of
        {exponential, Initial, Multiplier, Max} ->
            gleam@json:object(
                [{<<"kind"/utf8>>, gleam@json:string(<<"exponential"/utf8>>)},
                    {<<"initial_ms"/utf8>>, duration_json(Initial)},
                    {<<"multiplier"/utf8>>, gleam@json:float(Multiplier)},
                    {<<"max_ms"/utf8>>, duration_json(Max)}]
            );

        {linear, Initial@1, Increment, Max@1} ->
            gleam@json:object(
                [{<<"kind"/utf8>>, gleam@json:string(<<"linear"/utf8>>)},
                    {<<"initial_ms"/utf8>>, duration_json(Initial@1)},
                    {<<"increment_ms"/utf8>>, duration_json(Increment)},
                    {<<"max_ms"/utf8>>, duration_json(Max@1)}]
            );

        {fixed, Delay} ->
            gleam@json:object(
                [{<<"kind"/utf8>>, gleam@json:string(<<"fixed"/utf8>>)},
                    {<<"delay_ms"/utf8>>, duration_json(Delay)}]
            )
    end.

-file("src/aion/workflow/concurrency.gleam", 182).
-spec retry_config(gleam@option:option(aion@activity:retry_policy())) -> gleam@json:json().
retry_config(Policy) ->
    case Policy of
        none ->
            gleam@json:null();

        {some, {retry_policy, Attempts, Backoff}} ->
            gleam@json:object(
                [{<<"max_attempts"/utf8>>, gleam@json:int(Attempts)},
                    {<<"backoff"/utf8>>, backoff_config(Backoff)}]
            )
    end.

-file("src/aion/workflow/concurrency.gleam", 167).
-spec activity_config(aion@activity:activity(any(), any())) -> binary().
activity_config(Activity_value) ->
    _pipe = gleam@json:object(
        [{<<"retry"/utf8>>,
                retry_config(aion@activity:retry_policy(Activity_value))},
            {<<"timeout_ms"/utf8>>,
                optional_duration(
                    aion@activity:timeout_duration(Activity_value)
                )},
            {<<"heartbeat_ms"/utf8>>,
                optional_duration(
                    aion@activity:heartbeat_interval(Activity_value)
                )}]
    ),
    gleam@json:to_string(_pipe).

-file("src/aion/workflow/concurrency.gleam", 94).
-spec activity_spec(aion@activity:activity(any(), any()), integer()) -> binary().
activity_spec(Activity_value, Index) ->
    Input_codec = aion@activity:input_codec(Activity_value),
    Encoded_input = (erlang:element(2, Input_codec))(
        aion@activity:input(Activity_value)
    ),
    _pipe = gleam@json:object(
        [{<<"correlation"/utf8>>,
                gleam@json:string(
                    <<"activity-"/utf8,
                        (erlang:integer_to_binary(Index))/binary>>
                )},
            {<<"name"/utf8>>,
                gleam@json:string(aion@activity:name(Activity_value))},
            {<<"input"/utf8>>, gleam@json:string(Encoded_input)},
            {<<"config"/utf8>>,
                gleam@json:string(activity_config(Activity_value))}]
    ),
    gleam@json:to_string(_pipe).

-file("src/aion/workflow/concurrency.gleam", 87).
-spec activity_specs(list(aion@activity:activity(any(), any()))) -> list(binary()).
activity_specs(Activities) ->
    _pipe = Activities,
    gleam@list:index_map(
        _pipe,
        fun(Activity_value, Index) -> activity_spec(Activity_value, Index) end
    ).

-file("src/aion/workflow/concurrency.gleam", 27).
?DOC(
    " Spawn all activities concurrently and collect their typed outputs in input\n"
    " order.\n"
    "\n"
    " Activity inputs are encoded with each activity's input `Codec`; returned\n"
    " payloads are decoded one-by-one with the homogeneous output `Codec`. AT owns\n"
    " selective receive, fail-fast behaviour, correlation, and cancellation of\n"
    " remaining activities when any activity fails.\n"
    "\n"
    " The collect is a yield point: pending workflow queries are serviced by the\n"
    " query pump before the fan-out settles, exactly as activity awaits, signal\n"
    " receives, timers, and child awaits do.\n"
).
-spec all(list(aion@activity:activity(any(), EUM))) -> {ok, list(EUM)} |
    {error, aion@error:activity_error()}.
all(Activities) ->
    case Activities of
        [] ->
            {ok, []};

        [First | _] ->
            Output_codec = aion@activity:output_codec(First),
            Specs = activity_specs(Activities),
            Id = collection_id(<<"all"/utf8>>, Specs),
            case aion@internal@pump:run(
                fun() ->
                    aion@internal@pump:shield(
                        aion_flow_ffi:collect_all(Id, Specs)
                    )
                end
            ) of
                {ok, Payloads} ->
                    decode_many(Payloads, Output_codec);

                {error, Raw_error} ->
                    {error, activity_error(Raw_error)}
            end
    end.

-file("src/aion/workflow/concurrency.gleam", 53).
?DOC(
    " Race activities and return the first settled typed result.\n"
    "\n"
    " This is FIRST SETTLE semantics, not first-success-wins: the first activity to\n"
    " finish wins whether it completes successfully or returns an `ActivityError`.\n"
    " AT records that winner and cancels the losers.\n"
    "\n"
    " Like `all`, the race is a query-pump yield point: pending workflow queries\n"
    " are serviced while the race is parked.\n"
).
-spec race(list(aion@activity:activity(any(), EUU))) -> {ok, EUU} |
    {error, aion@error:activity_error()}.
race(Activities) ->
    case Activities of
        [] ->
            {error,
                {activity_engine_failure,
                    <<"race requires at least one activity"/utf8>>}};

        [First | _] ->
            Output_codec = aion@activity:output_codec(First),
            Specs = activity_specs(Activities),
            Id = collection_id(<<"race"/utf8>>, Specs),
            case aion@internal@pump:run(
                fun() ->
                    aion@internal@pump:shield(
                        aion_flow_ffi:collect_race(Id, Specs)
                    )
                end
            ) of
                {ok, Payload} ->
                    decode_one(Payload, Output_codec);

                {error, Raw_error} ->
                    {error, activity_error(Raw_error)}
            end
    end.

-file("src/aion/workflow/concurrency.gleam", 78).
?DOC(
    " Dynamically produce one activity per input element, then collect like `all`.\n"
    "\n"
    " The v1 concurrency surface intentionally covers homogeneous-output list\n"
    " fan-out. Typed tuple variants such as `all2`/`all3` are deferred additions.\n"
).
-spec map(list(EVA), fun((EVA) -> aion@activity:activity(any(), EVD))) -> {ok,
        list(EVD)} |
    {error, aion@error:activity_error()}.
map(Items, To_activity) ->
    _pipe = Items,
    _pipe@1 = gleam@list:map(_pipe, To_activity),
    all(_pipe@1).