defmodule Toolbox.Workflow do
@moduledoc """
Workflow is an abstraction of a state machine designed to use with Altworx scenarios.
## Overview
A workflow essentially describes a “blueprint” for a state machine. It specifies what
possible states of the machine are, when does it move from one state to another,
and what happens when such a transition is performed.
A concrete state machine that works according to the blueprint is called a *workflow
instance* and is represented by the `Toolbox.Workflow.Instance` struct. It specifies
what the current status and state are, what statuses it has gone through so far, and
so on. See `t:Toolbox.Workflow.Instance.t/0` for more information.
Note that what is normally referred to as *states* are in workflows called *statuses*.
Statuses are from a finite set and are given in advance (e.g., a CD player can have
three statuses: `empty`, `idle`, and `playing`). *State*, on the other hand, is an
additional context enriching status with other information. State is not listed upfront
and is not limited in size. For example, state of a CD player can be the artist and
album of the CD that is currently playing.
In a workflow, statuses are defined by declared transitions. That is, the set of statuses
is defined by all the statuses that are mentioned in the definition of transitions.
Statuses are therefore a part of the workflow definition and cannot be changed later.
State is defined when a new workflow instance is created. State can then be modified
during the life-cycle of the instance using `then` callbacks.
Workflow instances consume messages received by scenarios and produce output actions.
They are used from inside scenario units which feed them with messages, return produced
output actions, and store their state (i.e., their instance struct) in their own state.
As units are event-driven and can only react to messages, so are workflow instances and
the only time when a transition may be performed (or when a workflow instance may be
created) is when a message is received.
Most of the time, workflows are used via their wrapper for incident workflows
(`Toolbox.Incident`). However, they can be used on their own as well.
## Example
Here is a simple workflow for a CD player that has statuses and transitions as defined
in the diagram below and that creates an event when a CD finishes.
It operates on state that can be described with the following type:
`%{cd: %{album: String.t(), artist: String.t()} | nil}`.
```mermaid
stateDiagram-v2
direction LR
empty --> idle: insert CD
idle --> playing: play
playing --> idle: stop
playing --> idle: end of CD
playing --> empty: eject
idle --> empty: eject
```
defmodule CdPlayerWorkflow do
alias Toolbox.Workflow
alias Runbox.Message
alias Runbox.Scenario.OutputAction, as: OA
@cd_length :timer.minutes(74)
def definition do
Workflow.new()
|> Workflow.add_transition(from: "empty", to: "idle", when: {__MODULE__, :insert_cd_msg},
then: {__MODULE__, :put_cd_to_state})
|> Workflow.add_transition(from: "idle", to: "playing", when: {__MODULE__, :play_msg})
|> Workflow.add_transition(from: "playing", to: "idle", when: {__MODULE__, :stop_msg})
|> Workflow.add_transition(from: "playing", to: "idle", when: {:timeout, @cd_length},
side_effects: {__MODULE__, :create_played_event})
|> Workflow.add_transition(from: "playing", to: "empty", when: {__MODULE__, :eject_msg},
then: {__MODULE__, :remove_cd_from_state})
|> Workflow.add_transition(from: "idle", to: "empty", when: {__MODULE__, :eject_msg},
then: {__MODULE__, :remove_cd_from_state})
|> Workflow.build()
end
def insert_cd_msg(_tran, _inst, %Message{type: type}), do: type == :insert_cd
def eject_msg(_tran, _inst, %Message{type: type}), do: type == :eject
def play_msg(_tran, _inst, %Message{type: type}), do: type == :play
def stop_msg(_tran, _inst, %Message{type: type}), do: type == :stop
def put_cd_to_state(_tran, inst, %Message{type: :insert_cd, body: body}) do
cd = %{album: body.album, artist: body.artist}
state = %{inst.state | cd: cd}
{:ok, state}
end
def remove_cd_from_state(_tran, inst, _msg) do
state = %{inst.state | cd: nil}
{:ok, state}
end
def create_played_event(_tran, inst, msg) do
oa = %OA.Event{
type: "played_cd",
template: "${actors.player} played ${params.album} by ${params.artist}",
actors: %{"player" => %{asset_type: inst.type, asset_id: inst.id, name: nil}},
params: %{"album" => inst.state.cd.album, "artist" => inst.state.cd.artist},
origin_messages: [msg.origin]
}
{:ok, [oa]}
end
end
An instance of this workflow may be then created like this:
{:ok, wf} = CdPlayerWorkflow.definition()
{:ok, oas, inst} = Workflow.new_instance(wf, "empty", "cd_player", "Sony D-50", %{cd: nil}, msg)
Afterwards, when a message comes to your unit, you can then call `handle_message/3` to process
the message and get an updated instance together with a list of produced output actions:
{:ok, oas, inst} = Workflow.handle_message(wf, inst, msg)
"""
alias __MODULE__, as: WF
alias __MODULE__.Instance
alias __MODULE__.Transition
alias Runbox.Message, as: Msg
alias Runbox.Scenario.OutputAction, as: OA
alias Toolbox.Utils.Enum, as: UtilsEnum
alias Toolbox.Utils.Map, as: UtilsMap
defmodule Transition do
@moduledoc """
Struct defining a single transition, i.e., a change from one status to another.
The struct should be built only via `Toolbox.Workflow.add_transition/2`. You can then access
this struct in all the workflow callbacks such as `when` or `then`.
"""
alias Toolbox.Workflow, as: WF
@type builtin_when_fn_def ::
{:=, [String.t()], term}
| {:<, [String.t()], term}
| {:>, [String.t()], term}
| {:<=, [String.t()], term}
| {:>=, [String.t()], term}
| {:contains, [String.t()], term}
| {:is_in, [String.t()], term}
@type when_fn_def :: {atom, atom} | {:timeout, integer} | builtin_when_fn_def
@type then_fn_def :: {atom, atom}
@type side_effects_fn_def :: {atom, atom}
@type update_history_entry_fn_def :: {atom, atom}
@type update_possible_transition_fn_def :: {atom, atom}
@type t :: %Transition{
from: WF.status(),
to: WF.status(),
when_fn: [when_fn_def],
then_fn: [then_fn_def],
side_effects_fn: [side_effects_fn_def],
timeout?: boolean,
timeout: integer,
attributes: %{required(atom) => term},
update_history_entry_fn: [update_history_entry_fn_def],
update_possible_transition_fn: [update_possible_transition_fn_def]
}
defstruct from: nil,
to: nil,
when_fn: nil,
then_fn: nil,
side_effects_fn: nil,
timeout?: false,
timeout: -1,
attributes: %{},
update_history_entry_fn: nil,
update_possible_transition_fn: nil
end
defmodule Instance do
@moduledoc """
Struct holding the current state of a single workflow instance.
The struct should be built only via `Toolbox.Workflow.new_instance/7`. An updated instance
is subsequently returned from every call to `Toolbox.Workflow.handle_message/3`.
"""
alias Toolbox.Workflow, as: WF
@typedoc """
Workflow instance.
* `status` - the current status. Note that the `:none` status may be set only during
internal initialization of the instance. After`Toolbox.Workflow.new_instance/7` returns,
it will never have this status.
* `state` - the current context. May be any map. Updated using the `then` callbacks.
* `type`: an arbitrary “type”, possibly an asset type. Does not influence anything.
Set using `Toolbox.Workflow.new_instance/7`, does not change afterwards.
* `id`: an arbitrary “id”, possibly an asset id. May be used to distinguish instances.
Does not influence anything. Set using `Toolbox.Workflow.new_instance/7`, does not
change afterwards.
* `history`: a list of history entries, sorted from oldest to newest. Each entry is by default
of the form `%{"status" => some_status, "timestamp" => some_ts}`. Even the initial status is
included. In case of a timeout transition, the timestamp is the timestamp of the relevant
message (i.e., may be later than the actual timeout). The shape of history entries may be
modified using the `update_history_entry` callbacks.
* `last_updated`: the timestamp of the last transition. Including the transition to the
initial state. In case of a timeout transition, the timestamp corresponds to the moment
when the timeout fires.
* `possible_transitions`: a list of transitions than can be performed from the current status.
Each is by default of the form `%{"status" => some_status, "timestamp" => some_ts}`.
The timestamp is `-1` if there is no timeout bound with the transition, otherwise it is
timestamp of the timeout. If a transition uses the builtin `when` conditions on state,
they are taken into account and only if they hold, the transition is considered as possible.
The shape of the possible transitions may be modified using the `update_possible_transition`
callbacks.
* `next_possible_transition_timestamp`: the earliest timestamp out of the possible transitions
that specify a timeout (i.e., their timeout is not `-1`), or `nil` if there is no such
transition.
* `terminated?`: a flag meaning that the instance cannot move to any other status, i.e.,
its status is terminal and there are no transition from it.
## Example
This example uses the `CdPlayerWorkflow` from `Toolbox.Workflow` moduledoc.
%Toolbox.Workflow.Instance{
id: "Sony D-50",
type: "cd_player",
state: %{cd: %{album: "Zorya", artist: "Floex"}},
last_update: 300,
status: "playing",
history: [
%{"status" => "empty", "timestamp" => 100},
%{"status" => "idle", "timestamp" => 200},
%{"status" => "playing", "timestamp" => 300}
],
possible_transitions: [
%{"status" => "idle", "timestamp" => -1},
%{"status" => "idle", "timestamp" => 4440300},
%{"status" => "empty", "timestamp" => -1}
],
next_possible_transition_timestamp: 4440300,
terminated?: false
}
"""
@type t :: %Instance{
id: String.t(),
type: String.t(),
state: map,
last_update: integer,
status: :none | WF.status(),
history: [map],
possible_transitions: [map],
next_possible_transition_timestamp: integer | nil,
terminated?: boolean
}
defstruct id: nil,
type: nil,
state: %{},
last_update: 0,
status: :none,
history: [],
possible_transitions: [],
next_possible_transition_timestamp: nil,
terminated?: false
end
defstruct statuses: [],
transitions: %{},
terminal_statuses: [],
built?: false
@type status :: String.t()
@type transitions :: %{required(status) => [Transition.t()]}
@type t :: %WF{
statuses: [status],
transitions: transitions,
terminal_statuses: [status],
built?: boolean
}
@spec new :: t
@doc """
Creates new blank workflow definition.
Continue with calls to `add_transition/2` and `build/1` to define a workflow.
"""
def new do
%WF{}
end
@doc """
Creates a new instance for a given workflow.
`status` is the initial status of the instance. `state` is its initial state.
`type` and `id` may be arbitrary. If your instance represents an asset or incident,
you may use its type and id. These two fields are kept in the `Toolbox.Workflow.Instance`
struct and are available in all workflow callbacks, so you can use them e.g. to construct
output actions. They don't directly influence the computation, they're just additional
information available to the user.
`msg` is the message that causes this instance to be created. Its timestamp is used as
the first timestamp in the instance history.
When a new instance is created, it internally performs a transition from the internal
status `:none` to the given `status`. You may pass transition callbacks using `options`
(in the same way as to `add_transition/2`) and these callbacks will be executed on this
initial transition. All the callbacks except for `when` are supported.
The creation of an instance automatically adds the first entry to the instance history
and sets the `last_update` field to the timestamp of `msg`.
## Example
iex> Workflow.new_instance(wf, "empty", "cd_player", "Sony D-50", %{cd: nil},
...> %Message{type: :discover_player, timestamp: 100})
{:ok, [],
%Toolbox.Workflow.Instance{
id: "Sony D-50",
type: "cd_player",
state: %{cd: nil},
last_update: 100,
status: "empty",
history: [%{"status" => "empty", "timestamp" => 100}],
possible_transitions: [%{"status" => "idle", "timestamp" => -1}],
next_possible_transition_timestamp: nil,
terminated?: false
}}
"""
@spec new_instance(t, status, String.t(), String.t(), map, Msg.t(), Keyword.t()) ::
{:ok, [OA.oa_params()], Instance.t()}
| {:terminated, [OA.oa_params()], Instance.t()}
| {:error, :unknown_status}
def new_instance(%WF{} = wf, status, type, id, state, %Msg{} = msg, options \\ []) do
if status in wf.statuses do
transition_params =
Keyword.merge(options,
when: [],
then: Keyword.get(options, :then, []),
side_effects: Keyword.get(options, :side_effects, []),
update_history_entry: Keyword.get(options, :update_history_entry, []),
update_possible_transition: []
)
tran = construct_transition(:none, status, transition_params)
execute_transition(wf, tran, %Instance{id: id, type: type, state: state}, msg)
else
{:error, :unknown_status}
end
end
@spec build(t) ::
{:ok, t}
| {:error, :transition_from_required}
| {:error, :transition_to_required}
| {:error, {:bad_callback, {atom, atom}}}
| {:error, :multiple_init_statuses}
@doc "Finishes workflow definition, validates all configured dependencies and workflow structure."
def build(%WF{} = wf) do
with :ok <- validate_transition_defs(wf.transitions) do
statuses =
wf.transitions
|> Enum.map(fn {from, trs} -> [from | Enum.map(trs, fn tr -> tr.to end)] end)
|> List.flatten()
|> Enum.uniq()
non_term_stats = Map.keys(wf.transitions)
term_stats = Enum.filter(statuses, fn status -> !Enum.member?(non_term_stats, status) end)
wf = %WF{
wf
| built?: true,
statuses: statuses,
terminal_statuses: term_stats
}
{:ok, wf}
end
end
@spec validate_transition_defs(transitions) ::
:ok
| {:error, :transition_from_required}
| {:error, :transition_to_required}
| {:error, {:bad_callback, {atom, atom}}}
defp validate_transition_defs(transitions) do
valid_when_fn? = valid_when_fn?()
valid_then_fn? = function_exported_validator(3)
valid_side_effects_fn? = function_exported_validator(3)
valid_update_history_entry_fn? = function_exported_validator(4)
valid_update_possible_transition_fn? = function_exported_validator(4)
transitions
|> Map.values()
|> List.flatten()
|> Enum.reduce_while(:ok, fn tran, acc ->
cond do
tran.from == :none ->
{:halt, {:error, :transition_from_required}}
tran.to == nil ->
{:halt, {:error, :transition_to_required}}
!Enum.all?(tran.when_fn, valid_when_fn?) ->
bad_callback = Enum.find(tran.when_fn, fn callback -> !valid_when_fn?.(callback) end)
{:halt, {:error, {:bad_callback, bad_callback}}}
!Enum.all?(tran.then_fn, valid_then_fn?) ->
bad_callback = Enum.find(tran.then_fn, fn callback -> !valid_then_fn?.(callback) end)
{:halt, {:error, {:bad_callback, bad_callback}}}
!Enum.all?(tran.side_effects_fn, valid_side_effects_fn?) ->
bad_callback =
Enum.find(tran.side_effects_fn, fn callback -> !valid_side_effects_fn?.(callback) end)
{:halt, {:error, {:bad_callback, bad_callback}}}
!Enum.all?(tran.update_history_entry_fn, valid_update_history_entry_fn?) ->
bad_callback =
Enum.find(tran.update_history_entry_fn, fn callback ->
!valid_update_history_entry_fn?.(callback)
end)
{:halt, {:error, {:bad_callback, bad_callback}}}
!Enum.all?(tran.update_possible_transition_fn, valid_update_possible_transition_fn?) ->
bad_callback =
Enum.find(tran.update_possible_transition_fn, fn callback ->
!valid_update_possible_transition_fn?.(callback)
end)
{:halt, {:error, {:bad_callback, bad_callback}}}
true ->
{:cont, acc}
end
end)
end
defp valid_when_fn? do
fn
{:timeout, _} ->
true
{:=, path, _} when is_list(path) ->
true
{:>, path, _} when is_list(path) ->
true
{:<, path, _} when is_list(path) ->
true
{:<=, path, _} when is_list(path) ->
true
{:>=, path, _} when is_list(path) ->
true
{:contains, path, _} when is_list(path) ->
true
{:is_in, path, value} when is_list(path) and is_list(value) ->
true
{mod, fun} ->
Code.ensure_loaded?(mod) && Kernel.function_exported?(mod, fun, 3)
_ ->
false
end
end
@spec add_transition(t, Keyword.t()) :: t
@doc """
Defines a new transition in a given workflow.
The transition is defined using a keyword list with the following keys:
- `from` - source status
- required parameter
- `to` - target status
- required parameter
- `when` - predicate used to select transition which will be executed
- optional parameter
- possible `when` definitions:
- `{Module, function}`, where function accepts transition, instance and message as args,
and returns a boolean.
- `{:timeout, timeout}`, where timeout is defined in milliseconds
- `{:=, [path, to, state, key], value}`
- `{:>, [path, to, state, key], value}`
- `{:<, [path, to, state, key], value}`
- `{:<=, [path, to, state, key], value}`
- `{:>=, [path, to, state, key], value}`
- `{:contains, [path, to, state, key], value}`
- `{:is_in, [path, to, state, key], list_value}`
- a list of predicates where each has one of the shapes above
- e.g., `[{__MODULE__, :play_msg}, {:=, [:cd, :artist], "Pink Floyd"}]`
- all predicates in that list must hold for the combined predicate to hold
- `then` - callback used to update workflow instance *state* during transition execution
- optional parameter
- possible then definitions:
- `{Module, function}`, where function accepts transition, instance and message as args
- a list of `{Module, functions}` items
- listed callbacks are executed in the given order
- calbacks should return `{:ok, wf_instance_state :: map()}` or `{:error, reason}`
- `side_effects` - callback used to generate output actions during transition execution
- optional parameter
- possible `side_effects` definitions:
- `{Module, function}`, where function accepts transition, instance and message as args
- a list of `{Module, functions}` items
- listed callbacks are executed in the given order
- calbacks should return `{:ok, [Runbox.Scenario.OutputAction.oa_params()]}` or `{:error, reason}`
- `update_history_entry` - callback used to modify transition execution history entry
- optional parameter
- possible `update_history_entry` definitions:
- `{Module, function}`, where function accepts history entry, transition, instance and message
as args
- a list of `{Module, functions}` items
- listed callbacks are executed in the given order
- calbacks should return `{:ok, updated_history_entry :: map()}` or `{:error, reason}`
- `update_possible_transition` - callback used in `handle_message/3` to modify possible transition
- optional parameter
- possible `update_possible_transition` definitions:
- `{Module, function}`, where function accepts possible transition, transition, instance
and message as args
- a list of `{Module, functions}` items
- listed callbacks are executed in the given order
- calbacks should return `{:ok, updated_possible_transition :: map()}` or `{:error, reason}`
"""
def add_transition(%WF{} = wf, params) do
from = Keyword.get(params, :from, :none)
to = Keyword.get(params, :to)
tran = construct_transition(from, to, params)
trans = Map.update(wf.transitions, from, [tran], fn trans -> trans ++ [tran] end)
%WF{wf | transitions: trans}
end
@spec construct_transition(:none | status, status, Keyword.t()) :: Transition.t()
defp construct_transition(from, to, params) do
when_fn = construct_callback_list(params, :when)
then_fn = construct_callback_list(params, :then)
side_effects_fn = construct_callback_list(params, :side_effects)
update_history_entry_fn = construct_callback_list(params, :update_history_entry)
update_possible_transition_fn = construct_callback_list(params, :update_possible_transition)
attrs =
params
|> Keyword.drop([
:from,
:to,
:when,
:then,
:side_effects,
:update_history_entry,
:update_possible_transition
])
|> Map.new()
timeout? = timeout_when_fn?(when_fn)
{:timeout, timeout} = Enum.find(when_fn, {:timeout, -1}, &timeout_when_fn?/1)
%Transition{
from: from,
to: to,
when_fn: when_fn,
then_fn: then_fn,
side_effects_fn: side_effects_fn,
timeout?: timeout?,
timeout: timeout,
attributes: attrs,
update_history_entry_fn: update_history_entry_fn,
update_possible_transition_fn: update_possible_transition_fn
}
end
@builtin_fns [:=, :>, :<, :>=, :<=, :contains, :is_in]
@spec construct_callback_list(Keyword.t(), atom) :: [
{atom, atom}
| {:=, term, term}
| {:>, term, term}
| {:<, term, term}
| {:>=, term, term}
| {:<=, term, term}
| {:contains, term, term}
| {:is_in, term, term}
| {:timeout, integer}
]
defp construct_callback_list(params, key) do
case Keyword.get(params, key, []) do
{builtin_fn, _, _} = builtin_fn_def when builtin_fn in @builtin_fns ->
[builtin_fn_def]
{:timeout, _} = timeout_def ->
[timeout_def]
{mod, fun} = callback_def when is_atom(mod) and is_atom(fun) ->
[callback_def]
callback_defs when is_list(callback_defs) ->
callback_defs
end
end
@spec timeout_when_fn?([Transition.when_fn_def()] | Transition.when_fn_def()) :: boolean
defp timeout_when_fn?(when_fns) when is_list(when_fns),
do: Enum.any?(when_fns, &timeout_when_fn?/1)
defp timeout_when_fn?({:timeout, _}), do: true
defp timeout_when_fn?(_), do: false
@spec terminal_status?(t, status) :: boolean
defp terminal_status?(%WF{} = wf, status) do
Enum.member?(wf.terminal_statuses, status)
end
@spec handle_message(t, Instance.t(), Msg.t()) ::
{:ok, [OA.oa_params()], Instance.t()}
| {:terminated, [OA.oa_params()], Instance.t()}
| {:error, :not_built_yet}
| {:error, :status_mismatch}
@doc """
Uses given workflow definition and message to update the status and state of a given instance.
If no configured workflow transition matches, nothing will happen, i.e., instance status and
state will remain the same.
Order of callback execution:
1. `when` definitions of transitions in definition order
2. `then` definitions of matching transition
3. `update_history_entry` definitions of matching transition
4. `update_possible_transition` definitions of matching transition
5. `side_effects` definitions of matching transition
"""
def handle_message(%WF{built?: false}, _wf_inst, %Msg{}) do
{:error, :not_built_yet}
end
def handle_message(%WF{built?: true} = wf, %Instance{} = wf_inst, %Msg{} = msg) do
if Enum.member?(wf.statuses, wf_inst.status) do
case get_matching_transition(wf, wf_inst, msg) do
{:ok, %Transition{} = tran} ->
execute_transition(wf, tran, wf_inst, msg)
{:error, :no_match} ->
{:ok, [], wf_inst}
end
else
{:error, :status_mismatch}
end
end
@spec get_matching_transition(t, Instance.t(), Msg.t()) ::
{:ok, Transition.t()} | {:error, :no_match}
defp get_matching_transition(%WF{} = wf, %Instance{} = wf_inst, %Msg{} = msg) do
wf.transitions
|> Map.get(wf_inst.status, [])
|> Enum.reduce_while({:error, :no_match}, fn %Transition{} = tran, acc ->
if evaluate_when_fn(tran, wf_inst, msg) do
{:halt, {:ok, tran}}
else
{:cont, acc}
end
end)
end
@spec execute_transition(t, Transition.t(), Instance.t(), Msg.t()) ::
{:ok, [OA.oa_params()], Instance.t()}
| {:terminated, [OA.oa_params()], Instance.t()}
| {:error, :status_mismatch}
defp execute_transition(%WF{} = wf, %Transition{} = tran, %Instance{} = wf_inst, %Msg{} = msg) do
last_update =
if tran.timeout? do
wf_inst.last_update + tran.timeout
else
msg.timestamp
end
new_wf_inst = %Instance{
wf_inst
| last_update: last_update,
state: update_wf_inst_state(wf_inst, tran, msg)
}
possible_transitions = format_possible_transitions(wf.transitions, tran, new_wf_inst, msg)
new_wf_inst = %Instance{
new_wf_inst
| status: tran.to,
terminated?: terminal_status?(wf, tran.to),
history: append_history(new_wf_inst, tran, msg),
possible_transitions: possible_transitions,
next_possible_transition_timestamp: find_next_possible_transition_ts(possible_transitions)
}
{:ok, oas} = evaluate_side_effects_fn(tran, new_wf_inst, msg)
cond do
new_wf_inst.terminated? ->
{:terminated, oas, new_wf_inst}
tran.timeout? ->
case handle_message(wf, new_wf_inst, msg) do
{:ok, non_timeout_oas, wf_inst} ->
{:ok, oas ++ non_timeout_oas, wf_inst}
{:terminated, non_timeout_oas, wf_inst} ->
{:terminated, oas ++ non_timeout_oas, wf_inst}
{:error, :status_mismatch} ->
{:error, :status_mismatch}
end
true ->
{:ok, oas, new_wf_inst}
end
end
@spec update_wf_inst_state(Instance.t(), Transition.t(), Msg.t()) :: map
defp update_wf_inst_state(%Instance{} = wf_inst, %Transition{} = tran, %Msg{} = msg) do
case evaluate_then_fn(tran, wf_inst, msg) do
{:ok, new_state} ->
new_state
{:error, _reason} ->
wf_inst.state
end
end
@spec append_history(Instance.t(), Transition.t(), Msg.t()) :: [map]
defp append_history(%Instance{} = wf_inst, %Transition{} = tran, %Msg{} = msg) do
history_entry = %{
"timestamp" => msg.timestamp,
"status" => tran.to
}
new_history_entry =
case evaluate_update_history_entry_fn(history_entry, tran, wf_inst, msg) do
{:ok, new_history_entry} ->
new_history_entry
{:error, _reason} ->
history_entry
end
wf_inst.history ++ [new_history_entry]
end
@spec format_possible_transitions(transitions, Transition.t(), Instance.t(), Msg.t()) :: [map()]
defp format_possible_transitions(transitions, tran, wf_inst, msg) do
transitions
|> Map.get(tran.to, [])
|> Enum.filter(fn pos_tran ->
Enum.all?(pos_tran.when_fn, fn
{:=, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:<, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:>, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:<=, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:>=, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:contains, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:is_in, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:timeout, _} ->
true
{_, _} ->
true
end)
end)
|> Enum.map(fn pos_tran ->
pos_tran_entry =
if pos_tran.timeout? do
%{
"timestamp" => msg.timestamp + pos_tran.timeout,
"status" => pos_tran.to
}
else
%{
"timestamp" => -1,
"status" => pos_tran.to
}
end
case evaluate_update_possible_transition_fn(pos_tran_entry, pos_tran, wf_inst, msg) do
{:ok, new_pos_tran_entry} ->
new_pos_tran_entry
{:error, _reason} ->
pos_tran_entry
end
end)
end
@spec find_next_possible_transition_ts([map()]) :: integer | nil
# loop through possible_transitions and returns lowest transition timestamp
defp find_next_possible_transition_ts(possible_transitions) do
possible_transitions
|> Enum.map(fn pos_tran -> pos_tran["timestamp"] end)
# timestamp with -1 means unknown transition timestamp
|> Enum.reject(fn timestamp -> timestamp == -1 end)
# find lowest timestamp, return `nil` when there is no transition with known timestamp
|> Enum.min(fn -> nil end)
end
@spec evaluate_when_fn(Transition.t(), Instance.t(), Msg.t()) :: boolean
defp evaluate_when_fn(%Transition{} = tran, %Instance{} = wf_inst, %Msg{} = msg) do
Enum.all?(tran.when_fn, fn
{:timeout, timeout} ->
timeout + wf_inst.last_update <= msg.timestamp
{:=, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:>, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:<, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:>=, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:<=, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:contains, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{:is_in, _, _} = builtin_fn_def ->
evaluate_builtin_fn(builtin_fn_def, wf_inst.state)
{mod, fun} ->
apply(mod, fun, [tran, wf_inst, msg])
end)
end
@spec evaluate_builtin_fn(Transition.builtin_when_fn_def(), map) :: boolean
defp evaluate_builtin_fn({:=, path, val}, state) do
UtilsMap.get_path(state, path) == val
end
defp evaluate_builtin_fn({:>, path, val}, state) do
UtilsMap.get_path(state, path) > val
end
defp evaluate_builtin_fn({:<, path, val}, state) do
UtilsMap.get_path(state, path) < val
end
defp evaluate_builtin_fn({:>=, path, val}, state) do
UtilsMap.get_path(state, path) >= val
end
defp evaluate_builtin_fn({:<=, path, val}, state) do
UtilsMap.get_path(state, path) <= val
end
defp evaluate_builtin_fn({:contains, path, val}, state) do
case UtilsMap.get_path(state, path, []) do
value when is_list(value) ->
Enum.member?(value, val)
_ ->
false
end
end
defp evaluate_builtin_fn({:is_in, path, list_val}, state) do
Enum.member?(list_val, UtilsMap.get_path(state, path))
end
@spec evaluate_then_fn(Transition.t(), Instance.t(), Msg.t()) :: {:ok, map} | {:error, term}
defp evaluate_then_fn(%Transition{} = tran, %Instance{} = wf_inst, %Msg{} = msg) do
UtilsEnum.reduce_while_ok(tran.then_fn, wf_inst.state, fn
{mod, fun}, state ->
apply(mod, fun, [tran, %Instance{wf_inst | state: state}, msg])
end)
end
@spec evaluate_side_effects_fn(Transition.t(), Instance.t(), Msg.t()) ::
{:ok, []} | {:error, term}
defp evaluate_side_effects_fn(tran, wf_inst, msg) do
Enum.reduce_while(tran.side_effects_fn, {:ok, []}, fn {mod, fun}, {:ok, oas} ->
case apply(mod, fun, [tran, wf_inst, msg]) do
{:ok, new_oas} ->
{:cont, {:ok, oas ++ new_oas}}
{:error, reason} ->
{:halt, {:error, reason}}
end
end)
end
@spec evaluate_update_history_entry_fn(map, Transition.t(), Instance.t(), Msg.t()) ::
{:ok, map} | {:error, term}
defp evaluate_update_history_entry_fn(history_entry, tran, wf_inst, msg) do
UtilsEnum.reduce_while_ok(tran.update_history_entry_fn, history_entry, fn
{mod, fun}, history_entry ->
apply(mod, fun, [history_entry, tran, wf_inst, msg])
end)
end
@spec evaluate_update_possible_transition_fn(map, Transition.t(), Instance.t(), Msg.t()) ::
{:ok, map} | {:error, term}
defp evaluate_update_possible_transition_fn(pos_tran_entry, tran, wf_inst, msg) do
UtilsEnum.reduce_while_ok(tran.update_possible_transition_fn, pos_tran_entry, fn
{mod, fun}, pos_tran_entry ->
apply(mod, fun, [pos_tran_entry, tran, wf_inst, msg])
end)
end
defp function_exported_validator(arity) do
fn {mod, fun} ->
Code.ensure_loaded?(mod) && Kernel.function_exported?(mod, fun, arity)
end
end
end