lib/bella/reconciler/core.ex

defmodule Bella.Reconciler.Core do
  alias Bella.Reconciler.State
  alias Bella.Sys.Event

  def run(%State{} = state) do
    metadata = State.metadata(state)
    Event.reconciler_run_started(%{}, metadata)

    {measurements, result} = Event.measure(&resources/1, [state])

    case result do
      {:ok, resources} ->
        _ = async_run(resources, measurements, state)

      {:error, error} ->
        metadata = Map.put(metadata, :error, error)
        Event.reconciler_fetch_failed(measurements, metadata)
    end
  end

  defp async_run(resources, measurements, %State{} = state) do
    metadata = State.metadata(state)

    resources
    |> Task.async_stream(fn
      resource when is_map(resource) ->
        Event.reconciler_fetch_succeeded(measurements, metadata)
        reconcile_one(resource, state)

      {:error, error} ->
        metadata = Map.put(metadata, :error, error)
        Event.reconciler_fetch_failed(measurements, metadata)
    end)
    |> Enum.to_list()
  end

  defp resources(%State{connection: connection, client: client, reconciler: reconciler} = _state) do
    case reconciler.operation() do
      nil ->
        {:ok, []}

      op ->
        client.stream(connection, op)
    end
  end

  defp reconcile_one(resource, %State{reconciler: reconciler} = state) do
    Task.start(fn ->
      {measurements, result} = Event.measure(reconciler, :reconcile, [resource, state])

      metadata =
        state
        |> State.metadata()
        |> Map.put(:name, K8s.Resource.name(resource))
        |> Map.put(:namespace, K8s.Resource.namespace(resource))
        |> Map.put(:kind, K8s.Resource.kind(resource))
        |> Map.put(:api_versions, K8s.Resource.api_version(resource))

      case result do
        :ok ->
          Event.reconciler_reconcile_succeeded(measurements, metadata)

        {:ok, _} ->
          Event.reconciler_reconcile_succeeded(measurements, metadata)

        {:error, error} ->
          metadata = Map.put(metadata, :error, error)
          Event.reconciler_reconcile_failed(measurements, metadata)
      end
    end)
  end
end