defmodule Toolbox.Normalizer.LoadTopic.Parser do
  @moduledoc """
  Parser for load topic messages.

  A load topic message is a JSON object encoded as a string. `parse/1` decodes it,
  checks that it carries the attributes required by its `"load_topic_message"` version
  and `"type"`, and returns a new map with atom top-level keys. Only the attributes
  listed below are copied to the result; nested objects keep their string keys.

  Supported messages:

    * version `"1"`
      * `create_asset`, `upsert_asset`, `update_asset` - require `asset_id` and an
        `attributes` object
      * `delete_asset` - requires `asset_id`
      * `create_edge`, `upsert_edge`, `delete_edge` - require `from_asset_id`,
        `to_asset_id` and `edge_type`
    * version `"2"`
      * `update_asset` - requires `asset_id` and the `update_attributes` and
        `delete_attributes` objects

  Every message must also carry an integer `timestamp`.

  ## Examples

      iex> raw_message = ~s({"load_topic_message": "1","type": "create_asset","timestamp": 12345,"asset_id": "/asset/foo/bar","attributes": {"attr1": "attr1_val"}})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :create_asset, timestamp: 12_345, asset_id: "/asset/foo/bar", attributes: %{"attr1" => "attr1_val"}}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "upsert_asset","timestamp": 12345, "asset_id": "/asset/foo/bar", "attributes": {"attr1": "attr1_val"}})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :upsert_asset, timestamp: 12_345, asset_id: "/asset/foo/bar", attributes: %{"attr1" => "attr1_val"}}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "update_asset","timestamp": 12345,"asset_id": "/asset/foo/bar","attributes": {"attr1": "attr1_val"}})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :update_asset, timestamp: 12_345, asset_id: "/asset/foo/bar", attributes: %{"attr1" => "attr1_val"}}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "delete_asset","timestamp": 12345,"asset_id": "/asset/foo/bar"})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :delete_asset, timestamp: 12_345, asset_id: "/asset/foo/bar"}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "create_edge","timestamp": 12345,"from_asset_id": "/asset/foo/1","to_asset_id": "/asset/foo/2","edge_type": "actor"})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :create_edge, timestamp: 12_345, from_asset_id: "/asset/foo/1", to_asset_id: "/asset/foo/2", edge_type: "actor"}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "upsert_edge","timestamp": 12345,"from_asset_id": "/asset/foo/1","to_asset_id": "/asset/foo/2","edge_type": "actor"})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :upsert_edge, timestamp: 12_345, from_asset_id: "/asset/foo/1", to_asset_id: "/asset/foo/2", edge_type: "actor"}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "delete_edge","timestamp": 12345,"from_asset_id": "/asset/foo/1","to_asset_id": "/asset/foo/2","edge_type": "actor"})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :delete_edge, timestamp: 12_345, from_asset_id: "/asset/foo/1", to_asset_id: "/asset/foo/2", edge_type: "actor"}}

      iex> raw_message = ~s({"load_topic_message": "2","type": "update_asset","timestamp": 12345,"asset_id": "/asset/foo/bar","update_attributes": {"attr1": "attr1_val"}, "delete_attributes": {"attr2": true}})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "2", type: :update_asset, timestamp: 12_345, asset_id: "/asset/foo/bar", update_attributes: %{"attr1" => "attr1_val"}, delete_attributes: %{"attr2" => true}}}
  """

  alias Jason

  @typedoc """
  A successfully parsed load topic message.

  The shape depends on the message version and type: version "1" asset writes,
  version "1" asset deletion, version "1" edge operations, and the version "2"
  asset update carrying separate `update_attributes` / `delete_attributes` maps.
  """
  @type parsed_load_topic_message ::
          %{
            load_topic_message: String.t(),
            type: :create_asset | :upsert_asset | :update_asset,
            timestamp: integer,
            asset_id: String.t(),
            attributes: map
          }
          | %{
              load_topic_message: String.t(),
              type: :delete_asset,
              timestamp: integer,
              asset_id: String.t()
            }
          | %{
              load_topic_message: String.t(),
              type: :create_edge | :upsert_edge | :delete_edge,
              timestamp: integer,
              from_asset_id: String.t(),
              to_asset_id: String.t(),
              edge_type: String.t()
            }
          | %{
              load_topic_message: String.t(),
              type: :update_asset,
              timestamp: integer,
              asset_id: String.t(),
              update_attributes: map,
              delete_attributes: map
            }

  # Version "1" message types, grouped by the attributes they require. Shared by the
  # validator and the parser so both always agree on the accepted type names.
  @asset_write_types ["create_asset", "upsert_asset", "update_asset"]
  @edge_types ["create_edge", "upsert_edge", "delete_edge"]

  @doc """
  Parses a raw JSON-encoded string into a load topic message.

  Returns `{:ok, message}` on success. On failure returns `{:error, reason}` where
  `reason` is one of:

    * `:not_a_json` - `raw` could not be decoded as JSON
    * `:not_a_json_object` - `raw` is valid JSON but not an object
    * `:not_a_load_topic_message` - the object has no `"load_topic_message"` key
    * `:unknown_load_topic_version` - `"load_topic_message"` is neither `"1"` nor `"2"`
    * `:invalid_load_topic_message` - `"timestamp"` is missing or not an integer, or
      the `"type"` is unsupported for the version or lacks its required attributes
  """
  @spec parse(String.t()) :: {:ok, parsed_load_topic_message} | {:error, term}
  def parse(raw) do
    case Jason.decode(raw) do
      {:ok, %{} = json_msg} -> parse_load_topic_msg(json_msg)
      {:ok, _not_a_map} -> {:error, :not_a_json_object}
      {:error, _reason} -> {:error, :not_a_json}
    end
  end

  # Dispatches on the "load_topic_message" version: validates the decoded object and,
  # when valid, converts it to its parsed form.
  defp parse_load_topic_msg(%{"load_topic_message" => "1"} = raw_msg) do
    with :ok <- validate_load_topic_msg_v1(raw_msg) do
      {:ok, parse_load_topic_msg_v1(raw_msg)}
    end
  end

  defp parse_load_topic_msg(%{"load_topic_message" => "2"} = raw_msg) do
    with :ok <- validate_load_topic_msg_v2(raw_msg) do
      {:ok, parse_load_topic_msg_v2(raw_msg)}
    end
  end

  defp parse_load_topic_msg(%{"load_topic_message" => _}) do
    {:error, :unknown_load_topic_version}
  end

  defp parse_load_topic_msg(_raw_msg) do
    {:error, :not_a_load_topic_message}
  end

  # Checks that a version "1" message has an integer timestamp and the attributes
  # required by its type.
  #
  # NOTE(review): only the presence of the id and edge fields is checked, not that
  # they are strings as `parsed_load_topic_message` declares - confirm whether
  # producers guarantee this.
  defp validate_load_topic_msg_v1(%{"timestamp" => timestamp} = msg)
       when is_integer(timestamp) do
    case msg do
      %{"type" => type, "asset_id" => _, "attributes" => %{}} when type in @asset_write_types ->
        :ok

      %{"type" => "delete_asset", "asset_id" => _} ->
        :ok

      %{"type" => type, "from_asset_id" => _, "to_asset_id" => _, "edge_type" => _}
      when type in @edge_types ->
        :ok

      _ ->
        {:error, :invalid_load_topic_message}
    end
  end

  defp validate_load_topic_msg_v1(_msg) do
    {:error, :invalid_load_topic_message}
  end

  # Checks that a version "2" message has an integer timestamp and is an
  # "update_asset" carrying both attribute objects.
  defp validate_load_topic_msg_v2(%{"timestamp" => timestamp} = msg)
       when is_integer(timestamp) do
    case msg do
      %{
        "type" => "update_asset",
        "asset_id" => _,
        "update_attributes" => %{},
        "delete_attributes" => %{}
      } ->
        :ok

      _ ->
        {:error, :invalid_load_topic_message}
    end
  end

  defp validate_load_topic_msg_v2(_msg) do
    {:error, :invalid_load_topic_message}
  end

  # Builds the parsed form of an already validated version "1" message, keeping only
  # the attributes relevant to its type.
  defp parse_load_topic_msg_v1(%{"type" => type} = msg) when type in @asset_write_types do
    %{
      load_topic_message: "1",
      # The guard limits `type` to a fixed set, so atom creation is bounded.
      type: String.to_atom(type),
      timestamp: msg["timestamp"],
      asset_id: msg["asset_id"],
      attributes: msg["attributes"]
    }
  end

  defp parse_load_topic_msg_v1(%{"type" => "delete_asset"} = msg) do
    %{
      load_topic_message: "1",
      type: :delete_asset,
      timestamp: msg["timestamp"],
      asset_id: msg["asset_id"]
    }
  end

  defp parse_load_topic_msg_v1(%{"type" => type} = msg) when type in @edge_types do
    %{
      load_topic_message: "1",
      # The guard limits `type` to a fixed set, so atom creation is bounded.
      type: String.to_atom(type),
      timestamp: msg["timestamp"],
      from_asset_id: msg["from_asset_id"],
      to_asset_id: msg["to_asset_id"],
      edge_type: msg["edge_type"]
    }
  end

  # Builds the parsed form of an already validated version "2" message.
  defp parse_load_topic_msg_v2(%{"type" => "update_asset"} = msg) do
    %{
      load_topic_message: "2",
      type: :update_asset,
      timestamp: msg["timestamp"],
      asset_id: msg["asset_id"],
      update_attributes: msg["update_attributes"],
      delete_attributes: msg["delete_attributes"]
    }
  end
end