defmodule Lexdee.Observer do
use GenServer
require Logger
require Mint.HTTP
alias Lexdee.Observation
# Module used to spawn the fire-and-forget event notifications. It can be
# swapped at compile time through the `:lexdee, :task` setting and falls back
# to `Task` when that setting is unset.
@task Application.compile_env(:lexdee, :task) || Task
# State of a single observer process.
defstruct [
  # Mint connection; nil until a `:connect` call opens one.
  :conn,
  # Event type string (from the `:type` option) reported as "feed" in every
  # observation.
  :feed,
  # NOTE(review): never assigned anywhere in this module — confirm whether it
  # is still needed.
  :base_uri,
  # Optional process that is sent `:closed` when the connectivity check fails.
  :parent,
  # Optional resource handed to `Observation.new/2`; its `type` and `id` are
  # logged on disconnect.
  :resource,
  # WebSocket URL to connect to.
  :url,
  # Module whose `handle_event/1` receives every observation.
  :handler,
  # Mint.WebSocket state; nil until the upgrade handshake has completed.
  :websocket,
  # Request reference returned by `Mint.WebSocket.upgrade/4`.
  :request_ref,
  # `from` of the pending `:connect` call, cleared once it has been replied to.
  :caller,
  # Status and headers of the upgrade response, collected until `:done`.
  :status,
  :resp_headers,
  # Set to true once a close frame has been received.
  :closing?,
  # Options passed to `Mint.HTTP.connect/4`.
  :client_options,
  # Unix timestamp (seconds) of the most recent ping frame received.
  :last_pinged_at,
  # Node the observer was started on (`Node.self()` at init time).
  :node
]
@doc """
Returns the node that the observer `pid` recorded when it was started.
"""
def which_node(pid), do: GenServer.call(pid, :node)
@doc """
Asks the observer `pid` to open its websocket connection.

The call is answered once the handshake has finished: `{:ok, state}` on
success, `{:error, reason}` otherwise. The default `GenServer.call/2`
timeout applies.
"""
def connect(pid), do: GenServer.call(pid, :connect)
@doc """
Connects the observer `pid` from a task started under `supervisor` on the
node the observer lives on.

The inner `GenServer.call/3` is given `timeout + 5_000` ms and the task is
awaited for `timeout + 10_000` ms, so the call itself times out before the
surrounding await does.
"""
def connect(supervisor, pid, timeout \\ 30_000) when is_pid(pid) do
  # Address the task supervisor on the observer's own node.
  remote_supervisor = {supervisor, which_node(pid)}

  task =
    Task.Supervisor.async_nolink(remote_supervisor, fn ->
      GenServer.call(pid, :connect, timeout + 5_000)
    end)

  Task.await(task, timeout + 10_000)
