defmodule Dust.SyncEngine do
use GenServer
require Logger
# How long an acked op (one carrying a waiting :from) may go without a server
# ack or committed echo before the caller is answered {:error, :timeout}.
# Generous: real acks land in milliseconds; this only catches a dropped reply.
@ack_timeout_ms 30_000
# Capabilities the engine holds until a {:set_capabilities, _} cast replaces
# them: no read or write permission, no scopes, and :selected store access
# with an empty store id list.
@default_capabilities %{
permissions: %{read: false, write: false},
scopes: [],
store_access: %{mode: :selected, store_ids: []}
}
# Engine state, one struct per store-scoped GenServer.
defstruct [
# Store identifier; also the Registry key this process is registered under.
:store,
# Cache adapter module; every cache call goes through it.
:cache,
# First argument handed to the cache adapter: the pid of a started cache,
# a pid supplied by the caller, or a module for stateless adapters.
:cache_target,
# Handle returned by Dust.CallbackRegistry.new/0.
:callbacks,
# client_op_id => op attributes for writes not yet reconciled by a server
# event or removed by a rejection.
:pending_ops,
# Connection status; starts as :disconnected and is replaced by set_status.
:status,
# Seeded from the cache's last_seq at init; overwritten by snapshots and by
# each committed server event.
:last_store_seq,
# Set by the {:catch_up_complete, seq} cast; not assigned at init.
:catch_up_seq,
# Optional activity buffer; when present, every server event is appended.
:activity_buffer,
# HTTP base URL (derived from the WS url) and bearer token, kept
# here so FileRefs unwrapped from the cache carry the auth context
# they need to fetch blob content via /api/files/:hash.
:http_url,
:token,
# monitor_ref => subscription ref. We Process.monitor the process that
# registers each subscription and auto-unregister it on :DOWN, so a
# crashed subscriber (e.g. a single_flight awaiter) never leaks an ETS
# row + CallbackWorker. The only rescue-free cleanup path.
:callback_monitors,
capabilities: @default_capabilities
]
@doc """
Starts the engine for the store named by the required `:store` option and
registers it under `via/1` for that store. `opts` is passed to `init/1` whole.
"""
def start_link(opts) do
  registered_name = opts |> Keyword.fetch!(:store) |> via()
  GenServer.start_link(__MODULE__, opts, name: registered_name)
end
@doc """
Builds the `{:via, Registry, ...}` name under which the engine for `store`
is registered in `Dust.SyncEngineRegistry`.
"""
def via(store) do
  {:via, Registry, {Dust.SyncEngineRegistry, store}}
end
# Turns a caller-supplied path into its canonical slash-rendered string.
# A list is rendered as path segments; a binary is taken to be already in
# canonical slash form and only normalized, so `"example.com"` stays ONE
# segment (never the legacy split into "example" and "com").
# Raises ArgumentError when the path is rejected.
defp norm!(segments) when is_list(segments) do
  case Dust.Protocol.Path.render(segments) do
    {:error, reason} ->
      raise ArgumentError, "invalid path #{inspect(segments)}: #{reason}"

    {:ok, rendered} ->
      rendered
  end
end

defp norm!(rendered) when is_binary(rendered) do
  case Dust.Protocol.Path.normalize_rendered(rendered) do
    {:error, reason} ->
      raise ArgumentError, "invalid path #{inspect(rendered)}: #{reason}"

    {:ok, canonical} ->
      canonical
  end
end
# Subscription/enumeration patterns obey the same input rule as paths: a
# segment list or a canonical slash string, with any `*` / `**` wildcards
# already written against slash separators. The bare "**" passes through
# untouched; lists are rendered like paths; other binaries are returned
# unchanged once the glob compiler accepts them. Raises ArgumentError otherwise.
defp norm_pattern!("**") do
  "**"
end

defp norm_pattern!(segments) when is_list(segments) do
  norm!(segments)
end

defp norm_pattern!(glob) when is_binary(glob) do
  case Dust.Protocol.Glob.compile(glob) do
    {:error, reason} ->
      raise ArgumentError, "invalid pattern #{inspect(glob)}: #{reason}"

    {:ok, _compiled} ->
      glob
  end
end
@doc """
Reads the value at `path` from the local cache of `store`.

Returns `{:ok, value}` or `:miss`. Raises `ArgumentError` for an invalid path.
"""
def get(store, path) do
  canonical = norm!(path)
  GenServer.call(via(store), {:get, canonical})
end
@doc """
Reads several paths in one call. Every path is normalized first; the reply is
a map keyed by canonical path.
"""
def get_many(store, paths) when is_list(paths) do
  canonical_paths = for path <- paths, do: norm!(path)
  GenServer.call(via(store), {:get_many, canonical_paths})
end
@doc """
Writes `value` at `path`. The engine replies `:ok` right after the optimistic
local write, without waiting for the server.
"""
def put(store, path, value) do
  store |> via() |> GenServer.call({:put, norm!(path), value})
end

@doc """
Writes `value` at `path` with `opts`. The engine defers its reply for this
variant, so the call blocks until the engine answers it.
"""
def put(store, path, value, opts) when is_list(opts) do
  store |> via() |> GenServer.call({:put, norm!(path), value, opts})
end
@doc "Deletes `path`; the engine replies `:ok` immediately."
def delete(store, path) do
  store |> via() |> GenServer.call({:delete, norm!(path)})
end

@doc """
Deletes `path`, blocking until the engine answers. `opts` must be a list; the
server callback in this module does not inspect it.
"""
def delete(store, path, opts) when is_list(opts) do
  store |> via() |> GenServer.call({:delete, norm!(path), opts})
end
@doc "Merges `map` at `path`; the engine replies `:ok` immediately."
def merge(store, path, map) do
  canonical = norm!(path)
  GenServer.call(via(store), {:merge, canonical, map})
end

@doc """
Merges `map` at `path`, blocking until the engine answers. `opts` must be a
list; the server callback in this module does not inspect it.
"""
def merge(store, path, map, opts) when is_list(opts) do
  canonical = norm!(path)
  GenServer.call(via(store), {:merge, canonical, map, opts})
end
@doc "Increments the counter at `path` by `delta` (default 1); replies `:ok` immediately."
def increment(store, path, delta \\ 1) do
  store |> via() |> GenServer.call({:increment, norm!(path), delta})
end

@doc """
Increments the counter at `path` by `delta`, blocking until the engine
answers. `opts` must be a list; the server callback does not inspect it.
"""
def increment(store, path, delta, opts) when is_list(opts) do
  store |> via() |> GenServer.call({:increment, norm!(path), delta, opts})
end
@doc "Adds `member` to the set at `path`; the engine replies `:ok` immediately."
def add(store, path, member) do
  request = {:add, norm!(path), member}
  GenServer.call(via(store), request)
end

@doc """
Adds `member` to the set at `path`, blocking until the engine answers.
`opts` must be a list; the server callback does not inspect it.
"""
def add(store, path, member, opts) when is_list(opts) do
  request = {:add, norm!(path), member, opts}
  GenServer.call(via(store), request)
end
@doc "Removes `member` from the set at `path`; the engine replies `:ok` immediately."
def remove(store, path, member) do
  request = {:remove, norm!(path), member}
  GenServer.call(via(store), request)
end

@doc """
Removes `member` from the set at `path`, blocking until the engine answers.
`opts` must be a list; the server callback does not inspect it.
"""
def remove(store, path, member, opts) when is_list(opts) do
  request = {:remove, norm!(path), member, opts}
  GenServer.call(via(store), request)
end
@doc """
Stores the file at `source_path` under `path`. Recognised `opts`:
`:filename` and `:content_type`. Returns the engine's reply (`:ok` on success).
"""
def put_file(store, path, source_path, opts \\ []) do
  store |> via() |> GenServer.call({:put_file, norm!(path), source_path, opts})
end
@doc "Returns every cached entry whose path matches `pattern`, unpaged."
def enum(store, pattern) do
  compiled = norm_pattern!(pattern)
  GenServer.call(via(store), {:enum, compiled})
end

@doc """
Paged enumeration of entries matching `pattern`. The engine reads `:limit`,
`:order`, `:select` and `:after` from `opts` and replies with a page or an
`{:error, _}` tuple.
"""
def enum(store, pattern, opts) when is_list(opts) do
  compiled = norm_pattern!(pattern)
  GenServer.call(via(store), {:enum_paged, compiled, opts})
end
@doc """
Paged read of entries between the canonical string paths `from` and `to`.
Both bounds must be binaries and are normalized before the call.
"""
def range(store, from, to, opts \\ []) when is_binary(from) and is_binary(to) do
  store |> via() |> GenServer.call({:range, norm!(from), norm!(to), opts})
