src/grisp_connect_binlog.erl

%% @doc Ring buffer for storing logger logs
%%
%% This module extends a standard queue into a ring buffer.
%% It can be truncated once a batch of logs has been sent out
%% and is not needed anymore.
%% @end
-module(grisp_connect_binlog).

% API
-export([defaults/1]).
-export([new/0]).
-export([new/1]).
-export([insert/2]).
-export([items/1]).
-export([count/1]).
-export([bytes/1]).
-export([seq/1]).
-export([opts/1]).
-export([peek/2]).
-export([truncate/2]).

-define(DEFAULT_COUNT_MAX, 10_000).
% 10 MiB:
-define(DEFAULT_BYTES_MAX, 10 * 1_024 * 1_024).

-record(binlog, {
    queue = queue:new(),
    seq = -1,
    count = 0,
    count_max = ?DEFAULT_COUNT_MAX,
    bytes = 0,
    bytes_max = ?DEFAULT_BYTES_MAX
}).

%--- API -----------------------------------------------------------------------

defaults(count_max) -> ?DEFAULT_COUNT_MAX;
defaults(bytes_max) -> ?DEFAULT_BYTES_MAX.

new() -> new(#{}).

new(Opts) ->
    Log = #binlog{},
    Log#binlog{
        count_max = maps:get(count_max, Opts, Log#binlog.count_max),
        bytes_max = maps:get(bytes_max, Opts, Log#binlog.bytes_max)
    }.

insert({Seq, Bin} = Item, #binlog{bytes_max = BytesMax, count = Count} = L) when
    is_integer(Seq), Seq >= 0, byte_size(Bin) >= BytesMax
->
    {add(Item, clear(L)), Count};
insert({Seq, Bin} = Item, #binlog{} = L) when
    is_integer(Seq), Seq >= 0, is_binary(Bin)
->
    flush(add(Item, L));
insert(Item, #binlog{}) ->
    error({invalid_item, Item});
insert(_Item, Log) ->
    error({invalid_log, Log}).

items(#binlog{queue = Q}) ->
    queue:to_list(Q).

count(#binlog{count = Count}) -> Count.

bytes(#binlog{bytes = Bytes}) -> Bytes.

seq(#binlog{seq = Seq}) -> Seq.

opts(#binlog{count_max = CountMax, bytes_max = BytesMax}) ->
    #{count_max => CountMax, bytes_max => BytesMax}.

peek(Count, #binlog{queue = Queue}) when is_integer(Count), Count > 0 ->
    peek(Count, Queue, []);
peek(Count, _L) ->
    error({invalid_count, Count}).

truncate(To, #binlog{seq = Seq} = L) when To >= Seq ->
    clear(L);
truncate(To, #binlog{queue = Q0} = L) ->
    case queue:out(Q0) of
        {{value, {Seq, Bin}}, Q1} when Seq =< To ->
            truncate(To, drop(Q1, Bin, L));
        _ ->
            L
    end.

%--- Internal ------------------------------------------------------------------

flush(L) -> flush(L, 0).

flush(#binlog{queue = Q0, count = Count, bytes = Bytes} = L, N) when
    Count > L#binlog.count_max; Bytes > L#binlog.bytes_max
->
    {{value, {_Seq, Bin}}, Q1} = queue:out(Q0),
    flush(drop(Q1, Bin, L), N + 1);
flush(L, N) ->
    {L, N}.

drop(Q1, Bin, #binlog{count = Count, bytes = Bytes} = L) ->
    L#binlog{
        queue = Q1,
        count = Count - 1,
        bytes = Bytes - byte_size(Bin)
    }.

add({Seq, Bin}, L) when Seq > L#binlog.seq ->
    L#binlog{
        seq = Seq,
        queue = queue:in({Seq, Bin}, L#binlog.queue),
        count = L#binlog.count + 1,
        bytes = L#binlog.bytes + byte_size(Bin)
    };
add({Seq, _Bin}, _L) ->
    error({out_of_sequence, Seq}).

peek(0, _Q, Acc) ->
    lists:reverse(Acc);
peek(Count, Q0, Acc) ->
    case queue:out(Q0) of
        {{value, Item}, Q1} -> peek(Count - 1, Q1, [Item | Acc]);
        {empty, _} -> peek(0, Q0, Acc)
    end.

clear(L) -> L#binlog{queue = queue:new(), count = 0, bytes = 0}.