-module(lightspeed@pipeline@connector).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/connector.gleam").
-export([database_source/3, file_source/4, queue_source/3, database_sink/3, pubsub_sink/3, connector_plan/7, default_plan/0, valid/1, classify_failure/4, within_reprocess_window/2, source_label/1, sink_label/1, retry_policy_label/1, dead_letter_policy_label/1, failure_action_label/1, signature/1, source/1, sink/1, retry_policy/1, dead_letter_policy/1, max_batch_records/1]).
-export_type([source_connector/0, sink_connector/0, retry_policy/0, dead_letter_policy/0, reprocess_window/0, connector_plan/0, failure_action/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Connector and retry/dead-letter contracts for M32 orchestration parity.\n").
-type source_connector() :: {database_source, binary(), binary(), integer()} |
{file_source, binary(), binary(), binary(), integer()} |
{queue_source, binary(), binary(), integer()}.
-type sink_connector() :: {database_sink, binary(), binary(), binary()} |
{pub_sub_sink, binary(), binary(), binary()}.
-type retry_policy() :: {retry_policy, integer(), integer(), integer()}.
-type dead_letter_policy() :: {dead_letter_policy,
boolean(),
binary(),
integer()}.
-type reprocess_window() :: {reprocess_window, integer(), integer(), binary()}.
-type connector_plan() :: {connector_plan,
binary(),
source_connector(),
sink_connector(),
retry_policy(),
dead_letter_policy(),
gleam@option:option(reprocess_window()),
integer()}.
-type failure_action() :: {retry, integer(), integer()} |
{dead_letter, binary(), binary()} |
{reject, binary()}.
-file("src/lightspeed/pipeline/connector.gleam", 55).
?DOC(" Build a database source connector.\n").
-spec database_source(binary(), binary(), integer()) -> source_connector().
database_source(Name, Query, Batch_size) ->
{database_source, Name, Query, Batch_size}.
-file("src/lightspeed/pipeline/connector.gleam", 64).
?DOC(" Build a file source connector.\n").
-spec file_source(binary(), binary(), binary(), integer()) -> source_connector().
file_source(Name, Path, Format, Batch_size) ->
{file_source, Name, Path, Format, Batch_size}.
-file("src/lightspeed/pipeline/connector.gleam", 74).
?DOC(" Build a queue source connector.\n").
-spec queue_source(binary(), binary(), integer()) -> source_connector().
queue_source(Name, Topic, Prefetch) ->
{queue_source, Name, Topic, Prefetch}.
-file("src/lightspeed/pipeline/connector.gleam", 83).
?DOC(" Build a database sink connector.\n").
-spec database_sink(binary(), binary(), binary()) -> sink_connector().
database_sink(Name, Table, Upsert_key) ->
{database_sink, Name, Table, Upsert_key}.
-file("src/lightspeed/pipeline/connector.gleam", 92).
?DOC(" Build a pubsub sink connector.\n").
-spec pubsub_sink(binary(), binary(), binary()) -> sink_connector().
pubsub_sink(Name, Topic, Ordering_key) ->
{pub_sub_sink, Name, Topic, Ordering_key}.
-file("src/lightspeed/pipeline/connector.gleam", 101).
?DOC(" Build a connector plan.\n").
-spec connector_plan(
binary(),
source_connector(),
sink_connector(),
retry_policy(),
dead_letter_policy(),
gleam@option:option(reprocess_window()),
integer()
) -> connector_plan().
connector_plan(
Name,
Source,
Sink,
Retry_policy,
Dead_letter_policy,
Reprocess_window,
Max_batch_records
) ->
{connector_plan,
Name,
Source,
Sink,
Retry_policy,
Dead_letter_policy,
Reprocess_window,
Max_batch_records}.
-file("src/lightspeed/pipeline/connector.gleam", 122).
?DOC(" Default M32 connector plan.\n").
-spec default_plan() -> connector_plan().
default_plan() ->
{connector_plan,
<<"orders_connector_plan"/utf8>>,
{queue_source, <<"orders_queue"/utf8>>, <<"orders.raw"/utf8>>, 4},
{database_sink,
<<"orders_store"/utf8>>,
<<"orders_projection"/utf8>>,
<<"order_id"/utf8>>},
{retry_policy, 3, 250, 2000},
{dead_letter_policy, true, <<"orders.dead_letter"/utf8>>, 65536},
none,
500}.
-file("src/lightspeed/pipeline/connector.gleam", 334).
-spec reprocess_window_valid(gleam@option:option(reprocess_window())) -> boolean().
reprocess_window_valid(Window) ->
case Window of
none ->
true;
{some, Window@1} ->
((erlang:element(2, Window@1) > 0) andalso (erlang:element(
3,
Window@1
)
>= erlang:element(2, Window@1)))
andalso (erlang:element(4, Window@1) /= <<""/utf8>>)
end.
-file("src/lightspeed/pipeline/connector.gleam", 327).
-spec dead_letter_valid(dead_letter_policy()) -> boolean().
dead_letter_valid(Policy) ->
case erlang:element(2, Policy) of
true ->
(erlang:element(3, Policy) /= <<""/utf8>>) andalso (erlang:element(
4,
Policy
)
> 0);
false ->
true
end.
-file("src/lightspeed/pipeline/connector.gleam", 321).
-spec retry_valid(retry_policy()) -> boolean().
retry_valid(Policy) ->
((erlang:element(2, Policy) > 0) andalso (erlang:element(3, Policy) > 0))
andalso (erlang:element(4, Policy) >= erlang:element(3, Policy)).
-file("src/lightspeed/pipeline/connector.gleam", 312).
-spec sink_valid(sink_connector()) -> boolean().
sink_valid(Sink) ->
case Sink of
{database_sink, Name, Table, Upsert_key} ->
((Name /= <<""/utf8>>) andalso (Table /= <<""/utf8>>)) andalso (Upsert_key
/= <<""/utf8>>);
{pub_sub_sink, Name@1, Topic, Ordering_key} ->
((Name@1 /= <<""/utf8>>) andalso (Topic /= <<""/utf8>>)) andalso (Ordering_key
/= <<""/utf8>>)
end.
-file("src/lightspeed/pipeline/connector.gleam", 301).
-spec source_valid(source_connector()) -> boolean().
source_valid(Source) ->
case Source of
{database_source, Name, Query, Batch_size} ->
((Name /= <<""/utf8>>) andalso (Query /= <<""/utf8>>)) andalso (Batch_size
> 0);
{file_source, Name@1, Path, Format, Batch_size@1} ->
(((Name@1 /= <<""/utf8>>) andalso (Path /= <<""/utf8>>)) andalso (Format
/= <<""/utf8>>))
andalso (Batch_size@1 > 0);
{queue_source, Name@2, Topic, Prefetch} ->
((Name@2 /= <<""/utf8>>) andalso (Topic /= <<""/utf8>>)) andalso (Prefetch
> 0)
end.
-file("src/lightspeed/pipeline/connector.gleam", 147).
?DOC(" Validate connector plan invariants.\n").
-spec valid(connector_plan()) -> boolean().
valid(Plan) ->
((((((erlang:element(2, Plan) /= <<""/utf8>>) andalso source_valid(
erlang:element(3, Plan)
))
andalso sink_valid(erlang:element(4, Plan)))
andalso retry_valid(erlang:element(5, Plan)))
andalso dead_letter_valid(erlang:element(6, Plan)))
andalso reprocess_window_valid(erlang:element(7, Plan)))
andalso (erlang:element(8, Plan) > 0).
-file("src/lightspeed/pipeline/connector.gleam", 367).
-spec max(integer(), integer()) -> integer().
max(Left, Right) ->
case Left >= Right of
true ->
Left;
false ->
Right
end.
-file("src/lightspeed/pipeline/connector.gleam", 374).
-spec min(integer(), integer()) -> integer().
min(Left, Right) ->
case Left =< Right of
true ->
Left;
false ->
Right
end.
-file("src/lightspeed/pipeline/connector.gleam", 344).
-spec backoff_ms(retry_policy(), integer()) -> integer().
backoff_ms(Policy, Attempt) ->
min(erlang:element(4, Policy), erlang:element(3, Policy) * max(1, Attempt)).
-file("src/lightspeed/pipeline/connector.gleam", 158).
?DOC(" Determine failure action from retry and dead-letter policy.\n").
-spec classify_failure(connector_plan(), integer(), binary(), integer()) -> failure_action().
classify_failure(Plan, Attempt, Reason, Now_ms) ->
case Attempt < erlang:element(2, erlang:element(5, Plan)) of
true ->
Next_attempt = Attempt + 1,
{retry,
Next_attempt,
max(0, Now_ms) + backoff_ms(
erlang:element(5, Plan),
Next_attempt
)};
false ->
case erlang:element(2, erlang:element(6, Plan)) of
true ->
{dead_letter,
erlang:element(3, erlang:element(6, Plan)),
Reason};
false ->
{reject, <<"retry_exhausted:"/utf8, Reason/binary>>}
end
end.
-file("src/lightspeed/pipeline/connector.gleam", 185).
?DOC(" Check whether one sequence is allowed by the optional reprocess window.\n").
-spec within_reprocess_window(connector_plan(), integer()) -> boolean().
within_reprocess_window(Plan, Sequence) ->
case erlang:element(7, Plan) of
none ->
Sequence > 0;
{some, Window} ->
(Sequence >= erlang:element(2, Window)) andalso (Sequence =< erlang:element(
3,
Window
))
end.
-file("src/lightspeed/pipeline/connector.gleam", 194).
?DOC(" Source connector label.\n").
-spec source_label(source_connector()) -> binary().
source_label(Source) ->
case Source of
{database_source, Name, Query, Batch_size} ->
<<<<<<<<<<"database_source:"/utf8, Name/binary>>/binary, ":"/utf8>>/binary,
Query/binary>>/binary,
":"/utf8>>/binary,
(erlang:integer_to_binary(Batch_size))/binary>>;
{file_source, Name@1, Path, Format, Batch_size@1} ->
<<<<<<<<<<<<<<"file_source:"/utf8, Name@1/binary>>/binary,
":"/utf8>>/binary,
Path/binary>>/binary,
":"/utf8>>/binary,
Format/binary>>/binary,
":"/utf8>>/binary,
(erlang:integer_to_binary(Batch_size@1))/binary>>;
{queue_source, Name@2, Topic, Prefetch} ->
<<<<<<<<<<"queue_source:"/utf8, Name@2/binary>>/binary, ":"/utf8>>/binary,
Topic/binary>>/binary,
":"/utf8>>/binary,
(erlang:integer_to_binary(Prefetch))/binary>>
end.
-file("src/lightspeed/pipeline/connector.gleam", 218).
?DOC(" Sink connector label.\n").
-spec sink_label(sink_connector()) -> binary().
sink_label(Sink) ->
case Sink of
{database_sink, Name, Table, Upsert_key} ->
<<<<<<<<<<"database_sink:"/utf8, Name/binary>>/binary, ":"/utf8>>/binary,
Table/binary>>/binary,
":"/utf8>>/binary,
Upsert_key/binary>>;
{pub_sub_sink, Name@1, Topic, Ordering_key} ->
<<<<<<<<<<"pubsub_sink:"/utf8, Name@1/binary>>/binary, ":"/utf8>>/binary,
Topic/binary>>/binary,
":"/utf8>>/binary,
Ordering_key/binary>>
end.
-file("src/lightspeed/pipeline/connector.gleam", 228).
?DOC(" Retry policy label.\n").
-spec retry_policy_label(retry_policy()) -> binary().
retry_policy_label(Policy) ->
<<<<<<<<<<"max_attempts="/utf8,
(erlang:integer_to_binary(erlang:element(2, Policy)))/binary>>/binary,
"|base_backoff_ms="/utf8>>/binary,
(erlang:integer_to_binary(erlang:element(3, Policy)))/binary>>/binary,
"|max_backoff_ms="/utf8>>/binary,
(erlang:integer_to_binary(erlang:element(4, Policy)))/binary>>.
-file("src/lightspeed/pipeline/connector.gleam", 360).
-spec bool_label(boolean()) -> binary().
bool_label(Value) ->
case Value of
true ->
<<"true"/utf8>>;
false ->
<<"false"/utf8>>
end.
-file("src/lightspeed/pipeline/connector.gleam", 238).
?DOC(" Dead-letter policy label.\n").
-spec dead_letter_policy_label(dead_letter_policy()) -> binary().
dead_letter_policy_label(Policy) ->
<<<<<<<<<<"enabled="/utf8, (bool_label(erlang:element(2, Policy)))/binary>>/binary,
"|destination="/utf8>>/binary,
(erlang:element(3, Policy))/binary>>/binary,
"|max_payload_bytes="/utf8>>/binary,
(erlang:integer_to_binary(erlang:element(4, Policy)))/binary>>.
-file("src/lightspeed/pipeline/connector.gleam", 248).
?DOC(" Failure action label.\n").
-spec failure_action_label(failure_action()) -> binary().
failure_action_label(Action) ->
case Action of
{retry, Next_attempt, At_ms} ->
<<<<<<"retry:"/utf8,
(erlang:integer_to_binary(Next_attempt))/binary>>/binary,
":"/utf8>>/binary,
(erlang:integer_to_binary(At_ms))/binary>>;
{dead_letter, Destination, Reason} ->
<<<<<<"dead_letter:"/utf8, Destination/binary>>/binary, ":"/utf8>>/binary,
Reason/binary>>;
{reject, Reason@1} ->
<<"reject:"/utf8, Reason@1/binary>>
end.
-file("src/lightspeed/pipeline/connector.gleam", 348).
-spec reprocess_window_label(gleam@option:option(reprocess_window())) -> binary().
reprocess_window_label(Window) ->
case Window of
none ->
<<"none"/utf8>>;
{some, Window@1} ->
<<<<<<<<(erlang:integer_to_binary(erlang:element(2, Window@1)))/binary,
"-"/utf8>>/binary,
(erlang:integer_to_binary(erlang:element(3, Window@1)))/binary>>/binary,
":"/utf8>>/binary,
(erlang:element(4, Window@1))/binary>>
end.
-file("src/lightspeed/pipeline/connector.gleam", 259).
?DOC(" Stable connector-plan signature.\n").
-spec signature(connector_plan()) -> binary().
signature(Plan) ->
<<<<<<<<<<<<<<<<<<<<<<<<<<"name="/utf8, (erlang:element(2, Plan))/binary>>/binary,
"|source="/utf8>>/binary,
(source_label(
erlang:element(3, Plan)
))/binary>>/binary,
"|sink="/utf8>>/binary,
(sink_label(erlang:element(4, Plan)))/binary>>/binary,
"|retry="/utf8>>/binary,
(retry_policy_label(erlang:element(5, Plan)))/binary>>/binary,
"|dead_letter="/utf8>>/binary,
(dead_letter_policy_label(erlang:element(6, Plan)))/binary>>/binary,
"|reprocess="/utf8>>/binary,
(reprocess_window_label(erlang:element(7, Plan)))/binary>>/binary,
"|max_batch_records="/utf8>>/binary,
(erlang:integer_to_binary(erlang:element(8, Plan)))/binary>>.
-file("src/lightspeed/pipeline/connector.gleam", 277).
?DOC(" Access source connector.\n").
-spec source(connector_plan()) -> source_connector().
source(Plan) ->
erlang:element(3, Plan).
-file("src/lightspeed/pipeline/connector.gleam", 282).
?DOC(" Access sink connector.\n").
-spec sink(connector_plan()) -> sink_connector().
sink(Plan) ->
erlang:element(4, Plan).
-file("src/lightspeed/pipeline/connector.gleam", 287).
?DOC(" Access retry policy.\n").
-spec retry_policy(connector_plan()) -> retry_policy().
retry_policy(Plan) ->
erlang:element(5, Plan).
-file("src/lightspeed/pipeline/connector.gleam", 292).
?DOC(" Access dead-letter policy.\n").
-spec dead_letter_policy(connector_plan()) -> dead_letter_policy().
dead_letter_policy(Plan) ->
erlang:element(6, Plan).
-file("src/lightspeed/pipeline/connector.gleam", 297).
?DOC(" Access max batch records.\n").
-spec max_batch_records(connector_plan()) -> integer().
max_batch_records(Plan) ->
erlang:element(8, Plan).