end
@doc """
Fetches the full entry (value plus metadata) at `path`. Replies
`{:ok, entry}` or `{:error, :not_found}`.
"""
def entry(store, path) do
  canonical = norm!(path)
  GenServer.call(via(store), {:entry, canonical})
end
@doc """
Requests a lease on `key`. Calls with an `:infinity` timeout; the engine
replies `{:error, :unavailable}` at once when it is not connected.
"""
def lease(store, key, opts \\ []) when is_list(opts) do
  request = {:lease, norm!(key), opts}
  GenServer.call(via(store), request, :infinity)
end
@doc """
Renews `lease`. The engine reads `:ttl_ms` from `opts` (default 30_000).
Calls with an `:infinity` timeout.
"""
def renew(store, %Dust.Lease{} = lease, opts \\ []) when is_list(opts) do
  store |> via() |> GenServer.call({:renew, lease, opts}, :infinity)
end
@doc "Releases `lease`. Calls with an `:infinity` timeout."
def release(store, %Dust.Lease{} = lease) do
  store |> via() |> GenServer.call({:release, lease}, :infinity)
end
@doc """
Returns a map describing the engine: connection status, last store seq,
pending op count, entry count, store and current capabilities.
"""
def status(store) do
  store |> via() |> GenServer.call(:status)
end
@doc """
Subscribes `callback` to events on paths matching `pattern` and returns the
subscription ref. The engine reads `:monitor` and `:include_current` from
`opts`; the full list is also handed to the callback registry.
"""
def on(store, pattern, callback, opts \\ []) do
  compiled = norm_pattern!(pattern)
  GenServer.call(via(store), {:on, compiled, callback, opts})
end
@doc """
Cancels the subscription identified by `ref` (as returned by `on/4`).

Idempotent: the result is `:ok` even when no such subscription exists. Once
this call has returned, the subscription's callback will not fire again.
"""
def off(store, ref) when is_reference(ref) do
  store |> via() |> GenServer.call({:off, ref})
end
@doc "Hands a committed server `event` to the engine asynchronously (cast)."
def handle_server_event(store, event) do
  store |> via() |> GenServer.cast({:server_event, event})
end
@doc """
Asynchronously records the connection status. When the status becomes
`:connected` the engine re-sends its pending ops.
"""
def set_status(store, new_status) do
  store |> via() |> GenServer.cast({:set_status, new_status})
end
@doc "Asynchronously replaces the engine's capabilities with `capabilities` (a map)."
def set_capabilities(store, capabilities) when is_map(capabilities) do
  store |> via() |> GenServer.cast({:set_capabilities, capabilities})
end
@doc "Tells the engine (cast) that the server rejected op `client_op_id` for `reason`."
def handle_write_rejected(store, client_op_id, reason) do
  message = {:write_rejected, client_op_id, reason}
  GenServer.cast(via(store), message)
end
@doc """
Tells the engine (cast) that the server acknowledged op `client_op_id`.
The third argument is either the ack reply map or a bare integer store seq,
which is wrapped as `%{store_seq: seq}`.
"""
def handle_write_accepted(store, client_op_id, store_seq) when is_integer(store_seq) do
  handle_write_accepted(store, client_op_id, %{store_seq: store_seq})
end

def handle_write_accepted(store, client_op_id, reply) when is_map(reply) do
  store |> via() |> GenServer.cast({:write_accepted, client_op_id, reply})
end
@doc "Records (cast) that catch-up has completed through `through_seq`."
def set_catch_up_complete(store, through_seq) do
  store |> via() |> GenServer.cast({:catch_up_complete, through_seq})
end
@doc "Hands a server `snapshot` to the engine asynchronously (cast)."
def handle_snapshot(store, snapshot) do
  store |> via() |> GenServer.cast({:snapshot, snapshot})
end
@doc """
Writes `value` of `type` at `path` straight into the cache, bypassing the
write pipeline. For test seeding only.
"""
def seed_entry(store, path, value, type) do
  canonical = norm!(path)
  GenServer.call(via(store), {:seed_entry, canonical, value, type})
end
@doc """
Overwrites `last_store_seq` in the engine state (cast). For the test harness
only.
"""
def set_store_seq(store, seq) do
  store |> via() |> GenServer.cast({:set_store_seq, seq})
end
# Server
# Builds the initial state. Required opts: `:store` and `:cache`, the latter a
# `{module, arg}` pair. A list `arg` is treated as start options for a new
# cache process; a pid or a module (stateless adapter such as an Ecto Repo)
# is used as the cache target as-is. The engine starts :disconnected with no
# pending ops, and its sequence number is seeded from the cache.
@impl true
def init(opts) do
  store = Keyword.fetch!(opts, :store)
  {cache_mod, cache_arg} = Keyword.fetch!(opts, :cache)

  cache_target =
    case cache_arg do
      start_opts when is_list(start_opts) ->
        {:ok, cache_pid} = cache_mod.start_link(start_opts)
        cache_pid

      ready when is_pid(ready) or is_atom(ready) ->
        ready
    end

  callbacks = Dust.CallbackRegistry.new()
  last_seq = cache_mod.last_seq(cache_target, store)

  {:ok,
   %__MODULE__{
     store: store,
     cache: cache_mod,
     cache_target: cache_target,
     callbacks: callbacks,
     pending_ops: %{},
     status: :disconnected,
     last_store_seq: last_seq,
     activity_buffer: Keyword.get(opts, :activity_buffer),
     http_url: derive_http_url(Keyword.get(opts, :url)),
     token: Keyword.get(opts, :token),
     callback_monitors: %{},
     capabilities: @default_capabilities
   }}
end
# get/2: a direct cache hit is unwrapped and returned; on a miss the engine
# tries to assemble a value from the subtree under `path` and only reports
# :miss when that yields nil as well.
@impl true
def handle_call({:get, path}, _from, state) do
  reply =
    case state.cache.read(state.cache_target, state.store, path) do
      :miss ->
        subtree = assemble_subtree_value(state, path)
        if is_nil(subtree), do: :miss, else: {:ok, subtree}

      {:ok, cached} ->
        {:ok, unwrap_value(cached, state)}
    end

  {:reply, reply, state}
end
# get_many/2: one batched cache read; each hit's value is unwrapped and keyed
# by its path. Type and seq from the cache tuple are discarded.
@impl true
def handle_call({:get_many, paths}, _from, state) do
  values_by_path =
    state.cache_target
    |> state.cache.read_many(state.store, paths)
    |> Map.new(fn {path, {value, _type, _seq}} -> {path, unwrap_value(value, state)} end)

  {:reply, values_by_path, state}
end
# put/3: optimistic write, forward the op to the connection, reply :ok now.
@impl true
def handle_call({:put, path, value}, _from, state) do
  {outbound, next_state} = do_put(path, value, [], nil, state)
  send_to_connection(next_state.store, outbound)
  {:reply, :ok, next_state}
end

# put/4: same write path, but the caller's `from` is stored with the pending
# op and no reply is sent here — the reply is deferred.
@impl true
def handle_call({:put, path, value, opts}, from, state) do
  {outbound, next_state} = do_put(path, value, opts, from, state)
  send_to_connection(next_state.store, outbound)
  {:noreply, next_state}
end
# Lease ops are decided by the server and answered from its ack. While
# connected, the op is forwarded and the reply deferred. When not connected
# the caller is told {:error, :unavailable} immediately — a lease granted
# late is worthless, so there is no point blocking on the ack.
@impl true
def handle_call({:lease, key, opts}, from, %{status: :connected} = state) do
  {op_msg, state} = do_lease_op(%{op: :lease, path: key}, lease_extra(opts), from, state)
  send_to_connection(state.store, op_msg)
  {:noreply, state}
end

@impl true
def handle_call({:lease, _key, _opts}, _from, state) do
  {:reply, {:error, :unavailable}, state}
end
# renew/3: forwards the lease token and the requested TTL (`:ttl_ms`, default
# 30_000) while connected; otherwise answers {:error, :unavailable} at once.
@impl true
def handle_call({:renew, lease, opts}, from, %{status: :connected} = state) do
  ttl_ms = Keyword.get(opts, :ttl_ms, 30_000)
  renewal = %{token: lease.token, ttl_ms: ttl_ms}
  {op_msg, state} = do_lease_op(%{op: :renew, path: lease.key}, renewal, from, state)
  send_to_connection(state.store, op_msg)
  {:noreply, state}
end

@impl true
def handle_call({:renew, _lease, _opts}, _from, state) do
  {:reply, {:error, :unavailable}, state}
