src/lightspeed@pipeline@operator.erl

-module(lightspeed@pipeline@operator).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/operator.gleam").
-export([new/0, audits/1, valid/1, action_label/1, apply/3, can_enqueue/1, can_start_work/1, state/1, state_label/1, audit_event_label/1, signature/1]).
-export_type([control_state/0, action/0, audit_event/0, runtime/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Operator control workflows for pause/resume/drain/replay orchestration.\n").

-type control_state() :: running |
    {paused, binary(), integer()} |
    {draining, binary(), integer()} |
    {replaying, binary(), binary(), integer()}.

-type action() :: {pause, binary()} |
    {resume, binary()} |
    {drain, binary()} |
    {replay, binary(), binary()}.

-type audit_event() :: {audit_event,
        action(),
        control_state(),
        control_state(),
        integer()}.

-opaque runtime() :: {runtime, control_state(), list(audit_event())}.

-file("src/lightspeed/pipeline/operator.gleam", 38).
?DOC(" Build one operator runtime.\n").
-spec new() -> runtime().
new() ->
    {runtime, running, []}.

-file("src/lightspeed/pipeline/operator.gleam", 93).
?DOC(" Audit events in application order.\n").
-spec audits(runtime()) -> list(audit_event()).
audits(Runtime) ->
    lists:reverse(erlang:element(3, Runtime)).

-file("src/lightspeed/pipeline/operator.gleam", 196).
-spec state_valid(control_state()) -> boolean().
state_valid(State) ->
    case State of
        running ->
            true;

        {paused, Reason, At_ms} ->
            (Reason /= <<""/utf8>>) andalso (At_ms >= 0);

        {draining, Reason@1, At_ms@1} ->
            (Reason@1 /= <<""/utf8>>) andalso (At_ms@1 >= 0);

        {replaying, From_stage, Reason@2, At_ms@2} ->
            ((From_stage /= <<""/utf8>>) andalso (Reason@2 /= <<""/utf8>>))
            andalso (At_ms@2 >= 0)
    end.

-file("src/lightspeed/pipeline/operator.gleam", 206).
-spec audit_events_valid(list(audit_event())) -> boolean().
audit_events_valid(Events) ->
    case Events of
        [] ->
            true;

        [Event | Rest] ->
            (((erlang:element(5, Event) >= 0) andalso state_valid(
                erlang:element(3, Event)
            ))
            andalso state_valid(erlang:element(4, Event)))
            andalso audit_events_valid(Rest)
    end.

-file("src/lightspeed/pipeline/operator.gleam", 43).
?DOC(" Validate operator runtime invariants.\n").
-spec valid(runtime()) -> boolean().
valid(Runtime) ->
    state_valid(erlang:element(2, Runtime)) andalso audit_events_valid(
        audits(Runtime)
    ).

-file("src/lightspeed/pipeline/operator.gleam", 110).
?DOC(" Action label.\n").
-spec action_label(action()) -> binary().
action_label(Action) ->
    case Action of
        {pause, Reason} ->
            <<"pause:"/utf8, Reason/binary>>;

        {resume, Reason@1} ->
            <<"resume:"/utf8, Reason@1/binary>>;

        {drain, Reason@2} ->
            <<"drain:"/utf8, Reason@2/binary>>;

        {replay, From_stage, Reason@3} ->
            <<<<<<"replay:"/utf8, From_stage/binary>>/binary, ":"/utf8>>/binary,
                Reason@3/binary>>
    end.

-file("src/lightspeed/pipeline/operator.gleam", 139).
-spec next_state(control_state(), action(), integer()) -> {ok, control_state()} |
    {error, binary()}.
next_state(Current, Action, At_ms) ->
    case {Current, Action} of
        {running, {pause, Reason}} ->
            {ok, {paused, Reason, At_ms}};

        {running, {drain, Reason@1}} ->
            {ok, {draining, Reason@1, At_ms}};

        {running, {replay, From_stage, Reason@2}} ->
            case From_stage /= <<""/utf8>> of
                true ->
                    {ok, {replaying, From_stage, Reason@2, At_ms}};

                false ->
                    {error, <<"invalid_replay_stage"/utf8>>}
            end;

        {running, {resume, Reason@3}} ->
            {error,
                <<"invalid_transition:running:"/utf8,
                    (action_label({resume, Reason@3}))/binary>>};

        {{paused, _, _}, {resume, _}} ->
            {ok, running};

        {{paused, _, _}, {pause, Reason@4}} ->
            {error,
                <<"invalid_transition:paused:"/utf8,
                    (action_label({pause, Reason@4}))/binary>>};

        {{paused, _, _}, {drain, Reason@5}} ->
            {ok, {draining, Reason@5, At_ms}};

        {{paused, _, _}, {replay, From_stage@1, Reason@6}} ->
            case From_stage@1 /= <<""/utf8>> of
                true ->
                    {ok, {replaying, From_stage@1, Reason@6, At_ms}};

                false ->
                    {error, <<"invalid_replay_stage"/utf8>>}
            end;

        {{draining, _, _}, {resume, _}} ->
            {ok, running};

        {{draining, _, _}, {pause, Reason@7}} ->
            {ok, {paused, Reason@7, At_ms}};

        {{draining, _, _}, {drain, Reason@8}} ->
            {error,
                <<"invalid_transition:draining:"/utf8,
                    (action_label({drain, Reason@8}))/binary>>};

        {{draining, _, _}, {replay, From_stage@2, Reason@9}} ->
            case From_stage@2 /= <<""/utf8>> of
                true ->
                    {ok, {replaying, From_stage@2, Reason@9, At_ms}};

                false ->
                    {error, <<"invalid_replay_stage"/utf8>>}
            end;

        {{replaying, _, _, _}, {resume, _}} ->
            {ok, running};

        {{replaying, _, _, _}, {pause, Reason@10}} ->
            {ok, {paused, Reason@10, At_ms}};

        {{replaying, _, _, _}, {drain, Reason@11}} ->
            {ok, {draining, Reason@11, At_ms}};

        {{replaying, _, _, _}, {replay, From_stage@3, Reason@12}} ->
            case From_stage@3 /= <<""/utf8>> of
                true ->
                    {ok, {replaying, From_stage@3, Reason@12, At_ms}};

                false ->
                    {error, <<"invalid_replay_stage"/utf8>>}
            end
    end.

-file("src/lightspeed/pipeline/operator.gleam", 225).
-spec max(integer(), integer()) -> integer().
max(Left, Right) ->
    case Left >= Right of
        true ->
            Left;

        false ->
            Right
    end.

-file("src/lightspeed/pipeline/operator.gleam", 48).
?DOC(" Apply one operator action.\n").
-spec apply(runtime(), action(), integer()) -> {ok, runtime()} |
    {error, binary()}.
apply(Runtime, Action, At_ms) ->
    Clamped_at_ms = max(0, At_ms),
    case next_state(erlang:element(2, Runtime), Action, Clamped_at_ms) of
        {error, Reason} ->
            {error, Reason};

        {ok, Next} ->
            Event = {audit_event,
                Action,
                erlang:element(2, Runtime),
                Next,
                Clamped_at_ms},
            {ok, {runtime, Next, [Event | erlang:element(3, Runtime)]}}
    end.

-file("src/lightspeed/pipeline/operator.gleam", 71).
?DOC(" Whether new work can be enqueued.\n").
-spec can_enqueue(runtime()) -> boolean().
can_enqueue(Runtime) ->
    case erlang:element(2, Runtime) of
        running ->
            true;

        _ ->
            false
    end.

-file("src/lightspeed/pipeline/operator.gleam", 79).
?DOC(" Whether queued work can be started.\n").
-spec can_start_work(runtime()) -> boolean().
can_start_work(Runtime) ->
    case erlang:element(2, Runtime) of
        running ->
            true;

        {replaying, _, _, _} ->
            true;

        _ ->
            false
    end.

-file("src/lightspeed/pipeline/operator.gleam", 88).
?DOC(" Current control state.\n").
-spec state(runtime()) -> control_state().
state(Runtime) ->
    erlang:element(2, Runtime).

-file("src/lightspeed/pipeline/operator.gleam", 98).
?DOC(" Control-state label.\n").
-spec state_label(control_state()) -> binary().
state_label(State) ->
    case State of
        running ->
            <<"running"/utf8>>;

        {paused, Reason, At_ms} ->
            <<<<<<"paused:"/utf8, Reason/binary>>/binary, ":"/utf8>>/binary,
                (erlang:integer_to_binary(At_ms))/binary>>;

        {draining, Reason@1, At_ms@1} ->
            <<<<<<"draining:"/utf8, Reason@1/binary>>/binary, ":"/utf8>>/binary,
                (erlang:integer_to_binary(At_ms@1))/binary>>;

        {replaying, From_stage, Reason@2, At_ms@2} ->
            <<<<<<<<<<"replaying:"/utf8, From_stage/binary>>/binary, ":"/utf8>>/binary,
                        Reason@2/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(At_ms@2))/binary>>
    end.

-file("src/lightspeed/pipeline/operator.gleam", 120).
?DOC(" Audit-event label.\n").
-spec audit_event_label(audit_event()) -> binary().
audit_event_label(Event) ->
    <<<<<<<<<<<<<<"action="/utf8,
                                (action_label(erlang:element(2, Event)))/binary>>/binary,
                            "|before="/utf8>>/binary,
                        (state_label(erlang:element(3, Event)))/binary>>/binary,
                    "|after="/utf8>>/binary,
                (state_label(erlang:element(4, Event)))/binary>>/binary,
            "|at_ms="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:element(5, Event)))/binary>>.

-file("src/lightspeed/pipeline/operator.gleam", 217).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    case Values of
        [] ->
            <<""/utf8>>;

        [Value] ->
            Value;

        [Value@1 | Rest] ->
            <<<<Value@1/binary, Separator/binary>>/binary,
                (join_with(Separator, Rest))/binary>>
    end.

-file("src/lightspeed/pipeline/operator.gleam", 132).
?DOC(" Stable runtime signature.\n").
-spec signature(runtime()) -> binary().
signature(Runtime) ->
    <<<<<<"state="/utf8, (state_label(erlang:element(2, Runtime)))/binary>>/binary,
            "|audits="/utf8>>/binary,
        (join_with(
            <<";"/utf8>>,
            gleam@list:map(audits(Runtime), fun audit_event_label/1)
        ))/binary>>.