%% Erlang module for `aion/workflow/run`. The FILEPATH macro and the -file
%% attributes throughout point at src/aion/workflow/run.gleam as the origin of
%% each definition (presumably this file is Gleam compiler output -- confirm
%% before hand-editing).
-module(aion@workflow@run).
%% no_auto_import: BIFs are referenced with an explicit `erlang:` prefix in
%% this module (e.g. erlang:element/2, erlang:integer_to_binary/1).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/workflow/run.gleam").
%% Public API: timestamp accessor, activity dispatch, and the time / randomness
%% bindings that go through aion_flow_ffi.
-export([timestamp_to_milliseconds/1, run/1, now/0, random/0, random_int/2]).
-export_type([timestamp/0]).
%% Documentation attributes are only emitted on OTP 27 and later; on older
%% releases both macros expand to an empty -compile([]) attribute instead.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" workflow.run (recorded activity dispatch) + now + random (determinism bindings)\n").
%% Tagged tuple wrapping an integer; timestamp_to_milliseconds/1 returns that
%% integer. Opaque, so other modules must not depend on the tuple shape.
-opaque timestamp() :: {timestamp, integer()}.
-file("src/aion/workflow/run.gleam", 24).
?DOC(" Return the canonical millisecond representation of a deterministic timestamp.\n").
%% Reads the integer stored in the second slot of the timestamp tuple
%% (`{timestamp, Milliseconds}`) and returns it unchanged.
-spec timestamp_to_milliseconds(timestamp()) -> integer().
timestamp_to_milliseconds(Timestamp) ->
    Milliseconds = erlang:element(2, Timestamp),
    Milliseconds.
-file("src/aion/workflow/run.gleam", 139).
%% Classify a raw error string into an `aion@error:activity_error()` value.
%%
%% The category is selected by an ASCII prefix ending in `:`; the bytes after
%% the prefix are passed through untouched as the detail text:
%%
%%   <<"retryable:", Detail>>       -> {retryable, Detail, <<>>}
%%   <<"terminal:", Detail>>        -> {terminal, Detail, <<>>}
%%   <<"timeout:", Detail>>         -> {activity_timed_out, {timed_out, Detail}}
%%   <<"cancelled:", Detail>>       -> {activity_cancelled, {cancelled, Detail}}
%%   <<"non_determinism:", Detail>> -> {activity_non_deterministic,
%%                                      {non_determinism_violation, Detail}}
%%   anything else                  -> {activity_engine_failure, Raw}
%%
%% The prefix is stripped with a binary pattern rather than by counting
%% characters, so Detail is exactly what follows the colon even when it starts
%% with a combining or otherwise extending code point (which would form a
%% single grapheme cluster together with the `:`).
-spec activity_error(binary()) -> aion@error:activity_error().
activity_error(<<"retryable:"/utf8, Detail/binary>>) ->
    {retryable, Detail, <<""/utf8>>};
activity_error(<<"terminal:"/utf8, Detail/binary>>) ->
    {terminal, Detail, <<""/utf8>>};
activity_error(<<"timeout:"/utf8, Detail/binary>>) ->
    {activity_timed_out, {timed_out, Detail}};
activity_error(<<"cancelled:"/utf8, Detail/binary>>) ->
    {activity_cancelled, {cancelled, Detail}};
activity_error(<<"non_determinism:"/utf8, Detail/binary>>) ->
    {activity_non_deterministic, {non_determinism_violation, Detail}};
%% No recognised prefix: report the whole string as an engine failure. The
%% guard keeps non-binary input a crash instead of silently wrapping it.
activity_error(Raw) when erlang:is_binary(Raw) ->
    {activity_engine_failure, Raw}.
-file("src/aion/workflow/run.gleam", 226).
%% Encode a duration as a JSON integer: the duration is first converted with
%% aion@duration:to_milliseconds/1 and the result is wrapped by gleam@json:int/1.
-spec duration_json(aion@duration:duration()) -> gleam@json:json().
duration_json(Value) ->
    Milliseconds = aion@duration:to_milliseconds(Value),
    gleam@json:int(Milliseconds).