end
# release/2: forwards the lease token while connected and defers the reply;
# otherwise answers {:error, :unavailable} at once.
@impl true
def handle_call({:release, lease}, from, %{status: :connected} = state) do
  target = %{op: :release, path: lease.key}
  {op_msg, state} = do_lease_op(target, %{token: lease.token}, from, state)
  send_to_connection(state.store, op_msg)
  {:noreply, state}
end

@impl true
def handle_call({:release, _lease}, _from, state) do
  {:reply, {:error, :unavailable}, state}
end
# delete/2 replies :ok immediately; delete/3 passes the caller's `from` to
# do_delete/3 and defers the reply. The opts of delete/3 are not inspected here.
@impl true
def handle_call({:delete, path}, _from, state),
  do: {:reply, :ok, do_delete(path, nil, state)}

@impl true
def handle_call({:delete, path, _opts}, from, state),
  do: {:noreply, do_delete(path, from, state)}
# merge/3 replies :ok immediately; merge/4 passes the caller's `from` to
# do_merge/4 and defers the reply. The opts of merge/4 are not inspected here.
@impl true
def handle_call({:merge, path, map}, _from, state),
  do: {:reply, :ok, do_merge(path, map, nil, state)}

@impl true
def handle_call({:merge, path, map, _opts}, from, state),
  do: {:noreply, do_merge(path, map, from, state)}
# increment/2,3 reply :ok immediately; increment/4 passes the caller's `from`
# to do_increment/4 and defers the reply. Its opts are not inspected here.
@impl true
def handle_call({:increment, path, delta}, _from, state),
  do: {:reply, :ok, do_increment(path, delta, nil, state)}

@impl true
def handle_call({:increment, path, delta, _opts}, from, state),
  do: {:noreply, do_increment(path, delta, from, state)}
# add/3 replies :ok immediately; add/4 passes the caller's `from` to
# do_set_op/5 and defers the reply. Its opts are not inspected here.
@impl true
def handle_call({:add, path, member}, _from, state),
  do: {:reply, :ok, do_set_op(:add, path, member, nil, state)}

@impl true
def handle_call({:add, path, member, _opts}, from, state),
  do: {:noreply, do_set_op(:add, path, member, from, state)}
# remove/3 replies :ok immediately; remove/4 passes the caller's `from` to
# do_set_op/5 and defers the reply. Its opts are not inspected here.
@impl true
def handle_call({:remove, path, member}, _from, state),
  do: {:reply, :ok, do_set_op(:remove, path, member, nil, state)}

@impl true
def handle_call({:remove, path, member, _opts}, from, state),
  do: {:noreply, do_set_op(:remove, path, member, from, state)}
# put_file/4: reads the source file, writes an optimistic file reference to
# the cache, notifies local subscribers, records the op as pending and sends
# it (with base64 content) to the connection.
#
# Replies :ok on success, or {:error, {:file_error, posix_reason}} when the
# source file cannot be read. The file is read with File.read/1 rather than
# File.read!/1 on purpose: a bad path is a caller mistake, and raising inside
# this callback would crash the engine and lose every in-memory pending op
# and blocked caller along with it.
#
# Options: `:filename` (default: basename of source_path) and `:content_type`
# (default: "application/octet-stream").
@impl true
def handle_call({:put_file, path, source_path, opts}, _from, state) do
  case File.read(source_path) do
    {:ok, content} ->
      client_op_id = generate_op_id()
      filename = opts[:filename] || Path.basename(source_path)
      content_type = opts[:content_type] || "application/octet-stream"

      # The content hash is known before upload, so the optimistic reference
      # can already carry it.
      hash = "sha256:" <> (:crypto.hash(:sha256, content) |> Base.encode16(case: :lower))

      ref = %{
        "_type" => "file",
        "hash" => hash,
        "size" => byte_size(content),
        "content_type" => content_type,
        "filename" => filename,
        "uploaded_at" => DateTime.utc_now() |> DateTime.to_iso8601()
      }

      # Optimistic local write of the reference (seq 0 = not yet committed).
      :ok = state.cache.write(state.cache_target, state.store, path, ref, "file", 0)

      dispatch_callbacks(state, path, %{
        store: state.store,
        path: path,
        op: :put_file,
        value: ref,
        committed: false,
        source: :local,
        client_op_id: client_op_id
      })

      # Queued for the server; the connection sends the put_file message.
      op_msg = %{
        op: :put_file,
        path: path,
        client_op_id: client_op_id,
        content: Base.encode64(content),
        filename: filename,
        content_type: content_type
      }

      state = %{state | pending_ops: Map.put(state.pending_ops, client_op_id, op_msg)}
      send_to_connection(state.store, op_msg)
      {:reply, :ok, state}

    {:error, reason} ->
      # Nothing was written or queued; the state is untouched.
      {:reply, {:error, {:file_error, reason}}, state}
  end
end
# enum/2: returns whatever the cache's read_all yields for `pattern`, unpaged.
@impl true
def handle_call({:enum, pattern}, _from, state) do
  matches = state.cache.read_all(state.cache_target, state.store, pattern)
  {:reply, matches, state}
end
# enum/3: validated, paged browse by pattern. `:limit` defaults to 50 and is
# capped at 1000; `:order` defaults to :asc; `:select` defaults to :entries;
# `:after` is the cursor. A validation error is replied unchanged.
@impl true
def handle_call({:enum_paged, pattern, opts}, _from, state) do
  with :ok <- validate_enum_opts(pattern, opts) do
    select = Keyword.get(opts, :select, :entries)

    browse_opts = [
      pattern: pattern,
      limit: min(Keyword.get(opts, :limit, 50), 1000),
      order: Keyword.get(opts, :order, :asc),
      select: select,
      cursor: Keyword.get(opts, :after)
    ]

    {items, next_cursor} = state.cache.browse(state.cache_target, state.store, browse_opts)
    wrapped = wrap_items(items, select)
    {:reply, Dust.Page.new(items: wrapped, next_cursor: next_cursor), state}
  else
    {:error, _} = err -> {:reply, err, state}
  end
end
# range/4: paged browse between two canonical paths.
#
# Options: `:select` (:entries, the default, or :keys), `:limit` (default 50,
# capped at 1000), `:order` (default :asc) and `:after` (cursor).
#
# Replies with a page for a supported `:select`, and with
# {:error, :unsupported_select} for :prefixes or any other value.
@impl true
def handle_call({:range, from, to, opts}, _from, state) do
  case Keyword.get(opts, :select, :entries) do
    select when select in [:entries, :keys] ->
      browse_opts = [
        from: from,
        to: to,
        limit: opts |> Keyword.get(:limit, 50) |> min(1000),
        order: Keyword.get(opts, :order, :asc),
        select: select,
        cursor: Keyword.get(opts, :after)
      ]

      {items, next_cursor} = state.cache.browse(state.cache_target, state.store, browse_opts)
      page = Dust.Page.new(items: wrap_items(items, select), next_cursor: next_cursor)
      {:reply, page, state}

    _prefixes_or_unknown ->
      # :prefixes is not offered for ranges, and anything else is a caller
      # mistake. Both get an error reply: `:select` comes straight from the
      # caller, and an unmatched `case` here would raise CaseClauseError and
      # take down the whole engine, not just the offending call.
      {:reply, {:error, :unsupported_select}, state}
  end
end
# entry/2: a cached entry is wrapped in a Dust.Entry (the cache seq becomes
# `revision`); on a miss the engine tries to assemble an entry from the
# subtree and replies {:error, :not_found} only when that yields nil.
@impl true
def handle_call({:entry, path}, _from, state) do
  reply =
    case state.cache.read_entry(state.cache_target, state.store, path) do
      :miss ->
        subtree_entry = assemble_subtree_entry(state, path)
        if is_nil(subtree_entry), do: {:error, :not_found}, else: {:ok, subtree_entry}

      {:ok, {value, type, seq, synced_at}} ->
        fields = [path: path, value: value, type: type, revision: seq, synced_at: synced_at]
        {:ok, Dust.Entry.new(fields)}
    end

  {:reply, reply, state}
end
# status/1: snapshot of the engine's state. `entry_count` is nil when the
# cache adapter does not export count/2.
@impl true
def handle_call(:status, _from, state) do
  entry_count =
    case function_exported?(state.cache, :count, 2) do
      true -> state.cache.count(state.cache_target, state.store)
      false -> nil
    end

  caps = state.capabilities

  report = %{
    store: state.store,
    connection: state.status,
    last_store_seq: state.last_store_seq,
    pending_ops: map_size(state.pending_ops),
    entry_count: entry_count,
    permissions: caps.permissions,
    scopes: caps.scopes,
    store_access: caps.store_access
  }

  {:reply, report, state}
