lib/ex_cmd/process.ex

defmodule ExCmd.Process do
  @moduledoc """
  Server to interact with external process

  `ExCmd.stream!` should be preferred over this. Use this only if you need more control over the life-cycle of IO streams and OS process.
  """

  defmodule Error do
    defexception [:message]
  end

  @default [log: false]

  @doc """
  Starts a process using `cmd_with_args` and with options `opts`

  `cmd_with_args` must be a list containing command with arguments. example: `["cat", "file.txt"]`.

  ### Options
    * `cd`             -  the directory to run the command in
    * `env`            -  a list of tuples containing environment key-value. These can be accessed in the external program
    * `log`            -  When set to `true` odu logs and command stderr output are logged. Defaults to `false`
  """
  @spec start_link(nonempty_list(String.t()),
          cd: String.t(),
          env: [{String.t(), String.t()}],
          log: boolean()
        ) :: {:ok, pid()} | {:error, any()}
  def start_link([cmd | args], opts \\ []) do
    opts = Keyword.merge(@default, opts)
    odu_path = odu_path()

    if !odu_path do
      raise Error, message: "'odu' executable not found"
    end

    cmd_path = :os.find_executable(to_charlist(cmd))

    if !cmd_path do
      raise Error, message: "'#{cmd}' executable not found"
    end

    GenStateMachine.start_link(__MODULE__, %{
      odu_path: odu_path,
      cmd_with_args: [to_string(cmd_path) | args],
      opts: opts
    })
  end

  # client

  @doc """
  Return bytes written by the program to output stream.

  This blocks until the programs write and flush the output
  """
  @spec read(pid, non_neg_integer | :infinity) ::
          {:ok, iodata} | :eof | {:error, String.t()} | :closed
  def read(server, timeout \\ :infinity) do
    GenStateMachine.call(server, :read, timeout)
  end

  @doc """
  Writes iodata `data` to programs input streams

  This blocks when the pipe is full
  """
  @spec write(pid, iodata, non_neg_integer | :infinity) :: :ok | {:error, String.t()} | :closed
  def write(server, data, timeout \\ :infinity) do
    GenStateMachine.call(server, {:write, data}, timeout)
  end

  @doc """
  Closes input stream. Which signal EOF to the program
  """
  @spec close_stdin(pid) :: :ok | {:error, any()}
  def close_stdin(server), do: GenStateMachine.call(server, :close_stdin)

  @doc """
  Kills the program
  """
  def stop(server), do: GenStateMachine.stop(server, :normal)

  @doc """
  Returns status of the process. It will be either of `:started`, `{:done, exit_status}`
  """
  @spec status(pid) :: :started | {:done, integer()}
  def status(server), do: GenStateMachine.call(server, :status)

  @doc """
  Returns os pid of the command
  """
  @spec os_pid(pid) :: integer()
  def os_pid(server), do: GenStateMachine.call(server, :os_pid)

  @doc """
  Waits for the program to terminate.

  If the program terminates before timeout, it returns `{:ok, exit_status}` else returns `:timeout`
  """
  @spec await_exit(pid, timeout: timeout()) :: {:ok, integer()} | :timeout
  def await_exit(server, timeout \\ :infinity),
    do: GenStateMachine.call(server, {:await_exit, timeout})

  @doc """
  Returns [port_info](http://erlang.org/doc/man/erlang.html#port_info-1)
  """
  def port_info(server), do: GenStateMachine.call(server, :port_info)

  ## server
  require Logger
  use GenStateMachine, callback_mode: :handle_event_function

  @doc false
  defmacro send_input, do: 1

  @doc false
  defmacro send_output, do: 2

  @doc false
  defmacro output, do: 3

  @doc false
  defmacro input, do: 4

  @doc false
  defmacro close_input, do: 5

  @doc false
  defmacro output_eof, do: 6

  @doc false
  defmacro command_env, do: 7

  @doc false
  defmacro os_pid, do: 8

  @doc false
  defmacro start_error, do: 9

  # 4 byte length prefix + 1 byte tag
  @max_chunk_size 64 * 1024 - 5

  def init(params) do
    actions = [{:next_event, :internal, :setup}]
    {:ok, :init, params, actions}
  end

  def handle_event(:internal, :setup, :init, params) do
    Process.flag(:trap_exit, true)

    odu_opts = Keyword.take(params.opts, [:log, :cd])
    port = start_odu_port(params.odu_path, params.cmd_with_args, odu_opts)
    send_env(params.opts[:env], port)

    os_pid =
      receive do
        {^port, {:data, <<os_pid()::unsigned-integer-8, os_pid::big-unsigned-integer-32>>}} ->
          Logger.debug("Command started. os pid: #{os_pid}")
          os_pid

        {^port, {:data, <<start_error()::unsigned-integer-8, reason::binary>>}} ->
          Logger.error("Failed to start odu. reason: #{reason}")
          raise Error, message: "Failed to start odu"
      after
        5_000 ->
          raise Error, message: "Failed to start command"
      end

    data = %{
      pending_write: [],
      pending_read: [],
      input_ready: false,
      waiting_processes: MapSet.new(),
      port: port,
      os_pid: os_pid
    }

    {:next_state, :started, data, []}
  end

  def handle_event({:call, from}, {:await_exit, timeout}, state, data) do
    case state do
      {:done, exit_status} ->
        {:keep_state_and_data, [{:reply, from, {:ok, exit_status}}]}

      _ ->
        actions = [{{:timeout, {:await_exit, from}}, timeout, nil}]
        data = %{data | waiting_processes: MapSet.put(data.waiting_processes, from)}
        {:keep_state, data, actions}
    end
  end

  def handle_event({:call, from}, :status, state, _data) do
    {:keep_state_and_data, [{:reply, from, state}]}
  end

  def handle_event({:call, from}, :os_pid, _state, %{os_pid: os_pid}) do
    {:keep_state_and_data, [{:reply, from, os_pid}]}
  end

  def handle_event({:call, from}, :port_info, state, data) when state not in [:init, :setup] do
    {:keep_state_and_data, [{:reply, from, Port.info(data.port)}]}
  end

  def handle_event(:internal, :input_ready, _state, data) do
    {data, actions} = try_sending_input(data)
    {:keep_state, data, actions}
  end

  def handle_event({:call, from}, {:write, iodata}, :started, data) do
    bin = IO.iodata_to_binary(iodata)
    data = %{data | pending_write: data.pending_write ++ [{from, bin}]}
    {data, actions} = try_sending_input(data)
    {:keep_state, data, actions}
  end

  def handle_event({:call, from}, {:write, _iodata}, _state, _) do
    {:keep_state_and_data, [{:reply, from, {:error, :epipe}}]}
  end

  def handle_event({:call, from}, :read, state, data) when state in [:started, :input_closed] do
    {:keep_state, request_output(from, data), []}
  end

  def handle_event({:call, from}, :read, _state, _) do
    {:keep_state_and_data, [{:reply, from, :eof}]}
  end

  def handle_event({:call, from}, :close_stdin, :started, data) do
    {data, actions} = close_stream(:stdin, from, data)
    {data, write_actions} = handle_stdin_close(data)

    {:next_state, :input_closed, data, actions ++ write_actions}
  end

  def handle_event({:call, from}, :close_stdin, _, _data) do
    {:keep_state_and_data, [{:reply, from, :ok}]}
  end

  def handle_event(:info, {:EXIT, port, _reason}, state, %{port: port} = data) do
    {data, write_actions} = handle_stdin_close(data)
    {data, read_actions} = handle_eof(data)
    {data, await_exit_actions} = reply_await_exit(data, {:error, :stopped})

    if state in [:started, :input_closed] do
      {:next_state, :port_closed, data, write_actions ++ read_actions ++ await_exit_actions}
    else
      {:keep_state, data, write_actions ++ read_actions ++ await_exit_actions}
    end
  end

  def handle_event(:info, {:EXIT, _, reason}, _, data) do
    {:stop_and_reply, reason, [], data}
  end

  def handle_event(:info, {port, {:exit_status, exit_status}}, _, %{port: port} = data) do
    Logger.debug("command exited with status: #{exit_status}")

    {data, write_actions} = handle_stdin_close(data)
    {data, read_actions} = handle_eof(data)
    {data, await_exit_actions} = reply_await_exit(data, {:ok, exit_status})

    {:next_state, {:done, exit_status}, data, write_actions ++ read_actions ++ await_exit_actions}
  end

  def handle_event(:info, {port, {:data, output}}, _, %{port: port} = data) do
    <<tag::unsigned-integer-8, bin::binary>> = output
    {data, actions} = handle_command(tag, bin, data)
    {:keep_state, data, actions}
  end

  def handle_event({:timeout, {:await_exit, from}}, _, _, data) do
    {:keep_state, %{data | waiting_processes: MapSet.delete(data.waiting_processes, from)},
     [{:reply, from, :timeout}]}
  end

  defp start_odu_port(odu_path, cmd_with_args, opts) do
    args = build_odu_params(opts) ++ ["--" | cmd_with_args]
    options = [:use_stdio, :exit_status, :binary, :hide, {:packet, 4}, args: args]
    Port.open({:spawn_executable, odu_path}, options)
  end

  @odu_protocol_version "1.0"
  defp build_odu_params(opts) do
    cd = Path.expand(opts[:cd] || File.cwd!())

    if !File.exists?(cd) || !File.dir?(cd) do
      raise Error, message: ":cd is not a valid path"
    end

    params = ["-cd", cd, "-protocol_version", @odu_protocol_version]

    if opts[:log] do
      params ++ ["-log", "|2"]
    else
      params
    end
  end

  defp handle_command(output_eof(), <<>>, data) do
    handle_eof(data)
  end

  defp handle_command(output(), bin, %{pending_read: [pid | pending]} = data) do
    actions = [{:reply, pid, {:ok, bin}}]

    data =
      if Enum.empty?(pending) do
        %{data | pending_read: []}
      else
        send_command(send_output(), <<>>, data.port)
        %{data | pending_read: pending}
      end

    {data, actions}
  end

  defp handle_command(send_input(), <<>>, data) do
    data = %{data | input_ready: true}
    actions = [{:next_event, :internal, :input_ready}]

    {data, actions}
  end

  defp send_env(nil, port), do: send_env([], port)

  defp send_env(env, port) do
    payload =
      Enum.map(env, fn {key, value} ->
        entry = String.trim(key) <> "=" <> String.trim(value)

        if byte_size(entry) > 65536 do
          raise Error, message: "Env entry length exceeds limit"
        end

        <<byte_size(entry)::big-unsigned-integer-16, entry::binary>>
      end)
      |> Enum.join()

    send_command(command_env(), payload, port)
  end

  defp handle_stdin_close(data) do
    actions =
      Enum.flat_map(data.pending_write, fn {pid, _} ->
        [{:reply, pid, {:error, :epipe}}]
      end)

    {%{data | pending_write: []}, actions}
  end

  defp handle_eof(data) do
    actions =
      Enum.flat_map(data.pending_read, fn pid ->
        [{:reply, pid, :eof}]
      end)

    {%{data | pending_read: []}, actions}
  end

  defp reply_await_exit(data, response) do
    actions =
      Enum.flat_map(data.waiting_processes, fn pid ->
        [{:reply, pid, response}, {{:timeout, {:await_exit, pid}}, :infinity, nil}]
      end)

    {%{data | waiting_processes: MapSet.new()}, actions}
  end

  defp try_sending_input(%{pending_write: [{pid, bin} | pending], input_ready: true} = data) do
    {chunk, bin} = binary_split_at(bin, @max_chunk_size)
    send_command(input(), chunk, data.port)

    if bin == <<>> do
      actions = [{:reply, pid, :ok}]
      data = %{data | pending_write: pending, input_ready: false}
      {data, actions}
    else
      data = %{data | pending_write: [{pid, bin} | pending], input_ready: false}
      {data, []}
    end
  end

  defp try_sending_input(data) do
    {data, []}
  end

  defp request_output(from, %{pending_read: []} = data) do
    send_command(send_output(), <<>>, data.port)
    %{data | pending_read: [from]}
  end

  defp request_output(from, data) do
    %{data | pending_read: data.pending_read ++ [from]}
  end

  defp close_stream(:stdin, pid, data) do
    send_command(close_input(), <<>>, data.port)
    actions = [{:reply, pid, :ok}]
    {data, actions}
  end

  defp send_command(tag, bin, port) do
    bin = <<tag::unsigned-integer-8, bin::binary>>
    Port.command(port, bin)
  end

  defp binary_split_at(bin, pos) when byte_size(bin) <= pos, do: {bin, <<>>}

  defp binary_split_at(bin, pos) do
    len = byte_size(bin)
    {binary_part(bin, 0, pos), binary_part(bin, pos, len - pos)}
  end

  defp odu_path do
    Application.app_dir(:ex_cmd, "priv")
    |> Path.join(Mix.Tasks.Compile.Odu.executable_name())
  end
end