%% @doc Per-store gen_server holding events, snapshots, subscribers,
%% and (optionally) integrity state for a single in-memory event store.
%%
%% The state shape mirrors what reckon-db keeps in Khepri:
%%
%% <ul>
%% <li>`streams' — map of StreamId to ordered list of `#event{}'</li>
%% <li>`snapshots' — map of StreamId to map of Version to `#snapshot{}'</li>
%% <li>`subscribers' — map of subscription key to subscriber metadata</li>
%% <li>`integrity' — `disabled' or `{enabled, Key, ChainStarts}'</li>
%% </ul>
%%
%% No persistence. Process restart loses state. That is the intended
%% semantic — mem-evoq exists for tests, demos, and as a reference
%% implementation of the evoq_event_store adapter behaviour. For
%% production use, pair evoq with reckon-evoq + reckon-db.
%%
%% Streams are stored in append order (events 0..N-1, oldest first).
%% Each append costs O(N): the new batch is built by cons-and-reverse
%% and then joined onto the existing list with `++' (which copies it),
%% but mem-evoq is for tests where N is small. Premature optimisation
%% here would obscure the reference implementation aspect.
%% @end
-module(mem_evoq_store).
-behaviour(gen_server).
-include_lib("reckon_gater/include/reckon_gater_types.hrl").
-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% NO_STREAM (-1), ANY_VERSION (-2), STREAM_EXISTS (-4),
%% CONTENT_TYPE_JSON come from reckon_gater_types.hrl above.
%% Whole-store state, one instance per store process. Everything is a
%% plain immutable term — a process restart loses it all by design.
-record(state, {
store_id :: atom(),
%% StreamId => events in version order (oldest first, versions 0..N-1).
%% Entries are never empty — see maps_update_append/3.
streams = #{} :: #{binary() => [event()]},
%% StreamId => (Version => #snapshot{}); a save replaces any snapshot
%% already stored at the same version.
snapshots = #{} :: #{binary() => #{non_neg_integer() => snapshot()}},
%% Opaque subscription key => metadata map (pid, filter, from, ...).
subscribers = #{} :: #{binary() => map()},
%% `disabled' or a map with the MAC key and per-stream chain starts.
integrity = disabled :: disabled | enabled_integrity()
}).
%% Integrity configuration once enabled: the MAC key plus, per stream,
%% the version at which the hash chain begins (events before it are
%% treated as legacy by the verification paths).
-type enabled_integrity() :: #{
key := binary(),
chain_start := #{binary() => non_neg_integer()}
}.
%%====================================================================
%% Lifecycle
%%====================================================================
%% @doc Start a store process for StoreId with the given options.
%% The pid registers itself with mem_evoq_registry during init.
-spec start_link(atom(), map()) -> {ok, pid()} | {error, term()}.
start_link(StoreId, Opts) ->
    InitArgs = {StoreId, Opts},
    gen_server:start_link(?MODULE, InitArgs, []).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%% @private Initialise integrity state (if configured) and register
%% the process with the registry. Registration only happens after
%% integrity setup succeeds.
init({StoreId, Opts}) ->
    finish_init(init_integrity(maps:get(integrity, Opts, disabled)), StoreId).
%% On integrity failure, stop with the bare Reason rather than the
%% whole {error, Reason} tuple — {stop, {error, R}} would make
%% gen_server:start_link/3 return the double-wrapped {error, {error, R}}.
finish_init({ok, Integrity}, StoreId) ->
    ok = mem_evoq_registry:register(StoreId, self()),
    {ok, #state{store_id = StoreId, integrity = Integrity}};
finish_init({error, Reason}, _StoreId) ->
    {stop, Reason}.
%%--------------------------------------------------------------------
%% Write path
%%--------------------------------------------------------------------
%% @private Single synchronous dispatch point. Clause groups follow
%% the public API surface: writes, reads, stream metadata, cross-
%% stream reads, subscriptions, snapshots, then a not_implemented
%% catch-all. All real work lives in the do_* helpers below.
handle_call({append, StreamId, ExpectedVersion, Events}, _From, State) ->
reply_append(do_append(StreamId, ExpectedVersion, Events, State), State);
%%--------------------------------------------------------------------
%% Read path
%%--------------------------------------------------------------------
%% 4-tuple read defaults Opts to #{} (verify mode defaults downstream
%% in maybe_verify_read/6).
handle_call({read, StreamId, FromVersion, Count, Direction}, _From, State) ->
{reply, do_read(StreamId, FromVersion, Count, Direction, #{}, State), State};
handle_call({read, StreamId, FromVersion, Count, Direction, Opts}, _From, State) ->
{reply, do_read(StreamId, FromVersion, Count, Direction, Opts, State), State};
%%--------------------------------------------------------------------
%% Stream metadata
%%--------------------------------------------------------------------
handle_call({version, StreamId}, _From, State) ->
{reply, current_version(StreamId, State), State};
handle_call({exists, StreamId}, _From, State) ->
{reply, has_stream(StreamId, State), State};
handle_call(has_events, _From, State) ->
{reply, do_has_events(State), State};
handle_call(list_streams, _From, State) ->
{reply, {ok, do_list_streams(State)}, State};
%% Delete is idempotent — always replies ok, even for unknown streams.
handle_call({delete, StreamId}, _From, State) ->
{reply, ok, do_delete_stream(StreamId, State)};
%%--------------------------------------------------------------------
%% Append with subscriber fan-out — handled inside the {append, ...}
%% clause above; live delivery happens in do_append/4.
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Cross-stream reads
%%--------------------------------------------------------------------
handle_call({read_all_global, Offset, BatchSize}, _From, State) ->
{reply, do_read_all_global(Offset, BatchSize, State), State};
%%--------------------------------------------------------------------
%% Subscriptions — do_subscribe/4 returns the full {reply, _, _}
%% tuple itself (it may mutate subscriber state).
%%--------------------------------------------------------------------
handle_call({subscribe, StreamId, Pid, Opts}, _From, State) ->
do_subscribe({by_stream, StreamId}, Pid, Opts, State);
handle_call({subscribe_all, Pid, Opts}, _From, State) ->
do_subscribe(all, Pid, Opts, State);
handle_call({subscribe, Type, Selector, Pid, Opts}, _From, State) ->
do_subscribe({Type, Selector}, Pid, Opts, State);
handle_call({unsubscribe, SubKey}, _From, State) ->
{reply, ok, do_unsubscribe(SubKey, State)};
%%--------------------------------------------------------------------
%% Snapshots — saves pass through integrity sealing first; loads are
%% verified on the way out.
%%--------------------------------------------------------------------
handle_call({save_snapshot, StreamId, #snapshot{} = Snap0}, _From, State) ->
reply_save_snapshot(
apply_snapshot_integrity(StreamId, Snap0, State),
StreamId, State);
handle_call({save_snapshot, StreamId, Version, Data, Metadata}, _From, State) ->
Snap0 = #snapshot{
stream_id = StreamId,
version = Version,
data = Data,
metadata = Metadata,
timestamp = erlang:system_time(millisecond)
},
reply_save_snapshot(
apply_snapshot_integrity(StreamId, Snap0, State),
StreamId, State);
handle_call({load_snapshot, StreamId}, _From, State) ->
{reply, verify_snapshot_load(do_load_latest_snapshot(StreamId, State),
StreamId, State), State};
handle_call({load_snapshot, StreamId, Version}, _From, State) ->
{reply, verify_snapshot_load(do_load_snapshot_at(StreamId, Version, State),
StreamId, State), State};
handle_call({list_snapshots, StreamId}, _From, State) ->
{reply, {ok, do_list_snapshots(StreamId, State)}, State};
handle_call({delete_snapshot, StreamId}, _From, State) ->
{reply, ok, do_delete_snapshots_for_stream(StreamId, State)};
%%--------------------------------------------------------------------
%% Other (further moves — pending)
%%--------------------------------------------------------------------
handle_call(_Req, _From, State) ->
{reply, {error, not_implemented}, State}.
%% All mutating operations go through handle_call; casts are ignored.
handle_cast(_Request, State) ->
    {noreply, State}.
%% A monitored subscriber pid died — drop every subscription that
%% pid owned (monitors are set up in do_subscribe/4). Any other
%% message is drained silently.
handle_info({'DOWN', _MonRef, process, DeadPid, _Why},
            #state{subscribers = Subs} = State) ->
    Survivors = maps:filter(
        fun(_SubKey, #{pid := Owner}) -> Owner =/= DeadPid end,
        Subs),
    {noreply, State#state{subscribers = Survivors}};
handle_info(_Info, State) ->
    {noreply, State}.
%% @private Best-effort deregistration on shutdown — the registry may
%% already be gone, and that must not turn into a secondary crash.
%% Uses try/catch instead of old-style `catch Expr', which conflates
%% throws/errors/exits and loses the stacktrace.
terminate(_Reason, #state{store_id = StoreId}) ->
    try
        mem_evoq_registry:unregister(StoreId)
    catch
        _:_ -> ok
    end,
    ok.
%% @private No state migrations defined — keep state unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%%====================================================================
%% Internal — append
%%====================================================================
-spec do_append(binary(), integer(), [map()], #state{}) ->
    {ok, non_neg_integer(), #state{}} | {error, term()}.
%% @private Check the optimistic-concurrency expectation against the
%% stream's current version, then hand off to the write path.
do_append(StreamId, ExpectedVersion, Events, State) ->
    Current = current_version(StreamId, State),
    Check = check_expected_version(ExpectedVersion, Current),
    proceed_with_append(Check, StreamId, Current, Events, State).
%% Only a passing expectation check reaches the actual write.
proceed_with_append(ok, StreamId, Current, Events, State) ->
    append_events_to_stream(StreamId, Current, Events, State);
proceed_with_append({error, _} = Err, _StreamId, _Current, _Events, _State) ->
    Err.
%% Shape an append outcome into a gen_server reply; on error the
%% pre-append state is kept untouched.
reply_append({ok, NewVersion, NewState}, _OldState) ->
    {reply, {ok, NewVersion}, NewState};
reply_append({error, _} = Err, OldState) ->
    {reply, Err, OldState}.
-spec current_version(binary(), #state{}) -> integer().
%% @private Highest version in a stream, or ?NO_STREAM when absent.
current_version(StreamId, #state{streams = Streams}) ->
version_from_entry(maps:get(StreamId, Streams, undefined)).
%% Versions are dense 0..N-1 (oldest first), so the list length fixes
%% the tip. length/1 is O(N) — acceptable for test-sized streams.
version_from_entry(undefined) -> ?NO_STREAM;
version_from_entry([]) -> ?NO_STREAM;
version_from_entry(List) when is_list(List) -> length(List) - 1.
%% Mirror of reckon_db_streams:check_expected_version/2.
%%
%% ?ANY_VERSION always passes; an exact match always passes;
%% ?NO_STREAM demands an absent stream; ?STREAM_EXISTS demands a
%% present one; everything else is a version conflict.
-spec check_expected_version(integer(), integer()) -> ok | {error, term()}.
check_expected_version(?ANY_VERSION, _Current) ->
    ok;
check_expected_version(Same, Same) ->
    ok;
check_expected_version(?NO_STREAM, Current) ->
    {error, {wrong_expected_version, ?NO_STREAM, Current}};
check_expected_version(?STREAM_EXISTS, ?NO_STREAM) ->
    {error, {wrong_expected_version, ?STREAM_EXISTS, ?NO_STREAM}};
check_expected_version(?STREAM_EXISTS, _Current) ->
    ok;
check_expected_version(Expected, Current) ->
    {error, {wrong_expected_version, Expected, Current}}.
-spec append_events_to_stream(
binary(), integer(), [map()], #state{}
) -> {ok, non_neg_integer(), #state{}}.
%% @private The write path proper: build an event record per input,
%% thread the integrity chain tip through the fold (a no-op on
%% integrity-disabled stores), store the batch, persist any watermark
%% change, then fan the batch out to live subscribers.
append_events_to_stream(StreamId, CurrentVersion, Events, State) ->
%% One timestamp pair for the whole batch — all events in a single
%% append share the same millisecond/microsecond stamps.
Now = erlang:system_time(millisecond),
EpochUs = erlang:system_time(microsecond),
IntegrityCtx = setup_integrity_for_append(StreamId, CurrentVersion, State),
InitialTip = resolve_write_initial_tip(
StreamId, CurrentVersion + 1, IntegrityCtx, State),
%% Fold assigns consecutive versions CurrentVersion+1.. and chains
%% each record onto the running tip.
{Recorded, FinalVersion, _FinalTip} = lists:foldl(
fun(Event, {Acc, AccVer, AccTip}) ->
NewVer = AccVer + 1,
Record0 = create_event_record(Event, StreamId, NewVer, Now, EpochUs),
{Record, NextTip} = apply_integrity_if_enabled(
Record0, AccTip, IntegrityCtx),
{[Record | Acc], NewVer, NextTip}
end,
{[], CurrentVersion, InitialTip},
Events
),
%% Acc is reverse-order; reverse and prepend in correct order.
AppendList = lists:reverse(Recorded),
NewStreams = maps_update_append(StreamId, AppendList, State#state.streams),
%% Persist the watermark side-effect from setup_integrity_for_append
%% (only matters on integrity-enabled stores, no-op otherwise).
NewIntegrity = persist_watermark(StreamId, IntegrityCtx, State#state.integrity),
NewState = State#state{streams = NewStreams, integrity = NewIntegrity},
%% Fan out to live subscribers — synchronous send per subscriber.
ok = fanout_to_subscribers(AppendList, NewState),
{ok, FinalVersion, NewState}.
%% Mirror of reckon_db_streams:create_event_record/5. Integrity fields
%% (prev_event_hash, mac, signature) are NOT set here — on integrity-
%% enabled stores apply_integrity_if_enabled/3 fills them in right
%% after this record is built (see append_events_to_stream/4).
-spec create_event_record(
map(), binary(), non_neg_integer(), integer(), integer()
) -> event().
%% A caller-supplied event_id wins; otherwise a random id is used.
%% event_type and data are mandatory — maps:get/2 crashes with badkey
%% if absent (intentional: fail loudly at the trust boundary).
%% NB: the event_id default is evaluated eagerly even when the key is
%% present (strict argument evaluation) — harmless for tests.
create_event_record(Event, StreamId, Version, Timestamp, EpochUs) ->
EventId = maps:get(event_id, Event, generate_event_id()),
EventType = maps:get(event_type, Event),
Data = maps:get(data, Event),
Metadata = maps:get(metadata, Event, #{}),
Tags = maps:get(tags, Event, undefined),
DataContentType = maps:get(data_content_type, Event, ?CONTENT_TYPE_JSON),
MetadataContentType = maps:get(metadata_content_type, Event, ?CONTENT_TYPE_JSON),
#event{
event_id = EventId,
event_type = EventType,
stream_id = StreamId,
version = Version,
data = Data,
metadata = Metadata,
tags = Tags,
timestamp = Timestamp,
epoch_us = EpochUs,
data_content_type = DataContentType,
metadata_content_type = MetadataContentType
}.
%% Append-to-list in a map; create the entry if absent. Order matters:
%% the list MUST be in version order (oldest first).
%%
%% Empty appends are a no-op — never create an empty stream entry.
%% This matches reckon-db, which doesn't put anything in Khepri if
%% no events are provided.
maps_update_append(_Key, [], Map) ->
    Map;
maps_update_append(Key, Additions, Map) ->
    case maps:find(Key, Map) of
        error ->
            Map#{Key => Additions};
        {ok, Existing} when is_list(Existing) ->
            Map#{Key => Existing ++ Additions}
    end.
%% Generate a random 32-char lowercase-hex event id (128 random bits
%% from crypto:strong_rand_bytes/1). Not a strict UUIDv7 — mem-evoq
%% is for tests, and the time-ordering property of v7 ids is not
%% relied upon here; collision resistance from the CSPRNG suffices.
generate_event_id() ->
Bin = crypto:strong_rand_bytes(16),
list_to_binary(
[hex_digit(B) || <<B:4>> <= Bin]
).
%% Map a 4-bit value 0..15 to its lowercase hex character.
hex_digit(D) when D < 10 -> $0 + D;
hex_digit(D) -> $a + D - 10.
%%====================================================================
%% Internal — read
%%====================================================================
%% @private Read a slice of a stream.
%%
%% Forward semantics: take events at versions
%% `[FromVersion, FromVersion + Count - 1]'.
%% Backward semantics: take events at versions
%% `[max(0, FromVersion - Count + 1), FromVersion]', returned in
%% descending version order (newest first).
%%
%% Stream-not-found returns `{error, {stream_not_found, StreamId}}'.
%% Out-of-range requests return a partial result (no padding, no error).
-spec do_read(
    binary(), non_neg_integer(), pos_integer(), forward | backward,
    map(), #state{}
) -> {ok, [event()]} | {error, term()}.
do_read(StreamId, From, Count, Direction, Opts,
        #state{streams = Streams} = State)
  when is_integer(From), From >= 0,
       is_integer(Count), Count > 0,
       is_map(Opts),
       (Direction =:= forward orelse Direction =:= backward) ->
    Entry = maps:get(StreamId, Streams, undefined),
    read_with_lookup(Entry, StreamId, From, Count, Direction, Opts, State);
do_read(_StreamId, _From, _Count, _Direction, _Opts, _State) ->
    {error, badarg}.
%% Dispatch on stream existence; existing streams get sliced and then
%% (possibly) integrity-verified.
read_with_lookup(undefined, StreamId, _From, _Count, _Dir, _Opts, _State) ->
    {error, {stream_not_found, StreamId}};
read_with_lookup(Events, StreamId, From, Count, Direction, Opts, State)
  when is_list(Events) ->
    Window = slice_events(Events, From, Count, Direction),
    maybe_verify_read(StreamId, From, Direction, Window, Opts, State).
%%--------------------------------------------------------------------
%% Read-path verification (move 15)
%%--------------------------------------------------------------------
%% Decide whether the sliced events need integrity verification and,
%% if so, run it; otherwise hand the slice straight back.
maybe_verify_read(StreamId, FromVersion, Direction, Events, Opts, State) ->
    Mode = maps:get(verify, Opts, skip_legacy),
    Needed = verify_required(State#state.integrity, Mode),
    dispatch_verify_read(Needed, StreamId, FromVersion, Direction,
                         Events, Mode, State).
dispatch_verify_read(false, _StreamId, _From, _Dir, Events, _Mode, _State) ->
    {ok, Events};
dispatch_verify_read(true, StreamId, FromVersion, Direction, Events, Mode, State) ->
    Outcome = verify_in_direction(StreamId, FromVersion, Direction,
                                  Events, Mode, State),
    finalize_verify_read(Outcome).
%% Wrap a verification failure in the read path's {error, _} envelope.
finalize_verify_read({ok, _} = Ok) -> Ok;
finalize_verify_read({integrity_violation, _} = Violation) -> {error, Violation}.
%% Verification runs only on integrity-enabled stores, and only when
%% the caller has not opted out entirely with skip_all.
verify_required(disabled, _Mode) -> false;
verify_required(_Enabled, Mode) -> Mode =/= skip_all.
%% Verify in ascending-version order. Backward reads are flipped to
%% forward order first, verified, then flipped back so the caller
%% still receives newest-first.
verify_in_direction(StreamId, FromVersion, forward, Events, Mode, State) ->
verify_events_forward(StreamId, FromVersion, Events, Mode, State);
verify_in_direction(StreamId, FromVersion, backward, Events, Mode, State) ->
%% Same trick as reckon-db 2.1.1: reverse to forward, verify, reverse back.
Forward = lists:reverse(Events),
reverse_verified(
verify_events_forward(
StreamId, forward_start_version(Forward, FromVersion),
Forward, Mode, State)).
%% Start version for verification: the oldest event's version when
%% the slice is non-empty, else the caller's FromVersion (an empty
%% slice verifies trivially).
forward_start_version([], FallbackVersion) -> FallbackVersion;
forward_start_version([#event{version = V} | _], _) -> V.
%% Restore newest-first ordering on success; violations pass through.
reverse_verified({ok, Events}) -> {ok, lists:reverse(Events)};
reverse_verified(Other) -> Other.
%% Resolve the chain tip to verify against, then walk the slice.
%% Only called on integrity-enabled stores (the head matches the
%% enabled integrity map directly).
verify_events_forward(StreamId, StartVersion, Events, Mode,
#state{integrity = #{key := Key, chain_start := WMs},
streams = Streams}) ->
ChainStart = maps:get(StreamId, WMs, undefined),
InitialTip = resolve_read_initial_tip(
StreamId, StartVersion, ChainStart, Streams),
verify_events_loop(Events, InitialTip, ChainStart, Key, Mode, StreamId, []).
%% Mirrors reckon_db_streams:resolve_read_initial_tip/4 — see that
%% module for the rationale on each branch.
%% No chain start recorded for this stream: everything is legacy.
resolve_read_initial_tip(_StreamId, _StartVersion, undefined, _Streams) ->
undefined;
%% Slice starts before the chain: tip unknown until the chain begins.
resolve_read_initial_tip(_StreamId, StartVersion, ChainStart, _Streams)
when StartVersion < ChainStart ->
undefined;
%% Slice starts exactly at the chain start: seed with the genesis hash.
resolve_read_initial_tip(_StreamId, StartVersion, ChainStart, _Streams)
when StartVersion =:= ChainStart ->
reckon_gater_integrity:genesis_prev_hash();
%% Slice starts mid-chain: recompute the tip from the predecessor.
resolve_read_initial_tip(StreamId, StartVersion, _ChainStart, Streams)
when StartVersion > 0 ->
%% Predecessor must be on disk and integrity-bearing; look it up
%% in the live in-memory streams map.
tip_of_predecessor(
find_event_at(maps:get(StreamId, Streams, []), StartVersion - 1)).
%% Only an integrity-bearing predecessor yields a usable tip; a
%% missing or legacy predecessor resolves to undefined.
tip_of_predecessor({ok, #event{prev_event_hash = Prev} = E})
when is_binary(Prev) ->
reckon_gater_integrity:compute_chain_hash(E, Prev);
tip_of_predecessor(_) ->
undefined.
%% Walk the slice oldest-first, accumulating verified events (Acc is
%% reverse-order, flipped on completion).
verify_events_loop([], _Tip, _ChainStart, _Key, _Mode, _StreamId, Acc) ->
{ok, lists:reverse(Acc)};
verify_events_loop([Event | Rest], Tip, ChainStart, Key, Mode, StreamId, Acc) ->
branch_on_event_kind(
is_legacy_event(Event, ChainStart),
Event, Rest, Tip, ChainStart, Key, Mode, StreamId, Acc).
branch_on_event_kind(true, Event, Rest, Tip, ChainStart, Key, Mode, StreamId, Acc) ->
handle_legacy(Event, Rest, Tip, ChainStart, Key, Mode, StreamId, Acc);
branch_on_event_kind(false, Event, Rest, Tip, ChainStart, Key, Mode, StreamId, Acc) ->
handle_integrity(Event, Rest, Tip, ChainStart, Key, Mode, StreamId, Acc).
%% An event is legacy if the stream has no chain at all, or if its
%% version predates the chain start.
is_legacy_event(_Event, undefined) -> true;
is_legacy_event(#event{version = V}, ChainStart) -> V < ChainStart.
%% strict mode refuses legacy events outright; any other mode passes
%% them through unverified.
handle_legacy(Event, _Rest, _Tip, _ChainStart, _Key, strict, StreamId, _Acc) ->
{integrity_violation, #{
layer => storage,
stream_id => StreamId,
version => Event#event.version,
kind => missing_integrity,
context => #{detail => legacy_event_under_strict_mode}
}};
handle_legacy(Event, Rest, Tip, ChainStart, Key, Mode, StreamId, Acc) ->
verify_events_loop(Rest, Tip, ChainStart, Key, Mode, StreamId, [Event | Acc]).
%% NOTE(review): there is no handle_integrity clause for
%% Tip =:= undefined with Event version =/= ChainStart — that shape
%% would crash with function_clause rather than produce an
%% integrity_violation. Confirm it is unreachable by construction
%% (resolve_read_initial_tip should always seed a binary tip for
%% mid-chain slices).
handle_integrity(Event, Rest, undefined, ChainStart, Key, Mode, StreamId, Acc)
when is_integer(ChainStart),
Event#event.version =:= ChainStart ->
%% First integrity event in the stream — seed tip with genesis.
handle_integrity(Event, Rest,
reckon_gater_integrity:genesis_prev_hash(),
ChainStart, Key, Mode, StreamId, Acc);
handle_integrity(Event, Rest, Tip, ChainStart, Key, Mode, StreamId, Acc)
when is_binary(Tip) ->
continue_after_verify(
reckon_gater_integrity:verify_event(Event, Tip, Key),
Event, Rest, Tip, ChainStart, Key, Mode, StreamId, Acc).
%% On success, advance the chain tip past this event and keep walking;
%% a violation aborts the whole read.
continue_after_verify(ok, Event, Rest, Tip, ChainStart, Key, Mode, StreamId, Acc) ->
NextTip = reckon_gater_integrity:compute_chain_hash(Event, Tip),
verify_events_loop(Rest, NextTip, ChainStart, Key, Mode, StreamId, [Event | Acc]);
continue_after_verify({integrity_violation, _} = V, _E, _R, _T, _C, _K, _M, _S, _A) ->
V.
%% List is in forward order (oldest first); filter into the requested
%% window, then reverse for backward reads so callers get the natural
%% "newest first" sequence.
slice_events(Events, FromVersion, Count, forward) ->
    Last = FromVersion + Count - 1,
    lists:filter(
        fun(#event{version = V}) -> V >= FromVersion andalso V =< Last end,
        Events);
slice_events(Events, FromVersion, Count, backward) ->
    First = max(0, FromVersion - Count + 1),
    Window = lists:filter(
        fun(#event{version = V}) -> V >= First andalso V =< FromVersion end,
        Events),
    lists:reverse(Window).
%%====================================================================
%% Internal — metadata
%%====================================================================
%% @private Stream exists when it has at least one event in state.
%% Empty entries are not produced (see maps_update_append/3).
-spec has_stream(binary(), #state{}) -> boolean().
has_stream(StreamId, #state{streams = Streams}) ->
    is_map_key(StreamId, Streams).
%% @private Whether the store contains at least one event across any
%% stream. Mirrors reckon_db_streams:has_events/1. Empty-stream
%% entries don't exist by construction, so a non-empty streams map
%% means events exist.
-spec do_has_events(#state{}) -> boolean().
do_has_events(#state{streams = Streams}) ->
    map_size(Streams) > 0.
%% @private All stream IDs in the store, ascending. Mirrors
%% reckon_db_streams:list_streams/1 (which usorts); map keys are
%% already unique, so a plain sort yields the identical result.
-spec do_list_streams(#state{}) -> [binary()].
do_list_streams(#state{streams = Streams}) ->
    lists:sort(maps:keys(Streams)).
%% @private Delete a stream from the store. Idempotent — returns
%% an unchanged state if the stream doesn't exist. Snapshots for
%% the stream are also dropped (matches the operational expectation
%% that "delete the stream" means "remove all trace of it").
-spec do_delete_stream(binary(), #state{}) -> #state{}.
do_delete_stream(StreamId, #state{streams = Streams, snapshots = Snaps} = State) ->
    State#state{
        streams = maps:without([StreamId], Streams),
        snapshots = maps:without([StreamId], Snaps)
    }.
%%====================================================================
%% Internal — cross-stream reads
%%====================================================================
%% @private Read all events across all streams, sorted by epoch_us,
%% skipping `Offset` events and returning up to `BatchSize` events.
%%
%% This is the catch-up subscription primitive; reckon-db's
%% reckon_db_streams:read_all_global/3 does the same thing against
%% Khepri. Within a single store instance ties on epoch_us are
%% rare but possible (microsecond timestamps); we don't impose a
%% secondary sort because reckon-db doesn't either.
-spec do_read_all_global(non_neg_integer(), pos_integer(), #state{}) ->
    {ok, [event()]} | {error, term()}.
do_read_all_global(Offset, BatchSize, #state{streams = Streams})
  when is_integer(Offset), Offset >= 0,
       is_integer(BatchSize), BatchSize > 0 ->
    Everything = lists:append(maps:values(Streams)),
    ByTime = lists:sort(
        fun(#event{epoch_us = A}, #event{epoch_us = B}) -> A =< B end,
        Everything),
    {ok, lists:sublist(ByTime, Offset + 1, BatchSize)};
do_read_all_global(_Offset, _BatchSize, _State) ->
    {error, badarg}.
%%====================================================================
%% Internal — subscriptions
%%====================================================================
%% Canonical internal filter forms. normalise_filter/1 maps the
%% user-facing aliases (stream/event_type/event_pattern/tags) onto
%% these before a subscription is stored.
-type filter() ::
all |
{by_stream, binary()} |
{by_event_type, binary()} |
{by_event_pattern, map()} |
{by_tags, [binary()], any | all}.
%% @private Register a subscription.
%%
%% Opts:
%% <ul>
%% <li>`from => 0' — replay all matching existing events synchronously
%% to the subscriber pid before the call returns (catch-up).</li>
%% <li>`from => latest' (default) — no catch-up; subscriber receives
%% only events written after the call returns (live).</li>
%% </ul>
%%
%% Returns `{ok, SubKey}' where SubKey is an opaque binary the caller
%% later passes to `unsubscribe/2'.
%%
%% NOTE(review): the monitor reference is discarded and
%% do_unsubscribe/2 never demonitors, so a pid that repeatedly
%% subscribes and unsubscribes accumulates monitors until it exits —
%% confirm this is acceptable for the test-only scope.
do_subscribe(FilterIn, Pid, Opts, State) when is_pid(Pid), is_map(Opts) ->
SubKey = generate_sub_key(),
From = maps:get(from, Opts, latest),
Filter = normalise_filter(FilterIn),
%% Catch-up BEFORE installing the subscription so live deliveries
%% arriving after this call returns don't race catch-up batches.
ok = maybe_deliver_catchup(From, Pid, Filter, State),
SubInfo = #{sub_key => SubKey, pid => Pid,
filter => Filter, from => From},
_Ref = erlang:monitor(process, Pid),
NewSubs = maps:put(SubKey, SubInfo, State#state.subscribers),
{reply, {ok, SubKey}, State#state{subscribers = NewSubs}}.
%% Only a non-negative integer `from' triggers catch-up; `latest'
%% and anything unrecognised mean live-only delivery.
maybe_deliver_catchup(latest, _Pid, _Filter, _State) ->
    ok;
maybe_deliver_catchup(Offset, Pid, Filter, State)
  when is_integer(Offset), Offset >= 0 ->
    deliver_catchup(Pid, Filter, Offset, State);
maybe_deliver_catchup(_Other, _Pid, _Filter, _State) ->
    ok.
%% @private Remove a subscription. Idempotent — removing an unknown
%% key leaves state unchanged.
do_unsubscribe(SubKey, #state{subscribers = Subs} = State) ->
    Remaining = maps:remove(SubKey, Subs),
    State#state{subscribers = Remaining}.
%% @private Subscribe-time normalisation of the user-facing filter
%% shapes (stream / event_type / event_pattern / tags — both the
%% bare-atom and the `by_*' flavours) to the internal canonical form.
%% Two-element tag filters default the match mode to `any'.
normalise_filter(all) ->
    all;
normalise_filter({Tag, StreamId}) when Tag =:= stream; Tag =:= by_stream ->
    {by_stream, StreamId};
normalise_filter({Tag, Type}) when Tag =:= event_type; Tag =:= by_event_type ->
    {by_event_type, Type};
normalise_filter({Tag, Pattern}) when Tag =:= event_pattern; Tag =:= by_event_pattern ->
    {by_event_pattern, Pattern};
normalise_filter({Tag, Tags}) when Tag =:= tags; Tag =:= by_tags ->
    {by_tags, Tags, any};
normalise_filter({by_tags, Tags, Match}) ->
    {by_tags, Tags, Match}.
%% @private Send a catch-up batch synchronously.
%%
%% For integrity-enabled stores, each event's MAC is verified before
%% delivery. A failure halts the catch-up — the subscriber receives a
%% {subscription_error, {integrity_violation, _}} message and no
%% further events. Same shape as reckon-db's subscription catch-up
%% (move 17).
%%
%% Flattens every stream, filters, sorts by epoch_us (same comparator
%% as do_read_all_global/3), then drops FromOffset events. O(total
%% events) per catch-up — fine at test scale.
deliver_catchup(Pid, Filter, FromOffset, #state{streams = Streams} = State) ->
AllEvents = lists:append(maps:values(Streams)),
Matched = [E || E <- AllEvents, event_matches_filter(E, Filter)],
Sorted = lists:sort(
fun(#event{epoch_us = A}, #event{epoch_us = B}) -> A =< B end,
Matched),
Sliced = slice_catchup(Sorted, FromOffset),
dispatch_catchup(verify_catchup_batch(Sliced, State), Pid, Sliced).
%% Drop the first FromOffset events; an offset past the end yields [].
slice_catchup(Events, 0) ->
    Events;
slice_catchup([], _FromOffset) ->
    [];
slice_catchup([_Skipped | Rest], FromOffset) ->
    slice_catchup(Rest, FromOffset - 1).
%% Route a verified catch-up batch: a violation becomes a
%% subscription_error message, an empty batch is silent, anything
%% else is delivered as one {events, Batch} message.
dispatch_catchup({integrity_violation, _} = Violation, Pid, _Batch) ->
    Pid ! {subscription_error, Violation},
    ok;
dispatch_catchup(ok, _Pid, []) ->
    ok;
dispatch_catchup(ok, Pid, Batch) ->
    Pid ! {events, Batch},
    ok.
%% @private MAC-only verification across an out-of-order set of
%% events (catch-up may include multiple streams). Per-stream chain
%% verification is a different concern handled in the read path;
%% here we just confirm each event's MAC against the store's key.
%% Disabled stores skip entirely.
verify_catchup_batch(_Events, #state{integrity = disabled}) ->
ok;
verify_catchup_batch(Events, #state{integrity = #{key := Key}}) ->
verify_each_event_mac(Events, Key).
%% Walk the batch; the first violation short-circuits the rest.
verify_each_event_mac([], _Key) ->
ok;
verify_each_event_mac([Event | Rest], Key) ->
continue_mac_chain(verify_one_event_mac(Event, Key), Rest, Key).
continue_mac_chain(ok, Rest, Key) -> verify_each_event_mac(Rest, Key);
continue_mac_chain(Violation, _Rest, _Key) -> Violation.
verify_one_event_mac(#event{mac = undefined}, _Key) ->
%% Legacy event in catch-up — pass through (matches skip_legacy
%% semantics from the read path).
ok;
%% Recompute the HMAC over the canonical encoding with integrity
%% fields stripped, then compare in constant time.
%% NOTE(review): crypto:hash_equals/2 expects equal-sized inputs;
%% a stored MAC of a different length would raise rather than
%% mismatch — confirm MACs are always full SHA-256 outputs.
verify_one_event_mac(#event{mac = {_KeyId, StoredMac}} = Event, Key) ->
Stripped = Event#event{mac = undefined, signature = undefined},
Bytes = reckon_gater_canonical:encode_for_mac(event, Stripped),
Expected = crypto:mac(hmac, sha256, Key, Bytes),
mac_match_outcome(crypto:hash_equals(StoredMac, Expected), Event).
mac_match_outcome(true, _Event) ->
ok;
mac_match_outcome(false, Event) ->
{integrity_violation, #{
layer => storage,
stream_id => Event#event.stream_id,
version => Event#event.version,
kind => mac_mismatch,
context => #{detected_at => catchup_replay}
}}.
%% @private Live fan-out — for each subscriber whose filter matches
%% one or more of the newly-appended events, send a batch of just
%% those matching events. Returns ok.
fanout_to_subscribers(NewEvents, #state{subscribers = Subs}) ->
    Deliver = fun(#{pid := Pid, filter := Filter}) ->
        Matching = [E || E <- NewEvents, event_matches_filter(E, Filter)],
        deliver_matched(Pid, Matching)
    end,
    lists:foreach(Deliver, maps:values(Subs)).
%% Send a live batch; subscribers with no matching events get nothing.
deliver_matched(_Pid, []) ->
    ok;
deliver_matched(Pid, Matched) ->
    Pid ! {events, Matched},
    ok.
%% @private Single-event filter match. Move 11 — the filter taxonomy
%% covered here is a subset of reckon-db's (which adds payload-pattern
%% and by_event_pattern variants). For mem-evoq we cover the common
%% test cases and document the rest.
%%
%% Clause order matters: each positive match precedes its negative
%% fallback. by_event_pattern only inspects the event_type key of the
%% pattern map; untagged events (tags = undefined) never match a
%% by_tags filter.
-spec event_matches_filter(event(), filter()) -> boolean().
event_matches_filter(_Event, all) ->
true;
event_matches_filter(#event{stream_id = SID}, {by_stream, SID}) ->
true;
event_matches_filter(#event{}, {by_stream, _}) ->
false;
event_matches_filter(#event{event_type = T}, {by_event_type, T}) ->
true;
event_matches_filter(#event{}, {by_event_type, _}) ->
false;
event_matches_filter(#event{event_type = T}, {by_event_pattern, #{event_type := T}}) ->
true;
event_matches_filter(#event{}, {by_event_pattern, _}) ->
false;
event_matches_filter(#event{tags = Tags}, {by_tags, Wanted, Match})
when is_list(Tags) ->
tags_match(Tags, Wanted, Match);
event_matches_filter(#event{tags = undefined}, {by_tags, _, _}) ->
false.
%% `any': at least one wanted tag present on the event; `all': every
%% wanted tag present. Empty Wanted: `any' -> false, `all' -> true
%% (vacuous truth).
tags_match(_EventTags, [], any) ->
    false;
tags_match(EventTags, [Wanted | Rest], any) ->
    lists:member(Wanted, EventTags) orelse tags_match(EventTags, Rest, any);
tags_match(_EventTags, [], all) ->
    true;
tags_match(EventTags, [Wanted | Rest], all) ->
    lists:member(Wanted, EventTags) andalso tags_match(EventTags, Rest, all).
%% Random 16-char lowercase-hex subscription key; opaque to callers.
generate_sub_key() ->
    Rand = crypto:strong_rand_bytes(8),
    << <<(hex_digit(Nibble))>> || <<Nibble:4>> <= Rand >>.
%%====================================================================
%% Internal — snapshots
%%====================================================================
%% @private Save a snapshot record under StreamId/Version. Replaces
%% any existing snapshot at the same version.
do_save_snapshot(StreamId, #snapshot{version = Version} = Snapshot,
                 #state{snapshots = All} = State) ->
    ForStream = maps:get(StreamId, All, #{}),
    State#state{snapshots = All#{StreamId => ForStream#{Version => Snapshot}}}.
%% @private Load the highest-version snapshot for a stream, or
%% `{error, not_found}'.
do_load_latest_snapshot(StreamId, #state{snapshots = All}) ->
    PerStream = maps:get(StreamId, All, undefined),
    latest_in_per_stream(PerStream).
%% Pick the snapshot with the greatest version key; absent or empty
%% per-stream maps yield not_found.
latest_in_per_stream(undefined) ->
    {error, not_found};
latest_in_per_stream(PerStream) when map_size(PerStream) =:= 0 ->
    {error, not_found};
latest_in_per_stream(PerStream) ->
    [Newest | _] = lists:reverse(lists:sort(maps:keys(PerStream))),
    {ok, maps:get(Newest, PerStream)}.
%% @private Load the snapshot stored at exactly Version, if any.
do_load_snapshot_at(StreamId, Version, #state{snapshots = All}) ->
    PerStream = maps:get(StreamId, All, undefined),
    snapshot_at_version(PerStream, Version).
%% Look up one version; a missing stream or missing version both map
%% to not_found.
snapshot_at_version(undefined, _Version) ->
    {error, not_found};
snapshot_at_version(PerStream, Version) ->
    case maps:find(Version, PerStream) of
        {ok, Snapshot} -> {ok, Snapshot};
        error -> {error, not_found}
    end.
%% @private List all snapshots for a stream, oldest version first.
do_list_snapshots(StreamId, #state{snapshots = All}) ->
    PerStream = maps:get(StreamId, All, undefined),
    snapshots_sorted(PerStream).
%% A missing stream yields []; otherwise snapshots sorted by
%% ascending version.
snapshots_sorted(undefined) ->
    [];
snapshots_sorted(PerStream) ->
    [Snap || {_Version, Snap} <- lists:keysort(1, maps:to_list(PerStream))].
%% @private Delete all snapshots for a stream. Idempotent.
do_delete_snapshots_for_stream(StreamId, #state{snapshots = All} = State) ->
    State#state{snapshots = maps:without([StreamId], All)}.
%%====================================================================
%% Internal — snapshot integrity (move 16)
%%====================================================================
%% @private Attach integrity fields to a snapshot before it is saved.
%%
%% Integrity-disabled stores: pass the snapshot through untouched.
%% Integrity-enabled stores: derive the anchor hash from the chain
%% hash of the event at the snapshot's version, then MAC the anchored
%% snapshot. The save is refused (error tuple) when no
%% integrity-bearing event exists at that version.
apply_snapshot_integrity(_StreamId, Snap, #state{integrity = disabled}) ->
    {ok, Snap};
apply_snapshot_integrity(StreamId, #snapshot{version = Version} = Snap,
                         #state{integrity = #{key := Key},
                                streams = Streams}) ->
    Anchor = compute_event_chain_hash(StreamId, Version, Streams),
    seal_snapshot(Anchor, Snap, Key).
%% @private Finish sealing a snapshot: on a computed anchor, stamp it
%% into the record and MAC the stamped record; on an anchor error,
%% return the error unchanged so the save is refused.
seal_snapshot({error, _} = Error, _Snap, _Key) ->
    Error;
seal_snapshot({ok, AnchorHash}, Snap, Key) ->
    Anchored = Snap#snapshot{anchor_hash = AnchorHash},
    Mac = reckon_gater_integrity:compute_snapshot_mac(Anchored, Key),
    {ok, Anchored#snapshot{mac = Mac}}.
%% @private Turn a sealed-snapshot result into a gen_server reply:
%% persist and ack on success, report the error without touching
%% state otherwise.
reply_save_snapshot({error, _} = Error, _StreamId, State) ->
    {reply, Error, State};
reply_save_snapshot({ok, Sealed}, StreamId, State) ->
    {reply, ok, do_save_snapshot(StreamId, Sealed, State)}.
%% @private Chain hash of the event stored at (StreamId, Version).
%% Seeds the anchor when a snapshot is saved and recomputes the
%% expected anchor when one is verified.
compute_event_chain_hash(StreamId, Version, Streams) ->
    Events = maps:get(StreamId, Streams, []),
    Lookup = find_event_at(Events, Version),
    chain_hash_for(Lookup, StreamId, Version).
%% @private Map an event lookup onto its chain hash. An event written
%% before integrity was enabled carries no prev_event_hash and cannot
%% anchor a snapshot; a missing event cannot either.
chain_hash_for({ok, #event{prev_event_hash = PrevHash} = Event},
               _StreamId, _Version) when is_binary(PrevHash) ->
    {ok, reckon_gater_integrity:compute_chain_hash(Event, PrevHash)};
chain_hash_for({ok, #event{prev_event_hash = undefined}}, StreamId, Version) ->
    anchor_unavailable(StreamId, Version, event_is_legacy);
chain_hash_for(_NotFound, StreamId, Version) ->
    anchor_unavailable(StreamId, Version, event_not_found).
%% @private Shared error shape for "cannot anchor here".
anchor_unavailable(StreamId, Version, Reason) ->
    {error, {snapshot_anchor_unavailable,
             #{stream_id => StreamId, version => Version,
               reason => Reason}}}.
%% @private Gate a snapshot load through integrity verification.
%% Load errors and integrity-disabled stores pass straight through;
%% legacy snapshots (no integrity fields) are accepted as-is; sealed
%% snapshots must verify both anchor and MAC.
verify_snapshot_load({error, _} = Error, _StreamId, _State) ->
    Error;
verify_snapshot_load({ok, _} = Loaded, _StreamId,
                     #state{integrity = disabled}) ->
    Loaded;
verify_snapshot_load({ok, Snap} = Loaded, StreamId, State) ->
    IsLegacy = reckon_gater_integrity:is_legacy_snapshot(Snap),
    branch_on_snapshot_kind(IsLegacy, Snap, Loaded, StreamId, State).
%% @private Legacy snapshots skip verification entirely; sealed ones
%% are checked against the anchor recomputed from the event stream.
branch_on_snapshot_kind(true, _Snap, Loaded, _StreamId, _State) ->
    Loaded;
branch_on_snapshot_kind(false, Snap, Loaded, StreamId,
                        #state{integrity = #{key := Key},
                               streams = Streams}) ->
    Expected = compute_event_chain_hash(
                   StreamId, Snap#snapshot.version, Streams),
    verify_against_recomputed_anchor(Expected, Snap, Loaded, Key).
%% @private If the anchor could not be recomputed, surface that
%% error; otherwise hand off to the integrity library for the actual
%% anchor + MAC comparison.
verify_against_recomputed_anchor({error, _} = Error, _Snap, _Loaded, _Key) ->
    Error;
verify_against_recomputed_anchor({ok, ExpectedAnchor}, Snap, Loaded, Key) ->
    Verdict = reckon_gater_integrity:verify_snapshot(
                  Snap, ExpectedAnchor, Key),
    snapshot_verification_outcome(Verdict, Loaded).
%% @private Translate the integrity library's verdict into the load
%% result: `ok' keeps the loaded snapshot, a violation becomes an
%% error tuple carrying the violation details.
snapshot_verification_outcome(ok, Loaded) ->
    Loaded;
snapshot_verification_outcome({integrity_violation, _} = Violation, _Loaded) ->
    {error, Violation}.
%%====================================================================
%% Internal — integrity init
%%====================================================================
%% @private Validate the integrity section of the store options.
%% `disabled' stays disabled. An enabled config requires a 32-byte
%% binary key and starts with an empty per-stream watermark map.
%% Anything else — including a non-binary key — is rejected as
%% invalid configuration.
init_integrity(disabled) ->
    {ok, disabled};
init_integrity(#{enabled := true, key := Key}) when is_binary(Key) ->
    case byte_size(Key) of
        32 -> {ok, #{key => Key, chain_start => #{}}};
        Size -> {error, {integrity_key_invalid_size, Size}}
    end;
init_integrity(_Config) ->
    {error, integrity_config_invalid}.
%%====================================================================
%% Internal — write-path integrity (move 14)
%%====================================================================
%% @private Build the integrity context for one append batch.
%% Disabled stores yield the constant atom `disabled' (the per-event
%% helpers short-circuit on it). Enabled stores resolve — or lazily
%% establish — the per-stream chain_start watermark and return it
%% together with the key, so per-event code never re-reads state.
-type integrity_ctx() ::
disabled |
{enabled, Key :: binary(), ChainStart :: non_neg_integer()}.
-spec setup_integrity_for_append(binary(), integer(), #state{}) ->
integrity_ctx().
setup_integrity_for_append(_StreamId, _CurrentVersion,
                           #state{integrity = disabled}) ->
    disabled;
setup_integrity_for_append(StreamId, CurrentVersion,
                           #state{integrity = #{key := Key,
                                                chain_start := Watermarks}}) ->
    %% The event about to be appended gets CurrentVersion + 1.
    Upcoming = CurrentVersion + 1,
    Existing = maps:get(StreamId, Watermarks, undefined),
    %% Lazy enablement: no watermark yet means this append becomes the
    %% first integrity-bearing event of the stream.
    {enabled, Key, chain_start_or_default(Existing, Upcoming)}.
%% @private Keep an already-recorded chain-start watermark; fall back
%% to the version about to be written when none was ever set.
chain_start_or_default(undefined, NextVersion) ->
    NextVersion;
chain_start_or_default(Existing, _NextVersion) ->
    Existing.
%% @private Record the chain_start watermark chosen during setup into
%% the store's integrity map. For disabled stores the integrity state
%% stays the atom `disabled'.
persist_watermark(_StreamId, disabled, _Integrity) ->
    disabled;
persist_watermark(StreamId, {enabled, _Key, ChainStart},
                  #{chain_start := Watermarks} = Integrity) ->
    Integrity#{chain_start := Watermarks#{StreamId => ChainStart}}.
%% @private Resolve the chain-tip value that the first event in this
%% batch must reference as its `prev_event_hash'.
%%
%% Disabled: undefined (unused).
%% Enabled, first integrity event in stream: genesis.
%% Enabled, later batch: chain hash of the predecessor on disk.
%%
%% There is deliberately no clause for NextVersion < ChainStart: with
%% the watermark set by setup_integrity_for_append that state should
%% be unreachable, so hitting it crashes with function_clause
%% (let-it-crash) rather than fabricating a tip.
-spec resolve_write_initial_tip(
binary(), non_neg_integer(), integrity_ctx(), #state{}
) -> binary() | undefined.
resolve_write_initial_tip(_StreamId, _NextVersion, disabled, _State) ->
undefined;
%% Exactly at the watermark: this is the first integrity-bearing
%% event of the stream, so the chain starts from the genesis hash.
resolve_write_initial_tip(_StreamId, NextVersion, {enabled, _Key, ChainStart}, _State)
when NextVersion =:= ChainStart ->
reckon_gater_integrity:genesis_prev_hash();
%% Past the watermark: the predecessor event (NextVersion - 1) must
%% already be stored; its chain hash becomes this batch's tip.
resolve_write_initial_tip(StreamId, NextVersion, {enabled, _Key, ChainStart},
#state{streams = Streams})
when NextVersion > ChainStart ->
PrevVersion = NextVersion - 1,
write_tip_from_predecessor(
find_event_at(maps:get(StreamId, Streams, []), PrevVersion),
StreamId, PrevVersion, ChainStart).
%% @private The predecessor event must exist and carry integrity
%% fields; its chain hash becomes the batch's starting tip. Anything
%% else means the store's integrity bookkeeping is inconsistent —
%% that is a store bug, not a caller error, so crash loudly with the
%% details needed to debug it.
write_tip_from_predecessor({ok, #event{prev_event_hash = Prior} = Predecessor},
                           _StreamId, _PrevVersion, _ChainStart)
  when is_binary(Prior) ->
    reckon_gater_integrity:compute_chain_hash(Predecessor, Prior);
write_tip_from_predecessor(_MissingOrLegacy, StreamId, PrevVersion, ChainStart) ->
    erlang:error({integrity_setup_failed,
                  #{stream_id => StreamId,
                    looking_for_version => PrevVersion,
                    chain_start => ChainStart}}).
%% @private Walk an oldest-first (ascending-version) event list for
%% the event whose version equals Target. Because versions ascend,
%% seeing a version beyond Target proves the target is absent — stop
%% early instead of scanning the rest.
find_event_at([], _Target) ->
    error;
find_event_at([#event{version = Target} = Event | _Rest], Target) ->
    {ok, Event};
find_event_at([#event{version = Seen} | _Rest], Target) when Seen > Target ->
    error;
find_event_at([_Earlier | Rest], Target) ->
    find_event_at(Rest, Target).
%% @private Seal one event with integrity fields and advance the
%% running chain tip. Disabled stores pass the event through
%% unchanged with no tip.
-spec apply_integrity_if_enabled(
event(), binary() | undefined, integrity_ctx()
) -> {event(), binary() | undefined}.
apply_integrity_if_enabled(Event, _Tip, disabled) ->
    {Event, undefined};
apply_integrity_if_enabled(Event, Tip, {enabled, Key, _ChainStart})
  when is_binary(Tip) ->
    %% Link to the predecessor, MAC the linked event, then derive the
    %% tip the next event in the batch must reference.
    Chained = Event#event{prev_event_hash = Tip},
    Mac = reckon_gater_integrity:compute_event_mac(Chained, Key),
    Sealed = Chained#event{mac = Mac},
    {Sealed, reckon_gater_integrity:compute_chain_hash(Sealed, Tip)}.