end
# Exposes the cache adapter module and its target as a `{module, target}` pair.
@impl true
def handle_call(:cache_info, _from, %{cache: cache_mod, cache_target: target} = state) do
  {:reply, {cache_mod, target}, state}
end
# off/2: unregisters the subscription and drops the monitor tracking it, if
# there is one. Always replies :ok.
@impl true
def handle_call({:off, ref}, _from, state) do
  Dust.CallbackRegistry.unregister(state.callbacks, ref)
  monitors = demonitor_subscription(state.callback_monitors, ref)
  {:reply, :ok, %{state | callback_monitors: monitors}}
end
# on/4: registers the subscription and replies with its ref.
@impl true
def handle_call({:on, pattern, callback, opts}, from, state) do
  ref = Dust.CallbackRegistry.register(state.callbacks, state.store, pattern, callback, opts)

  # Monitoring is opt-in (`monitor: true`): the registering process is
  # monitored and its subscription is collected on :DOWN — the rescue-free
  # cleanup that single_flight's awaiter relies on. It is OFF by default
  # because long-lived subscriptions (LiveViews, or the one-shot
  # SubscriberRegistrar that exits after init) own their lifecycle and must
  # NOT be torn down when the registering process goes away.
  monitors =
    if Keyword.get(opts, :monitor, false) do
      {subscriber_pid, _tag} = from
      Map.put(state.callback_monitors, Process.monitor(subscriber_pid), ref)
    else
      state.callback_monitors
    end

  state = %{state | callback_monitors: monitors}

  # With `include_current: true`, replay the entries that match right now.
  # Doing it INSIDE this callback means no live event can slip in between the
  # snapshot and the reply: the GenServer is single-threaded, so bootstrap
  # items reach the worker's mailbox ahead of any later live event.
  if Keyword.get(opts, :include_current, false),
    do: emit_bootstrap_events(state, pattern, ref, opts)

  {:reply, ref, state}
end
# seed_entry/4: direct cache write at seq 0, bypassing the write pipeline.
@impl true
def handle_call({:seed_entry, path, value, type}, _from, state) do
  %{cache: cache_mod, cache_target: target, store: store} = state
  :ok = cache_mod.write(target, store, path, value, type, 0)
  {:reply, :ok, state}
end
# A monitored subscriber exited without calling off/2: unregister its
# subscription and forget the monitor. A :DOWN for a monitor we do not track
# is ignored.
@impl true
def handle_info({:DOWN, mon_ref, :process, _pid, _reason}, state) do
  case state.callback_monitors do
    %{^mon_ref => sub_ref} when not is_nil(sub_ref) ->
      Dust.CallbackRegistry.unregister(state.callbacks, sub_ref)
      remaining = Map.delete(state.callback_monitors, mon_ref)
      {:noreply, %{state | callback_monitors: remaining}}

    _untracked ->
      {:noreply, state}
  end
end
# Deadline backstop for acked ops. If a caller is still waiting when the
# timer fires (neither the phx_reply ack nor the committed echo answered it),
# reply {:error, :timeout} instead of leaving it blocked. The op stays in
# pending_ops with :from removed. When the op has already been answered or
# reconciled this is a no-op. This is what makes it safe for
# lease/renew/release to call with :infinity.
@impl true
def handle_info({:ack_timeout, client_op_id}, state) do
  with %{from: from} = op_attrs <- Map.get(state.pending_ops, client_op_id) do
    Logger.warning(
      "[Dust] no server ack/echo for #{op_attrs[:op]} #{client_op_id} within " <>
        "#{@ack_timeout_ms}ms — answering caller {:error, :timeout}"
    )

    GenServer.reply(from, {:error, :timeout})
    pending = Map.put(state.pending_ops, client_op_id, Map.delete(op_attrs, :from))
    {:noreply, %{state | pending_ops: pending}}
  else
    _already_answered_or_reconciled -> {:noreply, state}
  end
end
# Any other message is dropped without touching the state.
@impl true
def handle_info(_msg, state) do
  {:noreply, state}
end
# Test-harness hook: overwrite last_store_seq.
@impl true
def handle_cast({:set_store_seq, seq}, state) do
  {:noreply, Map.replace!(state, :last_store_seq, seq)}
end
# Records the new connection status. On becoming :connected, every op still
# in pending_ops is sent to the connection again.
@impl true
def handle_cast({:set_status, new_status}, state) do
  if new_status == :connected do
    for {_client_op_id, op_attrs} <- state.pending_ops do
      send_to_connection(state.store, op_attrs)
    end
  end

  {:noreply, %{state | status: new_status}}
end
# Stores the normalized form of the capabilities passed in the cast.
@impl true
def handle_cast({:set_capabilities, capabilities}, state) do
  normalized = normalize_capabilities(capabilities)
  {:noreply, %{state | capabilities: normalized}}
end
# Applies a snapshot: every entry under "entries" is written to the cache at
# the snapshot's "snapshot_seq", which then becomes last_store_seq. Entries
# are written one by one; nothing is removed from the cache here.
@impl true
def handle_cast({:snapshot, snapshot}, state) do
  seq = snapshot["snapshot_seq"]

  write_entry = fn {path, %{"value" => value, "type" => type}} ->
    state.cache.write(state.cache_target, state.store, path, value, type, seq)
  end

  Enum.each(snapshot["entries"], write_entry)
  {:noreply, %{state | last_store_seq: seq}}
end
# Remembers the sequence number through which catch-up has completed.
@impl true
def handle_cast({:catch_up_complete, through_seq}, state) do
  {:noreply, Map.replace!(state, :catch_up_seq, through_seq)}
end
# The server rejected a pending op. An unknown (or already reconciled) op id
# is ignored. Otherwise the op leaves pending_ops and any waiting caller gets
# {:error, reason}. Non-lease ops are also rolled back in the cache and a
# rejection event is dispatched to subscribers before the caller is answered.
@impl true
def handle_cast({:write_rejected, client_op_id, reason}, state) do
  error = rejection_reason(reason)

  case Map.pop(state.pending_ops, client_op_id) do
    {nil, _unchanged} ->
      {:noreply, state}

    {op_attrs, remaining} ->
      # Lease ops never wrote the cache optimistically, and the key may hold
      # someone else's live lease — so for them only the reply below happens.
      if op_attrs.op not in [:lease, :renew, :release] do
        path = op_attrs.path

        # Restore the value saved before the optimistic write, or delete the
        # path when there was none.
        with {:ok, prev_value} <- Map.get(op_attrs, :prev) do
          restored_type = detect_type(prev_value)
          state.cache.write(state.cache_target, state.store, path, prev_value, restored_type, 0)
        else
          _no_previous -> state.cache.delete(state.cache_target, state.store, path)
        end

        # Let the application know the write was rejected.
        dispatch_callbacks(state, path, %{
          store: state.store,
          path: path,
          op: op_attrs.op,
          value: nil,
          committed: false,
          source: :server,
          client_op_id: client_op_id,
          error: %{code: :rejected, message: rejection_message(error)}
        })
      end

      # Surface the error to a caller still blocked on this op, if any.
      case op_attrs do
        %{from: from} when not is_nil(from) -> GenServer.reply(from, {:error, error})
        _no_waiter -> :ok
      end

      {:noreply, %{state | pending_ops: remaining}}
  end
end
# The server acknowledged a write (phx_reply ack).
@impl true
def handle_cast({:write_accepted, client_op_id, reply}, state) do
  case state.pending_ops do
    %{^client_op_id => op_attrs} when not is_nil(op_attrs) ->
      # Answer the waiter from the ack. The op stays in pending_ops — the
      # :server_event reconciliation removes it — and the stored copy is the
      # one returned by answer_waiter/2, so the later echo does not reply a
      # second time.
      answered = answer_waiter(op_attrs, reply)
      {:noreply, %{state | pending_ops: Map.put(state.pending_ops, client_op_id, answered)}}

    _reconciled ->
      # The committed echo (:server_event) may have arrived before this ack
      # and already reconciled (deleted) the op, answering the waiter itself.
      # That ordering is expected, so it is logged rather than silently dropped.
      Logger.debug(
        "[Dust] write ack #{client_op_id} arrived after the committed echo reconciled it"
      )

      {:noreply, state}
  end
