Skip to main content

lib/dust/connection.ex

defmodule Dust.Connection do
  @moduledoc """
  WebSocket client that connects to the Dust server via Phoenix Channels.

  Uses Slipstream to manage the connection lifecycle, heartbeats,
  reconnection, and ref tracking. Joins one channel per store using
  topic "store:{store_name}" and forwards server events to SyncEngines.

  Writes requested while a store's channel is not joined are queued in a
  per-store outbox and pushed, in order, once that store's join succeeds.
  """
  use Slipstream

  require Logger

  @doc """
  Starts the connection process.

  `opts` must contain `:url`, `:token` and `:stores`. `:device_id`,
  `:test_mode?` (skip connecting) and `:name` (process registration) are
  optional.
  """
  def start_link(opts) do
    Slipstream.start_link(__MODULE__, opts, name: opts[:name])
  end

  @doc """
  Returns a map describing the connection: `:status`, `:url`, `:device_id`,
  `:connected_at`, `:uptime_seconds`, `:stores` and `:joined_stores`.
  """
  def info(pid) do
    GenServer.call(pid, :info)
  end

  @doc """
  Returns `true` if the named Dust.Connection process is currently connected
  to the Dust server. `false` during initial connect, reconnect backoff, or
  if no connection process exists by that name.

  Defaults to looking up `Dust.Connection`. Pass an explicit name (or pid)
  for facade-instance setups where the connection has a different name.
  """
  def connected?(name_or_pid \\ __MODULE__) do
    case GenServer.whereis(name_or_pid) do
      nil ->
        false

      pid ->
        case GenServer.call(pid, :info, 1_000) do
          %{status: :connected} -> true
          _ -> false
        end
    end
  catch
    # The process may die or time out between whereis/1 and the call.
    :exit, _ -> false
  end

  @impl Slipstream
  def init(opts) do
    url = Keyword.fetch!(opts, :url)
    token = Keyword.fetch!(opts, :token)
    # Lazy: only generate a random id when the caller did not supply one.
    device_id = Keyword.get_lazy(opts, :device_id, &generate_device_id/0)
    stores = Keyword.fetch!(opts, :stores)
    test_mode? = Keyword.get(opts, :test_mode?, false)

    socket =
      new_socket()
      |> assign(:token, token)
      |> assign(:device_id, device_id)
      |> assign(:stores, stores)
      |> assign(:joined_stores, MapSet.new())
      |> assign(:outbox, %{})
      |> assign(:pending_refs, %{})
      |> assign(:url, url)
      |> assign(:status, :disconnected)
      |> assign(:connected_at, nil)

    emit_state_change(nil, :disconnected, socket)

    if test_mode? do
      {:ok, socket}
    else
      case connect(socket, uri: websocket_uri(url, token, device_id)) do
        {:ok, socket} -> {:ok, socket}
        {:error, reason} -> {:stop, reason}
      end
    end
  end

  @impl Slipstream
  def handle_connect(socket) do
    Logger.info("[Dust.Connection] Connected to server")

    prev_status = socket.assigns.status

    socket =
      socket
      |> assign(:status, :connected)
      |> assign(:connected_at, DateTime.utc_now())

    emit_state_change(prev_status, :connected, socket)

    # Join each store's channel, telling the server where this client left off.
    socket =
      Enum.reduce(socket.assigns.stores, socket, fn store_name, sock ->
        last_seq = get_last_store_seq(store_name)
        join(sock, "store:#{store_name}", %{"last_store_seq" => last_seq})
      end)

    {:ok, socket}
  end

  @impl Slipstream
  def handle_join("store:" <> store_name, reply, socket) do
    store_seq = reply["store_seq"] || Map.get(reply, :store_seq, 0)
    Logger.info("[Dust.Connection] Joined store:#{store_name}, seq=#{store_seq}")

    joined = MapSet.put(socket.assigns.joined_stores, store_name)
    socket = assign(socket, :joined_stores, joined)

    # Flush any queued writes for this store
    socket = flush_outbox(socket, store_name)

    Dust.SyncEngine.set_capabilities(store_name, capabilities_from_join(reply))

    # Update the SyncEngine's status to :connected (also triggers resend of pending_ops)
    Dust.SyncEngine.set_status(store_name, :connected)

    {:ok, socket}
  end

  @impl Slipstream
  def handle_message("store:" <> store_name, "event", payload, socket) do
    Dust.SyncEngine.handle_server_event(store_name, payload)
    {:ok, socket}
  end

  def handle_message("store:" <> store_name, "snapshot", payload, socket) do
    Logger.info("[Dust.Connection] Received snapshot for #{store_name}")
    Dust.SyncEngine.handle_snapshot(store_name, payload)
    {:ok, socket}
  end

  def handle_message("store:" <> store_name, "catch_up_complete", payload, socket) do
    through_seq = payload["through_seq"]

    Logger.debug(
      "[Dust.Connection] Catch-up complete for #{store_name}, through_seq=#{through_seq}"
    )

    # Ack the seq back to the server
    push(socket, "store:#{store_name}", "ack_seq", %{"seq" => through_seq})

    Dust.SyncEngine.set_catch_up_complete(store_name, through_seq)
    {:ok, socket}
  end

  # Defining handle_message/4 replaces Slipstream's default, so without this
  # clause any event the server adds later would crash the connection.
  def handle_message(topic, event, _payload, socket) do
    Logger.debug("[Dust.Connection] Ignoring unhandled event #{inspect(event)} on #{topic}")
    {:ok, socket}
  end

  @impl Slipstream
  def handle_disconnect(reason, socket) do
    Logger.warning("[Dust.Connection] Disconnected: #{inspect(reason)}")

    prev_status = socket.assigns.status
    socket = assign(socket, :status, :reconnecting)
    emit_state_change(prev_status, :reconnecting, socket, %{reason: reason})

    # Update all joined stores to :reconnecting
    Enum.each(socket.assigns.joined_stores, fn store_name ->
      Dust.SyncEngine.set_status(store_name, :reconnecting)
    end)

    # Replies to pushes made on the dropped connection will never arrive, so
    # their refs are discarded instead of accumulating (or matching a ref
    # issued by the next connection). Each SyncEngine resends its pending ops
    # when handle_join/3 sets it back to :connected.
    socket =
      socket
      |> assign(:joined_stores, MapSet.new())
      |> assign(:pending_refs, %{})

    # Use Slipstream's built-in reconnect with backoff
    case reconnect(socket) do
      {:ok, socket} ->
        {:ok, socket}

      {:error, :no_config} ->
        # Test mode or not yet configured -- just stay alive
        {:ok, socket}

      {:error, reason} ->
        {:stop, reason, socket}
    end
  end

  @impl Slipstream
  def handle_topic_close("store:" <> store_name = topic, reason, socket) do
    Logger.warning("[Dust.Connection] Topic #{topic} closed: #{inspect(reason)}")

    # Same reasoning as handle_disconnect/2, scoped to this one store: its
    # in-flight refs are dropped and the engine resends after the rejoin.
    socket =
      socket
      |> assign(:joined_stores, MapSet.delete(socket.assigns.joined_stores, store_name))
      |> drop_pending_refs_for(store_name)

    Dust.SyncEngine.set_status(store_name, :reconnecting)

    # Rejoin with backoff
    case rejoin(socket, topic) do
      {:ok, socket} ->
        {:ok, socket}

      {:error, rejoin_error} ->
        Logger.warning("[Dust.Connection] Rejoin of #{topic} failed: #{inspect(rejoin_error)}")
        {:ok, socket}
    end
  end

  @impl Slipstream
  def handle_reply(ref, reply, socket) do
    case Map.pop(socket.assigns.pending_refs, ref) do
      {nil, _} ->
        {:ok, socket}

      {{store_name, client_op_id}, pending_refs} ->
        socket = assign(socket, :pending_refs, pending_refs)

        case reply do
          {:error, %{"reason" => _reason} = payload} ->
            Dust.SyncEngine.handle_write_rejected(store_name, client_op_id, payload)

          {:error, %{reason: _reason} = payload} ->
            Dust.SyncEngine.handle_write_rejected(store_name, client_op_id, payload)

          # Pass the whole reply map through — lease acquire/renew carry
          # token/expires_at/holder, a no-op release carries {released: false},
          # and ordinary writes carry just store_seq. The engine shapes the
          # caller reply per op.
          {:ok, reply} when is_map(reply) ->
            Dust.SyncEngine.handle_write_accepted(store_name, client_op_id, reply)

          # An ack we don't recognise (bare :ok/:error, or a non-map payload) on
          # a tracked write ref. Don't swallow it: a blocked caller must get a
          # tagged error rather than hang. Surface as a rejection and log loudly.
          other ->
            Logger.warning(
              "[Dust.Connection] unexpected reply for #{client_op_id}: #{inspect(other)}"
            )

            Dust.SyncEngine.handle_write_rejected(store_name, client_op_id, %{
              "reason" => "unexpected_reply"
            })
        end

        {:ok, socket}
    end
  end

  @impl Slipstream
  def handle_info({:send_write, store_name, %{op: :put_file} = op_attrs}, socket) do
    params =
      %{
        "path" => op_attrs.path,
        "content" => op_attrs.content,
        "filename" => op_attrs.filename,
        "content_type" => op_attrs.content_type,
        "client_op_id" => op_attrs.client_op_id
      }
      |> maybe_put_path_segments(op_attrs)

    {:noreply, send_or_enqueue(socket, store_name, "put_file", params, op_attrs.client_op_id)}
  end

  def handle_info({:send_write, store_name, op_attrs}, socket) do
    params =
      %{
        "op" => to_string(op_attrs.op),
        "path" => op_attrs.path,
        "value" => op_attrs[:value],
        "client_op_id" => op_attrs.client_op_id
      }
      |> maybe_put_path_segments(op_attrs)
      |> maybe_put_if_match(op_attrs)
      |> maybe_put_if_absent(op_attrs)
      |> maybe_put_fence(op_attrs)
      |> maybe_put_lease_wire(op_attrs)

    {:noreply, send_or_enqueue(socket, store_name, "write", params, op_attrs.client_op_id)}
  end

  # Defining handle_info/2 replaces Slipstream's default, so a stray message
  # (e.g. a late reply from a timed-out call) would otherwise crash the process.
  def handle_info(message, socket) do
    Logger.debug("[Dust.Connection] Ignoring unexpected message: #{inspect(message)}")
    {:noreply, socket}
  end

  # Note: Slipstream processes support handle_call via GenServer
  @impl Slipstream
  def handle_call(:info, _from, socket) do
    connected_at = socket.assigns.connected_at

    uptime_seconds =
      if connected_at do
        DateTime.diff(DateTime.utc_now(), connected_at, :second)
      else
        nil
      end

    info = %{
      status: socket.assigns.status,
      url: socket.assigns.url,
      device_id: socket.assigns.device_id,
      connected_at: connected_at,
      uptime_seconds: uptime_seconds,
      stores: socket.assigns.stores,
      joined_stores: MapSet.to_list(socket.assigns.joined_stores)
    }

    {:reply, info, socket}
  end

  # -- Private helpers --

  # Emits [:dustlayer, :connection, :state_change] telemetry with the
  # transition and connection identity; `extra_meta` is merged on top.
  defp emit_state_change(from, to, socket, extra_meta \\ %{}) do
    metadata =
      %{
        from: from,
        to: to,
        url: socket.assigns[:url],
        device_id: socket.assigns[:device_id],
        stores: socket.assigns[:stores]
      }
      |> Map.merge(extra_meta)

    :telemetry.execute(
      [:dustlayer, :connection, :state_change],
      %{system_time: System.system_time()},
      metadata
    )
  end

  # Builds the Phoenix websocket endpoint URI. Phoenix expects connect params
  # in the query string; any query params already present on `url` are kept,
  # with the connection params winning on key clashes.
  defp websocket_uri(url, token, device_id) do
    uri = URI.parse(url)

    query =
      (uri.query || "")
      |> URI.decode_query()
      |> Map.merge(%{
        "token" => token,
        "device_id" => device_id,
        "capver" => "3",
        "vsn" => "2.0.0"
      })
      |> URI.encode_query()

    uri
    |> append_path("/websocket")
    |> then(&%{&1 | query: query})
    |> URI.to_string()
  end

  # Pushes a tracked write when the store's channel is joined; otherwise
  # queues it in the store's outbox for flush_outbox/2 after the join.
  defp send_or_enqueue(socket, store_name, event, params, client_op_id) do
    write = {event, params, client_op_id}

    if MapSet.member?(socket.assigns.joined_stores, store_name) do
      push_tracked(socket, store_name, write)
    else
      enqueue(socket, store_name, write)
    end
  end

  # Pushes one write and records its ref so handle_reply/3 can route the ack
  # to the SyncEngine. If Slipstream refuses the push, the write is queued
  # until the next join instead of crashing the connection process.
  defp push_tracked(socket, store_name, {event, params, client_op_id} = write) do
    case push(socket, "store:#{store_name}", event, params) do
      {:ok, ref} ->
        pending = Map.put(socket.assigns.pending_refs, ref, {store_name, client_op_id})
        assign(socket, :pending_refs, pending)

      {:error, reason} ->
        Logger.warning(
          "[Dust.Connection] #{event} push for #{client_op_id} refused: #{inspect(reason)}"
        )

        enqueue(socket, store_name, write)
    end
  end

  # Appends a write to the back of the store's FIFO outbox queue.
  defp enqueue(socket, store_name, write) do
    queue = Map.get(socket.assigns.outbox, store_name, :queue.new())
    assign(socket, :outbox, Map.put(socket.assigns.outbox, store_name, :queue.in(write, queue)))
  end

  # Removes the store's outbox queue and pushes its writes in FIFO order.
  defp flush_outbox(socket, store_name) do
    {queue, outbox} = Map.pop(socket.assigns.outbox, store_name, :queue.new())
    socket = assign(socket, :outbox, outbox)

    queue
    |> :queue.to_list()
    |> Enum.reduce(socket, fn write, sock -> push_tracked(sock, store_name, write) end)
  end

  # Forgets in-flight write refs belonging to `store_name`.
  defp drop_pending_refs_for(socket, store_name) do
    kept =
      for {ref, {store, _client_op_id} = entry} <- socket.assigns.pending_refs,
          store != store_name,
          into: %{},
          do: {ref, entry}

    assign(socket, :pending_refs, kept)
  end

  defp maybe_put_if_match(params, %{if_match: value}) when not is_nil(value) do
    Map.put(params, "if_match", value)
  end

  defp maybe_put_if_match(params, _op_attrs), do: params

  defp maybe_put_if_absent(params, %{if_absent: true}) do
    Map.put(params, "if_absent", true)
  end

  defp maybe_put_if_absent(params, _op_attrs), do: params

  defp maybe_put_fence(params, %{fence: %{key: key, token: token}}) do
    Map.put(params, "fence", %{"key" => key, "token" => token})
  end

  defp maybe_put_fence(params, _op_attrs), do: params

  # Lease ops carry ttl_ms/holder (acquire/renew) and token (renew/release).
  defp maybe_put_lease_wire(params, %{op: op} = op_attrs)
       when op in [:lease, :renew, :release] do
    params
    |> put_if_present("ttl_ms", op_attrs[:ttl_ms])
    |> put_if_present("holder", op_attrs[:holder])
    |> put_if_present("token", op_attrs[:token])
  end

  defp maybe_put_lease_wire(params, _op_attrs), do: params

  defp put_if_present(params, _key, nil), do: params
  defp put_if_present(params, key, value), do: Map.put(params, key, value)

  # Join replies may arrive with string or atom keys; accept either.
  defp capabilities_from_join(reply) do
    %{
      permissions: reply["permissions"] || Map.get(reply, :permissions, %{}),
      scopes: reply["scopes"] || Map.get(reply, :scopes, []),
      store_access: reply["store_access"] || Map.get(reply, :store_access, %{})
    }
  end

  # SyncEngine populates :path_segments alongside :path for capver 3
  # writes. Channels prefer segments when present and fall back to
  # parsing the rendered path otherwise.
  defp maybe_put_path_segments(params, %{path_segments: segs}) when is_list(segs) do
    Map.put(params, "path_segments", segs)
  end

  defp maybe_put_path_segments(params, _op_attrs), do: params

  # Reads the store's last applied seq from its SyncEngine (0 if no engine is
  # registered). NOTE(review): this is a synchronous call made from inside
  # handle_connect/1 with the default 5s timeout — confirm the engine never
  # blocks on this process while handling :status.
  defp get_last_store_seq(store_name) do
    case Registry.lookup(Dust.SyncEngineRegistry, store_name) do
      [{pid, _}] ->
        status = GenServer.call(pid, :status)
        status.last_store_seq

      [] ->
        0
    end
  end

  defp generate_device_id do
    "dev_" <> Base.url_encode64(:crypto.strong_rand_bytes(8), padding: false)
  end

  # Appends `suffix` to the URI path without doubling a trailing slash
  # ("wss://h/socket/" -> "/socket/websocket").
  defp append_path(%URI{path: nil} = uri, suffix), do: %{uri | path: suffix}

  defp append_path(%URI{path: path} = uri, suffix),
    do: %{uri | path: String.trim_trailing(path, "/") <> suffix}
end