defmodule Yggdrasil.Subscriber.Adapter.Icon do
@moduledoc """
Yggdrasil subscriber adapter for Icon. The name of the channel should be a
map with:
- `:source` - Either `:block` or `:event` (required).
- `:identity` - `Icon.RPC.Identity` instance pointed to the right network. Defaults to `Icon.RPC.Identity.new()`.
- `:data` - Data for the subscription.
- `:from_height` - Height to start receiving messages. Defaults to `:latest`.
> **Important**: We need to be careful when using `from_height` in the channel
> because `Yggdrasil` will restart the synchronization process from the
> chosen height if the process crashes.
For example, given a proper channel name, we can subscribe to the ICON 2.0
`:block` websocket:
```
iex(1)> Yggdrasil.subscribe(name: %{source: :block}, adapter: :icon)
:ok
iex(4)> flush()
{:Y_CONNECTED, %Yggdrasil.Channel{name: %{source: :block}, (...)}}
```
after that, we'll start receiving messages like the following:
```
iex(6)> flush()
{:Y_EVENT, %Yggdrasil.Channel{name: %{source: :block}, (...)}, %Yggdrasil.Icon.Block{hash: "0x...", (...)}}
```
Finally, when we're done, we can unsubscribe from the channel:
```
iex(7)> Yggdrasil.unsubscribe(name: %{source: :block}, adapter: :icon)
:ok
iex(8)> flush()
{:Y_DISCONNECTED, %Yggdrasil.Channel{name: %{source: :block}, (...)}}
```
"""
use GenServer
use Bitwise
use Yggdrasil.Subscriber.Adapter
require Logger
alias __MODULE__, as: State
alias Icon.RPC.Identity
alias Icon.Schema
alias Yggdrasil.Channel
alias Yggdrasil.Config.Icon, as: Config
alias Yggdrasil.Subscriber.Adapter.Icon.Message
alias Yggdrasil.Subscriber.Manager
alias Yggdrasil.Subscriber.Publisher
# Internal state of the subscriber process.
#
#   * `url`       - websocket endpoint computed by `endpoint/1` in `gen_state/1`.
#   * `channel`   - the `Yggdrasil.Channel` this subscriber was started for.
#   * `websocket` - process returned by `module.start_link/2`; reset to `nil`
#                   when a connected subscriber backs off.
#   * `module`    - websocket client module read from `Config.websocket_module!/0`.
#   * `status`    - becomes `:connected` in `connected/1` and goes back to
#                   `:disconnected` in `backoff/2`.
#   * `height`    - `:latest` until `add_height/1` resolves it; afterwards
#                   refreshed by `{:height, _}` casts.
#   * `retries`   - backoff rounds since the last successful connection
#                   (reset to 0 in `connected/1`).
#   * `backoff`   - last delay, in milliseconds, handed to `Process.send_after/3`.
@doc false
defstruct url: nil,
channel: nil,
websocket: nil,
module: nil,
status: :disconnected,
height: :latest,
retries: 0,
backoff: 0
@typedoc false
@type t :: %State{
url: url :: binary(),
channel: channel :: Channel.t(),
websocket: websocket :: nil | WebSockex.client(),
module: module :: module(),
status: status :: :connected | :disconnected,
height: height :: :latest | pos_integer(),
retries: retries :: non_neg_integer(),
backoff: backoff :: non_neg_integer()
}
############
# Public API
@doc """
Notifies the `subscriber` process that its websocket is now connected.

The notification is delivered asynchronously as a `:connected` cast.
"""
@spec send_connected(GenServer.server()) :: :ok
def send_connected(subscriber), do: GenServer.cast(subscriber, :connected)
@doc """
Forwards a decoded websocket `frame` to the `subscriber` process.

The frame is delivered asynchronously as a `{:frame, frame}` cast.
"""
@spec send_frame(GenServer.server(), term()) :: :ok
def send_frame(subscriber, frame), do: GenServer.cast(subscriber, {:frame, frame})
@doc """
Reports a new block `height` to the `subscriber` process.

The height is delivered asynchronously as a `{:height, height}` cast.
"""
@spec send_height(GenServer.server(), pos_integer()) :: :ok
def send_height(subscriber, height), do: GenServer.cast(subscriber, {:height, height})
@doc """
Notifies the `subscriber` process that its websocket was disconnected because
of `reason`.

The notification is delivered asynchronously as a `{:disconnected, reason}`
cast.
"""
@spec send_disconnected(GenServer.server(), term()) :: :ok
def send_disconnected(subscriber, reason),
  do: GenServer.cast(subscriber, {:disconnected, reason})
