src/marina_control.erl

%% Owns a long-lived CQL control connection to the cluster, subscribes to
%% server-side EVENT frames (TOPOLOGY_CHANGE, STATUS_CHANGE, SCHEMA_CHANGE),
%% and drives a full ring re-sync whenever the topology changes.
%%
%% Design is events-only and reconnect-on-change:
%%
%%   connecting -> dial any bootstrap_ip -> STARTUP + AUTH -> system.local +
%%     system.peers -> post {topology_full_sync, Nodes} to marina_pool_server
%%     -> REGISTER -> active-once -> subscribed
%%   subscribed -> any TOPOLOGY_CHANGE event closes the socket and re-enters
%%     connecting, which re-queries system.peers and re-posts
%%     topology_full_sync. STATUS_CHANGE events are logged and SCHEMA_CHANGE
%%     events are dropped without logging; neither triggers a re-sync —
%%     shackle already reconnects transparently on per-node outages.
%%
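%% Every reconnect repeats the full peers query, which closes the window in
%% which a topology event could have been missed while the control
%% connection was down. Note that REGISTER is sent only after the peers
%% query, so a change landing between the two is picked up only on the next
%% reconnect.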
-module(marina_control).
-include("marina_internal.hrl").

-behaviour(gen_statem).

-export([
    start_link/0
]).

-export([
    callback_mode/0,
    code_change/4,
    init/1,
    terminate/3
]).

-export([
    connecting/3,
    subscribed/3
]).

%% Stream id used for the REGISTER request issued by subscribe/1.
-define(CONTROL_STREAM, 0).
%% Stream id carried by server-pushed EVENT frames; process_frames/1 only
%% decodes frames on this stream and skips everything else.
-define(EVENT_STREAM, -1).
%% Event types requested in the REGISTER message.
-define(EVENT_TYPES, [
    <<"TOPOLOGY_CHANGE">>,
    <<"STATUS_CHANGE">>,
    <<"SCHEMA_CHANGE">>
]).
%% Reconnect delay bounds in milliseconds. backoff/1 uses the current delay
%% as a gen_statem state_timeout and doubles it per failure, capped at
%% BACKOFF_MAX; a successful subscribe resets it to BACKOFF_MIN.
-define(BACKOFF_MIN, 500).
-define(BACKOFF_MAX, 30000).

%% State carried across connecting/subscribed.
-record(data, {
    %% seeds dialled in order by establish/2 (from the bootstrap_ips env)
    bootstrap_ips :: [string()],
    %% CQL port used for every seed (from the port env)
    port          :: pos_integer(),
    %% delay in ms before the next reconnect attempt
    backoff       :: pos_integer(),
    %% the control socket; only set while in the subscribed state
    socket        :: undefined | inet:socket(),
    %% bytes received but not yet decoded into complete frames
    buffer        :: binary()
}).

-type data() :: #data {}.

%% public
%% Starts the control-connection state machine, registered locally under
%% the module name. init/1 only reads configuration and queues an internal
%% `connect' event, so dialling the cluster happens after this returns.
%%
%% The spec mirrors gen_statem:start_link/4, which may also return `ignore'
%% or `{error, Reason}' (e.g. when the name is already registered).
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.

start_link() ->
    gen_statem:start_link({local, ?MODULE}, ?MODULE, undefined, []).

%% gen_statem callbacks
%% Each state is an exported function of the same name (connecting/3,
%% subscribed/3), so gen_statem dispatches events via state_functions.
-spec callback_mode() -> state_functions.

callback_mode() -> state_functions.

%% Builds the initial state from the application environment and enters
%% `connecting'. The first connection attempt is queued as an internal
%% `connect' event, so it runs right after init/1 returns.
-spec init(undefined) ->
    {ok, connecting, data(), [gen_statem:action()]}.

init(undefined) ->
    InitialData = #data {
        bootstrap_ips = ?GET_ENV(bootstrap_ips, ?DEFAULT_BOOTSTRAP_IPS),
        port = ?GET_ENV(port, ?DEFAULT_PORT),
        backoff = ?BACKOFF_MIN,
        buffer = <<>>
    },
    FirstAttempt = {next_event, internal, connect},
    {ok, connecting, InitialData, [FirstAttempt]}.

%% Closes the control socket, if one is currently held, on shutdown.
-spec terminate(term(), atom(), data()) -> ok.

terminate(_Reason, _State, #data {socket = MaybeSocket}) ->
    close_socket(MaybeSocket).

%% No socket is held outside the subscribed state.
close_socket(undefined) ->
    ok;
close_socket(Sock) ->
    _ = gen_tcp:close(Sock),
    ok.

%% Hot upgrades keep the current state name and data untouched.
-spec code_change(term(), atom(), data(), term()) -> {ok, atom(), data()}.

code_change(_FromVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.

%% state: connecting
%% Handles events while no subscribed control socket exists.
%%
%% `internal connect' dials the bootstrap seeds in order (establish/2),
%% posts the discovered ring to marina_pool_server as
%% {topology_full_sync, Nodes}, then REGISTERs for events and enters
%% `subscribed'. Any failure schedules a retry through backoff/1, whose
%% `state_timeout' comes back here as `reconnect'. All other events are
%% ignored.
%%
%% NOTE(review): the peers query runs before REGISTER, so a topology change
%% landing between the two is not seen until a later event or socket
%% failure forces a reconnect — confirm whether that window matters.
-spec connecting(gen_statem:event_type(), term(), data()) ->
    gen_statem:event_handler_result(atom()).

connecting(internal, connect, #data {
        bootstrap_ips = Ips,
        port = Port
    } = Data) ->

    case establish(Ips, Port) of
        {ok, Socket, Nodes} ->
            marina_pool_server ! {topology_full_sync, Nodes},
            register_and_activate(Socket, Data);
        {error, Reason} ->
            logger:warning("[~p] control connect failed: ~p~n", [?MODULE, Reason]),
            backoff(Data)
    end;
connecting(state_timeout, reconnect, Data) ->
    {keep_state, Data, [{next_event, internal, connect}]};
connecting(_, _, Data) ->
    {keep_state, Data}.

%% REGISTERs for events on a freshly handshaken socket and arms it for
%% active-once delivery. On success it enters `subscribed' with the backoff
%% reset; on any failure (including the socket dying before it can be
%% re-armed) it closes the socket and backs off instead of crashing.
register_and_activate(Socket, Data) ->
    case subscribe(Socket) of
        ok ->
            case inet:setopts(Socket, [{active, once}]) of
                ok ->
                    {next_state, subscribed, Data#data {
                        socket = Socket,
                        buffer = <<>>,
                        backoff = ?BACKOFF_MIN
                    }};
                {error, Reason} ->
                    _ = gen_tcp:close(Socket),
                    logger:warning("[~p] control socket activation failed: ~p~n",
                        [?MODULE, Reason]),
                    backoff(Data)
            end;
        {error, Reason} ->
            _ = gen_tcp:close(Socket),
            logger:warning("[~p] register failed: ~p~n", [?MODULE, Reason]),
            backoff(Data)
    end.

%% state: subscribed
%% Handles events while the control socket is registered for EVENT frames.
%%
%% Each TCP chunk is appended to the carry-over buffer and decoded into
%% frames for process_frames/1:
%%   - `cycle' closes the socket and reconnects immediately (backoff reset),
%%     so connecting/3 re-reads system.peers and re-posts the ring;
%%   - `continue' keeps the undecoded remainder and re-arms the socket for
%%     one more delivery.
%% A socket close/error, or a failure to re-arm, closes the socket and
%% backs off into `connecting'. Messages for any other socket are ignored.
-spec subscribed(gen_statem:event_type(), term(), data()) ->
    gen_statem:event_handler_result(atom()).

subscribed(info, {tcp, Socket, Chunk}, #data {
        socket = Socket,
        buffer = Buffer
    } = Data) ->

    {Rest, Frames} = marina_frame:decode(<<Buffer/binary, Chunk/binary>>),
    case process_frames(Frames) of
        cycle ->
            _ = gen_tcp:close(Socket),
            {next_state, connecting,
                Data#data {socket = undefined, buffer = <<>>,
                           backoff = ?BACKOFF_MIN},
                [{next_event, internal, connect}]};
        continue ->
            case inet:setopts(Socket, [{active, once}]) of
                ok ->
                    {keep_state, Data#data {buffer = Rest}};
                {error, Reason} ->
                    %% the socket died between delivery and re-arming
                    logger:warning("[~p] control socket error: ~p~n", [?MODULE, Reason]),
                    drop_socket(Socket, Data)
            end
    end;
subscribed(info, {tcp_closed, Socket}, #data {socket = Socket} = Data) ->
    logger:warning("[~p] control socket closed~n", [?MODULE]),
    drop_socket(Socket, Data);
subscribed(info, {tcp_error, Socket, Reason}, #data {socket = Socket} = Data) ->
    logger:warning("[~p] control socket error: ~p~n", [?MODULE, Reason]),
    drop_socket(Socket, Data);
subscribed(_, _, Data) ->
    {keep_state, Data}.

%% Closes Socket and backs off into `connecting' with the socket and frame
%% buffer cleared. Closing is harmless if the peer already closed it, and
%% releases the port even when exit_on_close is disabled.
drop_socket(Socket, Data) ->
    _ = gen_tcp:close(Socket),
    backoff(Data#data {socket = undefined, buffer = <<>>}).

%% private
%% Moves to `connecting' and arms a state_timeout of the current delay,
%% which fires as `reconnect'. The stored delay is doubled for the next
%% failure, capped at ?BACKOFF_MAX.
backoff(#data {backoff = Delay} = Data) ->
    Retry = {state_timeout, Delay, reconnect},
    NextDelay = erlang:min(Delay * 2, ?BACKOFF_MAX),
    {next_state, connecting, Data#data {backoff = NextDelay}, [Retry]}.

%% Tries each seed in order and returns {ok, Socket, Nodes} for the first
%% one that both connects and completes handshake/1. Each failure is logged
%% and the next seed is tried; exhausting the list yields
%% {error, no_reachable_seed}.
establish([], _Port) ->
    {error, no_reachable_seed};
establish([Seed | Remaining], Port) ->
    case try_seed(Seed, Port) of
        {ok, _Socket, _Nodes} = Established ->
            Established;
        failed ->
            establish(Remaining, Port)
    end.

%% One connection attempt against a single seed; closes the socket if the
%% handshake fails so nothing leaks between attempts.
try_seed(Seed, Port) ->
    case marina_utils:connect(Seed, Port) of
        {error, Reason} ->
            logger:warning("[~p] seed ~s unreachable: ~p~n", [?MODULE, Seed, Reason]),
            failed;
        {ok, Socket} ->
            case handshake(Socket) of
                {error, Reason} ->
                    _ = gen_tcp:close(Socket),
                    logger:warning("[~p] seed ~s handshake failed: ~p~n", [?MODULE, Seed, Reason]),
                    failed;
                {ok, Nodes} ->
                    {ok, Socket, Nodes}
            end
    end.

%% Runs STARTUP on a fresh socket and, when marina_utils:startup/1 returns
%% anything other than {ok, undefined}, the AUTH exchange as well, then
%% discovers the ring with discover/1. Errors from any step are passed
%% through unchanged.
handshake(Socket) ->
    case marina_utils:startup(Socket) of
        {error, _} = Failure ->
            Failure;
        {ok, undefined} ->
            discover(Socket);
        {ok, _Authenticator} ->
            authenticate_then_discover(Socket)
    end.

%% AUTH step of handshake/1, only reached when the server asked for it.
authenticate_then_discover(Socket) ->
    case marina_utils:authenticate(Socket) of
        {error, _} = Failure -> Failure;
        ok -> discover(Socket)
    end.

%% Reads system.local and then system.peers over a handshaken socket and
%% returns {ok, [{Addr, Tokens}]} restricted to the datacenter reported by
%% the single system.local row (see filter_dc/2).
%%
%% A successful reply of an unexpected shape, such as a system.local result
%% that is not exactly one row, is turned into {error, _}. That lets
%% establish/2 move on to the next seed instead of crashing the control
%% process with a case_clause.
discover(Socket) ->
    case marina_utils:query(Socket, ?LOCAL_QUERY) of
        {ok, {result, _, _, [[_, Datacenter, _]] = LocalRows}} ->
            discover_peers(Socket, LocalRows, Datacenter);
        {ok, Other} ->
            {error, {unexpected_local_result, Other}};
        {error, Reason} ->
            {error, Reason}
    end.

%% Second half of discover/1: fetches system.peers and merges it with the
%% already-fetched local row before datacenter filtering.
discover_peers(Socket, LocalRows, Datacenter) ->
    case marina_utils:query(Socket, ?PEERS_QUERY) of
        {ok, {result, _, _, PeerRows}} ->
            {ok, filter_dc(LocalRows ++ PeerRows, Datacenter)};
        {ok, Other} ->
            {error, {unexpected_peers_result, Other}};
        {error, Reason} ->
            {error, Reason}
    end.

%% Converts [Addr, Dc, Tokens] rows into {Addr, Tokens} pairs, keeping only
%% rows whose datacenter is exactly Dc. When Dc is `undefined', every
%% three-element row is kept. Rows of any other shape are skipped, and the
%% input order is preserved.
filter_dc(Rows, Dc) ->
    [{Addr, Tokens} || [Addr, RowDc, Tokens] <- Rows,
                       Dc =:= undefined orelse RowDc =:= Dc].

%% Sends a REGISTER for ?EVENT_TYPES on ?CONTROL_STREAM and waits for the
%% reply synchronously. A reply of {ok, undefined} (presumably the body-less
%% READY response — confirm in marina_utils) means success. Any other
%% successful reply is reported as unexpected, and errors pass through.
subscribe(Socket) ->
    Register = marina_request:register(?CONTROL_STREAM,
        marina_utils:frame_flags(), ?EVENT_TYPES),
    case marina_utils:sync_msg(Socket, Register) of
        {error, _} = Failure ->
            Failure;
        {ok, undefined} ->
            ok;
        {ok, Unexpected} ->
            {error, {unexpected_register_response, Unexpected}}
    end.

%% Walks decoded frames in order and tells subscribed/3 what to do next.
%%
%% Returns `cycle' as soon as a TOPOLOGY_CHANGE event is seen; later frames
%% are discarded because the reconnect re-reads the whole ring. It also
%% returns `cycle' when an event frame cannot be decoded, since that frame
%% may have been a topology change and a full re-sync is the only safe
%% response. Otherwise it returns `continue' once every frame is consumed.
%% STATUS_CHANGE and unrecognised events are logged; SCHEMA_CHANGE events
%% and non-event frames are skipped silently.
%%
%% NOTE(review): assumes marina_body:decode/1 reports failure as
%% {error, Reason}; previously such a result crashed with case_clause.
process_frames([]) ->
    continue;
process_frames([#frame {stream = ?EVENT_STREAM} = Frame | Rest]) ->
    case marina_body:decode(Frame) of
        {ok, {event, topology_change, Kind, Addr}} ->
            logger:warning("[~p] topology change: ~p ~p~n", [?MODULE, Kind, Addr]),
            cycle;
        {ok, {event, status_change, Kind, Addr}} ->
            logger:warning("[~p] status change: ~p ~p~n", [?MODULE, Kind, Addr]),
            process_frames(Rest);
        {ok, {event, schema_change, _Body}} ->
            process_frames(Rest);
        {ok, Other} ->
            logger:warning("[~p] unexpected event frame: ~p~n", [?MODULE, Other]),
            process_frames(Rest);
        {error, Reason} ->
            logger:warning("[~p] undecodable event frame: ~p~n", [?MODULE, Reason]),
            cycle
    end;
process_frames([_Frame | Rest]) ->
    %% stray non-event response (e.g. late STARTUP reply) — ignore
    process_frames(Rest).