Skip to main content

src/wa_raft_dist_transport.erl

%%% Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved.
%%%
%%% This source code is licensed under the Apache 2.0 license found in
%%% the LICENSE file in the root directory of this source tree.
%%%
%%% This module implements transport interface by using erlang OTP dist.

-module(wa_raft_dist_transport).
-compile(warn_missing_spec_all).
-behaviour(gen_server).
-behaviour(wa_raft_transport).

-export([
    child_spec/0,
    start_link/0
]).

-export([
    transport_init/1,
    transport_send/3
]).

-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    terminate/2
]).

-include_lib("wa_raft/include/wa_raft.hrl").
-include_lib("wa_raft/include/wa_raft_logger.hrl").

-record(sender_state, {
}).
-record(receiver_state, {
    fds = #{} :: #{{ID :: wa_raft_transport:transport_id(), FileID :: wa_raft_transport:file_id()} => Fd :: file:fd()}
}).

-spec child_spec() -> supervisor:child_spec().
child_spec() ->
    #{
        id => ?MODULE,
        start => {?MODULE, start_link, []},
        restart => transient,
        shutdown => 5000,
        modules => [?MODULE]
    }.

-spec start_link() -> gen_server:start_ret().
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec transport_init(Node :: node()) -> {ok, State :: #sender_state{}}.
transport_init(_Node) ->
    {ok, #sender_state{}}.

-spec transport_send(ID :: wa_raft_transport:transport_id(), FileID :: wa_raft_transport:file_id(), State :: #sender_state{}) ->
    {ok, NewState :: #sender_state{}} |
    {stop, Reason :: term(), NewState :: #sender_state{}}.
transport_send(ID, FileID, State) ->
    ?RAFT_LOG_DEBUG("wa_raft_dist_transport starting to send file ~p/~p", [ID, FileID]),
    case wa_raft_transport:transport_info(ID) of
        {ok, #{peer := Peer}} ->
            case wa_raft_transport:file_info(ID, FileID) of
                {ok, #{name := File, path := Path}} ->
                    case prim_file:open(Path, [binary, read]) of
                        {ok, Fd} ->
                            try
                                try prim_file:advise(Fd, 0, 0, sequential)
                                catch _:_ -> ok
                                end,
                                case transport_send_loop(ID, FileID, Fd, Peer, State) of
                                    {ok, NewState} ->
                                        {ok, NewState};
                                    {error, Reason, NewState} ->
                                        {stop, Reason, NewState}
                                end
                            after
                                prim_file:close(Fd)
                            end;
                        {error, Reason} ->
                            ?RAFT_LOG_ERROR(
                                "wa_raft_dist_transport failed to open file ~p/~p (~s) due to ~p",
                                [ID, FileID, File, Reason]
                            ),
                            {stop, {failed_to_open_file, ID, FileID, Reason}, State}
                    end;
                _ ->
                    {stop, {invalid_file, ID, FileID}, State}
            end;
        _ ->
            {stop, {invalid_transport, ID}, State}
    end.

-spec transport_send_loop(
    wa_raft_transport:transport_id(),
    wa_raft_transport:file_id(),
    file:fd(),
    node(),
    #sender_state{}
) -> {ok, #sender_state{}} | {error, term(), #sender_state{}}.
transport_send_loop(ID, FileID, Fd, Peer, State) ->
    ChunkSize = ?RAFT_DIST_TRANSPORT_CHUNK_SIZE(),
    MaxInflight = ?RAFT_DIST_TRANSPORT_MAX_INFLIGHT(),
    transport_send_loop(ID, FileID, Fd, 0, Peer, [], ChunkSize, MaxInflight, State).

-spec transport_send_loop(
    wa_raft_transport:transport_id(),
    wa_raft_transport:file_id(),
    file:fd(),
    non_neg_integer() | eof,
    node(),
    [gen_server:request_id()],
    pos_integer(),
    pos_integer(),
    #sender_state{}
) -> {ok, #sender_state{}} | {error, term(), #sender_state{}}.
transport_send_loop(ID, FileID, _Fd, eof, Peer, [], _ChunkSize, _MaxInflight, State) ->
    gen_server:cast({?MODULE, Peer}, {complete, ID, FileID}),
    {ok, State};
transport_send_loop(ID, FileID, Fd, Offset, Peer, [RequestId | Chunks], ChunkSize, MaxInflight, State)
        when Offset =:= eof ; length(Chunks) >= MaxInflight ->
    case gen_server:wait_response(RequestId, 5000) of
        {reply, ok} ->
            transport_send_loop(ID, FileID, Fd, Offset, Peer, Chunks, ChunkSize, MaxInflight, State);
        {reply, {error, Reason}} ->
            ?RAFT_LOG_ERROR("wa_raft_dist_transport failed to send file ~p/~p due to receiver error ~p", [ID, FileID, Reason]),
            {error, {receiver_error, ID, FileID, Reason}, State};
        timeout ->
            ?RAFT_LOG_ERROR("wa_raft_dist_transport timed out while sending file ~p/~p", [ID, FileID]),
            {error, {send_timed_out, ID, FileID}, State};
        {error, {Reason, _}} ->
            ?RAFT_LOG_ERROR("wa_raft_dist_transport failed to send file ~p/~p due to ~p", [ID, FileID, Reason]),
            {error, {send_failed, ID, FileID, Reason}, State}
    end;
transport_send_loop(ID, FileID, Fd, Offset, Peer, Chunks, ChunkSize, MaxInflight, State) when is_integer(Offset) ->
    case prim_file:read(Fd, ChunkSize) of
        {ok, Data} ->
            RequestId = gen_server:send_request({?MODULE, Peer}, {chunk, ID, FileID, Offset, Data}),
            wa_raft_transport:update_file_info(ID, FileID,
                fun (#{completed_bytes := Completed} = Info) ->
                    Info#{completed_bytes := Completed + byte_size(Data)}
                end),
            transport_send_loop(ID, FileID, Fd, Offset + byte_size(Data), Peer, Chunks ++ [RequestId], ChunkSize, MaxInflight, State);
        eof ->
            transport_send_loop(ID, FileID, Fd, eof, Peer, Chunks, ChunkSize, MaxInflight, State);
        {error, Reason} ->
            ?RAFT_LOG_ERROR("wa_raft_dist_transport failed to read file ~p/~p due to ~p", [ID, FileID, Reason]),
            {error, {read_failed, ID, FileID, Reason}, State}
    end.

-spec init(Args :: []) -> {ok, State :: #receiver_state{}}.
init([]) ->
    process_flag(trap_exit, true),
    {ok, #receiver_state{}}.

-spec handle_call(Request, From :: term(), State :: #receiver_state{}) ->
    {reply, Reply :: term(), NewState :: #receiver_state{}} | {noreply, NewState :: #receiver_state{}}
    when Request :: {chunk, wa_raft_transport:transport_id(), wa_raft_transport:file_id(), integer(), binary()}.
handle_call({chunk, ID, FileID, Offset, Data}, _From, #receiver_state{} = State0) ->
    {Reply, NewState} = case open_file(ID, FileID, State0) of
        {ok, Fd, State1} ->
            case prim_file:pwrite(Fd, Offset, Data) of
                ok ->
                    wa_raft_transport:update_file_info(ID, FileID,
                        fun (#{completed_bytes := Completed} = Info) ->
                            Info#{completed_bytes := Completed + byte_size(Data)}
                        end),

                    {ok, State1};
                {error, Reason} ->
                    ?RAFT_LOG_WARNING(
                        "wa_raft_dist_transport receiver failed to write file chunk ~p/~p @ ~p due to ~p",
                        [ID, FileID, Offset, Reason]
                    ),
                    {{write_failed, Reason}, State1}
            end;
        {error, Reason, State1} ->
            ?RAFT_LOG_WARNING(
                "wa_raft_dist_transport receiver failed to handle file chunk ~p/~p @ ~p due to open failing due to ~p",
                [ID, FileID, Offset, Reason]
            ),
            {{open_failed, Reason}, State1}
    end,
    {reply, Reply, NewState};
handle_call(Request, From, #receiver_state{} = State) ->
    ?RAFT_LOG_NOTICE("wa_raft_dist_transport got unrecognized call ~p from ~p", [Request, From]),
    {noreply, State}.

-spec handle_cast(Request, State :: #receiver_state{}) -> {noreply, NewState :: #receiver_state{}}
    when Request :: {complete,  wa_raft_transport:transport_id(), wa_raft_transport:file_id()}.
handle_cast({complete, ID, FileID}, #receiver_state{} = State0) ->
    case open_file(ID, FileID, State0) of
        {ok, _Fd, State1} ->
            {ok, State2} = close_file(ID, FileID, State1),
            wa_raft_transport:complete(ID, FileID, ok),
            {noreply, State2};
        {error, Reason, State1} ->
            ?RAFT_LOG_WARNING(
                "wa_raft_dist_transport receiver failed to handle file complete ~p/~p due to open failing due to ~p",
                [ID, FileID, Reason]
            ),
            {noreply, State1}
    end;
handle_cast(Request, #receiver_state{} = State) ->
    ?RAFT_LOG_NOTICE("wa_raft_dist_transport got unrecognized cast ~p", [Request]),
    {noreply, State}.

-spec terminate(Reason :: dynamic(), State :: #receiver_state{}) -> ok.
terminate(Reason, #receiver_state{}) ->
    ?RAFT_LOG_NOTICE("wa_raft_dist_transport terminating due to ~p", [Reason]),
    ok.

-spec open_file(ID :: wa_raft_transport:transport_id(), FileID :: wa_raft_transport:file_id(), State :: #receiver_state{}) ->
    {ok, Fd :: file:fd(), NewState :: #receiver_state{}} | {error, Reason :: term(), NewState :: #receiver_state{}}.
open_file(ID, FileID, #receiver_state{fds = Fds} = State0) ->
    case Fds of
        #{{ID, FileID} := Fd} ->
            {ok, Fd, State0};
        #{} ->
            case wa_raft_transport:file_info(ID, FileID) of
                {ok, #{name := File, path := Path}} ->
                    try filelib:ensure_dir(Path)
                    catch _:_ -> ok
                    end,
                    case prim_file:open(Path, [binary, write]) of
                        {ok, Fd} ->
                            State1 = State0#receiver_state{fds = Fds#{{ID, FileID} => Fd}},
                            {ok, Fd, State1};
                        {error, Reason} ->
                            ?RAFT_LOG_WARNING(
                                "wa_raft_dist_transport receiver failed to open file ~p/~p (~p) due to ~p",
                                [ID, FileID, File, Reason]
                            ),
                            {error, {open_failed, Reason}, State0}
                    end;
                _ ->
                    {error, invalid_file, State0}
            end
    end.

-spec close_file(ID :: wa_raft_transport:transport_id(), FileID :: wa_raft_transport:file_id(), State :: #receiver_state{}) ->
    {ok, NewState :: #receiver_state{}}.
close_file(ID, FileID, #receiver_state{fds = Fds} = State0) ->
    case Fds of
        #{{ID, FileID} := Fd} ->
            try prim_file:close(Fd)
            catch _:_ -> ok
            end,
            State1 = State0#receiver_state{fds = maps:remove({ID, FileID}, Fds)},
            {ok, State1};
        _ ->
            {ok, State0}
    end.