lib/runbox/scenario/simple.ex

defmodule Runbox.Scenario.Simple do
  @moduledoc """
  Simple scenario behavior.

  Implement this behaviour to create a scenario. Note the scenario must also implement
  `Runbox.Scenario.Manifest`, although all this can be done in a single module, see example below.

  ## Simple scenario structure

  Simple scenario has a single module that carries the whole logic of the scenario. The module
  specifies which logical topics it wants to consume (`c:input_topics/0`) and defines how messages from
  these topics are processed, updating its state and generating outputs along the way.

  Usually you would use a single module for a scenario implementing both Manifest and this behavior.
  However, you can separate these two if you wish. Note that the name of the module must always be
  prefixed by the module name structure of the Manifest. E.g. if you have manifest module
  `Scenarios.MyScenario` the module implementing this behaviour can be named
  `Scenarios.MyScenario.Logic` but not `Scenarios.MyScenarioLogic`.

  Before the processing is first started an `c:init/1` callback is used to initialize the state.
  State is later used in the computation.

  Each message in the specified topics is then processed by the `c:handle_message/2` callback. This
  callback should contain the main logic of the scenario. Messages arrive sorted into this callback -
  you can assume the time is non-decreasing.

  At minimum you must implement callbacks `c:input_topics/0` and `c:handle_message/2`, see other
  callbacks available in this module and in `Runbox.Scenario.Manifest` to see what is possible in a
  scenario.

  ## Determinism

  Scenarios should be deterministic, meaning that if you run a scenario twice on the same data you
  get the same results. Scenarios shouldn't depend on any external resources other than topics and
  scenario configuration. Notably, scenarios shouldn't depend on current time. Scenario should
  always only work with time from the incoming messages, called *external time*.

  You should avoid directly making side effects in a scenario. Scenario should instead produce
  *output actions*, which are instructions to perform specific side effects. Output actions are
  automatically performed by the scenario runtime in a deterministic manner. See
  `Runbox.Scenario.OutputAction` for the list of available output actions.

  There are cases when you want to do some work at a specified point of the external time, for
  example to produce some output actions when the external time reaches noon. This can be also
  achieved in a deterministic manner via *timeouts*. You can register a timeout by returning
  a corresponding *runtime instruction*, see `Runbox.Runtime.RuntimeInstruction.register_timeout/1`
  for more details. The message registered in a timeout will be then passed as is to your
  `c:handle_message/2` callback exactly at the time you specify in its `timestamp` field.

  ## State persistence

  State used in the computation is automatically and periodically persisted. This makes it easier to
  restart the computation. You don't have to recompute all of the data again to get the final state,
  you just load the last savepoint and recompute from there.

  By default the state is persisted in the same structure as is used in the scenario. However, if
  you wish you can implement `c:get_state/1` and `c:set_state/1` callbacks to transform the state
  before it is saved into a savepoint and after it is loaded from a savepoint.

  ## Example

      defmodule Scenarios.ServerTemperature do
        use Runbox.Scenario.Manifest
        use Runbox.Scenario.Simple

        alias Runbox.Scenario.Manifest
        alias Runbox.Scenario.OutputAction.Event
        alias Runbox.Scenario.OutputAction.UpsertAssetAttributes
        alias Runbox.Scenario.Type, as: ScenarioType
        alias Runbox.Message

        @impl Manifest
        def get_info do
          %Manifest{
            id: "server_temperature",
            name: "Server temperature",
            description: "Monitors server temperature and saves the data into Reality Network.",
            type: ScenarioType.simple()
          }
        end

        @impl Runbox.Scenario.Simple
        def init(_) do
          {:ok, [], %{last_temps: %{}}}
        end

        @impl Runbox.Scenario.Simple
        def input_topics do
          ["server_sensor"]
        end

        @impl Runbox.Scenario.Simple
        def handle_message(%Message{type: :temperature} = msg, state) do
          last_temp = Map.get(state.last_temps, msg.body.server, 0)

          if last_temp != msg.body.temperature do
            # temperature changed, update reality network
            event = %Event{
              type: "temp_change",
              template: "Temperature on ${actors.server} changed to ${params.temp}",
              actors: %{"server" => %{asset_type: "/assets/servers", asset_id: msg.body.server}},
              params: %{"temp" => msg.body.temperature},
              origin_messages: [msg.origin]
            }

            update_asset = %UpsertAssetAttributes{
              type: "/assets/servers",
              id: msg.body.server,
              attributes: %{"temp" => msg.body.temperature}
            }

            new_state = put_in(state.last_temps[msg.body.server], msg.body.temperature)
            {:ok, [event, update_asset], new_state}
          else
            # temperature didn't change, no need to do anything
            {:ok, [], state}
          end
        end

        # ignore other sensor data
        def handle_message(_, state) do
          {:ok, [], state}
        end
      end
  """
  # Note the example scenario above is also located in, and exercised by, a test. Do not modify
  # this example separately.
  alias Runbox.Message
  alias Runbox.Runtime.RuntimeInstruction
  alias Runbox.Scenario.OutputAction
  alias Runbox.Scenario.Simple.Config

  # `use Runbox.Scenario.Simple` only declares that the calling module implements this behaviour;
  # no functions or default callback implementations are injected.
  defmacro __using__(_opts) do
    quote do
      @behaviour unquote(__MODULE__)
    end
  end

  @typedoc """
  Internal state of the scenario used for computation.

  This is what the scenario uses for its computation. This state is bootstrapped in `c:init/1` and
  is then used and modified in `c:handle_message/2`.
  """
  @type state :: any()

  @typedoc """
  External state of the scenario used for persistence.

  When scenario needs to save its state into a savepoint, it converts the internal state into an
  external state. For this purpose a callback `c:get_state/1` is used. Similarly, when a savepoint
  is loaded the external state inside is converted into the internal state, which is given to the
  scenario so it can start processing messages. A callback `c:set_state/1` is used for that purpose.
  """
  @type external_state :: any()

  @typedoc """
  Scenario outputs.

  An output can be either an output action or a runtime instruction.
  """
  @type outputs :: [OutputAction.oa_params() | RuntimeInstruction.t()]

  @doc """
  Initializes the scenario run.

  When a run is first started, this callback is called to initialize the state. Additionally you can
  also generate some output actions or runtime instructions. Output actions are produced in the time
  of the `start_from` parameter (see `Runbox.Scenario.Simple.Config`).

  The callback is optional, if not provided the state is initialized to `nil`.
  """
  @callback init(Config.t()) :: {:ok, outputs(), state()}

  @doc """
  Returns a list of topics this scenario consumes.

  Each scenario consumes one or more topics specified in this callback.

  Topic names in this callback are specified as *logical* topics. That is the name without
  Altworx-specific prefixes, e.g. `DATA_1`. It represents the logical data without any tie to the
  physical representation. When a run is started physical topics are assigned to each logical topic.
  That is the names of actual topics physically available on the Altworx instance. By default the
  Altworx-specific prefix is added to the logical name to form a physical topic name, e.g.
  `N6_rt_DATA_1`, but user can override this and choose any other physical topic name.

  Consumed topics can be of two different types:
  * input topic - the default type, regular input, data is read from where run was started
  * load topic - data is always read from the start irrespective of where run is started from. This
    is useful for sparse configuration topics, or generally topics from which you always need the
    complete information before starting processing other data.

  A single scenario can subscribe to multiple topics of various types.

  This callback is mandatory.
  """
  @callback input_topics() :: [
              input_topic :: String.t() | {topic :: String.t(), type :: :input | :load}
            ]

  @doc """
  Handle a message from input topics.

  For each message in the input topics this callback is called. This is where most of the scenario
  business logic lives.

  The computation shouldn't perform any direct side effects. Instead it should rely on output
  actions and runtime instructions that are returned from the callback. Output actions are performed
  at the time of the currently handled message.

  Since scenario is a stateful processing, you can use the internal state in the computation. You
  get the previous state as an argument and you can modify the state.

  This callback is mandatory, without it the scenario has no meaning.
  """
  @callback handle_message(msg :: Message.t(), state()) ::
              {:ok, outputs(), state()}

  @doc """
  Convert internal state to external.

  Called before a state is persisted, the callback should convert the internal state of the
  scenario to the external state, which is ready for persistence. If the scenario uses any other
  resources that are not directly stored in the internal state, they should be stored in the
  external state in this step.

  This callback is optional. When it is not implemented, the internal state is used as the external
  state unchanged. Therefore, you should implement this callback only when the external state
  differs from the internal.
  """
  @callback get_state(state()) :: {:ok, external_state()}

  @doc """
  Convert external state to internal and bootstrap the scenario run.

  Called when loading a savepoint to start a run.

  This callback converts the external persisted state into the internal state used for computation.
  The callback can also be used to start any other resources it uses and bootstrap them with
  information from external state.

  This callback is optional. When it is not implemented, the external state is used as the internal
  state unchanged. Therefore, you should implement this callback only when the external state
  differs from the internal or when you need to bootstrap some other resources.
  """
  @callback set_state(external_state()) :: {:ok, state()}

  @optional_callbacks init: 1, get_state: 1, set_state: 1

  # functions providing access to the module callbacks, handling all exceptional states

  # Error shapes returned by the wrapper functions below:
  #   * `{:bad_return_value, value}` - the callback returned `value`, which does not have the
  #     shape the wrapper expects
  #   * `{:exception, exception, stacktrace}` - the callback raised or threw; `exception` is the
  #     rescued exception or the thrown term
  @typep error ::
           {:error,
            {:bad_return_value, value :: any()}
            | {:exception, exception :: any(), stacktrace :: [Exception.stacktrace_entry()]}}
  @doc false
  @spec init(module(), Config.t()) :: {:ok, outputs(), state()} | error()
  def init(module, config) do
    # `init/1` is an optional callback, so check that the module provides it before calling.
    implemented? = Code.ensure_loaded?(module) && function_exported?(module, :init, 1)

    if implemented? do
      handle_exceptions(fn ->
        result = module.init(config)

        # Only `{:ok, list, state}` is accepted; anything else is reported as a bad return value.
        if match?({:ok, outputs, _state} when is_list(outputs), result) do
          result
        else
          {:error, {:bad_return_value, result}}
        end
      end)
    else
      # No `init/1` defined: start with no outputs and a `nil` state.
      {:ok, [], nil}
    end
  end

  @doc false
  @spec input_topics(module()) ::
          {:ok,
           [
             input_topic :: String.t() | {topic :: String.t(), type :: :input | :load}
           ]}
          | error()
  def input_topics(module) do
    # Calls the `input_topics/0` callback of `module` and validates the shape of its result.
    handle_exceptions(fn ->
      declared = module.input_topics()

      case input_topics_valid?(declared) do
        true -> {:ok, declared}
        false -> {:error, {:bad_return_value, declared}}
      end
    end)
  end

  # A valid topic declaration is a list in which every entry is valid (see below).
  # Anything that is not a list is rejected.
  defp input_topics_valid?(entries) when is_list(entries) do
    Enum.all?(entries, &input_topic_valid?/1)
  end

  defp input_topics_valid?(_not_a_list), do: false

  # A single entry is either a bare topic name or a `{topic_name, :input | :load}` tuple.
  defp input_topic_valid?(name) when is_binary(name), do: true

  defp input_topic_valid?({name, type}) when is_binary(name) and type in [:input, :load],
    do: true

  defp input_topic_valid?(_other), do: false

  @doc false
  @spec handle_message(module(), Message.t(), state()) ::
          {:ok, outputs(), state()} | error()
  def handle_message(module, message, state) do
    # Delegates to the `handle_message/2` callback of `module`. The result is passed through
    # only when it is `{:ok, list, state}`; any other value becomes a `:bad_return_value` error.
    handle_exceptions(fn ->
      message
      |> module.handle_message(state)
      |> case do
        {:ok, outputs, _next_state} = ok when is_list(outputs) -> ok
        unexpected -> {:error, {:bad_return_value, unexpected}}
      end
    end)
  end

  @doc false
  @spec get_state(module(), state()) :: {:ok, external_state()} | error()
  def get_state(module, state) do
    # `get_state/1` is optional: it is called only when the module is loadable and exports it.
    with true <- Code.ensure_loaded?(module),
         true <- function_exported?(module, :get_state, 1) do
      handle_exceptions(fn ->
        case module.get_state(state) do
          {:ok, _external} = ok -> ok
          unexpected -> {:error, {:bad_return_value, unexpected}}
        end
      end)
    else
      # Callback not available: the internal state is returned as is.
      false -> {:ok, state}
    end
  end

  @doc false
  @spec set_state(module(), external_state()) :: {:ok, state()} | error()
  def set_state(module, state) do
    cond do
      # `set_state/1` is optional: without it the given state is returned as is.
      not Code.ensure_loaded?(module) ->
        {:ok, state}

      not function_exported?(module, :set_state, 1) ->
        {:ok, state}

      true ->
        handle_exceptions(fn ->
          converted = module.set_state(state)

          # Only `{:ok, state}` is accepted from the callback.
          if match?({:ok, _internal}, converted) do
            converted
          else
            {:error, {:bad_return_value, converted}}
          end
        end)
    end
  end

  # Runs `fun` and returns its result. A raised exception or a thrown value is converted into
  # `{:error, {:exception, value, stacktrace}}`. Exits are not intercepted here.
  defp handle_exceptions(fun) do
    try do
      fun.()
    rescue
      raised -> {:error, {:exception, raised, __STACKTRACE__}}
    catch
      :throw, thrown -> {:error, {:exception, thrown, __STACKTRACE__}}
    end
  end
end