defmodule Formular.Client.Adapter.Websocket do
@moduledoc """
The default adapter for maintaining a two-way connection between
the client and remote server, using WebSocket.
"""
alias Formular.Client.Cache
alias Formular.Client.Compiler
alias Formular.Client.Config
alias Formular.Client.PubSub
alias Phoenix.Channels.GenSocketClient
@behaviour GenSocketClient
@reconnect_delay :timer.seconds(5)
require Logger
import GenSocketClient, only: [push: 4]
@doc """
Starts the websocket client process for the given `config`.

The second argument is accepted but not used.
"""
def start_link(%Config{} = config, _options) do
  Logger.debug(["Starting new formula websocket client, ", inspect(config)])

  transport = GenSocketClient.Transport.WebSocketClient
  GenSocketClient.start_link(__MODULE__, transport, config)
end
# Requests an immediate connection to the configured URL, with no extra query
# params, and keeps the configuration as the only entry of the socket state.
@impl true
def init(%Config{url: url} = config) do
  initial_state = %{config: config}
  {:connect, url, [], initial_state}
end
# Once connected, joins one channel per configured formula. When no formulas
# are configured (missing or empty list) the client has nothing to do and
# stops with reason `:normal`.
@impl true
def handle_connected(transport, state) do
  Logger.debug("Formular client connected to the server.")

  case state.config.formulas || [] do
    [] ->
      Logger.warning("Empty formula list.")
      {:stop, :normal, state}

    [_ | _] = formulas ->
      subscribe(formulas, transport, state)
  end
end
# Logs the disconnect reason, then asks for a reconnect.
@impl true
def handle_disconnected(reason, state) do
  Logger.error(["Formular client disconnected, reason: ", inspect(reason)])

  # This blocks the socket process for the whole reconnect delay before the
  # reconnect is requested.
  Process.sleep(@reconnect_delay)

  {:connect, state}
end
# Logs every reply received on a topic; the socket state is returned unchanged.
@impl true
def handle_reply(topic, _ref, payload, _transport, state) do
  Logger.warning(fn -> "reply on topic #{topic}: #{inspect(payload)}" end)

  {:ok, state}
end
# Handles messages pushed by the server.
#
# A message on a "formula:<name>" topic carrying a "code" field is treated as
# a new code revision: the revision is applied, subscribers are notified of the
# change, and a "code_updated" event is pushed back to the server. If any of
# those steps does not return its expected value, the failure is logged and
# handed to `try_report_err/3`.
#
# Every other message is only logged.
@impl true
def handle_message("formula:" <> name = topic, _event, %{"code" => code}, transport, state) do
  Logger.debug(["Received new code form #{name}: ", inspect(code)])

  # Read the cached code before the new revision replaces it.
  previous_code = current_code(name)

  with :ok <- handle_new_code_revision(name, code, state.config),
       :ok <- dispatch_code_change(name, previous_code, code),
       {:ok, _} <- push(transport, topic, "code_updated", %{}) do
    {:ok, state}
  else
    failure ->
      report_update_failure(failure, name, topic, transport, state)
  end
end

def handle_message(topic, event, payload, _transport, state) do
  Logger.warning("Unhandled message on topic #{topic}: #{event} #{inspect(payload)}")
  {:ok, state}
end

# Logs a failed code update and passes it to `try_report_err/3`. The client
# keeps running when that returns `{:ok, _}` and stops with the returned
# reason when it returns `{:error, reason}`.
defp report_update_failure(failure, name, topic, transport, state) do
  Logger.error("Error compiling code for #{name}, reason: #{inspect(failure)}")

  case try_report_err(transport, topic, failure) do
    {:ok, _} ->
      {:ok, state}

    {:error, reason} ->
      Logger.error(
        "Failed on reporting compilation error to server, error: #{inspect(reason)}."
      )

      {:stop, reason, state}
  end
end
# Reports a compilation failure to the server with a "code_update_error" event.
#
# Only `{:error, %CompileError{}}` and `{:error, :unknown_compile_error}` are
# pushed; any other value is returned to the caller unchanged.
defp try_report_err(transport, topic, {:error, %CompileError{description: description}}),
  do: push_compile_error(transport, topic, description)

defp try_report_err(transport, topic, {:error, :unknown_compile_error}),
  do: push_compile_error(transport, topic, "unknown_compile_error")

defp try_report_err(_transport, _topic, unreported), do: unreported

# Pushes the error event with a `["compile_error", detail]` reason.
defp push_compile_error(transport, topic, detail) do
  push(transport, topic, "code_update_error", %{reason: ["compile_error", detail]})
end
# Logs a successful channel join; the socket state is returned unchanged.
@impl true
def handle_joined(topic, _payload, _transport, state) do
  Logger.info(fn -> ["Formular client joined channel: ", topic] end)

  {:ok, state}
