-module(lightspeed@pipeline@checkpoint).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/checkpoint.gleam").
-export([watermark/2, checkpoint/6, valid/1, valid_watermark/1, watermark_label/1, checkpoint_label/1, resume_point_label/1, latest/1, latest_for_stage/2, resume_point_for_stage/2, chain_signature/1]).
-export_type([watermark/0, checkpoint/0, resume_point/0]).
%% Documentation-attribute shims.
%% On OTP 27 and later, ?MODULEDOC/?DOC expand to the native -moduledoc/-doc
%% attributes. On older releases they expand to a -compile([]) attribute that
%% carries no options, so the ?MODULEDOC(...) / ?DOC(...) lines below still
%% parse as valid module attributes.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Checkpoint and watermark contracts for deterministic pipeline replay.\n").
%% Watermark as a tagged tuple: {watermark, Offset, EventTimeMs}.
%% The field order is the argument order of watermark/2 in this module.
-type watermark() :: {watermark, integer(), integer()}.
%% Checkpoint as a tagged tuple. After the `checkpoint` tag the positions are,
%% in the argument order of checkpoint/6 in this module:
%%   2: run id, 3: stage, 4: sequence, 5: watermark,
%%   6: idempotency key, 7: at_ms.
-type checkpoint() :: {checkpoint,
binary(),
binary(),
integer(),
watermark(),
binary(),
integer()}.
%% Resume point as a tagged tuple. After the `resume_point` tag the positions
%% are, as built by resume_point_for_stage/2 in this module:
%%   2: run id, 3: stage, 4: sequence, 5: watermark.
-type resume_point() :: {resume_point,
binary(),
binary(),
integer(),
watermark()}.
-file("src/lightspeed/pipeline/checkpoint.gleam", 35).
?DOC(" Build one watermark.\n").
%% Wraps an offset and an event time into a `watermark` tagged tuple.
%% Performs no validation; the values are stored exactly as given.
-spec watermark(integer(), integer()) -> watermark().
watermark(Offset, EventTimeMs) ->
    Mark = {watermark, Offset, EventTimeMs},
    Mark.
-file("src/lightspeed/pipeline/checkpoint.gleam", 40).
?DOC(" Build one checkpoint.\n").
%% Packs the six arguments, in order, behind a `checkpoint` tag.
%% Performs no validation; the values are stored exactly as given.
-spec checkpoint(binary(), binary(), integer(), watermark(), binary(), integer()) ->
    checkpoint().
checkpoint(RunId, Stage, Sequence, Watermark, IdempotencyKey, AtMs) ->
    Entry = {checkpoint, RunId, Stage, Sequence, Watermark, IdempotencyKey, AtMs},
    Entry.
-file("src/lightspeed/pipeline/checkpoint.gleam", 59).
?DOC(" Checkpoint invariants.\n").
%% Returns true only when all of the following hold:
%%   - run id and stage are non-empty binaries,
%%   - sequence is strictly positive,
%%   - both watermark fields are non-negative,
%%   - at_ms is non-negative.
%% The idempotency key (position 6) is not inspected.
-spec valid(checkpoint()) -> boolean().
valid({checkpoint, RunId, Stage, Sequence, {watermark, Offset, EventTimeMs}, _IdempotencyKey, AtMs}) ->
    HasRunId = RunId =/= <<""/utf8>>,
    HasStage = Stage =/= <<""/utf8>>,
    HasRunId andalso
        HasStage andalso
        (Sequence > 0) andalso
        (Offset >= 0) andalso
        (EventTimeMs >= 0) andalso
        (AtMs >= 0).
-file("src/lightspeed/pipeline/checkpoint.gleam", 69).
?DOC(" Watermark invariants.\n").
%% True when both the offset and the event time are non-negative.
-spec valid_watermark(watermark()) -> boolean().
valid_watermark({watermark, Offset, EventTimeMs}) when Offset >= 0, EventTimeMs >= 0 ->
    true;
valid_watermark({watermark, _Offset, _EventTimeMs}) ->
    false.
-file("src/lightspeed/pipeline/checkpoint.gleam", 74).
?DOC(" Stable watermark label.\n").
%% Renders a watermark as `offset=<Offset>,event_time_ms=<EventTimeMs>`,
%% with both integers in decimal.
-spec watermark_label(watermark()) -> binary().
watermark_label({watermark, Offset, EventTimeMs}) ->
    OffsetText = erlang:integer_to_binary(Offset),
    EventTimeText = erlang:integer_to_binary(EventTimeMs),
    <<"offset="/utf8, OffsetText/binary,
      ",event_time_ms="/utf8, EventTimeText/binary>>.