end
# Applies a committed server event, in this order: (1) bring the cache to the
# server's canonical state, (2) answer a caller still waiting on the op and
# drop the op from pending_ops, (3) append to the activity buffer if one is
# configured, (4) dispatch the committed event to subscribers, (5) advance
# last_store_seq to the event's store_seq.
@impl true
def handle_cast({:server_event, event}, state) do
client_op_id = event["client_op_id"]
path = event["path"]
store_seq = event["store_seq"]
# NOTE(review): String.to_existing_atom/1 raises for an op string with no
# existing atom, and the `case` below has no fallback clause — an op this
# client does not know raises inside the engine. Confirm that is intended.
op = String.to_existing_atom(event["op"])
value = event["value"]
# Update cache with canonical state. Plain-map :set ops flatten to leaf
# entries (matching server storage); :delete clears the path AND every
# descendant so subtree deletes propagate to local subscribers.
case op do
:set ->
apply_set_to_cache(state, path, value, detect_type(value), store_seq)
:delete ->
_ = state.cache.delete_subtree(state.cache_target, state.store, path)
# Each key of a merge map is applied as a set on the matching child path.
:merge when is_map(value) ->
{:ok, prefix_segs} = Dust.Protocol.Path.parse_rendered(path)
Enum.each(value, fn {key, v} ->
{:ok, child_segs} = Dust.Protocol.Path.child(prefix_segs, to_string(key))
{:ok, child_path} = Dust.Protocol.Path.render(child_segs)
apply_set_to_cache(state, child_path, v, detect_type(v), store_seq)
end)
:increment ->
# Server sends the materialized value (not the delta) for cache reconciliation
state.cache.write(state.cache_target, state.store, path, value, "counter", store_seq)
# :add and :remove both store the event's value as a "set" entry.
:add ->
state.cache.write(state.cache_target, state.store, path, value, "set", store_seq)
:remove ->
state.cache.write(state.cache_target, state.store, path, value, "set", store_seq)
:put_file ->
state.cache.write(state.cache_target, state.store, path, value, "file", store_seq)
op when op in [:lease, :renew] ->
# Lease envelope is a typed value; store it opaquely at the key.
state.cache.write(state.cache_target, state.store, path, value, "lease", store_seq)
:release ->
state.cache.delete(state.cache_target, state.store, path)
end
# Reconcile pending ops. The committed echo can beat the phx_reply ack, so
# answer any waiting caller HERE (from the echo's canonical value/seq) before
# dropping the op — otherwise a caller blocked on the ack would hang forever,
# since the ack would land against an already-deleted op.
was_pending = Map.has_key?(state.pending_ops, client_op_id)
# answer_waiter/2 receives nil when the op is not pending — presumably a
# no-op in that case; its definition is outside this excerpt.
_answered =
answer_waiter(
Map.get(state.pending_ops, client_op_id),
ack_reply_from_event(value, store_seq)
)
pending = Map.delete(state.pending_ops, client_op_id)
# Append to activity buffer for dashboard
if state.activity_buffer do
Dust.ActivityBuffer.append(state.activity_buffer, state.store, %{
path: path,
op: op,
# An event that matched one of our pending ops is recorded as :local.
source: if(was_pending, do: :local, else: :server),
seq: store_seq
})
end
# Always dispatch the committed event with was_own carrying whether
# this was our own write being echoed. Subscriptions filter by mode:
# `:all` (the default) skips the echo of own writes (preserving the
# historical single-fire semantics); `:committed` keeps it; `:optimistic`
# ignores committed events entirely.
dispatch_callbacks(state, path, %{
store: state.store,
path: path,
op: op,
value: value,
store_seq: store_seq,
committed: true,
was_own: was_pending,
source: :server,
device_id: event["device_id"],
client_op_id: client_op_id
})
state = %{state | pending_ops: pending, last_store_seq: store_seq}
{:noreply, state}
end
# Sends `event` to every subscription whose pattern matches `path`.
defp dispatch_callbacks(state, path, event) do
  state.callbacks
  |> Dust.CallbackRegistry.match(state.store, path)
  |> Enum.each(fn subscription -> dispatch_to_subscription(state, subscription, event) end)
end
# The one dispatch path shared by live dispatch (dispatch_callbacks/3) and
# bootstrap (emit_bootstrap_events/4), so both behave the same way:
#   * events the subscription's mode filters out are skipped;
#   * if the worker's queue has reached max_queue_size, the subscription is
#     dropped and `on_resync` (when it is a 1-arity function) is told a resync
#     is required;
#   * otherwise the event is enqueued through CallbackWorker.dispatch.
defp dispatch_to_subscription(
       state,
       {worker_pid, ref, max_queue_size, on_resync, mode},
       event
     ) do
  cond do
    not event_matches_mode?(mode, event) ->
      nil

    Dust.CallbackWorker.queue_len(worker_pid) >= max_queue_size ->
      # The subscriber has fallen behind: drop it and notify.
      Dust.CallbackRegistry.unregister(state.callbacks, ref)
      if is_function(on_resync, 1), do: on_resync.(%{error: :resync_required, ref: ref})

    true ->
      Dust.CallbackWorker.dispatch(worker_pid, event)
  end
end
# Decides whether a subscription in `mode` should receive `event`.
#   * Bootstrap events (`type: :present`) reach every mode.
#   * `:all` keeps the historical single-fire behaviour: own writes fire once
#     optimistically (no store_seq), others' writes fire once when committed
#     (with store_seq) — so only the committed echo of an own write is skipped.
#   * `:committed` takes every committed event, own writes included.
#   * `:optimistic` takes only uncommitted events.
defp event_matches_mode?(_mode, %{type: :present}), do: true

defp event_matches_mode?(:all, event),
  do: not match?(%{committed: true, was_own: true}, event)

defp event_matches_mode?(:committed, event), do: match?(%{committed: true}, event)

defp event_matches_mode?(:optimistic, event), do: match?(%{committed: false}, event)
# Replays the entries currently matching `pattern` to subscription `ref` as
# `type: :present` events. Reads one page from the cache (`:limit` default 50,
# capped at 1000; `:order` default :asc). Stops early as soon as the
# subscription is no longer registered, e.g. dropped by backpressure.
# Always returns :ok.
defp emit_bootstrap_events(state, pattern, ref, opts) do
  browse_opts = [
    pattern: pattern,
    limit: min(Keyword.get(opts, :limit, 50), 1000),
    order: Keyword.get(opts, :order, :asc),
    select: :entries
  ]

  {items, _next_cursor} = state.cache.browse(state.cache_target, state.store, browse_opts)

  case Dust.CallbackRegistry.lookup(state.callbacks, ref) do
    nil ->
      :ok

    {worker_pid, max_queue_size, on_resync, mode} ->
      subscription = {worker_pid, ref, max_queue_size, on_resync, mode}

      # any?/2 stops at the first item after which the subscription is gone.
      Enum.any?(items, fn {path, value, type, seq} ->
        present = %{type: :present, path: path, value: value, entry_type: type, seq: seq}
        dispatch_to_subscription(state, subscription, present)
        Dust.CallbackRegistry.lookup(state.callbacks, ref) == nil
      end)
  end

  :ok
end
# Shared write path for put/3 and put/4. Returns `{op_msg, new_state}`; the
# caller is responsible for sending op_msg to the connection.
defp do_put(path, value, opts, from, state) do
  client_op_id = generate_op_id()
  type = detect_type(value)

  # Remember what was there, so a rejection can roll back.
  prev = state.cache.read(state.cache_target, state.store, path)

  # Optimistic local write in the server's canonical shape: plain-map values
  # are flattened to leaf entries, descendants cleared, no root leaf.
  :ok = apply_set_to_cache(state, path, value, type, 0)

  # Local subscribers hear about the write before the server does.
  optimistic_event = %{
    store: state.store,
    path: path,
    op: :set,
    value: value,
    committed: false,
    source: :local,
    client_op_id: client_op_id
  }

  dispatch_callbacks(state, path, optimistic_event)

  base = %{op: :set, path: path, value: value, client_op_id: client_op_id, prev: prev}

  # Optional fields are added in a fixed order, then the waiting caller.
  op_msg =
    [&maybe_put_if_match/2, &maybe_put_if_absent/2, &maybe_put_fence_opt/2]
    |> Enum.reduce(base, fn add_option, msg -> add_option.(msg, opts) end)
    |> maybe_put_from(from)

  {op_msg, %{state | pending_ops: Map.put(state.pending_ops, client_op_id, op_msg)}}
end
# Copies the `:if_match` option onto the op when the caller supplied one.
# Presence of the key is what counts, so an explicit `if_match: nil` is kept.
defp maybe_put_if_match(op_msg, opts) do
  with {:ok, expected} <- Keyword.fetch(opts, :if_match) do
    Map.put(op_msg, :if_match, expected)
  else
    :error -> op_msg
  end
end
# Marks the op `if_absent: true` when the `:if_absent` option is truthy;
# otherwise the op is returned untouched.
defp maybe_put_if_absent(op_msg, opts) do
  case Keyword.get(opts, :if_absent, false) do
    off when off in [nil, false] -> op_msg
    _on -> Map.put(op_msg, :if_absent, true)
  end