end
@doc """
Starts an observer process linked to the caller.

`options` is handed to `init/1` unchanged; see there for the recognised keys.
"""
def start_link(options), do: GenServer.start_link(__MODULE__, options)
# Builds the initial observer state from the start options.
#
# Recognised options:
#   * `:client`   - (required) Tesla client; its adapter options are reused for
#                   the Mint connection and its `Tesla.Middleware.BaseUrl` entry
#                   provides host and port of the default events URL.
#   * `:handler`  - module receiving observations through `handle_event/1`;
#                   falls back to the `:handler` key of the `:lexdee,
#                   Lexdee.Observer` application environment.
#   * `:type`     - event type to subscribe to (default "operation").
#   * `:url`      - explicit websocket URL, overriding the default one.
#   * `:resource` - resource attached to every observation.
#   * `:parent`   - process notified with `:closed` when connectivity is lost.
#
# Also schedules the first `:check_connectivity` message.
@impl true
def init(opts) do
  parent = Keyword.get(opts, :parent)

  # A missing application environment entry is treated like an empty keyword
  # list, so that an absent configuration yields a nil handler (exactly as a
  # configuration without a `:handler` key does) instead of crashing
  # `Keyword.get/2` on nil.
  handler =
    Keyword.get(opts, :handler) ||
      Keyword.get(Application.get_env(:lexdee, __MODULE__) || [], :handler)

  client = Keyword.fetch!(opts, :client)
  resource = Keyword.get(opts, :resource)
  type = to_string(Keyword.get(opts, :type, "operation"))

  %{adapter: {_, _, [options]}, pre: middlewares} = client

  {_, _, [base_url]} =
    Enum.find(middlewares, fn {k, _, _v} ->
      k == Tesla.Middleware.BaseUrl
    end)

  base_uri = URI.parse(base_url)

  websocket_url =
    Keyword.get(opts, :url) ||
      "wss://#{base_uri.host}:#{base_uri.port}/1.0/events?type=#{type}"

  timeout = Keyword.get(options, :timeout, 30_000)

  # The adapter level timeout is forwarded to the transport as well.
  transport_opts =
    options
    |> Keyword.fetch!(:transport_opts)
    |> Keyword.put(:timeout, timeout)

  # The connection is pinned to HTTP/1.
  options =
    options
    |> Keyword.put(:protocols, [:http1])
    |> Keyword.put(:transport_opts, transport_opts)

  Process.send_after(self(), :check_connectivity, 15_000)

  state = %__MODULE__{
    resource: resource,
    client_options: options,
    handler: handler,
    url: websocket_url,
    feed: type,
    parent: parent,
    node: Node.self()
  }

  {:ok, state}
end
# `:node` replies with the node recorded at init time.
@impl true
def handle_call(:node, _from, state) do
  {:reply, state.node, state}
end

# `:connect` opens the Mint connection and starts the websocket upgrade.
#
# On success no reply is sent here: the caller is stored in the state and is
# answered once the upgrade response has been processed. Failures are replied
# immediately as `{:error, reason}`.
def handle_call(:connect, from, state) do
  uri = URI.parse(state.url)

  # Only "ws" and "wss" URLs are supported; anything else raises.
  {http_scheme, ws_scheme} =
    case uri.scheme do
      "ws" -> {:http, :ws}
      "wss" -> {:https, :wss}
    end

  # A URL without a path (for example "wss://host:8443?type=x") parses with a
  # nil path; fall back to the root path so that the request target is always
  # a valid binary.
  base_path = if uri.path in [nil, ""], do: "/", else: uri.path

  path =
    case uri.query do
      nil -> base_path
      query -> base_path <> "?" <> query
    end

  @task.async(fn ->
    %{
      "feed" => state.feed,
      "type" => "connection",
      "state" => "connecting"
    }
    |> Observation.new(state.resource)
    |> state.handler.handle_event()
  end)

  with {:ok, conn} <-
         Mint.HTTP.connect(
           http_scheme,
           uri.host,
           uri.port,
           state.client_options
         ),
       {:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, path, []) do
    state = %{state | conn: conn, request_ref: ref, caller: from}
    {:noreply, state}
  else
    # Only transport level failures are announced to the handler as "failed".
    {:error, %Mint.TransportError{reason: _reason} = reason} ->
      @task.async(fn ->
        %{
          "feed" => state.feed,
          "type" => "connection",
          "state" => "failed"
        }
        |> Observation.new(state.resource)
        |> state.handler.handle_event()
      end)

      {:reply, {:error, reason}, state}

    {:error, reason} ->
      {:reply, {:error, reason}, state}

    # The upgrade request failed on an already established connection; keep
    # the connection returned by Mint in the state.
    {:error, conn, reason} ->
      {:reply, {:error, reason}, put_in(state.conn, conn)}
  end
end
# Periodic connectivity check while there is nothing to probe yet.
#
# The websocket state only exists once the upgrade handshake has completed.
# Until then (no connection at all, or a handshake that is still pending or
# has failed) a frame cannot be encoded, so the check is simply rescheduled.
@impl true
def handle_info(:check_connectivity, %{conn: conn, websocket: websocket} = state)
    when is_nil(conn) or is_nil(websocket) do
  Process.send_after(self(), :check_connectivity, 15_000)
  {:noreply, state}