-file("src/lightspeed/pipeline/checkpoint.gleam", 82).
?DOC(" Stable checkpoint label.\n").
%% Renders a checkpoint as
%%   run=<run>|stage=<stage>|sequence=<n>|watermark=<label>|idempotency_key=<key>|at_ms=<n>
%% where <label> is the output of watermark_label/1 and the integers are
%% written in decimal.
-spec checkpoint_label(checkpoint()) -> binary().
checkpoint_label({checkpoint, RunId, Stage, Sequence, Watermark, IdempotencyKey, AtMs}) ->
    SequenceText = erlang:integer_to_binary(Sequence),
    WatermarkText = watermark_label(Watermark),
    AtMsText = erlang:integer_to_binary(AtMs),
    <<"run="/utf8, RunId/binary,
      "|stage="/utf8, Stage/binary,
      "|sequence="/utf8, SequenceText/binary,
      "|watermark="/utf8, WatermarkText/binary,
      "|idempotency_key="/utf8, IdempotencyKey/binary,
      "|at_ms="/utf8, AtMsText/binary>>.
-file("src/lightspeed/pipeline/checkpoint.gleam", 98).
?DOC(" Stable resume-point label.\n").
%% Renders a resume point as
%%   run=<run>|stage=<stage>|sequence=<n>|watermark=<label>
%% where <label> is the output of watermark_label/1 and the sequence is
%% written in decimal.
-spec resume_point_label(resume_point()) -> binary().
resume_point_label({resume_point, RunId, Stage, Sequence, Watermark}) ->
    SequenceText = erlang:integer_to_binary(Sequence),
    WatermarkText = watermark_label(Watermark),
    <<"run="/utf8, RunId/binary,
      "|stage="/utf8, Stage/binary,
      "|sequence="/utf8, SequenceText/binary,
      "|watermark="/utf8, WatermarkText/binary>>.
-file("src/lightspeed/pipeline/checkpoint.gleam", 110).
?DOC(" Latest checkpoint in chronological order.\n").
%% Returns `{some, Last}` for the final element of the list, or `none` when
%% the list is empty. Nothing here sorts the list; the caller supplies the
%% ordering.
-spec latest(list(checkpoint())) -> gleam@option:option(checkpoint()).
latest([]) ->
    none;
latest([_ | _] = Entries) ->
    {some, lists:last(Entries)}.
-file("src/lightspeed/pipeline/checkpoint.gleam", 147).
%% Walks the list front to back and returns `{some, Entry}` for the first
%% entry whose stage (tuple position 3) is exactly equal to Stage, or `none`
%% when no entry matches.
-spec latest_for_stage_loop(list(checkpoint()), binary()) ->
    gleam@option:option(checkpoint()).
latest_for_stage_loop([], _Stage) ->
    none;
latest_for_stage_loop([Candidate | Remaining], Stage) ->
    %% Stage is already bound, so the first clause is an exact-equality test.
    case erlang:element(3, Candidate) of
        Stage ->
            {some, Candidate};
        _OtherStage ->
            latest_for_stage_loop(Remaining, Stage)
    end.
-file("src/lightspeed/pipeline/checkpoint.gleam", 118).
?DOC(" Latest checkpoint for one stage.\n").
%% Reverses the list so the search starts from the last entry, then returns
%% the first entry found for Stage (`{some, Entry}`) or `none`.
-spec latest_for_stage(list(checkpoint()), binary()) ->
    gleam@option:option(checkpoint()).
latest_for_stage(Entries, Stage) ->
    NewestFirst = lists:reverse(Entries),
    latest_for_stage_loop(NewestFirst, Stage).
-file("src/lightspeed/pipeline/checkpoint.gleam", 126).
?DOC(" Derive one resume point for a stage.\n").
%% Looks up the latest checkpoint for Stage and copies its run id, stage,
%% sequence and watermark into a `resume_point` tuple.
%% Returns `{ok, ResumePoint}` on success, or
%% `{error, <<"missing_stage_checkpoint:", Stage/binary>>}` when the list
%% holds no checkpoint for Stage.
-spec resume_point_for_stage(list(checkpoint()), binary()) ->
    {ok, resume_point()} | {error, binary()}.
resume_point_for_stage(Entries, Stage) ->
    case latest_for_stage(Entries, Stage) of
        {some, {checkpoint, RunId, EntryStage, Sequence, Watermark, _IdempotencyKey, _AtMs}} ->
            {ok, {resume_point, RunId, EntryStage, Sequence, Watermark}};
        none ->
            {error, <<"missing_stage_checkpoint:"/utf8, Stage/binary>>}
    end.
-file("src/lightspeed/pipeline/checkpoint.gleam", 161).
%% Joins binaries with Separator between consecutive values.
%% Returns the empty binary for an empty list and the value itself for a
%% single-element list.
%%
%% Implemented with lists:join/2 and a single erlang:iolist_to_binary/1 call,
%% so the result is assembled in one pass. The earlier body recursion rebuilt
%% the accumulated tail at every level, which copied a quadratic number of
%% bytes and grew the stack with the list length.
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).
-file("src/lightspeed/pipeline/checkpoint.gleam", 143).
?DOC(" Stable checkpoint chain signature.\n").
%% Renders every checkpoint with checkpoint_label/1, keeping list order, and
%% joins the labels with a comma. An empty list yields the empty binary.
-spec chain_signature(list(checkpoint())) -> binary().
chain_signature(Entries) ->
    Labels = [checkpoint_label(Entry) || Entry <- Entries],
    join_with(<<","/utf8>>, Labels).