defmodule Toolbox.Incident do
@moduledoc """
Module extends Toolbox.Workflow and abstracts how regular incident behaves.
Wraps around `Toolbox.Workflow` and adds some additional callbacks to manage incident in asset
map. This module works very much like regular workflow, but some additional properties can be
specified. Contrary to a regular workflow, this automatically generates OAs to manage the incident
in asset map and therefore syncs the general state of this workflow with the incident asset.
Start by creating a definition (see `new/0`, `add_transition/2` and `build/1`) which describes the
workflow of the incident. Then, you can create a new instance based on that definition with
`new_instance/7`.
"""
alias Runbox.Message, as: Msg
alias Runbox.Scenario.OutputAction, as: OA
alias Runbox.Scenario.OutputAction.Incident, as: CreateIncidentParams
alias Runbox.Scenario.OutputAction.IncidentActor
alias Runbox.Scenario.OutputAction.IncidentFuture
alias Runbox.Scenario.OutputAction.IncidentHistory
alias Runbox.Scenario.OutputAction.IncidentPatch, as: UpdateIncidentParams
alias Toolbox.Utils.Map, as: UtilsMap
alias Toolbox.Workflow, as: WF
alias Toolbox.Workflow.Instance, as: WFI
alias Toolbox.Workflow.Transition, as: WFT
@placeholder_re ~r/{{([a-zA-Z\_\.0-9]*)}}/
@user_actions_state_field "_user_actions"
@prev_user_actions_state_field "_prev_user_actions"
@spec new :: WF.t()
@doc "Creates new blank incident workflow definition"
def new do
%WF{}
end
@spec new_instance(WF.t(), WF.status(), String.t(), String.t(), map, Msg.t(), Keyword.t()) ::
{:ok, [OA.oa_params()], WFI.t()}
| {:terminated, [OA.oa_params()], WFI.t()}
| {:error, :unknown_status}
| {:error, {:user_actions_invalid | :upsert_attributes_invalid, reason :: String.t()}}
@doc """
Creates new incident instance for given workflow.
`params` can be used to specify additional transition-like parameters. `actors` parameter is evaluated
only when `new_instance/7` is called and is not stored in the instance. It is a listing of actors used
when an incident is created. Its format is a list of maps where `id` and `type` are mandatory keys.
`id` and `type` references an asset.
For other available options see `add_transition/2`.
"""
def new_instance(%WF{} = wf, status, type, id, state, %Msg{} = msg, params) do
with {:user_actions_invalid, :ok} <-
{:user_actions_invalid, validate_user_actions_def(params[:user_actions])},
{:upsert_attributes_invalid, :ok} <-
{:upsert_attributes_invalid, validate_upsert_attributes(params[:upsert_attributes])} do
subject = Keyword.get(params, :subject, "Incident '#{type} / #{id}'")
deprecated_name_param(params)
severity = Keyword.get(params, :severity, 1)
description = Keyword.get(params, :description, "Incident was created")
actors = Keyword.get(params, :actors, [])
then_fn =
construct_callback_list(params, :then, [
{__MODULE__, :update_incident_state},
{__MODULE__, :update_user_actions}
])
side_effects_fn =
construct_callback_list(params, :side_effects, [
{__MODULE__, :append_create_incident_output_actions}
])
update_history_entry_fn =
construct_callback_list(params, :update_history_entry, [
{__MODULE__, :update_incident_history_entry}
])
update_possible_transition_fn =
construct_callback_list(params, :update_possible_transition, [
{__MODULE__, :update_incident_possible_transition}
])
wf_params =
Keyword.merge(params,
subject: subject,
severity: severity,
description_after: description,
actors: actors,
then: then_fn,
side_effects: side_effects_fn,
update_history_entry: update_history_entry_fn,
update_possible_transition: update_possible_transition_fn
)
WF.new_instance(wf, status, type, id, state, msg, wf_params)
else
{:user_actions_invalid, {:error, reason}} ->
{:error, {:user_actions_invalid, reason}}
{:upsert_attributes_invalid, {:error, reason}} ->
{:error, {:upsert_attributes_invalid, reason}}
end
end
defp deprecated_name_param(params) do
if Keyword.has_key?(params, :name) do
IO.warn(
"Parameter 'name' is deprecated and is not used. " <>
"Use 'subject' for this purpose instead."
)
end
:ok
end
defp construct_callback_list(params, key, defaults) when is_list(defaults) do
case Keyword.get(params, key) do
nil ->
defaults
{mod, fun} = callback_def when is_atom(mod) and is_atom(fun) ->
defaults ++ [callback_def]
callback_defs when is_list(callback_defs) ->
defaults ++ callback_defs
end
end
@spec add_transition(WF.t(), Keyword.t()) :: WF.t()
@doc """
Adds a new transition to incident workflow definition.
Incident workflow is a finite state machine. Transition defines how workflow
changes its state from one to another and what must be done during the state change.
Incident transition is defined by `params` parameter. `params` is a keyword list where
each transition parameter has its key.
Parameters may be either required or optional.
Some parameters have predefined keys. These parameters are used to control workflow.
The developer of the scenario can add his own parameters to enrich function of the workflow
with features that the standard workflow module does not provide. These parameters can have
any key except predefined ones.
If `add_transition/2` succeeds it returns a two level `Toolbox.Workflow.Transition` structure.
- The first level contains parameters important for workflow control. Some parameters are copied
from `param` and some are computed.
- The second level is located under attributes key. It contains parameters important
for workflow control and all parameters defined by scenario developer.
Transition structure is passed, alongside with instance status and message to all callback
functions. The scenario developer can evaluate them inside the function.
- `from` - the state of the workflow from which the transition starts.
- required parameter
- `to` - the state of the workflow after transition is completed.
- required parameter
- `description_before` - transition from `from` state to `to` state generates an output action.
The output action contains simplified list of all possible states that can be reached from
`to` state. The list is stored in `body.attributes["future"]`. Each entry of the list is
a map. The `description before` is assigned to `description` key of that map.
The `description` is intended to provide the human readable description of the conditions
leading to the particular state.
- required parameter
- `description_after` - the aforementioned output action contains a list of all state changes
that the workflow has already passed through. That list is stored in `body.attributes["history']`.
Each entry of the list is a map containing details of the state. The `description_after`
is copied to the `"description"` key of the map. The `description` is intended to provide
human readable description about the past state change.
- required parameter
- `severity` - an integer from 1 to 4. The severity of the incident after transition to `to` state.
- optional parameter
- `when` - predicate used to select the transition which will be executed.
- optional parameter
- there can be multiple `when` definitions in list, all definitions are evaluated with
&& boolean operator
- possible `when` definitions are:
- `{Module, function}`, where function accepts transition, instance and message as args,
returns boolean
- `{:timeout, timeout}`, where timeout is defined in milliseconds (transition is then
automatically executed when time elapses the specified value)
- `{:=, [path, to, state, key], value}` (transition is executed if the specified field of
state reaches the specified value)
- `then` - callback used to update workflow instance state during transition execution.
- optional parameter
- there can be multiple then definitions in list, all definitions are executed in given order
- possible then definitions:
- `{Module, function}`, where function accepts transition, instance and message as args, and
returns `{:ok, state()}` to update the instance state
- `side_effects` - callback used to generate side effects during transition execution.
- optional parameter
- there can be multiple side effect definitions in list, all definitions are executed
in given order.
- possible definitions:
- `{Module, function}`, where function accepts transition, instance and message as args, and
returns `{:ok, [OA | Msg | OtherSideEffect]}`
- `update_history_entry` - callback used to modify transition execution history entry stored in
asset map.
- optional parameter
- there can be multiple definitions in list, all definitions are executed in given order
- this is usually used to interpolate description texts, or to add additional attributes to
history
- You can use this callback to set additional event parameters - `event_*` available in
`Runbox.Scenario.OutputAction.IncidentHistory` (the keys are strings in this case).
- possible definitions:
- `{Module, function}`, where function accepts history entry, transition, instance and message
as args and returns `{:ok, history_entry}`
- `update_possible_transition` - callback used to modify possible future transitions stored in
asset map.
- optional parameter
- there can be multiple definitions in list, all definitions are executed in given order
- the callback is executed for each possible future transition
- note this only modifies the items of `future` attribute of the incident asset, this has no
effect on definition transitions
- possible definitions:
- `{Module, function}`, where function accepts possible transition, transition, instance
and message as args, and returns `{:ok, future_transition}`
- `user_actions` - specifies all possible user actions from the target state
- a map of user actions that should be enabled once the transition is executed and incident is
in the target state.
- optional parameter
- user actions are automatically deleted if not present in the next transition
- keys are strings
- values are `{module, function}`, this specifies the function to be called to generate the user
action token (since tokens are not known in advance, they are generated by the specified
function)
- the function takes `transition, instance, message` as arguments and is expected to return
`{:ok, binary_token}` to register the user action token
- `upsert_attributes` - specifies additional attributes to be added to the incident asset.
- optional parameter
- list of callbacks to compute the additional attributes
- each callback produces a map of additional attributes and this is merged into a single map
where the latter has priority over the former
- attributes cannot override attributes handled by this workflow incl. user actions, only other
attributes can be added
- possible definitions:
- `{Module, function}`, where function accepts transition, instance and message as args,
and should return `{:ok, attribute_map}`
Any other keys of the `params` parameter are put into the map under `attributes` key
of the `Transition` structure. They can be used inside callback functions for any purpose.
For example for debugging:
### Code snippets
def definition do
Incident.new()
|> Incident.add_transition(
from: "detected",
to: "rare",
when: [{__MODULE__, :rare_message_arrived?}],
description_before: "Other messages will arrive",
description_after: "Rare message arrived",
update_history_entry: [{__MODULE__, :update_history_entry}],
my_debug_key: "This should be a very rare transition"
)
...
|> Incident.build()
end
def update_history_entry(history, transition, _workflow_instance, message) do
Logger.info(transition.attributes.my_debug_key)
...
{:ok, new_history}
end
When a message is evaluated the callbacks above are run in the following order.
1. `when` callbacks are evaluated to see if the current transition is ready to be executed. If
not the next transition is tried.
2. `then` callbacks are evaluated to update the instance state.
3. `user_actions` is evaluated, all callbacks specified inside are executed and all user action
tokens are calculated.
4. `update_history_entry` callbacks are evaluated to update the new history entry
5. `update_possible_transition` callbacks are evaluated to update the new possible future
transitions
6. `upsert_attributes` callbacks are evaluated to gather additional attributes
7. `side_effects` callbacks are evaluated to acquire the list of all additional output actions
All text bearing attributes (such as `subject`, `description_before`,
`description_after`) have access to incident metadata dictionary. This dictionary contains these,
keys:
- `type` - incident type
- `id` - incident id
- `transition` - transition attributes dictionary containing `from`, `to`, `severity` keys
- `state` - dictionary containing user defined state
- `message` - altworx message which triggered given transition
Metadata can be accessed via interpolation defined as `{{}}`, e.g. `{{state.foo.bar}}`,
`{{message.body.foo}}`. There is also an option to reference assets in those attributes, see
`t:Runbox.Scenario.OutputAction.interpolable/0`.
"""
def add_transition(%WF{} = wf, params) do
then_fn =
construct_callback_list(params, :then, [
{__MODULE__, :update_incident_state},
{__MODULE__, :update_user_actions}
])
side_effects_fn =
construct_callback_list(params, :side_effects, [
{__MODULE__, :append_update_incident_output_actions}
])
update_history_entry_fn =
construct_callback_list(params, :update_history_entry, [
{__MODULE__, :update_incident_history_entry}
])
update_possible_transition_fn =
construct_callback_list(params, :update_possible_transition, [
{__MODULE__, :update_incident_possible_transition}
])
incident_tran_params = [
then: then_fn,
side_effects: side_effects_fn,
update_history_entry: update_history_entry_fn,
update_possible_transition: update_possible_transition_fn
]
WF.add_transition(wf, Keyword.merge(params, incident_tran_params))
end
@spec handle_message(WF.t(), WFI.t(), Msg.t()) ::
{:ok, [OA.oa_params()], WFI.t()}
| {:terminated, [OA.oa_params()], WFI.t()}
| {:error, :not_built_yet}
| {:error, :status_mismatch}
@doc """
Uses given incident workflow definition and message to update state of given instance.
If no configured workflow transition matches, nothing will happen = instance state will remain the
same.
Order of callback execution:
1. when definitions of transitions in definition order
2. then definitions of matching transition
3. update history entry definitions of matching transition
3. update possible transition definitions of matching transition
5. side effects definitions of matching transition
"""
def handle_message(%WF{} = wf, %WFI{} = inc_inst, %Msg{} = msg) do
WF.handle_message(wf, inc_inst, msg)
end
@spec append_create_incident_output_actions(WFT.t(), WFI.t(), Msg.t()) ::
{:ok, [CreateIncidentParams.t()]}
def append_create_incident_output_actions(%WFT{} = tran, %WFI{} = inc_inst, %Msg{} = msg) do
metadata = format_transition_metadata(tran, inc_inst, msg)
additional_attributes =
tran.attributes
|> Map.get(:upsert_attributes, [])
|> evaluate_upsert_attributes(tran, inc_inst, msg)
oa = %CreateIncidentParams{
type: inc_inst.type,
id: inc_inst.id,
subject: apply_metadata(tran.attributes.subject, metadata),
status: inc_inst.status,
resolved: inc_inst.terminated?,
severity: tran.attributes.severity,
future: to_incident_future(inc_inst.possible_transitions),
history: to_incident_history(inc_inst.history),
actors: to_incident_actors(tran.attributes.actors),
user_actions: inc_inst.state[@user_actions_state_field],
additional_attributes: additional_attributes
}
{:ok, [oa]}
end
defp to_incident_actors(actors) do
Enum.map(actors, fn %{type: type, id: actor_id} ->
%IncidentActor{type: type, id: actor_id}
end)
end
@spec append_update_incident_output_actions(WFT.t(), WFI.t(), Msg.t()) ::
{:ok, [UpdateIncidentParams.t()]}
def append_update_incident_output_actions(%WFT{} = tran, %WFI{} = inc_inst, %Msg{} = msg) do
subject = Map.get(tran.attributes, :subject)
severity = Map.get(tran.attributes, :severity, Map.get(inc_inst.state, "severity"))
metadata = format_transition_metadata(tran, inc_inst, msg)
{update_actions, delete_actions} =
prepare_user_actions_for_update(
inc_inst.state[@user_actions_state_field],
inc_inst.state[@prev_user_actions_state_field]
)
additional_attributes =
tran.attributes
|> Map.get(:upsert_attributes, [])
|> evaluate_upsert_attributes(tran, inc_inst, msg)
# History is appended to incident in asset map. Pass only the last entry.
history = [List.last(inc_inst.history)]
update_incident_oa =
%UpdateIncidentParams{
type: inc_inst.type,
id: inc_inst.id,
status: inc_inst.status,
resolved: inc_inst.terminated?,
severity: severity,
future: to_incident_future(inc_inst.possible_transitions),
history: to_incident_history(history),
user_actions: %{
upsert: update_actions,
remove: delete_actions
},
additional_attributes: %{
upsert: additional_attributes
}
}
|> then(fn params ->
if is_nil(subject) do
params
else
Map.put(params, :subject, apply_metadata(subject, metadata))
end
end)
{:ok, [update_incident_oa]}
end
@spec update_incident_state(WFT.t(), WFI.t(), Msg.t()) :: {:ok, map}
def update_incident_state(%WFT{attributes: %{severity: severity}}, %WFI{} = inc_inst, %Msg{}) do
new_state = Map.put(inc_inst.state, "severity", severity)
{:ok, new_state}
end
def update_incident_state(_tran, inc_inst, _msg) do
{:ok, inc_inst.state}
end
@spec update_user_actions(WFT.t(), WFI.t(), Msg.t()) :: {:ok, map}
def update_user_actions(%WFT{attributes: attrs} = transition, %WFI{} = instance, %Msg{} = msg) do
current_actions =
case attrs[:user_actions] do
%{} = actions_def -> evaluate_user_actions(actions_def, transition, instance, msg)
_ -> %{}
end
new_state =
instance.state
|> Map.put(@prev_user_actions_state_field, instance.state[@user_actions_state_field])
|> Map.put(@user_actions_state_field, current_actions)
{:ok, new_state}
end
@spec update_incident_history_entry(map, WFT.t(), WFI.t(), Msg.t()) :: {:ok, map}
def update_incident_history_entry(history_entry, tran, inc_inst, msg) do
severity = Map.get(tran.attributes, :severity, Map.get(inc_inst.state, "severity"))
metadata = format_transition_metadata(tran, inc_inst, msg)
description = apply_metadata(tran.attributes.description_after, metadata)
new_history_entry =
history_entry
|> Map.put("severity", severity)
|> Map.put("description", description)
|> Map.put("attributes", %{})
{:ok, new_history_entry}
end
defp format_transition_metadata(%WFT{} = tran, %WFI{} = inc_inst, %Msg{} = msg) do
tran_attrs =
tran.attributes
|> Enum.map(fn {k, v} -> {Atom.to_string(k), v} end)
|> Map.new()
|> Map.put("from", tran.from)
|> Map.put("to", tran.to)
update_metadata_keys(%{
"type" => inc_inst.type,
"id" => inc_inst.id,
"transition" => tran_attrs,
"state" => inc_inst.state,
"message" => msg
})
end
@spec update_incident_possible_transition(map, WFT.t(), WFI.t(), Msg.t()) :: {:ok, map}
def update_incident_possible_transition(
pos_tran,
%WFT{} = tran,
%WFI{} = inc_inst,
%Msg{} = msg
) do
severity = Map.get(tran.attributes, :severity, Map.get(inc_inst.state, "severity"))
metadata = format_transition_metadata(tran, inc_inst, msg)
description = apply_metadata(tran.attributes.description_before, metadata)
new_pos_tran =
pos_tran
|> Map.put("severity", severity)
|> Map.put("description", description)
{:ok, new_pos_tran}
end
@spec build(WF.t()) ::
{:ok, WF.t()}
| {:error, :transition_from_required}
| {:error, :transition_to_required}
| {:error, :description_after_required}
| {:error, :description_before_required}
| {:error, {:bad_callback, {atom, atom}}}
| {:error, :multiple_init_statuses}
| {:error, {:user_actions_invalid | :upsert_attributes_invalid, reason :: String.t()}}
def build(%WF{} = wf) do
with :ok <- validate_incident_transitions(wf) do
WF.build(wf)
end
end
@spec validate_incident_transitions(WF.t()) ::
:ok
| {:error, :description_after_required}
| {:error, :description_before_required}
defp validate_incident_transitions(%WF{} = wf) do
wf.transitions
|> Map.values()
|> List.flatten()
|> Enum.reduce_while(:ok, fn tran, acc ->
cond do
!Map.has_key?(tran.attributes, :description_after) ->
{:halt, {:error, :description_after_required}}
!Map.has_key?(tran.attributes, :description_before) ->
{:halt, {:error, :description_before_required}}
(error = validate_user_actions_def(tran.attributes[:user_actions])) != :ok ->
{:error, reason} = error
{:halt, {:error, {:user_actions_invalid, reason}}}
(error = validate_upsert_attributes(tran.attributes[:upsert_attributes])) != :ok ->
{:error, reason} = error
{:halt, {:error, {:upsert_attributes_invalid, reason}}}
true ->
{:cont, acc}
end
end)
end
defp validate_user_actions_def(nil), do: :ok
defp validate_user_actions_def(user_actions_def) do
with {:map, true} <- {:map, is_map(user_actions_def)},
{:unknown_def, :none} <-
{:unknown_def, Enum.find(user_actions_def, :none, &(!match?({_name, {_mod, _fun}}, &1)))},
{:exported_fun, :none} <-
{:exported_fun,
Enum.find(user_actions_def, :none, fn {_name, {mod, fun}} ->
Code.ensure_loaded?(mod)
!function_exported?(mod, fun, 3)
end)} do
:ok
else
{:map, _} ->
{:error, "user actions definition is not map"}
{:unknown_def, {name, _}} ->
{:error, "user action #{name}: unknown definition"}
{:exported_fun, {name, {mod, fun}}} ->
{:error, "user action #{name}: function #{mod}.#{fun}/3 is not exported"}
end
end
def validate_upsert_attributes(nil), do: :ok
def validate_upsert_attributes(upsert_attributes_def) do
with {:list, true} <- {:list, is_list(upsert_attributes_def)},
{:unknown_def, :none} <-
{:unknown_def, Enum.find(upsert_attributes_def, :none, &(!match?({_mod, _fun}, &1)))},
{:exported_fun, :none} <-
{:exported_fun,
Enum.find(upsert_attributes_def, :none, fn {mod, fun} ->
Code.ensure_loaded?(mod)
!function_exported?(mod, fun, 3)
end)} do
:ok
else
{:list, _} ->
{:error, "not a list"}
{:unknown_def, def} ->
{:error, "invalid element: #{inspect(def)}"}
{:exported_fun, {mod, fun}} ->
{:error, "function #{mod}.#{fun}/3 is not exported"}
end
end
@spec apply_metadata(String.t(), map) :: String.t()
defp apply_metadata(text, metadata) do
Regex.replace(@placeholder_re, text, fn _, key ->
metadata
|> UtilsMap.get_path(String.split(key, "."), "")
|> Kernel.to_string()
end)
end
defp update_metadata_keys(metadata) when is_map(metadata) do
metadata
|> Enum.map(fn
{k, %_{} = v} when is_map(v) ->
{to_string(k), update_metadata_keys(Map.from_struct(v))}
{k, %{} = v} when is_map(v) ->
{to_string(k), update_metadata_keys(v)}
{k, v} ->
{to_string(k), v}
end)
|> Map.new()
end
defp prepare_user_actions_for_update(nil, _), do: {%{}, %{}}
defp prepare_user_actions_for_update(current, nil) do
prepare_user_actions_for_update(current, %{})
end
defp prepare_user_actions_for_update(current, previous) do
delete_actions = Map.keys(previous) -- Map.keys(current)
update_actions =
current
|> Enum.filter(fn {action, token} -> previous[action] != token end)
|> Enum.into(%{})
{update_actions, delete_actions}
end
defp evaluate_user_actions(actions_def, transition, instance, msg) do
actions_def
|> Enum.map(fn {action_name, token_def} ->
{action_name, evaluate_user_action_token(token_def, transition, instance, msg)}
end)
|> Enum.filter(fn {_, token} -> is_binary(token) end)
|> Enum.into(%{})
end
defp evaluate_user_action_token({mod, fun}, transition, instance, msg) do
case apply(mod, fun, [transition, instance, msg]) do
{:ok, token} when is_binary(token) -> token
_ -> nil
end
end
defp evaluate_upsert_attributes(defs, transition, instance, msg) do
Enum.reduce(defs, %{}, fn {mod, fun}, attrs ->
case apply(mod, fun, [transition, instance, msg]) do
{:ok, new_attrs} -> Map.merge(attrs, new_attrs)
_ -> attrs
end
end)
end
# Public for tests
@doc false
def to_incident_future(future) do
Enum.map(
future,
fn %{
"status" => status,
"severity" => severity,
"timestamp" => timestamp,
"description" => description
} ->
%IncidentFuture{
status: status,
severity: severity,
timestamp: timestamp,
description: description
}
end
)
end
# Public for tests
@doc false
def to_incident_history(history) do
Enum.map(
history,
fn %{
"status" => status,
"severity" => severity,
"timestamp" => timestamp,
"description" => description,
"attributes" => attributes
} = h ->
%IncidentHistory{
status: status,
severity: severity,
timestamp: timestamp,
description: description,
attributes: attributes,
event_type: h["event_type"],
event_params: h["event_params"],
event_actors: h["event_actors"],
event_origin_messages: h["event_origin_messages"]
}
end
)
end
end