defmodule Dust.Connection do
@moduledoc """
WebSocket client that connects to the Dust server via Phoenix Channels.
Uses Slipstream to manage the connection lifecycle, heartbeats,
reconnection, and ref tracking. Joins one channel per store using
topic "store:{store_name}" and forwards server events to SyncEngines.
"""
use Slipstream
require Logger
@doc """
Starts the connection process through `Slipstream.start_link/3`.

`opts` is handed unchanged to `init/1`. Its `:name` entry (or `nil` when
absent) is additionally forwarded as the `:name` start option.
"""
def start_link(opts) do
  start_opts = [name: opts[:name]]
  Slipstream.start_link(__MODULE__, opts, start_opts)
end
@doc """
Synchronously sends the `:info` call to the connection process `pid` and
returns whatever that process replies (see `handle_call(:info, ...)`).

Uses the default `GenServer.call/2` timeout.
"""
def info(pid), do: GenServer.call(pid, :info)
@doc """
Reports whether a Dust.Connection process is currently connected.

Returns `true` only when the process resolved from `name_or_pid` answers the
`:info` call within one second with a `:connected` status. Returns `false`
when no process is registered under that name, when the reported status is
anything else (initial connect, reconnect backoff), or when the call exits
(for example on timeout or because the process is gone).

`name_or_pid` defaults to `Dust.Connection`; pass an explicit name or pid
for facade-instance setups where the connection is registered differently.
"""
def connected?(name_or_pid \\ __MODULE__) do
  with pid when not is_nil(pid) <- GenServer.whereis(name_or_pid),
       %{status: :connected} <- GenServer.call(pid, :info, 1_000) do
    true
  else
    _not_connected -> false
  end
catch
  # A timed-out or dead server surfaces as an exit from GenServer.call/3.
  :exit, _ -> false
end
# Builds the initial socket state from `opts` and, unless `:test_mode?` is
# set, opens the websocket connection.
#
# Required options: `:url`, `:token`, `:stores` (raises `KeyError` when
# missing). Optional: `:device_id` (a random one is generated when absent)
# and `:test_mode?` (default `false`; skips connecting entirely).
#
# Returns `{:ok, socket}` on success or `{:stop, reason}` when `connect/2`
# rejects the configuration.
@impl Slipstream
def init(opts) do
  url = Keyword.fetch!(opts, :url)
  token = Keyword.fetch!(opts, :token)
  # Lazy default: only spend entropy on a device id when none was supplied.
  device_id = Keyword.get_lazy(opts, :device_id, &generate_device_id/0)
  stores = Keyword.fetch!(opts, :stores)
  test_mode? = Keyword.get(opts, :test_mode?, false)

  socket =
    new_socket()
    |> assign(:token, token)
    |> assign(:device_id, device_id)
    |> assign(:stores, stores)
    |> assign(:joined_stores, MapSet.new())
    |> assign(:outbox, %{})
    |> assign(:pending_refs, %{})
    |> assign(:url, url)
    |> assign(:status, :disconnected)
    |> assign(:connected_at, nil)

  emit_state_change(nil, :disconnected, socket)

  if test_mode? do
    {:ok, socket}
  else
    case connect(socket, uri: websocket_uri(url, token, device_id)) do
      {:ok, socket} -> {:ok, socket}
      {:error, reason} -> {:stop, reason}
    end
  end
end

# Builds the websocket URL with the connection params embedded in the query
# string (Phoenix expects params as query string params on the websocket URL).
#
# Query parameters already present on `url` are preserved; the connection
# params below win on key conflicts. Repeated keys in the original query
# collapse to their last value because the query is decoded into a map.
defp websocket_uri(url, token, device_id) do
  parsed = URI.parse(url)

  query =
    (parsed.query || "")
    |> URI.decode_query()
    |> Map.merge(%{
      "token" => token,
      "device_id" => device_id,
      "capver" => "3",
      "vsn" => "2.0.0"
    })
    |> URI.encode_query()

  with_path = append_path(parsed, "/websocket")
  URI.to_string(%{with_path | query: query})
