lib/ex_cmd/stream.ex

defmodule ExCmd.Stream do
  @moduledoc """
  Defines a `ExCmd.Stream` struct returned by `ExCmd.stream!/2`.
  """

  alias ExCmd.Process
  alias ExCmd.Process.Error

  defmodule Sink do
    @moduledoc false
    defstruct [:process]

    defimpl Collectable do
      def into(%{process: process} = stream) do
        collector_fun = fn
          :ok, {:cont, x} ->
            :ok = Process.write(process, x)

          :ok, :done ->
            :ok = Process.close_stdin(process)
            stream

          :ok, :halt ->
            :ok = Process.close_stdin(process)
        end

        {:ok, collector_fun}
      end
    end
  end

  defstruct [:process, :stream_opts]

  @default_opts [exit_timeout: :infinity]

  @type t :: %__MODULE__{}

  @doc false
  def __build__(cmd_with_args, opts) do
    {stream_opts, process_opts} = Keyword.split(opts, [:exit_timeout, :input])
    stream_opts = Keyword.merge(@default_opts, stream_opts)

    {:ok, process} = Process.start_link(cmd_with_args, process_opts)

    start_input_streamer(%Sink{process: process}, stream_opts[:input])
    %ExCmd.Stream{process: process, stream_opts: stream_opts}
  end

  @doc false
  defp start_input_streamer(sink, input) do
    cond do
      is_nil(input) ->
        :ok

      is_function(input, 1) ->
        spawn_link(fn ->
          input.(sink)
        end)

      Enumerable.impl_for(input) ->
        spawn_link(fn ->
          Enum.into(input, sink)
        end)

      true ->
        raise ArgumentError,
          message: ":input must be either Enumerable or a function with arity 1"
    end
  end

  defimpl Enumerable do
    def reduce(%{process: process, stream_opts: stream_opts}, acc, fun) do
      start_fun = fn -> :ok end

      next_fun = fn :ok ->
        case Process.read(process) do
          {:ok, x} ->
            {[x], :ok}

          :eof ->
            {:halt, :normal}

          error ->
            raise Error, "Failed to read data from the command. error: #{inspect(error)}"
        end
      end

      after_fun = fn exit_type ->
        try do
          # always close stdin before stoping to give the command chance to exit properly
          Process.close_stdin(process)

          result = Process.await_exit(process, stream_opts[:exit_timeout])

          if exit_type == :normal do
            case result do
              {:ok, 0} ->
                :ok

              {:ok, status} ->
                raise Error, "command exited with status: #{status}"

              :timeout ->
                raise Error, "command fail to exit within timeout: #{stream_opts.exit_timeout}"
            end
          end
        after
          Process.stop(process)
        end
      end

      Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
    end

    def count(_stream) do
      {:error, __MODULE__}
    end

    def member?(_stream, _term) do
      {:error, __MODULE__}
    end

    def slice(_stream) do
      {:error, __MODULE__}
    end
  end
end