src/alcove_drv.erl

%%% Copyright (c) 2014-2022, Michael Santos <michael.santos@gmail.com>
%%%
%%% Permission to use, copy, modify, and/or distribute this software for any
%%% purpose with or without fee is hereby granted, provided that the above
%%% copyright notice and this permission notice appear in all copies.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-module(alcove_drv).

-behaviour(gen_server).

-include_lib("alcove/include/alcove.hrl").

%% API
-export([start/0, start/1, start/2, stop/1]).
-export([start_link/0, start_link/1, start_link/2]).
-export([call/5]).
-export([stdin/3, stdout/3, stderr/3, event/3]).
-export([raw/1, getopts/1, progname/0, port/1]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

-type ref() :: pid().

-export_type([ref/0]).

-record(state, {
    owner :: pid(),
    raw = false,
    port :: port(),
    fdctl :: port(),
    buf = <<>> :: binary()
}).

% @doc Create the alcove port using the default options
-spec start() -> ignore | {error, _} | {ok, pid()}.
start() ->
    start(self(), []).

% @doc Create the alcove port
%
% See start_link/1 for options.
-spec start(proplists:proplist()) -> ignore | {error, _} | {ok, pid()}.
start(Options) ->
    start(self(), Options).

-spec start(pid(), proplists:proplist()) -> ignore | {error, _} | {ok, pid()}.
start(Owner, Options) ->
    gen_server:start(?MODULE, [Owner, Options], []).

% @doc Create and link against the alcove port using the default options
-spec start_link() -> ignore | {error, _} | {ok, pid()}.
start_link() ->
    start_link(self(), []).

% @doc Create and link against the alcove port
%
% Create the alcove port.
%
% * stderr_to_stdout
%
%   The behaviour of stderr from the port differs from child
%   processes. Standard error from the port goes to the console
%   while stderr from child processes is tagged and sent to the
%   controlling Erlang process.
%
%   This option merges stderr and stdout from the port. Since
%   stdout is used for communicating with the Erlang side and
%   is tagged with a header, this will likely mess things up.
%
%   Only use this option if you want to call execvp/3,4 in
%   the port.
%
% * {env, [{Key,Val}]}
%
%   Set the environment for the port.
%
% * {exec, Exec}
%
%   Default: ""
%
%   Sets a command to run the port, such as sudo.
%
% * {progname, Path}
%
%   Default: priv/alcove
%
%   Sets the path to the alcove executable.
%
% * {ctldir, Path}
%
%   Default: priv
%
%   Sets the path to the alcove control directory. The directory
%   must be writable by the alcove user.
%
% For the remaining options, see alcove:getopt/2,3.
-spec start_link(proplists:proplist()) -> ignore | {error, _} | {ok, pid()}.
start_link(Options) ->
    start_link(self(), Options).

-spec start_link(pid(), proplists:proplist()) -> ignore | {error, _} | {ok, pid()}.
start_link(Owner, Options) ->
    gen_server:start_link(?MODULE, [Owner, Options], []).

-spec stop(ref()) -> ok.
stop(Drv) ->
    catch gen_server:call(Drv, stop),
    ok.

-spec call(ref(), [alcove:pid_t()], atom(), list(), timeout()) -> term().
call(Drv, Pids, Command, Argv, Timeout) when
    is_list(Pids),
    is_atom(Command),
    is_list(Argv),
    (is_integer(Timeout) orelse Timeout =:= infinity)
->
    Data = alcove_codec:call(Command, Pids, Argv),
    case sync_send(Drv, Data) of
        ok ->
            call_reply(Drv, Pids, alcove_proto:noreturn(Command), Timeout);
        Error ->
            Error
    end.

-spec sync_send(ref(), iodata()) -> ok | {alcove_error, badarg}.
sync_send(Drv, Data) ->
    case iolist_size(Data) =< 16#ffff of
        true ->
            gen_server:call(Drv, {send, Data}, infinity);
        false ->
            {alcove_error, badarg}
    end.

-spec send(ref(), iodata()) -> ok | {alcove_error, badarg}.
send(Drv, Data) ->
    case iolist_size(Data) =< 16#ffff of
        true ->
            gen_server:cast(Drv, {send, Data});
        false ->
            {alcove_error, badarg}
    end.

-spec stdin(ref(), [alcove:pid_t()], iodata()) -> ok | {alcove_error, badarg}.
stdin(Drv, Pids, Data) ->
    send(Drv, alcove_codec:stdin(Pids, Data)).

-spec stdout(ref(), [alcove:pid_t()], timeout()) ->
    false | binary() | {alcove_error, any()} | {alcove_pipe, integer()}.
stdout(Drv, Pids, Timeout) ->
    reply(Drv, Pids, alcove_stdout, Timeout).

-spec stderr(ref(), [alcove:pid_t()], timeout()) ->
    false | binary() | {alcove_error, any()} | {alcove_pipe, integer()}.
stderr(Drv, Pids, Timeout) ->
    reply(Drv, Pids, alcove_stderr, Timeout).

-spec event(ref(), [alcove:pid_t()], timeout()) -> term().
event(Drv, Pids, Timeout) ->
    reply(Drv, Pids, alcove_event, Timeout).

-spec raw(ref()) -> ok.
raw(Drv) ->
    gen_server:call(Drv, raw).

-spec port(ref()) -> port().
port(Drv) ->
    gen_server:call(Drv, port).

%%--------------------------------------------------------------------
%%% Callbacks
%%--------------------------------------------------------------------

% init/2 uses an undocumented feature of open_port/2 to set up a fifo
% between the alcove port process and beam. The close of the fifo indicates
% the port has called exec().
%
% As a result, dialyzer will warn about the first argument passed to
% open_port/2.
-dialyzer({nowarn_function, init/1}).
-dialyzer({no_unused, call_unlink/2}).

init([Owner, Options]) ->
    process_flag(trap_exit, true),

    % Control fifo for the port
    Ctldir = proplists:get_value(ctldir, Options, basedir(?MODULE)),
    Fifo = lists:concat([
        maybe_string(Ctldir),
        "/fdctl.",
        erlang:phash2([os:getpid(), self()])
    ]),

    [Cmd | Argv] = getopts([{fdctl, Fifo} | Options]),
    PortOpt = lists:filter(
        fun
            (stderr_to_stdout) -> true;
            ({env, _}) -> true;
            (_) -> false
        end,
        Options
    ),

    Port = erlang:open_port(
        {spawn_executable, Cmd},
        [
            {args, Argv},
            stream,
            binary
        ] ++ PortOpt
    ),

    % Block until the port has fully initialized
    receive
        {Port, {data, Data}} ->
            {alcove_call, [], ok} = alcove_codec:decode(Data),

            Fdctl = erlang:open_port(Fifo, [in]),

            % Decrease the link count of the fifo. The fifo is deleted in
            % the port because the port may be running as a different user.
            ok = call_unlink(Port, Fifo),
            {ok, #state{
                port = Port,
                fdctl = Fdctl,
                owner = Owner
            }};
        {'EXIT', Port, normal} ->
            {stop, {error, port_init_failed}};
        {'EXIT', Port, Reason} ->
            {stop, {error, Reason}}
    end.