-file("src/aion/workflow/run.gleam", 219).
%% Encode an optional duration: `none` becomes JSON null, `{some, Duration}`
%% is encoded by duration_json/1.
-spec optional_duration(gleam@option:option(aion@duration:duration())) -> gleam@json:json().
optional_duration(none) ->
    gleam@json:null();
optional_duration({some, Duration}) ->
    duration_json(Duration).
-file("src/aion/workflow/run.gleam", 195).
%% Encode a backoff strategy as a JSON object. Every object starts with a
%% "kind" field naming the strategy, followed by the strategy's own fields:
%%   exponential -> initial_ms, multiplier, max_ms
%%   linear      -> initial_ms, increment_ms, max_ms
%%   fixed       -> delay_ms
%% Duration-valued fields are encoded with duration_json/1.
-spec backoff_config(aion@activity:backoff()) -> gleam@json:json().
backoff_config({exponential, Initial, Multiplier, Max}) ->
    backoff_object(
        <<"exponential"/utf8>>,
        [
            {<<"initial_ms"/utf8>>, duration_json(Initial)},
            {<<"multiplier"/utf8>>, gleam@json:float(Multiplier)},
            {<<"max_ms"/utf8>>, duration_json(Max)}
        ]
    );
backoff_config({linear, Initial, Increment, Max}) ->
    backoff_object(
        <<"linear"/utf8>>,
        [
            {<<"initial_ms"/utf8>>, duration_json(Initial)},
            {<<"increment_ms"/utf8>>, duration_json(Increment)},
            {<<"max_ms"/utf8>>, duration_json(Max)}
        ]
    );
backoff_config({fixed, Delay}) ->
    backoff_object(
        <<"fixed"/utf8>>,
        [{<<"delay_ms"/utf8>>, duration_json(Delay)}]
    ).

%% Build the JSON object for one strategy: the "kind" field goes first, then
%% the strategy-specific fields in the order given.
-spec backoff_object(binary(), [{binary(), gleam@json:json()}]) -> gleam@json:json().
backoff_object(Kind, Fields) ->
    gleam@json:object([{<<"kind"/utf8>>, gleam@json:string(Kind)} | Fields]).
-file("src/aion/workflow/run.gleam", 184).
%% Encode an optional retry policy: `none` becomes JSON null; a policy becomes
%% an object with "max_attempts" (integer) and "backoff" (see backoff_config/1).
-spec retry_config(gleam@option:option(aion@activity:retry_policy())) -> gleam@json:json().
retry_config(none) ->
    gleam@json:null();
retry_config({some, {retry_policy, Attempts, Backoff}}) ->
    Max_attempts = {<<"max_attempts"/utf8>>, gleam@json:int(Attempts)},
    Backoff_field = {<<"backoff"/utf8>>, backoff_config(Backoff)},
    gleam@json:object([Max_attempts, Backoff_field]).
-file("src/aion/workflow/run.gleam", 169).
%% Serialise an activity's dispatch options to a JSON string with three
%% fields, in this order: "retry", "timeout_ms", "heartbeat_ms". Absent
%% options are encoded as JSON null by the helpers used below.
-spec activity_config(aion@activity:activity(any(), any())) -> binary().
activity_config(Activity_value) ->
    Retry = retry_config(aion@activity:retry_policy(Activity_value)),
    Timeout = optional_duration(aion@activity:timeout_duration(Activity_value)),
    Heartbeat = optional_duration(aion@activity:heartbeat_interval(Activity_value)),
    Config = gleam@json:object(
        [
            {<<"retry"/utf8>>, Retry},
            {<<"timeout_ms"/utf8>>, Timeout},
            {<<"heartbeat_ms"/utf8>>, Heartbeat}
        ]
    ),
    gleam@json:to_string(Config).
