defmodule Wampex.Client.Transports.WebSocket do
  @moduledoc """
  WebSocket transport for a Wampex client, built on `WebSockex`.

  The transport process owns the WebSocket connection. On every successful
  connect it starts a `Wampex.Client.Session` process, forwards each
  deserialized incoming frame to that session as `{:set_message, message}`,
  and sends a keep-alive ping every 20 seconds. When the connection drops and
  `reconnect` is enabled, it reconnects with an exponential backoff that is
  capped at two minutes.
  """
  use WebSockex
  # @behaviour Wampex.Client.Transport
  alias Wampex.Client
  alias Wampex.Client.Session
  alias WebSockex.Conn
  require Logger

  @header "sec-websocket-protocol"
  @ping_interval 20_000
  @max_reconnect_interval 2 * 60_000
  # `:math.pow/2` raises ArithmeticError once the result overflows a float
  # (2^1024). 2^32 * 100 ms is already far above @max_reconnect_interval, so
  # clamping the exponent here never changes the resulting delay.
  @max_backoff_exponent 32

  @doc """
  Asynchronously asks the transport process `t` to serialize `data` and send it
  over the socket.
  """
  @spec send_request(WebSockex.client(), term()) :: :ok
  def send_request(t, data), do: WebSockex.cast(t, {:send_request, data})

  @doc """
  Starts the transport process and links it to the caller.

  Required options (in any order):

    * `:name` - client name, used to derive the registered session name
    * `:url` - WebSocket endpoint to connect to
    * `:session_data` - `Wampex.Client.Session` struct used as the template for
      the session started on each connect
    * `:protocol` - value sent in the `sec-websocket-protocol` header
    * `:serializer` - module providing `data_type/0`, `serialize!/1` and
      `deserialize!/1`
    * `:reconnect` - when `false`, a disconnect is not followed by a reconnect

  Raises `KeyError` when a required option is missing.
  """
  @spec start_link(keyword()) :: {:ok, pid()} | {:error, term()}
  def start_link(opts) when is_list(opts) do
    url = Keyword.fetch!(opts, :url)
    protocol = Keyword.fetch!(opts, :protocol)
    conn = Conn.new(url, extra_headers: [{@header, protocol}])

    state = %{
      name: Keyword.fetch!(opts, :name),
      session: nil,
      session_data: Keyword.fetch!(opts, :session_data),
      serializer: Keyword.fetch!(opts, :serializer),
      connected: false,
      reconnect: Keyword.fetch!(opts, :reconnect),
      reconnect_attempts: 1,
      ping_timer: nil
    }

    WebSockex.start_link(conn, __MODULE__, state, handle_initial_conn_failure: true)
  end

  # Reconnect delay in milliseconds: 100 * 2^attempts, capped at
  # @max_reconnect_interval. Always returns an integer.
  defp get_backoff_delay(attempts) do
    exponent = min(attempts, @max_backoff_exponent)
    backoff = :math.pow(2, exponent) * 100
    ceil(min(backoff, @max_reconnect_interval))
  end

  # (Re)arms the keep-alive timer. Any previously armed timer is cancelled
  # first so that at most one ping chain is ever pending, no matter how many
  # times the connection has been re-established.
  defp schedule_ping(%{ping_timer: previous} = state) do
    cancel_ping(previous)
    %{state | ping_timer: Process.send_after(self(), :ping, @ping_interval)}
  end

  defp cancel_ping(nil), do: :ok

  defp cancel_ping(timer) when is_reference(timer) do
    Process.cancel_timer(timer)
    :ok
  end

  # With reconnect disabled the state is returned untouched and no reconnect
  # is requested.
  @impl true
  def handle_disconnect(_status, %{reconnect: false} = state), do: {:ok, state}

  # With reconnect enabled: tear down the session and keep-alive timer of the
  # lost connection, wait for the backoff delay (this blocks the transport
  # process on purpose), then ask WebSockex to reconnect.
  def handle_disconnect(
        _status,
        %{session: sess, reconnect_attempts: attempts, ping_timer: timer} = state
      ) do
    stop_session(sess)
    cancel_ping(timer)
    delay = get_backoff_delay(attempts)
    Logger.warn("Connection disconnected. Attempting reconnect in #{delay}")
    Process.sleep(delay)

    # Reset connection-bound fields so the state never advertises a dead
    # session or a stale `connected: true` while disconnected.
    {:reconnect,
     %{
       state
       | session: nil,
         connected: false,
         ping_timer: nil,
         reconnect_attempts: attempts + 1
     }}
  end

  # Starts the keep-alive timer and a fresh session for the new connection,
  # and resets the backoff counter.
  @impl true
  def handle_connect(_conn, %{name: name, session_data: sd} = state) do
    state = schedule_ping(state)

    # NOTE(review): the session is registered under a fixed local name, so this
    # match fails if a previous session is still registered — confirm that
    # stop_session/1 reliably terminates it before a reconnect.
    {:ok, sess} =
      Session.start_link(
        {:local, Client.session_name(name)},
        %Session{sd | transport_pid: self()},
        []
      )

    {:ok, %{state | session: sess, reconnect_attempts: 1, connected: true}}
  end

  # RFC 6455 requires a pong to carry the same application data as the ping
  # it answers, so the payload (when present) is echoed back.
  @impl true
  def handle_ping(:ping, state), do: {:reply, :pong, state}
  def handle_ping({:ping, payload}, state), do: {:reply, {:pong, payload}, state}

  @impl true
  def handle_pong(_pong, state), do: {:ok, state}

  # Serializes an outgoing request and sends it as a frame of the
  # serializer's data type.
  @impl true
  def handle_cast({:send_request, data}, %{serializer: s} = state) do
    Logger.debug("#{__MODULE__} Sending Request #{inspect(data)}")
    {:reply, {s.data_type(), s.serialize!(data)}, state}
  end

  # Reports the current connection flag to the process `resp`.
  def handle_cast({:connected, resp}, state) do
    send(resp, {:connected, state.connected})
    {:ok, state}
  end

  # Keep-alive tick: send a ping frame and arm the next tick.
  @impl true
  def handle_info(:ping, state) do
    {:reply, :ping, schedule_ping(state)}
  end

  # Defining handle_info/2 replaces the default supplied by `use WebSockex`,
  # so without this clause any unexpected message would crash the transport
  # with a FunctionClauseError.
  def handle_info(message, state) do
    Logger.warn("#{__MODULE__} Ignoring unexpected message #{inspect(message)}")
    {:ok, state}
  end

  # Deserializes an incoming frame payload and hands it to the session.
  @impl true
  def handle_frame({_type, payload}, %{serializer: s, session: sess} = state) do
    send(sess, {:set_message, s.deserialize!(payload)})
    {:ok, state}
  end

  @doc """
  Sends a `:normal` exit signal to the session process `pid` if it is alive.

  Returns `:noop` when `pid` is `nil` or the process is no longer alive,
  otherwise the result of `Process.exit/2` (`true`).
  """
  # NOTE(review): an exit signal with reason :normal is ignored by a process
  # that does not trap exits — confirm that Session traps exits, otherwise the
  # session is not actually stopped here.
  @spec stop_session(pid() | nil) :: true | :noop
  def stop_session(nil), do: :noop

  def stop_session(pid) do
    if Process.alive?(pid) do
      Process.exit(pid, :normal)
    else
      :noop
    end
  end
end