defmodule Lockstep do
@moduledoc """
Coyote-style controlled concurrency testing for BEAM.
See `Lockstep.Test` for the ExUnit integration. The functions in this
module are the runtime API used inside controlled tests:
Lockstep.spawn(fn -> ... end)
Lockstep.send(target, message)
Lockstep.recv()
All three are sync points: each call hands control back to the controller,
which picks the next process to run per the configured strategy.
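A minimal sketch of how the three calls fit together. The variable names and the `{:ping, pid}` message shape are illustrative, and the body is assumed to run inside a controlled test (a `Lockstep.Test` test or `Lockstep.run/2`):

```elixir
# Illustrative test body: a child sends one message back to its parent.
# Must be executed under a Lockstep controller.
ping_body = fn ->
  parent = self()

  # Sync point: control returns to the controller on spawn.
  _child =
    Lockstep.spawn(fn ->
      # Sync point: the message is queued in the parent's
      # controller-side mailbox.
      Lockstep.send(parent, {:ping, self()})
    end)

  # Sync point: blocks until the strategy picks this process to receive.
  {:ping, _from} = Lockstep.recv()
  :ok
end
```

Building the body as a zero-arity function keeps it reusable across strategies and iteration counts.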
## Patterns and gotchas
Real-world usage of Lockstep — especially against distributed systems
like Phoenix.PubSub, Phoenix.Tracker, or hand-rolled Raft-style
protocols — surfaces a few patterns worth calling out up front.
### Choosing a strategy
See `Lockstep.Strategy` for the full discussion. Short version: the
default `:pct` strategy is best at finding partial-order races where
any of several priority swaps work. For races that require the
scheduler to consistently pick a *specific* proc over several
consecutive sync points, `:pos` works better — PCT's priority
shuffle under-explores those sequences. A real example from Lockstep's
own test suite: the async-replication staleness bug in
`test/leader_follower_register_test.exs` was not found in 100 PCT
iterations but was found at iteration 1 under POS. Rule of thumb: if PCT
can't find your race in ~50 iterations, try POS before raising the
iteration count.
### Driving timed protocols: explicit triggers, not `send_after`
Distributed protocols often have time-driven elements: election
timeouts, heartbeats, retry timers. There's a temptation to model
them in tests with `Lockstep.send_after`, hoping the strategy will
explore "what if the timeout fires before the response arrives"
schedules. **This usually doesn't work as intended.**
Lockstep's controller fires timers ONLY when every live process is
blocked in a receive (`recv/0` or `recv_first/1`). So timer fires get
serialized with process execution: timer 1 fires → recipient becomes
ready → strategy picks it → it processes → returns to receive → all
blocked again → timer 2 fires. Multiple timers at the same `fire_at`
don't actually fire concurrently; they're explored one at a time.
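To make the serialization concrete, here is a small sketch with two timers that share a deadline. The message names are hypothetical, and the body is assumed to run under a Lockstep controller:

```elixir
# Two timers with the same virtual deadline, both targeting the caller.
same_deadline_body = fn ->
  me = self()
  Lockstep.send_after(me, :timer_a, 100)
  Lockstep.send_after(me, :timer_b, 100)

  # Each recv blocks this process. Per the description above, the
  # controller fires one timer, lets this process handle the message
  # and block again, and only then fires the other.
  first = Lockstep.recv()
  second = Lockstep.recv()
  {first, second}
end
```

Both messages arrive, but the handler for one never overlaps with the fire of the other, so this shape does not exercise a concurrent-trigger race.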
For tests where you want to surface concurrent-trigger races,
drive the triggering events explicitly via `Lockstep.send` from
the test body. `test/raft_election_test.exs` uses this pattern
successfully:
# Trigger an election on every node concurrently. They'll all
# become candidates for term 1 and race for the majority.
for {_id, pid} <- nodes do
Lockstep.spawn(fn ->
Lockstep.send(pid, :trigger_election)
end)
end
This way the strategy interleaves the per-node election handlers
freely — the very interleaving the bug needs.
### Avoiding timer pile-up
When a long-running test does need timers (heartbeats, retries),
every handler that schedules a fresh timer should cancel or
invalidate the previous one. Naive code:
def handle_info(:tick, state) do
Lockstep.send_after(self(), :tick, 100) # leak!
{:noreply, work(state)}
end
Each tick adds another pending timer to the queue. After a few
hundred ticks, virtual-time advancement is dominated by trivial
timer fires that consume `max_steps` budget. Two clean fixes:
1. **Cancel and re-schedule:**
def handle_info(:tick, state) do
if state.timer, do: Lockstep.cancel_timer(state.timer)
ref = Lockstep.send_after(self(), :tick, 100)
{:noreply, %{state | timer: ref} |> work()}
end
2. **Epoch-tagged messages** (no need to cancel; stale fires are
ignored on receipt):
def handle_info({:tick, epoch}, %{epoch: epoch} = state) do
new_epoch = epoch + 1
Lockstep.send_after(self(), {:tick, new_epoch}, 100)
{:noreply, %{state | epoch: new_epoch} |> work()}
end
def handle_info({:tick, _stale}, state), do: {:noreply, state}
The Raft demo uses pattern #2.
### Multi-step sync chains
Operations like `Lockstep.GenServer.call/2` go through several sync
points: a monitor, a send, and a selective receive. A test that
performs N gen_server calls puts ~3N controller calls on each iteration's
step budget. For long workloads or chatty libraries (Phoenix.Tracker
is a notorious example — every heartbeat triggers ~10 sync points
through Registry / persistent_term / ETS), `max_steps` may need to
be set substantially higher than for simple race-hunt scenarios.
Start with `5_000` for tight micro-races and scale to `50_000+` for
full Phoenix integration tests.
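A rough way to size the budget, using the ~3 sync points per call mentioned above. The `headroom` factor and the variable names are hypothetical and are not Lockstep settings; only the resulting number would be passed as `max_steps`:

```elixir
# Back-of-the-envelope step budget for a workload of GenServer calls.
calls = 2_000
sync_points_per_call = 3   # monitor + send + selective receive
headroom = 2               # hypothetical safety factor

estimated_steps = calls * sync_points_per_call

# Never go below the 5_000 starting point suggested for micro-races.
suggested_max_steps = max(estimated_steps * headroom, 5_000)
```

With these inputs `estimated_steps` is 6_000 and `suggested_max_steps` is 12_000.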
## v0.1 → v1.0 progression
v0.1 supported bare `send`/`receive`/`spawn` only. v0.5 added
GenServer, Task, Registry, Supervisor, GenStatem wrappers, virtual
clock, monitors, links, trap_exit. v1.0 (current) adds distributed-
cluster simulation (`Lockstep.Cluster`), per-node state isolation
(Phase D), and Jepsen-level checker infrastructure
(`Lockstep.History`, `Lockstep.Checker.{Linearizable,
SequentialConsistency, Causal}`, `Lockstep.Generator`,
`Lockstep.Model.Register`).
"""
@doc """
Spawn a new managed process. The function runs under the controller.
"""
@spec spawn((-> any())) :: pid()
def spawn(fun) when is_function(fun, 0) do
ctl = controller!()
Lockstep.Controller.request_spawn(ctl, self(), fun, ancestors: ancestors())
end
@doc """
Send a message to another managed process. The send is recorded by the
controller and the message is queued in the controller-side mailbox of
the target. Returns `:ok`.
"""
@spec send(pid(), any()) :: :ok
def send(target, message) when is_pid(target) do
ctl = controller!()
Lockstep.Controller.send_msg(ctl, self(), target, message)
end
def send(target, message) when is_atom(target) do
case Lockstep.Process.whereis(target) do
nil ->
# Mirror Kernel.send/2's behavior with an unregistered name:
# raise ArgumentError. (Real BEAM raises in this case; some
# callers rescue it, e.g. `Horde.RegistryImpl.on_diffs`.)
raise ArgumentError,
"Lockstep.send: no process registered as #{inspect(target)}"
pid when is_pid(pid) ->
ctl = controller!()
Lockstep.Controller.send_msg(ctl, self(), pid, message)
end
end
def send({:via, mod, term}, message) do
case mod.whereis_name(term) do
:undefined ->
raise ArgumentError,
"Lockstep.send: via #{inspect(mod)} resolved #{inspect(term)} to :undefined"
pid when is_pid(pid) ->
Lockstep.send(pid, message)
end
end
def send({:global, name}, message) do
case Lockstep.Global.whereis_name(name) do
:undefined ->
raise ArgumentError,
"Lockstep.send: no global registration for #{inspect(name)}"
pid ->
Lockstep.send(pid, message)
end
end
# `{name, node}` form: BEAM's syntax for remote registered-name sends.
# Look up `name` as registered on `node` specifically (not on the
# caller's node). Used by Groot, Phoenix.PubSub, and other libraries
# that target a remote node's named GenServer via
# `GenServer.cast({Module, remote_node}, msg)`.
#
# Mirrors OTP `send({name, node}, msg)` semantics: silently drop
# if name isn't registered on that node (OTP also drops if the
# remote node is down).
def send({name, node}, message) when is_atom(name) and is_atom(node) do
case Lockstep.Controller.cluster_whereis_on(controller!(), node, name) do
:undefined ->
:ok
pid when is_pid(pid) ->
Lockstep.send(pid, message)
end
end
@doc """
Receive the next message in the calling process's controller-side
mailbox. Blocks (in the controller) until the strategy picks this
process to receive. No pattern matching: you get the next message in
delivery order — for selective receive use `recv_first/1`.
"""
@spec recv() :: any()
def recv do
ctl = controller!()
Lockstep.Controller.recv_msg(ctl, self())
end
@doc """
Selective receive: scan the controller-side mailbox in delivery order
and return the *first* message for which `predicate` returns `true`.
Other messages stay in the mailbox in their original order.
Equivalent to BEAM's `receive` with a pattern, except the patterns are
expressed as a predicate function:
msg = Lockstep.recv_first(fn
{^ref, _reply} -> true
_ -> false
end)
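A slightly fuller sketch of a request/reply helper built on the same idea. The `{:request, ref, from, payload}` message shape and the helper names are assumptions for illustration; `call` must run under a Lockstep controller:

```elixir
# Build a predicate that matches only the reply tagged with `ref`.
reply_for = fn ref ->
  fn
    {^ref, _reply} -> true
    _other -> false
  end
end

# Send a tagged request and wait for the matching reply, leaving any
# unrelated messages in the mailbox.
call = fn server, request ->
  ref = make_ref()
  Lockstep.send(server, {:request, ref, self(), request})
  {^ref, reply} = Lockstep.recv_first(reply_for.(ref))
  reply
end
```

Keeping the predicate separate from the receive makes it easy to check on its own, without a controller.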
Blocks (in the controller) until a message matching `predicate` is
available. Predicate failures (raising/throwing inside it) count as
"no match" so a buggy predicate cannot trip the controller.
"""
@spec recv_first((any() -> boolean())) :: any()
def recv_first(predicate) when is_function(predicate, 1) do
ctl = controller!()
Lockstep.Controller.recv_match(ctl, self(), predicate)
end
@doc """
Read the controller's *virtual* clock. Time only advances when the
controller would otherwise deadlock (everyone blocked on receive); at
that point virtual time jumps to the next pending timer's fire_at.
Returns milliseconds since iteration start (0 at the first call).
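A small sketch of measuring elapsed virtual time around `sleep/1`, assuming it runs under a Lockstep controller. Given the description above, the result is expected to be at least 50, and it may be larger if other timers fire in between:

```elixir
# Returns the virtual milliseconds that passed while sleeping.
elapsed_body = fn ->
  t0 = Lockstep.now()
  Lockstep.sleep(50)
  Lockstep.now() - t0
end
```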
"""
@spec now() :: non_neg_integer()
def now do
Lockstep.Controller.now(controller!(), self())
end
@doc """
Schedule `message` to be delivered to `target` after `delay_ms`
milliseconds of virtual time. Returns a timer reference that can be
passed to `cancel_timer/1`.
Same shape as `Process.send_after/3`. The timer fires when the
controller advances virtual time, which happens automatically as soon
as no managed process is ready and the next timer is the only way to
make progress.
"""
@spec send_after(pid(), any(), non_neg_integer()) :: reference()
def send_after(target, message, delay_ms)
when is_pid(target) and is_integer(delay_ms) and delay_ms >= 0 do
Lockstep.Controller.send_after(controller!(), self(), target, message, delay_ms)
end
@doc """
Cancel a previously scheduled timer. Returns the number of ms that
remained on the timer if it was cancelled before firing, or `false`
if the timer had already fired or never existed.
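As an illustration, a reply-or-timeout wait that cancels its timer once the reply arrives. The `{:reply, ref, value}` and `{:timeout, ref}` message shapes and the helper names are hypothetical; `await_reply` must run under a Lockstep controller:

```elixir
# Predicate matching either the reply or the timeout for `ref`.
reply_or_timeout = fn ref ->
  fn
    {:reply, ^ref, _value} -> true
    {:timeout, ^ref} -> true
    _other -> false
  end
end

await_reply = fn ref, timeout_ms ->
  timer = Lockstep.send_after(self(), {:timeout, ref}, timeout_ms)

  case Lockstep.recv_first(reply_or_timeout.(ref)) do
    {:reply, ^ref, value} ->
      # Returns the remaining ms, or false if the timer already fired.
      _ = Lockstep.cancel_timer(timer)
      {:ok, value}

    {:timeout, ^ref} ->
      {:error, :timeout}
  end
end
```

If `cancel_timer/1` returns `false` in the reply branch, a stale `{:timeout, ref}` may still sit in the mailbox; tagging it with `ref` lets later receives ignore it.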
"""
@spec cancel_timer(reference()) :: non_neg_integer() | false
def cancel_timer(ref) when is_reference(ref) do
Lockstep.Controller.cancel_timer(controller!(), self(), ref)
end
@doc """
Monitor `target_pid`. Returns a reference; when `target_pid` exits,
the calling process receives `{:DOWN, ref, :process, target_pid, reason}`
in its controller-side mailbox. Same shape as `Process.monitor/1`,
except delivery happens through Lockstep's mailbox (so it's
observable via `Lockstep.recv`/`recv_first`) instead of BEAM's.
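A minimal sketch of waiting for a monitored child to exit, assuming it runs under a Lockstep controller (the names are illustrative):

```elixir
# Spawn a short-lived child, monitor it, and wait for its :DOWN message.
monitor_body = fn ->
  child = Lockstep.spawn(fn -> :ok end)
  ref = Lockstep.monitor(child)

  {:DOWN, ^ref, :process, ^child, reason} =
    Lockstep.recv_first(fn
      {:DOWN, ^ref, :process, _pid, _reason} -> true
      _other -> false
    end)

  reason
end
```

Because the `:DOWN` arrives through the controller-side mailbox, the strategy can interleave other processes between the child's exit and this receive.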
"""
@spec monitor(pid() | atom() | {atom(), node()} | {:via, module(), term()} | {:global, term()}) ::
reference()
def monitor(target_pid) when is_pid(target_pid) do
Lockstep.Controller.monitor(controller!(), self(), target_pid)
end
def monitor(name) when is_atom(name) do
case Lockstep.Process.whereis(name) do
nil -> spawn_dead_monitor(name, :noproc)
pid -> monitor(pid)
end
end
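def monitor({name, node}) when is_atom(name) and is_atom(node) do
# NOTE: unlike `send/2`, the node component is ignored here; `name` is
# resolved by the atom clause of `monitor/1` above.
monitor(name)
end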
def monitor({:via, mod, term}) do
case mod.whereis_name(term) do
:undefined -> spawn_dead_monitor({:via, mod, term}, :noproc)
pid -> monitor(pid)
end
end
def monitor({:global, name}) do
case Lockstep.Global.whereis_name(name) do
:undefined -> spawn_dead_monitor({:global, name}, :noproc)
pid -> monitor(pid)
end
end
# When the resolution target doesn't exist, OTP's Process.monitor
# delivers a `:DOWN` immediately. We mirror that by monitoring a
# transient dead pid -- the controller's monitor handler already
# fires `:DOWN` immediately for unmanaged or dead pids.
defp spawn_dead_monitor(_target, _reason) do
dead = :erlang.spawn(fn -> :ok end)
# Yield once so the spawned proc exits before the monitor sees it.
:erlang.yield()
monitor(dead)
end
@doc """
Stop monitoring. Same shape as `Process.demonitor/2`. The `:flush`
option removes any already-delivered `:DOWN` for this ref from the
caller's mailbox.
"""
@spec demonitor(reference(), [atom()]) :: true
def demonitor(ref, opts \\ []) when is_reference(ref) and is_list(opts) do
Lockstep.Controller.demonitor(controller!(), self(), ref, opts)
end
@doc """
Spawn a managed child process and link to it. Same as `Lockstep.spawn/1`
followed by `Lockstep.link/1`, but atomic — there's no window where
the child has been spawned but not yet linked.
When the linked process exits abnormally, the caller dies too unless
it has set `flag(:trap_exit, true)`. Trapping converts the death into
a `{:EXIT, child, reason}` message in the caller's mailbox.
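A sketch of the trapping case, assuming it runs under a Lockstep controller. The `:boom` exit reason is illustrative, and the returned reason is expected (not verified here) to match it:

```elixir
# Trap exits, link to a child that exits abnormally, and read the
# resulting {:EXIT, child, reason} message instead of dying.
trap_body = fn ->
  Lockstep.flag(:trap_exit, true)
  child = Lockstep.spawn_link(fn -> exit(:boom) end)

  {:EXIT, ^child, reason} =
    Lockstep.recv_first(fn
      {:EXIT, ^child, _reason} -> true
      _other -> false
    end)

  reason
end
```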
"""
@spec spawn_link((-> any())) :: pid()
def spawn_link(fun) when is_function(fun, 0) do
ctl = controller!()
Lockstep.Controller.request_spawn_link(ctl, self(), fun, ancestors: ancestors())
end
# Read the calling process's `:"$ancestors"`. Used by `spawn` /
# `spawn_link` to propagate the OTP ancestor chain to children
# — some libraries (ConCache, DynamicSupervisor) rely on it to
# discover their parent.
defp ancestors, do: Process.get(:"$ancestors", [])
@doc """
Same shape as `Process.link/1`. Establishes a bidirectional link.
Linking a dead managed process delivers `{:EXIT, target, :noproc}`
immediately (if trap_exit is on) or kills the caller (if not).
"""
@spec link(pid()) :: true
def link(target) when is_pid(target) do
Lockstep.Controller.link(controller!(), self(), target)
end
@doc "Same shape as `Process.unlink/1`."
@spec unlink(pid()) :: true
def unlink(target) when is_pid(target) do
Lockstep.Controller.unlink(controller!(), self(), target)
end
@doc """
Same shape as `Process.flag/2`. Currently only `:trap_exit` is
modeled at the controller level; other flags are accepted and
return a placeholder previous value but don't change semantics.
"""
@spec flag(atom(), any()) :: any()
def flag(flag, value) when is_atom(flag) do
Lockstep.Controller.flag(controller!(), self(), flag, value)
end
@doc """
Same shape as `Process.alive?/1` but consults the controller's view.
Returns `true` if `target_pid` is a managed process that has not yet
exited under Lockstep's controller. For pids the controller doesn't
know about (e.g., processes from outside the iteration), falls back
to vanilla `Process.alive?/1`.
Calling `alive?/1` is a sync point — the strategy may interleave
another process between this check and any subsequent action. That's
the point: TOCTOU bugs (`if Process.alive?(pid), do: GenServer.call(pid, ...)`)
surface here.
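A sketch of the check-then-act shape this is meant to expose, using `Lockstep.GenServer.call/2`. The `:status` request and the `{:error, :not_alive}` return value are hypothetical, and the function must run under a Lockstep controller:

```elixir
# Check-then-act: the liveness check and the call are separate sync points.
check_then_call = fn worker ->
  if Lockstep.alive?(worker) do
    # The strategy may run another process here, including one that
    # makes `worker` exit, before the call below is issued.
    Lockstep.GenServer.call(worker, :status)
  else
    {:error, :not_alive}
  end
end
```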
"""
@spec alive?(pid()) :: boolean()
def alive?(target_pid) when is_pid(target_pid) do
Lockstep.Controller.alive?(controller!(), self(), target_pid)
end
@doc """
Virtual-time sleep. Same shape as `Process.sleep/1`. Implemented as
`send_after(self(), sentinel, ms)` followed by `recv_first` waiting
for the sentinel — the controller advances virtual time forward to
fire the timer, which yields control to other managed processes
while we "sleep."
"""
@spec sleep(non_neg_integer() | :infinity) :: :ok
def sleep(:infinity) do
Lockstep.recv_first(fn _ -> false end)
:ok
end
def sleep(ms) when is_integer(ms) and ms >= 0 do
sentinel = make_ref()
_ = send_after(self(), {:__lockstep_sleep__, sentinel}, ms)
Lockstep.recv_first(fn
{:__lockstep_sleep__, ^sentinel} -> true
_ -> false
end)
:ok
end
@doc """
Run a controlled test body N times. Used by `Lockstep.Test.ctest/3`;
most users do not call this directly.
"""
defdelegate run(test_fun, opts), to: Lockstep.Runner
@doc false
def controller do
Process.get(:lockstep_controller)
end
defp controller! do
case Process.get(:lockstep_controller) do
nil ->
raise RuntimeError, """
No Lockstep controller in scope.
Lockstep.send/recv/spawn must be called from within a Lockstep test
(use Lockstep.Test) or via Lockstep.Runner.run/2.
"""
pid when is_pid(pid) ->
pid
end
end
end