handle_call({send, Buf}, {Owner, _Tag}, #state{port = Port, owner = Owner} = State) ->
    true = erlang:port_command(Port, Buf),
    {reply, ok, State};
handle_call(raw, {Owner, _Tag}, #state{owner = Owner} = State) ->
    {reply, ok, State#state{raw = true}};
handle_call(port, {Owner, _Tag}, #state{owner = Owner, port = Port} = State) ->
    {reply, Port, State};
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
handle_call(_, {Owner, _Tag}, #state{owner = Owner} = State) ->
    {reply, {error, enotsup}, State};
handle_call(_, _, State) ->
    {reply, {error, not_owner}, State}.

handle_cast({send, Buf}, #state{port = Port} = State) ->
    erlang:port_command(Port, Buf),
    {noreply, State};
handle_cast(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, #state{port = Port, fdctl = Fdctl}) ->
    catch erlang:port_close(Port),
    catch erlang:port_close(Fdctl),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%%% Port communication
%%--------------------------------------------------------------------

% Reply from a child process.
%
% Several writes from the child process may be coalesced into 1 read by
% the parent.
handle_info(
    {Port, {data, Data}},
    #state{raw = true, port = Port, buf = Buf, owner = Owner} = State
) ->
    Owner ! {alcove_stdout, self(), [], <<Buf/binary, Data/binary>>},
    {noreply, State};
handle_info({Port, {data, Data}}, #state{port = Port, buf = Buf, owner = Owner} = State) ->
    {Msgs, Rest} = alcove_codec:stream(<<Buf/binary, Data/binary>>),
    Terms = [alcove_codec:decode(Msg) || Msg <- Msgs],
    _ = [Owner ! {Tag, self(), Pids, Term} || {Tag, Pids, Term} <- Terms],
    {noreply, State#state{buf = Rest}};
handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {shutdown, Reason}, State};
% The write end of the fdctl fifo has been closed: either the port has
% exit'ed or called exec().
handle_info({'EXIT', Fdctl, _Reason}, #state{fdctl = Fdctl, owner = Owner} = State) ->
    Owner ! {alcove_ctl, self(), [], fdctl_closed},
    {noreply, State#state{raw = true}};
% WTF
handle_info(Info, State) ->
    error_logger:error_report([{wtf, Info}]),
    {noreply, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
call_reply(Drv, Pids, true, Timeout) ->
    receive
        {alcove_ctl, Drv, Pids, fdctl_closed} ->
            ok;
        {alcove_ctl, Drv, Pids, Error} ->
            {alcove_error, Error};
        {alcove_pipe, Drv, Pids, Bytes} ->
            {alcove_error, {eagain, Bytes}};
        {alcove_call, Drv, Pids, Error} when Error =:= badarg; Error =:= undef ->
            {alcove_error, Error};
        {alcove_call, Drv, Pids, Event} ->
            Event
    after Timeout -> {alcove_error, timeout}
    end;
call_reply(Drv, Pids, false, Timeout) ->
    receive
        {alcove_ctl, Drv, Pids, fdctl_closed} ->
            call_reply(Drv, Pids, false, Timeout);
        {alcove_ctl, Drv, Pids, Error} ->
            {alcove_error, Error};
        {alcove_event, Drv, Pids, {termsig, _} = Event} ->
            {alcove_error, Event};
        {alcove_event, Drv, Pids, {exit_status, _} = Event} ->
            {alcove_error, Event};
        {alcove_call, Drv, Pids, Error} when Error =:= badarg; Error =:= undef ->
            {alcove_error, Error};
        {alcove_pipe, Drv, Pids, Bytes} ->
            {alcove_error, {eagain, Bytes}};
        {alcove_call, Drv, Pids, Event} ->
            Event
    after Timeout -> {alcove_error, timeout}
    end.

reply(Drv, Pids, Type, Timeout) ->
    receive
        {alcove_ctl, Drv, Pids, fdctl_closed} ->
            reply(Drv, Pids, Type, Timeout);
        {alcove_ctl, Drv, Pids, Error} ->
            {alcove_error, Error};
        {alcove_call, Drv, Pids, Error} when Error =:= badarg; Error =:= undef ->
            {alcove_error, Error};
        {alcove_pipe, Drv, Pids, Bytes} ->
            {alcove_pipe, {eagain, Bytes}};
        {Type, Drv, Pids, Event} ->
            Event
    after Timeout -> false
    end.

%%--------------------------------------------------------------------
%%% Port executable
%%--------------------------------------------------------------------
-spec getopts(proplists:proplist()) -> list(string() | [string()]).
getopts(Options0) when is_list(Options0) ->
    Exec = maybe_string(proplists:get_value(exec, Options0, "")),
    Progname = maybe_string(proplists:get_value(progname, Options0, progname())),

    Options = lists:map(
        fun
            (N) when is_atom(N) ->
                {N, true};
            ({_, _} = N) ->
                N
        end,
        Options0
    ),

    Switches = lists:append([optarg(N) || N <- Options]),
    [Cmd | Argv] = [N || N <- string:tokens(Exec, " ") ++ [Progname | Switches], N /= ""],
    [find_executable(Cmd) | Argv].

optarg({fdctl, Arg}) -> switch("c", Arg);
optarg({depth, Arg}) -> switch("d", integer_to_list(Arg));
optarg({maxchild, Arg}) -> switch("m", integer_to_list(Arg));
optarg(_) -> "".

switch(Switch, Arg) when is_binary(Arg) ->
    switch(Switch, binary_to_list(Arg));
switch(Switch, Arg) ->
    [lists:concat(["-", Switch]), Arg].

maybe_string(N) when is_list(N) -> N;
maybe_string(N) when is_binary(N) -> binary_to_list(N).

find_executable(Exe) ->
    case os:find_executable(Exe) of
        false ->
            erlang:error(badarg, [Exe]);
        N ->
            N
    end.

basedir(Module) ->
    case code:priv_dir(Module) of
        {error, bad_name} ->
            filename:join([
                filename:dirname(code:which(Module)),
                "..",
                "priv"
            ]);
        Dir ->
            Dir
    end.

-spec progname() -> string().
progname() ->
    filename:join([basedir(alcove), "alcove"]).

% Blocking functions for handling the Control fd fifo. These functions
% are called from init/1.
call_unlink(Port, File) ->
    Encode = alcove_codec:call(unlink, [], [File]),
    erlang:port_command(Port, Encode),
    Reply =
        receive
            {Port, {data, Data}} ->
                alcove_codec:decode(Data);
            {'EXIT', Port, normal} ->
                {error, port_init_failed};
            {'EXIT', Port, Reason} ->
                {error, Reason}
        end,
    case Reply of
        {alcove_call, [], ok} ->
            ok;
        {alcove_call, [], Error} ->
            Error;
        {error, _} = Error ->
            Error
    end.