end
# Called once the websocket connection is established. Records the
# `:connected` status and connection time, emits the state-change telemetry
# event, then requests a join for every configured store on topic
# "store:<name>", passing the last store sequence known for that store.
@impl Slipstream
def handle_connect(socket) do
  Logger.info("[Dust.Connection] Connected to server")

  previous = socket.assigns.status
  connected_at = DateTime.utc_now()

  connected =
    socket
    |> assign(:status, :connected)
    |> assign(:connected_at, connected_at)

  emit_state_change(previous, :connected, connected)

  with_joins =
    for store_name <- connected.assigns.stores, reduce: connected do
      acc ->
        last_seq = get_last_store_seq(store_name)
        join(acc, "store:#{store_name}", %{"last_store_seq" => last_seq})
    end

  {:ok, with_joins}
end
# Called when the server accepts a join for "store:<name>". Marks the store
# as joined, pushes any writes queued while it was not joined, then hands the
# join reply's capabilities and the `:connected` status to the SyncEngine.
@impl Slipstream
def handle_join("store:" <> store_name, reply, socket) do
  # The reply may carry the sequence under a string or an atom key.
  store_seq = reply["store_seq"] || Map.get(reply, :store_seq, 0)
  Logger.info("[Dust.Connection] Joined store:#{store_name}, seq=#{store_seq}")

  # Mark as joined first so the flush below sends instead of re-queueing.
  socket =
    socket
    |> assign(:joined_stores, MapSet.put(socket.assigns.joined_stores, store_name))
    |> flush_outbox(store_name)

  reply
  |> capabilities_from_join()
  |> then(&Dust.SyncEngine.set_capabilities(store_name, &1))

  # Setting :connected also triggers the engine's resend of pending_ops.
  Dust.SyncEngine.set_status(store_name, :connected)

  {:ok, socket}
end
# Server-pushed messages on a store topic. Each clause forwards the payload
# to the SyncEngine for that store and leaves the socket unchanged.

# "event": a single server event for the store.
@impl Slipstream
def handle_message("store:" <> store_name, "event", payload, socket) do
  Dust.SyncEngine.handle_server_event(store_name, payload)
  {:ok, socket}
end

# "snapshot": a snapshot payload for the store.
@impl Slipstream
def handle_message("store:" <> store_name, "snapshot", payload, socket) do
  Logger.info("[Dust.Connection] Received snapshot for #{store_name}")
  Dust.SyncEngine.handle_snapshot(store_name, payload)
  {:ok, socket}
end

# "catch_up_complete": the server finished replaying up to `through_seq`.
@impl Slipstream
def handle_message("store:" <> store_name, "catch_up_complete", payload, socket) do
  through_seq = payload["through_seq"]

  Logger.debug(
    "[Dust.Connection] Catch-up complete for #{store_name}, through_seq=#{through_seq}"
  )

  # Ack the seq back to the server. The push result is intentionally not
  # tracked in pending_refs: no caller waits on this acknowledgement.
  topic = "store:#{store_name}"
  push(socket, topic, "ack_seq", %{"seq" => through_seq})
  Dust.SyncEngine.set_catch_up_complete(store_name, through_seq)
  {:ok, socket}
end

# Catch-all: defining any handle_message/4 clause means unmatched messages
# would otherwise raise FunctionClauseError and take the whole connection
# down. An event this client does not know about (for example one added by a
# newer server) is logged and ignored instead.
@impl Slipstream
def handle_message(topic, event, _payload, socket) do
  Logger.debug(
    "[Dust.Connection] Ignoring unhandled message #{inspect(event)} on #{inspect(topic)}"
  )

  {:ok, socket}
end
# Called when the websocket connection is lost. Moves the connection and
# every joined store to :reconnecting, resets per-connection bookkeeping, and
# asks Slipstream to reconnect with its built-in backoff.
#
# Returns `{:ok, socket}` when a reconnect was scheduled (or when there is no
# connection config, e.g. test mode) and `{:stop, reason, socket}` otherwise.
@impl Slipstream
def handle_disconnect(reason, socket) do
  Logger.warning("[Dust.Connection] Disconnected: #{inspect(reason)}")
  prev_status = socket.assigns.status
  socket = assign(socket, :status, :reconnecting)
  emit_state_change(prev_status, :reconnecting, socket, %{reason: reason})

  # Update all joined stores to :reconnecting
  Enum.each(socket.assigns.joined_stores, fn store_name ->
    Dust.SyncEngine.set_status(store_name, :reconnecting)
  end)

  # Refs in pending_refs belong to pushes made on the connection that just
  # went away; no reply for them can arrive any more. Dropping them keeps the
  # map from growing across reconnects and prevents a stale entry from being
  # matched against a reply on a later connection.
  # NOTE(review): in-flight writes are presumably resent by the SyncEngine
  # once its status returns to :connected (see handle_join) — confirm.
  socket =
    socket
    |> assign(:joined_stores, MapSet.new())
    |> assign(:pending_refs, %{})

  # Use Slipstream's built-in reconnect with backoff
  case reconnect(socket) do
    {:ok, socket} ->
      {:ok, socket}

    {:error, :no_config} ->
      # Test mode or not yet configured -- just stay alive
      {:ok, socket}

    {:error, reason} ->
      {:stop, reason, socket}
  end
