# lib/ex_cmd/stream.ex

defmodule ExCmd.Stream do
  @moduledoc """
  Defines the `ExCmd.Stream` struct returned by `ExCmd.stream!/2`.
  """

  alias ExCmd.Process
  alias ExCmd.Process.Error

  defmodule AbnormalExit do
    @moduledoc """
    Exception carrying the exit status of a program.

    It is raised while finishing the stream when the program's exit status
    is anything other than `0`. The status is available under `:exit_status`.
    """

    defexception [:message, :exit_status]

    # Invoked by `raise AbnormalExit, exit_status`; builds the exception
    # struct and a human readable message from the given exit status.
    @impl true
    def exception(exit_status) do
      %__MODULE__{
        exit_status: exit_status,
        message: "program exited with exit status: " <> to_string(exit_status)
      }
    end
  end

  defmodule Sink do
    @moduledoc false

    # Wraps a process so it can be the target of `Enum.into/2`: collected
    # elements are handed to `Process.write/2`, and `Process.close_stdin/1`
    # is called once collection is done or halted.
    defstruct [:process]

    defimpl Collectable do
      # Returns `:ok` as the initial accumulator plus the collector function.
      def into(%{process: process} = sink) do
        {:ok, &collect(process, sink, &1, &2)}
      end

      # One more element: write it, keeping `:ok` as the accumulator.
      defp collect(process, _sink, :ok, {:cont, chunk}) do
        :ok = Process.write(process, chunk)
      end

      # Collection finished: close stdin and return the sink itself.
      defp collect(process, sink, :ok, :done) do
        :ok = Process.close_stdin(process)
        sink
      end

      # Collection aborted: stdin is still closed.
      defp collect(process, _sink, :ok, :halt) do
        :ok = Process.close_stdin(process)
      end
    end
  end

  # Fields:
  #   * `:process`     - the process returned by `Process.start_link/2` in `__build__/2`
  #   * `:stream_opts` - the normalized options map built by `normalize_stream_opts/1`
  #                      (keys `:input` and `:exit_timeout`)
  defstruct [:process, :stream_opts]

  # Struct type; the individual field types are not constrained.
  @type t :: %__MODULE__{}

  # Builds the stream struct for `cmd_with_args`.
  #
  # `:exit_timeout` and `:input` are consumed here as stream options; all
  # remaining options are passed to `Process.start_link/2` unchanged.
  # Raises `ArgumentError` when the stream options fail validation, in which
  # case no process is started.
  @doc false
  def __build__(cmd_with_args, opts) do
    {raw_stream_opts, process_opts} = Keyword.split(opts, [:exit_timeout, :input])

    normalized =
      case normalize_stream_opts(raw_stream_opts) do
        {:ok, normalized} -> normalized
        {:error, error} -> raise ArgumentError, message: error
      end

    {:ok, process} = Process.start_link(cmd_with_args, process_opts)

    # Feed the program's stdin (if any input was given) from a linked process.
    start_input_streamer(%Sink{process: process}, normalized[:input])

    %ExCmd.Stream{process: process, stream_opts: normalized}
  end

  # Starts feeding `input` into `sink`.
  #
  #   * `:no_input`            - nothing to do, returns `:ok`
  #   * `{:enumerable, enum}`  - a linked process pushes `enum` into `sink`
  #                              with `Enum.into/2`; returns its pid
  #   * `{:collectable, func}` - a linked process calls `func.(sink)`;
  #                              returns its pid
  #
  # The function is private, so it carries no `@doc` attribute: docs are
  # never kept for private functions.
  defp start_input_streamer(_sink, :no_input), do: :ok

  defp start_input_streamer(sink, {:enumerable, enum}) do
    spawn_link(fn -> Enum.into(enum, sink) end)
  end

  defp start_input_streamer(sink, {:collectable, func}) do
    spawn_link(fn -> func.(sink) end)
  end

  defimpl Enumerable do
    # Streams the program's output through `Stream.resource/3`.
    #
    # The resource accumulator is `:ok` while reading and becomes `:normal`
    # once `:eof` has been read, which is how the cleanup step tells a fully
    # consumed stream from one that stopped earlier.
    def reduce(%{process: process, stream_opts: stream_opts}, acc, fun) do
      resource =
        Stream.resource(
          fn -> :ok end,
          fn :ok -> read_next(process) end,
          fn exit_type -> finish(process, stream_opts, exit_type) end
        )

      resource.(acc, fun)
    end

    # No shortcut exists for counting, membership or slicing; per the
    # `Enumerable` protocol, `{:error, __MODULE__}` selects the default
    # reduce-based algorithm.
    def count(_stream), do: {:error, __MODULE__}

    def member?(_stream, _term), do: {:error, __MODULE__}

    def slice(_stream), do: {:error, __MODULE__}

    # Reads once: emits the data as one element, halts on `:eof`, and raises
    # `Error` for any other result.
    defp read_next(process) do
      case Process.read(process) do
        {:ok, data} ->
          {[data], :ok}

        :eof ->
          {:halt, :normal}

        error ->
          raise Error, "Failed to read data from the program. error: #{inspect(error)}"
      end
    end

    # Cleanup step: `Process.stop/1` runs even when the exit check raises.
    defp finish(process, stream_opts, exit_type) do
      # always close stdin before stopping to give the command chance to exit properly
      Process.close_stdin(process)
      check_exit(process, stream_opts, exit_type)
    after
      Process.stop(process)
    end

    # The exit status is only awaited and checked after a normal end of output.
    defp check_exit(process, stream_opts, :normal) do
      case Process.await_exit(process, stream_opts[:exit_timeout]) do
        {:ok, 0} ->
          :ok

        {:ok, status} ->
          raise AbnormalExit, status

        :timeout ->
          raise Error, "program fail to exit within timeout: #{stream_opts.exit_timeout}"
      end
    end

    defp check_exit(_process, _stream_opts, _exit_type), do: nil
  end

  # Classifies the value of the `:input` option.
  #
  #   * `nil`                     -> `{:ok, :no_input}`
  #   * a binary                  -> `{:ok, {:enumerable, [binary]}}`
  #   * a function of arity 1     -> `{:ok, {:collectable, fun}}`
  #   * any other `Enumerable`    -> `{:ok, {:enumerable, term}}`
  #   * anything else             -> `{:error, message}`
  #
  # Clause order matters: the arity-1 function check comes before the
  # `Enumerable` check.
  @spec normalize_input(term) ::
          {:ok, :no_input}
          | {:ok, {:enumerable, term}}
          | {:ok, {:collectable, function}}
          | {:error, String.t()}
  defp normalize_input(nil), do: {:ok, :no_input}

  # A binary is wrapped in a list so it is sent as a single element.
  defp normalize_input(term) when is_binary(term), do: {:ok, {:enumerable, [term]}}

  defp normalize_input(term) when is_function(term, 1), do: {:ok, {:collectable, term}}

  defp normalize_input(term) do
    if Enumerable.impl_for(term) do
      {:ok, {:enumerable, term}}
    else
      {:error, "`:input` must be either Enumerable or a function which accepts collectable"}
    end
  end

  # Validates the `:exit_timeout` option.
  #
  # `nil` (option not given) and `:infinity` both normalize to `:infinity`;
  # a positive integer is returned as is. Everything else, including `0` and
  # negative integers, is rejected with an error message.
  @spec normalize_exit_timeout(term) :: {:ok, :infinity | pos_integer} | {:error, String.t()}
  defp normalize_exit_timeout(nil), do: {:ok, :infinity}

  defp normalize_exit_timeout(:infinity), do: {:ok, :infinity}

  defp normalize_exit_timeout(timeout) when is_integer(timeout) and timeout > 0 do
    {:ok, timeout}
  end

  defp normalize_exit_timeout(_invalid) do
    # The message says "positive" because negative integers are rejected too.
    {:error, ":exit_timeout must be either :infinity or a positive integer"}
  end

  # Validates the stream-level options and gathers them in a map with the
  # keys `:input` and `:exit_timeout`.
  #
  # Returns `{:ok, map}` when both options are valid; otherwise the first
  # non-matching result (`:input` is checked before `:exit_timeout`) is
  # returned unchanged.
  defp normalize_stream_opts(opts) do
    with {:ok, input} <- normalize_input(opts[:input]),
         {:ok, exit_timeout} <- normalize_exit_timeout(opts[:exit_timeout]),
         do: {:ok, %{input: input, exit_timeout: exit_timeout}}
  end
end