defmodule Dust.SingleFlight do
  @moduledoc """
  Coordinated distributed cache-fill (a.k.a. single-flight) over a Dust lease.

  For a key `K`, one caller in the fleet runs the expensive `fun` while every
  other caller rides the published result. Read-through, cache, memoize: this
  is the "compute once, share many" primitive.

  **It is at-least-once / single-flight *while Dust is reachable*, NOT
  exactly-once.** `lease_ttl` expiry mid-run, a crash before publish, or the
  `on_unavailable: :run_local` degrade can all cause a second run. `fun` MUST
  be idempotent, published values MUST be small pointers (the bytes belong in
  S3/your DB), and followers MUST apply results idempotently.

  See `Dust.single_flight/4`.
  """

  alias Dust.Flight

  @default_lease_ttl 30_000
  @heartbeat_divisor 3
  @max_backoff_ms 250

  @doc """
  Run `fun` under coordination. See `Dust.single_flight/4` for the contract.

  `fun` is called with the held lease (or `nil` when running uncoordinated in
  the `:run_local` degrade) and must return `{:publish, value}` or
  `{:abort, reason}`; any other return raises `ArgumentError`.

  ## Options

    * `:fresh?` - predicate over the decoded cached value. When given, only
      values it accepts count as a hit; without it any cached value is a hit.
    * `:lease_ttl` - lease TTL in milliseconds (default `30_000`).
    * `:wait_timeout` - overall budget in milliseconds for waiting on another
      holder (default `lease_ttl + 5_000`).
    * `:on_unavailable` - `:run_local` (default) or `:error` when Dust reports
      `:unavailable` for the lease.
    * `:lock_key` - key used for the lease (default `"_dust:sf/" <> key`).

  Returns `{:ok, flight}` (built with `Dust.Flight.new/1`, `:source` one of
  `:cached`, `:computed`, `:awaited`) or `{:error, reason}`.
  """
  def run(store, key, fun, opts \\ []) when is_function(fun, 1) do
    cfg = build_cfg(key, opts)

    # Fast path: a fresh local hit returns with no network and no lease.
    case fresh_cached(store, key, cfg.fresh) do
      {:ok, value} ->
        {:ok, Flight.new(value: value, source: :cached)}

      _miss_or_stale ->
        coordinate(store, key, fun, cfg, monotonic_deadline(cfg.wait_timeout))
    end
  end

  # --- Coordination loop ---

  # Try to take the lease; on contention await the holder's result and
  # re-attempt until `deadline` (monotonic ms) passes.
  defp coordinate(store, key, fun, cfg, deadline) do
    case Dust.lease(store, cfg.lock_key, ttl_ms: cfg.lease_ttl) do
      {:ok, lease} ->
        won(store, key, fun, lease, cfg)

      {:error, :held} ->
        case await(store, key, cfg, deadline) do
          {:ok, flight} ->
            {:ok, flight}

          :retry ->
            retry_or_timeout(store, key, fun, cfg, deadline)
        end

      {:error, :unavailable} ->
        degraded(store, key, fun, cfg)

      {:error, reason} ->
        {:error, reason}
    end
  end

  # The holder released, or its lease may now be stealable — try to
  # (re-)acquire, with a jittered backoff so waiters don't re-claim the
  # single writer in lockstep. Bounded by the overall deadline.
  #
  # The remaining budget is read ONCE: re-reading it after the `> 0` check can
  # observe a clock that has since crossed the deadline, and a negative value
  # makes `Process.sleep/1` raise instead of falling through to on_timeout.
  defp retry_or_timeout(store, key, fun, cfg, deadline) do
    case remaining_ms(deadline) do
      left when left > 0 ->
        Process.sleep(min(backoff(), left))
        coordinate(store, key, fun, cfg, deadline)

      _expired ->
        on_timeout(store, key, cfg)
    end
  end

  # --- Winner: run fun under a heartbeat-renewed lease, publish fenced ---

  defp won(store, key, fun, lease, cfg) do
    hb = start_heartbeat(store, lease, cfg.lease_ttl)

    # No rescue (house rule): if fun raises, this process dies, the linked
    # heartbeat dies with it, renewals stop, and the lease expires at
    # lease_ttl — others re-elect. That is the documented recovery bound.
    result = fun.(lease)
    stop_heartbeat(hb)

    case result do
      {:publish, value} ->
        case Dust.put(store, key, encode(value), fence: lease) do
          {:ok, _seq} ->
            _ = Dust.release(store, lease)
            {:ok, Flight.new(value: normalize(value), source: :computed)}

          {:error, :fenced} ->
            # We lost the lease mid-run; a newer holder owns the result now,
            # so there is nothing of ours to release.
            {:error, :fenced}

          {:error, other} ->
            _ = Dust.release(store, lease)
            {:error, other}
        end

      {:abort, reason} ->
        _ = Dust.release(store, lease)
        {:error, reason}

      other ->
        # Release first so a contract violation does not pin the lease until TTL.
        _ = Dust.release(store, lease)
        invalid_return!(other)
    end
  end

  # --- Loser: await the winner's result (or a release → re-elect) ---

  # Returns `{:ok, flight}` when a fresh result shows up, `:retry` when the
  # caller should attempt the lease again.
  defp await(store, key, cfg, deadline) do
    parent = self()

    # Subscribe to BOTH the result key (winner published) and the lock key
    # (winner released/expired → re-elect) BEFORE re-reading, to close the
    # lost-wakeup window. monitor: true auto-cleans if this caller dies.
    kref =
      Dust.on(store, key, fn ev -> send(parent, {:sf_key, ev}) end,
        monitor: true,
        mode: :committed
      )

    lref =
      Dust.on(store, cfg.lock_key, fn ev -> send(parent, {:sf_lock, ev}) end,
        monitor: true,
        mode: :committed
      )

    result =
      case fresh_cached(store, key, cfg.fresh) do
        {:ok, value} ->
          {:ok, Flight.new(value: value, source: :awaited)}

        _ ->
          # Wait at most until the soonest a crashed holder's lease could become
          # stealable (lease_ttl), bounded by the overall deadline. On expiry we
          # return :retry so the coordinator re-attempts the (now stealable)
          # lease rather than giving up — that is how a dead winner is taken over.
          wait = min(cfg.lease_ttl, remaining_ms(deadline))
          if wait <= 0, do: :retry, else: wait_loop(store, key, cfg, monotonic_deadline(wait))
      end

    Dust.off(store, kref)
    Dust.off(store, lref)
    flush_sf_messages()
    result
  end

  defp wait_loop(store, key, cfg, until) do
    remaining = remaining_ms(until)

    if remaining <= 0 do
      :retry
    else
      receive do
        {:sf_key, _ev} ->
          # Re-validate the predicate on every wake — an event passing the gate
          # (e.g. a delete) does not by itself mean "fresh value present".
          case fresh_cached(store, key, cfg.fresh) do
            {:ok, value} -> {:ok, Flight.new(value: value, source: :awaited)}
            _ -> wait_loop(store, key, cfg, until)
          end

        {:sf_lock, ev} ->
          if lock_released?(ev), do: :retry, else: wait_loop(store, key, cfg, until)
      after
        remaining -> :retry
      end
    end
  end

  # A committed release of the lock key means the fill ended without a fresh
  # result reaching us → re-elect. (A steal arrives as an :lease event, not a
  # release: a new holder exists, so keep awaiting the result instead.)
  defp lock_released?(%{op: :release}), do: true
  defp lock_released?(_), do: false

  defp on_timeout(store, key, cfg) do
    # Freshness mode with a last-known value → serve it stale rather than fail.
    # (Without :fresh? any present value would already have been returned.)
    case last_value(store, key) do
      {:ok, value} when cfg.fresh != nil ->
        {:ok, Flight.new(value: value, source: :cached, stale?: true)}

      _ ->
        {:error, :timeout}
    end
  end

  # --- Degraded: Dust unreachable ---

  defp degraded(_store, _key, _fun, %{on_unavailable: :error}), do: {:error, :unavailable}

  defp degraded(store, key, fun, %{on_unavailable: :run_local}) do
    # No lease available — run uncoordinated (possible duplicate work across
    # nodes; documented). In-node duplicate suppression would need the
    # coalescing Registry (deferred).
    case fun.(nil) do
      {:publish, value} ->
        _ = best_effort_put(store, key, encode(value))
        {:ok, Flight.new(value: normalize(value), source: :computed, coordinated?: false)}

      {:abort, reason} ->
        {:error, reason}

      other ->
        invalid_return!(other)
    end
  end

  # Shared contract check for what `fun` returned (winner and degraded paths).
  defp invalid_return!(other) do
    raise ArgumentError,
          "single_flight fun must return {:publish, value} | {:abort, reason}, got: #{inspect(other)}"
  end

  # --- Heartbeat (keep the lease alive for the duration of fun) ---

  # Linked to the caller: if the caller dies the heartbeat dies with it.
  defp start_heartbeat(store, lease, ttl) do
    interval = max(div(ttl, @heartbeat_divisor), 1)
    spawn_link(fn -> heartbeat_loop(store, lease, ttl, interval) end)
  end

  defp heartbeat_loop(store, lease, ttl, interval) do
    receive do
      :stop -> :ok
    after
      interval ->
        case Dust.renew(store, lease, ttl_ms: ttl) do
          {:ok, _} -> heartbeat_loop(store, lease, ttl, interval)
          # Lost the lease (stolen/expired) or unavailable — stop renewing.
          {:error, _} -> :ok
        end
    end
  end

  # Asynchronous stop: a renew already in flight may still complete. Sending to
  # a pid that has already exited is a no-op, so no liveness check is needed.
  defp stop_heartbeat(pid) do
    send(pid, :stop)
    :ok
  end

  # --- Cache reads / value codec ---
  # The published value is stored JSON-encoded as a scalar leaf so Dust's
  # plain-map flattening doesn't shred a pointer into a subtree. Every reader
  # (fast path, await, on_timeout) decodes it, so all sources return the same
  # shape.

  defp fresh_cached(store, key, fresh) do
    case last_value(store, key) do
      {:ok, value} ->
        if fresh == nil or fresh.(value), do: {:ok, value}, else: :stale

      :absent ->
        :absent
    end
  end

  # NOTE(review): `Jason.decode!/1` raises if something other than this module
  # wrote a non-JSON binary at `key` — confirm no other writer shares the key.
  defp last_value(store, key) do
    case Dust.entry(store, key) do
      {:ok, %Dust.Entry{value: raw, type: type}} when type != "lease" and is_binary(raw) ->
        {:ok, Jason.decode!(raw)}

      _ ->
        :absent
    end
  end

  defp encode(value), do: Jason.encode!(value)

  # Round-trip through JSON so a computed value has the same shape readers see.
  defp normalize(value), do: value |> Jason.encode!() |> Jason.decode!()

  # put/3 is fire-and-forget (optimistic local write + queued op; returns :ok
  # even when disconnected), so a degraded publish never blocks or crashes the
  # local computation — no rescue needed.
  defp best_effort_put(store, key, encoded) do
    Dust.put(store, key, encoded)
  end

  # --- Config + time helpers ---

  defp build_cfg(key, opts) do
    lease_ttl = Keyword.get(opts, :lease_ttl, @default_lease_ttl)

    %{
      fresh: Keyword.get(opts, :fresh?),
      lease_ttl: lease_ttl,
      wait_timeout: Keyword.get(opts, :wait_timeout, lease_ttl + 5_000),
      on_unavailable: Keyword.get(opts, :on_unavailable, :run_local),
      lock_key: Keyword.get(opts, :lock_key, "_dust:sf/" <> key)
    }
  end

  defp monotonic_deadline(ms), do: System.monotonic_time(:millisecond) + ms
  defp remaining_ms(deadline), do: deadline - System.monotonic_time(:millisecond)

  # Jittered backoff in 1..@max_backoff_ms milliseconds.
  defp backoff, do: :rand.uniform(@max_backoff_ms)

  # Drop subscription messages that arrived before Dust.off/2 took effect so
  # they don't accumulate in the caller's mailbox.
  defp flush_sf_messages do
    receive do
      {:sf_key, _} -> flush_sf_messages()
      {:sf_lock, _} -> flush_sf_messages()
    after
      0 -> :ok
    end
  end
end