end
# Called when a single store topic closes while the connection stays up.
# Marks the store as no longer joined, tells its SyncEngine it is
# :reconnecting, and asks Slipstream to rejoin the topic with backoff.
#
# Always returns `{:ok, socket}`: a failed rejoin request must not take down
# the connection serving the other stores, but it is logged so that a store
# left in :reconnecting is diagnosable.
@impl Slipstream
def handle_topic_close("store:" <> store_name = topic, reason, socket) do
  Logger.warning("[Dust.Connection] Topic #{topic} closed: #{inspect(reason)}")
  joined = MapSet.delete(socket.assigns.joined_stores, store_name)
  socket = assign(socket, :joined_stores, joined)
  Dust.SyncEngine.set_status(store_name, :reconnecting)

  # Rejoin with backoff
  case rejoin(socket, topic) do
    {:ok, socket} ->
      {:ok, socket}

    {:error, rejoin_reason} ->
      Logger.error(
        "[Dust.Connection] Could not schedule rejoin of #{topic}: #{inspect(rejoin_reason)}"
      )

      {:ok, socket}
  end
end
# Called with the server's reply to a push. Replies whose ref is not in
# `pending_refs` are ignored; tracked ones are removed from the map and
# routed to the SyncEngine as an accepted or rejected write.
@impl Slipstream
def handle_reply(ref, reply, socket) do
  {tracked, remaining_refs} = Map.pop(socket.assigns.pending_refs, ref)

  case tracked do
    nil ->
      {:ok, socket}

    {store_name, client_op_id} ->
      socket = assign(socket, :pending_refs, remaining_refs)
      route_write_reply(reply, store_name, client_op_id)
      {:ok, socket}
  end
end

# Error replies carrying a reason (string- or atom-keyed) are rejections.
defp route_write_reply({:error, %{"reason" => _reason} = payload}, store_name, client_op_id) do
  Dust.SyncEngine.handle_write_rejected(store_name, client_op_id, payload)
end

defp route_write_reply({:error, %{reason: _reason} = payload}, store_name, client_op_id) do
  Dust.SyncEngine.handle_write_rejected(store_name, client_op_id, payload)
end

# Pass the whole reply map through — lease acquire/renew carry
# token/expires_at/holder, a no-op release carries {released: false},
# and ordinary writes carry just store_seq. The engine shapes the
# caller reply per op.
defp route_write_reply({:ok, accepted}, store_name, client_op_id) when is_map(accepted) do
  Dust.SyncEngine.handle_write_accepted(store_name, client_op_id, accepted)
end

# An ack we don't recognise (bare :ok/:error, or a non-map payload) on
# a tracked write ref. Don't swallow it: a blocked caller must get a
# tagged error rather than hang. Surface as a rejection and log loudly.
defp route_write_reply(other, store_name, client_op_id) do
  Logger.warning("[Dust.Connection] unexpected reply for #{client_op_id}: #{inspect(other)}")

  Dust.SyncEngine.handle_write_rejected(store_name, client_op_id, %{
    "reason" => "unexpected_reply"
  })
end
# Handles `{:send_write, store_name, op_attrs}` requests (presumably sent by
# the store's SyncEngine — confirm against callers). The op is translated to
# its wire params and either pushed immediately, when the store's topic is
# joined, or queued in the outbox until `handle_join/3` flushes it.

# File uploads go out as a "put_file" event with their own param set.
@impl Slipstream
def handle_info({:send_write, store_name, %{op: :put_file} = op_attrs}, socket) do
  params =
    %{
      "path" => op_attrs.path,
      "content" => op_attrs.content,
      "filename" => op_attrs.filename,
      "content_type" => op_attrs.content_type,
      "client_op_id" => op_attrs.client_op_id
    }
    |> maybe_put_path_segments(op_attrs)

  {:noreply, push_or_enqueue(socket, store_name, "put_file", params, op_attrs.client_op_id)}
end

