lib/toolbox/workflow.ex

defmodule Toolbox.Workflow do
  @moduledoc """
  Workflow is an abstraction of a state machine designed to use with Altworx scenarios.

  ## Overview

  A workflow essentially describes a “blueprint” for a state machine. It specifies what
  possible states of the machine are, when does it move from one state to another,
  and what happens when such a transition is performed.

  A concrete state machine that works according to the blueprint is called a *workflow
  instance* and is represented by the `Toolbox.Workflow.Instance` struct. It specifies
  what the current status and state are, what statuses it has gone through so far, and
  so on. See `t:Toolbox.Workflow.Instance.t/0` for more information.

  Note that what is normally referred to as *states* are in workflows called *statuses*.
  Statuses are from a finite set and are given in advance (e.g., a CD player can have
  three statuses: `empty`, `idle`, and `playing`). *State*, on the other hand, is an
  additional context enriching status with other information. State is not listed upfront
  and is not limited in size. For example, state of a CD player can be the artist and
  album of the CD that is currently playing.

  In a workflow, statuses are defined by declared transitions. That is, the set of statuses
  is defined by all the statuses that are mentioned in the definition of transitions.
  Statuses are therefore a part of the workflow definition and cannot be changed later.
  State is defined when a new workflow instance is created. State can then be modified
  during the life-cycle of the instance using `then` callbacks.

  Workflow instances consume messages received by scenarios and produce output actions.
  They are used from inside scenario units which feed them with messages, return produced
  output actions, and store their state (i.e., their instance struct) in their own state.
  As units are event-driven and can only react to messages, so are workflow instances and
  the only time when a transition may be performed (or when a workflow instance may be
  created) is when a message is received.

  Most of the time, workflows are used via their wrapper for incident workflows
  (`Toolbox.Incident`). However, they can be used on their own as well.

  ## Example

  Here is a simple workflow for a CD player that has statuses and transitions as defined
  in the diagram below and that creates an event when a CD finishes.
  It operates on state that can be described with the following type:
  `%{cd: %{album: String.t(), artist: String.t()} | nil}`.

  ```mermaid
  stateDiagram-v2
    direction LR
    empty --> idle: insert CD
    idle --> playing: play
    playing --> idle: stop
    playing --> idle: end of CD
    playing --> empty: eject
    idle --> empty: eject
  ```

      defmodule CdPlayerWorkflow do
        alias Toolbox.Workflow
        alias Runbox.Message
        alias Runbox.Scenario.OutputAction, as: OA

        @cd_length :timer.minutes(74)

        def definition do
          Workflow.new()
          |> Workflow.add_transition(from: "empty", to: "idle", when: {__MODULE__, :insert_cd_msg},
                                     then: {__MODULE__, :put_cd_to_state})
          |> Workflow.add_transition(from: "idle", to: "playing", when: {__MODULE__, :play_msg})
          |> Workflow.add_transition(from: "playing", to: "idle", when: {__MODULE__, :stop_msg})
          |> Workflow.add_transition(from: "playing", to: "idle", when: {:timeout, @cd_length},
                                     side_effects: {__MODULE__, :create_played_event})
          |> Workflow.add_transition(from: "playing", to: "empty", when: {__MODULE__, :eject_msg},
                                     then: {__MODULE__, :remove_cd_from_state})
          |> Workflow.add_transition(from: "idle", to: "empty", when: {__MODULE__, :eject_msg},
                                     then: {__MODULE__, :remove_cd_from_state})
          |> Workflow.build()
        end

        def insert_cd_msg(_tran, _inst, %Message{type: type}), do: type == :insert_cd
        def eject_msg(_tran, _inst, %Message{type: type}), do: type == :eject
        def play_msg(_tran, _inst, %Message{type: type}), do: type == :play
        def stop_msg(_tran, _inst, %Message{type: type}), do: type == :stop

        def put_cd_to_state(_tran, inst, %Message{type: :insert_cd, body: body}) do
          cd = %{album: body.album, artist: body.artist}
          state = %{inst.state | cd: cd}
          {:ok, state}
        end

        def remove_cd_from_state(_tran, inst, _msg) do
          state = %{inst.state | cd: nil}
          {:ok, state}
        end

        def create_played_event(_tran, inst, msg) do
          oa = %OA.Event{
            type: "played_cd",
            template: "${actors.player} played ${params.album} by ${params.artist}",
            actors: %{"player" => %{asset_type: inst.type, asset_id: inst.id, name: nil}},
            params: %{"album" => inst.state.cd.album, "artist" => inst.state.cd.artist},
            origin_messages: [msg.origin]
          }
          {:ok, [oa]}
        end
      end

  An instance of this workflow may be then created like this:

      {:ok, wf} = CdPlayerWorkflow.definition()
      {:ok, oas, inst} = Workflow.new_instance(wf, "empty", "cd_player", "Sony D-50", %{cd: nil}, msg)

  Afterwards, when a message comes to your unit, you can then call `handle_message/3` to process
  the message and get an updated instance together with a list of produced output actions:

      {:ok, oas, inst} = Workflow.handle_message(wf, inst, msg)
  """

  alias __MODULE__, as: WF
  alias __MODULE__.Instance
  alias __MODULE__.Transition
  alias Runbox.Message, as: Msg
  alias Runbox.Scenario.OutputAction, as: OA
  alias Toolbox.Utils.Enum, as: UtilsEnum
  alias Toolbox.Utils.Map, as: UtilsMap

  defmodule Transition do
    @moduledoc """
    Struct defining a single transition, i.e., a change from one status to another.

    The struct should be built only via `Toolbox.Workflow.add_transition/2`. You can then access
    this struct in all the workflow callbacks such as `when` or `then`.
    """

    alias Toolbox.Workflow, as: WF

    @type builtin_when_fn_def ::
            {:=, [String.t()], term}
            | {:<, [String.t()], term}
            | {:>, [String.t()], term}
            | {:<=, [String.t()], term}
            | {:>=, [String.t()], term}
            | {:contains, [String.t()], term}
            | {:is_in, [String.t()], term}
    @type when_fn_def :: {atom, atom} | {:timeout, integer} | builtin_when_fn_def
    @type then_fn_def :: {atom, atom}
    @type side_effects_fn_def :: {atom, atom}
    @type update_history_entry_fn_def :: {atom, atom}
    @type update_possible_transition_fn_def :: {atom, atom}

    @type t :: %Transition{
            from: WF.status(),
            to: WF.status(),
            when_fn: [when_fn_def],
            then_fn: [then_fn_def],
            side_effects_fn: [side_effects_fn_def],
            timeout?: boolean,
            timeout: integer,
            attributes: %{required(atom) => term},
            update_history_entry_fn: [update_history_entry_fn_def],
            update_possible_transition_fn: [update_possible_transition_fn_def]
          }

    defstruct from: nil,
              to: nil,
              when_fn: nil,
              then_fn: nil,
              side_effects_fn: nil,
              timeout?: false,
              timeout: -1,
              attributes: %{},
              update_history_entry_fn: nil,
              update_possible_transition_fn: nil
  end

  defmodule Instance do
    @moduledoc """
    Struct holding the current state of a single workflow instance.

    The struct should be built only via `Toolbox.Workflow.new_instance/7`. An updated instance
    is subsequently returned from every call to `Toolbox.Workflow.handle_message/3`.
    """

    alias Toolbox.Workflow, as: WF

    @typedoc """
    Workflow instance.

    * `status` - the current status. Note that the `:none` status may be set only during
      internal initialization of the instance. After`Toolbox.Workflow.new_instance/7` returns,
      it will never have this status.
    * `state` - the current context. May be any map. Updated using the `then` callbacks.
    * `type`: an arbitrary “type”, possibly an asset type. Does not influence anything.
      Set using `Toolbox.Workflow.new_instance/7`, does not change afterwards.
    * `id`: an arbitrary “id”, possibly an asset id. May be used to distinguish instances.
      Does not influence anything. Set using `Toolbox.Workflow.new_instance/7`, does not
      change afterwards.
    * `history`: a list of history entries, sorted from oldest to newest. Each entry is by default
      of the form `%{"status" => some_status, "timestamp" => some_ts}`. Even the initial status is
      included.  In case of a timeout transition, the timestamp is the timestamp of the relevant
      message (i.e., may be later than the actual timeout). The shape of history entries may be
      modified using the `update_history_entry` callbacks.
    * `last_updated`: the timestamp of the last transition. Including the transition to the
      initial state. In case of a timeout transition, the timestamp corresponds to the moment
      when the timeout fires.
    * `possible_transitions`: a list of transitions than can be performed from the current status.
      Each is by default of the form `%{"status" => some_status, "timestamp" => some_ts}`.
      The timestamp is `-1` if there is no timeout bound with the transition, otherwise it is
      timestamp of the timeout. If a transition uses the builtin `when` conditions on state,
      they are taken into account and only if they hold, the transition is considered as possible.
      The shape of the possible transitions may be modified using the `update_possible_transition`
      callbacks.
    * `next_possible_transition_timestamp`: the earliest timestamp out of the possible transitions
      that specify a timeout (i.e., their timeout is not `-1`), or `nil` if there is no such
      transition.
    * `terminated?`: a flag meaning that the instance cannot move to any other status, i.e.,
      its status is terminal and there are no transition from it.

    ## Example

    This example uses the `CdPlayerWorkflow` from `Toolbox.Workflow` moduledoc.

        %Toolbox.Workflow.Instance{
          id: "Sony D-50",
          type: "cd_player",
          state: %{cd: %{album: "Zorya", artist: "Floex"}},
          last_update: 300,
          status: "playing",
          history: [
            %{"status" => "empty", "timestamp" => 100},
            %{"status" => "idle", "timestamp" => 200},
            %{"status" => "playing", "timestamp" => 300}
          ],
          possible_transitions: [
            %{"status" => "idle", "timestamp" => -1},
            %{"status" => "idle", "timestamp" => 4440300},
            %{"status" => "empty", "timestamp" => -1}
          ],
          next_possible_transition_timestamp: 4440300,
          terminated?: false
        }
    """
    @type t :: %Instance{
            id: String.t(),
            type: String.t(),
            state: map,
            last_update: integer,
            status: :none | WF.status(),
            history: [map],
            possible_transitions: [map],
            next_possible_transition_timestamp: integer | nil,
            terminated?: boolean
          }

    defstruct id: nil,
              type: nil,
              state: %{},
              last_update: 0,
              status: :none,
              history: [],
              possible_transitions: [],
              next_possible_transition_timestamp: nil,
              terminated?: false
  end

  defstruct statuses: [],
            transitions: %{},
            terminal_statuses: [],
            built?: false

  @type status :: String.t()

  @type transitions :: %{required(status) => [Transition.t()]}
  @type t :: %WF{
          statuses: [status],
          transitions: transitions,
          terminal_statuses: [status],
          built?: boolean
        }

  @spec new :: t
  @doc """
  Creates new blank workflow definition.

  Continue with calls to `add_transition/2` and `build/1` to define a workflow.
  """
  def new do
    %WF{}
  end

  @doc """
  Creates a new instance for a given workflow.

  `status` is the initial status of the instance. `state` is its initial state.

  `type` and `id` may be arbitrary. If your instance represents an asset or incident,
  you may use its type and id. These two fields are kept in the `Toolbox.Workflow.Instance`
  struct and are available in all workflow callbacks, so you can use them e.g. to construct
  output actions. They don't directly influence the computation, they're just additional
  information available to the user.

  `msg` is the message that causes this instance to be created. Its timestamp is used as
  the first timestamp in the instance history.

  When a new instance is created, it internally performs a transition from the internal
  status `:none` to the given `status`. You may pass transition callbacks using `options`
  (in the same way as to `add_transition/2`) and these callbacks will be executed on this
  initial transition. All the callbacks except for `when` are supported.

  The creation of an instance automatically adds the first entry to the instance history
  and sets the `last_update` field to the timestamp of `msg`.

  ## Example

      iex> Workflow.new_instance(wf, "empty", "cd_player", "Sony D-50", %{cd: nil},
      ...>                       %Message{type: :discover_player, timestamp: 100})
      {:ok, [],
        %Toolbox.Workflow.Instance{
          id: "Sony D-50",
          type: "cd_player",
          state: %{cd: nil},
          last_update: 100,
          status: "empty",
          history: [%{"status" => "empty", "timestamp" => 100}],
          possible_transitions: [%{"status" => "idle", "timestamp" => -1}],
          next_possible_transition_timestamp: nil,
          terminated?: false
      }}
  """
  @spec new_instance(t, status, String.t(), String.t(), map, Msg.t(), Keyword.t()) ::
          {:ok, [OA.oa_params()], Instance.t()}
          | {:terminated, [OA.oa_params()], Instance.t()}
          | {:error, :unknown_status}
  def new_instance(%WF{} = wf, status, type, id, state, %Msg{} = msg, options \\ []) do
    if status in wf.statuses do
      transition_params =
        Keyword.merge(options,
          when: [],
          then: Keyword.get(options, :then, []),
          side_effects: Keyword.get(options, :side_effects, []),
          update_history_entry: Keyword.get(options, :update_history_entry, []),
          update_possible_transition: []
        )

      tran = construct_transition(:none, status, transition_params)

      execute_transition(wf, tran, %Instance{id: id, type: type, state: state}, msg)
    else
      {:error, :unknown_status}
    end
  end

  @spec build(t) ::
          {:ok, t}
          | {:error, :transition_from_required}
          | {:error, :transition_to_required}
          | {:error, {:bad_callback, {atom, atom}}}
          | {:error, :multiple_init_statuses}
  @doc "Finishes workflow definition, validates all configured dependencies and workflow structure."
  def build(%WF{} = wf) do
    with :ok <- validate_transition_defs(wf.transitions) do
      statuses =
        wf.transitions
        |> Enum.map(fn {from, trs} -> [from | Enum.map(trs, fn tr -> tr.to end)] end)
        |> List.flatten()
        |> Enum.uniq()

      non_term_stats = Map.keys(wf.transitions)
      term_stats = Enum.filter(statuses, fn status -> !Enum.member?(non_term_stats, status) end)

      wf = %WF{
        wf
        | built?: true,
          statuses: statuses,
          terminal_statuses: term_stats
      }

      {:ok, wf}
    end
  end

  @spec validate_transition_defs(transitions) ::
          :ok
          | {:error, :transition_from_required}
          | {:error, :transition_to_required}
          | {:error, {:bad_callback, {atom, atom}}}
  defp validate_transition_defs(transitions) do
    valid_when_fn? = valid_when_fn?()

    valid_then_fn? = function_exported_validator(3)
    valid_side_effects_fn? = function_exported_validator(3)
    valid_update_history_entry_fn? = function_exported_validator(4)
    valid_update_possible_transition_fn? = function_exported_validator(4)

    transitions
    |> Map.values()
    |> List.flatten()
    |> Enum.reduce_while(:ok, fn tran, acc ->
      cond do
        tran.from == :none ->
          {:halt, {:error, :transition_from_required}}

        tran.to == nil ->
          {:halt, {:error, :transition_to_required}}

        !Enum.all?(tran.when_fn, valid_when_fn?) ->
          bad_callback = Enum.find(tran.when_fn, fn callback -> !valid_when_fn?.(callback) end)
          {:halt, {:error, {:bad_callback, bad_callback}}}

        !Enum.all?(tran.then_fn, valid_then_fn?) ->
          bad_callback = Enum.find(tran.then_fn, fn callback -> !valid_then_fn?.(callback) end)
          {:halt, {:error, {:bad_callback, bad_callback}}}

        !Enum.all?(tran.side_effects_fn, valid_side_effects_fn?) ->
          bad_callback =
            Enum.find(tran.side_effects_fn, fn callback -> !valid_side_effects_fn?.(callback) end)

          {:halt, {:error, {:bad_callback, bad_callback}}}

        !Enum.all?(tran.update_history_entry_fn, valid_update_history_entry_fn?) ->
          bad_callback =
            Enum.find(tran.update_history_entry_fn, fn callback ->
              !valid_update_history_entry_fn?.(callback)
            end)

          {:halt, {:error, {:bad_callback, bad_callback}}}

        !Enum.all?(tran.update_possible_transition_fn, valid_update_possible_transition_fn?) ->
          bad_callback =
            Enum.find(tran.update_possible_transition_fn, fn callback ->
              !valid_update_possible_transition_fn?.(callback)
            end)

          {:halt, {:error, {:bad_callback, bad_callback}}}

        true ->
          {:cont, acc}
      end
    end)
  end

  defp valid_when_fn? do
    fn
      {:timeout, _} ->
        true

      {:=, path, _} when is_list(path) ->
        true

      {:>, path, _} when is_list(path) ->
        true

      {:<, path, _} when is_list(path) ->
        true

      {:<=, path, _} when is_list(path) ->
        true

      {:>=, path, _} when is_list(path) ->
        true

      {:contains, path, _} when is_list(path) ->
        true

      {:is_in, path, value} when is_list(path) and is_list(value) ->
        true

      {mod, fun} ->
        Code.ensure_loaded?(mod) && Kernel.function_exported?(mod, fun, 3)

      _ ->
        false
    end
  end

  @spec add_transition(t, Keyword.t()) :: t
  @doc """
  Defines a new transition in a given workflow.

  The transition is defined using a keyword list with the following keys:
  - `from` - source status
    - required parameter
  - `to` - target status
    - required parameter
  - `when` - predicate used to select transition which will be executed
    - optional parameter
    - possible `when` definitions:
      - `{Module, function}`, where function accepts transition, instance and message as args,
        and returns a boolean.
      - `{:timeout, timeout}`, where timeout is defined in milliseconds
      - `{:=, [path, to, state, key], value}`
      - `{:>, [path, to, state, key], value}`
      - `{:<, [path, to, state, key], value}`
      - `{:<=, [path, to, state, key], value}`
      - `{:>=, [path, to, state, key], value}`
      - `{:contains, [path, to, state, key], value}`
      - `{:is_in, [path, to, state, key], list_value}`
      - a list of predicates where each has one of the shapes above
        - e.g., `[{__MODULE__, :play_msg}, {:=, [:cd, :artist], "Pink Floyd"}]`
        - all predicates in that list must hold for the combined predicate to hold
  - `then` - callback used to update workflow instance *state* during transition execution
    - optional parameter
    - possible then definitions:
      - `{Module, function}`, where function accepts transition, instance and message as args
      - a list of `{Module, functions}` items
        - listed callbacks are executed in the given order
    - calbacks should return `{:ok, wf_instance_state :: map()}` or `{:error, reason}`
  - `side_effects` - callback used to generate output actions during transition execution
    - optional parameter
    - possible `side_effects` definitions:
      - `{Module, function}`, where function accepts transition, instance and message as args
      - a list of `{Module, functions}` items
        - listed callbacks are executed in the given order
    - calbacks should return `{:ok, [Runbox.Scenario.OutputAction.oa_params()]}` or `{:error, reason}`
  - `update_history_entry` - callback used to modify transition execution history entry
    - optional parameter
    - possible `update_history_entry` definitions:
      - `{Module, function}`, where function accepts history entry, transition, instance and message
        as args
      - a list of `{Module, functions}` items
        - listed callbacks are executed in the given order
    - calbacks should return `{:ok, updated_history_entry :: map()}` or `{:error, reason}`
  - `update_possible_transition` - callback used in `handle_message/3` to modify possible transition
    - optional parameter
    - possible `update_possible_transition` definitions:
      - `{Module, function}`, where function accepts possible transition, transition, instance
        and message as args
      - a list of `{Module, functions}` items
        - listed callbacks are executed in the given order
    - calbacks should return `{:ok, updated_possible_transition :: map()}` or `{:error, reason}`
  """
  def add_transition(%WF{} = wf, params) do
    from = Keyword.get(params, :from, :none)
    to = Keyword.get(params, :to)
    tran = construct_transition(from, to, params)
    trans = Map.update(wf.transitions, from, [tran], fn trans -> trans ++ [tran] end)
    %WF{wf | transitions: trans}
  end

  @spec construct_transition(:none | status, status, Keyword.t()) :: Transition.t()
  defp construct_transition(from, to, params) do
    when_fn = construct_callback_list(params, :when)
    then_fn = construct_callback_list(params, :then)
    side_effects_fn = construct_callback_list(params, :side_effects)
    update_history_entry_fn = construct_callback_list(params, :update_history_entry)
    update_possible_transition_fn = construct_callback_list(params, :update_possible_transition)

    attrs =
      params
      |> Keyword.drop([
        :from,
        :to,
        :when,
        :then,
        :side_effects,
        :update_history_entry,
        :update_possible_transition
      ])
      |> Map.new()

    timeout? = timeout_when_fn?(when_fn)
    {:timeout, timeout} = Enum.find(when_fn, {:timeout, -1}, &timeout_when_fn?/1)

    %Transition{
      from: from,
      to: to,
      when_fn: when_fn,
      then_fn: then_fn,
      side_effects_fn: side_effects_fn,
      timeout?: timeout?,
      timeout: timeout,
      attributes: attrs,
      update_history_entry_fn: update_history_entry_fn,
      update_possible_transition_fn: update_possible_transition_fn
    }
  end

  @builtin_fns [:=, :>, :<, :>=, :<=, :contains, :is_in]
  @spec construct_callback_list(Keyword.t(), atom) :: [
          {atom, atom}
          | {:=, term, term}
          | {:>, term, term}
          | {:<, term, term}
          | {:>=, term, term}
          | {:<=, term, term}
          | {:contains, term, term}
          | {:is_in, term, term}
          | {:timeout, integer}
        ]
  defp construct_callback_list(params, key) do
    case Keyword.get(params, key, []) do
      {builtin_fn, _, _} = builtin_fn_def when builtin_fn in @builtin_fns ->
        [builtin_fn_def]

      {:timeout, _} = timeout_def ->
        [timeout_def]

      {mod, fun} = callback_def when is_atom(mod) and is_atom(fun) ->
        [callback_def]

      callback_defs when is_list(callback_defs) ->
        callback_defs
    end
  end

  @spec timeout_when_fn?([Transition.when_fn_def()] | Transition.when_fn_def()) :: boolean
  defp timeout_when_fn?(when_fns) when is_list(when_fns),
    do: Enum.any?(when_fns, &timeout_when_fn?/1)

  defp timeout_when_fn?({:timeout, _}), do: true
  defp timeout_when_fn?(_), do: false

  @spec terminal_status?(t, status) :: boolean
  defp terminal_status?(%WF{} = wf, status) do
    Enum.member?(wf.terminal_statuses, status)
  end

  @spec handle_message(t, Instance.t(), Msg.t()) ::
          {:ok, [OA.oa_params()], Instance.t()}
          | {:terminated, [OA.oa_params()], Instance.t()}
          | {:error, :not_built_yet}
          | {:error, :status_mismatch}
  @doc """
  Uses given workflow definition and message to update the status and state of a given instance.

  If no configured workflow transition matches, nothing will happen, i.e., instance status and
  state will remain the same.

  Order of callback execution:
  1. `when` definitions of transitions in definition order
  2. `then` definitions of matching transition
  3. `update_history_entry` definitions of matching transition
  4. `update_possible_transition` definitions of matching transition
  5. `side_effects` definitions of matching transition
  """
  def handle_message(%WF{built?: false}, _wf_inst, %Msg{}) do
    {:error, :not_built_yet}
  end

  def handle_message(%WF{built?: true} = wf, %Instance{} = wf_inst, %Msg{} = msg) do
    if Enum.member?(wf.statuses, wf_inst.status) do
      case get_matching_transition(wf, wf_inst, msg) do
        {:ok, %Transition{} = tran} ->
          execute_transition(wf, tran, wf_inst, msg)

        {:error, :no_match} ->
          {:ok, [], wf_inst}
      end
    else
      {:error, :status_mismatch}
    end
  end

  @spec get_matching_transition(t, Instance.t(), Msg.t()) ::
          {:ok, Transition.t()} | {:error, :no_match}
  defp get_matching_transition(%WF{} = wf, %Instance{} = wf_inst, %Msg{} = msg) do
    wf.transitions
    |> Map.get(wf_inst.status, [])
    |> Enum.reduce_while({:error, :no_match}, fn %Transition{} = tran, acc ->
      if evaluate_when_fn(tran, wf_inst, msg) do
        {:halt, {:ok, tran}}
      else
        {:cont, acc}
      end
    end)
  end

  @spec execute_transition(t, Transition.t(), Instance.t(), Msg.t()) ::
          {:ok, [OA.oa_params()], Instance.t()}
          | {:terminated, [OA.oa_params()], Instance.t()}
          | {:error, :status_mismatch}
  defp execute_transition(%WF{} = wf, %Transition{} = tran, %Instance{} = wf_inst, %Msg{} = msg) do
    last_update =
      if tran.timeout? do
        wf_inst.last_update + tran.timeout
      else
        msg.timestamp
      end

    new_wf_inst = %Instance{
      wf_inst
      | last_update: last_update,
        state: update_wf_inst_state(wf_inst, tran, msg)
    }

    possible_transitions = format_possible_transitions(wf.transitions, tran, new_wf_inst, msg)

    new_wf_inst = %Instance{
      new_wf_inst
      | status: tran.to,
        terminated?: terminal_status?(wf, tran.to),
        history: append_history(new_wf_inst, tran, msg),
        possible_transitions: possible_transitions,
        next_possible_transition_timestamp: find_next_possible_transition_ts(possible_transitions)
    }

    {:ok, oas} = evaluate_side_effects_fn(tran, new_wf_inst, msg)

    cond do
      new_wf_inst.terminated? ->
        {:terminated, oas, new_wf_inst}

      tran.timeout? ->
        case handle_message(wf, new_wf_inst, msg) do
          {:ok, non_timeout_oas, wf_inst} ->
            {:ok, oas ++ non_timeout_oas, wf_inst}

          {:terminated, non_timeout_oas, wf_inst} ->
            {:terminated, oas ++ non_timeout_oas, wf_inst}

          {:error, :status_mismatch} ->
            {:error, :status_mismatch}
        end

      true ->
        {:ok, oas, new_wf_inst}
    end
  end

  @spec update_wf_inst_state(Instance.t(), Transition.t(), Msg.t()) :: map
  defp update_wf_inst_state(%Instance{} = wf_inst, %Transition{} = tran, %Msg{} = msg) do
    case evaluate_then_fn(tran, wf_inst, msg) do
      {:ok, new_state} ->
        new_state

      {:error, _reason} ->
        wf_inst.state
    end
  end

  @spec append_history(Instance.t(), Transition.t(), Msg.t()) :: [map]
  defp append_history(%Instance{} = wf_inst, %Transition{} = tran, %Msg{} = msg) do
    history_entry = %{
      "timestamp" => msg.timestamp,
      "status" => tran.to
    }

    new_history_entry =
      case evaluate_update_history_entry_fn(history_entry, tran, wf_inst, msg) do
        {:ok, new_history_entry} ->
          new_history_entry

        {:error, _reason} ->
          history_entry
      end

    wf_inst.history ++ [new_history_entry]
  end

  @spec format_possible_transitions(transitions, Transition.t(), Instance.t(), Msg.t()) :: [map()]
  defp format_possible_transitions(transitions, tran, wf_inst, msg) do
    transitions
    |> Map.get(tran.to, [])
    |> Enum.filter(fn pos_tran ->
      Enum.all?(pos_tran.when_fn, fn
        {:=, _, _} = builtin_fn_def ->
          evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

        {:<, _, _} = builtin_fn_def ->
          evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

        {:>, _, _} = builtin_fn_def ->
          evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

        {:<=, _, _} = builtin_fn_def ->
          evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

        {:>=, _, _} = builtin_fn_def ->
          evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

        {:contains, _, _} = builtin_fn_def ->
          evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

        {:is_in, _, _} = builtin_fn_def ->
          evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

        {:timeout, _} ->
          true

        {_, _} ->
          true
      end)
    end)
    |> Enum.map(fn pos_tran ->
      pos_tran_entry =
        if pos_tran.timeout? do
          %{
            "timestamp" => msg.timestamp + pos_tran.timeout,
            "status" => pos_tran.to
          }
        else
          %{
            "timestamp" => -1,
            "status" => pos_tran.to
          }
        end

      case evaluate_update_possible_transition_fn(pos_tran_entry, pos_tran, wf_inst, msg) do
        {:ok, new_pos_tran_entry} ->
          new_pos_tran_entry

        {:error, _reason} ->
          pos_tran_entry
      end
    end)
  end

  @spec find_next_possible_transition_ts([map()]) :: integer | nil
  # loop through possible_transitions and returns lowest transition timestamp
  defp find_next_possible_transition_ts(possible_transitions) do
    possible_transitions
    |> Enum.map(fn pos_tran -> pos_tran["timestamp"] end)
    # timestamp with -1 means unknown transition timestamp
    |> Enum.reject(fn timestamp -> timestamp == -1 end)
    # find lowest timestamp, return `nil` when there is no transition with known timestamp
    |> Enum.min(fn -> nil end)
  end

  @spec evaluate_when_fn(Transition.t(), Instance.t(), Msg.t()) :: boolean
  defp evaluate_when_fn(%Transition{} = tran, %Instance{} = wf_inst, %Msg{} = msg) do
    Enum.all?(tran.when_fn, fn
      {:timeout, timeout} ->
        timeout + wf_inst.last_update <= msg.timestamp

      {:=, _, _} = builtin_fn_def ->
        evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

      {:>, _, _} = builtin_fn_def ->
        evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

      {:<, _, _} = builtin_fn_def ->
        evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

      {:>=, _, _} = builtin_fn_def ->
        evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

      {:<=, _, _} = builtin_fn_def ->
        evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

      {:contains, _, _} = builtin_fn_def ->
        evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

      {:is_in, _, _} = builtin_fn_def ->
        evaluate_builtin_fn(builtin_fn_def, wf_inst.state)

      {mod, fun} ->
        apply(mod, fun, [tran, wf_inst, msg])
    end)
  end

  @spec evaluate_builtin_fn(Transition.builtin_when_fn_def(), map) :: boolean
  defp evaluate_builtin_fn({:=, path, val}, state) do
    UtilsMap.get_path(state, path) == val
  end

  defp evaluate_builtin_fn({:>, path, val}, state) do
    UtilsMap.get_path(state, path) > val
  end

  defp evaluate_builtin_fn({:<, path, val}, state) do
    UtilsMap.get_path(state, path) < val
  end

  defp evaluate_builtin_fn({:>=, path, val}, state) do
    UtilsMap.get_path(state, path) >= val
  end

  defp evaluate_builtin_fn({:<=, path, val}, state) do
    UtilsMap.get_path(state, path) <= val
  end

  defp evaluate_builtin_fn({:contains, path, val}, state) do
    case UtilsMap.get_path(state, path, []) do
      value when is_list(value) ->
        Enum.member?(value, val)

      _ ->
        false
    end
  end

  defp evaluate_builtin_fn({:is_in, path, list_val}, state) do
    Enum.member?(list_val, UtilsMap.get_path(state, path))
  end

  @spec evaluate_then_fn(Transition.t(), Instance.t(), Msg.t()) :: {:ok, map} | {:error, term}
  defp evaluate_then_fn(%Transition{} = tran, %Instance{} = wf_inst, %Msg{} = msg) do
    UtilsEnum.reduce_while_ok(tran.then_fn, wf_inst.state, fn
      {mod, fun}, state ->
        apply(mod, fun, [tran, %Instance{wf_inst | state: state}, msg])
    end)
  end

  @spec evaluate_side_effects_fn(Transition.t(), Instance.t(), Msg.t()) ::
          {:ok, []} | {:error, term}
  defp evaluate_side_effects_fn(tran, wf_inst, msg) do
    Enum.reduce_while(tran.side_effects_fn, {:ok, []}, fn {mod, fun}, {:ok, oas} ->
      case apply(mod, fun, [tran, wf_inst, msg]) do
        {:ok, new_oas} ->
          {:cont, {:ok, oas ++ new_oas}}

        {:error, reason} ->
          {:halt, {:error, reason}}
      end
    end)
  end

  @spec evaluate_update_history_entry_fn(map, Transition.t(), Instance.t(), Msg.t()) ::
          {:ok, map} | {:error, term}
  defp evaluate_update_history_entry_fn(history_entry, tran, wf_inst, msg) do
    UtilsEnum.reduce_while_ok(tran.update_history_entry_fn, history_entry, fn
      {mod, fun}, history_entry ->
        apply(mod, fun, [history_entry, tran, wf_inst, msg])
    end)
  end

  @spec evaluate_update_possible_transition_fn(map, Transition.t(), Instance.t(), Msg.t()) ::
          {:ok, map} | {:error, term}
  defp evaluate_update_possible_transition_fn(pos_tran_entry, tran, wf_inst, msg) do
    UtilsEnum.reduce_while_ok(tran.update_possible_transition_fn, pos_tran_entry, fn
      {mod, fun}, pos_tran_entry ->
        apply(mod, fun, [pos_tran_entry, tran, wf_inst, msg])
    end)
  end

  defp function_exported_validator(arity) do
    fn {mod, fun} ->
      Code.ensure_loaded?(mod) && Kernel.function_exported?(mod, fun, arity)
    end
  end
end