defmodule Toolbox.Normalizer.LoadTopic.Pipeline do
@moduledoc """
Normalizer pipeline helper for the generic load topic.

This pipeline uses `Toolbox.Normalizer.LoadTopic.Parser` to parse raw messages.

Example pipeline definition in `config.ini`:
```
[pipelines.reality_network_updates_1]
input = reality_network_updates
big_delay_policy = adjust
definition = Toolbox.Normalizer.LoadTopic.Pipeline.parse_msg
```

The pipeline expects each raw message to be an encoded JSON object, structured as follows.

Every object must have these three common attributes, plus a few type-specific attributes:

* load topic message identification and version, given as a string in the `load_topic_message` attribute (currently only `"1"` and `"2"` are supported)
* timestamp, in the `timestamp` attribute
* action type, in the `type` attribute

Type `create_asset` attributes in version `"1"` of load message:

* `asset_id`
* `attributes`

Type `upsert_asset` attributes in version `"1"` of load message:

* `asset_id`
* `attributes`

Type `update_asset` attributes in version `"1"` of load message:

* `asset_id`
* `attributes`

Type `update_asset` attributes in version `"2"` of load message:

* `asset_id`
* `update_attributes`
* `delete_attributes`

Type `delete_asset` attributes in version `"1"` of load message:

* `asset_id`

Type `create_edge` attributes in version `"1"` of load message:

* `from_asset_id`
* `to_asset_id`
* `edge_type`

Type `upsert_edge` attributes in version `"1"` of load message:

* `from_asset_id`
* `to_asset_id`
* `edge_type`

Type `delete_edge` attributes in version `"1"` of load message:

* `from_asset_id`
* `to_asset_id`
* `edge_type`
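
The attribute lists above can be written down as data and used to check an already decoded message. The following is a minimal sketch only: `LoadTopicShapeSketch` and `missing_attributes/1` are hypothetical names that are not part of Toolbox, the sketch does not reproduce what `Toolbox.Normalizer.LoadTopic.Parser` actually does, and it treats every version/type combination not listed above as unlisted, which may differ from the real parser.

```elixir
defmodule LoadTopicShapeSketch do
  # Hypothetical helper, not part of Toolbox: it only mirrors the attribute
  # lists documented above and does not reproduce the real parser.
  @common ["load_topic_message", "timestamp", "type"]
  @asset ["asset_id", "attributes"]
  @edge ["from_asset_id", "to_asset_id", "edge_type"]

  # {version, type} => type-specific attributes, as listed above.
  @specific %{
    {"1", "create_asset"} => @asset,
    {"1", "upsert_asset"} => @asset,
    {"1", "update_asset"} => @asset,
    {"2", "update_asset"} => ["asset_id", "update_attributes", "delete_attributes"],
    {"1", "delete_asset"} => ["asset_id"],
    {"1", "create_edge"} => @edge,
    {"1", "upsert_edge"} => @edge,
    {"1", "delete_edge"} => @edge
  }

  # Returns the documented attributes that are absent from a decoded message.
  def missing_attributes(%{"load_topic_message" => version, "type" => type} = msg) do
    case Map.fetch(@specific, {version, type}) do
      {:ok, specific} -> {:ok, Enum.reject(@common ++ specific, &Map.has_key?(msg, &1))}
      :error -> {:error, :unlisted_combination}
    end
  end

  # Messages without a version or type cannot be looked up at all.
  def missing_attributes(_msg), do: {:error, :unlisted_combination}
end

# A version "2" update that forgot `delete_attributes`.
msg = %{
  "load_topic_message" => "2",
  "type" => "update_asset",
  "timestamp" => 12345,
  "asset_id" => "/asset/foo/bar",
  "update_attributes" => %{"attr1" => "attr1_val"}
}

LoadTopicShapeSketch.missing_attributes(msg)
# => {:ok, ["delete_attributes"]}
```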

Messages of these types are transformed into the corresponding Altworx message `t:Toolbox.Normalizer.LoadTopic.Pipeline.load_topic_message/0`.

For example, the raw message
```
{
  "load_topic_message": "1",
  "type": "create_asset",
  "timestamp": 12345,
  "asset_id": "/asset/foo/bar",
  "attributes": {
    "attr1": "attr1_val"
  }
}
```
would produce this Altworx message
```
%Toolbox.Message{
  type: :create_asset,
  timestamp: 12345,
  body: %{
    load_topic_message: "1",
    asset_id: "/asset/foo/bar",
    attributes: %{"attr1" => "attr1_val"}
  }
}
```

Messages produced by this pipeline are compatible with the `Scenario.LoadScenario` scenario.
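
Code that consumes the result of `parse_msg/1` has to handle both the message shapes described above and the `{:error, reason}` tuple. The following is a rough sketch of doing that with pattern matching; `LoadTopicResultSketch` and `describe/1` are hypothetical names that are not part of Toolbox, and the sketch assumes the body keys are atoms, as in the example above.

```elixir
defmodule LoadTopicResultSketch do
  # Hypothetical consumer, not part of Toolbox. It matches on plain map
  # patterns, which also match structs such as %Toolbox.Message{}.
  @asset_types [:create_asset, :upsert_asset, :update_asset, :delete_asset]
  @edge_types [:create_edge, :upsert_edge, :delete_edge]

  # Failed normalization: pass the reason along instead of crashing.
  def describe({:error, reason}), do: {:skipped, reason}

  # Asset messages always carry `asset_id` in the body.
  def describe(%{type: type, body: %{asset_id: id}}) when type in @asset_types,
    do: {:asset, type, id}

  # Edge messages carry both endpoints in the body.
  def describe(%{type: type, body: %{from_asset_id: from, to_asset_id: to}})
      when type in @edge_types,
      do: {:edge, type, from, to}
end

# Shape taken from the documented example above; in a real pipeline this value
# would come from Toolbox.Normalizer.LoadTopic.Pipeline.parse_msg/1.
parsed = %{
  type: :create_asset,
  timestamp: 12345,
  body: %{
    load_topic_message: "1",
    asset_id: "/asset/foo/bar",
    attributes: %{"attr1" => "attr1_val"}
  }
}

LoadTopicResultSketch.describe(parsed)
# => {:asset, :create_asset, "/asset/foo/bar"}
```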
"""

@type load_topic_message ::
        %Toolbox.Message{
          type: :create_asset | :upsert_asset | :update_asset,
          body: %{load_topic_message: String.t(), asset_id: String.t(), attributes: map}
        }
        | %Toolbox.Message{
            type: :delete_asset,
            body: %{load_topic_message: String.t(), asset_id: String.t()}
          }
        | %Toolbox.Message{
            type: :create_edge | :upsert_edge | :delete_edge,
            body: %{
              load_topic_message: String.t(),
              from_asset_id: String.t(),
              to_asset_id: String.t(),
              edge_type: String.t()
            }
          }

require Logger
alias Toolbox.Message
alias Toolbox.Normalizer.LoadTopic.Parser
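
# Parses the raw message body; on failure logs a warning and returns the error.
@spec parse_msg(Message.t()) :: load_topic_message | {:error, term}
def parse_msg(%Message{body: raw_msg} = toolbox_msg) do
  case Parser.parse(raw_msg) do
    {:ok, msg} ->
      # Copy the parsed type and timestamp onto the Toolbox message itself.
      %{toolbox_msg | body: msg, type: msg.type, timestamp: msg.timestamp}

    {:error, reason} ->
      # Logger.warn/1 is deprecated; Logger.warning/1 is its replacement.
      Logger.warning(
        "Normalization failed with reason #{inspect(reason)} for load topic message #{inspect(raw_msg)}"
      )

      {:error, reason}
  end
end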
end