################################
# Yggdrasil Subscriber callbacks
# Starts the subscriber as a linked GenServer for the given `channel`.
# `options` are passed untouched to `GenServer.start_link/3`.
@impl Yggdrasil.Subscriber.Adapter
def start_link(%Channel{} = channel, options \\ []) do
  GenServer.start_link(__MODULE__, channel, options)
end
# Stops the `subscriber` process. Thin wrapper over `GenServer.stop/3`, with
# the same defaults (`:normal` reason, `:infinity` timeout).
@spec stop(GenServer.server()) :: :ok
@spec stop(GenServer.server(), term()) :: :ok
@spec stop(GenServer.server(), term(), :infinity | non_neg_integer()) :: :ok
def stop(subscriber, reason \\ :normal, timeout \\ :infinity) do
  GenServer.stop(subscriber, reason, timeout)
end
#####################
# GenServer callbacks
# Builds the initial state for `channel` and defers the (potentially slow)
# connection work to `handle_continue(:init, state)`.
@impl GenServer
def init(%Channel{} = channel) do
  # Exits of linked processes arrive as messages instead of killing us.
  Process.flag(:trap_exit, true)

  initial_state = gen_state(channel)
  :ok = log_started(initial_state)

  {:ok, initial_state, {:continue, :init}}
end
# Dispatches the continuation steps of the subscriber life cycle to the
# state management helpers.
@impl GenServer
def handle_continue(continue, state)

# Already connected: there is nothing to (re)initialize.
def handle_continue(:init, %State{status: :connected} = state),
  do: {:noreply, state}

def handle_continue(:init, %State{} = state),
  do: initialize(state)

def handle_continue(:connected, %State{} = state),
  do: connected(state)

def handle_continue({:backoff, reason}, %State{} = state),
  do: backoff(reason, state)
# Handles the notifications sent through the public `send_*` functions.
@impl GenServer
# The websocket is up: send it the subscription message for the current height.
def handle_cast(:connected, %State{module: module, websocket: websocket} = state) do
  subscription = Message.encode(state.height, state.channel)
  module.initialize(websocket, subscription)
  {:noreply, state}
end

# A bare `:ok` frame moves the subscriber to the connected state.
def handle_cast({:frame, :ok}, %State{} = state),
  do: {:noreply, state, {:continue, :connected}}

# An error frame triggers the backoff procedure.
def handle_cast({:frame, {:error, error}}, %State{} = state),
  do: {:noreply, state, {:continue, {:backoff, error}}}

# Decoded events are published to the channel one by one.
def handle_cast({:frame, {:ok, events}}, %State{channel: channel} = state) do
  Enum.each(events, fn event -> Publisher.notify(channel, event) end)
  {:noreply, state}
end

# Keeps track of the last seen height; values that fail to load are ignored.
def handle_cast({:height, encoded_height}, %State{} = state) do
  updated =
    case Schema.Types.Integer.load(encoded_height) do
      {:ok, height} -> %{state | height: height}
      :error -> state
    end

  {:noreply, updated}
end

def handle_cast({:disconnected, reason}, %State{} = state),
  do: {:noreply, state, {:continue, {:backoff, reason}}}
# Handles timer and monitor messages.
@impl GenServer
def handle_info(msg, state)

# Backoff timer fired: try to initialize again.
def handle_info(:re_init, %State{} = state),
  do: {:noreply, state, {:continue, :init}}

