-module(marina_pool_server).
-compile({no_auto_import,[nodes/2]}).
-include("marina_internal.hrl").
-export([
start_link/0
]).
%% metal callbacks
-export([
init/3,
handle_msg/2,
terminate/2
]).
%% Message this process sends to itself to trigger (or retry) node discovery.
-define(MSG_BOOTSTRAP, bootstrap_pool).
%% Server state threaded through the metal callbacks below.
-record(state, {
%% contact points tried in list order by nodes/2; read from the
%% bootstrap_ips setting in init/3
bootstrap_ips :: list(),
%% NOTE(review): no code in this module writes this field, so it stays
%% undefined - confirm whether it is still needed
datacenter :: undefined | binary(),
%% node list most recently handed to marina_pool:sync/3; [] until the first
%% successful bootstrap. During bootstrap the entries are the
%% {RpcAddress, Tokens} pairs built by filter_datacenter/2
nodes :: [{binary(), binary()}],
%% port used when connecting to each bootstrap ip
port :: pos_integer(),
%% passed unchanged to marina_pool:sync/3
strategy :: random | token_aware,
%% reference returned by erlang:send_after/3 for the most recently scheduled
%% bootstrap retry; undefined if no retry was ever scheduled
timer_ref :: undefined | reference()
}).
-type state() :: #state {}.
%% public
%% Starts the pool server through metal, using this module both as the
%% callback module and as the name argument. The `undefined' init argument
%% is the value init/3 pattern-matches on.
-spec start_link() ->
    {ok, pid()}.

start_link() ->
    InitArg = undefined,
    metal:start_link(?MODULE, ?MODULE, InitArg).
%% metal callbacks
%% metal init callback.
%%
%% Reads the bootstrap ips, port and pool strategy through ?GET_ENV (falling
%% back to the ?DEFAULT_* macros) and builds the initial state with an empty
%% node list. Node discovery itself is network-bound, so it is not done here:
%% the process posts ?MSG_BOOTSTRAP to itself and the work happens in
%% handle_msg/2 once init has returned.
%%
%% Returns {ok, State}. The previous spec claimed no_return(), which did not
%% match the value this function actually returns.
-spec init(atom(), pid(), undefined) ->
    {ok, state()}.

init(_Name, _Parent, undefined) ->
    BootstrapIps = ?GET_ENV(bootstrap_ips, ?DEFAULT_BOOTSTRAP_IPS),
    Port = ?GET_ENV(port, ?DEFAULT_PORT),
    Strategy = ?GET_ENV(strategy, ?DEFAULT_STRATEGY),

    %% kick off the first bootstrap attempt asynchronously
    self() ! ?MSG_BOOTSTRAP,

    {ok, #state {
        bootstrap_ips = BootstrapIps,
        nodes = [],
        port = Port,
        strategy = Strategy
    }}.
%% metal message callback.
%%
%% ?MSG_BOOTSTRAP: discover nodes via the configured bootstrap ips. On
%% success the pool is synced against the previously known nodes; on failure
%% a warning is logged and another attempt is scheduled 500 ms later.
%%
%% {topology_full_sync, Nodes}: replace the known node list with Nodes,
%% syncing the pool against the previous list.
%%
%% Any other message is not handled and crashes with function_clause.
-spec handle_msg(term(), state()) ->
    {ok, state()}.

handle_msg(?MSG_BOOTSTRAP, #state {
        bootstrap_ips = Ips,
        port = Port
    } = State) ->

    case nodes(Ips, Port) of
        {ok, Discovered} ->
            sync_nodes(Discovered, State);
        {error, _Reason} ->
            logger:warning("[~p] bootstrap failed~n", [?MODULE]),
            RetryRef = erlang:send_after(500, self(), ?MSG_BOOTSTRAP),
            {ok, State#state {timer_ref = RetryRef}}
    end;
handle_msg({topology_full_sync, Announced}, #state {} = State) ->
    sync_nodes(Announced, State).

%% Syncs the pool from the stored node list to Current and stores Current.
sync_nodes(Current, #state {
        nodes = Previous,
        strategy = Strategy
    } = State) ->

    ok = marina_pool:sync(Strategy, Current, Previous),
    {ok, State#state {nodes = Current}}.
%% metal terminate callback: stops the pool, passing the number of nodes
%% currently held in the state, and returns ok.
-spec terminate(term(), state()) ->
    ok.

terminate(_Reason, #state {nodes = Nodes}) ->
    NodeCount = length(Nodes),
    marina_pool:stop(NodeCount),
    ok.
%% private
%% Opens a connection to Ip:Port and completes the startup handshake,
%% authenticating when the server asks for the password authenticator.
%%
%% Returns {ok, Socket} on success or {error, Reason}. If a step fails after
%% the socket was opened, the socket is closed before the error is returned.
%% A startup reply other than the ones matched below crashes with
%% case_clause.
connect(Ip, Port) ->
    case marina_utils:connect(Ip, Port) of
        {ok, Socket} ->
            handshake(Socket);
        {error, _} = Error ->
            Error
    end.

%% Runs the startup exchange on an open socket.
handshake(Socket) ->
    case marina_utils:startup(Socket) of
        {ok, undefined} ->
            {ok, Socket};
        {ok, <<"org.apache.cassandra.auth.PasswordAuthenticator">>} ->
            password_login(Socket);
        {error, _} = Error ->
            close_with(Socket, Error)
    end.

%% Authenticates on a socket whose startup requested password auth.
password_login(Socket) ->
    case marina_utils:authenticate(Socket) of
        ok ->
            {ok, Socket};
        {error, _} = Error ->
            close_with(Socket, Error)
    end.

%% Closes Socket and hands back Error unchanged.
close_with(Socket, Error) ->
    gen_tcp:close(Socket),
    Error.
%% Turns [RpcAddress, Datacenter, Tokens] rows into {RpcAddress, Tokens}
%% pairs, preserving row order.
%%
%% With Datacenter = undefined every three-element row is kept; otherwise
%% only rows whose datacenter column is exactly equal to Datacenter are
%% kept. Elements that are not three-element lists are dropped.
filter_datacenter(Rows, undefined) ->
    [{RpcAddress, Tokens} || [RpcAddress, _RowDc, Tokens] <- Rows];
filter_datacenter(Rows, Datacenter) ->
    [{RpcAddress, Tokens} ||
        [RpcAddress, RowDc, Tokens] <- Rows,
        RowDc =:= Datacenter].
%% Walks the bootstrap ips in order until one yields a non-empty node list.
%%
%% For each ip, peers/2 is asked for the cluster rows and the datacenter of
%% the contacted node; the rows are then narrowed with filter_datacenter/2.
%% An ip that errors is logged and skipped; an ip whose filtered list is
%% empty is skipped silently. Returns {ok, Nodes}, or
%% {error, bootstrap_failed} once every ip has been tried.
nodes([], _Port) ->
    {error, bootstrap_failed};
nodes([Ip | Rest], Port) ->
    case peers(Ip, Port) of
        {error, Reason} ->
            logger:warning("[~p] bootstrap error: ~p~n", [?MODULE, Reason]),
            nodes(Rest, Port);
        {ok, Rows, Datacenter} ->
            case filter_datacenter(Rows, Datacenter) of
                [_ | _] = Found ->
                    {ok, Found};
                [] ->
                    nodes(Rest, Port)
            end
    end.
%% Connects to Ip:Port and, if that succeeds, runs the topology queries on
%% the new socket. Connection errors are returned unchanged.
peers(Ip, Port) ->
    case connect(Ip, Port) of
        {error, _} = Error ->
            Error;
        {ok, Socket} ->
            peers_query(Socket)
    end.
%% Runs ?LOCAL_QUERY and ?PEERS_QUERY on Socket and returns
%% {ok, LocalRows ++ PeerRows, Datacenter}, where Datacenter is the second
%% column of the single row returned by the local query.
%%
%% The socket is only needed for these two queries and is not handed back to
%% the caller, so it is always closed here. Previously it was left open,
%% leaking one connection per bootstrap attempt that reached this point.
%%
%% The matches are deliberately assertive: an unexpected query result, or a
%% local query that does not return exactly one three-column row, raises
%% badmatch (after the socket has been closed).
peers_query(Socket) ->
    try
        {ok, {result, _, _, LocalRows}} =
            marina_utils:query(Socket, ?LOCAL_QUERY),
        [[_RpcAddress, Datacenter, _Tokens]] = LocalRows,
        {ok, {result, _, _, PeerRows}} =
            marina_utils:query(Socket, ?PEERS_QUERY),
        {ok, LocalRows ++ PeerRows, Datacenter}
    after
        %% runs on both the success and the crash path
        gen_tcp:close(Socket)
    end.