lib/yggdrasil/subscriber/adapter/icon/message.ex

defmodule Yggdrasil.Subscriber.Adapter.Icon.Message do
  @moduledoc """
  This module defines functions to deal with ICON 2.0 websocket messages.

  When a connection is established with the ICON 2.0 websocket, we need to send
  a `text` frame with a JSON payload to state our intention. There are two
  channels:

  - `:block` for receiving both ticks on every block and event logs specific to
    the events we're filtering.
  - `:event` for receiving updates for a specific event log.
  """
  alias Icon.RPC.Identity
  alias Icon.Schema
  alias Icon.Schema.Type
  alias Icon.Schema.Types.Block.Tick
  alias Icon.Schema.Types.EventLog
  alias Yggdrasil.Channel

  @typedoc """
  Message.
  """
  @type t :: Tick.t() | EventLog.t()

  @doc """
  Decodes an incoming message from the ICON 2.0 websocket.
  """
  @spec decode(Channel.t(), map()) ::
          :ok
          | {:ok, [t()]}
          | {:error, Schema.Error.t()}
  def decode(%Channel{} = channel, notification) do
    case notification do
      %{"code" => 0} ->
        :ok

      %{"code" => code, "message" => message} ->
        {:error, Schema.Error.new(code: code, message: message)}

      notification when is_map(notification) ->
        do_decode(channel, notification)
    end
  end

  @doc """
  Encodes a request for the ICON 2.0 websocket. It receives the `height` and
  the `channel` to build the request.
  """
  @spec encode(pos_integer(), Channel.t()) :: WebSockex.frame() | no_return()
  def encode(height, channel) do
    data =
      height
      |> do_encode(channel)
      |> Jason.encode!()

    {:text, data}
  end

  ##################
  # Decoding helpers

  @spec do_decode(Channel.t(), map()) ::
          {:ok, [t()]}
          | {:error, Schema.Error.t()}
  defp do_decode(channel, notification)

  defp do_decode(%Channel{name: %{source: :block}} = channel, notification) do
    decode_block(channel, notification)
  end

  defp do_decode(%Channel{name: %{source: :event}} = channel, notification) do
    decode_event(channel, notification)
  end

  @spec decode_block(Channel.t(), map()) ::
          {:ok, [t()]}
          | {:error, Schema.Error.t()}
  defp decode_block(channel, notification)

  defp decode_block(
         %Channel{name: info} = channel,
         %{
           "height" => height,
           "hash" => hash,
           "indexes" => indexes,
           "events" => events
         }
       ) do
    identity = info[:identity] || Identity.new()
    base = %{"height" => height, "hash" => hash}
    {:ok, [tick]} = decode_block(channel, base)

    indexes
    |> Stream.zip(events)
    |> Stream.map(fn {indexes, events} -> Enum.zip(indexes, events) end)
    |> Stream.flat_map(& &1)
    |> Stream.map(fn {index, events} ->
      %{"index" => index, "events" => events}
    end)
    |> Enum.map(&Map.merge(base, &1))
    |> decode_events(identity)
    |> case do
      {:ok, event_logs} ->
        {:ok, [tick | event_logs]}

      {:error, _} = error ->
        error
    end
  end

  defp decode_block(%Channel{}, notification) do
    Schema.Types.Block.Tick
    |> Schema.generate()
    |> Schema.new(notification)
    |> Schema.load()
    |> Schema.apply(into: Schema.Types.Block.Tick)
    |> case do
      {:ok, %Tick{} = tick} ->
        {:ok, [tick]}

      {:error, _} = error ->
        error
    end
  end

  @spec decode_events(
          [map()],
          Identity.t(),
          nil | Schema.Types.Block.t(),
          [Schema.Types.EventLog.t()]
        ) ::
          {:ok, [t()]}
          | {:error, Schema.Error.t()}
  defp decode_events(notifications, identity, block \\ nil, event_logs \\ [])

  defp decode_events([], _identity, _block, event_logs) do
    {:ok, event_logs}
  end

  defp decode_events([first | _] = notifications, identity, nil, event_logs) do
    with {:ok, height} <- decode_block_height(first),
         {:ok, block} <- Icon.get_block(identity, height - 1),
         {:ok, event_logs} <-
           decode_events(notifications, identity, block, event_logs) do
      {:ok, event_logs}
    else
      :error ->
        reason = "cannot decode information in notification"
        {:error, Schema.Error.new(code: -32_000, message: reason)}

      {:error, _} = error ->
        error
    end
  end

  defp decode_events([notification | notifications], identity, block, events) do
    with {:ok, tx_index} <- decode_transaction_index(notification),
         {:ok, events_indexes} <- decode_events_indexes(notification),
         {:ok, transaction} <- get_transaction(block, tx_index),
         {:ok, new_events} <- get_event_logs(identity, block, transaction) do
      new_events = filter_logs(new_events, events_indexes)
      decode_events(notifications, identity, block, new_events ++ events)
    end
  end

  @spec decode_event(Channel.t(), map()) ::
          {:ok, [t()]}
          | {:error, Schema.Error.t()}
  defp decode_event(channel, notification)

  defp decode_event(%Channel{name: info}, notification) do
    identity = info[:identity] || Identity.new()

    with {:ok, tx_index} <- decode_transaction_index(notification),
         {:ok, events_indexes} <- decode_events_indexes(notification),
         {:ok, height} <- decode_block_height(notification),
         {:ok, block} <- Icon.get_block(identity, height - 1),
         block_tick = %Tick{hash: block.block_hash, height: block.height},
         {:ok, tx} <- get_transaction(block, tx_index),
         {:ok, event_logs} <- get_event_logs(identity, block, tx) do
      {:ok, [block_tick | filter_logs(event_logs, events_indexes)]}
    else
      :error ->
        reason = "cannot decode information in notification"
        {:error, Schema.Error.new(code: -32_000, message: reason)}

      {:error, _} = error ->
        error
    end
  end

  @spec decode_transaction_index(map()) ::
          {:ok, Schema.Types.Integer.t()}
          | :error
  defp decode_transaction_index(notification)

  defp decode_transaction_index(%{"index" => encoded}) do
    Schema.Types.Integer.load(encoded)
  end

  defp decode_transaction_index(_), do: :error

  @spec decode_events_indexes(map()) ::
          {:ok, [Schema.Types.Integer.t()]}
          | :error
  defp decode_events_indexes(notification)

  defp decode_events_indexes(%{"events" => events}) do
    Enum.reduce_while(events, {:ok, []}, fn event_index, {:ok, indexes} ->
      case Schema.Types.Integer.load(event_index) do
        {:ok, index} ->
          {:cont, {:ok, [index | indexes]}}

        :error ->
          {:halt, :error}
      end
    end)
  end

  defp decode_events_indexes(_), do: :error

  @spec decode_block_height(map()) :: {:ok, Schema.Types.Integer.t()} | :error
  defp decode_block_height(notification)

  defp decode_block_height(%{"height" => height}) do
    Schema.Types.Integer.load(height)
  end

  defp decode_block_height(_), do: :error

  @spec get_transaction(Schema.Types.Block.t(), non_neg_integer()) ::
          {:ok, Schema.Types.Transaction.t()}
          | {:error, Schema.Error.t()}
  defp get_transaction(block, index)

  defp get_transaction(%Schema.Types.Block{} = block, index) do
    case Enum.at(block.confirmed_transaction_list, index) do
      nil ->
        reason =
          "cannot find the transaction index #{index} on block with height #{block.height}"

        {:error, Schema.Error.new(code: -32_000, message: reason)}

      transaction ->
        {:ok, transaction}
    end
  end

  @spec get_event_logs(
          Identity.t(),
          Schema.Types.Block.t(),
          Schema.Types.Transaction.t()
        ) ::
          {:ok, [Schema.Types.EventLog.t()]}
          | {:error, Schema.Error.t()}
  defp get_event_logs(identity, block, transaction)

  defp get_event_logs(
         identity,
         %Schema.Types.Block{height: height},
         %Schema.Types.Transaction{} = transaction
       ) do
    with {:ok, %Schema.Types.Transaction.Result{} = result} <-
           Icon.get_transaction_result(identity, transaction.txHash) do
      logs = Enum.map(result.eventLogs, fn log -> %{log | height: height} end)
      {:ok, logs}
    end
  end

  @spec filter_logs([Schema.Types.EventLog.t()], [non_neg_integer()]) ::
          [Schema.Types.EventLog.t()]
  defp filter_logs(event_logs, indexes)

  defp filter_logs(event_logs, indexes) do
    0..(length(event_logs) - 1)
    |> Stream.zip(event_logs)
    |> Stream.filter(fn {x, _event} -> x in indexes end)
    |> Enum.map(fn {_, event_log} -> event_log end)
  end

  ##################
  # Encoding helpers

  @spec do_encode(pos_integer(), Channel.t()) :: map()
  defp do_encode(height, channel)

  defp do_encode(height, %Channel{name: %{source: :block} = info}) do
    data = %{height: Type.dump!(Icon.Schema.Types.Integer, height)}

    case info[:data] do
      [_ | _] = events ->
        Map.put(data, :eventFilters, Enum.map(events, &encode_event/1))

      _ ->
        data
    end
  end

  defp do_encode(height, %Channel{name: %{source: :event} = info}) do
    base = %{height: Type.dump!(Icon.Schema.Types.Integer, height)}

    (info[:data] || %{})
    |> encode_event()
    |> Map.merge(base)
  end

  @spec encode_event(map()) :: map() | no_return()
  defp encode_event(data)

  defp encode_event(%{event: header} = data) when is_binary(header) do
    %{event: header}
    |> maybe_add_addr(data)
    |> maybe_add_indexed(data)
    |> maybe_add_data(data)
  end

  defp encode_event(_data) do
    raise ArgumentError, message: "missing event header"
  end

  @spec maybe_add_addr(map(), map()) :: map()
  defp maybe_add_addr(event, %{addr: addr}) do
    Map.put(event, :addr, Type.dump!(Icon.Schema.Types.SCORE, addr))
  end

  defp maybe_add_addr(event, _) do
    event
  end

  @spec maybe_add_indexed(map(), map()) :: map()
  defp maybe_add_indexed(event, data)

  defp maybe_add_indexed(%{event: header} = event, %{indexed: indexed})
       when is_list(indexed) do
    indexed =
      header
      |> get_types()
      |> Enum.zip(indexed)
      |> Enum.map(fn
        {_module, nil} -> nil
        {module, value} -> Type.dump!(module, value)
      end)

    Map.put(event, :indexed, indexed)
  end

  defp maybe_add_indexed(event, _data), do: event

  @spec maybe_add_data(map(), map()) :: map()
  defp maybe_add_data(event, data)

  defp maybe_add_data(%{event: header, indexed: indexed} = event, %{data: data})
       when is_list(indexed) do
    data =
      header
      |> get_types()
      |> Enum.drop(length(indexed))
      |> Enum.zip(data)
      |> Enum.map(fn
        {_module, nil} -> nil
        {module, value} -> Type.dump!(module, value)
      end)

    Map.put(event, :data, data)
  end

  defp maybe_add_data(event, _data), do: event

  @spec get_types(binary()) :: [module()] | no_return()
  defp get_types(header) do
    header
    |> String.splitter(["(", ",", ")"], trim: true)
    |> Enum.into([])
    |> tl()
    |> Enum.map(fn
      "int" -> Icon.Schema.Types.Integer
      "str" -> Icon.Schema.Types.String
      "bytes" -> Icon.Schema.Types.BinaryData
      "bool" -> Icon.Schema.Types.Boolean
      "Address" -> Icon.Schema.Types.Address
    end)
  end
end