lib/k8s/client/runner/stream.ex

defmodule K8s.Client.Runner.Stream do
  @moduledoc """
  Takes a `K8s.Client.list/3` operation and returns an Elixir [`Stream`](https://hexdocs.pm/elixir/Stream.html) of resources.

  Pages are requested lazily: a new list request is only issued once every item of the
  previously fetched page has been emitted.
  """

  alias K8s.Client.Runner.Base
  alias K8s.Client.Runner.Stream.ListRequest
  alias K8s.Conn
  alias K8s.Operation
  alias K8s.Operation.Error

  @supported_operations [:list, :list_all_namespaces]

  @typedoc "List of items and pagination request"
  @type state_t :: {list(), ListRequest.t()}

  @typedoc "Items to emit to the stream consumer (zero or one) paired with the state to continue from"
  @type emit_t :: {list(), state_t}

  @typedoc "Halt streaming"
  @type halt_t :: {:halt, state_t}

  @doc """
  Validates operation type before calling `stream/3`. Only supports verbs: `list_all_namespaces` and `list`.

  Returns `{:ok, stream}` for a supported operation, otherwise `{:error, %K8s.Operation.Error{}}`
  whose message names the supported verbs and the rejected operation.
  """
  @spec run(Conn.t(), Operation.t(), keyword()) :: {:ok, Enumerable.t()} | {:error, Error.t()}
  def run(conn, op, http_opts \\ [])

  def run(%Conn{} = conn, %Operation{verb: verb} = op, http_opts)
      when verb in @supported_operations,
      do: {:ok, stream(conn, op, http_opts)}

  # The rejected operation is the *second* argument. Only it is inspected, so the error
  # message describes the operation and never embeds the connection.
  def run(_conn, op, _http_opts) do
    msg = "Only #{inspect(@supported_operations)} operations are supported. #{inspect(op)}"

    {:error, %Error{message: msg}}
  end

  @doc """
  Returns an elixir stream of paginated list results.

  Elements in the stream are the entries of the `"items"` list of each response body, or an
  `{:error, error}` tuple when a request fails.

  Encountering an HTTP error mid-stream will emit the error tuple and then halt the stream.
  """
  @spec stream(Conn.t(), Operation.t(), keyword | nil) :: Enumerable.t()
  def stream(%Conn{} = conn, %Operation{} = op, http_opts \\ []) do
    request = %ListRequest{
      operation: op,
      conn: conn,
      http_opts: http_opts
    }

    Stream.resource(
      # No request is made until the first element is demanded.
      fn -> {[], request} end,
      &next_item/1,
      &stop/1
    )
  end

  @doc false
  # `Stream.resource/3` "next" callback: emits one buffered item, fetching another page first
  # when the buffer is empty.
  @spec next_item(state_t) :: emit_t | halt_t
  # All items in list have been popped, get more
  def next_item({[], _request} = state) do
    fetch_next_page(state)
  end

  # Items are in list, pop one and keep on keeping on.
  def next_item({_items, _request} = state) do
    pop_item(state)
  end

  @doc false
  # Fetches the next page when the item list is empty. Returns `:halt` to the stream processor
  # when the pending request's `continue` is `:halt`.
  @spec fetch_next_page(state_t) :: emit_t | halt_t
  def fetch_next_page({_empty, %ListRequest{continue: :halt}} = state) do
    {:halt, state}
  end

  def fetch_next_page({_empty, next_request} = _state) do
    next_request
    |> list()
    |> pop_item()
  end

  @doc false
  # Make a list request and convert response to stream state.
  #
  # The request's `limit` and `continue` are merged over the operation's own query params, so
  # pagination values take precedence over caller-supplied ones with the same keys.
  @spec list(ListRequest.t()) :: state_t
  def list(%ListRequest{operation: operation} = request) do
    query_params = operation.query_params || []
    pagination_params = [limit: request.limit, continue: request.continue]
    merged_params = Keyword.merge(query_params, pagination_params)
    updated_operation = %Operation{operation | query_params: merged_params}
    paginated_request = %ListRequest{request | operation: updated_operation}

    response =
      Base.run(
        paginated_request.conn,
        paginated_request.operation,
        paginated_request.http_opts
      )

    case response do
      {:ok, body} ->
        # A body whose "items" is absent or nil is treated as an empty page; `pop_item/1`
        # only accepts lists.
        items = Map.get(body, "items") || []
        next_request = ListRequest.make_next_request(paginated_request, body)
        {items, next_request}

      {:error, error} ->
        # Surface the failure as the only element of this page and ask for a halting
        # follow-up request so no further pages are fetched.
        items = [{:error, error}]
        halt_requests = ListRequest.make_next_request(paginated_request, :halt)
        {items, halt_requests}
    end
  end

  # Return the next item to the stream caller (`[head]`) and return the tail and next request as the current state
  @spec pop_item(state_t) :: emit_t
  # Empty page: emit nothing; the next `next_item/1` call decides whether to fetch or halt.
  defp pop_item({[], next}) do
    new_state = {[], next}
    {[], new_state}
  end

  defp pop_item({[head | tail], next} = _state) do
    new_state = {tail, next}
    {[head], new_state}
  end

  @doc false
  # Stop processing the stream. Nothing needs to be released, so the final state is ignored.
  @spec stop(state_t) :: nil
  def stop(_state) do
    nil
  end
end