lib/k8s/client/runner/watch/stream.ex

defmodule K8s.Client.Runner.Watch.Stream do
  @moduledoc """
  Watches a `K8s.Client.list/3` operation and returns an Elixir [`Stream`](https://hexdocs.pm/elixir/Stream.html) of events.
  """

  alias K8s.Client.Runner.Base
  alias K8s.Client.Runner.Watch

  require Logger

  @log_prefix "#{__MODULE__} - " |> String.replace_leading("Elixir.", "")

  @type t :: %__MODULE__{
          resp: HTTPoison.AsyncResponse.t(),
          conn: K8s.Conn.t(),
          operation: K8s.Operation.t(),
          resource_version: binary(),
          http_opts: keyword(),
          remainder: binary()
        }

  defstruct [:resp, :conn, :operation, :resource_version, :http_opts, remainder: ""]

  @doc """
  Watches resources and returns an Elixir Stream of events emmitted by kubernetes.

  ### Example

      iex> {:ok,conn} = K8s.Conn.from_file("~/.kube/config", [context: "docker-desktop"])
      ...> op = K8s.Client.list("v1", "Namespace")
      ...> K8s.Client.Runner.Watch.Stream.resource(conn, op, []) |> Stream.map(&IO.inspect/1) |> Stream.run()
  """
  @spec resource(K8s.Conn.t(), K8s.Operation.t(), keyword()) :: Enumerable.t() | {:error, any()}
  def resource(conn, operation, http_opts) do
    Stream.resource(
      fn -> {:start, %__MODULE__{conn: conn, operation: operation, http_opts: http_opts}} end,
      &next_fun/1,
      fn _state -> :ok end
    )
  end

  @docp """
  Producing the next elements in the stream.
  * If the accumulator is {:recv, state}, receives and processes events from the HTTPoison process
  * If the accumulator is {:start, state}, tries to make new HTTPoison watcher request (see below)
  """
  @spec next_fun({:recv, t()}) :: {[map()], {:recv | :start, t()}} | {:halt, nil}
  defp next_fun({:recv, %__MODULE__{} = state}) do
    receive do
      %HTTPoison.AsyncEnd{} ->
        Logger.debug(
          @log_prefix <> "AsyncEnd received - tryin to restart watcher",
          library: :k8s
        )

        {[], {:start, state}}

      %HTTPoison.AsyncHeaders{} ->
        HTTPoison.stream_next(state.resp)
        {[], {:recv, state}}

      %HTTPoison.AsyncStatus{code: 200} ->
        HTTPoison.stream_next(state.resp)
        {[], {:recv, state}}

      %HTTPoison.AsyncStatus{code: 410} ->
        Logger.warn(
          @log_prefix <> "410 Gone received from watcher - resetting the resource version",
          library: :k8s
        )

        new_state = struct!(state, resource_version: nil)
        {[], {:recv, new_state}}

      %HTTPoison.AsyncStatus{code: _} = error ->
        Logger.warn(
          @log_prefix <> "Erronous async status received from watcher - aborting the watch",
          library: :k8s,
          error: error
        )

        {:halt, nil}

      %HTTPoison.AsyncChunk{chunk: chunk} ->
        HTTPoison.stream_next(state.resp)

        {chunk, state}
        |> transform_to_lines()
        |> transform_to_events()

      %HTTPoison.Error{reason: {:closed, :timeout}} ->
        Logger.debug(
          @log_prefix <> "Watch request timed out - resuming the watch",
          library: :k8s
        )

        {[], {:start, state}}

      other ->
        Logger.debug(
          @log_prefix <> "Wacher received unexpected message.",
          library: :k8s,
          message: other
        )

        # ignore other messages and continue
        {[], {:recv, state}}
    end
  end

  @docp """
  Tries to make new HTTPoison watcher request (self-healing)
  """
  @spec next_fun({:start, map()}) :: {[map()], {:recv, t()}} | {:halt, nil}
  defp next_fun({:start, state}) do
    %{conn: conn, operation: operation, http_opts: http_opts} = state

    #  get latest resource version if it is missing in the state
    resource_version =
      case Map.get(state, :resource_version) do
        nil ->
          {:ok, resource_version} = Watch.get_resource_version(conn, operation)
          resource_version

        rv ->
          rv
      end

    #  prepare http_opts
    http_opts =
      http_opts
      |> Keyword.put_new(:params, [])
      |> put_in([:params, :resourceVersion], resource_version)
      |> put_in([:params, :allowWatchBookmarks], true)
      |> put_in([:params, :watch], true)
      |> Keyword.put(:stream_to, self())
      |> Keyword.put(:async, :once)

    #  start the watcher
    case Base.run(conn, operation, http_opts) do
      {:ok, ref} ->
        new_state =
          struct!(
            state,
            resp: %HTTPoison.AsyncResponse{id: ref},
            resource_version: resource_version
          )

        {[], {:recv, new_state}}

      error ->
        Logger.error(
          @log_prefix <> "Can't restart watcher - stopping watcher stream: #{inspect(error)}",
          library: :k8s
        )

        {:halt, nil}
    end
  end

  @docp """
  Transforms chunks to lines iteratively.

  Code is taken from https://elixirforum.com/t/streaming-lines-from-an-enum-of-chunks/21244/3

  * Append new chunk to the remainder inside state
  * Split resulting string by newlines (there can be multiple newlines in the new chunk)
  * pop the last element from the resulting list returns the remainder and the list of whole lines
  """
  @spec transform_to_lines({binary(), t()}) :: {[binary()], t()}
  defp transform_to_lines({chunk, state}) do
    {remainder, whole_lines} =
      (state.remainder <> chunk)
      |> String.split("\n")
      |> List.pop_at(-1)

    {whole_lines, %{state | remainder: remainder}}
  end

  @docp """
  Transform lines to events

  * decode JSON events
  * Reduce lines into events and next state
    * If the resource_version changes, append the event to the stream and update the state
    * Otherwise, dont change anything
    * send :start upon errors
  """
  @spec transform_to_events({[binary()], t()}) :: {[map()], {:recv | :start, t()}}
  defp transform_to_events({lines, state}) do
    lines
    #  decode errors handled below
    |> Enum.map(&Jason.decode/1)
    |> Enum.reduce({[], {:recv, state}}, fn
      _, {events, {:start, state}} ->
        {events, {:start, state}}

      {:error, error}, {events, acc} ->
        Logger.error(
          @log_prefix <> "Could not decode JSON - chunk seems to be malformed",
          library: :k8s,
          error: error
        )

        {events, acc}

      {:ok, %{"type" => "ERROR", "object" => %{"message" => message, "code" => 410} = object}},
      {events, {_, state}} ->
        Logger.debug(
          @log_prefix <> "#{message} - resetting the resource version",
          library: :k8s,
          object: object
        )

        new_state = struct!(state, resource_version: nil)
        {events, {:recv, new_state}}

      {:ok, %{"object" => %{"message" => message} = object}}, {events, {_, state}} ->
        Logger.error(
          @log_prefix <>
            "Erronous event received from watcher: #{message} - resetting the resource version",
          library: :k8s,
          object: object
        )

        new_state = struct!(state, resource_version: nil)
        {events, {:recv, new_state}}

      {:ok, %{"type" => "BOOKMARK", "object" => object}}, {events, {:recv, state}} ->
        Logger.debug(
          @log_prefix <> "Bookmark received",
          library: :k8s,
          object: object
        )

        {events,
         {:recv, %__MODULE__{state | resource_version: object["metadata"]["resourceVersion"]}}}

      {:ok, %{"object" => %{"metadata" => %{"resourceVersion" => new_resource_version}}}},
      {events, {:recv, %__MODULE__{resource_version: resource_version} = state}}
      when new_resource_version == resource_version ->
        {events, {:recv, state}}

      {:ok, %{"object" => object} = new_event}, {events, {:recv, state}} ->
        # new resource_version => append new event to the stream
        {events ++ [new_event],
         {:recv, %__MODULE__{state | resource_version: object["metadata"]["resourceVersion"]}}}
    end)
  end
end