lib/k8s/client/runner/stream/list_request.ex

defmodule K8s.Client.Runner.Stream.ListRequest do
  @moduledoc "`:list` `K8s.Operation` encapsulated with pagination and `K8s.Conn`"

  alias K8s.Client.Runner.Base

  @limit 10
  @typedoc "Pagination continue token"
  @type continue_t :: nil | :halt | binary()

  @type responses_t :: map() | {:error, K8s.Client.HTTPError.t()}

  @typedoc "List operation as a Stream data type"
  @type t :: %__MODULE__{
          operation: K8s.Operation.t(),
          conn: K8s.Conn.t(),
          continue: continue_t,
          limit: pos_integer,
          http_opts: Keyword.t()
        }
  defstruct operation: nil, conn: nil, continue: nil, http_opts: [], limit: @limit

  @spec stream(K8s.Conn.t(), K8s.Operation.t(), Keyword.t()) :: Enumerable.t(responses_t)
  def stream(conn, op, http_opts) do
    Stream.resource(
      fn ->
        struct!(__MODULE__, operation: op, conn: conn, http_opts: http_opts)
      end,
      &next_item/1,
      &Function.identity/1
    )
  end

  @spec next_item(t()) :: {responses_t(), t()}
  def next_item(%__MODULE__{continue: :halt}), do: {:halt, nil}

  def next_item(state) do
    with {:ok, response} <-
           list(state.conn, state.operation, state.http_opts, state.limit, state.continue),
         cont <- maybe_continue(response),
         {:ok, items} <- Map.fetch(response, "items") do
      {items, struct!(state, continue: cont)}
    else
      :halt ->
        {:halt, nil}

      :error ->
        {:halt, nil}

      {:error, error} ->
        {[{:error, error}], struct!(state, continue: :halt)}
    end
  end

  @spec list(K8s.Conn.t(), K8s.Operation.t(), Keyword.t(), integer(), continue_t()) ::
          K8s.Client.Provider.response_t()
  defp list(conn, op, http_opts, limit, continue) do
    new_params = [limit: limit, continue: continue]
    http_opts = Keyword.update(http_opts, :params, new_params, &Keyword.merge(&1, new_params))

    Base.run(conn, op, http_opts)
  end

  @spec maybe_continue(map | :halt) :: continue_t()
  defp maybe_continue(%{"metadata" => %{"continue" => ""}}), do: :halt

  defp maybe_continue(%{"metadata" => %{"continue" => cont}}) when is_binary(cont),
    do: cont

  defp maybe_continue(_map), do: :halt
end