lib/k8s/client/runner/stream.ex

defmodule K8s.Client.Runner.Stream do
  @moduledoc """
  Takes a `K8s.Client.list/3` operation and returns an Elixir [`Stream`](https://hexdocs.pm/elixir/Stream.html) of resources.
  """
  alias K8s.Client.Runner.Base
  alias K8s.Client.Runner.Stream.ListRequest
  alias K8s.Client.Runner.Stream.Watch
  alias K8s.Conn
  alias K8s.Operation
  alias K8s.Operation.Error

  @supported_operations [:list, :list_all_namespaces, :watch, :watch_all_namespaces, :connect]

  @doc """
  Validates operation type before calling `stream/3`. Only supports verbs: `list_all_namespaces` and `list`.
  """
  @spec run(Operation.t()) :: {:ok, Enumerable.t()} | {:error, Error.t()}
  def run(%Operation{conn: %Conn{} = conn} = op), do: run(conn, op, [])

  @spec run(Operation.t(), keyword()) :: {:ok, Enumerable.t()} | {:error, Error.t()}
  def run(%Operation{conn: %Conn{} = conn} = op, http_opts), do: run(conn, op, http_opts)

  @spec run(Conn.t(), Operation.t()) :: {:ok, Enumerable.t()} | {:error, Error.t()}
  def run(%Conn{} = conn, %Operation{} = op), do: run(conn, op, [])

  @spec run(Conn.t(), Operation.t(), keyword()) :: {:ok, Enumerable.t()} | {:error, Error.t()}
  def run(%Conn{} = conn, %Operation{verb: verb} = op, http_opts)
      when verb in [:list, :list_all_namespaces] do
    op = name_as_field_selector(op)
    {:ok, ListRequest.stream(conn, op, http_opts)}
  end

  def run(%Conn{} = conn, %Operation{verb: verb} = op, http_opts)
      when verb in [:watch, :watch_all_namespaces] do
    op = name_as_field_selector(op)
    Watch.stream(conn, op, http_opts)
  end

  def run(%Conn{} = conn, %Operation{verb: :connect} = op, http_opts) do
    Base.stream(conn, op, http_opts)
  end

  def run(op, _, _) do
    msg = "Only #{inspect(@supported_operations)} operations can be streamed. #{inspect(op)}"

    {:error, %Error{message: msg}}
  end

  @spec name_as_field_selector(Operation.t()) :: Operation.t()
  defp name_as_field_selector(operation) do
    {name, operation} = pop_in(operation, [Access.key(:path_params), :name])

    if is_nil(name), do: operation, else: K8s.Selector.field(operation, {"metadata.name", name})
  end
end