end

# Periodic connectivity check on an established websocket: an unsolicited
# pong is written to the socket. If the write fails the connection is
# reported as disconnected, the parent (if any) is sent `:closed` and the
# connection is closed. The check is not rescheduled after a failure.
def handle_info(:check_connectivity, state) do
  timestamp = DateTime.to_unix(DateTime.utc_now())

  case send_frame(state, {:pong, "ok"}) do
    {:ok, state} ->
      Process.send_after(self(), :check_connectivity, 15_000)

      {:noreply, state}

    {:error, state, _reason} ->
      if state.resource do
        Logger.info(
          "[Lexdee.Observer] #{state.resource.type} #{state.resource.id} disconnected"
        )
      end

      @task.async(fn ->
        %{
          "feed" => state.feed,
          "type" => "connection",
          "state" => "disconnected",
          "metadata" => %{
            "last_pinged_at" => state.last_pinged_at,
            "checked_at" => timestamp
          }
        }
        |> Observation.new(state.resource)
        |> state.handler.handle_event()
      end)

      if state.parent do
        send(state.parent, :closed)
      end

      # NOTE(review): the `{:stop, :normal, state}` result of do_close/1 is
      # discarded here, so the process stays alive after closing the
      # connection — confirm that this is intended.
      _ = do_close(state)

      {:noreply, state}
  end
end

# Replies of the notification tasks started with `@task.async/1`; the results
# are not used.
def handle_info({ref, _}, state) when is_reference(ref) do
  {:noreply, state}
end

# Normal termination of a monitored process.
def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
  {:noreply, state}
end

# Without a connection there is nothing Mint could decode the message for, so
# it is ignored instead of being streamed through a nil connection.
def handle_info(_message, %{conn: nil} = state) do
  {:noreply, state}
end

# Everything else is offered to Mint as potential socket data.
def handle_info(message, state) do
  case Mint.WebSocket.stream(state.conn, message) do
    {:ok, conn, responses} ->
      state =
        put_in(state.conn, conn)
        |> handle_responses(responses)

      # A close frame received in this batch ends the process.
      if state.closing?, do: do_close(state), else: {:noreply, state}

    {:error, conn, reason, _responses} ->
      state =
        put_in(state.conn, conn)
        |> reply({:error, reason})

      {:noreply, state}

    # The message did not belong to the connection.
    :unknown ->
      {:noreply, state}
  end
end
# Folds the responses returned by Mint into the state, in order. Processing
# stops early when the websocket cannot be set up or incoming data cannot be
# decoded; the responses after that point are dropped.
defp handle_responses(state, responses) do
  Enum.reduce_while(responses, state, &apply_response/2)
end

# Applies a single Mint response to the state. Returns `{:cont, state}` to
# keep going or `{:halt, state}` to stop processing the current batch.
defp apply_response({:status, ref, status}, %{request_ref: ref} = state) do
  {:cont, %{state | status: status}}
end

defp apply_response({:headers, ref, resp_headers}, %{request_ref: ref} = state) do
  {:cont, %{state | resp_headers: resp_headers}}
end

# End of the upgrade response: build the websocket from the collected status
# and headers and answer the pending `:connect` call.
defp apply_response({:done, ref}, %{request_ref: ref} = state) do
  case Mint.WebSocket.new(state.conn, ref, state.status, state.resp_headers) do
    {:ok, conn, websocket} ->
      @task.async(fn ->
        %{
          "feed" => state.feed,
          "type" => "connection",
          "state" => "connected"
        }
        |> Observation.new(state.resource)
        |> state.handler.handle_event()
      end)

      upgraded = %{
        state
        | conn: conn,
          websocket: websocket,
          status: nil,
          resp_headers: nil
      }

      {:cont, reply(upgraded, {:ok, :connected})}

    {:error, conn, reason} ->
      {:halt, reply(%{state | conn: conn}, {:error, reason})}
  end
