Skip to main content

src/wa_raft_transport.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.

-module(wa_raft_transport).
-compile(warn_missing_spec_all).
-behaviour(gen_server).

-include_lib("kernel/include/file.hrl").
-include_lib("wa_raft/include/wa_raft.hrl").
-include_lib("wa_raft/include/wa_raft_logger.hrl").

%% OTP supervision
-export([
    child_spec/0,
    start_link/0
]).

%% Bulk Transfer API
-export([
    start_transfer/4,
    start_transfer/5,
    transfer/5
]).

%% Snapshot Transfer API
-export([
    start_snapshot_transfer/6,
    start_snapshot_transfer/7,
    transfer_snapshot/7
]).

%% Transport API
-export([
    cancel/2,
    complete/3
]).

%% ETS API
-export([
    setup_tables/0,
    transports/0,
    transport_info/1,
    transport_info/2,
    file_info/2,
    update_file_info/3
]).

%% Internal API - Configuration
-export([
    default_directory/1,
    registered_directory/2,
    registered_module/2
]).

%% Internal API - Transport Workers
-export([
    pop_file/1
]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2
]).

-export_type([
    transport_id/0,
    transport_info/0,
    file_id/0,
    file_info/0
]).

%% Name of the ETS table to keep records for transports
-define(TRANSPORT_TABLE, wa_raft_transport_transports).
%% Name of the ETS table to keep records for files
-define(FILE_TABLE, wa_raft_transport_files).

-define(RAFT_TRANSPORT_PARTITION_SUBDIRECTORY, "transport").

-define(RAFT_TRANSPORT_SCAN_INTERVAL_SECS, 30).

%% Number of counters
-define(RAFT_TRANSPORT_COUNTERS, 2).

%% Counter - inflight receives
-define(RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES, 1).

%% Counter - inflight witness receives
-define(RAFT_TRANSPORT_COUNTER_ACTIVE_WITNESS_RECEIVES, 2).

-type transport_id() :: pos_integer().
-type transport_info() :: #{
    type := sender | receiver,
    status := requested | running | completed | cancelled | timed_out | failed,
    atomics := atomics:atomics_ref(),

    peer := atom(),
    module := module(),
    meta := meta(),
    notify => gen_server:from(),

    root := string(),

    start_ts := Millis :: integer(),
    end_ts => Millis :: integer(),

    total_files := non_neg_integer(),
    completed_files := non_neg_integer(),
    queue => ets:table(),

    error => term()
}.

-type meta() :: meta_transfer() | meta_snapshot().
-type meta_transfer() :: #{
    type := transfer,
    table := wa_raft:table(),
    partition := wa_raft:partition()
}.
-type meta_snapshot() :: #{
    type := snapshot,
    table := wa_raft:table(),
    partition := wa_raft:partition(),
    position := wa_raft_log:log_pos(),
    witness := boolean()
}.

-type file_id() :: pos_integer().
-type file_info() :: #{
    status := requested | sending | receiving | completed | cancelled | failed,
    atomics := {Transport :: atomics:atomics_ref(), File :: atomics:atomics_ref()},

    name := string(),
    path := string(),
    mtime => integer(),

    start_ts => Millis :: integer(),
    end_ts => Millis :: integer(),
    retries => non_neg_integer(),

    total_bytes := non_neg_integer(),
    completed_bytes := non_neg_integer(),

    meta => map(),
    error => Reason :: term()
}.

%%% ------------------------------------------------------------------------

-record(state, {
    counters :: counters:counters_ref()
}).

%%% ------------------------------------------------------------------------
%%%  Behaviour callbacks
%%%

%% Perform any setup required before transport can be started.
-callback transport_init(Node :: node()) -> {ok, State :: term()} | {stop, Reason :: term()}.

%% Send a file to the target peer.
-callback transport_send(ID :: transport_id(), FileID :: file_id(), State :: term()) ->
    {ok, NewState :: term()} |
    {continue, NewState :: term()} |
    {stop, Reason :: term(), NewState :: term()}.

%% Optional callback for performing any shutdown operations.
-callback transport_terminate(Reason :: term(), State :: term()) -> term().

-optional_callbacks([
    transport_terminate/2
]).

%%% ------------------------------------------------------------------------
%%%  OTP supervision callbacks
%%%
-spec child_spec() -> supervisor:child_spec().
child_spec() ->
    #{
        id => ?MODULE,
        start => {?MODULE, start_link, []},
        restart => permanent,
        shutdown => 5000,
        modules => [?MODULE]
    }.

-spec start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%%% ------------------------------------------------------------------------
%%%  Internal API
%%%

-spec start_transport(Peer :: atom(), Meta :: meta(), Root :: string(), Timeout :: timeout()) -> {ok, ID :: transport_id()} | {error, Reason :: term()}.
start_transport(Peer, Meta, Root, Timeout) ->
    gen_server:call(?MODULE, {start, Peer, Meta, Root}, Timeout).