end
# Attaches the waiting caller (`from`) to the op under `:from`. A nil `from`
# means nobody is waiting and the op is returned as-is.
defp maybe_put_from(op_msg, from) do
  if is_nil(from), do: op_msg, else: Map.put(op_msg, :from, from)
end
# Stops monitoring the process behind subscription `sub_ref` and returns
# `monitors` (monitor_ref => subscription ref) without that entry. When no
# monitor tracks `sub_ref` the map is returned unchanged.
defp demonitor_subscription(monitors, sub_ref) do
  tracked = Enum.find(monitors, fn {_mon_ref, tracked_ref} -> tracked_ref == sub_ref end)

  case tracked do
    nil ->
      monitors

    {mon_ref, ^sub_ref} ->
      # `:flush` also discards a :DOWN message that may already be queued.
      Process.demonitor(mon_ref, [:flush])
      Map.delete(monitors, mon_ref)
  end
end
# Adds a `:fence` (`%{key: ..., token: ...}`) to the op when the `:fence`
# option carries a `:key` and a `:token`. A `%Dust.Lease{}` qualifies through
# the same map pattern as a bare map, since it has both fields. Any other
# option value leaves the op untouched.
defp maybe_put_fence_opt(op_msg, opts) do
  case Keyword.get(opts, :fence) do
    %{key: key, token: token} -> Map.put(op_msg, :fence, %{key: key, token: token})
    _no_fence -> op_msg
  end
end
# Replies to the caller blocked on this op's server ack, if one is still
# waiting, and returns the op attributes with `:from` removed.
#
# Delivery is at-most-once: `:from` is popped on the first call, so whichever
# of {phx_reply ack, committed echo} arrives second finds no waiter and is a
# no-op. The server may deliver the two in either order (it broadcasts the
# echo before replying), which is why both the :write_accepted and the
# :server_event handlers go through here. A nil op yields nil.
defp answer_waiter(nil, _reply), do: nil

defp answer_waiter(op_attrs, reply) do
  {waiter, remaining} = Map.pop(op_attrs, :from)

  if waiter != nil do
    GenServer.reply(waiter, build_ack_reply(remaining, reply))
  end

  remaining
end
# Turns a committed echo (canonical value plus store_seq) into the reply map
# shape the phx_reply ack carries, so build_ack_reply/2 can consume either.
# A map value (e.g. a lease envelope with token/expires_at/holder) is kept and
# gains a "store_seq" key; for any other value only "store_seq" is meaningful.
defp ack_reply_from_event(value, store_seq) do
  if is_map(value) do
    Map.put(value, "store_seq", store_seq)
  else
    %{"store_seq" => store_seq}
  end
end
# Extracts the lease-specific op fields from `opts`: `:ttl_ms` (default
# 30_000) and `:holder` (nil when not given).
defp lease_extra(opts) do
  ttl_ms = Keyword.get(opts, :ttl_ms, 30_000)
  holder = Keyword.get(opts, :holder)
  %{ttl_ms: ttl_ms, holder: holder}
end
# Builds a lease op from `base` merged with `extra`, tags it with a fresh
# client_op_id and the waiting caller, and registers it as pending. Returns
# `{op_msg, new_state}`.
#
# The cache is deliberately not written optimistically: a lease is a
# server-authoritative claim, and the local key may hold someone else's live
# lease that must not be clobbered or rolled back.
defp do_lease_op(base, extra, from, state) do
  op_id = generate_op_id()
  op_msg = Map.merge(Map.merge(base, extra), %{client_op_id: op_id, from: from})
  pending = Map.put(state.pending_ops, op_id, op_msg)
  {op_msg, %{state | pending_ops: pending}}
end
# Builds the value handed to a waiting caller, depending on the op:
#
#   * `:lease` / `:renew` -> `{:ok, %Dust.Lease{}}` assembled from the reply's
#     token, holder and expires_at, keyed by the op's path;
#   * `:release`          -> `:ok` (idempotent);
#   * anything else       -> `{:ok, store_seq}`.
defp build_ack_reply(op_attrs, reply) do
  case op_attrs do
    %{op: op, path: path} when op in [:lease, :renew] ->
      lease =
        Dust.Lease.new(
          key: path,
          token: reply_field(reply, :token),
          holder: reply_field(reply, :holder),
          expires_at: reply_field(reply, :expires_at)
        )

      {:ok, lease}

    %{op: :release} ->
      :ok

    _other ->
      {:ok, reply_field(reply, :store_seq)}
  end
end
# Reads `key` from a reply map that may be atom- or string-keyed. The atom
# key wins unless its value is nil or false, in which case the string form of
# the key is looked up instead.
defp reply_field(reply, key) do
  case Map.get(reply, key) do
    blank when blank in [nil, false] -> Map.get(reply, to_string(key))
    found -> found
  end
end
# Optimistically deletes `path`, notifies local subscribers
# (`committed: false`), registers the op as pending and hands it to the
# connection. A non-nil `from` marks a caller waiting for the ack. Returns
# the updated state.
defp do_delete(path, from, state) do
  op_id = generate_op_id()

  # Subtree delete (the leaf and every descendant) mirrors the server's
  # DELETE semantics so the cache stays consistent.
  _ = state.cache.delete_subtree(state.cache_target, state.store, path)

  local_event = %{
    store: state.store,
    path: path,
    op: :delete,
    value: nil,
    committed: false,
    source: :local,
    client_op_id: op_id
  }

  dispatch_callbacks(state, path, local_event)

  op_msg = maybe_put_from(%{op: :delete, path: path, client_op_id: op_id}, from)
  pending_ops = Map.put(state.pending_ops, op_id, op_msg)
  send_to_connection(state.store, op_msg)
  %{state | pending_ops: pending_ops}
end
# Optimistically merges `map` under `path`: every top-level key is written
# to the cache as its own child entry (seq 0), local subscribers get one
# `:merge` event (`committed: false`), and the op is registered as pending
# and handed to the connection. Returns the updated state.
defp do_merge(path, map, from, state) do
  op_id = generate_op_id()
  {:ok, parent_segments} = Dust.Protocol.Path.parse_rendered(path)

  Enum.each(map, fn {field, field_value} ->
    {:ok, segments} = Dust.Protocol.Path.child(parent_segments, to_string(field))
    {:ok, leaf_path} = Dust.Protocol.Path.render(segments)
    leaf_type = detect_type(field_value)
    state.cache.write(state.cache_target, state.store, leaf_path, field_value, leaf_type, 0)
  end)

  local_event = %{
    store: state.store,
    path: path,
    op: :merge,
    value: map,
    committed: false,
    source: :local,
    client_op_id: op_id
  }

  dispatch_callbacks(state, path, local_event)

  op_msg = maybe_put_from(%{op: :merge, path: path, value: map, client_op_id: op_id}, from)
  pending_ops = Map.put(state.pending_ops, op_id, op_msg)
  send_to_connection(state.store, op_msg)
  %{state | pending_ops: pending_ops}
end
# Optimistically increments the counter at `path` by `delta`. The cached
# value is used as the starting point when it is a number, otherwise 0; the
# sum is written back with type "counter" at seq 0. Subscribers and the
# server receive the delta, not the sum. Returns the updated state.
defp do_increment(path, delta, from, state) do
  op_id = generate_op_id()

  starting_value =
    with {:ok, cached} when is_number(cached) <-
           state.cache.read(state.cache_target, state.store, path) do
      cached
    else
      _ -> 0
    end

  :ok =
    state.cache.write(
      state.cache_target,
      state.store,
      path,
      starting_value + delta,
      "counter",
      0
    )

  local_event = %{
    store: state.store,
    path: path,
    op: :increment,
    value: delta,
    committed: false,
    source: :local,
    client_op_id: op_id
  }

  dispatch_callbacks(state, path, local_event)

  op_msg =
    maybe_put_from(%{op: :increment, path: path, value: delta, client_op_id: op_id}, from)

  pending_ops = Map.put(state.pending_ops, op_id, op_msg)
  send_to_connection(state.store, op_msg)
  %{state | pending_ops: pending_ops}