end

# Data is only decoded once the websocket has been established.
defp apply_response(
       {:data, ref, data},
       %{request_ref: ref, websocket: websocket} = state
     )
     when websocket != nil do
  case Mint.WebSocket.decode(websocket, data) do
    {:ok, websocket, frames} ->
      {:cont, handle_frames(%{state | websocket: websocket}, frames)}

    {:error, websocket, reason} ->
      {:halt, reply(%{state | websocket: websocket}, {:error, reason})}
  end
end

# Any other response is skipped.
defp apply_response(_response, state), do: {:cont, state}
# Encodes `frame` and writes it to the connection.
#
# Returns `{:ok, state}` with the updated websocket and connection, or
# `{:error, state, reason}`. On an encoding failure the returned state carries
# the websocket handed back by Mint; on a write failure it carries the
# connection handed back by Mint (and the websocket as it was before
# encoding).
defp send_frame(state, frame) do
  outcome =
    case Mint.WebSocket.encode(state.websocket, frame) do
      {:ok, websocket, payload} ->
        case Mint.WebSocket.stream_request_body(
               state.conn,
               state.request_ref,
               payload
             ) do
          {:ok, conn} -> {:ok, %{state | websocket: websocket, conn: conn}}
          failure -> failure
        end

      failure ->
        failure
    end

  case outcome do
    {:ok, _sent_state} = sent ->
      sent

    {:error, %Mint.WebSocket{} = websocket, reason} ->
      {:error, %{state | websocket: websocket}, reason}

    {:error, conn, reason} ->
      {:error, %{state | conn: conn}, reason}
  end
end
# Processes decoded websocket frames in order and returns the updated state.
#
#   * ping  - reported to the handler, timestamped in `last_pinged_at` and
#             answered with a pong.
#   * close - marks the state as closing; the caller shuts the connection down.
#   * text  - decoded as JSON in a task, tagged with the feed and passed to the
#             handler; a pong is sent afterwards.
#   * other - logged and ignored.
#
# A pong that cannot be sent raises a `MatchError`.
defp handle_frames(state, frames) do
  Enum.reduce(frames, state, fn
    {:ping, data}, state ->
      @task.async(fn ->
        %{"feed" => state.feed, "type" => "ping", "state" => data}
        |> Observation.new(state.resource)
        |> state.handler.handle_event()
      end)

      timestamp = DateTime.to_unix(DateTime.utc_now())

      state = put_in(state.last_pinged_at, timestamp)

      # RFC 6455 requires a pong sent in response to a ping to carry the
      # ping's application data, so the payload is echoed back.
      {:ok, state} = send_frame(state, {:pong, data})
      state

    {:close, _code, reason}, state ->
      Logger.debug("Closing connection: #{inspect(reason)}")
      %{state | closing?: true}

    {:text, text}, state ->
      @task.async(fn ->
        text
        |> Jason.decode!()
        |> Map.merge(%{"feed" => state.feed})
        |> Observation.new(state.resource)
        |> state.handler.handle_event()
      end)

      # Unsolicited pong sent after every text frame.
      {:ok, state} = send_frame(state, {:pong, "ok"})
      state

    frame, state ->
      Logger.debug("Unexpected frame received: #{inspect(frame)}")

      state
  end)
end
# Shuts the connection down and returns the GenServer stop tuple.
defp do_close(state) do
  # Best effort only: the server may already have closed its side for
  # writing, in which case the close frame cannot be delivered.
  _close_frame_result = send_frame(state, :close)

  Mint.HTTP.close(state.conn)

  {:stop, :normal, state}
end
# Answers the pending `:connect` call, if there is one, and clears the caller.
#
# A successful connection is reported as `{:ok, state}`; any other response
# is passed to the caller unchanged.
defp reply(state, response) do
  payload =
    case response do
      {:ok, :connected} -> {:ok, state}
      other -> other
    end

  if state.caller, do: GenServer.reply(state.caller, payload)

  %{state | caller: nil}
end
end