-spec start_transport_and_wait(Peer :: atom(), Meta :: meta(), Root :: string(), Timeout :: timeout()) -> {ok, ID :: transport_id()} | {error, Reason :: term()}.
start_transport_and_wait(Peer, Meta, Root, Timeout) ->
    gen_server:call(?MODULE, {start_wait, Peer, Meta, Root}, Timeout).

%%% ------------------------------------------------------------------------
%%%  Bulk Transfer API
%%%

-spec start_transfer(Peer :: atom(), Table :: wa_raft:table(), Partition :: wa_raft:partition(), Root :: string()) -> {ok, ID :: transport_id()} | {error, Reason :: term()}.
start_transfer(Peer, Table, Partition, Root) ->
    start_transfer(Peer, Table, Partition, Root, 10000).

-spec start_transfer(Peer :: atom(), Table :: wa_raft:table(), Partition :: wa_raft:partition(), Root :: string(), Timeout :: timeout()) -> {ok, ID :: transport_id()} | {error, Reason :: term()}.
start_transfer(Peer, Table, Partition, Root, Timeout) ->
    start_transport(Peer, #{type => transfer, table => Table, partition => Partition}, Root, Timeout).

-spec transfer(Peer :: atom(), Table :: wa_raft:table(), Partition :: wa_raft:partition(), Root :: string(), Timeout :: timeout()) -> {ok, ID :: transport_id()} | {error, Reason :: term()}.
transfer(Peer, Table, Partition, Root, Timeout) ->
    start_transport_and_wait(Peer, #{type => transfer, table => Table, partition => Partition}, Root, Timeout).

%%% ------------------------------------------------------------------------
%%%  Snapshot Transfer API
%%%

-spec start_snapshot_transfer(Peer :: atom(), Table :: wa_raft:table(), Partition :: wa_raft:partition(), LogPos :: wa_raft_log:log_pos(), Root :: string(), Witness :: boolean()) -> {ok, ID :: transport_id()} | {error, Reason :: term()}.
start_snapshot_transfer(Peer, Table, Partition, LogPos, Root, Witness) ->
    start_snapshot_transfer(Peer, Table, Partition, LogPos, Root, Witness, 10000).

-spec start_snapshot_transfer(Peer :: atom(), Table :: wa_raft:table(), Partition :: wa_raft:partition(), LogPos :: wa_raft_log:log_pos(), Root :: string(), Witness :: boolean(), Timeout :: timeout()) -> {ok, ID :: transport_id()} | {error, Reason :: term()}.
start_snapshot_transfer(Peer, Table, Partition, LogPos, Root, Witness, Timeout) ->
    start_transport(Peer, #{type => snapshot, table => Table, partition => Partition, position => LogPos, witness => Witness}, Root, Timeout).

-spec transfer_snapshot(Peer :: atom(), Table :: wa_raft:table(), Partition :: wa_raft:partition(), LogPos :: wa_raft_log:log_pos(), Root :: string(), Witness :: boolean(), Timeout :: timeout()) -> {ok, ID :: transport_id()} | {error, Reason :: term()}.
transfer_snapshot(Peer, Table, Partition, LogPos, Root, Witness, Timeout) ->
    start_transport_and_wait(Peer, #{type => snapshot, table => Table, partition => Partition, position => LogPos, witness => Witness}, Root, Timeout).

%%% ------------------------------------------------------------------------
%%%  Transport API
%%%

-spec cancel(ID :: transport_id(), Reason :: term()) -> ok | {error, Reason :: term()}.
cancel(ID, Reason) ->
    gen_server:call(?MODULE, {cancel, ID, Reason}).

-spec complete(ID :: transport_id(), FileID :: file_id(), Status :: dynamic()) -> ok.
complete(ID, FileID, Status) ->
    gen_server:cast(?MODULE, {complete, ID, FileID, Status}).

%%% ------------------------------------------------------------------------
%%%  ETS table helper functions
%%%

-spec setup_tables() -> ok.
setup_tables() ->
    ?TRANSPORT_TABLE = ets:new(?TRANSPORT_TABLE, [named_table, set, public]),
    ?FILE_TABLE = ets:new(?FILE_TABLE, [named_table, set, public]),
    ok.

-spec transports() -> [transport_id()].
transports() ->
    ets:select(?TRANSPORT_TABLE, [{{'$1', '_'}, [], ['$1']}]).

-spec transport_info(ID :: transport_id()) -> {ok, Info :: transport_info()} | not_found.
transport_info(ID) ->
    case ets:lookup_element(?TRANSPORT_TABLE, ID, 2, not_found) of
        not_found -> not_found;
        Info      -> {ok, Info}
    end.

-spec transport_info(ID :: transport_id(), Item :: atom()) -> Info :: term() | undefined.
transport_info(ID, Item) ->
    case transport_info(ID) of
        {ok, #{Item := Value}} -> Value;
        _                      -> undefined
    end.

% This function should only be called from the "gen_server" process since it does not
% provide any atomicity guarantees.
-spec set_transport_info(ID :: transport_id(), Info :: transport_info(), Counters :: counters:counters_ref()) -> ok.
set_transport_info(ID, #{atomics := TransportAtomics} = Info, Counters) ->
    true = ets:insert(?TRANSPORT_TABLE, {ID, Info}),
    maybe_update_active_inbound_transport_counts(undefined, Info, Counters),
    ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, erlang:system_time(millisecond)).

% This function should only be called from the "gen_server" process since it does not
% provide any atomicity guarantees.
-spec update_and_get_transport_info(
    ID :: transport_id(),
    Fun :: fun((Info :: transport_info()) -> NewInfo :: transport_info()),
    Counters :: counters:counters_ref()
) -> {ok, NewOrExistingInfo :: transport_info()} | not_found.
update_and_get_transport_info(ID, Fun, Counters) ->
    case transport_info(ID) of
        {ok, #{atomics := TransportAtomics} = Info} ->
            case Fun(Info) of
                Info ->
                    {ok, Info};
                NewInfo ->
                    true = ets:insert(?TRANSPORT_TABLE, {ID, NewInfo}),
                    ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, erlang:system_time(millisecond)),
                    ok = maybe_update_active_inbound_transport_counts(Info, NewInfo, Counters),
                    {ok, NewInfo}
            end;
        not_found ->
            not_found
    end.

-spec delete_transport_info(ID :: transport_id()) -> ok | not_found.
delete_transport_info(ID) ->
    case transport_info(ID) of
        {ok, #{total_files := TotalFiles} = Info} ->
            lists:foreach(fun (FileID) -> delete_file_info(ID, FileID) end, lists:seq(1, TotalFiles)),
            ets:delete(?TRANSPORT_TABLE, ID),
            Queue = maps:get(queue, Info, undefined),
            Queue =/= undefined andalso (try ets:delete(Queue) catch _:_ -> ok end),
            ok;
        not_found ->
            not_found
    end.

-spec file_info(ID :: transport_id(), FileID :: file_id()) -> {ok, Info :: file_info()} | not_found.
file_info(ID, FileID) ->
    case ets:lookup_element(?FILE_TABLE, {ID, FileID}, 2, not_found) of
        not_found -> not_found;
        Info      -> {ok, Info}
    end.

-spec maybe_update_active_inbound_transport_counts(OldInfo :: transport_info() | undefined, NewInfo :: transport_info(), Counters :: counters:counters_ref()) -> ok.
maybe_update_active_inbound_transport_counts(OldInfo, #{meta := #{witness := true}} = NewInfo, Counters) ->
    maybe_update_active_inbound_transport_counts_impl(OldInfo, NewInfo, ?RAFT_TRANSPORT_COUNTER_ACTIVE_WITNESS_RECEIVES, Counters);
maybe_update_active_inbound_transport_counts(OldInfo, NewInfo, Counters) ->
    maybe_update_active_inbound_transport_counts_impl(OldInfo, NewInfo, ?RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES, Counters).

-spec maybe_update_active_inbound_transport_counts_impl(OldInfo :: transport_info() | undefined, NewInfo :: transport_info(), Counter :: non_neg_integer(), Counters :: counters:counters_ref()) -> ok.
maybe_update_active_inbound_transport_counts_impl(undefined, #{type := receiver, status := running}, Counter, Counters) ->
    counters:add(Counters, Counter, 1);
maybe_update_active_inbound_transport_counts_impl(#{type := receiver, status := OldStatus}, #{status := running}, Counter, Counters) when OldStatus =/= running ->
    counters:add(Counters, Counter, 1);
maybe_update_active_inbound_transport_counts_impl(#{type := receiver, status := running}, #{status := NewStatus}, Counter, Counters) when NewStatus =/= running ->
    counters:sub(Counters, Counter, 1);
maybe_update_active_inbound_transport_counts_impl(_, _, _, _) ->
    ok.

% This function should only be called from the "worker" process responsible for the
% transport of the specified file since it does not provide any atomicity guarantees.
-spec set_file_info(ID :: transport_id(), FileID :: file_id(), Info :: file_info()) -> ok.
set_file_info(ID, FileID, #{atomics := {TransportAtomics, FileAtomics}} = Info) ->
    true = ets:insert(?FILE_TABLE, {{ID, FileID}, Info}),
    NowMillis = erlang:system_time(millisecond),
    ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis),
    ok = atomics:put(FileAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis).

% This function should only be called from the "worker" process responsible for the
% transport of the specified file since it does not provide any atomicity guarantees.
-spec update_file_info(ID :: transport_id(), FileID :: file_id(), Fun :: fun((Info :: file_info()) -> NewInfo :: file_info())) -> ok | not_found.
update_file_info(ID, FileID, Fun) ->
    case file_info(ID, FileID) of
        {ok, #{atomics := {TransportAtomics, FileAtomics}} = Info} ->
            case Fun(Info) of
                Info ->
                    ok;
                NewInfo ->
                    true = ets:insert(?FILE_TABLE, {{ID, FileID}, NewInfo}),
                    NowMillis = erlang:system_time(millisecond),
                    ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis),
                    ok = atomics:put(FileAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis),
                    ok
            end;
        not_found ->
            not_found
    end.

-spec delete_file_info(ID :: transport_id(), FileID :: file_id()) -> ok.
delete_file_info(ID, FileID) ->
    ets:delete(?FILE_TABLE, {ID, FileID}),
    ok.

%%-------------------------------------------------------------------
%% Internal API - Configuration
%%-------------------------------------------------------------------

%% Get the default directory for incoming transports associated with the
%% provided RAFT partition given that RAFT partition's database directory.
-spec default_directory(Database :: file:filename()) -> Directory :: file:filename().
default_directory(Database) ->
    filename:join(Database, ?RAFT_TRANSPORT_PARTITION_SUBDIRECTORY).

%% Get the registered directory for incoming transports associated with the
%% provided RAFT partition or 'undefined' if no registration exists.
-spec registered_directory(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Directory :: file:filename() | undefined.
registered_directory(Table, Partition) ->
    case wa_raft_part_sup:options(Table, Partition) of
        undefined -> undefined;
        Options   -> Options#raft_options.transport_directory
    end.

%% Get the registered module for outgoing transports associated with the
%% provided RAFT partition or the default transport module if no registration exists.
-spec registered_module(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Module :: module() | undefined.
registered_module(Table, Partition) ->
    case wa_raft_part_sup:options(Table, Partition) of
        undefined -> ?RAFT_DEFAULT_TRANSPORT_MODULE;
        Options   -> Options#raft_options.transport_module
    end.

%%-------------------------------------------------------------------
%% Internal API - Transport Workers
%%-------------------------------------------------------------------

-spec pop_file(ID :: transport_id()) -> {ok, FileID :: file_id()} | empty | not_found.
pop_file(ID) ->
    case transport_info(ID) of
        {ok, #{queue := Queue}} -> try_pop_file(Queue);
        _Other                  -> not_found
    end.

-spec try_pop_file(Queue :: ets:table()) -> {ok, FileID :: file_id()} | empty | not_found.
try_pop_file(Queue) ->
    try ets:first(Queue) of
        '$end_of_table' ->
            empty;
        FileID ->
            try ets:select_delete(Queue, [{{FileID}, [], [true]}]) of
                0 -> try_pop_file(Queue);
                1 -> {ok, FileID}
            catch
                error:badarg -> not_found
            end
    catch
        error:badarg -> not_found
    end.

%%% ------------------------------------------------------------------------
%%%  gen_server callbacks
%%%

-spec init(Args :: []) -> {ok, State :: #state{}}.
init(_) ->
    process_flag(trap_exit, true),
    Counters = counters:new(?RAFT_TRANSPORT_COUNTERS, [atomics]),
    schedule_scan(),
    {ok, #state{counters = Counters}}.

-spec handle_call(Request, From :: gen_server:from(), State :: #state{}) -> {reply, Reply :: term(), NewState :: #state{}} | {noreply, NewState :: #state{}}
    when
        Request ::
            {start, Peer :: node(), Meta :: meta(), Root :: string()} |
            {start_wait, Peer :: node(), Meta :: meta(), Root :: string()} |
            {transport, ID :: transport_id(), Peer :: node(), Module :: module(), Meta :: meta(), Files :: [{file_id(), RelPath :: string(), Size :: integer()}]} |
            {cancel, ID :: transport_id(), Reason :: term()}.
handle_call({start, Peer, Meta, Root}, _From, #state{counters = Counters} = State) ->
    {reply, handle_transport_start(undefined, Peer, Meta, Root, Counters), State};
handle_call({start_wait, Peer, Meta, Root}, From, #state{counters = Counters} = State) ->
    case handle_transport_start(From, Peer, Meta, Root, Counters) of
        {ok, _ID}       -> {noreply, State};
        {error, Reason} -> {reply, {error, Reason}, State}
    end;
handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters = Counters} = State) ->
    Table = maps:get(table, Meta, undefined),
    try
        IsWitness = maps:get(witness, Meta, false),
        {MaxIncomingSnapshotTransfers, NumActiveReceives} = case IsWitness of
            true  -> {?RAFT_MAX_CONCURRENT_INCOMING_WITNESS_SNAPSHOT_TRANSFERS(), counters:get(Counters, ?RAFT_TRANSPORT_COUNTER_ACTIVE_WITNESS_RECEIVES)};
            false -> {?RAFT_MAX_CONCURRENT_INCOMING_SNAPSHOT_TRANSFERS(), counters:get(Counters, ?RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES)}
        end,
        ShouldThrottle = NumActiveReceives >= MaxIncomingSnapshotTransfers,
        case {transport_info(ID), ShouldThrottle} of
            {{ok, _Info}, _} ->
                ?RAFT_LOG_WARNING("wa_raft_transport got duplicate transport receive start for ~p from ~p", [ID, From]),
                {reply, duplicate, State};
            {not_found, true} ->
                {reply, {error, receiver_overloaded}, State};
            {not_found, _} ->
                ?RAFT_COUNT(Table, 'transport.receive'),
                ?RAFT_LOG_NOTICE("wa_raft_transport starting transport receive for ~p", [ID]),

                TransportAtomics = atomics:new(?RAFT_TRANSPORT_TRANSPORT_ATOMICS_COUNT, []),
                RootDir = transport_destination(ID, Meta),
                NowMillis = erlang:system_time(millisecond),
                TotalFiles = length(Files),

                % Force the receiving directory to always exist
                try filelib:ensure_dir([RootDir, $/]) catch _:_ -> ok end,

                % Setup overall transport info
                set_transport_info(ID, #{
                    type => receiver,
                    status => running,
                    atomics => TransportAtomics,
                    peer => Peer,
                    module => Module,
                    meta => Meta,
                    root => RootDir,
                    start_ts => NowMillis,
                    total_files => TotalFiles,
                    completed_files => 0
                }, Counters),

                % Setup file info for each file
                [
                    begin
                        FileAtomics = atomics:new(?RAFT_TRANSPORT_FILE_ATOMICS_COUNT, []),
                        set_file_info(ID, FileID, #{
                            status => requested,
                            atomics => {TransportAtomics, FileAtomics},
                            name => RelativePath,
                            path => filename:join(RootDir, RelativePath),
                            total_bytes => Size,
                            completed_bytes => 0
                        })
                    end || {FileID, RelativePath, Size} <- Files
                ],

                % If the transport is empty, then immediately complete it
                TotalFiles =:= 0 andalso
                    update_and_get_transport_info(
                        ID,
                        fun (Info0) ->
                            Info1 = Info0#{status => completed, end_ts => NowMillis},
                            Info2 = case maybe_notify_complete(ID, Info1, State) of
                                ok              -> Info1;
                                {error, Reason} -> Info1#{status => failed, error => {notify_failed, Reason}}
                            end,
                            maybe_notify(ID, Info2)
                        end,
                        Counters
                    ),

                {reply, ok, State}
        end
    catch
        T:E:S ->
            ?RAFT_COUNT(Table, 'transport.receive.error'),
            ?RAFT_LOG_WARNING("wa_raft_transport failed to accept transport ~p due to ~p ~p: ~n~p", [ID, T, E, S]),
            update_and_get_transport_info(
                ID,
                fun (Info) ->
                    Info#{
                        status => failed,
                        end_ts => erlang:system_time(millisecond),
                        error => {receive_failed, {T, E, S}}
                    }
                end,
                Counters
            ),
            {reply, {error, failed}, State}
    end;
handle_call({cancel, ID, Reason}, _From, #state{counters = Counters} = State) ->
    ?RAFT_LOG_NOTICE("wa_raft_transport got cancellation request for ~p for reason ~p", [ID, Reason]),
    Reply =
        case
            update_and_get_transport_info(
                ID,
                fun
                    (#{status := running} = Info) ->
                        NowMillis = erlang:system_time(millisecond),
                        Info#{status := cancelled, end_ts => NowMillis, error => {cancelled, Reason}};
                    (Info) ->
                        Info
                end,
                Counters
            )
        of
            {ok, _Info} -> ok;
            not_found   -> {error, not_found}
        end,
    {reply, Reply, State};
handle_call(Request, _From, #state{} = State) ->
    ?RAFT_LOG_WARNING("wa_raft_transport received unrecognized call ~p", [Request]),
    {noreply, State}.

-spec handle_cast(Request, State :: #state{}) -> {noreply, NewState :: #state{}}
    when Request :: {complete, ID :: transport_id(), FileID :: file_id(), Status :: term()}.
handle_cast({complete, ID, FileID, Status}, #state{counters = Counters} = State) ->
    NowMillis = erlang:system_time(millisecond),
    Table = case transport_info(ID) of
        {ok, #{meta := Meta}} -> maps:get(table, Meta, undefined);
        _                     -> undefined
    end,
    ?RAFT_COUNT(Table, {'transport.file.send', normalize_status(Status)}),
    Result0 = update_file_info(ID, FileID,
        fun (Info) ->
            case Info of
                #{start_ts := StartMillis} ->
                    ?RAFT_GATHER_LATENCY(Table, {'transport.file.send.latency_ms', Status}, NowMillis - StartMillis);
                _ ->
                    ok
            end,
            case Status of
                ok -> Info#{status => completed, end_ts => NowMillis};
                _  -> Info#{status => failed, end_ts => NowMillis, error => Status}
            end
        end),
    Result0 =:= not_found andalso
        ?RAFT_LOG_WARNING("wa_raft_transport got complete report for unknown file ~p:~p", [ID, FileID]),
    Result1 =
        update_and_get_transport_info(
            ID,
            fun
                (#{status := running, completed_files := CompletedFiles, total_files := TotalFiles} = Info0) ->
                    Info1 = Info0#{completed_files => CompletedFiles + 1},
                    Info2 = case CompletedFiles + 1 of
                        TotalFiles -> Info1#{status => completed, end_ts => NowMillis};
                        _          -> Info1
                    end,
                    Info3 = case Status of
                        ok -> Info2;
                        _  -> Info2#{status => failed, end_ts => NowMillis, error => {file, FileID, Status}}
                    end,
                    Info4 = case maybe_notify_complete(ID, Info3, State) of
                        ok              -> Info3;
                        {error, Reason} -> Info3#{status => failed, error => {notify_failed, Reason}}
                    end,
                    maybe_notify(ID, Info4);
                (Info) ->
                    Info
            end,
            Counters
        ),
    Result1 =:= not_found andalso
        ?RAFT_LOG_WARNING("wa_raft_transport got complete report for unknown transfer ~p", [ID]),
    {noreply, State};
handle_cast(Request, State) ->
    ?RAFT_LOG_NOTICE("wa_raft_transport got unrecognized cast ~p", [Request]),
    {noreply, State}.

-spec handle_info(Info :: term(), State :: #state{}) -> {noreply, NewState :: #state{}}.
handle_info(scan, #state{counters = Counters} = State) ->
    InactiveTransports =
        lists:filter(
            fun (ID) ->
                case update_and_get_transport_info(ID, fun (Info) -> scan_transport(ID, Info) end, Counters) of
                    {ok, #{status := Status}} -> Status =/= requested andalso Status =/= running;
                    not_found                 -> false
                end
            end, transports()),
    ExcessTransports = length(InactiveTransports) - ?RAFT_TRANSPORT_INACTIVE_INFO_LIMIT(),
    ExcessTransports > 0 andalso begin
        ExcessTransportIDs = lists:sublist(lists:sort(InactiveTransports), ExcessTransports),
        lists:foreach(fun delete_transport_info/1, ExcessTransportIDs)
    end,

    schedule_scan(),
    {noreply, State};
handle_info(Info, State) ->
    ?RAFT_LOG_NOTICE("wa_raft_transport got unrecognized info ~p", [Info]),
    {noreply, State}.

%%% ------------------------------------------------------------------------
%%%  Helper functions
%%%

-spec make_id() -> non_neg_integer().
make_id() ->
    NowMicros = erlang:system_time(microsecond),
    ID = NowMicros * 1000000 + rand:uniform(1000000) - 1,
    case transport_info(ID) of
        {ok, _Info} -> make_id();
        not_found   -> ID
    end.

-spec handle_transport_start(From :: gen_server:from() | undefined, Peer :: node(), Meta :: meta(), Root :: string(), Counters :: counters:counters_ref()) -> {ok, ID :: transport_id()} | {error, Reason :: term()}.
handle_transport_start(From, Peer, Meta, Root, Counters) ->
    ID = make_id(),
    Table = maps:get(table, Meta, undefined),

    ?RAFT_COUNT(Table, 'transport.start'),
    ?RAFT_LOG_NOTICE(
        "wa_raft_transport starting transport ~p of ~p to ~p with metadata ~p",
        [ID, Root, Peer, Meta]
    ),

    try
        Files = collect_files(Root),
        TransportAtomics = atomics:new(?RAFT_TRANSPORT_TRANSPORT_ATOMICS_COUNT, []),
        Module = transport_module(Meta),
        TotalFiles = length(Files),
        NowMillis = erlang:system_time(millisecond),
        Queue = ets:new(?MODULE, [ordered_set, public]),

        % Setup overall transport info
        set_transport_info(ID, #{
            type => sender,
            status => requested,
            atomics => TransportAtomics,
            peer => Peer,
            module => Module,
            meta => Meta,
            root => Root,
            start_ts => NowMillis,
            total_files => TotalFiles,
            completed_files => 0,
            queue => Queue
        }, Counters),

        % Setup file info for each file
        [
            begin
                FileAtomics = atomics:new(?RAFT_TRANSPORT_FILE_ATOMICS_COUNT, []),
                set_file_info(ID, FileID, #{
                    status => requested,
                    atomics => {TransportAtomics, FileAtomics},
                    name => Filename,
                    path => Path,
                    mtime => MTime,
                    total_bytes => Size,
                    completed_bytes => 0
                })
            end || {FileID, Filename, Path, MTime, Size} <- Files
        ],

        % Notify peer node of incoming transport
        FileData = [{FileID, Filename, Size} || {FileID, Filename, _, _, Size} <- Files],
        case gen_server:call({?MODULE, Peer}, {transport, ID, node(), Module, Meta, FileData}, ?RAFT_RPC_CALL_TIMEOUT()) of
            ok ->
                % Add all files to the queue
                ets:insert(Queue, [{FileID} || {FileID, _, _, _, _} <- Files]),

                % Start workers
                update_and_get_transport_info(
                    ID,
                    fun (Info0) ->
                        Info1 = case From of
                            undefined -> Info0;
                            _         -> Info0#{notify => From}
                        end,
                        case TotalFiles of
                            0 ->
                                Info2 = Info1#{status => completed, end_ts => NowMillis},
                                maybe_notify(ID, Info2);
                            _ ->
                                Sup = wa_raft_transport_sup:get_or_start(Peer),
                                [gen_server:cast(Pid, {notify, ID, Table}) || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(Sup), is_pid(Pid)],
                                Info1#{status => running}
                        end
                    end,
                    Counters
                ),
                {ok, ID};
            {error, receiver_overloaded} ->
                ?RAFT_COUNT(Table, 'transport.rejected.receiver_overloaded'),
                ?RAFT_LOG_WARNING("wa_raft_transport peer ~p rejected transport ~p because of overload", [Peer, ID]),
                update_and_get_transport_info(
                    ID,
                    fun (Info) ->
                        Info#{
                            status => failed,
                            end_ts => NowMillis,
                            error => {rejected, receiver_overloaded}
                        }
                    end,
                    Counters
                ),
                {error, receiver_overloaded};
            Error ->
                ?RAFT_COUNT(Table, 'transport.rejected'),
                ?RAFT_LOG_WARNING("wa_raft_transport peer ~p rejected transport ~p with error ~p", [Peer, ID, Error]),
                update_and_get_transport_info(
                    ID,
                    fun (Info) ->
                        Info#{
                            status => failed,
                            end_ts => NowMillis,
                            error => {rejected, Error}
                        }
                    end,
                    Counters
                ),
                {error, Error}
        end
    catch
        T:E:S ->
            ?RAFT_COUNT(Table, 'transport.start.error'),
            ?RAFT_LOG_WARNING(
                "wa_raft_transport failed to start transport ~p due to ~p ~p: ~n~p",
                [ID, T, E, S]
            ),
            update_and_get_transport_info(
                ID,
                fun (Info) ->
                    Info#{
                        status => failed,
                        end_ts => erlang:system_time(millisecond),
                        error => {start, {T, E, S}}
                    }
                end,
                Counters
            ),
            {error, failed}
    end.

-spec transport_module(Meta :: meta()) -> module().
transport_module(#{table := Table, partition := Partition}) ->
    wa_raft_transport:registered_module(Table, Partition);
transport_module(_Meta) ->
    ?RAFT_DEFAULT_TRANSPORT_MODULE.

-spec transport_destination(ID :: transport_id(), Meta :: meta()) -> string().
transport_destination(ID, #{type := transfer, table := Table, partition := Partition}) ->
    filename:join(wa_raft_transport:registered_directory(Table, Partition), integer_to_list(ID));
transport_destination(ID, #{type := snapshot, table := Table, partition := Partition}) ->
    filename:join(wa_raft_transport:registered_directory(Table, Partition), integer_to_list(ID)).

-spec collect_files(string()) -> [{non_neg_integer(), string(), string(), integer(), non_neg_integer()}].
collect_files(Root) ->
    {_, Files} = collect_files_impl(Root, [""],
        fun (Filename, Path, #file_info{size = Size, mtime = MTime}, {FileID, Acc}) ->
            {FileID + 1, [{FileID, filename:flatten(Filename), filename:flatten(Path), MTime, Size} | Acc]}
        end, {1, []}),
    Files.

-spec collect_files_impl(
    string(), list(), fun(), {integer(), [{non_neg_integer(), string(), string(), integer(), non_neg_integer()}]}
) -> {integer(), [{non_neg_integer(), string(), string(), integer(), non_neg_integer()}]}.
collect_files_impl(_Root, [], _Fun, Acc) ->
    Acc;
collect_files_impl(Root, [Filename | Queue], Fun, Acc0) ->
    Path = [Root, $/, Filename],
    case prim_file:read_file_info(Path, [{time, posix}]) of
        {ok, #file_info{type = regular} = Info} ->
            Acc1 = Fun(Filename, Path, Info, Acc0),
            collect_files_impl(Root, Queue, Fun, Acc1);
        {ok, #file_info{type = directory}} ->
            case prim_file:list_dir(Path) of
                {ok, Files} ->
                    NewQueue = lists:foldl(fun (Subfile, Acc) -> [join_names(Filename, Subfile) | Acc] end, Queue, Files),
                    collect_files_impl(Root, NewQueue, Fun, Acc0);
                {error, Reason} ->
                    ?RAFT_LOG_ERROR("wa_raft_transport failed to list files in ~p due to ~p", [filename:flatten(Path), Reason]),
                    throw({list_dir, Reason})
            end;
        {ok, #file_info{type = Type}} ->
            ?RAFT_LOG_WARNING("wa_raft_transport skipping file ~p with unknown type ~p", [filename:flatten(Path), Type]),
            collect_files_impl(Root, Queue, Fun, Acc0);
        {error, Reason} ->
            ?RAFT_LOG_ERROR("wa_raft_transport failed to read info of file ~p due to ~p", [filename:flatten(Path), Reason]),
            throw({read_file_info, Reason})
    end.

-spec join_names(string(), string()) -> string() | [string() | char()].
join_names("", Name) -> Name;
join_names(Dir, Name) -> [Dir, $/, Name].

-spec maybe_notify_complete(transport_id(), transport_info(), #state{}) -> ok | {error, term()}.
maybe_notify_complete(_ID, #{type := sender}, _State) ->
    ok;
maybe_notify_complete(_ID, #{status := Status}, _State) when Status =/= completed ->
    ok;
maybe_notify_complete(ID, #{type := receiver, root := Root, meta := #{type := snapshot, table := Table, partition := Partition, position := LogPos}}, #state{}) ->
    try wa_raft_server:snapshot_available(wa_raft_server:registered_name(Table, Partition), Root, LogPos) of
        ok ->
            ok;
        {error, Reason} ->
            ?RAFT_LOG_NOTICE(
                "wa_raft_transport failed to notify ~p of transport ~p completion due to ~p",
                [wa_raft_server:registered_name(Table, Partition), ID, Reason]
            ),
            {error, Reason}
    catch
        T:E:S ->
            ?RAFT_LOG_NOTICE(
                "wa_raft_transport failed to notify ~p of transport ~p completion due to ~p ~p: ~n~p",
                [wa_raft_server:registered_name(Table, Partition), ID, T, E, S]
            ),
            {error, {T, E, S}}
    end;
maybe_notify_complete(ID, _Info, #state{}) ->
    ?RAFT_LOG_NOTICE("wa_raft_transport finished transport ~p but does not know what to do with it", [ID]).

-spec maybe_notify(transport_id(), transport_info()) -> transport_info().
maybe_notify(ID, #{status := Status, notify := Notify, start_ts := Start, end_ts := End} = Info) when Status =/= requested, Status =/= running ->
    Table = maps:get(table, maps:get(meta, Info, #{}), undefined),
    ?RAFT_COUNT(Table, {'transport', Status}),
    ?RAFT_GATHER_LATENCY(Table, {'transport.latency_ms', Status}, End - Start),
    gen_server:reply(Notify, {ok, ID}),
    maps:remove(notify, Info);
maybe_notify(_ID, Info) ->
    Info.

-spec scan_transport(ID :: transport_id(), Info :: transport_info()) -> NewInfo :: transport_info().
scan_transport(ID, #{status := running, atomics := TransportAtomics} = Info) ->
    LastUpdateTs = atomics:get(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS),
    NowMillis = erlang:system_time(millisecond),
    case NowMillis - LastUpdateTs >= ?RAFT_TRANSPORT_IDLE_TIMEOUT() * 1000 of
        true  -> maybe_notify(ID, Info#{status := timed_out, end_ts => NowMillis});
        false -> Info
    end;
scan_transport(_ID, Info) ->
    Info.

-spec schedule_scan() -> reference().
schedule_scan() ->
    erlang:send_after(?RAFT_TRANSPORT_SCAN_INTERVAL_SECS * 1000, self(), scan).

-spec normalize_status(term()) -> atom().
normalize_status(Status) when is_atom(Status) ->
    Status;
normalize_status({_Error, Reason}) when is_atom(Reason) ->
    Reason;
normalize_status({_Error, Reason}) when is_tuple(Reason) ->
    normalize_status(element(1, Reason));
normalize_status({Error, _Reason}) when is_atom(Error) ->
    Error;
normalize_status(_) ->
    unknown.