src/lightspeed@pipeline@connector.erl

-module(lightspeed@pipeline@connector).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/connector.gleam").
-export([database_source/3, file_source/4, queue_source/3, database_sink/3, pubsub_sink/3, connector_plan/7, default_plan/0, valid/1, classify_failure/4, within_reprocess_window/2, source_label/1, sink_label/1, retry_policy_label/1, dead_letter_policy_label/1, failure_action_label/1, signature/1, source/1, sink/1, retry_policy/1, dead_letter_policy/1, max_batch_records/1]).
-export_type([source_connector/0, sink_connector/0, retry_policy/0, dead_letter_policy/0, reprocess_window/0, connector_plan/0, failure_action/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Connector and retry/dead-letter contracts for M32 orchestration parity.\n").

-type source_connector() :: {database_source, binary(), binary(), integer()} |
    {file_source, binary(), binary(), binary(), integer()} |
    {queue_source, binary(), binary(), integer()}.

-type sink_connector() :: {database_sink, binary(), binary(), binary()} |
    {pub_sub_sink, binary(), binary(), binary()}.

-type retry_policy() :: {retry_policy, integer(), integer(), integer()}.

-type dead_letter_policy() :: {dead_letter_policy,
        boolean(),
        binary(),
        integer()}.

-type reprocess_window() :: {reprocess_window, integer(), integer(), binary()}.

-type connector_plan() :: {connector_plan,
        binary(),
        source_connector(),
        sink_connector(),
        retry_policy(),
        dead_letter_policy(),
        gleam@option:option(reprocess_window()),
        integer()}.

-type failure_action() :: {retry, integer(), integer()} |
    {dead_letter, binary(), binary()} |
    {reject, binary()}.

-file("src/lightspeed/pipeline/connector.gleam", 55).
?DOC(" Build a database source connector.\n").
-spec database_source(binary(), binary(), integer()) -> source_connector().
database_source(Name, Query, Batch_size) ->
    {database_source, Name, Query, Batch_size}.

-file("src/lightspeed/pipeline/connector.gleam", 64).
?DOC(" Build a file source connector.\n").
-spec file_source(binary(), binary(), binary(), integer()) -> source_connector().
file_source(Name, Path, Format, Batch_size) ->
    {file_source, Name, Path, Format, Batch_size}.

-file("src/lightspeed/pipeline/connector.gleam", 74).
?DOC(" Build a queue source connector.\n").
-spec queue_source(binary(), binary(), integer()) -> source_connector().
queue_source(Name, Topic, Prefetch) ->
    {queue_source, Name, Topic, Prefetch}.

-file("src/lightspeed/pipeline/connector.gleam", 83).
?DOC(" Build a database sink connector.\n").
-spec database_sink(binary(), binary(), binary()) -> sink_connector().
database_sink(Name, Table, Upsert_key) ->
    {database_sink, Name, Table, Upsert_key}.

-file("src/lightspeed/pipeline/connector.gleam", 92).
?DOC(" Build a pubsub sink connector.\n").
-spec pubsub_sink(binary(), binary(), binary()) -> sink_connector().
pubsub_sink(Name, Topic, Ordering_key) ->
    {pub_sub_sink, Name, Topic, Ordering_key}.

-file("src/lightspeed/pipeline/connector.gleam", 101).
?DOC(" Build a connector plan.\n").
-spec connector_plan(
    binary(),
    source_connector(),
    sink_connector(),
    retry_policy(),
    dead_letter_policy(),
    gleam@option:option(reprocess_window()),
    integer()
) -> connector_plan().
connector_plan(
    Name,
    Source,
    Sink,
    Retry_policy,
    Dead_letter_policy,
    Reprocess_window,
    Max_batch_records
) ->
    {connector_plan,
        Name,
        Source,
        Sink,
        Retry_policy,
        Dead_letter_policy,
        Reprocess_window,
        Max_batch_records}.

-file("src/lightspeed/pipeline/connector.gleam", 122).
?DOC(" Default M32 connector plan.\n").
-spec default_plan() -> connector_plan().
default_plan() ->
    {connector_plan,
        <<"orders_connector_plan"/utf8>>,
        {queue_source, <<"orders_queue"/utf8>>, <<"orders.raw"/utf8>>, 4},
        {database_sink,
            <<"orders_store"/utf8>>,
            <<"orders_projection"/utf8>>,
            <<"order_id"/utf8>>},
        {retry_policy, 3, 250, 2000},
        {dead_letter_policy, true, <<"orders.dead_letter"/utf8>>, 65536},
        none,
        500}.

-file("src/lightspeed/pipeline/connector.gleam", 334).
-spec reprocess_window_valid(gleam@option:option(reprocess_window())) -> boolean().
reprocess_window_valid(Window) ->
    case Window of
        none ->
            true;

        {some, Window@1} ->
            ((erlang:element(2, Window@1) > 0) andalso (erlang:element(
                3,
                Window@1
            )
            >= erlang:element(2, Window@1)))
            andalso (erlang:element(4, Window@1) /= <<""/utf8>>)
    end.

-file("src/lightspeed/pipeline/connector.gleam", 327).
-spec dead_letter_valid(dead_letter_policy()) -> boolean().
dead_letter_valid(Policy) ->
    case erlang:element(2, Policy) of
        true ->
            (erlang:element(3, Policy) /= <<""/utf8>>) andalso (erlang:element(
                4,
                Policy
            )
            > 0);

        false ->
            true
    end.

-file("src/lightspeed/pipeline/connector.gleam", 321).
-spec retry_valid(retry_policy()) -> boolean().
retry_valid(Policy) ->
    ((erlang:element(2, Policy) > 0) andalso (erlang:element(3, Policy) > 0))
    andalso (erlang:element(4, Policy) >= erlang:element(3, Policy)).

-file("src/lightspeed/pipeline/connector.gleam", 312).
-spec sink_valid(sink_connector()) -> boolean().
sink_valid(Sink) ->
    case Sink of
        {database_sink, Name, Table, Upsert_key} ->
            ((Name /= <<""/utf8>>) andalso (Table /= <<""/utf8>>)) andalso (Upsert_key
            /= <<""/utf8>>);

        {pub_sub_sink, Name@1, Topic, Ordering_key} ->
            ((Name@1 /= <<""/utf8>>) andalso (Topic /= <<""/utf8>>)) andalso (Ordering_key
            /= <<""/utf8>>)
    end.

-file("src/lightspeed/pipeline/connector.gleam", 301).
-spec source_valid(source_connector()) -> boolean().
source_valid(Source) ->
    case Source of
        {database_source, Name, Query, Batch_size} ->
            ((Name /= <<""/utf8>>) andalso (Query /= <<""/utf8>>)) andalso (Batch_size
            > 0);

        {file_source, Name@1, Path, Format, Batch_size@1} ->
            (((Name@1 /= <<""/utf8>>) andalso (Path /= <<""/utf8>>)) andalso (Format
            /= <<""/utf8>>))
            andalso (Batch_size@1 > 0);

        {queue_source, Name@2, Topic, Prefetch} ->
            ((Name@2 /= <<""/utf8>>) andalso (Topic /= <<""/utf8>>)) andalso (Prefetch
            > 0)
    end.

-file("src/lightspeed/pipeline/connector.gleam", 147).
?DOC(" Validate connector plan invariants.\n").
-spec valid(connector_plan()) -> boolean().
valid(Plan) ->
    ((((((erlang:element(2, Plan) /= <<""/utf8>>) andalso source_valid(
        erlang:element(3, Plan)
    ))
    andalso sink_valid(erlang:element(4, Plan)))
    andalso retry_valid(erlang:element(5, Plan)))
    andalso dead_letter_valid(erlang:element(6, Plan)))
    andalso reprocess_window_valid(erlang:element(7, Plan)))
    andalso (erlang:element(8, Plan) > 0).

-file("src/lightspeed/pipeline/connector.gleam", 367).
-spec max(integer(), integer()) -> integer().
max(Left, Right) ->
    case Left >= Right of
        true ->
            Left;

        false ->
            Right
    end.

-file("src/lightspeed/pipeline/connector.gleam", 374).
-spec min(integer(), integer()) -> integer().
min(Left, Right) ->
    case Left =< Right of
        true ->
            Left;

        false ->
            Right
    end.

-file("src/lightspeed/pipeline/connector.gleam", 344).
-spec backoff_ms(retry_policy(), integer()) -> integer().
backoff_ms(Policy, Attempt) ->
    min(erlang:element(4, Policy), erlang:element(3, Policy) * max(1, Attempt)).

-file("src/lightspeed/pipeline/connector.gleam", 158).
?DOC(" Determine failure action from retry and dead-letter policy.\n").
-spec classify_failure(connector_plan(), integer(), binary(), integer()) -> failure_action().
classify_failure(Plan, Attempt, Reason, Now_ms) ->
    case Attempt < erlang:element(2, erlang:element(5, Plan)) of
        true ->
            Next_attempt = Attempt + 1,
            {retry,
                Next_attempt,
                max(0, Now_ms) + backoff_ms(
                    erlang:element(5, Plan),
                    Next_attempt
                )};

        false ->
            case erlang:element(2, erlang:element(6, Plan)) of
                true ->
                    {dead_letter,
                        erlang:element(3, erlang:element(6, Plan)),
                        Reason};

                false ->
                    {reject, <<"retry_exhausted:"/utf8, Reason/binary>>}
            end
    end.

-file("src/lightspeed/pipeline/connector.gleam", 185).
?DOC(" Check whether one sequence is allowed by the optional reprocess window.\n").
-spec within_reprocess_window(connector_plan(), integer()) -> boolean().
within_reprocess_window(Plan, Sequence) ->
    case erlang:element(7, Plan) of
        none ->
            Sequence > 0;

        {some, Window} ->
            (Sequence >= erlang:element(2, Window)) andalso (Sequence =< erlang:element(
                3,
                Window
            ))
    end.

-file("src/lightspeed/pipeline/connector.gleam", 194).
?DOC(" Source connector label.\n").
-spec source_label(source_connector()) -> binary().
source_label(Source) ->
    case Source of
        {database_source, Name, Query, Batch_size} ->
            <<<<<<<<<<"database_source:"/utf8, Name/binary>>/binary, ":"/utf8>>/binary,
                        Query/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(Batch_size))/binary>>;

        {file_source, Name@1, Path, Format, Batch_size@1} ->
            <<<<<<<<<<<<<<"file_source:"/utf8, Name@1/binary>>/binary,
                                    ":"/utf8>>/binary,
                                Path/binary>>/binary,
                            ":"/utf8>>/binary,
                        Format/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(Batch_size@1))/binary>>;

        {queue_source, Name@2, Topic, Prefetch} ->
            <<<<<<<<<<"queue_source:"/utf8, Name@2/binary>>/binary, ":"/utf8>>/binary,
                        Topic/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(Prefetch))/binary>>
    end.

