lib/bonny/server/reconciler.ex

defmodule Bonny.Server.Reconciler do
  @moduledoc """
  Creates a stream that, when run, streams a list of resources and calls `reconcile/1`
  on the given controller for each resource in the stream in parallel.

  ## Example

      reconciliation_stream = Bonny.Server.Reconciler.get_stream(controller)
      Task.async(fn -> Stream.run(reconciliation_stream) end)
  """

  require Logger

  @callback reconcile(map()) :: :ok | {:ok, any()} | {:error, any()}

  @doc """
  """
  @spec get_raw_stream(K8s.Conn.t(), K8s.Operation.t(), keyword()) :: Enumerable.t()
  def get_raw_stream(conn, reconcile_operation, stream_opts \\ []) do
    {:ok, reconciliation_stream} = K8s.Client.stream(conn, reconcile_operation, stream_opts)

    Stream.filter(reconciliation_stream, &fetch_succeeded?/1)
  end

  @doc """
  Prepares a stream wich maps each resoruce returned by the `reconcile_operation` to
  a function `reconcile/1` on the given `module`. If given, the stream_opts are passed
  to K8s.Client.stream/3
  """
  @spec get_stream(module(), K8s.Conn.t(), K8s.Operation.t(), keyword()) ::
          Enumerable.t(Bonny.Resource.t())
  def get_stream(module, conn, reconcile_operation, stream_opts \\ []) do
    get_raw_stream(conn, reconcile_operation, stream_opts)
    |> Task.async_stream(&reconcile_single_resource(&1, module), timeout: :infinity)
    |> Stream.filter(&match?({:ok, {:ok, _}}, &1))
    |> Stream.map(fn {:ok, {:ok, resource}} -> resource end)
  end

  defp fetch_succeeded?({:error, error}) do
    Logger.debug("Reconciler fetch failed", %{error: error, library: :bonny})
    false
  end

  defp fetch_succeeded?(resource) when is_map(resource) do
    Logger.debug("Reconciler fetch succeeded", library: :bonny)
    true
  end

  defp reconcile_single_resource(resource, module) do
    metadata = %{
      module: module,
      name: K8s.Resource.name(resource),
      namespace: K8s.Resource.namespace(resource),
      kind: K8s.Resource.kind(resource),
      api_version: resource["apiVersion"],
      library: :bonny
    }

    :telemetry.span([:reconciler, :reconcile], metadata, fn ->
      case module.reconcile(resource) do
        :ok ->
          Logger.debug("Reconciler reconciliation succeeded", metadata)
          {{:ok, resource}, metadata}

        {:ok, _} ->
          Logger.debug("Reconciler reconciliation succeeded", metadata)
          {{:ok, resource}, metadata}

        {:error, error} ->
          metadata = Map.put(metadata, :error, error)
          Logger.error("Reconciler reconciliation failed", metadata)
          {:error, metadata}
      end
    end)
  end
end