Skip to main content

lib/dust/sync_engine.ex

defmodule Dust.SyncEngine do
  use GenServer
  require Logger

  # How long an acked op (one carrying a waiting :from) may go without a server
  # ack or committed echo before the caller is answered {:error, :timeout}.
  # Generous: real acks land in milliseconds; this only catches a dropped reply.
  @ack_timeout_ms 30_000

  # Capabilities assumed until `set_capabilities/2` delivers real ones:
  # no read/write permission, no scopes, and an empty selected-store list.
  @default_capabilities %{
    permissions: %{read: false, write: false},
    scopes: [],
    store_access: %{mode: :selected, store_ids: []}
  }

  # Engine state. One engine process exists per store (see `via/1`).
  defstruct [
    # Store identifier this engine serves; also the Registry key.
    :store,
    # Cache adapter module (first element of the `:cache` option tuple).
    :cache,
    # What the adapter functions are called with: a pid or a module.
    :cache_target,
    # Handle returned by Dust.CallbackRegistry.new/0.
    :callbacks,
    # client_op_id => op attributes for writes not yet reconciled by a
    # committed server event (or a rejection).
    :pending_ops,
    # Connection status; starts as :disconnected.
    :status,
    # Highest store sequence applied locally (seeded from the cache in init/1).
    :last_store_seq,
    # Sequence passed to set_catch_up_complete/2; nil until then.
    :catch_up_seq,
    # Optional activity buffer appended to on every server event.
    :activity_buffer,
    # HTTP base URL (derived from the WS url) and bearer token, kept
    # here so FileRefs unwrapped from the cache carry the auth context
    # they need to fetch blob content via /api/files/:hash.
    :http_url,
    :token,
    # monitor_ref => subscription ref. We Process.monitor the process that
    # registers each subscription and auto-unregister it on :DOWN, so a
    # crashed subscriber (e.g. a single_flight awaiter) never leaks an ETS
    # row + CallbackWorker. The only rescue-free cleanup path.
    :callback_monitors,
    capabilities: @default_capabilities
  ]

  @doc """
  Starts the engine for one store, registered under `via/1`.

  `opts` must contain `:store`; the whole keyword list is passed to `init/1`.
  """
  def start_link(opts) do
    name = opts |> Keyword.fetch!(:store) |> via()
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @doc "Builds the `Registry` via-tuple naming the engine process for `store`."
  def via(store) do
    {:via, Registry, {Dust.SyncEngineRegistry, store}}
  end

  # Canonicalize a caller-supplied path into its slash-rendered form.
  # A segment list is rendered; a string is treated as already canonical
  # and only normalized, so `"example.com"` stays a single segment (it is
  # not split into the legacy two segments "example" / "com").
  # Raises ArgumentError when the path library rejects the input.
  defp norm!(path) when is_list(path) or is_binary(path) do
    outcome =
      if is_list(path) do
        Dust.Protocol.Path.render(path)
      else
        Dust.Protocol.Path.normalize_rendered(path)
      end

    case outcome do
      {:error, reason} -> raise ArgumentError, "invalid path #{inspect(path)}: #{reason}"
      {:ok, canonical} -> canonical
    end
  end

  # Subscription / enumeration patterns obey the path rule: either a segment
  # list or a canonical slash string. Wildcards (`*`, `**`) must already be
  # written against slash separators. A string pattern is returned unchanged
  # once the glob compiler accepts it; otherwise ArgumentError is raised.
  defp norm_pattern!(pattern) when is_list(pattern), do: norm!(pattern)
  defp norm_pattern!("**" = everything), do: everything

  defp norm_pattern!(pattern) when is_binary(pattern) do
    case Dust.Protocol.Glob.compile(pattern) do
      {:error, reason} -> raise ArgumentError, "invalid pattern #{inspect(pattern)}: #{reason}"
      {:ok, _compiled} -> pattern
    end
  end

  @doc """
  Reads the value at `path` from the store's engine.

  Replies `{:ok, value}` or `:miss`. Raises `ArgumentError` on an invalid path.
  """
  def get(store, path) do
    request = {:get, norm!(path)}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Reads several paths in one call. The reply is a map of path to value,
  built from whatever the cache adapter's `read_many` returns.
  """
  def get_many(store, paths) when is_list(paths) do
    canonical = for path <- paths, do: norm!(path)
    store |> via() |> GenServer.call({:get_many, canonical})
  end

  @doc "Optimistically writes `value` at `path`; the engine replies `:ok` right away."
  def put(store, path, value) do
    request = {:put, norm!(path), value}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Like `put/3`, but with options; the engine defers its reply instead of
  answering immediately, so the call blocks until the op is answered
  (a server ack, a committed echo, or a rejection).
  """
  def put(store, path, value, opts) when is_list(opts) do
    request = {:put, norm!(path), value, opts}
    store |> via() |> GenServer.call(request)
  end

  @doc "Deletes `path`; the engine replies `:ok` right away."
  def delete(store, path) do
    request = {:delete, norm!(path)}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Acked variant of `delete/2`: the engine defers its reply.
  NOTE(review): the server-side handler ignores the contents of `opts`.
  """
  def delete(store, path, opts) when is_list(opts) do
    request = {:delete, norm!(path), opts}
    store |> via() |> GenServer.call(request)
  end

  @doc "Merges `map` at `path`; the engine replies `:ok` right away."
  def merge(store, path, map) do
    request = {:merge, norm!(path), map}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Acked variant of `merge/3`: the engine defers its reply.
  NOTE(review): the server-side handler ignores the contents of `opts`.
  """
  def merge(store, path, map, opts) when is_list(opts) do
    request = {:merge, norm!(path), map, opts}
    store |> via() |> GenServer.call(request)
  end

  @doc "Increments the value at `path` by `delta` (default 1); replies `:ok` right away."
  def increment(store, path, delta \\ 1) do
    request = {:increment, norm!(path), delta}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Acked variant of `increment/3`: the engine defers its reply.
  NOTE(review): the server-side handler ignores the contents of `opts`.
  """
  def increment(store, path, delta, opts) when is_list(opts) do
    request = {:increment, norm!(path), delta, opts}
    store |> via() |> GenServer.call(request)
  end

  @doc "Adds `member` at `path`; the engine replies `:ok` right away."
  def add(store, path, member) do
    request = {:add, norm!(path), member}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Acked variant of `add/3`: the engine defers its reply.
  NOTE(review): the server-side handler ignores the contents of `opts`.
  """
  def add(store, path, member, opts) when is_list(opts) do
    request = {:add, norm!(path), member, opts}
    store |> via() |> GenServer.call(request)
  end

  @doc "Removes `member` at `path`; the engine replies `:ok` right away."
  def remove(store, path, member) do
    request = {:remove, norm!(path), member}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Acked variant of `remove/3`: the engine defers its reply.
  NOTE(review): the server-side handler ignores the contents of `opts`.
  """
  def remove(store, path, member, opts) when is_list(opts) do
    request = {:remove, norm!(path), member, opts}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Uploads the file at `source_path` under `path`.

  `opts` may carry `:filename` and `:content_type`; the engine reads the file
  itself, so `source_path` must be readable from the engine's node.
  """
  def put_file(store, path, source_path, opts \\ []) do
    request = {:put_file, norm!(path), source_path, opts}
    store |> via() |> GenServer.call(request)
  end

  @doc "Returns every cached entry matching `pattern`, as produced by the cache adapter."
  def enum(store, pattern) do
    request = {:enum, norm_pattern!(pattern)}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Paged enumeration. Recognized options: `:limit` (default 50, capped at
  1000), `:order`, `:select` and `:after`. Replies a page or `{:error, _}`.
  """
  def enum(store, pattern, opts) when is_list(opts) do
    request = {:enum_paged, norm_pattern!(pattern), opts}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Pages through entries between the canonical paths `from` and `to`.

  Recognized options: `:limit` (default 50, capped at 1000), `:order`,
  `:select` (`:entries` or `:keys`) and `:after`.

  `opts` must be a keyword list. The guard rejects anything else in the
  caller's process; without it a non-list would reach `Keyword.get/3`
  inside the engine and crash the shared GenServer.
  """
  def range(store, from, to, opts \\ [])
      when is_binary(from) and is_binary(to) and is_list(opts) do
    GenServer.call(via(store), {:range, norm!(from), norm!(to), opts})
  end

  @doc "Fetches the entry at `path`: `{:ok, entry}` or `{:error, :not_found}`."
  def entry(store, path) do
    request = {:entry, norm!(path)}
    store |> via() |> GenServer.call(request)
  end

  @doc """
  Requests a lease on `key`. Options read by the engine: `:ttl_ms`
  (default 30_000) and `:holder`.

  Replies `{:error, :unavailable}` at once when the engine is not connected.
  The call itself uses an `:infinity` timeout.
  """
  def lease(store, key, opts \\ []) when is_list(opts) do
    request = {:lease, norm!(key), opts}
    store |> via() |> GenServer.call(request, :infinity)
  end

  @doc """
  Renews `lease`; `:ttl_ms` defaults to 30_000. Replies
  `{:error, :unavailable}` at once when the engine is not connected.
  """
  def renew(store, %Dust.Lease{} = lease, opts \\ []) when is_list(opts) do
    request = {:renew, lease, opts}
    store |> via() |> GenServer.call(request, :infinity)
  end

  @doc """
  Releases `lease`. Replies `{:error, :unavailable}` at once when the engine
  is not connected.
  """
  def release(store, %Dust.Lease{} = lease) do
    request = {:release, lease}
    store |> via() |> GenServer.call(request, :infinity)
  end

  @doc """
  Returns a status map: connection state, last store seq, pending-op count,
  entry count, store, and the current capabilities.
  """
  def status(store) do
    store |> via() |> GenServer.call(:status)
  end

  @doc """
  Registers `callback` for events on paths matching `pattern` and returns the
  subscription ref (pass it to `off/2` to unsubscribe).

  Options read by the engine: `:monitor` (auto-unsubscribe when the calling
  process dies), `:include_current` (emit bootstrap events for entries
  already cached), plus `:limit` / `:order` for that bootstrap. The full list
  is also forwarded to the callback registry.

  `opts` must be a keyword list. The guard rejects anything else in the
  caller's process; without it the engine's `Keyword.get/3` would raise and
  crash the shared GenServer.
  """
  def on(store, pattern, callback, opts \\ []) when is_list(opts) do
    GenServer.call(via(store), {:on, norm_pattern!(pattern), callback, opts})
  end

  @doc """
  Cancels a subscription created by `on/4`.

  Idempotent: the reply is `:ok` regardless of whether `ref` was still
  registered. Once this returns, the subscription's callback will not be
  invoked again.
  """
  def off(store, ref) when is_reference(ref) do
    store |> via() |> GenServer.call({:off, ref})
  end

  @doc "Hands a committed server event to the engine (asynchronous cast)."
  def handle_server_event(store, event) do
    store |> via() |> GenServer.cast({:server_event, event})
  end

  @doc """
  Records the connection status (asynchronous cast). Switching to
  `:connected` makes the engine resend every pending op.
  """
  def set_status(store, new_status) do
    store |> via() |> GenServer.cast({:set_status, new_status})
  end

  @doc "Replaces the engine's capabilities with a normalized copy of `capabilities`."
  def set_capabilities(store, capabilities) when is_map(capabilities) do
    store |> via() |> GenServer.cast({:set_capabilities, capabilities})
  end

  @doc "Tells the engine the server rejected the op `client_op_id` for `reason`."
  def handle_write_rejected(store, client_op_id, reason) do
    store |> via() |> GenServer.cast({:write_rejected, client_op_id, reason})
  end

  @doc """
  Tells the engine the server acknowledged the op `client_op_id`.

  The third argument is either the full reply map or a bare integer store
  sequence, which is wrapped as `%{store_seq: seq}` before being cast.
  """
  def handle_write_accepted(store, client_op_id, store_seq) when is_integer(store_seq) do
    handle_write_accepted(store, client_op_id, %{store_seq: store_seq})
  end

  def handle_write_accepted(store, client_op_id, reply) when is_map(reply) do
    store |> via() |> GenServer.cast({:write_accepted, client_op_id, reply})
  end

  @doc "Records that catch-up finished through sequence `through_seq`."
  def set_catch_up_complete(store, through_seq) do
    store |> via() |> GenServer.cast({:catch_up_complete, through_seq})
  end

  @doc "Hands a server snapshot to the engine (asynchronous cast)."
  def handle_snapshot(store, snapshot) do
    store |> via() |> GenServer.cast({:snapshot, snapshot})
  end

  @doc "Test seeding only: writes straight into the cache, bypassing the write pipeline."
  def seed_entry(store, path, value, type) do
    request = {:seed_entry, norm!(path), value, type}
    store |> via() |> GenServer.call(request)
  end

  @doc "Test harness only: overwrites `last_store_seq` in the engine state."
  def set_store_seq(store, seq) do
    store |> via() |> GenServer.cast({:set_store_seq, seq})
  end

  # Server

  # Builds the initial state. The `:cache` option is `{module, arg}` where
  # `arg` decides the cache target:
  #   * a list  - start options; the adapter process is started and linked here
  #   * a pid   - an adapter process that is already running
  #   * an atom - a stateless adapter (e.g. Ecto); the module itself (a Repo)
  #               is the target
  # The engine starts :disconnected with no pending ops, and resumes from the
  # last sequence the cache reports for this store.
  @impl true
  def init(opts) do
    store = Keyword.fetch!(opts, :store)
    {cache_mod, cache_arg} = Keyword.fetch!(opts, :cache)

    cache_target =
      case cache_arg do
        start_opts when is_list(start_opts) ->
          {:ok, cache_pid} = cache_mod.start_link(start_opts)
          cache_pid

        running when is_pid(running) ->
          running

        stateless when is_atom(stateless) ->
          stateless
      end

    registry = Dust.CallbackRegistry.new()
    resume_seq = cache_mod.last_seq(cache_target, store)

    {:ok,
     %__MODULE__{
       store: store,
       cache: cache_mod,
       cache_target: cache_target,
       callbacks: registry,
       pending_ops: %{},
       status: :disconnected,
       last_store_seq: resume_seq,
       activity_buffer: Keyword.get(opts, :activity_buffer),
       http_url: derive_http_url(Keyword.get(opts, :url)),
       token: Keyword.get(opts, :token),
       callback_monitors: %{},
       capabilities: @default_capabilities
     }}
  end

  # Read one path. A direct cache hit is unwrapped; on a miss we try to
  # assemble a value from the path's subtree before reporting :miss.
  @impl true
  def handle_call({:get, path}, _from, state) do
    reply =
      case state.cache.read(state.cache_target, state.store, path) do
        :miss ->
          assembled = assemble_subtree_value(state, path)
          if is_nil(assembled), do: :miss, else: {:ok, assembled}

        {:ok, cached} ->
          {:ok, unwrap_value(cached, state)}
      end

    {:reply, reply, state}
  end

  # Batch read: the adapter yields {path, {value, type, seq}} pairs; the
  # reply keeps only the unwrapped value for each path.
  @impl true
  def handle_call({:get_many, paths}, _from, state) do
    values =
      state.cache_target
      |> state.cache.read_many(state.store, paths)
      |> Map.new(fn {path, {value, _type, _seq}} -> {path, unwrap_value(value, state)} end)

    {:reply, values, state}
  end

  # Fire-and-forget put: apply optimistically, forward the op to the
  # connection, and answer the caller immediately.
  @impl true
  def handle_call({:put, path, value}, _from, state) do
    {outgoing, next_state} = do_put(path, value, [], nil, state)
    send_to_connection(next_state.store, outgoing)
    {:reply, :ok, next_state}
  end

  # Acked put: `from` is stored with the pending op, so the caller is
  # answered later rather than here.
  @impl true
  def handle_call({:put, path, value, opts}, from, state) do
    {outgoing, next_state} = do_put(path, value, opts, from, state)
    send_to_connection(next_state.store, outgoing)
    {:noreply, next_state}
  end

  # Lease ops are decided by the server and always acked. While disconnected
  # we refuse immediately - a lease granted late would be meaningless - instead
  # of parking the caller until an ack arrives.
  @impl true
  def handle_call({:lease, key, opts}, from, %{status: :connected} = state) do
    {outgoing, next_state} =
      do_lease_op(%{op: :lease, path: key}, lease_extra(opts), from, state)

    send_to_connection(next_state.store, outgoing)
    {:noreply, next_state}
  end

  def handle_call({:lease, _key, _opts}, _from, state) do
    {:reply, {:error, :unavailable}, state}
  end

  # Renew an existing lease (ttl defaults to 30s). Refused immediately while
  # the engine is not connected.
  @impl true
  def handle_call({:renew, lease, opts}, from, %{status: :connected} = state) do
    renewal = %{token: lease.token, ttl_ms: Keyword.get(opts, :ttl_ms, 30_000)}
    {outgoing, next_state} = do_lease_op(%{op: :renew, path: lease.key}, renewal, from, state)
    send_to_connection(next_state.store, outgoing)
    {:noreply, next_state}
  end

  def handle_call({:renew, _lease, _opts}, _from, state) do
    {:reply, {:error, :unavailable}, state}
  end

  # Release a lease by key + token. Refused immediately while the engine is
  # not connected.
  @impl true
  def handle_call({:release, lease}, from, %{status: :connected} = state) do
    target = %{op: :release, path: lease.key}
    {outgoing, next_state} = do_lease_op(target, %{token: lease.token}, from, state)
    send_to_connection(next_state.store, outgoing)
    {:noreply, next_state}
  end

  def handle_call({:release, _lease}, _from, state) do
    {:reply, {:error, :unavailable}, state}
  end

  # Unacked delete: answered :ok as soon as do_delete returns.
  @impl true
  def handle_call({:delete, path}, _from, state) do
    {:reply, :ok, do_delete(path, nil, state)}
  end

  # Acked delete: `from` is handed to do_delete and no reply is sent here.
  # The option list itself is not consulted.
  @impl true
  def handle_call({:delete, path, _opts}, from, state) do
    {:noreply, do_delete(path, from, state)}
  end

  # Unacked merge: answered :ok as soon as do_merge returns.
  @impl true
  def handle_call({:merge, path, map}, _from, state) do
    {:reply, :ok, do_merge(path, map, nil, state)}
  end

  # Acked merge: `from` is handed to do_merge and no reply is sent here.
  # The option list itself is not consulted.
  @impl true
  def handle_call({:merge, path, map, _opts}, from, state) do
    {:noreply, do_merge(path, map, from, state)}
  end

  # Unacked increment: answered :ok as soon as do_increment returns.
  @impl true
  def handle_call({:increment, path, delta}, _from, state) do
    {:reply, :ok, do_increment(path, delta, nil, state)}
  end

  # Acked increment: `from` is handed to do_increment and no reply is sent
  # here. The option list itself is not consulted.
  @impl true
  def handle_call({:increment, path, delta, _opts}, from, state) do
    {:noreply, do_increment(path, delta, from, state)}
  end

  # Unacked set-add: answered :ok as soon as do_set_op returns.
  @impl true
  def handle_call({:add, path, member}, _from, state) do
    {:reply, :ok, do_set_op(:add, path, member, nil, state)}
  end

  # Acked set-add: `from` is handed to do_set_op and no reply is sent here.
  # The option list itself is not consulted.
  @impl true
  def handle_call({:add, path, member, _opts}, from, state) do
    {:noreply, do_set_op(:add, path, member, from, state)}
  end

  # Unacked set-remove: answered :ok as soon as do_set_op returns.
  @impl true
  def handle_call({:remove, path, member}, _from, state) do
    {:reply, :ok, do_set_op(:remove, path, member, nil, state)}
  end

  # Acked set-remove: `from` is handed to do_set_op and no reply is sent here.
  # The option list itself is not consulted.
  @impl true
  def handle_call({:remove, path, member, _opts}, from, state) do
    {:noreply, do_set_op(:remove, path, member, from, state)}
  end

  # Upload a local file under `path`.
  #
  # Replies :ok once the optimistic file reference is cached, the local
  # callback has fired and the op has been queued + forwarded to the connection.
  # Replies {:error, posix_reason} (e.g. {:error, :enoent}) when the file
  # cannot be read. The read is deliberately non-raising: `source_path` is
  # caller input, and raising here would take down the whole engine together
  # with its in-memory pending_ops.
  #
  # Options: :filename (default: basename of source_path) and :content_type
  # (default: "application/octet-stream").
  @impl true
  def handle_call({:put_file, path, source_path, opts}, _from, state) do
    case File.read(source_path) do
      {:ok, content} ->
        client_op_id = generate_op_id()
        filename = opts[:filename] || Path.basename(source_path)
        content_type = opts[:content_type] || "application/octet-stream"

        # The content hash is known before upload, so the optimistic reference
        # can already carry it.
        hash = "sha256:" <> (:crypto.hash(:sha256, content) |> Base.encode16(case: :lower))

        ref = %{
          "_type" => "file",
          "hash" => hash,
          "size" => byte_size(content),
          "content_type" => content_type,
          "filename" => filename,
          "uploaded_at" => DateTime.utc_now() |> DateTime.to_iso8601()
        }

        # Optimistic local write (store the reference, seq 0 = uncommitted)
        :ok = state.cache.write(state.cache_target, state.store, path, ref, "file", 0)

        # Fire local callback
        dispatch_callbacks(state, path, %{
          store: state.store,
          path: path,
          op: :put_file,
          value: ref,
          committed: false,
          source: :local,
          client_op_id: client_op_id
        })

        # Queue for server (connection sends put_file message with base64 content)
        op_msg = %{
          op: :put_file,
          path: path,
          client_op_id: client_op_id,
          content: Base.encode64(content),
          filename: filename,
          content_type: content_type
        }

        state = %{state | pending_ops: Map.put(state.pending_ops, client_op_id, op_msg)}
        send_to_connection(state.store, op_msg)

        {:reply, :ok, state}

      {:error, reason} ->
        # Nothing was written or queued; surface the failure to the caller only.
        {:reply, {:error, reason}, state}
    end
  end

  # Unpaged enumeration: reply with whatever the adapter's read_all returns.
  @impl true
  def handle_call({:enum, pattern}, _from, state) do
    matches = state.cache.read_all(state.cache_target, state.store, pattern)
    {:reply, matches, state}
  end

  # Paged enumeration. Options are validated first; an {:error, _} from
  # validation is replied verbatim. The page size defaults to 50 and is capped
  # at 1000.
  @impl true
  def handle_call({:enum_paged, pattern, opts}, _from, state) do
    with :ok <- validate_enum_opts(pattern, opts) do
      select = Keyword.get(opts, :select, :entries)

      query = [
        pattern: pattern,
        limit: min(Keyword.get(opts, :limit, 50), 1000),
        order: Keyword.get(opts, :order, :asc),
        select: select,
        cursor: Keyword.get(opts, :after)
      ]

      {items, next_cursor} = state.cache.browse(state.cache_target, state.store, query)

      {:reply, Dust.Page.new(items: wrap_items(items, select), next_cursor: next_cursor), state}
    else
      {:error, _} = failure -> {:reply, failure, state}
    end
  end

  # Paged range scan between `from` and `to`.
  #
  # Only `select: :entries` (default) and `select: :keys` are supported. Every
  # other value - `:prefixes` included - is answered with
  # {:error, :unsupported_select}. The catch-all matters: `select` is caller
  # input, and an unmatched `case` here would crash the shared engine.
  # The page size defaults to 50 and is capped at 1000.
  @impl true
  def handle_call({:range, from, to, opts}, _from, state) do
    case Keyword.get(opts, :select, :entries) do
      select when select in [:entries, :keys] ->
        limit = opts |> Keyword.get(:limit, 50) |> min(1000)
        order = Keyword.get(opts, :order, :asc)
        cursor = Keyword.get(opts, :after)

        browse_opts = [
          from: from,
          to: to,
          limit: limit,
          order: order,
          select: select,
          cursor: cursor
        ]

        {items, next_cursor} = state.cache.browse(state.cache_target, state.store, browse_opts)
        page = Dust.Page.new(items: wrap_items(items, select), next_cursor: next_cursor)
        {:reply, page, state}

      _unsupported ->
        {:reply, {:error, :unsupported_select}, state}
    end
  end

  # Fetch a single entry with its metadata. A direct hit is wrapped in a
  # Dust.Entry (revision = cached seq); on a miss we try to assemble an entry
  # from the path's subtree before replying {:error, :not_found}.
  @impl true
  def handle_call({:entry, path}, _from, state) do
    reply =
      case state.cache.read_entry(state.cache_target, state.store, path) do
        :miss ->
          assembled = assemble_subtree_entry(state, path)
          if is_nil(assembled), do: {:error, :not_found}, else: {:ok, assembled}

        {:ok, {value, type, seq, synced_at}} ->
          fields = [path: path, value: value, type: type, revision: seq, synced_at: synced_at]
          {:ok, Dust.Entry.new(fields)}
      end

    {:reply, reply, state}
  end

  # Status snapshot for diagnostics. `entry_count` is nil when the cache
  # adapter does not export count/2.
  @impl true
  def handle_call(:status, _from, state) do
    entry_count =
      case function_exported?(state.cache, :count, 2) do
        true -> state.cache.count(state.cache_target, state.store)
        false -> nil
      end

    caps = state.capabilities

    report = %{
      store: state.store,
      connection: state.status,
      last_store_seq: state.last_store_seq,
      pending_ops: map_size(state.pending_ops),
      entry_count: entry_count,
      permissions: caps.permissions,
      scopes: caps.scopes,
      store_access: caps.store_access
    }

    {:reply, report, state}
  end

  # Expose the cache adapter module and its target as a tuple.
  @impl true
  def handle_call(:cache_info, _from, state) do
    %{cache: cache_mod, cache_target: target} = state
    {:reply, {cache_mod, target}, state}
  end

  # Unsubscribe: drop the registry entry, then stop monitoring the subscriber
  # if this subscription had a monitor. Always replies :ok.
  @impl true
  def handle_call({:off, ref}, _from, state) do
    Dust.CallbackRegistry.unregister(state.callbacks, ref)
    monitors = demonitor_subscription(state.callback_monitors, ref)
    {:reply, :ok, %{state | callback_monitors: monitors}}
  end

  # Register a subscription and reply with its ref.
  @impl true
  def handle_call({:on, pattern, callback, opts}, from, state) do
    ref = Dust.CallbackRegistry.register(state.callbacks, state.store, pattern, callback, opts)

    # `monitor: true` is opt-in: the registering process is monitored and its
    # subscription is collected on :DOWN (the rescue-free cleanup that
    # single_flight's awaiter relies on). It stays OFF by default because
    # long-lived subscriptions (LiveViews, the one-shot SubscriberRegistrar
    # that exits after init) own their lifecycle and must survive the
    # registrar's death.
    monitors =
      if Keyword.get(opts, :monitor, false) do
        {subscriber_pid, _tag} = from
        Map.put(state.callback_monitors, Process.monitor(subscriber_pid), ref)
      else
        state.callback_monitors
      end

    state = %{state | callback_monitors: monitors}

    # Optional bootstrap of currently matching entries. Doing it inside this
    # handle_call means no live event can slip in between the snapshot and the
    # reply: the GenServer is single-threaded, so bootstrap items reach the
    # worker mailbox ahead of any later live event for the same worker.
    if Keyword.get(opts, :include_current, false) do
      emit_bootstrap_events(state, pattern, ref, opts)
    end

    {:reply, ref, state}
  end

  # Test seeding: write directly to the cache at seq 0. No pending op is
  # recorded and no callbacks fire.
  @impl true
  def handle_call({:seed_entry, path, value, type}, _from, state) do
    %{cache: cache_mod, cache_target: target, store: store} = state
    :ok = cache_mod.write(target, store, path, value, type, 0)
    {:reply, :ok, state}
  end

  # A monitored subscriber exited without calling off/2: unregister its
  # subscription and forget the monitor. :DOWN messages for monitors we do
  # not track are ignored.
  @impl true
  def handle_info({:DOWN, mon_ref, :process, _pid, _reason}, state) do
    {sub_ref, remaining} = Map.pop(state.callback_monitors, mon_ref)

    if is_nil(sub_ref) do
      {:noreply, state}
    else
      Dust.CallbackRegistry.unregister(state.callbacks, sub_ref)
      {:noreply, %{state | callback_monitors: remaining}}
    end
  end

  # Safety net for acked ops: when the deadline passes and the op still has a
  # waiting caller (neither the phx_reply ack nor the committed echo answered
  # it), reply {:error, :timeout} so the caller does not hang. The op stays
  # pending, minus its :from. If the op was already answered or reconciled the
  # message is a no-op. This is what makes the :infinity call timeout used by
  # lease/renew/release safe.
  @impl true
  def handle_info({:ack_timeout, client_op_id}, state) do
    pending = state.pending_ops

    case pending do
      %{^client_op_id => %{from: waiter} = op_attrs} ->
        Logger.warning(
          "[Dust] no server ack/echo for #{op_attrs[:op]} #{client_op_id} within " <>
            "#{@ack_timeout_ms}ms — answering caller {:error, :timeout}"
        )

        GenServer.reply(waiter, {:error, :timeout})
        without_waiter = Map.delete(op_attrs, :from)
        {:noreply, %{state | pending_ops: Map.put(pending, client_op_id, without_waiter)}}

      _already_answered_or_reconciled ->
        {:noreply, state}
    end
  end

  # Any other message is dropped so it cannot crash the engine.
  @impl true
  def handle_info(_msg, state) do
    {:noreply, state}
  end

  # Test harness hook: overwrite last_store_seq unconditionally.
  @impl true
  def handle_cast({:set_store_seq, seq}, state) do
    updated = %{state | last_store_seq: seq}
    {:noreply, updated}
  end

  # On (re)connect, resend every op still pending, then record the status.
  # Iteration follows the map's own order; an empty map sends nothing.
  @impl true
  def handle_cast({:set_status, :connected}, state) do
    Enum.each(state.pending_ops, fn {_client_op_id, op_attrs} ->
      send_to_connection(state.store, op_attrs)
    end)

    {:noreply, %{state | status: :connected}}
  end

  # Any other status is simply recorded.
  def handle_cast({:set_status, new_status}, state) do
    {:noreply, %{state | status: new_status}}
  end

  # Store the capabilities after passing them through normalize_capabilities/1.
  @impl true
  def handle_cast({:set_capabilities, capabilities}, state) do
    normalized = normalize_capabilities(capabilities)
    {:noreply, %{state | capabilities: normalized}}
  end

  # Apply a server snapshot: every entry in it is written to the cache at the
  # snapshot's sequence, and last_store_seq jumps to that sequence. Entries
  # already cached but absent from the snapshot are not removed here.
  @impl true
  def handle_cast({:snapshot, snapshot}, state) do
    seq = snapshot["snapshot_seq"]

    snapshot["entries"]
    |> Enum.each(fn {path, %{"value" => value, "type" => type}} ->
      state.cache.write(state.cache_target, state.store, path, value, type, seq)
    end)

    {:noreply, %{state | last_store_seq: seq}}
  end

  # Remember the sequence through which catch-up has completed.
  @impl true
  def handle_cast({:catch_up_complete, through_seq}, state) do
    caught_up = %{state | catch_up_seq: through_seq}
    {:noreply, caught_up}
  end

  # The server rejected a pending op. The op is removed from pending_ops and:
  #   * lease/renew/release - only the waiting caller (if any) is answered
  #   * every other op      - the optimistic cache write is rolled back, a
  #     rejection event is dispatched to subscribers, then the waiting caller
  #     (if any) is answered {:error, error}
  # Rejections for unknown / already reconciled ops are ignored.
  @impl true
  def handle_cast({:write_rejected, client_op_id, reason}, state) do
    error = rejection_reason(reason)

    case Map.pop(state.pending_ops, client_op_id) do
      {nil, _pending} ->
        # Already reconciled or unknown op
        {:noreply, state}

      {op_attrs, pending} when op_attrs.op in [:lease, :renew, :release] ->
        # Lease ops never wrote the cache optimistically and the key may hold
        # someone else's live lease — reply the error, touch nothing.
        case Map.get(op_attrs, :from) do
          nil -> :ok
          from -> GenServer.reply(from, {:error, error})
        end

        {:noreply, %{state | pending_ops: pending}}

      {op_attrs, pending} ->
        path = op_attrs.path

        # Roll back to previous value (or delete if there was none)
        # NOTE(review): ops stored without a :prev key (the put_file op map,
        # for one) take the delete branch even if a value existed before —
        # confirm that is intended.
        case Map.get(op_attrs, :prev) do
          {:ok, prev_value} ->
            type = detect_type(prev_value)
            state.cache.write(state.cache_target, state.store, path, prev_value, type, 0)

          _ ->
            state.cache.delete(state.cache_target, state.store, path)
        end

        # Fire rejection callback so the app knows
        dispatch_callbacks(state, path, %{
          store: state.store,
          path: path,
          op: op_attrs.op,
          value: nil,
          committed: false,
          source: :server,
          client_op_id: client_op_id,
          error: %{code: :rejected, message: rejection_message(error)}
        })

        # If a put/4 caller is awaiting a reply, surface the error.
        case Map.get(op_attrs, :from) do
          nil -> :ok
          from -> GenServer.reply(from, {:error, error})
        end

        {:noreply, %{state | pending_ops: pending}}
    end
  end

  # The server acked a pending op (phx_reply).
  @impl true
  def handle_cast({:write_accepted, client_op_id, reply}, state) do
    case Map.get(state.pending_ops, client_op_id) do
      op_attrs when not is_nil(op_attrs) ->
        # Reply to the waiter from the ack; :from makes this at-most-once. The
        # op itself stays in pending_ops (the :server_event reconciliation
        # removes it), but without :from so the echo cannot reply a second time.
        without_waiter = answer_waiter(op_attrs, reply)
        pending = Map.put(state.pending_ops, client_op_id, without_waiter)
        {:noreply, %{state | pending_ops: pending}}

      nil ->
        # The committed echo (:server_event) may overtake this ack and have
        # already reconciled (deleted) the op, answering the waiter itself.
        # That ordering is expected, so it is logged rather than dropped
        # silently.
        Logger.debug(
          "[Dust] write ack #{client_op_id} arrived after the committed echo reconciled it"
        )

        {:noreply, state}
    end
  end

  # Apply one committed event from the server. In order:
  #   1. write the canonical value into the cache (per-op rules below)
  #   2. answer a caller still waiting on this op, then drop the pending op
  #   3. append to the activity buffer, if one is configured
  #   4. dispatch the committed event to matching subscriptions
  #   5. advance last_store_seq to the event's store_seq
  # The event is a string-keyed map; the keys read here are "client_op_id",
  # "path", "store_seq", "op", "value" and "device_id".
  @impl true
  def handle_cast({:server_event, event}, state) do
    client_op_id = event["client_op_id"]
    path = event["path"]
    store_seq = event["store_seq"]
    # NOTE(review): to_existing_atom raises for an op name with no existing
    # atom, which would crash the engine — confirm the server can never send one.
    op = String.to_existing_atom(event["op"])
    value = event["value"]

    # Update cache with canonical state. Plain-map :set ops flatten to leaf
    # entries (matching server storage); :delete clears the path AND every
    # descendant so subtree deletes propagate to local subscribers.
    # NOTE(review): there is no catch-all clause; an op outside this list, or
    # a :merge whose value is not a map, raises CaseClauseError.
    case op do
      :set ->
        apply_set_to_cache(state, path, value, detect_type(value), store_seq)

      :delete ->
        _ = state.cache.delete_subtree(state.cache_target, state.store, path)

      :merge when is_map(value) ->
        {:ok, prefix_segs} = Dust.Protocol.Path.parse_rendered(path)

        # Each merged key becomes its own child path under `path`.
        Enum.each(value, fn {key, v} ->
          {:ok, child_segs} = Dust.Protocol.Path.child(prefix_segs, to_string(key))
          {:ok, child_path} = Dust.Protocol.Path.render(child_segs)
          apply_set_to_cache(state, child_path, v, detect_type(v), store_seq)
        end)

      :increment ->
        # Server sends the materialized value (not the delta) for cache reconciliation
        state.cache.write(state.cache_target, state.store, path, value, "counter", store_seq)

      :add ->
        state.cache.write(state.cache_target, state.store, path, value, "set", store_seq)

      :remove ->
        state.cache.write(state.cache_target, state.store, path, value, "set", store_seq)

      :put_file ->
        state.cache.write(state.cache_target, state.store, path, value, "file", store_seq)

      op when op in [:lease, :renew] ->
        # Lease envelope is a typed value; store it opaquely at the key.
        state.cache.write(state.cache_target, state.store, path, value, "lease", store_seq)

      :release ->
        state.cache.delete(state.cache_target, state.store, path)
    end

    # Reconcile pending ops. The committed echo can beat the phx_reply ack, so
    # answer any waiting caller HERE (from the echo's canonical value/seq) before
    # dropping the op — otherwise a caller blocked on the ack would hang forever,
    # since the ack would land against an already-deleted op.
    was_pending = Map.has_key?(state.pending_ops, client_op_id)

    # Result discarded: the op is deleted from pending_ops right below, so the
    # :from-stripped attrs are not needed.
    _answered =
      answer_waiter(
        Map.get(state.pending_ops, client_op_id),
        ack_reply_from_event(value, store_seq)
      )

    pending = Map.delete(state.pending_ops, client_op_id)

    # Append to activity buffer for dashboard
    if state.activity_buffer do
      Dust.ActivityBuffer.append(state.activity_buffer, state.store, %{
        path: path,
        op: op,
        source: if(was_pending, do: :local, else: :server),
        seq: store_seq
      })
    end

    # Always dispatch the committed event with was_own carrying whether
    # this was our own write being echoed. Subscriptions filter by mode:
    # `:all` (the default) skips the echo of own writes (preserving the
    # historical single-fire semantics); `:committed` keeps it; `:optimistic`
    # ignores committed events entirely.
    dispatch_callbacks(state, path, %{
      store: state.store,
      path: path,
      op: op,
      value: value,
      store_seq: store_seq,
      committed: true,
      was_own: was_pending,
      source: :server,
      device_id: event["device_id"],
      client_op_id: client_op_id
    })

    # last_store_seq is set to this event's seq unconditionally.
    state = %{state | pending_ops: pending, last_store_seq: store_seq}
    {:noreply, state}
  end

  # Deliver `event` to every subscription whose pattern matches `path`.
  defp dispatch_callbacks(state, path, event) do
    state.callbacks
    |> Dust.CallbackRegistry.match(state.store, path)
    |> Enum.each(fn subscription -> dispatch_to_subscription(state, subscription, event) end)
  end

  # The one dispatch path shared by live events (dispatch_callbacks/3) and
  # bootstrap (emit_bootstrap_events/4), so both behave identically:
  #   * events the subscription's mode filters out are skipped
  #   * a worker whose queue reached max_queue_size has fallen behind - its
  #     subscription is unregistered and on_resync (if it is a 1-arity
  #     function) is told a resync is required
  #   * otherwise the event is enqueued on the worker
  defp dispatch_to_subscription(
         state,
         {worker_pid, ref, max_queue_size, on_resync, mode},
         event
       ) do
    cond do
      not event_matches_mode?(mode, event) ->
        nil

      Dust.CallbackWorker.queue_len(worker_pid) >= max_queue_size ->
        Dust.CallbackRegistry.unregister(state.callbacks, ref)

        if is_function(on_resync, 1) do
          on_resync.(%{error: :resync_required, ref: ref})
        end

      true ->
        Dust.CallbackWorker.dispatch(worker_pid, event)
    end
  end

  # Per-mode event filter.
  #   * bootstrap events (`type: :present`) pass for every mode
  #   * `:all`        - everything except the committed echo of our own write,
  #                     so an own write fires once (optimistically, without a
  #                     store_seq) and a foreign write fires once (committed)
  #   * `:committed`  - only committed events, own writes included
  #   * `:optimistic` - only uncommitted events
  defp event_matches_mode?(_mode, %{type: :present}), do: true

  defp event_matches_mode?(:all, event),
    do: not match?(%{committed: true, was_own: true}, event)

  defp event_matches_mode?(:committed, event), do: match?(%{committed: true}, event)
  defp event_matches_mode?(:optimistic, event), do: match?(%{committed: false}, event)

  # Send `type: :present` events for entries already cached under `pattern` to
  # the subscription `ref`. Only one page is read (`:limit`, default 50, capped
  # at 1000; `:order`, default :asc) - the next cursor is discarded. Emission
  # stops early if backpressure unregisters the subscription. Returns :ok.
  defp emit_bootstrap_events(state, pattern, ref, opts) do
    query = [
      pattern: pattern,
      limit: min(Keyword.get(opts, :limit, 50), 1000),
      order: Keyword.get(opts, :order, :asc),
      select: :entries
    ]

    {items, _next_cursor} = state.cache.browse(state.cache_target, state.store, query)

    case Dust.CallbackRegistry.lookup(state.callbacks, ref) do
      nil ->
        :ok

      {worker_pid, max_queue_size, on_resync, mode} ->
        subscription = {worker_pid, ref, max_queue_size, on_resync, mode}

        # any?/2 is used for its short-circuit: returning true ends the walk.
        Enum.any?(items, fn {path, value, type, seq} ->
          present = %{type: :present, path: path, value: value, entry_type: type, seq: seq}
          dispatch_to_subscription(state, subscription, present)

          # Subscription gone => backpressure dropped it mid-bootstrap; stop.
          is_nil(Dust.CallbackRegistry.lookup(state.callbacks, ref))
        end)

        :ok
    end
  end

  # Shared body of put/3 and put/4: apply the write optimistically, notify
  # local subscribers, and record the op as pending.
  # Returns {op_msg, state}; sending op_msg to the connection is left to the
  # caller. `from` is nil for unacked puts.
  defp do_put(path, value, opts, from, state) do
    op_id = generate_op_id()
    value_type = detect_type(value)

    # Remember what was there so a rejection can roll back.
    previous = state.cache.read(state.cache_target, state.store, path)

    # Optimistic write at seq 0, canonicalized the way the server stores it:
    # plain maps are flattened into leaf entries, descendants are cleared and
    # no root leaf is kept.
    :ok = apply_set_to_cache(state, path, value, value_type, 0)

    local_event = %{
      store: state.store,
      path: path,
      op: :set,
      value: value,
      committed: false,
      source: :local,
      client_op_id: op_id
    }

    dispatch_callbacks(state, path, local_event)

    base = %{op: :set, path: path, value: value, client_op_id: op_id, prev: previous}
    conditional = base |> maybe_put_if_match(opts) |> maybe_put_if_absent(opts)
    op_msg = conditional |> maybe_put_fence_opt(opts) |> maybe_put_from(from)

    {op_msg, %{state | pending_ops: Map.put(state.pending_ops, op_id, op_msg)}}
  end

  # Copy `:if_match` onto the op when the option is present - even when its
  # value is nil or false, since presence (not truthiness) is what counts.
  defp maybe_put_if_match(op_msg, opts) do
    if Keyword.has_key?(opts, :if_match) do
      Map.put(op_msg, :if_match, Keyword.fetch!(opts, :if_match))
    else
      op_msg
    end
  end

  # Flag the op `if_absent: true` when the option is truthy; the flag is
  # always the literal `true`, whatever truthy value was passed.
  defp maybe_put_if_absent(op_msg, opts) do
    case Keyword.get(opts, :if_absent, false) do
      off when off in [nil, false] -> op_msg
      _on -> Map.put(op_msg, :if_absent, true)
    end
  end

  # Attach the waiting caller (`from`) to the op; a `nil` caller adds nothing.
  defp maybe_put_from(op_msg, from) do
    if is_nil(from), do: op_msg, else: Map.put(op_msg, :from, from)
  end

  # Stop monitoring the process behind subscription `sub_ref`, if a monitor is
  # registered for it. `monitors` maps monitor_ref => subscription ref; the
  # returned map has the matching entry removed (or is unchanged when none
  # matches). The demonitor flushes any already-delivered :DOWN message.
  defp demonitor_subscription(monitors, sub_ref) do
    tracks_sub? = fn {_monitor_ref, tracked_ref} -> tracked_ref == sub_ref end

    with {monitor_ref, ^sub_ref} <- Enum.find(monitors, tracks_sub?) do
      Process.demonitor(monitor_ref, [:flush])
      Map.delete(monitors, monitor_ref)
    else
      nil -> monitors
    end
  end

  # Attach a `:fence` (reduced to just `key` and `token`) when the `:fence`
  # option is a `%Dust.Lease{}` or any map carrying both fields. Anything else
  # — including a missing option — leaves the op untouched.
  defp maybe_put_fence_opt(op_msg, opts) do
    fence =
      case Keyword.get(opts, :fence) do
        %Dust.Lease{key: key, token: token} -> %{key: key, token: token}
        %{key: key, token: token} -> %{key: key, token: token}
        _ -> nil
      end

    if fence, do: Map.put(op_msg, :fence, fence), else: op_msg
  end

  # Reply to the caller blocked on this op's server ack, if there is one, and
  # return the op attrs with `:from` removed.
  #
  # At-most-once by construction: `:from` is popped here, so whichever of the
  # phx_reply ack and the committed echo arrives second finds no waiter and is a
  # no-op. The server may deliver the two in either order (it broadcasts the
  # echo before replying), which is why both the :write_accepted and
  # :server_event handlers go through this function.
  #
  # A `nil` op (nothing pending under that id) passes through as `nil`.
  defp answer_waiter(nil, _reply), do: nil

  defp answer_waiter(op_attrs, reply) do
    {waiter, remaining} = Map.pop(op_attrs, :from)

    if waiter != nil do
      GenServer.reply(waiter, build_ack_reply(remaining, reply))
    end

    remaining
  end

  # Turn a committed echo (canonical value + store_seq) into the same map shape
  # a phx_reply ack carries, so build_ack_reply can consume either source.
  # A map value (e.g. a lease envelope with token/expires_at/holder) is kept
  # and gains "store_seq"; any non-map value contributes nothing, so the result
  # is just %{"store_seq" => store_seq}.
  defp ack_reply_from_event(value, store_seq) do
    envelope = if is_map(value), do: value, else: %{}
    Map.put(envelope, "store_seq", store_seq)
  end

  # Extract the lease-specific fields from caller options: `:ttl_ms`
  # (defaulting to 30_000) and `:holder` (defaulting to nil).
  defp lease_extra(opts) do
    ttl_ms = Keyword.get(opts, :ttl_ms, 30_000)
    holder = Keyword.get(opts, :holder)

    %{ttl_ms: ttl_ms, holder: holder}
  end

  # Build and register a lease-family op without touching the cache. Lease ops
  # are server-authoritative claims: the local key may currently hold someone
  # else's live lease, which must be neither overwritten nor rolled back.
  #
  # `extra` is merged over `base`, then a fresh client_op_id and the caller's
  # `from` are stamped on. Returns `{op_msg, new_state}` with the op pending.
  defp do_lease_op(base, extra, from, state) do
    op_id = generate_op_id()

    op_msg =
      base
      |> Map.merge(extra)
      |> Map.merge(%{client_op_id: op_id, from: from})

    pending = Map.put(state.pending_ops, op_id, op_msg)
    {op_msg, %{state | pending_ops: pending}}
  end

  # Build the value handed back to the waiting caller, keyed on the op kind:
  #   * :lease / :renew (with a :path) -> {:ok, %Dust.Lease{}} built from the reply
  #   * :release                       -> :ok (idempotent)
  #   * anything else                  -> {:ok, store_seq}
  defp build_ack_reply(op_attrs, reply) do
    case op_attrs do
      %{op: op, path: path} when op in [:lease, :renew] ->
        lease =
          Dust.Lease.new(
            key: path,
            token: reply_field(reply, :token),
            holder: reply_field(reply, :holder),
            expires_at: reply_field(reply, :expires_at)
          )

        {:ok, lease}

      %{op: :release} ->
        :ok

      _other ->
        {:ok, reply_field(reply, :store_seq)}
    end
  end

  # Read `key` from a reply map that may use atom or string keys. The atom key
  # is tried first; a nil/false result there falls back to the string key.
  defp reply_field(reply, key) do
    case Map.get(reply, key) do
      falsy when falsy in [nil, false] -> Map.get(reply, to_string(key))
      found -> found
    end
  end

  # Optimistically delete `path`, notify local callbacks with an uncommitted
  # event, register the op as pending and hand it to the connection.
  # Returns the updated state.
  defp do_delete(path, from, state) do
    op_id = generate_op_id()

    # Remove the leaf and every descendant, matching the server's DELETE op
    # semantics so the cache stays consistent.
    _ = state.cache.delete_subtree(state.cache_target, state.store, path)

    local_event = %{
      store: state.store,
      path: path,
      op: :delete,
      value: nil,
      committed: false,
      source: :local,
      client_op_id: op_id
    }

    dispatch_callbacks(state, path, local_event)

    op_msg = maybe_put_from(%{op: :delete, path: path, client_op_id: op_id}, from)
    pending_ops = Map.put(state.pending_ops, op_id, op_msg)

    send_to_connection(state.store, op_msg)
    %{state | pending_ops: pending_ops}
  end

  # Optimistically merge `map` under `path`: each key is written as a child
  # entry of `path` (seq 0), local callbacks get one uncommitted :merge event
  # for the parent path, and the op is registered as pending and sent.
  # Returns the updated state.
  #
  # NOTE(review): unlike apply_set_to_cache, a plain-map child value is written
  # here as a single leaf (not flattened) and existing descendants of the child
  # are not cleared — confirm this matches how the server applies merges.
  defp do_merge(path, map, from, state) do
    op_id = generate_op_id()
    {:ok, parent_segments} = Dust.Protocol.Path.parse_rendered(path)

    Enum.each(map, fn {field, field_value} ->
      {:ok, segments} = Dust.Protocol.Path.child(parent_segments, to_string(field))
      {:ok, rendered} = Dust.Protocol.Path.render(segments)
      leaf_type = detect_type(field_value)

      state.cache.write(state.cache_target, state.store, rendered, field_value, leaf_type, 0)
    end)

    local_event = %{
      store: state.store,
      path: path,
      op: :merge,
      value: map,
      committed: false,
      source: :local,
      client_op_id: op_id
    }

    dispatch_callbacks(state, path, local_event)

    op_msg = maybe_put_from(%{op: :merge, path: path, value: map, client_op_id: op_id}, from)
    pending_ops = Map.put(state.pending_ops, op_id, op_msg)

    send_to_connection(state.store, op_msg)
    %{state | pending_ops: pending_ops}
  end

  # Optimistically bump the counter at `path` by `delta`. The cached value is
  # used as the base only when it is a number; a missing or non-numeric entry
  # counts as 0. The result is written with type "counter" (seq 0), local
  # callbacks see an uncommitted :increment event carrying the delta (not the
  # new total), and the op is registered as pending and sent.
  # Returns the updated state.
  defp do_increment(path, delta, from, state) do
    op_id = generate_op_id()

    base =
      with {:ok, stored} when is_number(stored) <-
             state.cache.read(state.cache_target, state.store, path) do
        stored
      else
        _ -> 0
      end

    :ok = state.cache.write(state.cache_target, state.store, path, base + delta, "counter", 0)

    local_event = %{
      store: state.store,
      path: path,
      op: :increment,
      value: delta,
      committed: false,
      source: :local,
      client_op_id: op_id
    }

    dispatch_callbacks(state, path, local_event)

    op_msg =
      maybe_put_from(%{op: :increment, path: path, value: delta, client_op_id: op_id}, from)

    pending_ops = Map.put(state.pending_ops, op_id, op_msg)

    send_to_connection(state.store, op_msg)
    %{state | pending_ops: pending_ops}
  end

  # Optimistically add `member` to / remove it from the set stored at `path`.
  # The cached value is used only when it is a list; anything else counts as an
  # empty set. :add prepends and de-duplicates, :remove deletes the first
  # occurrence. The result is written with type "set" (seq 0), local callbacks
  # see an uncommitted event carrying the member, and the op is registered as
  # pending and sent. Returns the updated state.
  defp do_set_op(op, path, member, from, state) when op in [:add, :remove] do
    op_id = generate_op_id()

    members =
      with {:ok, stored} when is_list(stored) <-
             state.cache.read(state.cache_target, state.store, path) do
        stored
      else
        _ -> []
      end

    updated =
      if op == :add do
        Enum.uniq([member | members])
      else
        List.delete(members, member)
      end

    :ok = state.cache.write(state.cache_target, state.store, path, updated, "set", 0)

    local_event = %{
      store: state.store,
      path: path,
      op: op,
      value: member,
      committed: false,
      source: :local,
      client_op_id: op_id
    }

    dispatch_callbacks(state, path, local_event)

    op_msg = maybe_put_from(%{op: op, path: path, value: member, client_op_id: op_id}, from)
    pending_ops = Map.put(state.pending_ops, op_id, op_msg)

    send_to_connection(state.store, op_msg)
    %{state | pending_ops: pending_ops}
  end

  # Normalize a capabilities map (atom or string keyed) into the canonical
  # atom-keyed shape with :permissions, :scopes and :store_access, delegating
  # each section to its own normalizer.
  defp normalize_capabilities(capabilities) when is_map(capabilities) do
    permissions = capabilities |> capability_value(:permissions) |> normalize_permissions()
    scopes = capabilities |> capability_value(:scopes) |> normalize_scopes()
    store_access = capabilities |> capability_value(:store_access) |> normalize_store_access()

    %{permissions: permissions, scopes: scopes, store_access: store_access}
  end

  # Reduce a permissions map to boolean :read / :write flags. Anything that is
  # not a map yields the default permissions.
  defp normalize_permissions(permissions) do
    if is_map(permissions) do
      Map.new([:read, :write], fn flag ->
        {flag, truthy?(capability_value(permissions, flag))}
      end)
    else
      @default_capabilities.permissions
    end
  end

  # Stringify every scope in a list; any non-list input yields no scopes.
  defp normalize_scopes(scopes) do
    if is_list(scopes) do
      for scope <- scopes, do: to_string(scope)
    else
      []
    end
  end

  # Normalize a store_access map into `%{mode: ..., store_ids: [...]}`.
  # Anything that is not a map yields the default store access.
  defp normalize_store_access(store_access) do
    if is_map(store_access) do
      mode = store_access |> capability_value(:mode) |> normalize_store_access_mode()
      store_ids = store_access |> capability_value(:store_ids) |> normalize_store_ids()

      %{mode: mode, store_ids: store_ids}
    else
      @default_capabilities.store_access
    end
  end

  # Only an explicit :all / "all" grants :all; every other value is :selected.
  defp normalize_store_access_mode(mode) do
    case mode do
      :all -> :all
      "all" -> :all
      _ -> :selected
    end
  end

  # Stringify every store id in a list; any non-list input yields no ids.
  defp normalize_store_ids(store_ids) do
    case store_ids do
      ids when is_list(ids) -> Enum.map(ids, fn id -> to_string(id) end)
      _ -> []
    end
  end

  # Look up `key` in a map that may be atom- or string-keyed. The atom key
  # wins whenever it is present (even with a nil value); otherwise the
  # stringified key is tried, yielding nil when neither exists.
  defp capability_value(map, key) do
    case Map.fetch(map, key) do
      {:ok, value} -> value
      :error -> Map.get(map, to_string(key))
    end
  end

  # True only for the accepted "on" spellings: true, "true", "1" and integer 1.
  defp truthy?(true), do: true
  defp truthy?("true"), do: true
  defp truthy?("1"), do: true
  defp truthy?(1), do: true
  defp truthy?(_other), do: false

  # Convert a server rejection into the reason term returned to callers.
  # A payload map with a "reason" (string-keyed) or :reason (atom-keyed) entry
  # is enriched with its scope/message; any other term is treated as a bare
  # reason. The string-keyed form is checked first.
  defp rejection_reason(payload) do
    case payload do
      %{"reason" => reason} ->
        reason_with_context(reason, payload["scope"], payload["message"])

      %{reason: reason} ->
        reason_with_context(reason, payload[:scope], payload[:message])

      bare_reason ->
        reason_to_atom(bare_reason)
    end
  end

  # Attach human-readable context to the two reasons that carry it:
  #   * missing_scope     -> {:missing_scope, scope, message}
  #   * store_not_allowed -> {:store_not_allowed, message}
  # A missing message is replaced by a default. Every other reason is passed
  # through reason_to_atom unchanged.
  defp reason_with_context(reason, scope, message) do
    cond do
      reason in ["missing_scope", :missing_scope] ->
        {:missing_scope, scope, message || missing_scope_message(scope)}

      reason in ["store_not_allowed", :store_not_allowed] ->
        {:store_not_allowed, message || "Token does not have access to this store"}

      true ->
        reason_to_atom(reason)
    end
  end

  # Default message for a missing-scope rejection; names the scope when it is
  # a string and falls back to a generic message otherwise.
  defp missing_scope_message(scope) do
    if is_binary(scope) do
      "Token is missing #{scope} scope"
    else
      "Token is missing required scope"
    end
  end

  # Render a rejection reason as a display string: the embedded message of a
  # contextual tuple, the name of an atom, a string as-is, or "unknown".
  defp rejection_message(reason) do
    case reason do
      {_tag, _scope, message} when is_binary(message) -> message
      {_tag, message} when is_binary(message) -> message
      tag when is_atom(tag) -> Atom.to_string(tag)
      text when is_binary(text) -> text
      _ -> "unknown"
    end
  end

  # Closed set of rejection reasons the server is known to send, and a
  # compile-time string => atom lookup built from it.
  @known_rejection_reasons ~w(
    conflict rate_limited unauthorized store_not_allowed missing_scope invalid_op
    capver_mismatch if_match_unsupported_op if_match_multi_leaf exists
    invalid_precondition if_absent_unsupported_op if_absent_multi_leaf
    held occupied not_held fenced
  )a
  @rejection_reason_lookup Map.new(@known_rejection_reasons, &{Atom.to_string(&1), &1})

  # Map a rejection reason to an atom without ever minting new atoms:
  #   * atoms pass through untouched
  #   * known reason strings become their atom
  #   * unknown strings are returned as-is, so callers still see the server's
  #     raw reason without risking unsafe atom creation
  #   * anything else is :unknown
  defp reason_to_atom(reason) when is_atom(reason), do: reason

  defp reason_to_atom(reason) when is_binary(reason),
    do: Map.get(@rejection_reason_lookup, reason, reason)

  defp reason_to_atom(_other), do: :unknown

  # Hand an op to the Dust.Connection process, arming the ack deadline first.
  # When no connection process is registered the op is not sent and :ok is
  # returned; the deadline is armed either way.
  defp send_to_connection(store, op_attrs) do
    arm_ack_deadline(op_attrs)

    if connection = GenServer.whereis(Dust.Connection) do
      # capver 3 wire shape: the outgoing copy carries path_segments as well.
      send(connection, {:send_write, store, with_path_segments(op_attrs)})
    else
      :ok
    end
  end

  # Schedule an {:ack_timeout, client_op_id} message to this process after
  # @ack_timeout_ms, but only for ops that carry both :from and :client_op_id
  # (i.e. someone is waiting on the ack). Other ops get no deadline and nil is
  # returned. Re-arming on a resend after reconnect is fine: each attempt
  # simply gets a fresh deadline.
  defp arm_ack_deadline(op_attrs) do
    case op_attrs do
      %{from: _waiter, client_op_id: op_id} ->
        Process.send_after(self(), {:ack_timeout, op_id}, @ack_timeout_ms)

      _ ->
        nil
    end
  end

  # capver 3 wire shape: add `path_segments` (authoritative) next to the
  # slash-rendered `path` (kept for back-compat / display); the server prefers
  # segments when present. Ops without a string :path, or whose path does not
  # parse, are returned unchanged.
  defp with_path_segments(op_attrs) do
    with %{path: path} when is_binary(path) <- op_attrs,
         {:ok, segments} <- Dust.Protocol.Path.parse_rendered(path) do
      Map.put(op_attrs, :path_segments, segments)
    else
      _ -> op_attrs
    end
  end

  # Fresh client op id: "op_" followed by 12 cryptographically random bytes,
  # URL-safe base64 encoded without padding.
  defp generate_op_id do
    token =
      12
      |> :crypto.strong_rand_bytes()
      |> Base.url_encode64(padding: false)

    "op_" <> token
  end

  # Validate enumeration options against the pattern. Only `select: :prefixes`
  # constrains the pattern (it must be a valid prefix pattern); every other
  # selection, including the default :entries, is accepted as-is.
  defp validate_enum_opts(pattern, opts) do
    selecting_prefixes? = Keyword.get(opts, :select, :entries) == :prefixes

    if selecting_prefixes? and not valid_prefix_pattern?(pattern) do
      {:error, :invalid_pattern_for_prefixes}
    else
      :ok
    end
  end

  # A prefix pattern is either the bare "**" or anything ending in "/**".
  defp valid_prefix_pattern?(pattern) do
    pattern == "**" or String.ends_with?(pattern, "/**")
  end

  # Shape enumeration results for the requested selection. :keys and :prefixes
  # results are returned untouched; :entries rows of {path, value, type, seq}
  # become Dust.Entry structs with the seq as the revision.
  defp wrap_items(items, mode) when mode in [:keys, :prefixes], do: items

  defp wrap_items(items, :entries) do
    to_entry = fn {path, value, type, seq} ->
      Dust.Entry.new(path: path, value: value, type: type, revision: seq)
    end

    Enum.map(items, to_entry)
  end

  # Turn a cached file-ref map (`"_type" => "file"`) into a Dust.FileRef that
  # carries this engine's HTTP base URL and token; every other value is
  # returned unchanged.
  defp unwrap_value(value, state) do
    case value do
      %{"_type" => "file"} ->
        Dust.FileRef.from_map(value, server_url: state.http_url, token: state.token)

      _ ->
        value
    end
  end

  # Derive the HTTP base URL ("scheme://host[:port]") from a WebSocket URL.
  #
  #   * `nil` in -> `nil` out.
  #   * "wss" / "https" map to "https"; every other scheme maps to "http".
  #   * The port reported by `URI.parse/1` is appended when present.
  #   * A URL without a host yields `nil`.
  #
  # `URI.parse/1` strips the square brackets from IPv6 literal hosts
  # ("[::1]" -> "::1"), so they are restored here; otherwise the rebuilt URL
  # ("http://::1:4000") would be unparseable.
  defp derive_http_url(nil), do: nil

  defp derive_http_url(ws_url) when is_binary(ws_url) do
    case URI.parse(ws_url) do
      %URI{scheme: scheme, host: host, port: port} when is_binary(host) ->
        http_scheme = if scheme in ["wss", "https"], do: "https", else: "http"
        # Only IPv6 literals can contain ":" in the host component.
        host_part = if String.contains?(host, ":"), do: "[#{host}]", else: host
        port_part = if port, do: ":#{port}", else: ""
        "#{http_scheme}://#{host_part}#{port_part}"

      _ ->
        nil
    end
  end

  # --- Subtree assembly + canonicalization ---

  # Fallback used by `Dust.get/2` and `Dust.entry/2` when the exact path has no
  # leaf, mirroring the server's `assemble_subtree`: after a map-mode write the
  # leaves live beneath `path` and nothing lives at `path` itself.
  #
  # Reads every cached row under `path` and rebuilds a nested map keyed by the
  # path segments below the prefix. Returns nil when the subtree has no rows.
  # Rows at the exact path, or with an unparsable path, are skipped.
  defp assemble_subtree_value(state, path) do
    rows = state.cache.read_subtree(state.cache_target, state.store, path)

    if rows == [] do
      nil
    else
      {:ok, prefix} = Dust.Protocol.Path.parse_rendered(path)
      depth = length(prefix)

      Enum.reduce(rows, %{}, fn {row_path, raw_value, _type, _seq}, tree ->
        # Keep only rows strictly deeper than the prefix; what remains after
        # dropping the prefix segments is the key path inside the result.
        with {:ok, segments} <- Dust.Protocol.Path.parse_rendered(row_path),
             [_ | _] = relative <- Enum.drop(segments, depth) do
          put_nested(tree, relative, unwrap_value(raw_value, state))
        else
          _ -> tree
        end
      end)
    end
  end

  # Entry-shaped counterpart of assemble_subtree_value: returns nil when the
  # subtree has no rows, otherwise a "map"-typed Dust.Entry whose value is the
  # assembled subtree and whose revision is the highest seq among its rows.
  defp assemble_subtree_entry(state, path) do
    case state.cache.read_subtree(state.cache_target, state.store, path) do
      [] ->
        nil

      rows ->
        {_path, _value, _type, latest_seq} = Enum.max_by(rows, fn {_, _, _, seq} -> seq end)

        Dust.Entry.new(
          path: path,
          value: assemble_subtree_value(state, path),
          type: "map",
          revision: latest_seq
        )
    end
  end

  # Insert `value` into `map` at the nested key path `keys` (a non-empty list),
  # creating intermediate maps as needed. Returns the updated map.
  #
  # If an intermediate key is already occupied by something that is not a
  # plain map (a scalar leaf, nil, or a struct such as a typed value), that
  # occupant is replaced by a fresh map so the deeper path can be stored.
  # Previously this raised BadMapError for scalars, which would crash the
  # caller whenever a leaf and one of its descendants were both present.
  defp put_nested(map, [key], value), do: Map.put(map, key, value)

  defp put_nested(map, [key | rest], value) do
    child =
      case Map.get(map, key) do
        existing when is_map(existing) and not is_struct(existing) -> existing
        _leaf_or_missing -> %{}
      end

    Map.put(map, key, put_nested(child, rest, value))
  end

  # Does `value` represent ONE typed value rather than a record-shaped object?
  #
  # True for:
  #   * any struct (Decimal, DateTime, Dust.FileRef, ...) — structs are
  #     explicitly typed, so `%Decimal{coef: 5}` is a value, not a record;
  #   * file refs, recognized by `_type: "file"` alone (string or atom key);
  #   * other typed wrappers in wire shape, which carry both `_typed` and
  #     `_type` (string keys or atom keys, not a mix).
  #
  # Everything else is false. Plain object maps are records and must be
  # flattened to leaves on write to match the server's canonical form.
  # Mirrors Dust.Sync.ValueCodec.typed_value?/1 on the server.
  defp typed_map?(value) do
    is_struct(value) or
      match?(%{"_type" => "file"}, value) or
      match?(%{_type: "file"}, value) or
      match?(%{"_typed" => _, "_type" => _}, value) or
      match?(%{_typed: _, _type: _}, value)
  end

  # Write a `:set` into the cache in canonical form.
  #
  #   * Scalars and typed values are written at the exact path.
  #   * Plain-map values are flattened: everything under `path` is cleared
  #     first, then each leaf is written beneath it.
  #
  # This mirrors the server's `Dust.Sync.Writer.apply_to_entries({:set, ...})`
  # so the cache has the same shape whether the data came from a local
  # optimistic write or a server event echo.
  defp apply_set_to_cache(state, path, value, type, seq) do
    cond do
      not is_map(value) or typed_map?(value) ->
        :ok = state.cache.write(state.cache_target, state.store, path, value, type, seq)

      true ->
        _ = state.cache.delete_subtree(state.cache_target, state.store, path)
        flatten_into_cache(state, path, value, seq)
    end
  end

  # Recursively write a plain map beneath `base_path`, one cache entry per
  # leaf. Nested plain maps recurse into their own child path; scalars and
  # typed values are written with their detected type at `seq`.
  defp flatten_into_cache(state, base_path, map, seq) when is_map(map) do
    {:ok, parent_segments} = Dust.Protocol.Path.parse_rendered(base_path)

    Enum.each(map, fn {field, field_value} ->
      {:ok, segments} = Dust.Protocol.Path.child(parent_segments, to_string(field))
      {:ok, rendered} = Dust.Protocol.Path.render(segments)

      cond do
        not is_map(field_value) or typed_map?(field_value) ->
          kind = detect_type(field_value)
          :ok = state.cache.write(state.cache_target, state.store, rendered, field_value, kind, seq)

        true ->
          flatten_into_cache(state, rendered, field_value, seq)
      end
    end)
  end

  # Name the cache entry type for a value. Decimal and DateTime structs are
  # checked before the generic map case (structs are maps); anything without a
  # dedicated type falls back to "string".
  defp detect_type(value) do
    case value do
      %Decimal{} -> "decimal"
      %DateTime{} -> "datetime"
      nil -> "null"
      flag when is_boolean(flag) -> "boolean"
      record when is_map(record) -> "map"
      text when is_binary(text) -> "string"
      int when is_integer(int) -> "integer"
      float when is_float(float) -> "float"
      _ -> "string"
    end
  end
end