-module(aion@workflow@concurrency).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/workflow/concurrency.gleam").
-export([all/1, race/1, map/2]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Typed workflow concurrency combinators over homogeneous activity lists.\n").
%% Translate a raw, prefix-tagged error string into an activity error term.
%%
%% The tag is the leading `<kind>:' bytes of the input; everything after the
%% tag is carried through as the message. Recognised tags and results:
%%
%%   retryable:        -> {retryable, Message, <<"">>}
%%   terminal:         -> {terminal, Message, <<"">>}
%%   timeout:          -> {activity_timed_out, {timed_out, Message}}
%%   cancelled:        -> {activity_cancelled, {cancelled, Message}}
%%   non_determinism:  -> {activity_non_deterministic,
%%                         {non_determinism_violation, Message}}
%%
%% Any other binary is returned untouched as {activity_engine_failure, Raw}.
%%
%% The tag is removed by binary pattern matching, so the message is exactly
%% the bytes that follow the colon. Counting graphemes instead would fuse a
%% leading combining character with the colon and silently drop it from the
%% message.
-file("src/aion/workflow/concurrency.gleam", 137).
-spec activity_error(binary()) -> aion@error:activity_error().
activity_error(<<"retryable:"/utf8, Message/binary>>) ->
    {retryable, Message, <<""/utf8>>};
activity_error(<<"terminal:"/utf8, Message/binary>>) ->
    {terminal, Message, <<""/utf8>>};
activity_error(<<"timeout:"/utf8, Message/binary>>) ->
    {activity_timed_out, {timed_out, Message}};
activity_error(<<"cancelled:"/utf8, Message/binary>>) ->
    {activity_cancelled, {cancelled, Message}};
activity_error(<<"non_determinism:"/utf8, Message/binary>>) ->
    {activity_non_deterministic, {non_determinism_violation, Message}};
%% Untagged input: matching on a binary pattern (rather than a bare variable)
%% keeps non-binary arguments a crash instead of a bogus engine failure.
activity_error(<<Raw/binary>>) ->
    {activity_engine_failure, Raw}.
%% Decode a single payload with the function stored in element 3 of the codec
%% tuple (presumably the codec's decode function).
%%
%% A successful `{ok, Output}' is returned unchanged; a decode failure is
%% wrapped as `{error, {activity_decode_failed, Reason}}'.
-file("src/aion/workflow/concurrency.gleam", 123).
-spec decode_one(binary(), aion@codec:codec(EVY)) -> {ok, EVY} |
    {error, aion@error:activity_error()}.
decode_one(Payload, Output_codec) ->
    Decode = erlang:element(3, Output_codec),
    case Decode(Payload) of
        {error, Reason} ->
            {error, {activity_decode_failed, Reason}};
        {ok, _} = Decoded ->
            Decoded
    end.
%% Decode the result envelope of a collect-all call.
%%
%% `Payloads' is parsed as JSON with a list-of-strings decoder; each string in
%% the list is then decoded with `decode_one/2' via `gleam@list:try_map/2'.
%% If the envelope itself cannot be parsed, the result is an
%% `activity_engine_failure' whose message embeds the raw envelope.
-file("src/aion/workflow/concurrency.gleam", 107).
-spec decode_many(binary(), aion@codec:codec(EVT)) -> {ok, list(EVT)} |
    {error, aion@error:activity_error()}.
decode_many(Payloads, Output_codec) ->
    Envelope_decoder = gleam@dynamic@decode:list(
        {decoder, fun gleam@dynamic@decode:decode_string/1}
    ),
    Decode_item = fun(Payload) -> decode_one(Payload, Output_codec) end,
    case gleam@json:parse(Payloads, Envelope_decoder) of
        {error, _} ->
            Message =
                <<"Invalid collect_all result envelope: "/utf8, Payloads/binary>>,
            {error, {activity_engine_failure, Message}};
        {ok, Encoded_payloads} ->
            gleam@list:try_map(Encoded_payloads, Decode_item)
    end.
%% Build a collection identifier of the form `<Prefix>:<number of specs>'.
%%
%% Only the length of `Specs' contributes to the id; the spec contents do not.
-file("src/aion/workflow/concurrency.gleam", 133).
-spec collection_id(binary(), list(binary())) -> binary().
collection_id(Prefix, Specs) ->
    Count = erlang:integer_to_binary(erlang:length(Specs)),
    <<Prefix/binary, ":"/utf8, Count/binary>>.
%% Encode a duration as a JSON integer holding its millisecond count.
-file("src/aion/workflow/concurrency.gleam", 224).
-spec duration_json(aion@duration:duration()) -> gleam@json:json().
duration_json(Value) ->
    gleam@json:int(aion@duration:to_milliseconds(Value)).
%% Encode an optional duration: `none' becomes JSON null, `{some, Duration}'
%% becomes the same JSON value `duration_json/1' produces for `Duration'.
-file("src/aion/workflow/concurrency.gleam", 217).
-spec optional_duration(gleam@option:option(aion@duration:duration())) -> gleam@json:json().
optional_duration(none) ->
    gleam@json:null();
optional_duration({some, Duration}) ->
    duration_json(Duration).
%% Encode a backoff strategy as a JSON object tagged by a "kind" field.
%%
%%   exponential -> kind, initial_ms, multiplier (float), max_ms
%%   linear      -> kind, initial_ms, increment_ms, max_ms
%%   fixed       -> kind, delay_ms
%%
%% Every `*_ms' field is produced by `duration_json/1'.
-file("src/aion/workflow/concurrency.gleam", 193).
-spec backoff_config(aion@activity:backoff()) -> gleam@json:json().
backoff_config({exponential, Initial, Multiplier, Max}) ->
    Fields = [
        {<<"kind"/utf8>>, gleam@json:string(<<"exponential"/utf8>>)},
        {<<"initial_ms"/utf8>>, duration_json(Initial)},
        {<<"multiplier"/utf8>>, gleam@json:float(Multiplier)},
        {<<"max_ms"/utf8>>, duration_json(Max)}
    ],
    gleam@json:object(Fields);
backoff_config({linear, Initial, Increment, Max}) ->
    Fields = [
        {<<"kind"/utf8>>, gleam@json:string(<<"linear"/utf8>>)},
        {<<"initial_ms"/utf8>>, duration_json(Initial)},
        {<<"increment_ms"/utf8>>, duration_json(Increment)},
        {<<"max_ms"/utf8>>, duration_json(Max)}
    ],
    gleam@json:object(Fields);
backoff_config({fixed, Delay}) ->
    Fields = [
        {<<"kind"/utf8>>, gleam@json:string(<<"fixed"/utf8>>)},
        {<<"delay_ms"/utf8>>, duration_json(Delay)}
    ],
    gleam@json:object(Fields).
%% Encode an optional retry policy.
%%
%% `none' becomes JSON null. `{some, {retry_policy, Attempts, Backoff}}'
%% becomes an object with "max_attempts" (integer) and "backoff" (the output
%% of `backoff_config/1').
-file("src/aion/workflow/concurrency.gleam", 182).
-spec retry_config(gleam@option:option(aion@activity:retry_policy())) -> gleam@json:json().
retry_config(none) ->
    gleam@json:null();
retry_config({some, {retry_policy, Max_attempts, Backoff}}) ->
    Fields = [
        {<<"max_attempts"/utf8>>, gleam@json:int(Max_attempts)},
        {<<"backoff"/utf8>>, backoff_config(Backoff)}
    ],
    gleam@json:object(Fields).
%% Serialise an activity's execution settings to a JSON string.
%%
%% The object has three fields, each JSON null when the activity does not set
%% the corresponding option: "retry", "timeout_ms" and "heartbeat_ms".
-file("src/aion/workflow/concurrency.gleam", 167).
-spec activity_config(aion@activity:activity(any(), any())) -> binary().
activity_config(Activity_value) ->
    Retry = retry_config(aion@activity:retry_policy(Activity_value)),
    Timeout = optional_duration(
        aion@activity:timeout_duration(Activity_value)
    ),
    Heartbeat = optional_duration(
        aion@activity:heartbeat_interval(Activity_value)
    ),
    Config = gleam@json:object([
        {<<"retry"/utf8>>, Retry},
        {<<"timeout_ms"/utf8>>, Timeout},
        {<<"heartbeat_ms"/utf8>>, Heartbeat}
    ]),
    gleam@json:to_string(Config).
%% Serialise one activity into the JSON spec string handed to the collect FFI.
%%
%% Fields:
%%   "correlation" - "activity-<Index>"
%%   "name"        - the activity's name
%%   "input"       - the activity input, run through the function in element 2
%%                   of its input codec (presumably the encode function)
%%   "config"      - the `activity_config/1' JSON, embedded as a string
-file("src/aion/workflow/concurrency.gleam", 94).
-spec activity_spec(aion@activity:activity(any(), any()), integer()) -> binary().
activity_spec(Activity_value, Index) ->
    Encode = erlang:element(2, aion@activity:input_codec(Activity_value)),
    Encoded_input = Encode(aion@activity:input(Activity_value)),
    Correlation = <<"activity-"/utf8, (erlang:integer_to_binary(Index))/binary>>,
    Name = aion@activity:name(Activity_value),
    Config = activity_config(Activity_value),
    Spec = gleam@json:object([
        {<<"correlation"/utf8>>, gleam@json:string(Correlation)},
        {<<"name"/utf8>>, gleam@json:string(Name)},
        {<<"input"/utf8>>, gleam@json:string(Encoded_input)},
        {<<"config"/utf8>>, gleam@json:string(Config)}
    ]),
    gleam@json:to_string(Spec).
%% Serialise every activity with `activity_spec/2', passing each element the
%% index supplied by `gleam@list:index_map/2'.
-file("src/aion/workflow/concurrency.gleam", 87).
-spec activity_specs(list(aion@activity:activity(any(), any()))) -> list(binary()).
activity_specs(Activities) ->
    gleam@list:index_map(Activities, fun activity_spec/2).
%% Fan out every activity and decode the collected payloads.
%%
%% An empty list short-circuits to `{ok, []}' without touching the FFI.
%% Otherwise the output codec of the FIRST activity is used for every payload,
%% the specs are built with `activity_specs/1', and the collect call runs
%% inside `aion@internal@pump:run/1' with its result passed through `shield/1'.
%% A raw error string from the pump is mapped through `activity_error/1'.
-file("src/aion/workflow/concurrency.gleam", 27).
?DOC(
    " Spawn all activities concurrently and collect their typed outputs in input\n"
    " order.\n"
    "\n"
    " Activity inputs are encoded with each activity's input `Codec`; returned\n"
    " payloads are decoded one-by-one with the homogeneous output `Codec`. AT owns\n"
    " selective receive, fail-fast behaviour, correlation, and cancellation of\n"
    " remaining activities when any activity fails.\n"
    "\n"
    " The collect is a yield point: pending workflow queries are serviced by the\n"
    " query pump before the fan-out settles, exactly as activity awaits, signal\n"
    " receives, timers, and child awaits do.\n"
).
-spec all(list(aion@activity:activity(any(), EUM))) -> {ok, list(EUM)} |
    {error, aion@error:activity_error()}.
all([]) ->
    {ok, []};
all([First | _] = Activities) ->
    Output_codec = aion@activity:output_codec(First),
    Specs = activity_specs(Activities),
    Id = collection_id(<<"all"/utf8>>, Specs),
    Collect = fun() ->
        aion@internal@pump:shield(aion_flow_ffi:collect_all(Id, Specs))
    end,
    case aion@internal@pump:run(Collect) of
        {error, Raw_error} ->
            {error, activity_error(Raw_error)};
        {ok, Payloads} ->
            decode_many(Payloads, Output_codec)
    end.
%% Race the activities and decode the single winning payload.
%%
%% An empty list is rejected with an `activity_engine_failure'. Otherwise the
%% output codec of the FIRST activity decodes the winner, and the race call
%% runs inside `aion@internal@pump:run/1' with its result passed through
%% `shield/1'. A raw error string from the pump is mapped through
%% `activity_error/1'.
-file("src/aion/workflow/concurrency.gleam", 53).
?DOC(
    " Race activities and return the first settled typed result.\n"
    "\n"
    " This is FIRST SETTLE semantics, not first-success-wins: the first activity to\n"
    " finish wins whether it completes successfully or returns an `ActivityError`.\n"
    " AT records that winner and cancels the losers.\n"
    "\n"
    " Like `all`, the race is a query-pump yield point: pending workflow queries\n"
    " are serviced while the race is parked.\n"
).
-spec race(list(aion@activity:activity(any(), EUU))) -> {ok, EUU} |
    {error, aion@error:activity_error()}.
race([]) ->
    {error,
        {activity_engine_failure,
            <<"race requires at least one activity"/utf8>>}};
race([First | _] = Activities) ->
    Output_codec = aion@activity:output_codec(First),
    Specs = activity_specs(Activities),
    Id = collection_id(<<"race"/utf8>>, Specs),
    Collect = fun() ->
        aion@internal@pump:shield(aion_flow_ffi:collect_race(Id, Specs))
    end,
    case aion@internal@pump:run(Collect) of
        {error, Raw_error} ->
            {error, activity_error(Raw_error)};
        {ok, Payload} ->
            decode_one(Payload, Output_codec)
    end.
%% Apply `To_activity' to every item with `gleam@list:map/2', then hand the
%% resulting activity list to `all/1' and return its result unchanged.
-file("src/aion/workflow/concurrency.gleam", 78).
?DOC(
    " Dynamically produce one activity per input element, then collect like `all`.\n"
    "\n"
    " The v1 concurrency surface intentionally covers homogeneous-output list\n"
    " fan-out. Typed tuple variants such as `all2`/`all3` are deferred additions.\n"
).
-spec map(list(EVA), fun((EVA) -> aion@activity:activity(any(), EVD))) -> {ok,
        list(EVD)} |
    {error, aion@error:activity_error()}.
map(Items, To_activity) ->
    all(gleam@list:map(Items, To_activity)).