lib/bonny/axn.ex

defmodule Bonny.Axn do
  @moduledoc """
  Describes a resource action event.

  This is the token passed to all steps of your operator and controller
  pipeline.

  This module gets imported to your controllers where you should use the
  functions `register_descendant/3`, `update_status/2` and the ones to register
  events: `success_event/2`, `failure_event/2` and/or `register_event/6`. Note
  that these functions raise exceptions if those resources have already been
  applied to the cluster.

  The `register_before_*` functions can be used in `Pluggable` steps in order
  to register callbacks that are called before applying resources to the
  cluster. Have a look at `Bonny.Pluggable.Logger` for a use case.

  ## Action event fields

  These fields contain information on the action event that occurred.

    * `action` - the action that triggered this event
    * `resource` - the resource the action was applied to
    * `conn` - the connection to the cluster where the event occurred
    * `operator` - the operator that discovered and dispatched the event
    * `controller` - the controller handling the event and its init opts

  ## Reaction fields

    * `descendants` - descending resources defined by the handling controller
    * `status` - the data to be applied to the status subresource
    * `events` - Kubernetes events regarding the resource to be applied to the cluster


  ## Pipeline fields

    * `halted` - the boolean status on whether the pipeline was halted
    * `assigns` - shared user data as a map
    * `private` - shared library data as a map
    * `states` - The states for status, events and descendants

  """

  @derive Pluggable.Token

  alias Bonny.Event
  alias Bonny.Resource

  import Bitwise
  require Logger

  @typedoc "Shared data stored on the token, keyed by atoms."
  @type assigns :: %{optional(atom) => any}

  @typedoc """
  Integer bit field recording which reactions (status, descendants, events)
  have already been applied resp. emitted.
  """
  @type states :: integer()

  @typedoc """
  The token passed through the operator and controller pipeline.

  `descendants` is keyed by `{namespace, name}` of the registered descendant,
  which is the key built by `register_descendant/3`.
  """
  @type t :: %__MODULE__{
          action: :add | :modify | :reconcile | :delete,
          conn: K8s.Conn.t(),
          # Key order matches the tuple built in `register_descendant/3`.
          descendants: %{{namespace :: binary(), name :: binary()} => Resource.t()},
          events: list(Bonny.Event.t()),
          resource: Resource.t(),
          status: map() | nil,
          assigns: assigns(),
          private: assigns(),
          halted: boolean(),
          controller: {controller :: module(), init_opts :: keyword()} | nil,
          operator: module() | nil,
          states: states()
        }

  # `conn`, `resource` and `action` have to be given when the struct is built;
  # all other fields fall back to the defaults listed below.
  @enforce_keys ~w(conn resource action)a
  defstruct action: nil,
            conn: nil,
            resource: nil,
            controller: nil,
            status: nil,
            assigns: %{},
            private: %{},
            descendants: %{},
            events: [],
            halted: false,
            operator: nil,
            states: 0

  # Bit flags stored in the `states` field of the token, one bit per reaction.
  @status_applied bsl(1, 0)
  @descendants_applied bsl(1, 1)
  @events_emitted bsl(1, 2)

  @doc "Guard that holds if the status-applied flag is set in `axn.states`."
  defguard is_status_applied(axn) when band(axn.states, @status_applied) == @status_applied

  @doc "Guard that holds if the descendants-applied flag is set in `axn.states`."
  defguard are_descendants_applied(axn)
           when band(axn.states, @descendants_applied) == @descendants_applied

  @doc "Guard that holds if the events-emitted flag is set in `axn.states`."
  defguard are_events_emitted(axn) when band(axn.states, @events_emitted) == @events_emitted

  @doc """
  Builds a new `%Bonny.Axn{}` token from the given `fields`.

  The fields are handed to `struct!/2`, which raises if an enforced key
  (`:conn`, `:resource`, `:action`) is missing or an unknown key is given.
  """
  @spec new!(Keyword.t()) :: t()
  def new!(fields) do
    struct!(__MODULE__, fields)
  end

  defmodule StatusAlreadyAppliedError do
    @moduledoc """
    Raised when the status is updated or applied although it has already been
    applied.
    """

    defexception message: "the status has already been applied"
  end

  defmodule DescendantsAlreadyAppliedError do
    @moduledoc """
    Raised when a descendant is registered or the descendants are applied
    although they have already been applied.
    """

    defexception message: "the descendants have already been applied"
  end

  defmodule EventsAlreadyEmittedError do
    @moduledoc """
    Raised when an event is registered or the events are emitted although they
    have already been emitted.
    """

    defexception message: "the events have already been emitted"
  end

  @doc """
  Builds a Kubernetes event regarding the token's resource and registers it on
  the `%Axn{}` token, so that it gets emitted by Bonny later on.

  `related` is optional and defaults to `nil`.

  Raises `Bonny.Axn.EventsAlreadyEmittedError` if the events of this token
  have already been emitted.
  """
  @spec register_event(
          t(),
          Resource.t() | nil,
          Event.event_type(),
          binary(),
          binary(),
          binary()
        ) :: t()
  def register_event(axn, related \\ nil, event_type, reason, action, message) do
    axn.resource
    |> Event.new!(related, event_type, reason, action, message)
    |> then(&add_event(axn, &1))
  end

  @doc """
  Registers a success event to the `%Axn{}` token to be emitted by Bonny.

  `opts` are merged over the default `:reason` and `:message`. The fields
  `:event_type`, `:regarding` and `:action` are always set by this function
  and cannot be overridden through `opts`.

  Raises `Bonny.Axn.EventsAlreadyEmittedError` if the events of this token
  have already been emitted.
  """
  @spec success_event(t(), Keyword.t()) :: t()
  def success_event(axn, opts \\ []) do
    action_name = Atom.to_string(axn.action)

    defaults = [
      reason: "Successful #{String.capitalize(action_name)}",
      message: "Resource #{axn.action} was successful."
    ]

    # Merged last so that callers cannot override these fields.
    enforced = [event_type: :Normal, regarding: axn.resource, action: action_name]

    event =
      defaults
      |> Keyword.merge(opts)
      |> Keyword.merge(enforced)
      |> Event.new!()

    add_event(axn, event)
  end

  @doc """
  Registers a failure (warning) event to the `%Axn{}` token to be emitted by
  Bonny.

  `opts` are merged over the default `:reason` and `:message`. The fields
  `:event_type`, `:regarding` and `:action` are always set by this function
  and cannot be overridden through `opts`.

  Raises `Bonny.Axn.EventsAlreadyEmittedError` if the events of this token
  have already been emitted.
  """
  @spec failure_event(t(), Keyword.t()) :: t()
  def failure_event(axn, opts \\ []) do
    action_name = Atom.to_string(axn.action)

    defaults = [
      reason: "Failed #{String.capitalize(action_name)}",
      message: "Resource #{axn.action} has failed, no reason as specified."
    ]

    # Merged last so that callers cannot override these fields.
    enforced = [event_type: :Warning, regarding: axn.resource, action: action_name]

    event =
      defaults
      |> Keyword.merge(opts)
      |> Keyword.merge(enforced)
      |> Event.new!()

    add_event(axn, event)
  end

  # Prepends `event` to the token's events. Refuses to do so once the events
  # have been emitted.
  defp add_event(axn, _event) when are_events_emitted(axn),
    do: raise(EventsAlreadyEmittedError)

  defp add_event(axn, event) do
    %__MODULE__{axn | events: [event | axn.events]}
  end

  @doc """
  Drops all registered events from the token without emitting them.

  Raises `Bonny.Axn.EventsAlreadyEmittedError` if the events of this token
  have already been emitted.
  """
  @spec clear_events(t()) :: t()
  def clear_events(axn) when are_events_emitted(axn),
    do: raise(EventsAlreadyEmittedError)

  def clear_events(axn) do
    %{axn | events: []}
  end

  @doc """
  Registers a descending object to be applied.

  An owner reference to the token's resource is added automatically. Pass the
  option `omit_owner_ref: true` to register the descendant unchanged.

  Descendants are stored under their namespace and name, so registering a
  descendant with the same namespace and name again replaces the earlier one.

  Raises `Bonny.Axn.DescendantsAlreadyAppliedError` if the descendants of this
  token have already been applied.
  """
  @spec register_descendant(t(), Resource.t(), Keyword.t()) :: t()
  def register_descendant(axn, descendant, opts \\ [])

  def register_descendant(axn, _descendant, _opts) when are_descendants_applied(axn),
    do: raise(DescendantsAlreadyAppliedError)

  def register_descendant(axn, descendant, opts) do
    owned =
      case opts[:omit_owner_ref] do
        omit when omit in [nil, false] -> Resource.add_owner_reference(descendant, axn.resource)
        _truthy -> descendant
      end

    namespace = K8s.Resource.FieldAccessors.namespace(owned)
    name = K8s.Resource.FieldAccessors.name(owned)

    %__MODULE__{axn | descendants: Map.put(axn.descendants, {namespace, name}, owned)}
  end

  @doc """
  Runs `fun` on the current status and stores the result on the token as the
  status to be applied. This can be called multiple times.

  `fun` has to be a function of arity 1. It receives the status registered on
  the token so far, falling back to the `"status"` of the resource and finally
  to an empty map, and has to return the updated status.

  Raises `Bonny.Axn.StatusAlreadyAppliedError` if the status of this token has
  already been applied.
  """
  @spec update_status(t(), (map() -> map())) :: t()
  def update_status(axn, _fun) when is_status_applied(axn),
    do: raise(StatusAlreadyAppliedError)

  def update_status(axn, fun) do
    (axn.status || axn.resource["status"] || %{})
    |> fun.()
    |> then(&struct!(axn, status: &1))
  end

  @doc """
  Emits the events registered on this Axn and marks them as emitted.

  Every event is first passed through the callbacks registered with
  `register_before_emit_event/2`. Events are processed in the order of the
  `events` list, i.e. the most recently registered event comes first.

  An event that fails to be emitted is logged as an error; it does not raise.

  Raises `Bonny.Axn.EventsAlreadyEmittedError` if the events of this token
  have already been emitted.
  """
  @spec emit_events(t()) :: t()
  def emit_events(axn) when are_events_emitted(axn),
    do: raise(EventsAlreadyEmittedError)

  def emit_events(%__MODULE__{events: events, conn: conn, operator: operator} = axn) do
    # All callbacks run before the first event is sent to the cluster.
    prepared = for event <- List.wrap(events), do: run_before_emit_event(event, axn)

    outcomes =
      for event <- prepared, do: {Bonny.EventRecorder.emit(event, operator, conn), event}

    Enum.each(outcomes, fn
      {{:error, error}, event} ->
        id = identifier(axn)
        message = emit_event_error_message(error)

        Logger.error("#{inspect(id)} - #{message}",
          library: :bonny,
          event: event,
          error: error
        )

      {{:ok, _}, _event} ->
        :ok
    end)

    mark_events_emitted(axn)
  end

  @doc """
  Applies the status to the resource's status subresource in the cluster and
  marks the status as applied.

  If no status was set on the token, nothing is sent to the cluster and the
  token is only marked as applied. Callbacks registered with
  `register_before_apply_status/2` are run on the resource before it is applied.

  Logs the error and raises a `RuntimeError` if applying the status fails.
  Raises `Bonny.Axn.StatusAlreadyAppliedError` if the status of this token has
  already been applied.
  """
  @spec apply_status(t(), Keyword.t()) :: t()
  def apply_status(axn, apply_opts \\ [])

  def apply_status(axn, _apply_opts) when is_status_applied(axn),
    do: raise(StatusAlreadyAppliedError)

  def apply_status(%__MODULE__{status: nil} = axn, _apply_opts),
    do: mark_status_applied(axn)

  def apply_status(%__MODULE__{resource: resource, status: status, conn: conn} = axn, apply_opts) do
    outcome =
      resource
      |> Map.put("status", status)
      |> run_before_apply_status(axn)
      |> Resource.apply_status(conn, apply_opts)

    case outcome do
      {:error, error} ->
        id = identifier(axn)
        message = apply_status_error_message(error)

        Logger.error("#{inspect(id)} - #{message}",
          library: :bonny,
          resource: resource,
          error: error
        )

        raise "#{inspect(id)} - #{message}"

      {:ok, _} ->
        mark_status_applied(axn)
    end
  end

  # Returns the error's `:message` as an iodata suffix (with a leading space),
  # or an empty list if the error carries no `:message` key.
  defp apply_error_message(error) do
    case error do
      %{message: message} -> [" ", message]
      _ -> []
    end
  end

  # Builds the iodata message for a failed status apply. A discovery error gets
  # a dedicated hint, any other error contributes its own message, if any.
  defp apply_status_error_message(error) do
    prefix = "Failed applying resource status."

    case error do
      %K8s.Discovery.Error{} ->
        [prefix, " ", "The status subresource for this resource seems to be disabled."]

      _ ->
        [prefix | apply_error_message(error)]
    end
  end

  # Builds the iodata message for a descendant that could not be applied.
  defp apply_descendant_error_message(error, descendant) do
    prefix =
      "Failed applying descending (child) resource #{inspect(Resource.gvkn(descendant))}."

    [prefix | apply_error_message(error)]
  end

  # Builds the iodata message for an event that could not be emitted.
  defp emit_event_error_message(error),
    do: ["Failed emitting event." | apply_error_message(error)]

  @doc """
  Applies the registered descendants to the cluster and marks them as applied.

  Callbacks registered with `register_before_apply_descendants/2` are run on
  the list of descendants before they are applied.

  If `:create_events` is truthy, a success event is registered for each
  successfully applied descendant. If applying a descendant fails, the error
  is logged and a `RuntimeError` is raised.

  Raises `Bonny.Axn.DescendantsAlreadyAppliedError` if the descendants of this
  token have already been applied.

  ## Options

  `:create_events` - Whether events should be created upon success. Defaults to `true`

  All further options are passed to `K8s.Client.apply/2`
  """
  @spec apply_descendants(t(), Keyword.t()) :: t()
  def apply_descendants(axn, opts \\ [])

  def apply_descendants(axn, _) when are_descendants_applied(axn) do
    raise DescendantsAlreadyAppliedError
  end

  def apply_descendants(axn, opts) do
    # The default used to be `[]`, which only enabled events because an empty
    # list is truthy. `true` states the documented default explicitly.
    {create_events, apply_opts} = Keyword.pop(opts, :create_events, true)
    %__MODULE__{descendants: descendants, conn: conn} = axn

    descendants
    |> Map.values()
    |> run_before_apply_descendants(axn)
    |> Enum.map(&Bonny.Resource.drop_managed_fields/1)
    |> Resource.apply_async(conn, apply_opts)
    |> Enum.reduce(axn, fn
      {_, {:ok, descendant}}, acc ->
        if create_events do
          kind = K8s.Resource.FieldAccessors.kind(descendant)
          name = K8s.Resource.FieldAccessors.name(descendant)

          success_event(acc,
            reason: "Successfully applied descendant",
            message: "Successfully applied #{kind} #{name} to the cluster.",
            related: descendant
          )
        else
          acc
        end

      {descendant, {:error, error}}, _acc ->
        id = identifier(axn)
        message = apply_descendant_error_message(error, descendant)

        Logger.error("#{inspect(id)} - #{message}",
          library: :bonny,
          resource: descendant,
          error: error
        )

        raise "#{inspect(id)} - #{message}"
    end)
    |> mark_descendants_applied()
  end

  @doc ~S"""
  Registers a callback to be invoked before a status is applied to the
  status subresource.

  The callback receives the resource (with the new status set) and the token
  and has to return the resource to be applied.

  Callbacks are invoked in the reverse order they are defined (callbacks
  defined first are invoked last).

  ## Examples

  To log a message for the status being applied:

      require Logger
      Bonny.Axn.register_before_apply_status(axn, fn resource, _axn ->
        Logger.info("Status of the #{resource["kind"]} named #{resource["metadata"]["name"]} is applied to namespace #{resource["metadata"]["namespace"]}")
        resource
      end)
  """
  @spec register_before_apply_status(t(), (Resource.t(), t() -> Resource.t())) :: t()
  def register_before_apply_status(%__MODULE__{private: private} = axn, callback)
      when is_function(callback, 2) do
    registered = private[:before_apply_status] || []
    %{axn | private: put_in(private[:before_apply_status], [callback | registered])}
  end

  @doc ~S"""
  Registers a callback to be invoked before descendants are applied to the
  cluster.

  The callback receives the list of descendants and the token and has to
  return the list of descendants to be applied.

  Callbacks are invoked in the reverse order they are defined (callbacks
  defined first are invoked last).

  ## Examples

  To log a message:

      require Logger
      Bonny.Axn.register_before_apply_descendants(axn, fn descendants, _axn ->
        Enum.each(descendants, &Logger.info("Descending #{&1["kind"]} named #{&1["metadata"]["name"]} is applied to namespace #{&1["metadata"]["namespace"]}"))
        descendants
      end)
  """
  @spec register_before_apply_descendants(t(), (list(Resource.t()), t() -> list(Resource.t()))) ::
          t()
  def register_before_apply_descendants(%__MODULE__{private: private} = axn, callback)
      when is_function(callback, 2) do
    registered = private[:before_apply_descendants] || []
    %{axn | private: put_in(private[:before_apply_descendants], [callback | registered])}
  end

  @doc ~S"""
  Registers a callback to be invoked before an event is emitted to the
  cluster.

  The callback is called once per event. It receives the event and the token
  and has to return the event to be emitted.

  Callbacks are invoked in the reverse order they are defined (callbacks
  defined first are invoked last).

  ## Examples

  To log a message:

      require Logger
      Bonny.Axn.register_before_emit_event(axn, fn event, _axn ->
        Logger.info("Event of type #{event.event_type} is emitted")
        event
      end)
  """
  @spec register_before_emit_event(t(), (Bonny.Event.t(), t() -> Bonny.Event.t())) :: t()
  def register_before_emit_event(%__MODULE__{private: private} = axn, callback)
      when is_function(callback, 2) do
    registered = private[:before_emit_event] || []
    %{axn | private: put_in(private[:before_emit_event], [callback | registered])}
  end

  @doc ~S"""
  Registers a callback to be invoked at the very end of an action event's
  processing by the operator.

  The callback receives the token; its return value is not used to update the
  token.

  Callbacks are invoked in the reverse order they are defined (callbacks
  defined first are invoked last).

  ## Examples

  To log a message after an event was processed by the operator:

      require Logger
      Bonny.Axn.register_after_processed(axn, fn _axn ->
        Logger.info("done")
        :ok
      end)
  """
  @spec register_after_processed(t(), (t() -> any())) :: t()
  def register_after_processed(%__MODULE__{private: private} = axn, callback)
      when is_function(callback, 1) do
    registered = private[:after_processed] || []
    %{axn | private: put_in(private[:after_processed], [callback | registered])}
  end

  # Threads `resource` through all registered `:before_apply_status` callbacks,
  # each one receiving the result of the previous one.
  defp run_before_apply_status(resource, %__MODULE__{private: private} = axn) do
    Enum.reduce(private[:before_apply_status] || [], resource, fn callback, current ->
      callback.(current, axn)
    end)
  end

  # Threads the list of descendants through all registered
  # `:before_apply_descendants` callbacks, each one receiving the result of the
  # previous one.
  defp run_before_apply_descendants(descendants, %__MODULE__{private: private} = axn) do
    Enum.reduce(private[:before_apply_descendants] || [], descendants, fn callback, current ->
      callback.(current, axn)
    end)
  end

  # Threads a single event through all registered `:before_emit_event`
  # callbacks, each one receiving the result of the previous one.
  defp run_before_emit_event(event, %__MODULE__{private: private} = axn) do
    Enum.reduce(private[:before_emit_event] || [], event, fn callback, current ->
      callback.(current, axn)
    end)
  end

  # Calls every registered `:after_processed` callback with the token and
  # returns the list of their results.
  @doc false
  @spec run_after_processed(t()) :: any()
  def run_after_processed(%__MODULE__{private: private} = axn) do
    Enum.map(private[:after_processed] || [], fn callback -> callback.(axn) end)
  end

  # Each helper flags one reaction as done by setting its bit in `states`.
  defp mark_status_applied(axn) do
    do_mark_state(axn, @status_applied)
  end

  defp mark_descendants_applied(axn) do
    do_mark_state(axn, @descendants_applied)
  end

  defp mark_events_emitted(axn) do
    do_mark_state(axn, @events_emitted)
  end

  # Sets the bits of `bitmask` in the token's `states`, keeping all other bits.
  defp do_mark_state(%__MODULE__{} = axn, bitmask) do
    %{axn | states: bor(axn.states, bitmask)}
  end

  @doc """
  Returns an identifier of an action event (resource and action) as tuple.

  The tuple is the one returned by `Bonny.Resource.gvkn/1` for the token's
  resource, with the inspected action appended to its last element. Can be
  used in logs and similar.
  """
  @spec identifier(t()) ::
          {namespace_name :: binary(), api_version :: binary(), kind_action :: binary()}
  def identifier(%__MODULE__{} = axn) do
    {ns_name, api_version, rest} = Resource.gvkn(axn.resource)
    kind_action = "#{rest}, Action=#{inspect(axn.action)}"

    {ns_name, api_version, kind_action}
  end

  @doc """
  Sets the condition in the resource status.

  The field `.status.conditions`, if configured in the CRD, holds a list of
  conditions, their `status` with a `message` and two timestamps. On the
  resource this could look something like this (taken from a Pod):

  ```
  kind: Pod
  status:
    conditions:
      - lastTransitionTime: "2019-10-22T16:29:24Z"
        status: "True"
        type: PodScheduled
      - lastTransitionTime: "2019-10-22T16:29:24Z"
        status: "True"
        type: Initialized
      - lastTransitionTime: "2019-10-22T16:29:31Z"
        status: "True"
        type: ContainersReady
      - lastTransitionTime: "2019-10-22T16:29:31Z"
        status: "True"
        type: Ready
  ```

  The condition of the given `type` is added or replaced. `lastHeartbeatTime`
  is always set to the current time. `lastTransitionTime` is taken over from
  the existing condition of the same `type` if its status did not change, and
  set to the current time otherwise. A `nil` message is left out.

  The status is changed through `update_status/2`, so this raises
  `Bonny.Axn.StatusAlreadyAppliedError` if the status has already been applied.
  """
  @spec set_condition(
          axn :: t(),
          type :: binary(),
          status :: boolean(),
          message :: binary() | nil
        ) :: t()
  def set_condition(axn, type, status, message \\ nil) do
    condition_status = if status, do: "True", else: "False"
    now = DateTime.utc_now()

    fields = %{
      "type" => type,
      "status" => condition_status,
      "message" => message,
      "lastHeartbeatTime" => now,
      "lastTransitionTime" => now
    }

    # Fields without a value (e.g. a missing message) are left out.
    condition =
      for {field, value} <- fields, not is_nil(value), into: %{} do
        {field, value}
      end

    update_status(axn, fn current_status ->
      conditions_by_type =
        current_status
        |> Map.get("conditions", [])
        |> Map.new(&{&1["type"], &1})

      next_condition =
        case conditions_by_type do
          # Same status as before: no transition happened, keep the old timestamp.
          %{^type => %{"status" => ^condition_status} = previous} ->
            Map.put(condition, "lastTransitionTime", previous["lastTransitionTime"])

          _ ->
            condition
        end

      next_conditions =
        conditions_by_type
        |> Map.put(type, next_condition)
        |> Map.values()

      Map.put(current_status, "conditions", next_conditions)
    end)
  end
end