end
# Optimistically adds `member` to (`:add`) or removes it from (`:remove`) the
# set stored at `path`. A cached value that is not a list counts as the empty
# set. The result is written back with type "set" at seq 0; subscribers and
# the server receive the single member, not the whole set. Returns the
# updated state.
defp do_set_op(op, path, member, from, state) when op in [:add, :remove] do
  op_id = generate_op_id()

  members =
    with {:ok, cached} when is_list(cached) <-
           state.cache.read(state.cache_target, state.store, path) do
      cached
    else
      _ -> []
    end

  updated_members =
    if op == :add do
      Enum.uniq([member | members])
    else
      List.delete(members, member)
    end

  :ok = state.cache.write(state.cache_target, state.store, path, updated_members, "set", 0)

  local_event = %{
    store: state.store,
    path: path,
    op: op,
    value: member,
    committed: false,
    source: :local,
    client_op_id: op_id
  }

  dispatch_callbacks(state, path, local_event)

  op_msg = maybe_put_from(%{op: op, path: path, value: member, client_op_id: op_id}, from)
  pending_ops = Map.put(state.pending_ops, op_id, op_msg)
  send_to_connection(state.store, op_msg)
  %{state | pending_ops: pending_ops}
end
# Normalizes a capabilities payload (atom- or string-keyed) into the
# atom-keyed shape of `@default_capabilities`: `:permissions`, `:scopes` and
# `:store_access`, each run through its own normalizer.
#
# A payload that is not a map (nil, a list, ...) falls back to
# `@default_capabilities`, which grants nothing, instead of raising
# FunctionClauseError. This matches how the per-field normalizers treat
# malformed parts.
defp normalize_capabilities(capabilities) when is_map(capabilities) do
  %{
    permissions: normalize_permissions(capability_value(capabilities, :permissions)),
    scopes: normalize_scopes(capability_value(capabilities, :scopes)),
    store_access: normalize_store_access(capability_value(capabilities, :store_access))
  }
end

defp normalize_capabilities(_malformed), do: @default_capabilities
# Reduces a permissions map (atom- or string-keyed) to boolean `:read` /
# `:write` flags. Anything that is not a map yields the default permissions.
defp normalize_permissions(permissions) when is_map(permissions) do
  Map.new([:read, :write], fn flag ->
    {flag, truthy?(capability_value(permissions, flag))}
  end)
end

defp normalize_permissions(_other), do: Map.fetch!(@default_capabilities, :permissions)
# Stringifies every element of a scope list; anything that is not a list
# becomes the empty list.
defp normalize_scopes(scopes) do
  if is_list(scopes) do
    Enum.map(scopes, fn scope -> to_string(scope) end)
  else
    []
  end
end
# Normalizes a store-access map (atom- or string-keyed) into
# `%{mode: ..., store_ids: [...]}`. Anything that is not a map yields the
# default store access.
defp normalize_store_access(store_access) when is_map(store_access) do
  mode = store_access |> capability_value(:mode) |> normalize_store_access_mode()
  store_ids = store_access |> capability_value(:store_ids) |> normalize_store_ids()
  %{mode: mode, store_ids: store_ids}
end

defp normalize_store_access(_other), do: Map.fetch!(@default_capabilities, :store_access)
# Only `:all` / "all" selects every store; every other value (including
# nil) means `:selected`.
defp normalize_store_access_mode(mode) do
  if mode in [:all, "all"], do: :all, else: :selected
end
# Stringifies every element of a store-id list; anything that is not a list
# becomes the empty list.
defp normalize_store_ids(store_ids) do
  if is_list(store_ids) do
    Enum.map(store_ids, fn id -> to_string(id) end)
  else
    []
  end
end
# Looks `key` up in a map that may be atom- or string-keyed. The atom key
# wins whenever it is present (even with a nil value); otherwise the string
# form of the key is tried, yielding nil when neither exists.
defp capability_value(map, key) do
  case Map.fetch(map, key) do
    {:ok, value} -> value
    :error -> Map.get(map, to_string(key))
  end
end
# Accepts exactly the spellings of "yes" matched below: true, "true", "1"
# and the integer 1. Everything else is false.
defp truthy?(true), do: true
defp truthy?("true"), do: true
defp truthy?("1"), do: true
defp truthy?(1), do: true
defp truthy?(_other), do: false
# Converts a rejection payload into the reason term reported to callers.
# Maps carrying a "reason" / :reason key are enriched with their scope and
# message where the reason calls for it; any other payload is treated as the
# bare reason.
defp rejection_reason(payload) do
  case payload do
    %{"reason" => reason} ->
      reason_with_context(reason, payload["scope"], payload["message"])

    %{reason: reason} ->
      reason_with_context(reason, payload[:scope], payload[:message])

    bare_reason ->
      reason_to_atom(bare_reason)
  end
end
# Shapes a rejection reason together with its context:
#
#   * missing_scope     -> `{:missing_scope, scope, message}`
#   * store_not_allowed -> `{:store_not_allowed, message}`
#   * anything else     -> the reason normalized by reason_to_atom/1
#
# A missing `message` is replaced by a built-in default text.
defp reason_with_context(reason, scope, message) do
  cond do
    reason in ["missing_scope", :missing_scope] ->
      {:missing_scope, scope, message || missing_scope_message(scope)}

    reason in ["store_not_allowed", :store_not_allowed] ->
      {:store_not_allowed, message || "Token does not have access to this store"}

    true ->
      reason_to_atom(reason)
  end
end
# Default human-readable text for a missing-scope rejection; names the scope
# when it is known as a string.
defp missing_scope_message(scope) do
  if is_binary(scope) do
    "Token is missing #{scope} scope"
  else
    "Token is missing required scope"
  end
end
# Extracts a printable message from a rejection reason: the message element
# of a `{reason, scope, message}` or `{reason, message}` tuple, the name of
# an atom reason, a string reason itself, or "unknown" for anything else.
defp rejection_message(reason) do
  case reason do
    {_tag, _scope, text} when is_binary(text) -> text
    {_tag, text} when is_binary(text) -> text
    tag when is_atom(tag) -> Atom.to_string(tag)
    raw when is_binary(raw) -> raw
    _ -> "unknown"
  end
end
# Closed set of server rejection reasons that have an atom form.
@known_rejection_reason_atoms %{
  "conflict" => :conflict,
  "rate_limited" => :rate_limited,
  "unauthorized" => :unauthorized,
  "store_not_allowed" => :store_not_allowed,
  "missing_scope" => :missing_scope,
  "invalid_op" => :invalid_op,
  "capver_mismatch" => :capver_mismatch,
  "if_match_unsupported_op" => :if_match_unsupported_op,
  "if_match_multi_leaf" => :if_match_multi_leaf,
  "exists" => :exists,
  "invalid_precondition" => :invalid_precondition,
  "if_absent_unsupported_op" => :if_absent_unsupported_op,
  "if_absent_multi_leaf" => :if_absent_multi_leaf,
  "held" => :held,
  "occupied" => :occupied,
  "not_held" => :not_held,
  "fenced" => :fenced
}

# Normalizes a rejection reason. Atoms pass through. Known reason strings map
# to their atom. Unknown strings are returned unchanged, so callers see the
# server's raw reason and no atom is ever created from server input.
# Anything else becomes `:unknown`.
defp reason_to_atom(reason) when is_atom(reason), do: reason

defp reason_to_atom(reason) when is_binary(reason),
  do: Map.get(@known_rejection_reason_atoms, reason, reason)

defp reason_to_atom(_other), do: :unknown
# Arms the ack deadline for the op (only acked ops get one) and forwards it
# to the `Dust.Connection` process as `{:send_write, store, op}`, with path
# segments attached. When no such process is registered nothing is sent and
# `:ok` is returned.
defp send_to_connection(store, op_attrs) do
  arm_ack_deadline(op_attrs)

  if connection = GenServer.whereis(Dust.Connection) do
    send(connection, {:send_write, store, with_path_segments(op_attrs)})
  else
    :ok
  end
end
# Schedules `{:ack_timeout, client_op_id}` to this process after
# `@ack_timeout_ms`, but only for acked ops, i.e. those carrying a `:from`.
# Ops without a waiter get no deadline and nil is returned. Re-arming on a
# resend after reconnect is fine: each attempt gets a fresh deadline.
defp arm_ack_deadline(op_attrs) do
  case op_attrs do
    %{from: _waiter, client_op_id: op_id} ->
      Process.send_after(self(), {:ack_timeout, op_id}, @ack_timeout_ms)

    _unacked ->
      nil
  end
end
# capver 3 wire shape: ops carry `path_segments` (authoritative; the server
# prefers them when present) next to the slash-rendered `path`, which is kept
# for back-compat and display. Ops without a string `:path`, or whose path
# does not parse, are returned unchanged.
defp with_path_segments(op_attrs) do
  with %{path: path} when is_binary(path) <- op_attrs,
       {:ok, segments} <- Dust.Protocol.Path.parse_rendered(path) do
    Map.put(op_attrs, :path_segments, segments)
  else
    _ -> op_attrs
  end
