src/lightspeed@pipeline@checkpoint.erl

-module(lightspeed@pipeline@checkpoint).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/checkpoint.gleam").
-export([watermark/2, checkpoint/6, valid/1, valid_watermark/1, watermark_label/1, checkpoint_label/1, resume_point_label/1, latest/1, latest_for_stage/2, resume_point_for_stage/2, chain_signature/1]).
-export_type([watermark/0, checkpoint/0, resume_point/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Checkpoint and watermark contracts for deterministic pipeline replay.\n").

-type watermark() :: {watermark, integer(), integer()}.

-type checkpoint() :: {checkpoint,
        binary(),
        binary(),
        integer(),
        watermark(),
        binary(),
        integer()}.

-type resume_point() :: {resume_point,
        binary(),
        binary(),
        integer(),
        watermark()}.

-file("src/lightspeed/pipeline/checkpoint.gleam", 35).
?DOC(" Build one watermark.\n").
-spec watermark(integer(), integer()) -> watermark().
watermark(Offset, Event_time_ms) ->
    {watermark, Offset, Event_time_ms}.

-file("src/lightspeed/pipeline/checkpoint.gleam", 40).
?DOC(" Build one checkpoint.\n").
-spec checkpoint(
    binary(),
    binary(),
    integer(),
    watermark(),
    binary(),
    integer()
) -> checkpoint().
checkpoint(Run_id, Stage, Sequence, Watermark, Idempotency_key, At_ms) ->
    {checkpoint, Run_id, Stage, Sequence, Watermark, Idempotency_key, At_ms}.

-file("src/lightspeed/pipeline/checkpoint.gleam", 59).
?DOC(" Checkpoint invariants.\n").
-spec valid(checkpoint()) -> boolean().
valid(Entry) ->
    (((((erlang:element(2, Entry) /= <<""/utf8>>) andalso (erlang:element(
        3,
        Entry
    )
    /= <<""/utf8>>))
    andalso (erlang:element(4, Entry) > 0))
    andalso (erlang:element(2, erlang:element(5, Entry)) >= 0))
    andalso (erlang:element(3, erlang:element(5, Entry)) >= 0))
    andalso (erlang:element(7, Entry) >= 0).

-file("src/lightspeed/pipeline/checkpoint.gleam", 69).
?DOC(" Watermark invariants.\n").
-spec valid_watermark(watermark()) -> boolean().
valid_watermark(Mark) ->
    (erlang:element(2, Mark) >= 0) andalso (erlang:element(3, Mark) >= 0).

-file("src/lightspeed/pipeline/checkpoint.gleam", 74).
?DOC(" Stable watermark label.\n").
-spec watermark_label(watermark()) -> binary().
watermark_label(Mark) ->
    <<<<<<"offset="/utf8,
                (erlang:integer_to_binary(erlang:element(2, Mark)))/binary>>/binary,
            ",event_time_ms="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:element(3, Mark)))/binary>>.

-file("src/lightspeed/pipeline/checkpoint.gleam", 82).
?DOC(" Stable checkpoint label.\n").
-spec checkpoint_label(checkpoint()) -> binary().
checkpoint_label(Entry) ->
    <<<<<<<<<<<<<<<<<<<<<<"run="/utf8, (erlang:element(2, Entry))/binary>>/binary,
                                            "|stage="/utf8>>/binary,
                                        (erlang:element(3, Entry))/binary>>/binary,
                                    "|sequence="/utf8>>/binary,
                                (erlang:integer_to_binary(
                                    erlang:element(4, Entry)
                                ))/binary>>/binary,
                            "|watermark="/utf8>>/binary,
                        (watermark_label(erlang:element(5, Entry)))/binary>>/binary,
                    "|idempotency_key="/utf8>>/binary,
                (erlang:element(6, Entry))/binary>>/binary,
            "|at_ms="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:element(7, Entry)))/binary>>.

-file("src/lightspeed/pipeline/checkpoint.gleam", 98).
?DOC(" Stable resume-point label.\n").
-spec resume_point_label(resume_point()) -> binary().
resume_point_label(Point) ->
    <<<<<<<<<<<<<<"run="/utf8, (erlang:element(2, Point))/binary>>/binary,
                            "|stage="/utf8>>/binary,
                        (erlang:element(3, Point))/binary>>/binary,
                    "|sequence="/utf8>>/binary,
                (erlang:integer_to_binary(erlang:element(4, Point)))/binary>>/binary,
            "|watermark="/utf8>>/binary,
        (watermark_label(erlang:element(5, Point)))/binary>>.

-file("src/lightspeed/pipeline/checkpoint.gleam", 110).
?DOC(" Latest checkpoint in chronological order.\n").
-spec latest(list(checkpoint())) -> gleam@option:option(checkpoint()).
latest(Entries) ->
    case lists:reverse(Entries) of
        [] ->
            none;

        [Entry | _] ->
            {some, Entry}
    end.

-file("src/lightspeed/pipeline/checkpoint.gleam", 147).
-spec latest_for_stage_loop(list(checkpoint()), binary()) -> gleam@option:option(checkpoint()).
latest_for_stage_loop(Entries, Stage) ->
    case Entries of
        [] ->
            none;

        [Entry | Rest] ->
            case erlang:element(3, Entry) =:= Stage of
                true ->
                    {some, Entry};

                false ->
                    latest_for_stage_loop(Rest, Stage)
            end
    end.

-file("src/lightspeed/pipeline/checkpoint.gleam", 118).
?DOC(" Latest checkpoint for one stage.\n").
-spec latest_for_stage(list(checkpoint()), binary()) -> gleam@option:option(checkpoint()).
latest_for_stage(Entries, Stage) ->
    latest_for_stage_loop(lists:reverse(Entries), Stage).

-file("src/lightspeed/pipeline/checkpoint.gleam", 126).
?DOC(" Derive one resume point for a stage.\n").
-spec resume_point_for_stage(list(checkpoint()), binary()) -> {ok,
        resume_point()} |
    {error, binary()}.
resume_point_for_stage(Entries, Stage) ->
    case latest_for_stage(Entries, Stage) of
        none ->
            {error, <<"missing_stage_checkpoint:"/utf8, Stage/binary>>};

        {some, Entry} ->
            {ok,
                {resume_point,
                    erlang:element(2, Entry),
                    erlang:element(3, Entry),
                    erlang:element(4, Entry),
                    erlang:element(5, Entry)}}
    end.

-file("src/lightspeed/pipeline/checkpoint.gleam", 161).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    case Values of
        [] ->
            <<""/utf8>>;

        [Value] ->
            Value;

        [Value@1 | Rest] ->
            <<<<Value@1/binary, Separator/binary>>/binary,
                (join_with(Separator, Rest))/binary>>
    end.

-file("src/lightspeed/pipeline/checkpoint.gleam", 143).
?DOC(" Stable checkpoint chain signature.\n").
-spec chain_signature(list(checkpoint())) -> binary().
chain_signature(Entries) ->
    join_with(<<","/utf8>>, gleam@list:map(Entries, fun checkpoint_label/1)).