-module(marina_pool).
-include("marina_internal.hrl").
-compile({no_auto_import, [
node/1
]}).
-export([
init/0,
node/0,
node/1,
node_id/1,
start/2,
stop/1,
sync/3
]).
%% public
-spec init() ->
ok.
init() ->
foil:new(?MODULE),
foil:load(?MODULE).
-spec node() ->
{ok, atom()} | {error, marina_pool_not_started}.
node() ->
node(undefined).
-spec node(binary() | undefined) ->
{ok, atom()} | {error, marina_pool_not_started}.
node(RoutingKey) ->
case foil:lookup(?MODULE, strategy) of
{ok, Strategy} ->
case node(Strategy, RoutingKey) of
undefined ->
{error, marina_pool_not_started};
{ok, Node} ->
{ok, Node};
{error, _Reason} ->
{error, marina_pool_not_started}
end;
{error, _Reason} ->
{error, marina_pool_not_started}
end.
-spec node_id(binary()) ->
atom().
node_id(<<A, B, C, D>>) ->
RpcAddress = lists:flatten(lists:join(".", [integer_to_list(X) ||
X <- [A, B, C, D]])),
list_to_atom("marina_" ++ RpcAddress).
-spec start(random | token_aware, [{binary(), binary()}]) ->
ok.
start(Strategy, Nodes) ->
sync(Strategy, Nodes, []).
-spec stop(non_neg_integer()) ->
ok.
stop(0) ->
foil:delete(?MODULE, strategy),
foil:load(?MODULE);
stop(N) ->
{ok, NodeId} = foil:lookup(?MODULE, {node, N}),
ok = shackle_pool:stop(NodeId),
ok = foil:delete(?MODULE, {node, N}),
stop(N - 1).
-spec sync(random | token_aware,
[{binary(), binary()}],
[{binary(), binary()}]) -> ok.
sync(Strategy, NewNodes, OldNodes) ->
OldAddrs = [Addr || {Addr, _} <- OldNodes],
NewAddrs = [Addr || {Addr, _} <- NewNodes],
lists:foreach(fun stop_node/1, OldAddrs -- NewAddrs),
lists:foreach(fun start_node/1, NewAddrs -- OldAddrs),
rebuild_index(Strategy, NewNodes, length(OldNodes)),
case Strategy of
token_aware -> marina_ring:build(NewNodes);
random -> ok
end.
%% private
node({random, NodeCount}, undefined) ->
N = shackle_utils:random(NodeCount),
foil:lookup(?MODULE, {node, N});
node({token_aware, NodeCount}, undefined) ->
N = shackle_utils:random(NodeCount),
foil:lookup(?MODULE, {node, N});
node({token_aware, _NodeCount}, RoutingKey) ->
marina_ring:lookup(RoutingKey).
start(<<A, B, C, D>> = RpcAddress) ->
BacklogSize = ?GET_ENV(backlog_size, ?DEFAULT_BACKLOG_SIZE),
Ip = lists:flatten(io_lib:format("~b.~b.~b.~b", [A, B, C, D])),
NodeId = node_id(RpcAddress),
PoolSize = ?GET_ENV(pool_size, ?DEFAULT_POOL_SIZE),
PoolStrategy = ?GET_ENV(pool_strategy, ?DEFAULT_POOL_STRATEGY),
Port = ?GET_ENV(port, ?DEFAULT_PORT),
Reconnect = ?GET_ENV(reconnect, ?DEFAULT_RECONNECT),
ReconnectTimeMax = ?GET_ENV(reconnect_time_max,
?DEFAULT_RECONNECT_MAX),
ReconnectTimeMin = ?GET_ENV(reconnect_time_min,
?DEFAULT_RECONNECT_MIN),
SocketOptions = ?GET_ENV(socket_options, ?DEFAULT_SOCKET_OPTIONS),
ClientOptions = [
{ip, Ip},
{port, Port},
{reconnect, Reconnect},
{reconnect_time_max, ReconnectTimeMax},
{reconnect_time_min, ReconnectTimeMin},
{socket_options, SocketOptions}
],
PoolOptions = [
{backlog_size, BacklogSize},
{pool_size, PoolSize},
{pool_strategy, PoolStrategy}
],
case shackle_pool:start(NodeId, ?CLIENT, ClientOptions, PoolOptions) of
ok ->
{ok, NodeId};
{error, Reason} ->
{error, Reason}
end.
start_node(RpcAddress) ->
_ = start(RpcAddress),
ok.
stop_node(RpcAddress) ->
NodeId = node_id(RpcAddress),
try shackle_pool:stop(NodeId) of
_ -> ok
catch
_:_ -> ok
end,
_ = marina_cache:erase_pool(NodeId),
ok.
rebuild_index(Strategy, NewNodes, OldCount) ->
_ = [foil:delete(?MODULE, {node, N}) || N <- lists:seq(1, OldCount)],
lists:foldl(fun ({Addr, _}, N) ->
foil:insert(?MODULE, {node, N}, node_id(Addr)),
N + 1
end, 1, NewNodes),
foil:insert(?MODULE, strategy, {Strategy, length(NewNodes)}),
foil:load(?MODULE),
ok.