# The websocket currently stored in the state went down.
def handle_info({:DOWN, _ref, :process, pid, reason}, %State{websocket: pid} = state),
  do: {:noreply, state, {:continue, {:backoff, reason}}}

# Anything else is ignored.
def handle_info(_other, %State{} = state),
  do: {:noreply, state}
##########################
# State management helpers
# Resolves the starting height, starts the websocket and monitors it. Any
# `{:error, reason}` along the way is turned into a backoff continuation that
# keeps the state received by this function.
@spec initialize(t()) ::
        {:noreply, t()}
        | {:noreply, t(), {:continue, {:backoff, term()}}}
defp initialize(state)

defp initialize(%State{status: :disconnected} = state) do
  decoder = &Message.decode(state.channel, &1)

  with {:ok, %State{} = with_height} <- add_height(state),
       {:ok, pid} <- state.module.start_link(with_height.url, decoder: decoder) do
    Process.monitor(pid)
    {:noreply, %{with_height | websocket: pid}}
  else
    {:error, reason} ->
      {:noreply, state, {:continue, {:backoff, reason}}}
  end
end
# Marks a disconnected subscriber as connected: informs the manager, logs the
# event and clears the retry and backoff counters.
@spec connected(t()) :: {:noreply, t()}
defp connected(state)

defp connected(%State{status: :disconnected} = state) do
  Manager.connected(state.channel)
  log_connected(state)

  connected_state = %{state | status: :connected, retries: 0, backoff: 0}
  {:noreply, connected_state}
end
# Schedules a re-initialization (`:re_init` message) after a randomized
# exponential delay. A connected subscriber is first torn down.
@spec backoff(term(), t()) :: {:noreply, t()}
defp backoff(reason, state)

# Connected: report the disconnection, stop the websocket, and then fall
# through to the disconnected clause to schedule the retry.
defp backoff(reason, %State{status: :connected} = state) do
  Manager.disconnected(state.channel)
  state.module.stop(state.websocket)
  log_disconnected(reason, state)

  backoff(reason, %{state | websocket: nil, status: :disconnected})
end

defp backoff(reason, %State{status: :disconnected, retries: attempt} = state) do
  max_retries = Config.max_retries!()
  slot_size = Config.slot_size!()

  # The exponent is capped at `max_retries` and shifted down by two. With a
  # negative exponent `<<<` shifts to the right, so the very first attempt
  # (`attempt == 0`) yields `2 <<< -2 == 0`, i.e. an immediate retry.
  exponent = min(attempt, max_retries) - 2
  delay = (2 <<< exponent) * Enum.random(1..slot_size) * 1_000

  scheduled = %{state | retries: attempt + 1, backoff: delay}
  Process.send_after(self(), :re_init, delay)
  log_retry(reason, scheduled)

  {:noreply, scheduled}
end
# Logs the termination. A connected subscriber also tells the manager that the
# channel is disconnected.
@impl GenServer
def terminate(reason, state)

def terminate(reason, %State{status: :connected} = state) do
  Manager.disconnected(state.channel)
  log_terminated(reason, state)
  :ok
end

def terminate(reason, %State{status: :disconnected} = state) do
  log_terminated(reason, state)
  :ok
end
########################
# Initialization helpers
# Builds the initial state for an `:icon` channel whose name has a `:source`.
@spec gen_state(Channel.t()) :: State.t()
defp gen_state(channel)

defp gen_state(%Channel{name: %{source: _}, adapter: :icon} = channel) do
  url = endpoint(channel)
  websocket_module = Config.websocket_module!()

  %State{url: url, channel: channel, module: websocket_module}
end
# Builds the websocket URL for the channel's `source`.
#
# The node URL comes from the channel's `:identity` (or `Identity.new/0` when
# none is given). `http`/`https` nodes are mapped to `ws`/`wss`. Node URLs that
# already use a websocket scheme are kept as they are instead of crashing with
# a `CaseClauseError`. Any other scheme still raises.
@spec endpoint(Channel.t()) :: binary()
defp endpoint(channel)

