# lib/client/transports/web_socket.ex

defmodule Wampex.Client.Transports.WebSocket do
  use WebSockex

  # @behaviour Wampex.Client.Transport

  alias Wampex.Client
  alias Wampex.Client.Session
  alias WebSockex.Conn

  require Logger

  # Name of the HTTP header that carries the requested WebSocket subprotocol;
  # start_link/1 sends it with the `protocol` option as its value.
  @header "sec-websocket-protocol"
  # Delay, in milliseconds, between the keep-alive :ping messages this process
  # schedules for itself.
  @ping_interval 20_000
  # Upper bound, in milliseconds, for the reconnect back-off delay (two minutes).
  @max_reconnect_interval 2 * 60_000

  @doc """
  Asynchronously asks the transport process `t` to send `data`.

  The request is delivered as a `{:send_request, data}` cast; the transport
  serializes `data` and replies with it as a frame when it handles the cast.
  Returns the result of `WebSockex.cast/2`.
  """
  def send_request(t, data), do: WebSockex.cast(t, {:send_request, data})

  @doc """
  Starts the WebSocket transport process and links it to the caller.

  `opts` is a keyword list. Every key below is required; the keys may be given
  in any order and unknown keys are ignored:

    * `:name` - kept in the state and used to derive the session's registered name
    * `:url` - URL handed to `WebSockex.Conn.new/2`
    * `:session_data` - `%Wampex.Client.Session{}` used as the template for each session
    * `:protocol` - value sent in the `sec-websocket-protocol` header
    * `:serializer` - module providing `data_type/0`, `serialize!/1` and `deserialize!/1`
    * `:reconnect` - `false` disables reconnecting after a disconnect

  Raises `KeyError` when a required option is missing. Returns whatever
  `WebSockex.start_link/4` returns.
  """
  def start_link(opts) when is_list(opts) do
    # Fetch by key instead of matching the list positionally, so callers are not
    # forced to pass the options in one exact order.
    name = Keyword.fetch!(opts, :name)
    url = Keyword.fetch!(opts, :url)
    session_data = Keyword.fetch!(opts, :session_data)
    protocol = Keyword.fetch!(opts, :protocol)
    serializer = Keyword.fetch!(opts, :serializer)
    reconnect = Keyword.fetch!(opts, :reconnect)

    conn = Conn.new(url, extra_headers: [{@header, protocol}])

    initial_state = %{
      name: name,
      session: nil,
      session_data: session_data,
      serializer: serializer,
      connected: false,
      reconnect: reconnect,
      reconnect_attempts: 1
    }

    # handle_initial_conn_failure routes a failed first connection through
    # handle_disconnect/2, so the initial attempt gets the same back-off and
    # retry handling as later disconnects.
    WebSockex.start_link(conn, __MODULE__, initial_state, handle_initial_conn_failure: true)
  end

  # Returns the reconnect delay in milliseconds for the given attempt number:
  # 100 * 2^attempts, capped at @max_reconnect_interval.
  defp get_backoff_delay(attempts) do
    # :math.pow/2 works on floats and raises ArithmeticError once the result
    # overflows (2^1024), which a long outage would eventually reach. Clamp the
    # exponent; 100 * 2^32 ms is already far beyond any sensible cap, so the
    # result for every smaller attempt count is unchanged.
    exponent = min(attempts, 32)
    backoff = :math.pow(2, exponent) * 100
    ceil(min(backoff, @max_reconnect_interval))
  end

  # Reconnecting is disabled: accept the disconnect and hand the state back.
  @impl true
  def handle_disconnect(_status, %{reconnect: false} = state), do: {:ok, state}

  # Reconnecting is enabled: stop the current session, wait for the back-off
  # delay derived from the attempt counter, then request a reconnect with the
  # counter incremented.
  @impl true
  def handle_disconnect(_status, %{session: session, reconnect_attempts: attempt} = state) do
    stop_session(session)

    delay = get_backoff_delay(attempt)
    Logger.warning("Connection disconnected. Attempting reconnect in #{delay}")

    # Blocks this process for the whole delay before asking for the reconnect.
    Process.sleep(delay)

    next_state = %{state | reconnect_attempts: attempt + 1}
    {:reconnect, next_state}
  end

  # Invoked by WebSockex once a connection is established (initial connect and
  # reconnects alike). Starts a locally registered Session bound to this
  # transport, resets the reconnect attempt counter and marks the transport as
  # connected.
  @impl true
  def handle_connect(
        _conn,
        %{name: name, session_data: sd, connected: ping_loop_started?} = state
      ) do
    # The :ping loop re-arms itself in handle_info/2 and this process lives on
    # across reconnects, so it must be started only once. `connected` starts
    # out false and is only ever set to true (below), so it tells us whether an
    # earlier connect already started the loop. Arming a timer on every
    # reconnect would add one more concurrent ping loop each time.
    if not ping_loop_started? do
      Process.send_after(self(), :ping, @ping_interval)
    end

    {:ok, sess} =
      Session.start_link(
        {:local, Client.session_name(name)},
        %Session{sd | transport_pid: self()},
        []
      )

    {:ok, %{state | session: sess, reconnect_attempts: 1, connected: true}}
  end

  # Answers a ping frame with a pong. RFC 6455 requires a pong to carry the
  # same application data as the ping it answers, so a ping payload is echoed
  # back; a bare ping (or any other argument) gets a bare pong.
  @impl true
  def handle_ping({:ping, payload}, state), do: {:reply, {:pong, payload}, state}
  def handle_ping(_ping, state), do: {:reply, :pong, state}

  # Pong frames need no action; the state is passed through untouched.
  @impl true
  def handle_pong(_pong_frame, state), do: {:ok, state}

  # Outgoing request: serialize `data` with the configured serializer and reply
  # with it as a frame of the serializer's data type.
  @impl true
  def handle_cast({:send_request, data}, %{serializer: serializer} = state) do
    Logger.debug("#{__MODULE__} Sending Request #{inspect(data)}")

    frame = {serializer.data_type(), serializer.serialize!(data)}
    {:reply, frame, state}
  end

  # Connection status query: send the current `connected` flag to `resp` as a
  # `{:connected, boolean}` message.
  @impl true
  def handle_cast({:connected, resp}, state) do
    status = {:connected, state.connected}
    send(resp, status)

    {:ok, state}
  end

  # Keep-alive tick: re-arm the timer for the next tick, then send a ping frame.
  @impl true
  def handle_info(:ping, state) do
    Process.send_after(self(), :ping, @ping_interval)
    {:reply, :ping, state}
  end

  # Any other message is logged and dropped. Defining handle_info/2 replaces
  # the default implementation, so without this clause an unexpected message
  # would raise FunctionClauseError and take the transport down.
  def handle_info(message, state) do
    Logger.warning("#{__MODULE__} ignoring unexpected message #{inspect(message)}")
    {:ok, state}
  end

  # Incoming frame: decode the payload with the configured serializer and
  # forward it to the session process as `{:set_message, decoded}`. The frame
  # type is ignored.
  @impl true
  def handle_frame({_type, payload}, %{serializer: serializer, session: session} = state) do
    decoded = serializer.deserialize!(payload)
    send(session, {:set_message, decoded})

    {:ok, state}
  end

  @doc """
  Sends the session process an exit signal with reason `:normal`.

  Returns `:noop` when `pid` is `nil` or the process is no longer alive;
  otherwise returns the result of `Process.exit/2`.
  """
  def stop_session(nil), do: :noop

  def stop_session(pid) do
    if Process.alive?(pid) do
      # NOTE(review): an exit signal with reason :normal sent to another
      # process is ignored unless that process traps exits - confirm that
      # Session does, otherwise this leaves the session running.
      Process.exit(pid, :normal)
    else
      :noop
    end
  end
end