Skip to main content

src/wa_raft_transport_cleanup.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.

-module(wa_raft_transport_cleanup).
-compile(warn_missing_spec_all).
-behaviour(gen_server).

-include_lib("wa_raft/include/wa_raft.hrl").
-include_lib("wa_raft/include/wa_raft_logger.hrl").

%% OTP supervision
-export([
    child_spec/1,
    start_link/1
]).

%% Internal API
-export([
    default_name/2,
    registered_name/2
]).

%% Server Callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2
]).

-define(RAFT_TRANSPORT_CLEANUP_SCAN_INTERVAL_SECS, 30).

-record(state, {
    application :: atom(),
    name :: atom(),
    directory :: file:filename()
}).

%%-------------------------------------------------------------------
%% OTP Supervision
%%-------------------------------------------------------------------

-spec child_spec(Options :: #raft_options{}) -> supervisor:child_spec().
child_spec(Options) ->
    #{
        id => ?MODULE,
        start => {?MODULE, start_link, [Options]},
        restart => permanent,
        shutdown => 5000,
        modules => [?MODULE]
    }.

-spec start_link(Options :: #raft_options{}) ->  gen_server:start_ret().
start_link(#raft_options{transport_cleanup_name = Name} = Options) ->
    gen_server:start_link({local, Name}, ?MODULE, Options, []).

%%-------------------------------------------------------------------
%% Internal API
%%-------------------------------------------------------------------

%% Get the default name for the RAFT acceptor server associated with the
%% provided RAFT partition.
-spec default_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
default_name(Table, Partition) ->
    % elp:ignore W0023 bounded atom, one per table/partition at startup
    binary_to_atom(<<"raft_transport_cleanup_", (atom_to_binary(Table))/binary, "_", (integer_to_binary(Partition))/binary>>).

%% Get the registered name for the RAFT acceptor server associated with the
%% provided RAFT partition or the default name if no registration exists.
-spec registered_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
registered_name(Table, Partition) ->
    case wa_raft_part_sup:options(Table, Partition) of
        undefined -> default_name(Table, Partition);
        Options   -> Options#raft_options.transport_cleanup_name
    end.

%%-------------------------------------------------------------------
%% Server Callbacks
%%-------------------------------------------------------------------

-spec init(Options :: #raft_options{}) -> {ok, State :: #state{}}.
init(#raft_options{application = Application, transport_directory = Directory, transport_cleanup_name = Name}) ->
    process_flag(trap_exit, true),
    schedule_scan(),
    {ok, #state{application = Application, name = Name, directory = Directory}}.

-spec handle_call(Request :: term(), From :: gen_server:from(), State :: #state{}) -> {noreply, NewState :: #state{}}.
handle_call(Request, From, #state{name = Name} = State) ->
    ?RAFT_LOG_WARNING("~p received unrecognized call ~0P from ~0p", [Name, Request, 25, From]),
    {noreply, State}.

-spec handle_cast(Request :: term(), State :: #state{}) -> {noreply, NewState :: #state{}}.
handle_cast(Request, #state{name = Name} = State) ->
    ?RAFT_LOG_NOTICE("~p got unrecognized cast ~0P", [Name, Request, 25]),
    {noreply, State}.

-spec handle_info(Info :: term(), State :: #state{}) -> {noreply, NewState :: #state{}}.
handle_info(scan, #state{} = State) ->
    maybe_cleanup(State),
    schedule_scan(),
    {noreply, State};
handle_info(Info, #state{name = Name} = State) ->
    ?RAFT_LOG_NOTICE("~p got unrecognized info ~p", [Name, Info]),
    {noreply, State}.

-spec maybe_cleanup(State :: #state{}) -> ok | {error, term()}.
maybe_cleanup(#state{application = App, name = Name, directory = Directory} = State) ->
    case prim_file:list_dir(Directory) of
        {ok, Files} ->
            RetainMillis = ?RAFT_TRANSPORT_RETAIN_INTERVAL(App) * 1000,
            NowMillis = erlang:system_time(millisecond),
            lists:foreach(
                fun (Filename) ->
                    Path = filename:join(Directory, Filename),
                    ID = list_to_integer(Filename),
                    case wa_raft_transport:transport_info(ID) of
                        {ok, #{end_ts := EndTs}} when NowMillis - EndTs > RetainMillis ->
                            ?RAFT_LOG_NOTICE(
                                "~p deleting ~p due to expiring after transport ended",
                                [Name, Filename]
                            ),
                            cleanup(ID, Path, State);
                        {ok, _Info} ->
                            ok;
                        not_found ->
                            ?RAFT_LOG_NOTICE(
                                "~p deleting ~p due to having no associated transport",
                                [Name, Filename]
                            ),
                            cleanup(ID, Path, State)
                    end
                end, Files);
        {error, enoent} ->
            ok;
        {error, Reason} ->
            ?RAFT_LOG_WARNING(
                "~p failed to list transports for cleanup due to ~p",
                [Name, Reason]
            ),
            {error, Reason}
    end.

-spec cleanup(non_neg_integer(), string(), #state{}) -> ok | {error, term()}.
cleanup(ID, Path, #state{name = Name}) ->
    case file:del_dir_r(Path) of
        ok ->
            ok;
        {error, Reason} ->
            ?RAFT_LOG_WARNING(
                "~p failed to cleanup transport ~p due to ~p",
                [Name, ID, Reason]
            ),
            {error, Reason}
    end.

-spec schedule_scan() -> reference().
schedule_scan() ->
    erlang:send_after(?RAFT_TRANSPORT_CLEANUP_SCAN_INTERVAL_SECS * 1000, self(), scan).