defmodule Bonny.Axn do
@moduledoc """
Describes a resource action event.
This is the token passed to all steps of your operator and controller
pipeline.
This module gets imported to your controllers where you should use the
functions `register_descendant/3`, `update_status/2` and the ones to register
events: `success_event/2`, `failure_event/2` and/or `register_event/6`. Note
that these functions raise exceptions if those resources have already been
applied to the cluster.
The `register_before_*` functions can be used in `Pluggable` steps in order
to register callbacks that are called before applying resources to the
cluster. Have a look at `Bonny.Pluggable.Logger` for a use case.
## Action event fields
These fields contain information on the action event that occurred.
* `action` - the action that triggered this event
* `resource` - the resource the action was applied to
* `conn` - the connection to the cluster the event occurred
* `operator` - the operator that discovered and dispatched the event
* `controller` - the controller handling the event and its init opts
## Reaction fields
* `descendants` - descending resources defined by the handling controller
* `status` - the data to be applied to the status subresource
* `events` - Kubernetes events regarding the resource to be applied to the cluster
## Pipeline fields
* `halted` - the boolean status on whether the pipeline was halted
* `assigns` - shared user data as a map
* `private` - shared library data as a map
* `states` - The states for status, events and descendants
"""
@derive Pluggable.Token
alias Bonny.Event
alias Bonny.Resource
import Bitwise
require Logger
@type assigns :: %{optional(atom) => any}
@type states :: integer()
@type t :: %__MODULE__{
action: :add | :modify | :reconcile | :delete,
conn: K8s.Conn.t(),
descendants: list(Resource.t()),
events: list(Bonny.Event.t()),
resource: Resource.t(),
status: map() | nil,
assigns: assigns(),
private: assigns(),
halted: boolean(),
controller: {controller :: module(), init_opts :: keyword()} | nil,
operator: module() | nil,
states: states()
}
@enforce_keys [:conn, :resource, :action]
defstruct [
:action,
:conn,
:resource,
:controller,
status: nil,
assigns: %{},
private: %{},
descendants: [],
events: [],
halted: false,
operator: nil,
states: 0
]
@status_applied 1
@descendants_applied 1 <<< 1
@events_emitted 1 <<< 2
defguard is_status_applied(axn) when (axn.states &&& @status_applied) == @status_applied
defguard are_descendants_applied(axn)
when (axn.states &&& @descendants_applied) == @descendants_applied
defguard are_events_emitted(axn) when (axn.states &&& @events_emitted) == @events_emitted
@spec new!(Keyword.t()) :: t()
def new!(fields), do: struct!(__MODULE__, fields)
defmodule StatusAlreadyAppliedError do
defexception message: "the status has already been applied"
@moduledoc """
Error raised when trying to update or apply an already applied status
"""
end
defmodule DescendantsAlreadyAppliedError do
defexception message: "the descendants have already been applied"
@moduledoc """
Error raised when trying to register a descendant or apply the descendants
when already applied.
"""
end
defmodule EventsAlreadyEmittedError do
defexception message: "the events have already been emitted"
@moduledoc """
Error raised when trying to register an event or emit evnts when already
emitted.
"""
end
@doc """
Registers a Kubernetes event to the `%Axn{}` token to be emitted by Bonny.
"""
@spec register_event(
t(),
Resource.t() | nil,
Event.event_type(),
binary(),
binary(),
binary()
) :: t()
def register_event(
axn,
related \\ nil,
event_type,
reason,
action,
message
) do
event = Bonny.Event.new!(axn.resource, related, event_type, reason, action, message)
add_event(axn, event)
end
@doc """
Registers a asuccess event to the `%Axn{}` token to be emitted by Bonny.
"""
@spec success_event(t(), Keyword.t()) :: t()
def success_event(axn, opts \\ []) do
action_string = axn.action |> Atom.to_string() |> String.capitalize()
event =
[
reason: "Successful #{action_string}",
message: "Resource #{axn.action} was successful."
]
|> Keyword.merge(opts)
|> Keyword.merge(
event_type: :Normal,
regarding: axn.resource,
action: Atom.to_string(axn.action)
)
|> Event.new!()
add_event(axn, event)
end
@doc """
Registers a failure event to the `%Axn{}` token to be emitted by Bonny.
"""
@spec failure_event(t(), Keyword.t()) :: t()
def failure_event(axn, opts \\ []) do
action_string = axn.action |> Atom.to_string() |> String.capitalize()
event =
[
reason: "Failed #{action_string}",
message: "Resource #{axn.action} has failed, no reason as specified."
]
|> Keyword.merge(opts)
|> Keyword.merge(
event_type: :Warning,
regarding: axn.resource,
action: Atom.to_string(axn.action)
)
|> Event.new!()
add_event(axn, event)
end
defp add_event(axn, _) when are_events_emitted(axn) do
raise EventsAlreadyEmittedError
end
defp add_event(axn, event), do: %__MODULE__{axn | events: [event | axn.events]}
@doc """
Empties the list of events without emitting them.
"""
@spec clear_events(t()) :: t()
def clear_events(axn) when are_events_emitted(axn) do
raise EventsAlreadyEmittedError
end
def clear_events(axn), do: %{axn | events: []}
@doc """
Registers a decending object to be applied.
Owner reference will be added automatically.
Adding the owner reference can be disabled by passing the option
`omit_owner_ref: true`.
"""
@spec register_descendant(t(), Resource.t(), Keyword.t()) :: t()
def register_descendant(axn, descendant, opts \\ [])
def register_descendant(axn, _, _) when are_descendants_applied(axn) do
raise DescendantsAlreadyAppliedError
end
def register_descendant(axn, descendant, opts) do
descendant =
if opts[:omit_owner_ref],
do: descendant,
else: Resource.add_owner_reference(descendant, axn.resource)
%__MODULE__{axn | descendants: [descendant | axn.descendants]}
end
@doc """
Executes `fun` for the resource status and applies the new status
subresource. This can be called multiple times.
`fun` should be a function of arity 1. It will be passed the
current status object and expected to return the updated one.
If no current status exists, an empty map is passed to `fun`
"""
@spec update_status(t(), (map() -> map())) :: t()
def update_status(axn, _) when is_status_applied(axn) do
raise StatusAlreadyAppliedError
end
def update_status(axn, fun) do
current_status = axn.status || axn.resource["status"] || %{}
new_status = fun.(current_status)
struct!(axn, status: new_status)
end
@doc """
Emits the events created for this Axn.
"""
@spec emit_events(t()) :: t()
def emit_events(axn) when are_events_emitted(axn) do
raise EventsAlreadyEmittedError
end
def emit_events(%__MODULE__{events: events, conn: conn, operator: operator} = axn) do
events
|> List.wrap()
|> Enum.map(&run_before_emit_event(&1, axn))
|> Enum.map(fn event -> {Bonny.EventRecorder.emit(event, operator, conn), event} end)
|> Enum.each(fn
{{:ok, _}, _} ->
:ok
{{:error, error}, event} ->
id = identifier(axn)
message = emit_event_error_message(error)
Logger.error("#{inspect(id)} - #{message}",
library: :bonny,
event: event,
error: error
)
end)
mark_events_emitted(axn)
end
@doc """
Applies the status to the resource's status subresource in the cluster.
If no status was specified, :noop is returned.
"""
@spec apply_status(t(), Keyword.t()) :: t()
def apply_status(axn, apply_opts \\ [])
def apply_status(axn, _) when is_status_applied(axn) do
raise StatusAlreadyAppliedError
end
def apply_status(%__MODULE__{status: nil} = axn, _) do
mark_status_applied(axn)
end
def apply_status(%Bonny.Axn{resource: resource} = axn, apply_opts) do
result =
resource
|> Map.put("status", axn.status)
|> run_before_apply_status(axn)
|> Resource.apply_status(axn.conn, apply_opts)
case result do
{:ok, _} ->
mark_status_applied(axn)
{:error, error} ->
id = identifier(axn)
message = apply_status_error_message(error)
Logger.error("#{inspect(id)} - #{message}",
library: :bonny,
resource: resource,
error: error
)
raise "#{inspect(id)} - #{message}"
end
end
defp apply_error_message(%{message: message}), do: [" ", message]
defp apply_error_message(_), do: []
defp apply_status_error_message(%K8s.Discovery.Error{}) do
[
"Failed applying resource status.",
" ",
"The status subresource for this resource seems to be disabled."
]
end
defp apply_status_error_message(error) do
["Failed applying resource status." | apply_error_message(error)]
end
defp apply_descendant_error_message(error, descendant) do
gvkn = Resource.gvkn(descendant)
["Failed applying descending (child) resource #{inspect(gvkn)}." | apply_error_message(error)]
end
defp emit_event_error_message(error) do
["Failed emitting event." | apply_error_message(error)]
end
@doc """
Applies the dependants to the cluster.
If `:create_events` is true, will create an event for each successful apply.
Always creates events upon failed applies.
## Options
`:create_events` - Whether events should be created upon success. Defaults to `true`
All further options are passed to `K8s.Client.apply/2`
"""
@spec apply_descendants(t(), Keyword.t()) :: t()
def apply_descendants(axn, opts \\ [])
def apply_descendants(axn, _) when are_descendants_applied(axn) do
raise DescendantsAlreadyAppliedError
end
def apply_descendants(axn, opts) do
{create_events, apply_opts} = Keyword.pop(opts, :create_events, [])
%__MODULE__{descendants: descendants, conn: conn} = axn
descendants
|> List.wrap()
|> run_before_apply_descendants(axn)
|> Resource.apply_async(conn, apply_opts)
|> Enum.reduce(axn, fn
{_, {:ok, descendant}}, acc ->
if create_events do
acc
|> success_event(
reason: "Successfully applied descendant",
message:
"Successfully applied #{K8s.Resource.FieldAccessors.kind(descendant)} #{K8s.Resource.FieldAccessors.name(descendant)} to the cluster.",
related: descendant
)
else
acc
end
{descendant, {:error, error}}, _acc ->
id = identifier(axn)
message = apply_descendant_error_message(error, descendant)
Logger.error("#{inspect(id)} - #{message}",
library: :bonny,
resource: descendant,
error: error
)
raise "#{inspect(id)} - #{message}"
end)
|> mark_descendants_applied()
end
@doc ~S"""
Registers a callback to be invoked before a status is applied to the
status subresource.
Callbacks are invoked in the reverse order they are defined (callbacks
defined first are invoked last).
## Examples
To log a message for the status being applied:
require Logger
Bonny.Axn.register_before_apply_status(axn, fn resource, axn ->
Logger.info("Status of the #{resource["kind"]} named #{resource["metadata"]["name"]} is applied to namespace #{resource["metadata"]["namespace"]}")
resource
end)
"""
@spec register_before_apply_status(t(), (Resource.t(), t() -> Resource.t())) :: t()
def register_before_apply_status(%__MODULE__{private: private} = axn, callback)
when is_function(callback, 2) do
%{axn | private: update_in(private[:before_apply_status], &[callback | &1 || []])}
end
@doc ~S"""
Registers a callback to be invoked before descendants are applied to the
cluster.
Callbacks are invoked in the reverse order they are defined (callbacks
defined first are invoked last).
## Examples
To log a message:
require Logger
Bonny.Axn.register_before_apply_status(axn, fn descendants, axn ->
Enum.each(descendants, &Logger.info("Descending #{&1["kind"]} named #{&1["name"]} is applied to namespace #{&1["metadata"]["namespace"]}"))
descendants
end)
"""
@spec register_before_apply_descendants(t(), (list(Resource.t()), t() -> list(Resource.t()))) ::
t()
def register_before_apply_descendants(%__MODULE__{private: private} = axn, callback)
when is_function(callback, 2) do
%{axn | private: update_in(private[:before_apply_descendants], &[callback | &1 || []])}
end
@doc ~S"""
Registers a callback to be invoked before events are emitted to the
cluster.
Callbacks are invoked in the reverse order they are defined (callbacks
defined first are invoked last).
## Examples
To log a message:
require Logger
Bonny.Axn.register_before_apply_status(axn, fn events, axn ->
Logger.info("Event of type #{event.event_type} is emitted")
events
end)
"""
@spec register_before_emit_event(t(), (Bonny.Event.t(), t() -> Bonny.Event.t())) :: t()
def register_before_emit_event(%__MODULE__{private: private} = axn, callback)
when is_function(callback, 2) do
%{axn | private: update_in(private[:before_emit_event], &[callback | &1 || []])}
end
defp run_before_apply_status(resource, %__MODULE__{private: private} = axn) do
for callback <- private[:before_apply_status] || [], reduce: resource do
resource -> callback.(resource, axn)
end
end
defp run_before_apply_descendants(descendants, %__MODULE__{private: private} = axn) do
for callback <- private[:before_apply_descendants] || [], reduce: descendants do
descendants -> callback.(descendants, axn)
end
end
defp run_before_emit_event(event, %__MODULE__{private: private} = axn) do
for callback <- private[:before_emit_event] || [], reduce: event do
event -> callback.(event, axn)
end
end
defp mark_status_applied(axn), do: do_mark_state(axn, @status_applied)
defp mark_descendants_applied(axn), do: do_mark_state(axn, @descendants_applied)
defp mark_events_emitted(axn), do: do_mark_state(axn, @events_emitted)
defp do_mark_state(%__MODULE__{states: states} = axn, bitmask) do
%{axn | states: states ||| bitmask}
end
@doc """
Returns an identifier of an action event (resource and action) as tuple.
Can be used in logs and similar.
"""
@spec identifier(t()) :: {binary(), binary(), binary()}
def identifier(%__MODULE__{action: action, resource: resource}) do
{ns_name, api_version, others} = Bonny.Resource.gvkn(resource)
{ns_name, api_version, "#{others}, Action=#{inspect(action)}"}
end
end