defp endpoint(%Channel{name: %{source: source} = info}) do
  %Identity{node: node} = info[:identity] || Identity.new()

  case URI.parse("#{node}/api/v3/icon_dex/#{source}") do
    %URI{scheme: "http"} = uri ->
      URI.to_string(%{uri | scheme: "ws"})

    %URI{scheme: "https"} = uri ->
      URI.to_string(%{uri | scheme: "wss"})

    %URI{scheme: scheme} = uri when scheme in ["ws", "wss"] ->
      URI.to_string(uri)
  end
end
# Resolves the height the subscription should start from:
#
#   1. The channel's `:from_height`, while the state is still at `:latest`.
#   2. The positive height already stored in the state.
#   3. Otherwise, the height of the block returned by `Icon.get_block/1`.
@spec add_height(State.t()) :: {:ok, State.t()} | {:error, Schema.Error.t()}
defp add_height(state)

defp add_height(
       %State{
         height: :latest,
         channel: %Channel{name: %{from_height: from_height}}
       } = state
     )
     when is_integer(from_height) do
  {:ok, %{state | height: from_height}}
end

defp add_height(%State{height: known} = state)
     when is_integer(known) and known > 0 do
  {:ok, state}
end

defp add_height(%State{channel: %Channel{name: info}} = state) do
  identity = info[:identity] || Identity.new()
  result = Icon.get_block(identity)

  case result do
    {:error, _} = failure ->
      failure

    {:ok, %Schema.Types.Block{height: latest}} ->
      {:ok, %{state | height: latest}}
  end
end
#################
# Logging helpers
# Debug log emitted when the subscriber process starts.
@spec log_started(State.t()) :: :ok
defp log_started(%State{} = state) do
  Logger.debug(fn ->
    "Started #{__MODULE__} for #{inspect(state.channel)} (#{state.url})"
  end)

  :ok
end
# Debug log emitted when the subscriber becomes connected.
@spec log_connected(State.t()) :: :ok
defp log_connected(%State{} = state) do
  Logger.debug(fn ->
    "Connected #{__MODULE__} for #{inspect(state.channel)} (#{state.url})"
  end)

  :ok
end
# Warning emitted when a connected subscriber loses its connection.
@spec log_disconnected(term(), State.t()) :: :ok
defp log_disconnected(reason, %State{channel: channel, url: url}) do
  Logger.warn(fn ->
    what = "Disconnected #{__MODULE__} for #{inspect(channel)} (#{url}) "
    why = "due to #{inspect(reason)}"
    what <> why
  end)

  :ok
end
# Warning emitted for every scheduled retry with a non-zero backoff. Retries
# with a backoff of 0 are not logged.
@spec log_retry(term(), State.t()) :: :ok
defp log_retry(_reason, %State{backoff: 0}), do: :ok

defp log_retry(reason, %State{} = state) do
  %State{channel: channel, url: url, retries: retries, backoff: backoff} = state

  Logger.warn(fn ->
    what = "#{__MODULE__} still unsubscribed from #{inspect(channel)} (#{url}) "
    why = "due to #{inspect(reason)} [retry: #{retries}, backoff: #{backoff} ms]"
    what <> why
  end)

  :ok
end
# Logs the end of the subscriber: info level for a `:normal` stop, warning
# (including the reason) for anything else.
@spec log_terminated(WebSockex.close_reason(), State.t()) :: :ok
defp log_terminated(reason, state)

defp log_terminated(:normal, %State{} = state) do
  Logger.info(fn ->
    "Stopped #{__MODULE__} for #{inspect(state.channel)} (#{state.url})"
  end)

  :ok
end

defp log_terminated(reason, %State{channel: channel, url: url}) do
  Logger.warn(fn ->
    what = "Stopped #{__MODULE__} for #{inspect(channel)} (#{url}) "
    why = "due to #{inspect(reason)}"
    what <> why
  end)

  :ok
end
end