Skip to main content

src/wa_raft_part_sup.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% OTP Supervisor for monitoring RAFT processes. Correctness of RAFT
%%% relies on the consistency of the signaling between the processes,
%%% this supervisor is configured to restart all RAFT processes
%%% when any of them exits abnormally.

-module(wa_raft_part_sup).
-compile(warn_missing_spec_all).
-behaviour(supervisor).

%% OTP Supervision
-export([
    child_spec/1,
    child_spec/2,
    start_link/2
]).

%% Options API
-export([
    registered_name/2
]).

%% Options API
-export([
    default_partition_path/3,
    registered_partition_path/2
]).

%% Options API
-export([
    options/2
]).

%% Supervisor callbacks
-export([
    init/1
]).

%% Test API
-export([
    prepare_spec/2
]).

-include_lib("wa_raft/include/wa_raft.hrl").
-include_lib("wa_raft/include/wa_raft_logger.hrl").

%% Key in persistent_term for the options associated with a RAFT partition.
-define(OPTIONS_KEY(Table, Partition), {?MODULE, Table, Partition}).

%%-------------------------------------------------------------------
%% OTP supervision
%%-------------------------------------------------------------------

%% Returns a spec suitable for use with a `simple_one_for_one` supervisor.
-spec child_spec(Application :: atom()) -> supervisor:child_spec().
child_spec(Application) ->
    #{
        id => ?MODULE,
        start => {?MODULE, start_link, [Application]},
        restart => permanent,
        shutdown => infinity,
        type => supervisor,
        modules => [?MODULE]
    }.

-spec child_spec(Application :: atom(), Spec :: wa_raft:args()) -> supervisor:child_spec().
child_spec(Application, Spec) ->
    #{
        id => ?MODULE,
        start => {?MODULE, start_link, [Application, Spec]},
        restart => permanent,
        shutdown => infinity,
        type => supervisor,
        modules => [?MODULE]
    }.

-spec start_link(Application :: atom(), Spec :: wa_raft:args()) -> supervisor:startlink_ret().
start_link(Application, Spec) ->
    %% First normalize the provided specification into a full options record.
    Options = #raft_options{table = Table, partition = Partition, supervisor_name = Name} = normalize_spec(Application, Spec),

    %% Then put the declared options for the current RAFT partition into
    %% persistent term for access by shared resources and other services.
    %% The RAFT options for a table are not expected to change during the
    %% runtime of the RAFT application and so repeated updates should not
    %% result in any GC load. Warn if this is case.
    PrevOptions = persistent_term:get(?OPTIONS_KEY(Table, Partition), Options),
    PrevOptions =/= Options andalso
        ?RAFT_LOG_WARNING(
            ?MODULE_STRING " storing changed options for RAFT partitition ~0p/~0p",
            [Table, Partition]
        ),
    ok = persistent_term:put(?OPTIONS_KEY(Table, Partition), Options),

    supervisor:start_link({local, Name}, ?MODULE, Options).

%%-------------------------------------------------------------------
%% Internal API
%%-------------------------------------------------------------------

%% Get the default name for the RAFT partition supervisor associated with the
%% provided RAFT partition.
-spec default_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
default_name(Table, Partition) ->
    % elp:ignore W0023 bounded atom for supervisor name
    list_to_atom("raft_sup_" ++ atom_to_list(Table) ++ "_" ++ integer_to_list(Partition)).

%% Get the registered name for the RAFT partition supervisor associated with the
%% provided RAFT partition or the default name if no registration exists.
-spec registered_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
registered_name(Table, Partition) ->
    case wa_raft_part_sup:options(Table, Partition) of
        undefined -> default_name(Table, Partition);
        Options   -> Options#raft_options.supervisor_name
    end.