-file("src/lightspeed/pipeline/connector.gleam", 218).
?DOC(" Sink connector label.\n").
-spec sink_label(sink_connector()) -> binary().
sink_label(Sink) ->
    case Sink of
        {database_sink, Name, Table, Upsert_key} ->
            <<<<<<<<<<"database_sink:"/utf8, Name/binary>>/binary, ":"/utf8>>/binary,
                        Table/binary>>/binary,
                    ":"/utf8>>/binary,
                Upsert_key/binary>>;

        {pub_sub_sink, Name@1, Topic, Ordering_key} ->
            <<<<<<<<<<"pubsub_sink:"/utf8, Name@1/binary>>/binary, ":"/utf8>>/binary,
                        Topic/binary>>/binary,
                    ":"/utf8>>/binary,
                Ordering_key/binary>>
    end.

-file("src/lightspeed/pipeline/connector.gleam", 228).
?DOC(" Retry policy label.\n").
-spec retry_policy_label(retry_policy()) -> binary().
retry_policy_label(Policy) ->
    <<<<<<<<<<"max_attempts="/utf8,
                        (erlang:integer_to_binary(erlang:element(2, Policy)))/binary>>/binary,
                    "|base_backoff_ms="/utf8>>/binary,
                (erlang:integer_to_binary(erlang:element(3, Policy)))/binary>>/binary,
            "|max_backoff_ms="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:element(4, Policy)))/binary>>.

