%% @doc Main CETS module.
%%
%% CETS stores data in memory. Writes are replicated to all nodes in the cluster.
%% Reads are done locally.
%%
%% This module contains functions to write data. To read data, use the `ets' module
%% from Erlang/OTP.
%%
%% The preferred key format is a node-specific key (i.e. a key that contains the
%% inserter node name or a pid). This simplifies the cleaning logic and avoids
%% key conflicts during a cluster join.
%%
%% A random key is a good option too, but the cleaning logic in the case of a netsplit
%% could be tricky. Also, if you update a record with a random key, you have to provide
%% a `handle_conflict' function on startup (because two segments of the cluster could
%% contain a new and an old version of the record, so the records could be overwritten
%% incorrectly).
%%
%% Be careful: CETS does not protect you from records being overwritten.
%% It is a good idea to provide a `handle_conflict' function as a start argument, so you
%% can choose which version of a record to keep if two versions are present in different
%% cluster segments.
%%
%% Often we need to insert a key only if it is not present yet. Use `insert_new' for this;
%% it uses a single node (the leader) to serialize inserts.
%%
%% Check the MongooseIM code for usage examples of this module.
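%%
%% A minimal usage sketch (the table name and record shape are illustrative;
%% joining servers into a cluster is done separately, e.g. via `cets_join'
%% or `cets_discovery'):
%%
%% ```
%% %% On each node:
%% {ok, _Pid} = cets:start(sessions, #{}),
%% %% Write through CETS using a node-specific key:
%% ok = cets:insert(sessions, {{node(), self()}, user_data}),
%% %% Read locally using plain ETS:
%% [{_, user_data}] = ets:lookup(sessions, {node(), self()}).
%% '''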
-module(cets).
-behaviour(gen_server).
-export([
start/2,
stop/1,
insert/2,
insert_many/2,
insert_new/2,
insert_new_or_lookup/2,
insert_serial/2,
delete/2,
delete_many/2,
delete_object/2,
delete_objects/2,
dump/1,
remote_dump/1,
send_dump/5,
table_name/1,
other_nodes/1,
get_nodes_request/1,
other_pids/1,
pause/1,
unpause/2,
get_leader/1,
set_leader/2,
ping_all/1,
ping/1,
info/1,
insert_request/2,
insert_many_request/2,
delete_request/2,
delete_many_request/2,
delete_object_request/2,
delete_objects_request/2,
wait_response/2,
wait_responses/2,
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-ignore_xref([
start/2,
stop/1,
insert/2,
insert_many/2,
insert_new/2,
insert_new_or_lookup/2,
insert_serial/2,
delete/2,
delete_many/2,
delete_object/2,
delete_objects/2,
pause/1,
unpause/2,
get_leader/1,
set_leader/2,
ping_all/1,
ping/1,
info/1,
other_nodes/1,
get_nodes_request/1,
insert_request/2,
insert_many_request/2,
delete_request/2,
delete_many_request/2,
delete_object_request/2,
delete_objects_request/2,
wait_response/2,
wait_responses/2
]).
-include_lib("kernel/include/logger.hrl").
-type server_pid() :: pid().
%% CETS pid.
-type server_ref() ::
server_pid()
| atom()
| {local, atom()}
| {global, term()}
| {via, module(), term()}.
%% CETS Process Reference.
-type request_id() :: gen_server:request_id().
%% Request Reference.
-type from() :: gen_server:from().
%% gen_server's caller.
-type join_ref() :: cets_join:join_ref().
%% A unique ID assigned during the table join attempt.
-type ack_pid() :: cets_ack:ack_pid().
%% Pid of the helper process that tracks unacked writes.
-type op() ::
{insert, tuple()}
| {delete, term()}
| {delete_object, term()}
| {insert_many, [tuple()]}
| {delete_many, [term()]}
| {delete_objects, [term()]}
| {insert_new, tuple()}
| {insert_new_or_lookup, tuple()}
| {leader_op, op()}.
%% Write operation type.
-type remote_op() ::
{remote_op, Op :: op(), From :: from(), AckPid :: ack_pid(), JoinRef :: join_ref()}.
%% Message broadcasted to other nodes.
-type backlog_entry() :: {op(), from()}.
%% Delayed operation.
-type table_name() :: atom().
%% ETS table name (and the process server name).
-type pause_monitor() :: reference().
%% Reference returned from `pause/1'.
-type servers() :: ordsets:ordset(server_pid()).
%% Ordered list of server pids.
-type node_down_event() :: #{node => node(), pid => pid(), reason => term()}.
-type state() :: #{
tab := table_name(),
keypos := pos_integer(),
ack_pid := ack_pid(),
join_ref := join_ref(),
%% Updated by set_other_servers/2 function only
other_servers := servers(),
leader := server_pid(),
is_leader := boolean(),
opts := start_opts(),
backlog := [backlog_entry()],
pause_monitors := [pause_monitor()],
node_down_history := [node_down_event()]
}.
%% gen_server's state.
-type long_msg() ::
pause
| ping
| remote_dump
| ping_all
| table_name
| get_info
| other_servers
| get_nodes
| {unpause, reference()}
| get_leader
| {set_leader, boolean()}
| {send_dump, servers(), join_ref(), pause_monitor(), [tuple()]}.
%% Types of gen_server calls.
-type info() :: #{
table := table_name(),
nodes := [node()],
other_servers := [pid()],
size := non_neg_integer(),
memory := non_neg_integer(),
ack_pid := ack_pid(),
join_ref := join_ref(),
opts := start_opts(),
node_down_history := [node_down_event()],
pause_monitors := [pause_monitor()]
}.
%% Status information returned by `info/1'.
-type handle_down_fun() :: fun(
(
#{
remote_pid := server_pid(),
remote_node := node(),
table := table_name(),
is_leader := boolean()
}
) -> ok
).
%% Handler function which is called when the remote node goes down.
-type handle_conflict_fun() :: fun((tuple(), tuple()) -> tuple()).
%% Handler function which is called when we need to choose which record to keep during joining.
-type handle_wrong_leader() :: fun((#{from := from(), op := op(), server := server_pid()}) -> ok).
%% Handler function which is called when a leader operation is received by a non-leader (for debugging).
-type start_opts() :: #{
type => ordered_set | bag,
keypos => non_neg_integer(),
handle_down => handle_down_fun(),
handle_conflict => handle_conflict_fun(),
handle_wrong_leader => handle_wrong_leader()
}.
%% Options for `start/2' function.
-type response_return() :: {reply, Reply :: term()} | {error, {_, _}} | timeout.
%% Response return from `gen_server''s API. `Reply' is usually ok.
-type response_timeout() :: timeout() | {abs, integer()}.
%% Timeout to wait for response.
-export_type([
request_id/0,
op/0,
server_pid/0,
server_ref/0,
long_msg/0,
info/0,
table_name/0,
pause_monitor/0,
servers/0,
response_return/0,
response_timeout/0
]).
%% API functions
%% @doc Starts a process serving an ETS table.
%%
%% The process is registered under the `table_name()' name.
%%
%% Options:
%%
%% - `handle_down = fun(#{remote_pid := Pid, table := Tab})'
%%
%% Called when a remote node goes down. This function is called on all nodes
%% in the remaining partition, so you should not call the remote nodes
%% from this function; otherwise circular locking could happen.
%% I.e. any functions that replicate changes are not allowed (`cets:insert/2',
%% `cets:delete/2' and so on).
%% Use the `ets' module to handle the cleaning (i.e. `ets:match_delete/2').
%% Use spawn to make a new async process if you need to update the data on the
%% remote nodes, but that could cause improper cleaning due to race conditions.
%%
%% - `handle_conflict = fun(Record1, Record2) -> NewRecord'
%%
%% Called when two records have the same key during a cluster join.
%% `NewRecord' is the record CETS keeps in the table under that key.
%% Does not work for bags.
%%
%% We recommend defining this function if keys could have conflicts.
%% It is called once for each conflicting key.
%% We recommend keeping this function pure (or at least free of blocking calls).
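%%
%% A minimal sketch of passing both handlers (the match pattern and the
%% version-based conflict resolution below are illustrative, not required):
%%
%% ```
%% HandleDown = fun(#{remote_pid := Pid, table := Tab}) ->
%%     %% Clean only the local ETS data; replicating cets calls are not allowed here.
%%     ets:match_delete(Tab, {{'_', Pid}, '_'}),
%%     ok
%% end,
%% HandleConflict = fun(Rec1, Rec2) ->
%%     %% Keep the record with the newer version (here we assume element 2 is a version).
%%     case element(2, Rec1) >= element(2, Rec2) of
%%         true -> Rec1;
%%         false -> Rec2
%%     end
%% end,
%% {ok, _Pid} = cets:start(sessions, #{handle_down => HandleDown,
%%                                     handle_conflict => HandleConflict}).
%% '''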
-spec start(table_name(), start_opts()) -> gen_server:start_ret().
start(Tab, Opts) when is_atom(Tab) ->
case check_opts(Opts) of
[] ->
gen_server:start({local, Tab}, ?MODULE, {Tab, Opts}, []);
Errors ->
{error, Errors}
end.
%% @doc Stops a CETS server.
-spec stop(server_ref()) -> ok.
stop(Server) ->
gen_server:stop(Server).
%% @doc Gets all records from a local ETS table.
-spec dump(table_name()) -> Records :: [tuple()].
dump(Tab) ->
ets:tab2list(Tab).
%% @doc Gets all records from a remote ETS table.
-spec remote_dump(server_ref()) -> {ok, Records :: [tuple()]}.
remote_dump(Server) ->
cets_call:long_call(Server, remote_dump).
%% @doc Returns the table name that the server is serving.
-spec table_name(server_ref()) -> {ok, table_name()}.
table_name(Tab) when is_atom(Tab) ->
{ok, Tab};
table_name(Server) ->
cets_call:long_call(Server, table_name).
%% Sends dump, used in `cets_join'.
%% @private
-spec send_dump(server_ref(), servers(), join_ref(), pause_monitor(), [tuple()]) ->
ok | {error, ignored}.
send_dump(Server, NewPids, JoinRef, PauseRef, OurDump) ->
Info = #{msg => send_dump, join_ref => JoinRef, count => length(OurDump)},
cets_call:long_call(Server, {send_dump, NewPids, JoinRef, PauseRef, OurDump}, Info).
%% @doc Inserts (or overwrites) a tuple into a table.
%%
%% Only the node that owns the data should update or remove it.
%% Ideally, the key should contain the inserter node info, so cleaning and merging are simplified.
-spec insert(server_ref(), tuple()) -> ok.
insert(Server, Rec) when is_tuple(Rec) ->
cets_call:sync_operation(Server, {insert, Rec}).
%% @doc Inserts (or overwrites) several tuples into a table.
-spec insert_many(server_ref(), list(tuple())) -> ok.
insert_many(Server, Records) when is_list(Records) ->
cets_call:sync_operation(Server, {insert_many, Records}).
%% @doc Tries to insert a new record.
%%
%% All inserts are sent to the leader node first.
%% It is slightly slower than a plain insert, because
%% extra messaging is required.
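%%
%% A sketch of claiming a globally-unique name (table and record shape are illustrative):
%%
%% ```
%% case cets:insert_new(unique_names, {<<"alice">>, self()}) of
%%     true -> registered;
%%     false -> already_taken
%% end.
%% '''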
-spec insert_new(server_ref(), tuple()) -> WasInserted :: boolean().
insert_new(Server, Rec) when is_tuple(Rec) ->
Res = cets_call:send_leader_op(Server, {insert_new, Rec}),
handle_insert_new_result(Res).
handle_insert_new_result(ok) -> true;
handle_insert_new_result({error, rejected}) -> false.
%% @doc Inserts a new tuple or returns an existing one.
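%%
%% A sketch (illustrative names; for a non-bag table the returned list has one record):
%%
%% ```
%% {_WasInserted, [{_Name, OwnerPid}]} =
%%     cets:insert_new_or_lookup(unique_names, {<<"alice">>, self()}).
%% '''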
-spec insert_new_or_lookup(server_ref(), tuple()) -> {WasInserted, ReadRecords} when
WasInserted :: boolean(),
ReadRecords :: [tuple()].
insert_new_or_lookup(Server, Rec) when is_tuple(Rec) ->
Res = cets_call:send_leader_op(Server, {insert_new_or_lookup, Rec}),
handle_insert_new_or_lookup(Res, Rec).
handle_insert_new_or_lookup(ok, Rec) ->
{true, [Rec]};
handle_insert_new_or_lookup({error, {rejected, Recs}}, _CandidateRec) ->
{false, Recs}.
%% @doc Serialized version of `insert/2'.
%%
%% All `insert_serial' calls are sent to the leader node first.
%%
%% Similar to `insert_new/2', but overwrites the data silently on conflict.
%% It can be used to update entries that do not use node-specific keys.
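%%
%% A sketch of updating a shared (not node-specific) key (names are illustrative):
%%
%% ```
%% ok = cets:insert_serial(cluster_config, {max_connections, 100}).
%% '''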
-spec insert_serial(server_ref(), tuple()) -> ok.
insert_serial(Server, Rec) when is_tuple(Rec) ->
ok = cets_call:send_leader_op(Server, {insert, Rec}).
%% @doc Removes an object with the key from all nodes in the cluster.
%%
%% Ideally, nodes should only remove data that they've inserted, not data from another node.
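%%
%% A sketch of removing a key owned by the local node (the key format is illustrative):
%%
%% ```
%% ok = cets:delete(sessions, {node(), self()}).
%% '''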
%% @see delete_many/2
%% @see delete_request/2
-spec delete(server_ref(), term()) -> ok.
delete(Server, Key) ->
cets_call:sync_operation(Server, {delete, Key}).
%% @doc Removes a specific object. Useful to remove data from ETS tables of type `bag'.
-spec delete_object(server_ref(), tuple()) -> ok.
delete_object(Server, Object) ->
cets_call:sync_operation(Server, {delete_object, Object}).
%% @doc Removes multiple objects using a list of keys.
%% @see delete/2
-spec delete_many(server_ref(), [term()]) -> ok.
delete_many(Server, Keys) ->
cets_call:sync_operation(Server, {delete_many, Keys}).
%% @doc Removes multiple specific tuples.
-spec delete_objects(server_ref(), [tuple()]) -> ok.
delete_objects(Server, Objects) ->
cets_call:sync_operation(Server, {delete_objects, Objects}).
%% @doc Async `insert/2' call.
-spec insert_request(server_ref(), tuple()) -> request_id().
insert_request(Server, Rec) ->
cets_call:async_operation(Server, {insert, Rec}).
%% @doc Async `insert_many/2' call.
-spec insert_many_request(server_ref(), [tuple()]) -> request_id().
insert_many_request(Server, Records) ->
cets_call:async_operation(Server, {insert_many, Records}).
%% @doc Async `delete/2' call.
-spec delete_request(server_ref(), term()) -> request_id().
delete_request(Server, Key) ->
cets_call:async_operation(Server, {delete, Key}).
%% @doc Async `delete_object/2' call.
-spec delete_object_request(server_ref(), tuple()) -> request_id().
delete_object_request(Server, Object) ->
cets_call:async_operation(Server, {delete_object, Object}).
%% @doc Async `delete_many/2' call.
-spec delete_many_request(server_ref(), [term()]) -> request_id().
delete_many_request(Server, Keys) ->
cets_call:async_operation(Server, {delete_many, Keys}).
%% @doc Async `delete_objects/2' call.
-spec delete_objects_request(server_ref(), [tuple()]) -> request_id().
delete_objects_request(Server, Objects) ->
cets_call:async_operation(Server, {delete_objects, Objects}).
%% @doc Waits for the result of the async operation.
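%%
%% A sketch of the async write pattern (table name and timeout are illustrative):
%%
%% ```
%% ReqId = cets:insert_request(sessions, {{node(), self()}, user_data}),
%% %% ... do other work while the write is replicated ...
%% {reply, ok} = cets:wait_response(ReqId, 5000).
%% '''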
-spec wait_response(request_id(), timeout()) -> response_return().
wait_response(ReqId, Timeout) ->
gen_server:wait_response(ReqId, Timeout).
%% @doc Waits for multiple responses.
%%
%% Returns results in the same order as `ReqIds'.
%% Blocks for at most `Timeout' milliseconds.
-spec wait_responses([request_id()], response_timeout()) ->
[response_return()].
wait_responses(ReqIds, Timeout) ->
cets_call:wait_responses(ReqIds, Timeout).
%% @doc Gets the pid of the process, which handles `insert_new' operations.
-spec get_leader(server_ref()) -> server_pid().
get_leader(Tab) when is_atom(Tab) ->
%% Optimization: replace call with ETS lookup
try
cets_metadata:get(Tab, leader)
catch
error:badarg ->
%% Failed to get the leader from metadata.
%% Retry by calling the server process.
%% It would most likely fail too, but we want the same error format
%% when calling get_leader using either an atom() or a pid().
gen_server:call(Tab, get_leader)
end;
get_leader(Server) ->
gen_server:call(Server, get_leader).
%% @doc Gets a list of the other nodes in the cluster that this server is connected to.
-spec other_nodes(server_ref()) -> ordsets:ordset(node()).
other_nodes(Server) ->
lists:usort(pids_to_nodes(other_pids(Server))).
%% @doc Async request for the list of nodes serving the table (including the local node).
-spec get_nodes_request(server_ref()) -> request_id().
get_nodes_request(Server) ->
gen_server:send_request(Server, get_nodes).
%% @doc Gets a list of other CETS processes that are handling this table.
-spec other_pids(server_ref()) -> servers().
other_pids(Server) ->
cets_call:long_call(Server, other_servers).
%% @doc Pauses update operations.
%%
%% `cets:insert/2' and other write functions block until the server is unpaused.
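%%
%% A sketch of blocking writes for the duration of some work (pausing is mainly
%% used by `cets_join' during a cluster join; the table name is illustrative):
%%
%% ```
%% PauseRef = cets:pause(sessions),
%% try
%%     %% ... writes to the table are blocked here ...
%%     ok
%% after
%%     ok = cets:unpause(sessions, PauseRef)
%% end.
%% '''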
-spec pause(server_ref()) -> pause_monitor().
pause(Server) ->
cets_call:long_call(Server, pause).
%% @doc Unpauses update operations.
%%
%% Provide the reference returned from `cets:pause/1' as an argument.
-spec unpause(server_ref(), pause_monitor()) -> ok | {error, unknown_pause_monitor}.
unpause(Server, PauseRef) ->
cets_call:long_call(Server, {unpause, PauseRef}).
%% @doc Sets `is_leader' field in the state.
%%
%% For debugging only.
%% Setting it in real life would break the leader election logic.
-spec set_leader(server_ref(), boolean()) -> ok.
set_leader(Server, IsLeader) ->
cets_call:long_call(Server, {set_leader, IsLeader}).
%% @doc Waits till all pending operations are applied.
-spec ping_all(server_ref()) -> ok | {error, [{server_pid(), Reason :: term()}]}.
ping_all(Server) ->
cets_call:long_call(Server, ping_all).
%% @doc Blocks until all pending Erlang messages are processed by the `Server'.
-spec ping(server_ref()) -> pong.
ping(Server) ->
cets_call:long_call(Server, ping).
%% @doc Returns debug information from the server.
-spec info(server_ref()) -> info().
info(Server) ->
cets_call:long_call(Server, get_info).
%% gen_server callbacks
%% @private
-spec init({table_name(), start_opts()}) -> {ok, state()}.
init({Tab, Opts}) ->
process_flag(message_queue_data, off_heap),
Type = maps:get(type, Opts, ordered_set),
KeyPos = maps:get(keypos, Opts, 1),
%% Match result to prevent the Dialyzer warning
_ = ets:new(Tab, [Type, named_table, public, {keypos, KeyPos}]),
cets_metadata:init(Tab),
cets_metadata:set(Tab, leader, self()),
{ok, AckPid} = cets_ack:start_link(Tab),
{ok, #{
tab => Tab,
keypos => KeyPos,
ack_pid => AckPid,
other_servers => [],
%% Initial join_ref is random
join_ref => make_ref(),
leader => self(),
is_leader => true,
opts => Opts,
backlog => [],
pause_monitors => [],
node_down_history => []
}}.
%% @private
-spec handle_call(long_msg() | {op, op()}, from(), state()) ->
{noreply, state()} | {reply, term(), state()}.
handle_call({op, Op}, From, State = #{pause_monitors := []}) ->
handle_op(Op, From, State),
{noreply, State};
handle_call({op, Op}, From, State = #{pause_monitors := [_ | _], backlog := Backlog}) ->
%% Backlog is a list of operations that are pending while our server is paused.
%% The list is applied once our server is unpaused.
{noreply, State#{backlog := [{Op, From} | Backlog]}};
handle_call(other_servers, _From, State = #{other_servers := Servers}) ->
{reply, Servers, State};
handle_call(get_nodes, _From, State = #{other_servers := Servers}) ->
{reply, lists:usort([node() | pids_to_nodes(Servers)]), State};
handle_call(get_leader, _From, State = #{leader := Leader}) ->
{reply, Leader, State};
handle_call(ping_all, From, State = #{other_servers := Servers}) ->
%% Do spawn to avoid any possible deadlocks
proc_lib:spawn(fun() ->
%% If ping crashes, the caller would not receive a reply.
%% So, we have to use catch to still be able to reply.
Results = lists:map(fun(Server) -> {Server, catch ping(Server)} end, Servers),
BadResults = [Res || {_Server, Result} = Res <- Results, Result =/= pong],
case BadResults of
[] ->
gen_server:reply(From, ok);
_ ->
gen_server:reply(From, {error, BadResults})
end
end),
{noreply, State};
handle_call(ping, _From, State) ->
{reply, pong, State};
handle_call(table_name, _From, State = #{tab := Tab}) ->
{reply, {ok, Tab}, State};
handle_call(remote_dump, From, State = #{tab := Tab}) ->
%% Do not block the main process (also reduces GC of the main process)
proc_lib:spawn_link(fun() -> gen_server:reply(From, {ok, dump(Tab)}) end),
{noreply, State};
handle_call({send_dump, NewPids, JoinRef, PauseRef, Dump}, _From, State) ->
handle_send_dump(NewPids, JoinRef, PauseRef, Dump, State);
handle_call(pause, _From = {FromPid, _}, State = #{pause_monitors := Mons}) ->
%% We monitor who pauses our server
Mon = erlang:monitor(process, FromPid),
{reply, Mon, State#{pause_monitors := [Mon | Mons]}};
handle_call({unpause, Ref}, _From, State) ->
handle_unpause(Ref, State);
handle_call({set_leader, IsLeader}, _From, State) ->
{reply, ok, State#{is_leader := IsLeader}};
handle_call(get_info, _From, State) ->
{reply, handle_get_info(State), State}.
%% @private
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(Msg, State) ->
?LOG_ERROR(#{what => unexpected_cast, msg => Msg}),
{noreply, State}.
%% @private
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info({remote_op, Op, From, AckPid, JoinRef}, State) ->
handle_remote_op(Op, From, AckPid, JoinRef, State),
{noreply, State};
handle_info({'DOWN', Mon, process, Pid, Reason}, State) ->
{noreply, handle_down(Mon, Pid, Reason, State)};
handle_info({check_server, FromPid, JoinRef}, State) ->
{noreply, handle_check_server(FromPid, JoinRef, State)};
handle_info(Msg, State) ->
?LOG_ERROR(#{what => unexpected_info, msg => Msg}),
{noreply, State}.
%% @private
terminate(_Reason, _State = #{ack_pid := AckPid}) ->
ok = gen_server:stop(AckPid).
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Internal logic
-spec handle_send_dump(servers(), join_ref(), pause_monitor(), [tuple()], state()) ->
{reply, ok, state()}.
handle_send_dump(NewPids, JoinRef, PauseRef, Dump, State) ->
#{tab := Tab, other_servers := Servers, pause_monitors := PauseMons} = State,
case lists:member(PauseRef, PauseMons) of
true ->
ets:insert(Tab, Dump),
Servers2 = add_servers(NewPids, Servers),
{reply, ok, set_other_servers(Servers2, State#{join_ref := JoinRef})};
false ->
?LOG_ERROR(#{
what => send_dump_received_when_unpaused,
text => <<"Received send_dump message while in the unpaused state. Ignore it">>,
join_ref => JoinRef,
pause_ref => PauseRef,
state => State
}),
{reply, {error, ignored}, State}
end.
-spec handle_down(reference(), pid(), term(), state()) -> state().
handle_down(Mon, Pid, Reason, State = #{pause_monitors := Mons}) ->
case lists:member(Mon, Mons) of
true ->
%% Skip logging if the process exited normally
case Reason of
normal ->
ok;
_ ->
?LOG_ERROR(#{
what => pause_owner_crashed,
state => State,
paused_by_pid => Pid,
reason => Reason
})
end,
handle_unpause2(Mon, Mons, State);
false ->
handle_down2(Pid, Reason, State)
end.
-spec handle_down2(pid(), term(), state()) -> state().
handle_down2(RemotePid, Reason, State = #{other_servers := Servers, ack_pid := AckPid}) ->
case lists:member(RemotePid, Servers) of
true ->
cets_ack:send_remote_down(AckPid, RemotePid),
Servers2 = lists:delete(RemotePid, Servers),
State3 = update_node_down_history(
RemotePid, Reason, set_other_servers(Servers2, State)
),
call_user_handle_down(RemotePid, State3),
State3;
false ->
%% This should not happen
?LOG_ERROR(#{
what => handle_down_failed,
remote_pid => RemotePid,
state => State
}),
State
end.
update_node_down_history(RemotePid, Reason, State = #{node_down_history := History}) ->
Item = #{node => node(RemotePid), pid => RemotePid, reason => Reason},
State#{node_down_history := [Item | History]}.
%% Merge two lists of pids, create the missing monitors.
-spec add_servers(Servers, Servers) -> Servers when Servers :: servers().
add_servers(Pids, Servers) ->
%% Ignore ourself in the list
%% Also filter out already added servers
OtherServers = lists:delete(self(), lists:usort(Pids)),
NewServers = ordsets:subtract(OtherServers, Servers),
case ordsets:intersection(OtherServers, Servers) of
[] ->
ok;
Overlap ->
%% Should not happen (cets_join checks for it)
%% Still log it, if that happens
?LOG_ERROR(#{
what => already_added,
already_added_servers => Overlap,
pids => Pids,
servers => Servers
})
end,
[erlang:monitor(process, Pid) || Pid <- NewServers],
ordsets:union(NewServers, Servers).
%% Sets other_servers field, chooses the leader
-spec set_other_servers(servers(), state()) -> state().
set_other_servers(Servers, State = #{tab := Tab, ack_pid := AckPid}) ->
%% Choose process with highest pid.
%% Uses total ordering of terms in Erlang
%% (so all nodes would choose the same leader).
%% The leader node would not receive that much extra load.
Leader = lists:max([self() | Servers]),
IsLeader = Leader =:= self(),
cets_metadata:set(Tab, leader, Leader),
%% Ask the ack process to use this list of servers as the source of replies
%% for all new cets_ack:add/2 calls
cets_ack:set_servers(AckPid, Servers),
State#{leader := Leader, is_leader := IsLeader, other_servers := Servers}.
-spec pids_to_nodes([pid()]) -> [node()].
pids_to_nodes(Pids) ->
lists:map(fun node/1, Pids).
%% ETS returns booleans instead of ok, because the ETS API is old and inspired by Prolog.
%% So, we match that API here.
-spec ets_delete_keys(table_name(), [term()]) -> true.
ets_delete_keys(Tab, Keys) ->
[ets:delete(Tab, Key) || Key <- Keys],
true.
-spec ets_delete_objects(table_name(), [tuple()]) -> true.
ets_delete_objects(Tab, Objects) ->
[ets:delete_object(Tab, Object) || Object <- Objects],
true.
%% Handle operation from a remote node
-spec handle_remote_op(op(), from(), ack_pid(), join_ref(), state()) -> ok.
handle_remote_op(Op, From, AckPid, JoinRef, State = #{join_ref := JoinRef}) ->
do_op(Op, State),
cets_ack:ack(AckPid, From, self());
handle_remote_op(Op, From, AckPid, RemoteJoinRef, #{join_ref := JoinRef}) ->
?LOG_ERROR(#{
what => drop_remote_op,
from => From,
remote_join_ref => RemoteJoinRef,
join_ref => JoinRef,
op => Op
}),
%% We still need to reply to the remote process so it could stop waiting
cets_ack:ack(AckPid, From, self()).
%% Apply operation for one local table only
-spec do_op(op(), state()) -> boolean().
do_op(Op, #{tab := Tab}) ->
do_table_op(Op, Tab).
-spec do_table_op(op(), table_name()) -> boolean().
do_table_op({insert, Rec}, Tab) ->
ets:insert(Tab, Rec);
do_table_op({delete, Key}, Tab) ->
ets:delete(Tab, Key);
do_table_op({delete_object, Object}, Tab) ->
ets:delete_object(Tab, Object);
do_table_op({insert_many, Recs}, Tab) ->
ets:insert(Tab, Recs);
do_table_op({delete_many, Keys}, Tab) ->
ets_delete_keys(Tab, Keys);
do_table_op({delete_objects, Objects}, Tab) ->
ets_delete_objects(Tab, Objects);
do_table_op({insert_new, Rec}, Tab) ->
ets:insert_new(Tab, Rec);
do_table_op({insert_new_or_lookup, Rec}, Tab) ->
ets:insert_new(Tab, Rec).
%% Handle operation locally and replicate it across the cluster
-spec handle_op(op(), from(), state()) -> ok.
handle_op({leader_op, Op}, From, State) ->
handle_leader_op(Op, From, State);
handle_op(Op, From, State) ->
do_op(Op, State),
replicate(Op, From, State).
-spec rejected_result(op(), state()) -> term().
rejected_result({insert_new, _Rec}, _State) ->
{error, rejected};
rejected_result({insert_new_or_lookup, Rec}, #{keypos := KeyPos, tab := Tab}) ->
Key = element(KeyPos, Rec),
%% Return a list of records, because the table could be a bag
Recs = ets:lookup(Tab, Key),
{error, {rejected, Recs}}.
-spec handle_leader_op(op(), from(), state()) -> ok.
handle_leader_op(Op, From, State = #{is_leader := true}) ->
case do_op(Op, State) of
%% Skip the replication - insert_new returns false.
false ->
gen_server:reply(From, rejected_result(Op, State)),
ok;
true ->
replicate(Op, From, State)
end;
handle_leader_op(Op, From, State = #{leader := Leader}) ->
%% Reject operation - not a leader
gen_server:reply(From, {error, {wrong_leader, Leader}}),
%% Call a user-defined callback to notify about the error
handle_wrong_leader(Op, From, State).
-spec replicate(op(), from(), state()) -> ok.
replicate(_Op, From, #{other_servers := []}) ->
%% Skip replication
gen_server:reply(From, ok);
replicate(Op, From, #{ack_pid := AckPid, other_servers := Servers, join_ref := JoinRef}) ->
cets_ack:add(AckPid, From),
RemoteOp = {remote_op, Op, From, AckPid, JoinRef},
[send_remote_op(Server, RemoteOp) || Server <- Servers],
%% AckPid would call gen_server:reply(From, ok) once all the remote servers reply
ok.
-spec send_remote_op(server_pid(), remote_op()) -> ok.
send_remote_op(RemotePid, RemoteOp) ->
erlang:send(RemotePid, RemoteOp, [noconnect]),
ok.
-spec apply_backlog(state()) -> state().
apply_backlog(State = #{backlog := Backlog}) ->
[handle_op(Op, From, State) || {Op, From} <- lists:reverse(Backlog)],
State#{backlog := []}.
%% We support multiple pauses.
%% We continue only when all pause requests have been unpaused.
-spec handle_unpause(pause_monitor(), state()) -> {reply, Reply, state()} when
Reply :: ok | {error, unknown_pause_monitor}.
handle_unpause(Mon, State = #{pause_monitors := Mons}) ->
case lists:member(Mon, Mons) of
true ->
{reply, ok, handle_unpause2(Mon, Mons, State)};
false ->
{reply, {error, unknown_pause_monitor}, State}
end.
-spec handle_unpause2(pause_monitor(), [pause_monitor()], state()) -> state().
handle_unpause2(Mon, Mons, State) ->
erlang:demonitor(Mon, [flush]),
Mons2 = lists:delete(Mon, Mons),
State2 = State#{pause_monitors := Mons2},
case Mons2 of
[] ->
send_check_servers(State2),
apply_backlog(State2);
_ ->
State2
end.
-spec send_check_servers(state()) -> ok.
send_check_servers(#{join_ref := JoinRef, other_servers := OtherPids}) ->
[send_check_server(Pid, JoinRef) || Pid <- OtherPids],
ok.
%% Send check_server before sending any new remote_op messages,
%% so the remote node has a chance to disconnect from us
%% (i.e. remove our pid from other_servers list and not allow remote ops)
-spec send_check_server(pid(), reference()) -> ok.
send_check_server(Pid, JoinRef) ->
Pid ! {check_server, self(), JoinRef},
ok.
%% That could actually arrive before we get fully unpaused
%% (though cets_join:pause_on_remote_node/2 ensures that the CETS server
%% sends check_server only after cets_join is down
%% and no longer sends new send_dump messages).
handle_check_server(_FromPid, JoinRef, State = #{join_ref := JoinRef}) ->
%% check_server passed - do nothing
State;
handle_check_server(FromPid, RemoteJoinRef, State = #{join_ref := JoinRef}) ->
?LOG_WARNING(#{
what => cets_check_server_failed,
text => <<"Disconnect the remote server">>,
remote_pid => FromPid,
remote_join_ref => RemoteJoinRef,
join_ref => JoinRef
}),
%% Ask the remote server to disconnect from us
Reason = {check_server_failed, {RemoteJoinRef, JoinRef}},
FromPid ! {'DOWN', make_ref(), process, self(), Reason},
State.
-spec handle_get_info(state()) -> info().
handle_get_info(
#{
tab := Tab,
other_servers := Servers,
ack_pid := AckPid,
join_ref := JoinRef,
node_down_history := DownHistory,
pause_monitors := PauseMons,
opts := Opts
}
) ->
#{
table => Tab,
nodes => lists:usort(pids_to_nodes([self() | Servers])),
other_servers => Servers,
size => ets:info(Tab, size),
memory => ets:info(Tab, memory),
ack_pid => AckPid,
join_ref => JoinRef,
node_down_history => DownHistory,
pause_monitors => PauseMons,
opts => Opts
}.
%% Cleanup
-spec call_user_handle_down(server_pid(), state()) -> ok.
call_user_handle_down(RemotePid, #{tab := Tab, opts := Opts, is_leader := IsLeader}) ->
case Opts of
#{handle_down := F} ->
FF = fun() ->
F(#{
remote_pid => RemotePid,
remote_node => node(RemotePid),
table => Tab,
is_leader => IsLeader
})
end,
Info = #{
task => call_user_handle_down,
table => Tab,
remote_pid => RemotePid,
remote_node => node(RemotePid)
},
%% Errors would be logged inside run_tracked
catch cets_long:run_tracked(Info, FF);
_ ->
ok
end.
-spec handle_wrong_leader(op(), from(), state()) -> ok.
handle_wrong_leader(Op, From, #{opts := #{handle_wrong_leader := F}}) ->
%% It is used for debugging/logging
%% Do not do anything heavy here
catch F(#{from => From, op => Op, server => self()}),
ok;
handle_wrong_leader(_Op, _From, _State) ->
ok.
-type start_error() :: bag_with_conflict_handler.
-spec check_opts(start_opts()) -> [start_error()].
check_opts(#{handle_conflict := _, type := bag}) ->
[bag_with_conflict_handler];
check_opts(_) ->
[].