lib/yggdrasil/subscriber/adapter/icon.ex

defmodule Yggdrasil.Subscriber.Adapter.Icon do
  @moduledoc """
  Yggdrasil publisher adapter for Icon. The name of the channel should be a
  map with:

  - `:source` - Either `:block` or `:event` (required).
  - `:identity` - `Icon.RPC.Identity` instance pointed to the right network.
  - `:data` - Data for the subscription.
  - `:from_height` - Height to start receiving messages. Defaults to `:latest`.

  > **Important**: We need to be careful when using `from_height` in the channel
  > because `Yggdrasil` will restart the synchronization process from the
  > chosen height if the process crashes.

  e.g. given a proper channel name, then we can subscribe to ICON 2.0 `:block`
  websocket:
  ```
  iex(1)> Yggdrasil.subscribe(name: %{source: :block}, adapter: :icon)
  :ok
  iex(4)> flush()
  {:Y_CONNECTED, %Yggdrasil.Channel{name: %{source: :block}, (...)}}
  ```

  after that, we'll start receiving messages like the following:
  ```
  iex(6)> flush()
  {:Y_EVENT, %Yggdrasil.Channel{name: %{source: :block}, (...)}, %Yggdrasil.Icon.Block{hash: "0x...", (...)}}
  ```

  Finally, when we're done, we can unsubscribe from the channel:
  ```
  iex(7)> Yggdrasil.unsubscribe(name: %{source: :block}, adapter: :icon)
  :ok
  iex(8)> flush()
  {:Y_DISCONNECTED, %Yggdrasil.Channel{name: %{source: :block}, (...)}}
  ```
  """
  use WebSockex
  use Yggdrasil.Subscriber.Adapter

  require Logger

  alias __MODULE__, as: State
  alias Icon.RPC.Identity
  alias Icon.Schema
  alias Yggdrasil.Channel
  alias Yggdrasil.Subscriber.Adapter.Icon.Message
  alias Yggdrasil.Subscriber.Manager

  @doc false
  defstruct url: nil,
            channel: nil,
            state: :initializing,
            height: :latest

  @typedoc false
  @type t :: %State{
          url: url :: binary(),
          channel: channel :: Channel.t(),
          state: state :: :initializing | :connected | :disconnected,
          height: height :: :latest | pos_integer()
        }

  ################################
  # Yggdrasil Subscriber callbacks

  @impl Yggdrasil.Subscriber.Adapter
  def start_link(channel, options \\ [])

  def start_link(%Channel{} = channel, options) do
    state = gen_state(channel)
    options = Keyword.put_new(options, :handle_initial_conn_failure, true)

    WebSockex.start_link(state.url, __MODULE__, state, options)
  end

  #####################
  # WebSockex callbacks

  @impl WebSockex
  def handle_connect(%WebSockex.Conn{}, %State{channel: channel} = state) do
    Manager.connected(channel)
    initialize()
    connected(state)

    {:ok, state}
  end

  @impl WebSockex
  def handle_info(call, state)

  def handle_info(:init, %State{channel: channel} = state) do
    case add_height(state) do
      {:ok, %State{height: height} = state} ->
        {:reply, Message.encode(height, channel), %{state | state: :connected}}

      {:error, %Schema.Error{} = error} ->
        crash(state, error)
    end
  end

  def handle_info({_ref, {:error, %Schema.Error{} = error}}, state) do
    crash(state, error)
  end

  def handle_info(_, state) do
    {:ok, state}
  end

  @impl WebSockex
  def handle_frame(frame, state)

  def handle_frame({:text, frame}, %State{channel: channel} = state) do
    Message.publish(channel, frame)
    {:ok, state}
  end

  def handle_frame(_, %State{} = state) do
    {:ok, state}
  end

  @impl WebSockex
  def handle_disconnect(status, state)

  def handle_disconnect(status, %State{state: :initializing} = state) do
    retry(status, state)
    {:reconnect, state}
  end

  def handle_disconnect(status, %State{state: :disconnected} = state) do
    retry(status, state)
    {:reconnect, state}
  end

  def handle_disconnect(
        status,
        %State{channel: channel, state: :connected} = state
      ) do
    Manager.disconnected(channel)
    disconnected(status, state)

    {:reconnect, %{state | state: :disconnected}}
  end

  @impl WebSockex
  def terminate(reason, state)

  def terminate(reason, %State{state: :initializing} = state) do
    terminated(reason, state)
    :ok
  end

  def terminate(reason, %State{state: :disconnected} = state) do
    terminated(reason, state)
    :ok
  end

  def terminate(
        reason,
        %State{state: :connected, channel: %Channel{} = channel} = state
      ) do
    Manager.disconnected(channel)
    terminated(reason, state)
    :ok
  end

  ########################
  # Initialization helpers

  @spec gen_state(Channel.t()) :: State.t()
  defp gen_state(channel)

  defp gen_state(%Channel{name: %{source: _}, adapter: :icon} = channel) do
    %State{
      url: endpoint(channel),
      channel: channel
    }
  end

  @spec endpoint(Channel.t()) :: binary()
  defp endpoint(channel)

  defp endpoint(%Channel{name: %{source: source} = info}) do
    %Identity{node: node} = info[:identity] || Identity.new()

    "#{node}/api/v3/icon_dex/#{source}"
    |> URI.parse()
    |> case do
      %URI{scheme: "http"} = uri ->
        URI.to_string(%{uri | scheme: "ws"})

      %URI{scheme: "https"} = uri ->
        URI.to_string(%{uri | scheme: "wss"})
    end
  end

  @spec initialize() :: :ok
  defp initialize do
    Process.send_after(self(), :init, 0)
    :ok
  end

  @spec add_height(State.t()) :: {:ok, State.t()} | {:error, Schema.Error.t()}
  defp add_height(state)

  defp add_height(
         %State{
           height: :latest,
           channel: %Channel{name: %{from_height: height}}
         } = state
       )
       when is_integer(height) do
    {:ok, %{state | height: height}}
  end

  defp add_height(%State{channel: %Channel{name: info}} = state) do
    identity = info[:identity] || Identity.new()

    case Icon.get_block(identity) do
      {:ok, %Schema.Types.Block{height: height}} ->
        {:ok, %{state | height: height}}

      {:error, _} = error ->
        error
    end
  end

  #################
  # Logging helpers

  @spec connected(State.t()) :: :ok
  defp connected(%State{channel: channel, url: url}) do
    Logger.debug(fn ->
      "Started #{__MODULE__} for #{inspect(channel)} (#{url})"
    end)

    :ok
  end

  @spec disconnected(WebSockex.connection_status_map(), State.t()) :: :ok
  defp disconnected(%{reason: reason}, %State{channel: channel, url: url}) do
    Logger.warn(fn ->
      "Stopped #{__MODULE__} for #{inspect(channel)} (#{url}) " <>
        "due to #{inspect(reason)}"
    end)

    :ok
  end

  @spec retry(WebSockex.connection_status_map(), State.t()) :: :ok
  defp retry(
         %{reason: reason, attempt_number: retry},
         %State{channel: channel, url: url}
       ) do
    Logger.warn(fn ->
      "#{__MODULE__} still unsubscribed from #{inspect(channel)} (#{url}) " <>
        "due to #{inspect(reason)} [retry: #{retry}]"
    end)

    :ok
  end

  @spec crash(State.t(), Schema.Error.t()) :: no_return()
  defp crash(
         %State{channel: channel, url: url},
         %Schema.Error{message: message, reason: reason}
       ) do
    Logger.error(fn ->
      "Crashed #{__MODULE__} for #{inspect(channel)} (#{url}) " <>
        "due to #{message} [reason: #{reason}]"
    end)

    raise RuntimeError,
      message: "cannot get current block height due to #{message} [#{reason}]"
  end

  @spec terminated(WebSockex.close_reason(), State.t()) :: :ok
  defp terminated(reason, state)

  defp terminated(:normal, %State{channel: channel, url: url}) do
    Logger.info(fn ->
      "#{__MODULE__} stopped for #{inspect(channel)} (#{url})"
    end)
  end

  defp terminated(reason, %State{channel: channel, url: url}) do
    Logger.warn(fn ->
      "#{__MODULE__} stopped for #{inspect(channel)} (#{url}) " <>
        "due to #{inspect(reason)}"
    end)
  end
end