-file("src/lightspeed/pipeline/connector.gleam", 360).
-spec bool_label(boolean()) -> binary().
bool_label(Value) ->
    case Value of
        true ->
            <<"true"/utf8>>;

        false ->
            <<"false"/utf8>>
    end.

-file("src/lightspeed/pipeline/connector.gleam", 238).
?DOC(" Dead-letter policy label.\n").
-spec dead_letter_policy_label(dead_letter_policy()) -> binary().
dead_letter_policy_label(Policy) ->
    <<<<<<<<<<"enabled="/utf8, (bool_label(erlang:element(2, Policy)))/binary>>/binary,
                    "|destination="/utf8>>/binary,
                (erlang:element(3, Policy))/binary>>/binary,
            "|max_payload_bytes="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:element(4, Policy)))/binary>>.

-file("src/lightspeed/pipeline/connector.gleam", 248).
?DOC(" Failure action label.\n").
-spec failure_action_label(failure_action()) -> binary().
failure_action_label(Action) ->
    case Action of
        {retry, Next_attempt, At_ms} ->
            <<<<<<"retry:"/utf8,
                        (erlang:integer_to_binary(Next_attempt))/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(At_ms))/binary>>;

        {dead_letter, Destination, Reason} ->
            <<<<<<"dead_letter:"/utf8, Destination/binary>>/binary, ":"/utf8>>/binary,
                Reason/binary>>;

        {reject, Reason@1} ->
            <<"reject:"/utf8, Reason@1/binary>>
    end.

