src/mem_evoq_adapter.erl

%% @doc Adapter module implementing the `evoq_event_store' contract.
%%
%% Configured into evoq via:
%%
%% ```
%% application:set_env(evoq, event_store_adapter, mem_evoq_adapter).
%% '''
%%
%% Every callback looks up the store pid in {@link mem_evoq_registry}
%% and forwards via `gen_server:call/2'. The store itself
%% ({@link mem_evoq_store}) holds the actual state.
%%
%% This module is the public stable surface. The store gen_server's
%% callback contract may evolve internally; the adapter's exports must
%% not change without bumping the package's major version.
%% @end
-module(mem_evoq_adapter).

-include_lib("reckon_gater/include/reckon_gater_types.hrl").
-include_lib("evoq/include/evoq_types.hrl").

-export([
    %% Write path
    append/4,

    %% Read path
    read/5,
    read/6,
    read_all/3,
    read_all/4,
    read_all_global/3,
    read_by_event_types/3,
    read_by_tags/3,
    read_by_tags/4,

    %% Metadata
    version/2,
    exists/2,
    has_events/1,
    list_streams/1,
    delete/2,

    %% Snapshots
    save_snapshot/3,
    save_snapshot/5,
    load_snapshot/2,
    load_snapshot_at/3,
    list_snapshots/2,
    delete_snapshot/2,

    %% Subscriptions
    subscribe/4,
    subscribe_all/3,
    unsubscribe/2
]).

%%====================================================================
%% Write
%%====================================================================

append(StoreId, StreamId, ExpectedVersion, Events) ->
    call(StoreId, {append, StreamId, ExpectedVersion, Events}).

%%====================================================================
%% Read
%%====================================================================

read(StoreId, StreamId, FromVersion, Count, Direction) ->
    translate_events(call(StoreId, {read, StreamId, FromVersion, Count, Direction})).

read(StoreId, StreamId, FromVersion, Count, Direction, Opts) ->
    translate_events(call(StoreId, {read, StreamId, FromVersion, Count, Direction, Opts})).

read_all(StoreId, StreamId, Direction) ->
    translate_events(call(StoreId, {read_all, StreamId, Direction, 1000})).

read_all(StoreId, StreamId, Direction, BatchSize) ->
    translate_events(call(StoreId, {read_all, StreamId, Direction, BatchSize})).

read_all_global(StoreId, Offset, BatchSize) ->
    translate_events(call(StoreId, {read_all_global, Offset, BatchSize})).

read_by_event_types(StoreId, EventTypes, BatchSize) ->
    translate_events(call(StoreId, {read_by_event_types, EventTypes, BatchSize})).

read_by_tags(StoreId, Tags, BatchSize) ->
    read_by_tags(StoreId, Tags, any, BatchSize).

read_by_tags(StoreId, Tags, Match, BatchSize) ->
    translate_events(call(StoreId, {read_by_tags, Tags, Match, BatchSize})).

%%====================================================================
%% Metadata
%%====================================================================

version(StoreId, StreamId) ->
    call(StoreId, {version, StreamId}).

exists(StoreId, StreamId) ->
    boolean_or_false(call(StoreId, {exists, StreamId})).

has_events(StoreId) ->
    boolean_or_false(call(StoreId, has_events)).

boolean_or_false(true) -> true;
boolean_or_false(_)    -> false.

list_streams(StoreId) ->
    call(StoreId, list_streams).

delete(StoreId, StreamId) ->
    call(StoreId, {delete, StreamId}).

%%====================================================================
%% Snapshots
%%====================================================================

save_snapshot(StoreId, StreamId, Snapshot) ->
    call(StoreId, {save_snapshot, StreamId, Snapshot}).

save_snapshot(StoreId, StreamId, Version, Data, Metadata) ->
    call(StoreId, {save_snapshot, StreamId, Version, Data, Metadata}).

load_snapshot(StoreId, StreamId) ->
    call(StoreId, {load_snapshot, StreamId}).

load_snapshot_at(StoreId, StreamId, Version) ->
    call(StoreId, {load_snapshot, StreamId, Version}).

list_snapshots(StoreId, StreamId) ->
    call(StoreId, {list_snapshots, StreamId}).

delete_snapshot(StoreId, StreamId) ->
    call(StoreId, {delete_snapshot, StreamId}).

%%====================================================================
%% Subscriptions
%%====================================================================

subscribe(StoreId, StreamId, Pid, Opts) ->
    Bridge = bridge_for(Pid),
    call(StoreId, {subscribe, StreamId, Bridge, Opts}).

subscribe_all(StoreId, Pid, Opts) ->
    Bridge = bridge_for(Pid),
    call(StoreId, {subscribe_all, Bridge, Opts}).

unsubscribe(StoreId, SubKey) ->
    call(StoreId, {unsubscribe, SubKey}).

%%====================================================================
%% Internal — subscription bridge
%%====================================================================
%%
%% Subscribers ask for {events, [#evoq_event{}]}; the store sends
%% {events, [#event{}]}. Interpose a bridge process per subscription
%% that translates between the two — same pattern as reckon-evoq.
%%
%% subscription_error messages (e.g. integrity violations during
%% catch-up) pass through unchanged.

bridge_for(Pid) when is_pid(Pid) ->
    spawn(fun() -> bridge_loop(Pid, erlang:monitor(process, Pid)) end).

bridge_loop(Subscriber, MonRef) ->
    receive
        {events, Events} when is_list(Events) ->
            Subscriber ! {events, events_to_evoq(Events)},
            bridge_loop(Subscriber, MonRef);
        {subscription_error, _} = Msg ->
            Subscriber ! Msg,
            bridge_loop(Subscriber, MonRef);
        {'DOWN', MonRef, process, Subscriber, _Reason} ->
            ok
    end.

%%====================================================================
%% Internal — store lookup + call
%%====================================================================

call(StoreId, Request) ->
    dispatch_call(mem_evoq_registry:lookup(StoreId), StoreId, Request).

dispatch_call({error, not_found}, StoreId, _Request) ->
    {error, {store_not_started, StoreId}};
dispatch_call({ok, Pid}, StoreId, Request) ->
    try gen_server:call(Pid, Request, 5000)
    catch
        exit:{noproc, _}  -> {error, {store_not_running, StoreId}};
        exit:{timeout, _} -> {error, {timeout, StoreId}}
    end.

%%====================================================================
%% Internal — event translation at the adapter boundary
%%====================================================================
%%
%% Storage (mem_evoq_store) holds reckon_gater #event{} records. The
%% evoq_event_store contract returns #evoq_event{} (or maps). Translate
%% on the way out — same pattern as reckon_evoq_adapter:event_to_evoq/1.
%%
%% `mac' and `signature' are intentionally NOT propagated. They are
%% storage-layer concerns; consumers downstream of evoq don't need
%% them and propagating would leak them into projections and process
%% managers that have no reason to see them.

translate_events({ok, Events}) when is_list(Events) -> {ok, events_to_evoq(Events)};
translate_events(Other)                              -> Other.

events_to_evoq(Events) -> [event_to_evoq(E) || E <- Events].

event_to_evoq(#event{
    event_id              = EventId,
    event_type            = EventType,
    stream_id             = StreamId,
    version               = Version,
    data                  = Data,
    metadata              = Metadata,
    tags                  = Tags,
    timestamp             = Timestamp,
    epoch_us              = EpochUs,
    data_content_type     = DataContentType,
    metadata_content_type = MetadataContentType,
    prev_event_hash       = PrevEventHash
}) ->
    #evoq_event{
        event_id              = EventId,
        event_type            = EventType,
        stream_id             = StreamId,
        version               = Version,
        data                  = Data,
        metadata              = Metadata,
        tags                  = Tags,
        timestamp             = Timestamp,
        epoch_us              = EpochUs,
        data_content_type     = DataContentType,
        metadata_content_type = MetadataContentType,
        prev_event_hash       = PrevEventHash
    }.