defmodule Runbox.Scenario.Simple do
@moduledoc """
Simple scenario behavior.
Implement this behaviour to create a scenario. Note the scenario must also implement
`Runbox.Scenario.Manifest`, although all this can be done in a single module, see example below.
## Simple scenario structure
Simple scenario has a single module that carries the whole logic of the scenario. The module
specifies which logical topics it wants to consume (`c:input_topics/0`) and defines how messages from
these topics are processed, updating its state and generating outputs along the way.
Usually you would use a single module for a scenario implementing both Manifest and this behavior.
However, you can separate these two if you wish. Note that the name of the module must always be
prefixed by the module name structure of the Manifest. E.g. if you have manifest module
`Scenarios.MyScenario` the module implementing this behaviour can be named
`Scenarios.MyScenario.Logic` but not `Scenarios.MyScenarioLogic`.
Before the processing is first started an `c:init/1` callback is used to initialize the state.
State is later used in the computation.
Each message in the specified topics is then processed by the `c:handle_message/2` callback. This
callback should contain the main logic of the scenario. Messages arrive sorted into this callback -
you can assume the time is non-decreasing.
At minimum you must implement callbacks `c:input_topics/0` and `c:handle_message/2`, see other
callbacks available in this module and in `Runbox.Scenario.Manifest` to see what is possible in a
scenario.
## Determinism
Scenarios should be deterministic, meaning that if you run a scenario twice on the same data you
get the same results. Scenarios shouldn't depend on any external resources other than topics and
scenario configuration. Notably, scenarios shouldn't depend on current time. Scenario should
always only work with time from the incoming messages, called *external time*.
You should avoid directly making side effects in a scenario. Scenario should instead produce
*output actions*, which are instructions to perform specific side effects. Output actions are
automatically performed by the scenario runtime in a deterministic manner. See
`Runbox.Scenario.OutputAction` for the list of available output actions.
There are cases when you want to do some work at a specified point of the external time, for
example to produce some output actions when the external time reaches noon. This can be also
achieved in a deterministic manner via *timeouts*. You can register a timeout by returning
a corresponding *runtime instruction*, see `Runbox.Runtime.RuntimeInstruction.register_timeout/1`
for more details. The message registered in a timeout will be then passed as is to your
`c:handle_message/2` callback exactly at the time you specify in its `timestamp` field.
## State persistence
State used in the computation is automatically and periodically persisted. This makes it easier to
restart the computation. You don't have to recompute all of the data again to get the final state,
you just load the last savepoint and recompute from there.
By default the state is persisted in the same structure as is used in the scenario. However, if
you wish you can implement `c:get_state/1` and `c:set_state/1` callbacks to transform the state
before it is saved into a savepoint and after it is loaded from a savepoint.
## Example
defmodule Scenarios.ServerTemperature do
use Runbox.Scenario.Manifest
use Runbox.Scenario.Simple
alias Runbox.Scenario.Manifest
alias Runbox.Scenario.OutputAction.Event
alias Runbox.Scenario.OutputAction.UpsertAssetAttributes
alias Runbox.Scenario.Type, as: ScenarioType
alias Runbox.Message
@impl Manifest
def get_info do
%Manifest{
id: "server_temperature",
name: "Server temperature",
description: "Monitors server temperature and saves the data into Reality Network.",
type: ScenarioType.simple()
}
end
@impl Runbox.Scenario.Simple
def init(_) do
{:ok, [], %{last_temps: %{}}}
end
@impl Runbox.Scenario.Simple
def input_topics do
["server_sensor"]
end
@impl Runbox.Scenario.Simple
def handle_message(%Message{type: :temperature} = msg, state) do
last_temp = Map.get(state.last_temps, msg.body.server, 0)
if last_temp != msg.body.temperature do
# temperature changed, update reality network
event = %Event{
type: "temp_change",
template: "Temperature on ${actors.server} changed to ${params.temp}",
actors: %{"server" => %{asset_type: "/assets/servers", asset_id: msg.body.server}},
params: %{"temp" => msg.body.temperature},
origin_messages: [msg.origin]
}
update_asset = %UpsertAssetAttributes{
type: "/assets/servers",
id: msg.body.server,
attributes: %{"temp" => msg.body.temperature}
}
new_state = put_in(state.last_temps[msg.body.server], msg.body.temperature)
{:ok, [event, update_asset], new_state}
else
# temperature didn't change, no need to do anything
{:ok, [], state}
end
end
# ignore other sensor data
def handle_message(_, state) do
{:ok, [], state}
end
end
"""
# Note the example scenario above is also located a tested in a test. Do not modify this example
# separately.
alias Runbox.Message
alias Runbox.Runtime.RuntimeInstruction
alias Runbox.Scenario.OutputAction
alias Runbox.Scenario.Simple.Config
defmacro __using__(_opts) do
quote do
@behaviour Runbox.Scenario.Simple
end
end
@typedoc """
Internal state of the scenario used for computation.
This is what the scenario uses for its computation. This state is bootstrapped in `c:init/1` and
is then used and modified in `c:handle_message/2`.
"""
@type state :: any()
@typedoc """
External state of the scenario used for persistence.
When scenario needs to save its state into a savepoint, it converts the internal state into an
external state. For this purpose a callback `c:get_state/1` is used. Similarly, when a savepoint
is loaded the external state inside is converted into the internal state, which is given to the
scenario so it can start processing messages. A callback `c:set_state/1` is used for that purpose.
"""
@type external_state :: any()
@typedoc """
Scenario outputs.
An output can be either an output actions or a runtime instructions.
"""
@type outputs :: [OutputAction.oa_params() | RuntimeInstruction.t()]
@doc """
Initializes the scenario run.
When a run is first started, this callback is called to initialize the state. Additionally you can
also generate some output actions or runtime instructions. Output actions are produced in the time
of the `start_from` parameter (see `Runbox.Scenario.Simple.Config`).
The callback is optional, if not provided the state is initialized to `nil`.
"""
@callback init(Config.t()) :: {:ok, outputs(), state()}
@doc """
Returns a list of topics this scenario consumes.
Each scenario consumes one or more topics specified in this callback.
Topic names in this callback are specified as *logical* topics. That is the name without
Altworx-specific prefixes, e.g. `DATA_1`. It represents the logical data without any tie to the
physical representation. When a run is started physical topics are assigned to each logical topic.
That is the names of actual topics physically available on the Altworx instance. By default the
Altworx-specific prefix is added to the logical name to form a physical topic name, e.g.
`N6_rt_DATA_1`, but user can override this and choose any other physical topic name.
Consumed topics can be of two different types:
* input topic - the default type, regular input, data is read from where run was started
* load topic - data is always read from the start irrespective of where run is started from. This
is useful for sparse configuration topics, or generally topics from which you always need the
complete information before starting processing other data.
A single scenario can subscribe to multiple topics of various types.
This callback is mandatory.
"""
@callback input_topics() :: [
input_topic :: String.t() | {topic :: String.t(), type :: :input | :load}
]
@doc """
Handle a message from input topics.
For each message in the input topics this callback is called. This is where most of the scenario
business logic lives.
The computation shouldn't perform any direct side effects. Instead it should rely on output
actions and runtime instructions that are returned from the callback. Output actions are performed
at the time of the currently handled message.
Since scenario is a stateful processing, you can use the internal state in the computation. You
get the previous state as an argument and you can modify the state.
This callback is mandatory, without it the scenario has no meaning.
"""
@callback handle_message(msg :: Message.t(), state()) ::
{:ok, outputs(), state()}
@doc """
Convert internal state to external.
Called before a state is persisted, the callback should convert the internal state of the to the
external state, which is ready for persistence. If the scenario uses any other resources that are
not directly stored in the internal state, they should be stored in the external state in this
step.
This callback is optional, defaulting to `Function.identity/1`. Therefore, you should implement this
callback only when the external state differs from the internal.
"""
@callback get_state(state()) :: {:ok, external_state()}
@doc """
Convert external state to internal and bootstrap the scenario run.
Called when loading a savepoint to start a run.
This callback converts the external persisted state into the internal state used for computation.
The callback can also be used to start any other resources it uses and bootstrap them with
information from external state.
This callback is optional, defaulting to `Function.identity/1`. Therefore, you should implement
this callback only when the external state differs from the internal or when you need to bootstrap
some other resources.
"""
@callback set_state(external_state()) :: {:ok, state()}
@optional_callbacks init: 1, get_state: 1, set_state: 1
# functions providing access to the module callbacks, handling all exceptional states
@typep error ::
{:error,
{:bad_return_value, value :: any()}
| {:exception, exception :: any(), stacktrace :: [Exception.stacktrace_entry()]}}
@doc false
@spec init(module(), Config.t()) :: {:ok, outputs(), state()} | error()
def init(module, config) do
if Code.ensure_loaded?(module) && function_exported?(module, :init, 1) do
handle_exceptions(fn ->
case module.init(config) do
{:ok, oas, _state} = resp when is_list(oas) -> resp
other -> {:error, {:bad_return_value, other}}
end
end)
else
{:ok, [], nil}
end
end
@doc false
@spec input_topics(module()) ::
{:ok,
[
input_topic :: String.t() | {topic :: String.t(), type :: :input | :load}
]}
| error()
def input_topics(module) do
handle_exceptions(fn ->
topics = module.input_topics()
if input_topics_valid?(topics) do
{:ok, topics}
else
{:error, {:bad_return_value, topics}}
end
end)
end
defp input_topics_valid?(thing) do
is_list(thing) &&
Enum.all?(
thing,
&(is_binary(&1) or
match?({topic, type} when is_binary(topic) and type in [:input, :load], &1))
)
end
@doc false
@spec handle_message(module(), Message.t(), state()) ::
{:ok, outputs(), state()} | error()
def handle_message(module, message, state) do
handle_exceptions(fn ->
case module.handle_message(message, state) do
{:ok, oas, _state} = resp when is_list(oas) -> resp
other -> {:error, {:bad_return_value, other}}
end
end)
end
@doc false
@spec get_state(module(), state()) :: {:ok, external_state()} | error()
def get_state(module, state) do
if Code.ensure_loaded?(module) && function_exported?(module, :get_state, 1) do
handle_exceptions(fn ->
case module.get_state(state) do
{:ok, _state} = resp -> resp
other -> {:error, {:bad_return_value, other}}
end
end)
else
{:ok, state}
end
end
@doc false
@spec set_state(module(), external_state()) :: {:ok, state()} | error()
def set_state(module, state) do
if Code.ensure_loaded?(module) && function_exported?(module, :set_state, 1) do
handle_exceptions(fn ->
case module.set_state(state) do
{:ok, _state} = resp -> resp
other -> {:error, {:bad_return_value, other}}
end
end)
else
{:ok, state}
end
end
defp handle_exceptions(fun) do
fun.()
rescue
exception -> {:error, {:exception, exception, __STACKTRACE__}}
catch
error -> {:error, {:exception, error, __STACKTRACE__}}
end
end