# lib/toolbox/normalizer/load_topic/pipeline.ex

defmodule Toolbox.Normalizer.LoadTopic.Pipeline do
  @moduledoc """
  Normalizer pipeline helper for generic load topic.

  This pipeline uses `Toolbox.Normalizer.LoadTopic.Parser` as a raw message parser.

  Example pipeline definition in `config.ini`:

  ```
  [pipelines.reality_network_updates_1]
  input = reality_network_updates
  big_delay_policy = adjust
  definition = Toolbox.Normalizer.LoadTopic.Pipeline.parse_msg
  ```

  The pipeline expects raw messages to be encoded JSON objects. The structure of the JSON
  object should be as follows.

  Every object must have these 3 common attributes plus a few other type-specific attributes.
  * load topic message identification + version via `load_topic_message` attribute with string version (currently only `"1"` and `"2"` are supported)
  * timestamp via `timestamp` attribute
  * action type via `type` attribute

  Type `create_asset` attributes in version `"1"` of load message:
  * `asset_id`
  * `attributes`

  Type `upsert_asset` attributes in version `"1"` of load message:
  * `asset_id`
  * `attributes`

  Type `update_asset` attributes in version `"1"` of load message:
  * `asset_id`
  * `attributes`

  Type `update_asset` attributes in version `"2"` of load message:
  * `asset_id`
  * `update_attributes`
  * `delete_attributes`

  Type `delete_asset` attributes in version `"1"` of load message:
  * `asset_id`

  Type `create_edge` attributes in version `"1"` of load message:
  * `from_asset_id`
  * `to_asset_id`
  * `edge_type`

  Type `upsert_edge` attributes in version `"1"` of load message:
  * `from_asset_id`
  * `to_asset_id`
  * `edge_type`

  Type `delete_edge` attributes in version `"1"` of load message:
  * `from_asset_id`
  * `to_asset_id`
  * `edge_type`

  These types of messages are transformed into the corresponding Altworx message \
  `t:Toolbox.Normalizer.LoadTopic.Pipeline.load_topic_message/0`.
  For example, the raw message
  ```
  {
      "load_topic_message": "1",
      "type": "create_asset",
      "timestamp": 12345,
      "asset_id": "/asset/foo/bar",
      "attributes": {
          "attr1": "attr1_val"
      }
  }
  ```
  would produce this Altworx message
  ```
  %Toolbox.Message{
      type: :create_asset,
      timestamp: 12345,
      body: %{
          load_topic_message: "1",
          asset_id: "/asset/foo/bar",
          attributes: %{"attr1" => "attr1_val"}
      }
  }
  ```

  Messages produced by this pipeline are compatible with `Scenario.LoadScenario` scenario.
  """

  # Success shape named in the `@spec` of `parse_msg/1`: a `Toolbox.Message` whose `type`
  # selects which keys its `body` carries. Message types with the same body keys share one
  # union member (asset create/upsert/update, asset delete, edge create/upsert/delete).
  #
  # NOTE(review): the moduledoc lists a version "2" `update_asset` message carrying
  # `update_attributes` and `delete_attributes`; no member below has those keys. Confirm
  # against the parser whether this type needs an additional variant.
  @type load_topic_message ::
          %Toolbox.Message{
            type: :create_asset | :upsert_asset | :update_asset,
            body: %{load_topic_message: String.t(), asset_id: String.t(), attributes: map}
          }
          | %Toolbox.Message{
              type: :delete_asset,
              body: %{load_topic_message: String.t(), asset_id: String.t()}
            }
          | %Toolbox.Message{
              type: :create_edge | :upsert_edge | :delete_edge,
              body: %{
                load_topic_message: String.t(),
                from_asset_id: String.t(),
                to_asset_id: String.t(),
                edge_type: String.t()
              }
            }

  require Logger

  alias Toolbox.Message
  alias Toolbox.Normalizer.LoadTopic.Parser

  @doc """
  Normalizes one raw load topic message.

  The `body` of `toolbox_msg` is passed to `Toolbox.Normalizer.LoadTopic.Parser.parse/1`.

  Returns:

    * the input message with `body` replaced by the parsed message and with `type` and
      `timestamp` copied from it, when the parser returns `{:ok, msg}`
    * `{:error, reason}` when the parser returns `{:error, reason}`; the failure is also
      logged at warning level together with the raw body
  """
  @spec parse_msg(Message.t()) :: load_topic_message | {:error, term}
  def parse_msg(%Message{body: raw_msg} = toolbox_msg) do
    case Parser.parse(raw_msg) do
      {:ok, msg} ->
        # Keep every other field of the incoming message; only the normalized parts change.
        %{toolbox_msg | body: msg, type: msg.type, timestamp: msg.timestamp}

      {:error, reason} ->
        # `Logger.warn/1` is deprecated; `Logger.warning/1` is its replacement and logs at
        # the same level.
        Logger.warning(
          "Normalization failed with reason #{inspect(reason)} for load topic message #{inspect(raw_msg)}"
        )

        {:error, reason}
    end
  end
end