-file("src/aion/workflow/run.gleam", 36).
?DOC(
" Dispatch an activity through the single recorded side-effect boundary.\n"
"\n"
" Plain Gleam workflow code is re-run on replay. The only recorded\n"
" side-effectful path exposed by this SDK is `run` (and later concurrency\n"
" combinators over activities); there is intentionally no generic\n"
" `side_effect(fn)` escape hatch. The activity input is encoded with the\n"
" activity's input `Codec`, the engine dispatches and records via AD, and the\n"
" returned payload is decoded with the output `Codec`.\n"
).
%% Dispatch flow:
%%   1. encode the activity input with element 2 of the input codec,
%%   2. hand name, encoded input and JSON config to
%%      aion_flow_ffi:dispatch_activity/3,
%%   3. on success, await the result for the returned correlation id,
%%   4. decode the payload with element 3 of the output codec.
%% Raw errors from dispatch or await are classified by activity_error/1; a
%% decoder failure is reported as {activity_decode_failed, Reason}.
-spec run(aion@activity:activity(any(), FDV)) -> {ok, FDV} |
    {error, aion@error:activity_error()}.
run(Activity_value) ->
    Input_codec = aion@activity:input_codec(Activity_value),
    Output_codec = aion@activity:output_codec(Activity_value),
    Encode = erlang:element(2, Input_codec),
    Encoded_input = Encode(aion@activity:input(Activity_value)),
    Dispatched = aion_flow_ffi:dispatch_activity(
        aion@activity:name(Activity_value),
        Encoded_input,
        activity_config(Activity_value)
    ),
    case Dispatched of
        {error, Dispatch_error} ->
            {error, activity_error(Dispatch_error)};
        {ok, Correlation_id} ->
            await_activity_output(Correlation_id, Output_codec)
    end.

%% Wait for the result of a dispatched activity and decode it. The await call
%% is wrapped in aion@internal@pump:shield/1 and executed through
%% aion@internal@pump:run/1.
-spec await_activity_output(term(), tuple()) ->
    {ok, term()} | {error, aion@error:activity_error()}.
await_activity_output(Correlation_id, Output_codec) ->
    Await = fun() ->
        aion@internal@pump:shield(
            aion_flow_ffi:await_activity_result(Correlation_id)
        )
    end,
    case aion@internal@pump:run(Await) of
        {error, Await_error} ->
            {error, activity_error(Await_error)};
        {ok, Payload} ->
            decode_activity_output(Payload, Output_codec)
    end.

%% Decode a result payload with the output codec's decoder (element 3). The
%% decoder is only extracted once a payload has actually arrived.
-spec decode_activity_output(term(), tuple()) ->
    {ok, term()} | {error, aion@error:activity_error()}.
decode_activity_output(Payload, Output_codec) ->
    Decode = erlang:element(3, Output_codec),
    case Decode(Payload) of
        {ok, Output} ->
            {ok, Output};
        {error, Decode_error} ->
            {error, {activity_decode_failed, Decode_error}}
    end.
-file("src/aion/workflow/run.gleam", 109).
%% Parse the textual timestamp with gleam_stdlib:parse_int/1 and wrap the
%% integer as `{timestamp, Milliseconds}`. Unparseable input yields an
%% engine_failure whose message echoes the raw text.
-spec parse_timestamp(binary()) -> {ok, timestamp()} |
    {error, aion@error:engine_error()}.
parse_timestamp(Raw) ->
    case gleam_stdlib:parse_int(Raw) of
        {error, _} ->
            Message = <<"Invalid deterministic timestamp: "/utf8, Raw/binary>>,
            {error, {engine_failure, Message}};
        {ok, Milliseconds} ->
            {ok, {timestamp, Milliseconds}}
    end.
