# lib/toolbox/normalizer/load_topic/parser.ex

defmodule Toolbox.Normalizer.LoadTopic.Parser do
  @moduledoc """
  Generic load topic parser.

  Parses JSON-encoded load topic messages. A message must be a JSON object carrying a
  `"load_topic_message"` version (`"1"` or `"2"`), an integer `"timestamp"`, a known `"type"`
  and the attributes required by that type. Identifier fields (`"asset_id"`,
  `"from_asset_id"`, `"to_asset_id"`, `"edge_type"`) must be JSON strings and attribute
  fields must be JSON objects.

  ## Version "1" messages

      iex> raw_message = ~s({"load_topic_message": "1","type": "create_asset","timestamp": 12345,"asset_id": "/asset/foo/bar","attributes": {"attr1": "attr1_val"}})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :create_asset, timestamp: 12_345, asset_id: "/asset/foo/bar", attributes: %{"attr1" => "attr1_val"}}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "upsert_asset","timestamp": 12345, "asset_id": "/asset/foo/bar", "attributes": {"attr1": "attr1_val"}})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :upsert_asset, timestamp: 12_345, asset_id: "/asset/foo/bar", attributes: %{"attr1" => "attr1_val"}}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "update_asset","timestamp": 12345,"asset_id": "/asset/foo/bar","attributes": {"attr1": "attr1_val"}})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :update_asset, timestamp: 12_345, asset_id: "/asset/foo/bar", attributes: %{"attr1" => "attr1_val"}}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "delete_asset","timestamp": 12345,"asset_id": "/asset/foo/bar"})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :delete_asset, timestamp: 12_345, asset_id: "/asset/foo/bar"}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "create_edge","timestamp": 12345,"from_asset_id": "/asset/foo/1","to_asset_id": "/asset/foo/2","edge_type": "actor"})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :create_edge, timestamp: 12_345, from_asset_id: "/asset/foo/1", to_asset_id: "/asset/foo/2", edge_type: "actor"}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "upsert_edge","timestamp": 12345,"from_asset_id": "/asset/foo/1","to_asset_id": "/asset/foo/2","edge_type": "actor"})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :upsert_edge, timestamp: 12_345, from_asset_id: "/asset/foo/1", to_asset_id: "/asset/foo/2", edge_type: "actor"}}

      iex> raw_message = ~s({"load_topic_message": "1","type": "delete_edge","timestamp": 12345,"from_asset_id": "/asset/foo/1","to_asset_id": "/asset/foo/2","edge_type": "actor"})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "1", type: :delete_edge, timestamp: 12_345, from_asset_id: "/asset/foo/1", to_asset_id: "/asset/foo/2", edge_type: "actor"}}

  ## Version "2" messages

      iex> raw_message = ~s({"load_topic_message": "2","type": "update_asset","timestamp": 12345,"asset_id": "/asset/foo/bar","update_attributes": {"attr1": "attr1_val"}, "delete_attributes": {"attr2": true}})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:ok, %{load_topic_message: "2", type: :update_asset, timestamp: 12_345, asset_id: "/asset/foo/bar", update_attributes: %{"attr1" => "attr1_val"}, delete_attributes: %{"attr2" => true}}}

  ## Errors

      iex> Toolbox.Normalizer.LoadTopic.Parser.parse("not json")
      {:error, :not_a_json}

      iex> Toolbox.Normalizer.LoadTopic.Parser.parse("[1, 2]")
      {:error, :not_a_json_object}

      iex> Toolbox.Normalizer.LoadTopic.Parser.parse("{}")
      {:error, :not_a_load_topic_message}

      iex> Toolbox.Normalizer.LoadTopic.Parser.parse(~s({"load_topic_message": "3"}))
      {:error, :unknown_load_topic_version}

      iex> raw_message = ~s({"load_topic_message": "1","type": "delete_asset","timestamp": 12345,"asset_id": null})
      ...> Toolbox.Normalizer.LoadTopic.Parser.parse(raw_message)
      {:error, :invalid_load_topic_message}
  """

  alias Jason

  @type parsed_load_topic_message ::
          %{
            load_topic_message: String.t(),
            type: :create_asset | :upsert_asset | :update_asset,
            timestamp: integer,
            asset_id: String.t(),
            attributes: map
          }
          | %{
              load_topic_message: String.t(),
              type: :delete_asset,
              timestamp: integer,
              asset_id: String.t()
            }
          | %{
              load_topic_message: String.t(),
              type: :create_edge | :upsert_edge | :delete_edge,
              timestamp: integer,
              from_asset_id: String.t(),
              to_asset_id: String.t(),
              edge_type: String.t()
            }
          | %{
              load_topic_message: String.t(),
              type: :update_asset,
              timestamp: integer,
              asset_id: String.t(),
              update_attributes: map,
              delete_attributes: map
            }

  @type error_reason ::
          :not_a_json
          | :not_a_json_object
          | :not_a_load_topic_message
          | :unknown_load_topic_version
          | :invalid_load_topic_message

  # Closed lookup tables from the wire representation of a message type to its atom.
  # Using compile-time tables (instead of `String.to_atom/1`) guarantees that no atom is ever
  # created from decoded input.
  @asset_write_types %{
    "create_asset" => :create_asset,
    "upsert_asset" => :upsert_asset,
    "update_asset" => :update_asset
  }

  @edge_types %{
    "create_edge" => :create_edge,
    "upsert_edge" => :upsert_edge,
    "delete_edge" => :delete_edge
  }

  # Plain lists so the names can be used with `in` inside guards.
  @asset_write_type_names Map.keys(@asset_write_types)
  @edge_type_names Map.keys(@edge_types)

  @doc """
  Parses a raw JSON string into a load topic message.

  Returns `{:ok, message}` where `message` is a map with atom keys (see
  `t:parsed_load_topic_message/0`), or `{:error, reason}` where `reason` is one of:

    * `:not_a_json` - `raw` could not be decoded as JSON
    * `:not_a_json_object` - `raw` is valid JSON but not an object
    * `:not_a_load_topic_message` - the object has no `"load_topic_message"` key
    * `:unknown_load_topic_version` - the `"load_topic_message"` version is not supported
    * `:invalid_load_topic_message` - the timestamp is not an integer, the type is unknown,
      or an attribute required by the type is missing or has the wrong JSON type
  """
  @spec parse(String.t()) :: {:ok, parsed_load_topic_message} | {:error, error_reason}
  def parse(raw) do
    with {:ok, json_msg} <- decode_json_object(raw) do
      parse_load_topic_msg(json_msg)
    end
  end

  # Decodes `raw` and makes sure the top-level JSON value is an object, so that every failure
  # leaves this function with its final, self-describing error reason.
  defp decode_json_object(raw) do
    case Jason.decode(raw) do
      {:ok, %{} = json_msg} -> {:ok, json_msg}
      {:ok, _not_a_map} -> {:error, :not_a_json_object}
      {:error, _reason} -> {:error, :not_a_json}
    end
  end

  # Dispatches on the declared message version.
  defp parse_load_topic_msg(%{"load_topic_message" => "1"} = raw_msg) do
    parse_load_topic_msg_v1(raw_msg)
  end

  defp parse_load_topic_msg(%{"load_topic_message" => "2"} = raw_msg) do
    parse_load_topic_msg_v2(raw_msg)
  end

  defp parse_load_topic_msg(%{"load_topic_message" => _}) do
    {:error, :unknown_load_topic_version}
  end

  defp parse_load_topic_msg(_raw_msg) do
    {:error, :not_a_load_topic_message}
  end

  # Version "1": validation and conversion happen in a single pattern-matching pass, so the
  # accepted shapes cannot drift apart from the produced ones.
  defp parse_load_topic_msg_v1(%{
         "type" => type,
         "timestamp" => timestamp,
         "asset_id" => asset_id,
         "attributes" => %{} = attributes
       })
       when type in @asset_write_type_names and is_integer(timestamp) and is_binary(asset_id) do
    {:ok,
     %{
       load_topic_message: "1",
       type: Map.fetch!(@asset_write_types, type),
       timestamp: timestamp,
       asset_id: asset_id,
       attributes: attributes
     }}
  end

  defp parse_load_topic_msg_v1(%{
         "type" => "delete_asset",
         "timestamp" => timestamp,
         "asset_id" => asset_id
       })
       when is_integer(timestamp) and is_binary(asset_id) do
    {:ok,
     %{
       load_topic_message: "1",
       type: :delete_asset,
       timestamp: timestamp,
       asset_id: asset_id
     }}
  end

  defp parse_load_topic_msg_v1(%{
         "type" => type,
         "timestamp" => timestamp,
         "from_asset_id" => from_asset_id,
         "to_asset_id" => to_asset_id,
         "edge_type" => edge_type
       })
       when type in @edge_type_names and is_integer(timestamp) and is_binary(from_asset_id) and
              is_binary(to_asset_id) and is_binary(edge_type) do
    {:ok,
     %{
       load_topic_message: "1",
       type: Map.fetch!(@edge_types, type),
       timestamp: timestamp,
       from_asset_id: from_asset_id,
       to_asset_id: to_asset_id,
       edge_type: edge_type
     }}
  end

  defp parse_load_topic_msg_v1(_msg) do
    {:error, :invalid_load_topic_message}
  end

  # Version "2" currently only defines `update_asset`, which splits the attribute changes into
  # attributes to update and attributes to delete.
  defp parse_load_topic_msg_v2(%{
         "type" => "update_asset",
         "timestamp" => timestamp,
         "asset_id" => asset_id,
         "update_attributes" => %{} = update_attributes,
         "delete_attributes" => %{} = delete_attributes
       })
       when is_integer(timestamp) and is_binary(asset_id) do
    {:ok,
     %{
       load_topic_message: "2",
       type: :update_asset,
       timestamp: timestamp,
       asset_id: asset_id,
       update_attributes: update_attributes,
       delete_attributes: delete_attributes
     }}
  end

  defp parse_load_topic_msg_v2(_msg) do
    {:error, :invalid_load_topic_message}
  end
end