Skip to main content

src/aion@workflow@run.erl

-module(aion@workflow@run).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/workflow/run.gleam").
-export([timestamp_to_milliseconds/1, run/1, now/0, random/0, random_int/2]).
-export_type([timestamp/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" workflow.run (recorded activity dispatch) + now + random (determinism bindings)\n").

-opaque timestamp() :: {timestamp, integer()}.

%% A timestamp is the tagged tuple `{timestamp, Milliseconds}`; the integer
%% payload lives in the second tuple position and is returned unchanged.
-file("src/aion/workflow/run.gleam", 24).
?DOC(" Return the canonical millisecond representation of a deterministic timestamp.\n").
-spec timestamp_to_milliseconds(timestamp()) -> integer().
timestamp_to_milliseconds(Timestamp) ->
    Milliseconds = erlang:element(2, Timestamp),
    Milliseconds.

%% Classify a raw error binary by its textual prefix.
%%
%% Recognised prefixes and the terms they produce:
%%   "retryable:"        -> {retryable, Message, <<"">>}
%%   "terminal:"         -> {terminal, Message, <<"">>}
%%   "timeout:"          -> {activity_timed_out, {timed_out, Message}}
%%   "cancelled:"        -> {activity_cancelled, {cancelled, Message}}
%%   "non_determinism:"  -> {activity_non_deterministic,
%%                           {non_determinism_violation, Message}}
%% Anything else is wrapped untouched as {activity_engine_failure, Raw}.
%%
%% `Message` is the exact byte remainder after the prefix. Binding the tail in
%% the pattern (rather than dropping a fixed number of grapheme clusters) keeps
%% a message intact even when it begins with a combining code point that would
%% otherwise cluster with the trailing ':' of the prefix.
-file("src/aion/workflow/run.gleam", 139).
-spec activity_error(binary()) -> aion@error:activity_error().
activity_error(<<"retryable:"/utf8, Message/binary>>) ->
    {retryable, Message, <<""/utf8>>};
activity_error(<<"terminal:"/utf8, Message/binary>>) ->
    {terminal, Message, <<""/utf8>>};
activity_error(<<"timeout:"/utf8, Message/binary>>) ->
    {activity_timed_out, {timed_out, Message}};
activity_error(<<"cancelled:"/utf8, Message/binary>>) ->
    {activity_cancelled, {cancelled, Message}};
activity_error(<<"non_determinism:"/utf8, Message/binary>>) ->
    {activity_non_deterministic, {non_determinism_violation, Message}};
%% Unprefixed binaries are a defined case; non-binaries still crash.
activity_error(Raw) when erlang:is_binary(Raw) ->
    {activity_engine_failure, Raw}.

%% Encode a duration as a JSON integer: the value is first converted with
%% aion@duration:to_milliseconds/1 and the result handed to gleam@json:int/1.
-file("src/aion/workflow/run.gleam", 226).
-spec duration_json(aion@duration:duration()) -> gleam@json:json().
duration_json(Value) ->
    gleam@json:int(aion@duration:to_milliseconds(Value)).

%% Encode an optional duration: `none` becomes JSON null, `{some, Duration}`
%% is encoded through duration_json/1.
-file("src/aion/workflow/run.gleam", 219).
-spec optional_duration(gleam@option:option(aion@duration:duration())) -> gleam@json:json().
optional_duration(none) ->
    gleam@json:null();
optional_duration({some, Duration}) ->
    duration_json(Duration).

%% Encode a backoff strategy as a JSON object. Every object carries a "kind"
%% string ("exponential", "linear" or "fixed") followed by the fields of that
%% variant; durations are encoded through duration_json/1.
-file("src/aion/workflow/run.gleam", 195).
-spec backoff_config(aion@activity:backoff()) -> gleam@json:json().
backoff_config({exponential, Initial, Multiplier, Max}) ->
    Kind = {<<"kind"/utf8>>, gleam@json:string(<<"exponential"/utf8>>)},
    Initial_ms = {<<"initial_ms"/utf8>>, duration_json(Initial)},
    Factor = {<<"multiplier"/utf8>>, gleam@json:float(Multiplier)},
    Max_ms = {<<"max_ms"/utf8>>, duration_json(Max)},
    gleam@json:object([Kind, Initial_ms, Factor, Max_ms]);
backoff_config({linear, Initial, Increment, Max}) ->
    Kind = {<<"kind"/utf8>>, gleam@json:string(<<"linear"/utf8>>)},
    Initial_ms = {<<"initial_ms"/utf8>>, duration_json(Initial)},
    Increment_ms = {<<"increment_ms"/utf8>>, duration_json(Increment)},
    Max_ms = {<<"max_ms"/utf8>>, duration_json(Max)},
    gleam@json:object([Kind, Initial_ms, Increment_ms, Max_ms]);
backoff_config({fixed, Delay}) ->
    Kind = {<<"kind"/utf8>>, gleam@json:string(<<"fixed"/utf8>>)},
    Delay_ms = {<<"delay_ms"/utf8>>, duration_json(Delay)},
    gleam@json:object([Kind, Delay_ms]).

%% Encode an optional retry policy. `none` becomes JSON null; a present policy
%% becomes an object with "max_attempts" (integer) and "backoff" (see
%% backoff_config/1).
-file("src/aion/workflow/run.gleam", 184).
-spec retry_config(gleam@option:option(aion@activity:retry_policy())) -> gleam@json:json().
retry_config(none) ->
    gleam@json:null();
retry_config({some, {retry_policy, Attempts, Backoff}}) ->
    Max_attempts = {<<"max_attempts"/utf8>>, gleam@json:int(Attempts)},
    Backoff_field = {<<"backoff"/utf8>>, backoff_config(Backoff)},
    gleam@json:object([Max_attempts, Backoff_field]).

%% Serialise an activity's dispatch configuration to a JSON string containing
%% three keys: "retry", "timeout_ms" and "heartbeat_ms". The values are read
%% from the activity via the aion@activity accessors.
-file("src/aion/workflow/run.gleam", 169).
-spec activity_config(aion@activity:activity(any(), any())) -> binary().
activity_config(Activity_value) ->
    Retry = retry_config(aion@activity:retry_policy(Activity_value)),
    Timeout = optional_duration(aion@activity:timeout_duration(Activity_value)),
    Heartbeat = optional_duration(
        aion@activity:heartbeat_interval(Activity_value)
    ),
    Config = gleam@json:object(
        [{<<"retry"/utf8>>, Retry},
            {<<"timeout_ms"/utf8>>, Timeout},
            {<<"heartbeat_ms"/utf8>>, Heartbeat}]
    ),
    gleam@json:to_string(Config).

%% Flow of run/1:
%%   1. encode the activity input with element 2 of the input codec;
%%   2. dispatch via aion_flow_ffi:dispatch_activity/3 with the activity name,
%%      the encoded input and the JSON config from activity_config/1;
%%   3. on `{ok, CorrelationId}`, await the result inside
%%      aion@internal@pump:run/1 and hand it to run_decode_awaited/2;
%%   4. raw errors from either stage are classified by activity_error/1.
-file("src/aion/workflow/run.gleam", 36).
?DOC(
    " Dispatch an activity through the single recorded side-effect boundary.\n"
    "\n"
    " Plain Gleam workflow code is re-run on replay. The only recorded\n"
    " side-effectful path exposed by this SDK is `run` (and later concurrency\n"
    " combinators over activities); there is intentionally no generic\n"
    " `side_effect(fn)` escape hatch. The activity input is encoded with the\n"
    " activity's input `Codec`, the engine dispatches and records via AD, and the\n"
    " returned payload is decoded with the output `Codec`.\n"
).
-spec run(aion@activity:activity(any(), FDV)) -> {ok, FDV} |
    {error, aion@error:activity_error()}.
run(Activity_value) ->
    Input_codec = aion@activity:input_codec(Activity_value),
    Output_codec = aion@activity:output_codec(Activity_value),
    Encode = erlang:element(2, Input_codec),
    Encoded_input = Encode(aion@activity:input(Activity_value)),
    Dispatched = aion_flow_ffi:dispatch_activity(
        aion@activity:name(Activity_value),
        Encoded_input,
        activity_config(Activity_value)
    ),
    case Dispatched of
        {error, Dispatch_error} ->
            {error, activity_error(Dispatch_error)};

        {ok, Correlation_id} ->
            Awaited = aion@internal@pump:run(
                fun() ->
                    aion@internal@pump:shield(
                        aion_flow_ffi:await_activity_result(Correlation_id)
                    )
                end
            ),
            run_decode_awaited(Awaited, Output_codec)
    end.

%% Turn the awaited activity result into run/1's return value: a payload is
%% decoded with element 3 of the output codec (decode failures are tagged
%% `activity_decode_failed`), a raw error is classified by activity_error/1.
run_decode_awaited({ok, Payload}, Output_codec) ->
    Decode = erlang:element(3, Output_codec),
    case Decode(Payload) of
        {ok, Output} ->
            {ok, Output};

        {error, Decode_error} ->
            {error, {activity_decode_failed, Decode_error}}
    end;
run_decode_awaited({error, Await_error}, _Output_codec) ->
    {error, activity_error(Await_error)}.

%% Parse the engine's textual timestamp with gleam_stdlib:parse_int/1 and wrap
%% the integer as `{timestamp, Milliseconds}`. Unparseable input yields an
%% `engine_failure` whose message embeds the raw text.
-file("src/aion/workflow/run.gleam", 109).
-spec parse_timestamp(binary()) -> {ok, timestamp()} |
    {error, aion@error:engine_error()}.
parse_timestamp(Raw) ->
    Parsed = gleam_stdlib:parse_int(Raw),
    case Parsed of
        {error, _} ->
            Message = <<"Invalid deterministic timestamp: "/utf8, Raw/binary>>,
            {error, {engine_failure, Message}};

        {ok, Milliseconds} ->
            {ok, {timestamp, Milliseconds}}
    end.

%% Calls aion_flow_ffi:now/0; a successful reply is parsed by
%% parse_timestamp/1, a raw error is wrapped as `engine_failure`.
-file("src/aion/workflow/run.gleam", 73).
?DOC(
    " Return AD's recorded deterministic timestamp.\n"
    "\n"
    " This is the only time source exposed to workflow code. Workflow authors must\n"
    " not call wall-clock APIs such as Gleam/Erlang clocks from workflow logic, as\n"
    " ambient time would desynchronise replay.\n"
).
-spec now() -> {ok, timestamp()} | {error, aion@error:engine_error()}.
now() ->
    Reply = aion_flow_ffi:now(),
    case Reply of
        {error, Raw_error} ->
            {error, {engine_failure, Raw_error}};

        {ok, Raw_timestamp} ->
            parse_timestamp(Raw_timestamp)
    end.

%% Parse a float from text with gleam_stdlib:parse_float/1. On failure the
%% error message reads "Invalid deterministic <Label>: <Raw>".
-file("src/aion/workflow/run.gleam", 119).
-spec parse_float(binary(), binary()) -> {ok, float()} |
    {error, aion@error:engine_error()}.
parse_float(Raw, Label) ->
    Parsed = gleam_stdlib:parse_float(Raw),
    case Parsed of
        {error, _} ->
            Message = <<"Invalid deterministic "/utf8,
                Label/binary,
                ": "/utf8,
                Raw/binary>>,
            {error, {engine_failure, Message}};

        {ok, Value} ->
            {ok, Value}
    end.

%% Calls aion_flow_ffi:random/0; a successful reply is parsed by parse_float/2
%% under the label "random", a raw error is wrapped as `engine_failure`.
-file("src/aion/workflow/run.gleam", 84).
?DOC(
    " Draw a deterministic floating-point value from AD's seeded RNG.\n"
    "\n"
    " The engine keys the RNG seed on WorkflowId + RunId so replay observes the\n"
    " same sequence. Workflow authors must not call ambient entropy sources.\n"
).
-spec random() -> {ok, float()} | {error, aion@error:engine_error()}.
random() ->
    Reply = aion_flow_ffi:random(),
    case Reply of
        {error, Raw_error} ->
            {error, {engine_failure, Raw_error}};

        {ok, Raw_random} ->
            parse_float(Raw_random, <<"random"/utf8>>)
    end.

%% Parse an integer from text with gleam_stdlib:parse_int/1. On failure the
%% error message reads "Invalid deterministic <Label>: <Raw>".
-file("src/aion/workflow/run.gleam", 129).
-spec parse_int(binary(), binary()) -> {ok, integer()} |
    {error, aion@error:engine_error()}.
parse_int(Raw, Label) ->
    Parsed = gleam_stdlib:parse_int(Raw),
    case Parsed of
        {error, _} ->
            Message = <<"Invalid deterministic "/utf8,
                Label/binary,
                ": "/utf8,
                Raw/binary>>,
            {error, {engine_failure, Message}};

        {ok, Value} ->
            {ok, Value}
    end.

%% An inverted range (Min > Max) is rejected locally without calling the FFI.
%% Otherwise both bounds are passed to aion_flow_ffi:random_int/2 as decimal
%% binaries and the textual reply is parsed by parse_int/2 under the label
%% "random_int"; a raw FFI error is wrapped as `engine_failure`.
-file("src/aion/workflow/run.gleam", 95).
?DOC(
    " Draw a deterministic integer in the engine-defined inclusive range.\n"
    "\n"
    " Values come from AD's seeded RNG through the FFI boundary; no wall-clock or\n"
    " ambient entropy binding is exposed by the SDK.\n"
).
-spec random_int(integer(), integer()) -> {ok, integer()} |
    {error, aion@error:engine_error()}.
random_int(Min, Max) when Min > Max ->
    {error,
        {engine_failure,
            <<"Invalid deterministic random_int range: min is greater than max"/utf8>>}};
random_int(Min, Max) ->
    Lower = erlang:integer_to_binary(Min),
    Upper = erlang:integer_to_binary(Max),
    case aion_flow_ffi:random_int(Lower, Upper) of
        {error, Raw_error} ->
            {error, {engine_failure, Raw_error}};

        {ok, Raw_random} ->
            parse_int(Raw_random, <<"random_int"/utf8>>)
    end.