# lib/lockstep.ex

defmodule Lockstep do
  @moduledoc """
  Coyote-style controlled concurrency testing for BEAM.

  See `Lockstep.Test` for the ExUnit integration. The functions in this
  module are the runtime API used inside controlled tests:

      Lockstep.spawn(fn -> ... end)
      Lockstep.send(target, message)
      Lockstep.recv()

  All three are sync points: each call hands control back to the controller,
  which picks the next process to run per the configured strategy.

  ## Patterns and gotchas

  Real-world usage of Lockstep — especially against distributed systems
  like Phoenix.PubSub, Phoenix.Tracker, or hand-rolled Raft-style
  protocols — surfaces a few patterns worth calling out up front.

  ### Choosing a strategy

  See `Lockstep.Strategy` for the full discussion. Short version: the
  default `:pct` strategy is best at finding partial-order races where
  any of several priority swaps work. For races that require the
  scheduler to consistently pick a *specific* proc over several
  consecutive sync points, `:pos` works better — PCT's priority
  shuffle under-explores those sequences. A real example from Lockstep's
  own test suite: the async-replication staleness in
  `test/leader_follower_register_test.exs` wasn't found in 100 PCT
  iterations but surfaced at iteration 1 under POS. Rule of thumb: if
  PCT can't find your race in ~50 iterations, try POS before raising
  the iteration count.
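
  Strategy and iteration count are picked per test. A hedged sketch
  (the `strategy:` and `iterations:` option names are assumptions about
  how `Lockstep.Test.ctest/3` spells its options; check your version):

      ctest "stale read under async replication",
            strategy: :pos, iterations: 200 do
        # test body: spawn managed procs, assert invariants
      end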

  ### Driving timed protocols: explicit triggers, not `send_after`

  Distributed protocols often have time-driven elements: election
  timeouts, heartbeats, retry timers. There's a temptation to model
  them in tests with `Lockstep.send_after`, hoping the strategy will
  explore "what if the timeout fires before the response arrives"
  schedules. **This usually doesn't work as intended.**

  Lockstep's controller fires timers ONLY when every live proc is
  blocked in a receive (`recv/0` or `recv_first/1`). Timer fires are
  therefore serialized with proc
  execution: timer 1 fires → recipient becomes ready → strategy picks
  it → it processes → returns to recv → all blocked again → timer 2
  fires. Multiple timers at the same `fire_at` don't actually fire
  concurrently — they're explored one at a time.

  For tests where you want to surface concurrent-trigger races,
  drive the triggering events explicitly via `Lockstep.send` from
  the test body. `test/raft_election_test.exs` uses this pattern
  successfully:

      # Trigger an election on every node concurrently. They'll all
      # become candidates for term 1 and race for the majority.
      for {_id, pid} <- nodes do
        Lockstep.spawn(fn ->
          Lockstep.send(pid, :trigger_election)
        end)
      end

  This way the strategy interleaves the per-node election handlers
  freely — the very interleaving the bug needs.

  ### Avoiding timer pile-up

  When a long-running test does need timers (heartbeats, retries),
  every handler that schedules a fresh timer should cancel or
  invalidate the previous one. Naive code:

      def handle_info(:tick, state) do
        Lockstep.send_after(self(), :tick, 100)  # leak!
        {:noreply, work(state)}
      end

  Each tick adds another pending timer to the queue. After a few
  hundred ticks, virtual-time advancement is dominated by trivial
  timer fires that consume `max_steps` budget. Two clean fixes:

    1. **Cancel and re-schedule:**

           def handle_info(:tick, state) do
             if state.timer, do: Lockstep.cancel_timer(state.timer)
             ref = Lockstep.send_after(self(), :tick, 100)
             {:noreply, %{state | timer: ref} |> work()}
           end

    2. **Epoch-tagged messages** (no need to cancel; stale fires are
       ignored on receipt):

           def handle_info({:tick, epoch}, %{epoch: epoch} = state) do
             new_epoch = epoch + 1
             Lockstep.send_after(self(), {:tick, new_epoch}, 100)
             {:noreply, %{state | epoch: new_epoch} |> work()}
           end

           def handle_info({:tick, _stale}, state), do: {:noreply, state}

  The Raft demo uses pattern #2.

  ### Multi-step sync chains

  Operations like `Lockstep.GenServer.call/2` go through several sync
  points: a monitor, a send, and a selective receive. A test that
  performs N GenServer calls therefore charges roughly 3N sync points
  against each iteration's step budget. For long workloads or chatty
  libraries (Phoenix.Tracker
  is a notorious example — every heartbeat triggers ~10 sync points
  through Registry / persistent_term / ETS), `max_steps` may need to
  be set substantially higher than for simple race-hunt scenarios.
  Start with `5_000` for tight micro-races and scale to `50_000+` for
  full Phoenix integration tests.
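
  A sketch of raising the budget (assuming `max_steps:` is accepted in
  the `ctest` options and forwarded to `Lockstep.Runner.run/2`):

      ctest "tracker heartbeats converge", max_steps: 50_000 do
        # chatty Phoenix.Tracker-style workload
      end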

  ## v0.1 → v1.0 progression

  v0.1 supported bare `send`/`receive`/`spawn` only. v0.5 added
  GenServer, Task, Registry, Supervisor, GenStatem wrappers, virtual
  clock, monitors, links, trap_exit. v1.0 (current) adds distributed-
  cluster simulation (`Lockstep.Cluster`), per-node state isolation
  (Phase D), and Jepsen-level checker infrastructure
  (`Lockstep.History`, `Lockstep.Checker.{Linearizable,
  SequentialConsistency, Causal}`, `Lockstep.Generator`,
  `Lockstep.Model.Register`).
  """

  @doc """
  Spawn a new managed process. The function runs under the controller.
  """
  @spec spawn((-> any())) :: pid()
  def spawn(fun) when is_function(fun, 0) do
    ctl = controller!()
    Lockstep.Controller.request_spawn(ctl, self(), fun, ancestors: ancestors())
  end

  @doc """
  Send a message to another managed process. The send is recorded by the
  controller and the message is queued in the controller-side mailbox of
  the target. Returns `:ok`.
  """
  @spec send(pid(), any()) :: :ok
  def send(target, message) when is_pid(target) do
    ctl = controller!()
    Lockstep.Controller.send_msg(ctl, self(), target, message)
  end

  def send(target, message) when is_atom(target) do
    case Lockstep.Process.whereis(target) do
      nil ->
        # Mirror Kernel.send/2's behavior with an unregistered name:
        # raise ArgumentError. (Real BEAM raises in this case; some
        # callers rescue it, e.g. `Horde.RegistryImpl.on_diffs`.)
        raise ArgumentError,
              "Lockstep.send: no process registered as #{inspect(target)}"

      pid when is_pid(pid) ->
        ctl = controller!()
        Lockstep.Controller.send_msg(ctl, self(), pid, message)
    end
  end

  def send({:via, mod, term}, message) do
    case mod.whereis_name(term) do
      :undefined ->
        raise ArgumentError,
              "Lockstep.send: via #{inspect(mod)} resolved #{inspect(term)} to :undefined"

      pid when is_pid(pid) ->
        Lockstep.send(pid, message)
    end
  end

  def send({:global, name}, message) do
    case Lockstep.Global.whereis_name(name) do
      :undefined ->
        raise ArgumentError,
              "Lockstep.send: no global registration for #{inspect(name)}"

      pid ->
        Lockstep.send(pid, message)
    end
  end

  # `{name, node}` form — look up `name` registered on `node`
  # specifically (not on the caller's node). Used by Groot, Phoenix.PubSub,
  # and other libraries that target a remote node's named GenServer
  # via `GenServer.cast({Module, remote_node}, msg)`.
  #
  # Mirrors OTP `send({name, node}, msg)` semantics: silently drop
  # if name isn't registered on that node (OTP also drops if the
  # remote node is down).
  def send({name, node}, message) when is_atom(name) and is_atom(node) do
    case Lockstep.Controller.cluster_whereis_on(controller!(), node, name) do
      :undefined ->
        :ok

      pid when is_pid(pid) ->
        Lockstep.send(pid, message)
    end
  end

  @doc """
  Receive the next message in the calling process's controller-side
  mailbox. Blocks (in the controller) until the strategy picks this
  process to receive. No pattern matching: you get the next message in
  delivery order — for selective receive use `recv_first/1`.
  """
  @spec recv() :: any()
  def recv do
    ctl = controller!()
    Lockstep.Controller.recv_msg(ctl, self())
  end

  @doc """
  Selective receive: scan the controller-side mailbox in delivery order
  and return the *first* message for which `predicate` returns `true`.
  Other messages stay in the mailbox in their original order.

  Equivalent to BEAM's `receive` with a pattern, except the patterns are
  expressed as a predicate function:

      msg = Lockstep.recv_first(fn
        {^ref, _reply} -> true
        _              -> false
      end)

  Blocks (in the controller) until a message matching `predicate` is
  available. Predicate failures (raising/throwing inside it) count as
  "no match" so a buggy predicate cannot trip the controller.
  """
  @spec recv_first((any() -> boolean())) :: any()
  def recv_first(predicate) when is_function(predicate, 1) do
    ctl = controller!()
    Lockstep.Controller.recv_match(ctl, self(), predicate)
  end

  @doc """
  Read the controller's *virtual* clock. Time only advances when the
  controller would otherwise deadlock (everyone blocked on receive); at
  that point virtual time jumps to the next pending timer's fire_at.
  Returns milliseconds since iteration start (0 at the first call).
  """
  @spec now() :: non_neg_integer()
  def now do
    Lockstep.Controller.now(controller!(), self())
  end

  @doc """
  Schedule `message` to be delivered to `target` after `delay_ms`
  milliseconds of virtual time. Returns a timer reference that can be
  passed to `cancel_timer/1`.

  Same shape as `Process.send_after/3`. The timer fires when the
  controller advances virtual time, which happens automatically as soon
  as no managed process is ready and the next timer is the only way to
  make progress.
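
  For example, a retry timer cancelled when the reply arrives first
  (sketch; `worker`, the message shapes, and `handle_timeout/0` are
  illustrative, not part of the API):

      ref = Lockstep.send_after(self(), :retry, 500)
      Lockstep.send(worker, {:request, self()})

      case Lockstep.recv() do
        {:reply, value} ->
          Lockstep.cancel_timer(ref)
          value

        :retry ->
          # virtual time reached 500 before the reply was delivered
          handle_timeout()
      end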
  """
  @spec send_after(pid(), any(), non_neg_integer()) :: reference()
  def send_after(target, message, delay_ms)
      when is_pid(target) and is_integer(delay_ms) and delay_ms >= 0 do
    Lockstep.Controller.send_after(controller!(), self(), target, message, delay_ms)
  end

  @doc """
  Cancel a previously scheduled timer. Returns the number of ms that
  remained on the timer if it was cancelled before firing, or `false`
  if the timer had already fired or never existed.
  """
  @spec cancel_timer(reference()) :: non_neg_integer() | false
  def cancel_timer(ref) when is_reference(ref) do
    Lockstep.Controller.cancel_timer(controller!(), self(), ref)
  end

  @doc """
  Monitor `target_pid`. Returns a reference; when `target_pid` exits,
  the calling process receives `{:DOWN, ref, :process, target_pid, reason}`
  in its controller-side mailbox. Same shape as `Process.monitor/1`,
  except delivery happens through Lockstep's mailbox (so it's
  observable via `Lockstep.recv`/`recv_first`) instead of BEAM's.
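
  For example, observing a child's exit (sketch):

      worker = Lockstep.spawn(fn -> :ok end)
      ref = Lockstep.monitor(worker)

      {:DOWN, ^ref, :process, _pid, _reason} =
        Lockstep.recv_first(fn
          {:DOWN, ^ref, _, _, _} -> true
          _ -> false
        end)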
  """
  @spec monitor(pid() | atom() | {atom(), node()} | {:via, module(), term()} | {:global, term()}) ::
          reference()
  def monitor(target_pid) when is_pid(target_pid) do
    Lockstep.Controller.monitor(controller!(), self(), target_pid)
  end

  def monitor(name) when is_atom(name) do
    case Lockstep.Process.whereis(name) do
      nil -> spawn_dead_monitor(name, :noproc)
      pid -> monitor(pid)
    end
  end

  def monitor({name, node}) when is_atom(name) and is_atom(node) do
    monitor(name)
  end

  def monitor({:via, mod, term}) do
    case mod.whereis_name(term) do
      :undefined -> spawn_dead_monitor({:via, mod, term}, :noproc)
      pid -> monitor(pid)
    end
  end

  def monitor({:global, name}) do
    case Lockstep.Global.whereis_name(name) do
      :undefined -> spawn_dead_monitor({:global, name}, :noproc)
      pid -> monitor(pid)
    end
  end

  # When the resolution target doesn't exist, OTP's Process.monitor
  # delivers a `:DOWN` immediately. We mirror that by monitoring a
  # transient dead pid -- the controller's monitor handler already
  # fires `:DOWN` immediately for unmanaged or dead pids.
  defp spawn_dead_monitor(_target, _reason) do
    dead = :erlang.spawn(fn -> :ok end)
    # Best-effort: yield so the spawned proc likely exits first; the
    # controller fires :DOWN immediately for dead or unmanaged pids
    # either way.
    :erlang.yield()
    monitor(dead)
  end

  @doc """
  Stop monitoring. Same shape as `Process.demonitor/2`. The `:flush`
  option removes any already-delivered `:DOWN` for this ref from the
  caller's mailbox.
  """
  @spec demonitor(reference(), [atom()]) :: true
  def demonitor(ref, opts \\ []) when is_reference(ref) and is_list(opts) do
    Lockstep.Controller.demonitor(controller!(), self(), ref, opts)
  end

  @doc """
  Spawn a managed child process and link to it. Same as `Lockstep.spawn/1`
  followed by `Lockstep.link/1`, but atomic — there's no window where
  the child has been spawned but not yet linked.

  When the linked process exits abnormally, the caller dies too unless
  it has set `flag(:trap_exit, true)`. Trapping converts the death into
  a `{:EXIT, child, reason}` message in the caller's mailbox.
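
  For example, trapping a linked child's crash (sketch; assumes no
  other messages are pending when `recv/0` runs):

      Lockstep.flag(:trap_exit, true)
      child = Lockstep.spawn_link(fn -> exit(:boom) end)
      {:EXIT, ^child, :boom} = Lockstep.recv()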
  """
  @spec spawn_link((-> any())) :: pid()
  def spawn_link(fun) when is_function(fun, 0) do
    ctl = controller!()
    Lockstep.Controller.request_spawn_link(ctl, self(), fun, ancestors: ancestors())
  end

  # Read the calling process's `:"$ancestors"`. Used by `spawn` /
  # `spawn_link` to propagate the OTP ancestor chain to children
  # — some libraries (ConCache, DynamicSupervisor) rely on it to
  # discover their parent.
  defp ancestors, do: Process.get(:"$ancestors", [])

  @doc """
  Same shape as `Process.link/1`. Establishes a bidirectional link.
  Linking a dead managed process delivers `{:EXIT, target, :noproc}`
  immediately (if trap_exit is on) or kills the caller (if not).
  """
  @spec link(pid()) :: true
  def link(target) when is_pid(target) do
    Lockstep.Controller.link(controller!(), self(), target)
  end

  @doc "Same shape as `Process.unlink/1`."
  @spec unlink(pid()) :: true
  def unlink(target) when is_pid(target) do
    Lockstep.Controller.unlink(controller!(), self(), target)
  end

  @doc """
  Same shape as `Process.flag/2`. Currently only `:trap_exit` is
  modeled at the controller level; other flags are accepted and
  return a placeholder previous value but don't change semantics.
  """
  @spec flag(atom(), any()) :: any()
  def flag(flag, value) when is_atom(flag) do
    Lockstep.Controller.flag(controller!(), self(), flag, value)
  end

  @doc """
  Same shape as `Process.alive?/1` but consults the controller's view.

  Returns `true` if `target_pid` is a managed process that has not yet
  exited under Lockstep's controller. For pids the controller doesn't
  know about (e.g., processes from outside the iteration), falls back
  to vanilla `Process.alive?/1`.

  Calling `alive?/1` is a sync point — the strategy may interleave
  another process between this check and any subsequent action. That's
  the point: TOCTOU bugs (`if Process.alive?(pid), do: GenServer.call(pid, ...)`)
  surface here.
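
  Sketch of the race the sync point exposes:

      if Lockstep.alive?(pid) do
        # sync point above: another proc may be scheduled here,
        # and `pid` may exit before the send runs
        Lockstep.send(pid, :work)
      end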
  """
  @spec alive?(pid()) :: boolean()
  def alive?(target_pid) when is_pid(target_pid) do
    Lockstep.Controller.alive?(controller!(), self(), target_pid)
  end

  @doc """
  Virtual-time sleep. Same shape as `Process.sleep/1`. Implemented as
  `send_after(self(), sentinel, ms)` followed by `recv_first` waiting
  for the sentinel — the controller advances virtual time forward to
  fire the timer, which yields control to other managed processes
  while we "sleep."
  """
  @spec sleep(non_neg_integer() | :infinity) :: :ok
  def sleep(:infinity) do
    Lockstep.recv_first(fn _ -> false end)
    :ok
  end

  def sleep(ms) when is_integer(ms) and ms >= 0 do
    sentinel = make_ref()
    _ = send_after(self(), {:__lockstep_sleep__, sentinel}, ms)

    Lockstep.recv_first(fn
      {:__lockstep_sleep__, ^sentinel} -> true
      _ -> false
    end)

    :ok
  end

  @doc """
  Run a controlled test body N times. Used by `Lockstep.Test.ctest/3`;
  most users do not call this directly.
  """
  defdelegate run(test_fun, opts), to: Lockstep.Runner

  @doc false
  def controller do
    Process.get(:lockstep_controller)
  end

  defp controller! do
    case Process.get(:lockstep_controller) do
      nil ->
        raise RuntimeError, """
        No Lockstep controller in scope.

        Lockstep.send/recv/spawn must be called from within a Lockstep test
        (use Lockstep.Test) or via Lockstep.Runner.run/2.
        """

      pid when is_pid(pid) ->
        pid
    end
  end
end