%% Get the database directory that should be used by the RAFT partition that
%% is started under the provided application with the provided arguments.
-spec partition_path(Application :: atom(), Spec :: wa_raft:args()) -> Database :: file:filename().
partition_path(Application, #{table := Table, partition := Partition}) ->
    Root = wa_raft_env:database_path(Application),
    default_partition_path(Root, Table, Partition).

%% Get the default location for the database directory associated with the
%% provided RAFT partition given the database of the RAFT root.
-spec default_partition_path(Root :: file:filename(), Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Database :: file:filename().
default_partition_path(Root, Table, Partition) ->
    filename:join(Root, atom_to_list(Table) ++ "." ++ integer_to_list(Partition)).

%% Get the registered database directory for the provided RAFT partition. An
%% error is raised if no registration exists.
-spec registered_partition_path(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Database :: file:filename().
registered_partition_path(Table, Partition) ->
    case wa_raft_part_sup:options(Table, Partition) of
        undefined -> error({not_registered, Table, Partition});
        Options   -> Options#raft_options.database
    end.

-spec options(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> #raft_options{} | undefined.
options(Table, Partition) ->
    persistent_term:get(?OPTIONS_KEY(Table, Partition), undefined).

-spec normalize_spec(Application :: atom(), Spec :: wa_raft:args()) -> #raft_options{}.
normalize_spec(Application, #{table := Table, partition := Partition} = Spec) ->
    Database = partition_path(Application, Spec),
    ServerName = wa_raft_server:default_name(Table, Partition),
    LogName = wa_raft_log:default_name(Table, Partition),
    StorageName = wa_raft_storage:default_name(Table, Partition),
    AcceptorName = wa_raft_acceptor:default_name(Table, Partition),
    #raft_options{
        application = Application,
        table = Table,
        partition = Partition,
        % RAFT identity always uses the default RAFT server name for the partition
        self = #raft_identity{name = wa_raft_server:default_name(Table, Partition), node = node()},
        identifier = #raft_identifier{application = Application, table = Table, partition = Partition},
        database = Database,
        acceptor_name = AcceptorName,
        distribution_module = maps:get(distribution_module, Spec, wa_raft_env:get_env(Application, raft_distribution_module, ?RAFT_DEFAULT_DISTRIBUTION_MODULE)),
        log_name = LogName,
        log_module = maps:get(log_module, Spec, wa_raft_env:get_env(Application, raft_log_module, ?RAFT_DEFAULT_LOG_MODULE)),
        label_module = maps:get(label_module, Spec, wa_raft_env:get_env(Application, raft_label_module, ?RAFT_DEFAULT_LABEL_MODULE)),
        queue_name = wa_raft_queue:default_name(Table, Partition),
        queue_counters = wa_raft_queue:default_counters(),
        queue_reads = wa_raft_queue:default_read_queue_name(Table, Partition),
        server_name = ServerName,
        storage_name = StorageName,
        storage_module = maps:get(storage_module, Spec, wa_raft_env:get_env(Application, raft_storage_module, ?RAFT_DEFAULT_STORAGE_MODULE)),
        supervisor_name = default_name(Table, Partition),
        transport_cleanup_name = wa_raft_transport_cleanup:default_name(Table, Partition),
        transport_directory = wa_raft_transport:default_directory(Database),
        transport_module = maps:get(transport_module, Spec, wa_raft_env:get_env(Application, {raft_transport_module, transport_module}, ?RAFT_DEFAULT_TRANSPORT_MODULE))
    }.

%%-------------------------------------------------------------------
%% Test API
%%-------------------------------------------------------------------

-spec prepare_spec(Application :: atom(), Spec :: wa_raft:args()) -> #raft_options{}.
prepare_spec(Application, Spec) ->
    Options = #raft_options{table = Table, partition = Partition} = normalize_spec(Application, Spec),
    ok = persistent_term:put(?OPTIONS_KEY(Table, Partition), Options),
    Options.

%%-------------------------------------------------------------------
%% Supervisor callbacks
%%-------------------------------------------------------------------

-spec init(Options :: #raft_options{}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init(Options) ->
    ChildSpecs = [
        wa_raft_queue:child_spec(Options),
        wa_raft_storage:child_spec(Options),
        wa_raft_log:child_spec(Options),
        wa_raft_server:child_spec(Options),
        wa_raft_acceptor:child_spec(Options),
        wa_raft_transport_cleanup:child_spec(Options)
    ],
    {ok, {#{strategy => one_for_all, intensity => 10, period => 1}, ChildSpecs}}.