end
# Produces a fresh client op id: "op_" followed by 12 cryptographically
# strong random bytes in unpadded URL-safe Base64 (16 characters).
defp generate_op_id do
  suffix =
    12
    |> :crypto.strong_rand_bytes()
    |> Base.url_encode64(padding: false)

  "op_" <> suffix
end
# Validates enumeration options against `pattern`. Only `select: :prefixes`
# constrains the pattern (it must be a valid prefix pattern); every other
# `:select` value, including the default `:entries`, is accepted.
# Returns `:ok` or `{:error, :invalid_pattern_for_prefixes}`.
defp validate_enum_opts(pattern, opts) do
  wants_prefixes? = Keyword.get(opts, :select, :entries) == :prefixes

  if wants_prefixes? and not valid_prefix_pattern?(pattern) do
    {:error, :invalid_pattern_for_prefixes}
  else
    :ok
  end
end
# A prefix pattern is either the bare "**" or any pattern ending in "/**".
defp valid_prefix_pattern?(pattern) do
  pattern == "**" or String.ends_with?(pattern, "/**")
end
# Shapes browse results for the caller: `:entries` rows
# (`{path, value, type, seq}`) become `Dust.Entry` values with the seq as
# revision; `:keys` and `:prefixes` results are returned as they are.
defp wrap_items(items, :entries) do
  Enum.map(items, fn {path, value, type, revision} ->
    Dust.Entry.new(path: path, value: value, type: type, revision: revision)
  end)
end

defp wrap_items(items, select) when select in [:keys, :prefixes], do: items
# Converts a cached file map (`"_type" => "file"`) into a `Dust.FileRef`
# carrying the engine's HTTP base URL and token. Every other value is
# returned unchanged.
defp unwrap_value(value, state) do
  case value do
    %{"_type" => "file"} ->
      Dust.FileRef.from_map(value, server_url: state.http_url, token: state.token)

    _plain ->
      value
  end
end
# Derives the HTTP base URL ("scheme://host[:port]") from the WebSocket URL.
#
#   * "wss" / "https" map to "https"; every other scheme maps to "http".
#   * Path, query and userinfo are dropped.
#   * Returns nil for a nil input or a URL without a host.
#
# `URI.parse/1` returns IPv6 literals without their square brackets, so a
# host containing ":" is re-bracketed. Without that, "ws://[::1]:4000" would
# turn into the malformed "http://::1:4000".
defp derive_http_url(nil), do: nil

defp derive_http_url(ws_url) when is_binary(ws_url) do
  case URI.parse(ws_url) do
    %URI{scheme: scheme, host: host, port: port} when is_binary(host) and host != "" ->
      http_scheme = if scheme in ["wss", "https"], do: "https", else: "http"
      host_part = if String.contains?(host, ":"), do: "[#{host}]", else: host
      port_part = if port, do: ":#{port}", else: ""
      "#{http_scheme}://#{host_part}#{port_part}"

    _ ->
      nil
  end
end
# --- Subtree assembly + canonicalization ---
# `Dust.get/2` and `Dust.entry/2` fall back to subtree assembly when the
# exact path has no leaf — matching the server's `assemble_subtree`
# behaviour. After a map-mode write, leaves live at `<path>/<field>` (paths
# are slash-rendered) and nothing lives at `<path>` itself.
# Rebuilds the nested map stored beneath `path` from the cache's subtree
# rows. Returns nil when the subtree has no rows. Rows at the exact path, or
# whose path does not parse, are skipped; every other row is placed under the
# segments it has beyond `path`, with file maps unwrapped into file refs.
defp assemble_subtree_value(state, path) do
  rows = state.cache.read_subtree(state.cache_target, state.store, path)

  if rows == [] do
    nil
  else
    {:ok, prefix} = Dust.Protocol.Path.parse_rendered(path)
    depth = length(prefix)

    Enum.reduce(rows, %{}, fn {row_path, raw_value, _type, _seq}, tree ->
      with {:ok, segments} when length(segments) > depth <-
             Dust.Protocol.Path.parse_rendered(row_path) do
        put_nested(tree, Enum.drop(segments, depth), unwrap_value(raw_value, state))
      else
        _ -> tree
      end
    end)
  end
end
# Builds a synthetic "map" entry for `path` from its subtree: the value is
# the assembled nested map and the revision is the highest seq among the
# subtree rows. Returns nil when the subtree has no rows.
defp assemble_subtree_entry(state, path) do
  case state.cache.read_subtree(state.cache_target, state.store, path) do
    [] ->
      nil

    rows ->
      {_path, _value, _type, newest_seq} = Enum.max_by(rows, fn {_, _, _, seq} -> seq end)

      Dust.Entry.new(
        path: path,
        value: assemble_subtree_value(state, path),
        type: "map",
        revision: newest_seq
      )
  end
end
# Stores `value` in `map` at the nested position named by a non-empty list of
# keys, creating intermediate maps as needed, and returns the updated map.
#
# An intermediate key may already hold something that is not a plain map,
# e.g. a scalar leaf at `a` when a row for `a/b` arrives. Descending into it
# would raise BadMapError (or add stray keys to a struct), and whether that
# happened depended on row order. Such a value is replaced by a fresh map,
# so the deeper entry wins.
defp put_nested(map, [key], value), do: Map.put(map, key, value)

defp put_nested(map, [key | rest], value) do
  child =
    case Map.get(map, key) do
      %{} = nested when not is_struct(nested) -> nested
      _leaf_or_missing -> %{}
    end

  Map.put(map, key, put_nested(child, rest, value))
end
# True when `value` stands for ONE typed value rather than a record-shaped
# object, so it must be written as a single leaf instead of being flattened:
#
#   1. any struct (Decimal, DateTime, Dust.FileRef, ...). `%Decimal{coef: 5}`
#      is one value, not a record;
#   2. a file ref on the wire, recognized by `_type: "file"` alone;
#   3. any other typed wrapper on the wire, which carries both `_typed` and
#      `_type`.
#
# Wire shapes are accepted with string or atom keys. Mirrors
# Dust.Sync.ValueCodec.typed_value?/1 on the server. Plain maps (none of the
# above) are records and flatten to leaves on write; non-maps are never typed
# maps.
defp typed_map?(value) do
  is_struct(value) or
    match?(%{"_type" => "file"}, value) or
    match?(%{_type: "file"}, value) or
    match?(%{"_typed" => _, "_type" => _}, value) or
    match?(%{_typed: _, _type: _}, value)
end
# Applies a `:set` to the cache in canonical form, so the cache looks the
# same whether the data came from a local optimistic write or a server event
# echo (mirrors the server's `Dust.Sync.Writer.apply_to_entries({:set, ...})`).
#
#   * plain map: everything under `path` is cleared first, then the map is
#     flattened into leaf entries; nothing is written at `path` itself;
#   * typed value or scalar: written at the exact path with `type`.
defp apply_set_to_cache(state, path, value, type, seq) do
  record? = is_map(value) and not typed_map?(value)

  cond do
    record? ->
      _ = state.cache.delete_subtree(state.cache_target, state.store, path)
      flatten_into_cache(state, path, value, seq)

    true ->
      :ok = state.cache.write(state.cache_target, state.store, path, value, type, seq)
  end
end
# Writes a plain map beneath `base_path` as leaf entries, one per key.
# Nested plain maps are descended into recursively; typed values and scalars
# are written at their child path with their detected type. Every write
# uses `seq`.
defp flatten_into_cache(state, base_path, map, seq) when is_map(map) do
  {:ok, parent_segments} = Dust.Protocol.Path.parse_rendered(base_path)

  Enum.each(map, fn {field, field_value} ->
    {:ok, segments} = Dust.Protocol.Path.child(parent_segments, to_string(field))
    {:ok, leaf_path} = Dust.Protocol.Path.render(segments)

    cond do
      is_map(field_value) and not typed_map?(field_value) ->
        flatten_into_cache(state, leaf_path, field_value, seq)

      true ->
        detected = detect_type(field_value)

        :ok =
          state.cache.write(
            state.cache_target,
            state.store,
            leaf_path,
            field_value,
            detected,
            seq
          )
    end
  end)
end
# Names the type string stored alongside `value`. Checks run top to bottom:
# Decimal and DateTime structs first, then booleans (before the generic
# checks), any other map, strings, integers, floats and nil. Everything
# else (lists, non-boolean atoms, ...) is labelled "string".
defp detect_type(value) do
  cond do
    is_struct(value, Decimal) -> "decimal"
    is_struct(value, DateTime) -> "datetime"
    is_boolean(value) -> "boolean"
    is_map(value) -> "map"
    is_binary(value) -> "string"
    is_integer(value) -> "integer"
    is_float(value) -> "float"
    is_nil(value) -> "null"
    true -> "string"
  end
end
end