-file("src/lightspeed/pipeline/connector.gleam", 348).
-spec reprocess_window_label(gleam@option:option(reprocess_window())) -> binary().
reprocess_window_label(Window) ->
    case Window of
        none ->
            <<"none"/utf8>>;

        {some, Window@1} ->
            <<<<<<<<(erlang:integer_to_binary(erlang:element(2, Window@1)))/binary,
                            "-"/utf8>>/binary,
                        (erlang:integer_to_binary(erlang:element(3, Window@1)))/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:element(4, Window@1))/binary>>
    end.

-file("src/lightspeed/pipeline/connector.gleam", 259).
?DOC(" Stable connector-plan signature.\n").
-spec signature(connector_plan()) -> binary().
signature(Plan) ->
    <<<<<<<<<<<<<<<<<<<<<<<<<<"name="/utf8, (erlang:element(2, Plan))/binary>>/binary,
                                                    "|source="/utf8>>/binary,
                                                (source_label(
                                                    erlang:element(3, Plan)
                                                ))/binary>>/binary,
                                            "|sink="/utf8>>/binary,
                                        (sink_label(erlang:element(4, Plan)))/binary>>/binary,
                                    "|retry="/utf8>>/binary,
                                (retry_policy_label(erlang:element(5, Plan)))/binary>>/binary,
                            "|dead_letter="/utf8>>/binary,
                        (dead_letter_policy_label(erlang:element(6, Plan)))/binary>>/binary,
                    "|reprocess="/utf8>>/binary,
                (reprocess_window_label(erlang:element(7, Plan)))/binary>>/binary,
            "|max_batch_records="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:element(8, Plan)))/binary>>.

-file("src/lightspeed/pipeline/connector.gleam", 277).
?DOC(" Access source connector.\n").
-spec source(connector_plan()) -> source_connector().
source(Plan) ->
    erlang:element(3, Plan).

-file("src/lightspeed/pipeline/connector.gleam", 282).
?DOC(" Access sink connector.\n").
-spec sink(connector_plan()) -> sink_connector().
sink(Plan) ->
    erlang:element(4, Plan).

-file("src/lightspeed/pipeline/connector.gleam", 287).
?DOC(" Access retry policy.\n").
-spec retry_policy(connector_plan()) -> retry_policy().
retry_policy(Plan) ->
    erlang:element(5, Plan).

-file("src/lightspeed/pipeline/connector.gleam", 292).
?DOC(" Access dead-letter policy.\n").
-spec dead_letter_policy(connector_plan()) -> dead_letter_policy().
dead_letter_policy(Plan) ->
    erlang:element(6, Plan).

-file("src/lightspeed/pipeline/connector.gleam", 297).
?DOC(" Access max batch records.\n").
-spec max_batch_records(connector_plan()) -> integer().
max_batch_records(Plan) ->
    erlang:element(8, Plan).