# Every other op goes out as a "write" event; optional fields are added only
# when present on `op_attrs`.
@impl Slipstream
def handle_info({:send_write, store_name, op_attrs}, socket) do
  params =
    %{
      "op" => to_string(op_attrs.op),
      "path" => op_attrs.path,
      "value" => op_attrs[:value],
      "client_op_id" => op_attrs.client_op_id
    }
    |> maybe_put_path_segments(op_attrs)
    |> maybe_put_if_match(op_attrs)
    |> maybe_put_if_absent(op_attrs)
    |> maybe_put_fence(op_attrs)
    |> maybe_put_lease_wire(op_attrs)

  {:noreply, push_or_enqueue(socket, store_name, "write", params, op_attrs.client_op_id)}
end

# Catch-all: without it any stray message (a late reply to a timed-out call,
# a monitor signal, ...) raises FunctionClauseError and crashes the
# connection for every store. Log it and carry on instead.
@impl Slipstream
def handle_info(message, socket) do
  Logger.warning("[Dust.Connection] Ignoring unexpected message: #{inspect(message)}")
  {:noreply, socket}
end

# Pushes `params` as `event` on the store's topic and records the push ref in
# `pending_refs` when the store is joined; otherwise appends the write to the
# store's outbox queue. Returns the updated socket.
defp push_or_enqueue(socket, store_name, event, params, client_op_id) do
  if MapSet.member?(socket.assigns.joined_stores, store_name) do
    {:ok, ref} = push(socket, "store:#{store_name}", event, params)
    pending = Map.put(socket.assigns.pending_refs, ref, {store_name, client_op_id})
    assign(socket, :pending_refs, pending)
  else
    # Not yet joined — queue the write for flush after join
    outbox = Map.get(socket.assigns, :outbox, %{})

    store_queue =
      outbox
      |> Map.get(store_name, :queue.new())
      |> then(&:queue.in(outbox_entry(event, params, client_op_id), &1))

    assign(socket, :outbox, Map.put(outbox, store_name, store_queue))
  end
end

# Outbox entry shapes understood by flush_outbox/2: file uploads are tagged
# 3-tuples, ordinary writes are plain 2-tuples.
defp outbox_entry("put_file", params, client_op_id), do: {:put_file, params, client_op_id}
defp outbox_entry("write", params, client_op_id), do: {params, client_op_id}
# Note: Slipstream processes support handle_call via GenServer
#
# Answers the `:info` call with a snapshot of the connection state taken from
# the socket assigns. `uptime_seconds` is the whole number of seconds since
# `connected_at`, or nil when `connected_at` has not been set.
@impl Slipstream
def handle_call(:info, _from, socket) do
  now = DateTime.utc_now()
  assigns = socket.assigns
  connected_at = assigns.connected_at

  uptime_seconds =
    case connected_at do
      unset when unset in [nil, false] -> nil
      since -> DateTime.diff(now, since, :second)
    end

  info =
    [:status, :url, :device_id, :stores]
    |> Map.new(fn key -> {key, Map.fetch!(assigns, key)} end)
    |> Map.merge(%{
      connected_at: connected_at,
      uptime_seconds: uptime_seconds,
      joined_stores: MapSet.to_list(assigns.joined_stores)
    })

  {:reply, info, socket}
end
# -- Private helpers --
# Emits the [:dustlayer, :connection, :state_change] telemetry event for a
# status transition `from` -> `to`. The metadata carries the transition, the
# socket's url/device_id/stores assigns, and anything in `extra_meta` (which
# wins on key conflicts). The only measurement is the current system time.
defp emit_state_change(from, to, socket, extra_meta \\ %{}) do
  assigns = socket.assigns

  base_metadata = %{
    from: from,
    to: to,
    url: assigns[:url],
    device_id: assigns[:device_id],
    stores: assigns[:stores]
  }

  measurements = %{system_time: System.system_time()}
  event = [:dustlayer, :connection, :state_change]

  :telemetry.execute(event, measurements, Map.merge(base_metadata, extra_meta))
end
# Adds "if_match" to the wire params when the op carries a non-nil
# :if_match; otherwise returns the params untouched.
defp maybe_put_if_match(params, op_attrs) do
  case op_attrs do
    %{if_match: expected} when expected != nil -> Map.put(params, "if_match", expected)
    _ -> params
  end
end
# Adds "if_absent" => true to the wire params only when the op's :if_absent
# is exactly `true`; any other value leaves the params untouched.
defp maybe_put_if_absent(params, op_attrs) do
  case op_attrs do
    %{if_absent: true} -> Map.put(params, "if_absent", true)
    _ -> params
  end