-file("src/aion/workflow/run.gleam", 73).
?DOC(
" Return AD's recorded deterministic timestamp.\n"
"\n"
" This is the only time source exposed to workflow code. Workflow authors must\n"
" not call wall-clock APIs such as Gleam/Erlang clocks from workflow logic, as\n"
" ambient time would desynchronise replay.\n"
).
%% Ask aion_flow_ffi:now/0 for the timestamp text and parse it with
%% parse_timestamp/1; an FFI error is wrapped as engine_failure.
-spec now() -> {ok, timestamp()} | {error, aion@error:engine_error()}.
now() ->
    Reply = aion_flow_ffi:now(),
    case Reply of
        {error, Reason} ->
            {error, {engine_failure, Reason}};
        {ok, Raw_timestamp} ->
            parse_timestamp(Raw_timestamp)
    end.
-file("src/aion/workflow/run.gleam", 119).
%% Parse Raw with gleam_stdlib:parse_float/1. On failure, return an
%% engine_failure whose message is
%% "Invalid deterministic <Label>: <Raw>".
-spec parse_float(binary(), binary()) -> {ok, float()} |
    {error, aion@error:engine_error()}.
parse_float(Raw, Label) ->
    case gleam_stdlib:parse_float(Raw) of
        {error, _} ->
            Message = <<
                "Invalid deterministic "/utf8,
                Label/binary,
                ": "/utf8,
                Raw/binary
            >>,
            {error, {engine_failure, Message}};
        {ok, Value} ->
            {ok, Value}
    end.
-file("src/aion/workflow/run.gleam", 84).
?DOC(
" Draw a deterministic floating-point value from AD's seeded RNG.\n"
"\n"
" The engine keys the RNG seed on WorkflowId + RunId so replay observes the\n"
" same sequence. Workflow authors must not call ambient entropy sources.\n"
).
%% Ask aion_flow_ffi:random/0 for the textual random value and parse it as a
%% float (error label "random"); an FFI error is wrapped as engine_failure.
-spec random() -> {ok, float()} | {error, aion@error:engine_error()}.
random() ->
    Reply = aion_flow_ffi:random(),
    case Reply of
        {error, Reason} ->
            {error, {engine_failure, Reason}};
        {ok, Raw_random} ->
            parse_float(Raw_random, <<"random"/utf8>>)
    end.
-file("src/aion/workflow/run.gleam", 129).
%% Parse Raw with gleam_stdlib:parse_int/1. On failure, return an
%% engine_failure whose message is
%% "Invalid deterministic <Label>: <Raw>".
-spec parse_int(binary(), binary()) -> {ok, integer()} |
    {error, aion@error:engine_error()}.
parse_int(Raw, Label) ->
    case gleam_stdlib:parse_int(Raw) of
        {error, _} ->
            Message = <<
                "Invalid deterministic "/utf8,
                Label/binary,
                ": "/utf8,
                Raw/binary
            >>,
            {error, {engine_failure, Message}};
        {ok, Value} ->
            {ok, Value}
    end.
-file("src/aion/workflow/run.gleam", 95).
?DOC(
" Draw a deterministic integer in the engine-defined inclusive range.\n"
"\n"
" Values come from AD's seeded RNG through the FFI boundary; no wall-clock or\n"
" ambient entropy binding is exposed by the SDK.\n"
).
%% A reversed range (Min > Max) is rejected up front without calling the FFI.
%% Otherwise both bounds are passed to aion_flow_ffi:random_int/2 as decimal
%% text and the textual reply is parsed as an integer (error label
%% "random_int"); an FFI error is wrapped as engine_failure.
-spec random_int(integer(), integer()) -> {ok, integer()} |
    {error, aion@error:engine_error()}.
random_int(Min, Max) when Min > Max ->
    {error,
        {engine_failure,
            <<"Invalid deterministic random_int range: min is greater than max"/utf8>>}};
random_int(Min, Max) ->
    Lower = erlang:integer_to_binary(Min),
    Upper = erlang:integer_to_binary(Max),
    case aion_flow_ffi:random_int(Lower, Upper) of
        {error, Reason} ->
            {error, {engine_failure, Reason}};
        {ok, Raw_random} ->
            parse_int(Raw_random, <<"random_int"/utf8>>)
    end.