end
# A rejected join is logged and stops the client with reason `:error_joined`.
@impl true
def handle_join_error(topic, payload, _transport, state) do
  Logger.error(fn ->
    "Formular client join error on the topic #{topic}: #{inspect(payload)}"
  end)

  {:stop, :error_joined, state}
end
# Logs the closed channel and sends `{:join, topic}` to this process after one
# second.
@impl true
def handle_channel_closed(topic, payload, _transport, state) do
  Logger.error("disconnected from the topic #{topic}: #{inspect(payload)}")

  rejoin_delay = :timer.seconds(1)
  Process.send_after(self(), {:join, topic}, rejoin_delay)

  {:ok, state}
end
# Handles plain Erlang messages delivered to the socket process.
#
# `{:join, topic}` is the message `handle_channel_closed/4` schedules after a
# channel is closed. It is answered by joining the topic again with the same
# payload used for the initial subscription. A failed attempt is only logged
# and not retried, so a join that fails with `:already_joined` cannot turn
# into a retry loop.
#
# Any other message is ignored.
@impl true
def handle_info({:join, topic}, transport, state) do
  Logger.info(["Formular client rejoining channel: ", topic])

  case GenSocketClient.join(transport, topic, join_payload(state.config)) do
    {:ok, _ref} ->
      :ok

    {:error, reason} ->
      Logger.error(
        "Formular client failed to rejoin the topic #{topic}: #{inspect(reason)}"
      )
  end

  {:ok, state}
end

def handle_info(_msg, _transport, state) do
  {:ok, state}
end
# This client does not support synchronous calls: every call is logged and
# answered with `{:error, :unexpected_message}`.
@impl true
def handle_call(message, _from, _transport, state) do
  Logger.warning("Did not expect to receive call with message: #{inspect(message)}")

  reply = {:error, :unexpected_message}
  {:reply, reply, state}
end
# Joins the channel of every `{name, opts}` formula entry, in order. A join
# that does not return `{:ok, ref}` raises a match error.
defp subscribe(formulas, transport, state) when is_list(formulas) do
  payload = join_payload(state.config)

  Enum.each(formulas, fn {name, _opts} ->
    {:ok, _ref} = GenSocketClient.join(transport, formula_topic(name), payload)
  end)

  {:ok, state}
end
# Builds the channel topic for a formula: "formula:" followed by its name.
defp formula_topic(formula_name), do: "formula:" <> to_string(formula_name)
# The join payload carries only the client name taken from the configuration.
defp join_payload(%{client_name: name}), do: %{client_name: name}
# Returns the code currently cached for `name`. A cached `{module, code}` pair
# is reduced to its code; any other cached value is returned as is.
defp current_code(name), do: extract_code(Cache.get(name))

defp extract_code({mod, code}) when is_atom(mod) and is_binary(code), do: code
defp extract_code(cached), do: cached
@doc """
Applies a new `code` revision for the formula `name`.

Looks the formula up in `config`. Returns `{:error, :formula_not_found}` when
the lookup yields `nil`; otherwise returns the result of storing (and, where
configured, compiling) the code.
"""
def handle_new_code_revision(name, code, config) do
  case Config.formula_config(config, name) do
    nil ->
      {:error, :formula_not_found}

    {^name, formula_opts} ->
      maybe_compile(name, code, config, formula_opts)
  end
end
# Stores a new code revision in the cache, compiling it first when the formula
# options contain `:compile_as`.
#
# * Without `:compile_as`, the raw code is cached and `:ok` is returned.
# * With `:compile_as` set to a module name, the compiler is asked to handle
#   the revision and this process waits up to five seconds for a
#   `{ref, result}` reply. On `:ok` the `{module, code}` pair is cached; any
#   other result is returned as is. With no reply in time,
#   `{:error, :unknown_compile_error}` is returned.
defp maybe_compile(name, code, config, opts) do
  case Keyword.get(opts, :compile_as) do
    # `nil` is itself an atom, so this clause must come before the
    # `is_atom/1` clause; otherwise it can never match and formulas without
    # `:compile_as` would be sent to the compiler.
    nil ->
      true = Cache.put(name, code)
      :ok

    mod when is_atom(mod) ->
      ref = make_ref()
      :ok = Compiler.handle_new_code_revision({self(), ref}, name, code, config, opts)

      # This blocks the calling process until the reply arrives or the timeout
      # expires. A reply that arrives after the timeout is not consumed here.
      receive do
        {^ref, :ok} ->
          true = Cache.put(name, {mod, code})
          :ok

        {^ref, err} ->
          err
      after
        5_000 ->
          {:error, :unknown_compile_error}
      end
  end
end
# Notifies subscribers, through the client's PubSub, that the code of formula
# `name` changed from `old_code` to `new_code`.
defp dispatch_code_change(name, old_code, new_code),
  do: PubSub.dispatch_code_change(name, old_code, new_code)
end