end
# Adds a string-keyed "fence" map to the wire params when the op carries a
# :fence with both :key and :token; otherwise returns the params untouched.
defp maybe_put_fence(params, op_attrs) do
  with %{fence: %{key: fence_key, token: fence_token}} <- op_attrs do
    Map.put(params, "fence", %{"key" => fence_key, "token" => fence_token})
  else
    _ -> params
  end
end
# Lease ops carry ttl_ms/holder (acquire/renew) and token (renew/release).
# For :lease, :renew and :release ops each of those fields is copied onto the
# wire params when it is non-nil; every other op passes through unchanged.
defp maybe_put_lease_wire(params, op_attrs) do
  case op_attrs do
    %{op: op} when op in [:lease, :renew, :release] ->
      Enum.reduce([ttl_ms: "ttl_ms", holder: "holder", token: "token"], params, fn
        {attr, wire_key}, acc -> put_if_present(acc, wire_key, op_attrs[attr])
      end)

    _ ->
      params
  end
end

# Puts `value` under `key` unless it is nil.
defp put_if_present(params, key, value) do
  if is_nil(value), do: params, else: Map.put(params, key, value)
end
# Extracts the capability fields from a join reply. Each field is read from
# its string key first, then from its atom key, and finally falls back to an
# empty default (%{} for permissions/store_access, [] for scopes).
defp capabilities_from_join(reply) do
  Map.new([permissions: %{}, scopes: [], store_access: %{}], fn {field, default} ->
    {field, reply[Atom.to_string(field)] || Map.get(reply, field, default)}
  end)
end
# SyncEngine populates :path_segments alongside :path for capver 3
# writes. Channels prefer segments when present and fall back to
# parsing the rendered path otherwise.
#
# Only a list-valued :path_segments is copied onto the wire params.
defp maybe_put_path_segments(params, op_attrs) do
  case op_attrs do
    %{path_segments: segments} when is_list(segments) ->
      Map.put(params, "path_segments", segments)

    _ ->
      params
  end
end
# Pushes every write queued for `store_name`, in queue order, on the store's
# topic, tracking each push ref in `pending_refs`. The store's queue is then
# removed from the outbox. Returns the updated socket.
defp flush_outbox(socket, store_name) do
  outbox = Map.get(socket.assigns, :outbox, %{})
  topic = "store:#{store_name}"

  flushed =
    outbox
    |> Map.get(store_name, :queue.new())
    |> :queue.to_list()
    |> Enum.reduce(socket, fn entry, acc ->
      {event, params, client_op_id} = outbox_push_args(entry)
      {:ok, ref} = push(acc, topic, event, params)
      tracked = Map.put(acc.assigns.pending_refs, ref, {store_name, client_op_id})
      assign(acc, :pending_refs, tracked)
    end)

  assign(flushed, :outbox, Map.delete(outbox, store_name))
end

# Maps a queued outbox entry to the {event, params, client_op_id} to push:
# tagged 3-tuples are file uploads, plain 2-tuples are ordinary writes.
defp outbox_push_args({:put_file, params, client_op_id}), do: {"put_file", params, client_op_id}
defp outbox_push_args({params, client_op_id}), do: {"write", params, client_op_id}
# Looks up the process registered for `store_name` in
# Dust.SyncEngineRegistry and returns the `last_store_seq` field of its
# `:status` reply. Returns 0 when nothing is registered for the store.
defp get_last_store_seq(store_name) do
  case Registry.lookup(Dust.SyncEngineRegistry, store_name) do
    [] ->
      0

    [{engine, _value}] ->
      GenServer.call(engine, :status).last_store_seq
  end
end
# Builds a random device id: "dev_" followed by 8 cryptographically strong
# random bytes encoded as unpadded URL-safe Base64.
defp generate_device_id do
  suffix =
    8
    |> :crypto.strong_rand_bytes()
    |> Base.url_encode64(padding: false)

  "dev_" <> suffix
end
# Appends `suffix` (expected to start with "/") to the URI's path.
#
# A missing path is treated as empty, and trailing slashes on the existing
# path are dropped first so that a base URL such as "ws://host/socket/"
# yields "/socket/websocket" rather than "/socket//websocket".
defp append_path(%URI{path: path} = uri, suffix) do
  base = String.trim_trailing(path || "", "/")
  %{uri | path: base <> suffix}
end
end