defmodule Skuld.Effects.EffectLogger do
@behaviour Skuld.Comp.IHandle
@behaviour Skuld.Comp.IInstall
@moduledoc """
Effect logging for replay, resume, and rerun capabilities.
EffectLogger captures effect invocations in a flat log, enabling:
- **Replay**: Short-circuit completed effects with logged values
- **Resume**: Continue suspended computations from where they left off
- **Rerun**: Re-execute failed computations, replaying successful effects
## Architecture
EffectLogger works by wrapping effect handlers to intercept their invocations.
Each wrapped handler:
1. Creates a log entry in `:started` state
2. Installs a `leave_scope` to mark the entry as `:discarded` if the continuation
is abandoned (e.g., by a Throw effect)
3. Wraps the continuation `k` to mark the entry as `:executed` when it completes
4. Calls the original handler with the wrapped continuation
## Flat Log Structure
Unlike a tree-structured log, entries are stored flat in execution order.
The `leave_scope` mechanism marks entries as `:discarded` when their
continuations are abandoned, preserving the information needed for replay.
## Example - Basic Logging
alias Skuld.Comp
alias Skuld.Effects.{EffectLogger, State}
{{result, log}, _env} =
my_comp
|> EffectLogger.with_logging()
|> State.with_handler(0)
|> Comp.run()
## Entry States
- `:started` - Handler invoked, continuation not yet completed (includes suspended)
- `:executed` - Handler called wrapped_k, continuation completed normally
- `:discarded` - Handler never called wrapped_k (e.g., Throw effect)
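
The lifecycle above can be sketched with plain maps. This is a simplified, hypothetical model: `ToyEntryLog` and its functions are illustrative names, not part of Skuld, and it assumes that only a `:started` entry may become `:discarded`.

```elixir
defmodule ToyEntryLog do
  # Hypothetical model: entries are plain maps kept in execution order.
  def push(log, id, effect) do
    log ++ [%{id: id, effect: effect, state: :started, value: nil}]
  end

  # The wrapped continuation ran: record the value and mark the entry :executed.
  def mark_executed(log, id, value) do
    Enum.map(log, fn
      %{id: ^id} = entry -> %{entry | state: :executed, value: value}
      entry -> entry
    end)
  end

  # The continuation was abandoned: a :started entry becomes :discarded.
  def mark_discarded(log, id) do
    Enum.map(log, fn
      %{id: ^id, state: :started} = entry -> %{entry | state: :discarded}
      entry -> entry
    end)
  end
end

log =
  []
  |> ToyEntryLog.push(1, {:state, :get})
  |> ToyEntryLog.mark_executed(1, 0)
  |> ToyEntryLog.push(2, {:throw, :boom})
  |> ToyEntryLog.mark_discarded(2)
  |> ToyEntryLog.push(3, {:yield, :await_input})

states = Enum.map(log, & &1.state)
```

In this sketch the third entry stays `:started`, which is how a suspended Yield would appear in the flat log.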
## Hot vs Cold Resume
When a computation suspends (e.g., via Yield), there are two ways to resume:
### Hot Resume (In-Memory)
If you have the live `%Comp.ExternalSuspend{}` struct, call the resume function directly:
alias Skuld.Effects.{EffectLogger, State, Yield}
# Run until suspension
{%Comp.ExternalSuspend{value: yielded, resume: resume}, env} =
my_comp
|> EffectLogger.with_logging()
|> Yield.with_handler()
|> State.with_handler(0)
|> Comp.run()
# Hot resume - call continuation directly with input
{result, final_env} = resume.(my_input)
The resume function captures the live continuation and environment.
### Cold Resume (Deserialized)
If you've serialized the log and later deserialize it, use `with_resume/3` or `with_resume/4`:
alias Skuld.Effects.EffectLogger.Log
# Serialize the log (by default attached to the suspend as `data[EffectLogger]`) to JSON
json = Jason.encode!(log)
# Later... deserialize and cold resume
cold_log = json |> Jason.decode!() |> Log.from_json()
{{result, new_log}, _env} =
my_comp
|> EffectLogger.with_resume(cold_log, my_input)
|> Yield.with_handler()
|> State.with_handler(0)
|> Comp.run()
Cold resume re-runs the computation, short-circuits `:executed` entries with
their logged values, and injects the resume value at the `:started` Yield entry.
## Use Cases
1. **Logging/Tracing**: Capture effect invocations for debugging or audit
2. **Replay**: Re-run computation, short-circuiting with logged values
3. **Rerun**: Re-run after failure - `:discarded` entries are re-executed
4. **Resume**: Continue after Yield suspension (hot or cold)
## Loop Pruning with mark_loop
For long-running loop-based computations (like LLM conversation loops), the log
can grow unboundedly. The `mark_loop/1` operation enables efficient pruning of
completed loop iterations while preserving state checkpoints for cold resume.
### Basic Usage
alias Skuld.Effects.{EffectLogger, Yield}
defcomp conversation_loop(state) do
# Mark iteration start - env.state is captured automatically
_ <- EffectLogger.mark_loop(ConversationLoop)
input <- Yield.yield(:await_input)
state = handle_input(state, input)
conversation_loop(state)
end
# Run with pruning enabled - no extra handler needed
{{result, log}, _env} =
conversation_loop(initial_state)
|> EffectLogger.with_logging(prune_loops: true)
|> Yield.with_handler()
|> Comp.run()
### How It Works
1. A root mark (`:__root__`) is auto-inserted on the first intercepted effect,
capturing the initial `env.state`
2. Each `EffectLogger.mark_loop(loop_id)` captures current `env.state`
3. When `prune_loops: true` (the default), completed iterations are pruned eagerly
after each `mark_loop/1` call and once more on finalization
4. Only the last iteration's effects are retained, plus checkpoints
5. Cold resume restores `env.state` from the most recent checkpoint
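
The pruning steps above can be illustrated on a plain list. This is a minimal sketch under stated assumptions, not the actual `Log` implementation: `ToyPrune` is a hypothetical module, entries are modelled as `{:mark, loop_id}` and `{:effect, name}` tuples, and only a single loop id is handled.

```elixir
defmodule ToyPrune do
  # Hypothetical model: {:mark, loop_id} entries delimit iterations,
  # {:effect, name} entries are ordinary logged effects.
  def prune(entries, loop_id) do
    mark = {:mark, loop_id}

    # Everything before the first mark of this loop (e.g. the root mark) is kept.
    {before_loop, from_first_mark} = Enum.split_while(entries, &(&1 != mark))

    # Walk backwards to find the last mark; keep it and what follows it.
    last_iteration =
      from_first_mark
      |> Enum.reverse()
      |> Enum.split_while(&(&1 != mark))
      |> case do
        {_tail, []} -> []
        {tail, [^mark | _older]} -> [mark | Enum.reverse(tail)]
      end

    before_loop ++ last_iteration
  end
end

entries = [
  {:mark, :__root__},
  {:effect, :a},
  {:mark, :loop},
  {:effect, :b},
  {:mark, :loop},
  {:effect, :c},
  {:mark, :loop},
  {:effect, :d}
]

pruned = ToyPrune.prune(entries, :loop)
```

Here only the root mark, the effect before the loop, and the last iteration (`{:mark, :loop}` followed by `{:effect, :d}`) survive, so the retained size depends on the current iteration rather than the total number of iterations.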
### Nested Loops
For nested loops with different loop-ids, pruning respects the hierarchy:
defcomp outer_loop(state) do
_ <- EffectLogger.mark_loop(OuterLoop)
{result, state} <- inner_loop(state)
outer_loop(state)
end
defcomp inner_loop(state) do
_ <- EffectLogger.mark_loop(InnerLoop)
# ... inner loop logic ...
inner_loop(updated_state)
end
The hierarchy `:__root__ <- OuterLoop <- InnerLoop` is inferred from nesting
order. Pruning `InnerLoop` only removes entries between inner loop marks,
preserving outer loop structure. The root mark is never pruned.
### Benefits
- **Bounded log size**: O(current_iteration) instead of O(total_iterations)
- **Fast cold resume**: Replay only current iteration, not entire history
- **State restoration**: `env.state` is restored from checkpoint on cold resume
- **Preserved semantics**: Full logging within each iteration
See `EffectLogger.Log` and `EffectLogger.EffectLogEntry` for details.
"""
alias Skuld.Comp
alias Skuld.Comp.Env
alias Skuld.Comp.ExternalSuspend
alias Skuld.Comp.Types
alias Skuld.Effects.EffectLogger.EffectLogEntry
alias Skuld.Effects.EffectLogger.EnvStateSnapshot
alias Skuld.Effects.EffectLogger.Log
@state_key "Elixir.Skuld.Effects.EffectLogger::log"
@state_keys_key "Elixir.Skuld.Effects.EffectLogger::state_keys"
@sig __MODULE__
#############################################################################
## Operation Structs
#############################################################################
# Operation struct for loop iteration marks.
#
# Contains the loop_id and a snapshot of env.state for cold resume.
defmodule MarkLoop do
@moduledoc false
alias Skuld.Effects.EffectLogger.EnvStateSnapshot
defstruct [:loop_id, :env_state]
@type t :: %__MODULE__{
loop_id: atom(),
env_state: EnvStateSnapshot.t() | nil
}
@doc """
Reconstruct from decoded JSON map.
"""
@spec from_json(map()) :: t()
def from_json(map) do
loop_id =
case map["loop_id"] || map[:loop_id] do
s when is_binary(s) -> String.to_existing_atom(s)
a when is_atom(a) -> a
nil -> nil
end
env_state =
case map["env_state"] || map[:env_state] do
nil -> nil
%EnvStateSnapshot{} = s -> s
m when is_map(m) -> EnvStateSnapshot.from_json(m)
end
%__MODULE__{loop_id: loop_id, env_state: env_state}
end
end
defimpl Jason.Encoder, for: Skuld.Effects.EffectLogger.MarkLoop do
def encode(value, opts) do
value
|> Skuld.Comp.SerializableStruct.encode()
|> Jason.Encode.map(opts)
end
end
@root_loop_id :__root__
#############################################################################
## Operations
#############################################################################
@doc """
Mark the start of a loop iteration with a loop identifier.
This creates a boundary in the effect log that enables pruning of completed
loop iterations. The current `env.state` is automatically captured for
cold resume. Pruning itself only happens when `prune_loops` is enabled in
`with_logging/2` (the default).
## Parameters
- `loop_id` - Atom identifying which loop this mark belongs to (module atoms work well)
## Example
defcomp conversation_loop(state) do
# Mark iteration start - env.state captured automatically
_ <- EffectLogger.mark_loop(ConversationLoop)
input <- Yield.yield(:await_input)
state = handle_input(state, input)
conversation_loop(state)
end
# Run with pruning - only last iteration kept in log
{{result, log}, _env} =
conversation_loop(initial_state)
|> EffectLogger.with_logging(prune_loops: true)
|> Yield.with_handler()
|> Comp.run()
"""
@spec mark_loop(atom()) :: Types.computation()
def mark_loop(loop_id) when is_atom(loop_id) do
# env_state will be captured by wrap_handler when this effect is logged
Comp.effect(@sig, %MarkLoop{loop_id: loop_id, env_state: nil})
end
@doc """
Returns the root loop ID used for the implicit root mark.
"""
@spec root_loop_id() :: atom()
def root_loop_id, do: @root_loop_id
#############################################################################
## Handler
#############################################################################
@doc """
Handle EffectLogger operations.
Currently handles:
- `MarkLoop` - Records loop iteration boundary, returns `:ok`
"""
@spec handle(term(), Types.env(), Types.k()) :: {Types.result(), Types.env()}
@impl Skuld.Comp.IHandle
def handle(%MarkLoop{}, env, k) do
# The mark is recorded by the logging wrapper.
# This handler just returns :ok to continue the computation.
k.(:ok, env)
end
#############################################################################
## Log Access
#############################################################################
@doc "Get the current log from the environment"
@spec get_log(Types.env()) :: Log.t() | nil
def get_log(env) do
Env.get_state(env, @state_key)
end
@doc "Put a log into the environment"
@spec put_log(Types.env(), Log.t()) :: Types.env()
def put_log(env, log) do
Env.put_state(env, @state_key, log)
end
@doc "Update the log in the environment"
@spec update_log(Types.env(), (Log.t() -> Log.t())) :: Types.env()
def update_log(env, f) do
log = get_log(env) || Log.new()
put_log(env, f.(log))
end
# Get the state_keys filter from env (internal)
defp get_state_keys(env) do
Env.get_state(env, @state_keys_key) || :all
end
# Put state_keys filter into env (internal)
defp put_state_keys(env, state_keys) do
Env.put_state(env, @state_keys_key, state_keys)
end
# Capture env.state snapshot with configured state_keys filter
defp capture_state_snapshot(env) do
state_keys = get_state_keys(env)
EnvStateSnapshot.capture(env.state, state_keys: state_keys)
end
# Decorate an ExternalSuspend with the current log in ExternalSuspend.data
defp decorate_suspend_with_log(%ExternalSuspend{} = suspend, env) do
log = get_log(env)
finalized_log = if log, do: Log.finalize(log), else: nil
data = suspend.data || %{}
decorated = %{suspend | data: Map.put(data, __MODULE__, finalized_log)}
{decorated, env}
end
#############################################################################
## Handler Wrapping
#############################################################################
@doc """
Wrap an effect handler with logging.
The wrapped handler will log each effect invocation, tracking:
- When the effect starts (creates entry in `:started` state)
- When the effect completes (marks entry as `:executed` with value)
- When the continuation is abandoned (marks entry as `:discarded`)
## Parameters
- `sig` - The effect signature (module) for logging
- `handler` - The original handler function `(args, env, k) -> {result, env}`
## Returns
A wrapped handler function with the same signature.
## Example
logged_handler = EffectLogger.wrap_handler(State, &State.handle/3)
comp
|> Comp.with_handler(State, logged_handler)
|> EffectLogger.with_logging()
|> Comp.run()
"""
@spec wrap_handler(module(), Types.handler()) :: Types.handler()
def wrap_handler(sig, handler) do
fn args, env, k ->
# Ensure root mark exists (lazy insertion on first intercepted effect)
env_with_root = ensure_root_mark(env)
# Generate unique ID for this entry
entry_id = generate_id()
# Track whether this is a mark_loop effect (drives the state snapshot below
# and eager pruning in wrapped_k)
is_mark_loop = sig == __MODULE__ and match?(%MarkLoop{}, args)
# For MarkLoop, capture current env.state as a serializable snapshot
entry_data =
if is_mark_loop do
%{args | env_state: capture_state_snapshot(env_with_root)}
else
args
end
# Create entry and push to log
entry = EffectLogEntry.new(entry_id, sig, entry_data)
env_with_entry = update_log(env_with_root, &Log.push_entry(&1, entry))
# Save previous leave_scope
previous_leave_scope = Env.get_leave_scope(env_with_entry)
# Create leave_scope that marks entry as discarded
my_leave_scope = fn result, inner_env ->
# Mark this entry as discarded (set_discarded handles the state transition)
marked_env = update_log(inner_env, &Log.mark_discarded(&1, entry_id))
previous_leave_scope.(result, marked_env)
end
# Create wrapped continuation that marks entry as executed
wrapped_k = fn value, inner_env ->
# Mark entry as executed with the value
executed_env =
update_log(inner_env, fn log ->
update_entry_executed(log, entry_id, value)
end)
# Eagerly prune after mark_loop if prune_on_mark? is enabled
pruned_env =
if is_mark_loop do
update_log(executed_env, fn log ->
if log.prune_on_mark? do
# Prune in place - keeps entries on stack for correct ordering
Log.prune_in_place(log)
else
log
end
end)
else
executed_env
end
# Restore previous leave_scope and call original k.
# Once wrapped_k runs, this entry's leave_scope is uninstalled, so the entry
# can only be marked :discarded if wrapped_k is never called.
restored_env = Env.with_leave_scope(pruned_env, previous_leave_scope)
k.(value, restored_env)
end
# Install leave_scope and call original handler
env_with_leave_scope = Env.with_leave_scope(env_with_entry, my_leave_scope)
Comp.call_handler(handler, args, env_with_leave_scope, wrapped_k)
end
end
# Update an entry in the log to :executed state with a value
defp update_entry_executed(log, entry_id, value) do
updated_stack =
Enum.map(log.effect_stack, fn entry ->
if entry.id == entry_id do
EffectLogEntry.set_executed(entry, value)
else
entry
end
end)
%{log | effect_stack: updated_stack}
end
#############################################################################
## Logging Scope
#############################################################################
@doc """
Wrap a computation with effect logging.
Automatically wraps handlers already installed in the env with logging.
Initializes the log and extracts it when the computation completes.
The result is transformed to `{original_result, log}`.
This should be the INNERMOST wrapper (immediately after the computation
in the pipe), so it can see handlers installed by outer wrappers.
## Variants
- `with_logging(comp)` - Fresh logging, capture all effects
- `with_logging(comp, opts)` - Fresh logging with options
- `with_logging(comp, log)` - Replay from existing log
- `with_logging(comp, log, opts)` - Replay with options
## Options
- `:effects` - List of effect signatures to log. Default is all handlers in env.
- `:allow_divergence` - (replay only) If true, allow effects that don't match the log.
- `:prune_loops` - If true (default), prune completed loop segments eagerly after each
`mark_loop/1` call. This keeps memory bounded during long-running computations.
Set to `false` to preserve all entries (e.g., for debugging).
- `:state_keys` - List of env.state keys to include in `EnvStateSnapshot` captures.
Default `:all` includes everything. Use this to filter out constant Reader state:
state_keys: [Skuld.Effects.State.state_key(MyApp.Counter)]
This only captures the specified State keys, excluding Reader/other constant state.
- `:output` - Optional function `(result, log) -> transformed_result` to transform
the result using the final log before returning. Default: `fn result, log -> {result, log} end`.
- `:suspend` - Optional function `(suspend, env) -> {suspend, env}` to decorate
`ExternalSuspend` values when yielding. Default: attaches the finalized log to
`suspend.data[EffectLogger]`. Pass `nil` to disable suspend decoration.
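
The effect of the `:state_keys` filter can be pictured with a plain map. This is a hypothetical stand-in (`ToySnapshot` is not part of Skuld, and the real `EnvStateSnapshot.capture/2` may do more than select keys); it only assumes that `:all` keeps everything and a list keeps the named keys.

```elixir
defmodule ToySnapshot do
  # Hypothetical stand-in for a snapshot capture with a key filter.
  def capture(state, :all), do: state
  def capture(state, keys) when is_list(keys), do: Map.take(state, keys)
end

# :counter plays the role of mutable State, :config of constant Reader state.
state = %{counter: 3, config: %{locale: "en"}}

full = ToySnapshot.capture(state, :all)
filtered = ToySnapshot.capture(state, [:counter])
```

Under these assumptions `filtered` contains only the `:counter` entry, which is the kind of reduction the option is meant to give for checkpoints taken at every loop mark.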
## Example - Fresh Logging
{{result, log}, _env} =
my_comp
|> EffectLogger.with_logging()
|> State.with_handler(0)
|> Comp.run()
# Or specify which effects to log:
{{result, log}, _env} =
my_comp
|> EffectLogger.with_logging(effects: [State])
|> State.with_handler(0)
|> Reader.with_handler(:config)
|> Comp.run()
## Example - Replay
# First run - capture log
{{result1, log}, _} =
my_comp
|> EffectLogger.with_logging()
|> State.with_handler(0)
|> Comp.run()
# Replay - short-circuit with logged values
{{result2, _}, _} =
my_comp
|> EffectLogger.with_logging(log)
|> State.with_handler(0)
|> Comp.run()
assert result1 == result2
"""
@spec with_logging(Types.computation(), keyword()) :: Types.computation()
@spec with_logging(Types.computation(), Log.t()) :: Types.computation()
def with_logging(comp, opts \\ [])
def with_logging(comp, opts) when is_list(opts) do
effects_to_log = Keyword.get(opts, :effects, :all)
prune_loops = Keyword.get(opts, :prune_loops, true)
state_keys = Keyword.get(opts, :state_keys, :all)
output = Keyword.get(opts, :output, &default_output/2)
suspend_fn = Keyword.get(opts, :suspend, &decorate_suspend_with_log/2)
Comp.scoped(comp, fn env ->
env_with_config = setup_logging_env(env, state_keys)
sigs_to_wrap = sigs_to_wrap(effects_to_log, env_with_config)
{env_with_wrapped, original_handlers} =
install_wrapped_handlers(env_with_config, sigs_to_wrap, &wrap_handler/2)
initial_log = init_log_with_prune(Log.new(), prune_loops)
env_with_log = put_log(env_with_wrapped, initial_log)
{env_final, previous_transform} =
if suspend_fn do
maybe_setup_suspend_decoration(env_with_log, suspend_fn)
else
{env_with_log, nil}
end
finally_k = build_finally_k(prune_loops, original_handlers, previous_transform, output)
{env_final, finally_k}
end)
end
def with_logging(comp, %Log{} = log), do: with_logging(comp, log, [])
@spec with_logging(Types.computation(), Log.t(), keyword()) :: Types.computation()
def with_logging(comp, %Log{} = log, opts) when is_list(opts) do
effects_to_log = Keyword.get(opts, :effects, :all)
allow_divergence = Keyword.get(opts, :allow_divergence, false)
prune_loops = Keyword.get(opts, :prune_loops, true)
state_keys = Keyword.get(opts, :state_keys, :all)
output = Keyword.get(opts, :output, &default_output/2)
suspend_fn = Keyword.get(opts, :suspend, &decorate_suspend_with_log/2)
Comp.scoped(comp, fn env ->
env_with_config = setup_logging_env(env, state_keys)
sigs_to_wrap = sigs_to_wrap(effects_to_log, env_with_config)
{env_with_wrapped, original_handlers} =
install_wrapped_handlers(env_with_config, sigs_to_wrap, &wrap_replay_handler/2)
replay_log = init_replay_log(log, prune_loops, allow_divergence)
env_with_log = put_log(env_with_wrapped, replay_log)
{env_final, previous_transform} =
if suspend_fn do
maybe_setup_suspend_decoration(env_with_log, suspend_fn)
else
{env_with_log, nil}
end
finally_k = build_finally_k(prune_loops, original_handlers, previous_transform, output)
{env_final, finally_k}
end)
end
@doc """
Install EffectLogger via catch clause syntax.
Config is opts, a Log for replay, or `{log, opts}`:
catch
EffectLogger -> nil # fresh logging
EffectLogger -> [effects: [State]] # log specific effects
EffectLogger -> log # replay from log
EffectLogger -> {log, allow_divergence: true} # replay with opts
"""
@impl Skuld.Comp.IInstall
def __handle__(comp, nil), do: with_logging(comp)
def __handle__(comp, opts) when is_list(opts), do: with_logging(comp, opts)
def __handle__(comp, %Log{} = log), do: with_logging(comp, log)
def __handle__(comp, {%Log{} = log, opts}) when is_list(opts), do: with_logging(comp, log, opts)
# Private helpers to reduce cyclomatic complexity
defp setup_logging_env(env, state_keys) do
env
|> Env.with_handler(@sig, &__MODULE__.handle/3)
|> put_state_keys(state_keys)
end
defp sigs_to_wrap(:all, env), do: Env.handler_sigs(env)
defp sigs_to_wrap(list, _env) when is_list(list), do: [@sig | list] |> Enum.uniq()
defp install_wrapped_handlers(env, sigs, wrapper_fn) do
Enum.reduce(sigs, {env, %{}}, fn sig, {acc_env, originals} ->
case Env.get_handler(acc_env, sig) do
nil ->
{acc_env, originals}
handler ->
wrapped = wrapper_fn.(sig, handler)
new_env = Env.with_handler(acc_env, sig, wrapped)
{new_env, Map.put(originals, sig, handler)}
end
end)
end
defp init_log_with_prune(log, true), do: Log.enable_prune_on_mark(log)
defp init_log_with_prune(log, false), do: log
defp init_replay_log(log, prune_loops, allow_divergence) do
log
|> init_log_with_prune(prune_loops)
|> maybe_allow_divergence(allow_divergence)
end
defp maybe_allow_divergence(log, true), do: Log.allow_divergence(log)
defp maybe_allow_divergence(log, false), do: log
defp maybe_setup_suspend_decoration(env, suspend_fn) when is_function(suspend_fn, 2) do
old_transform = Env.get_transform_suspend(env)
new_transform = fn suspend, e ->
{suspend1, e1} = old_transform.(suspend, e)
suspend_fn.(suspend1, e1)
end
{Env.with_transform_suspend(env, new_transform), old_transform}
end
defp build_finally_k(prune_loops, original_handlers, previous_transform, output) do
fn value, final_env ->
log = get_log(final_env) || Log.new()
finalized_log = Log.finalize(log)
output_log = maybe_prune_log(finalized_log, prune_loops)
cleaned_env = cleanup_logger_state(final_env)
restored_env = restore_original_handlers(cleaned_env, original_handlers)
restored_env = maybe_restore_transform(restored_env, previous_transform)
{output.(value, output_log), restored_env}
end
end
defp maybe_prune_log(log, true), do: Log.prune_completed_loops(log)
defp maybe_prune_log(log, false), do: log
defp cleanup_logger_state(env) do
%{env | state: env.state |> Map.delete(@state_key) |> Map.delete(@state_keys_key)}
end
defp restore_original_handlers(env, original_handlers) do
Enum.reduce(original_handlers, env, fn {sig, original}, acc_env ->
Env.with_handler(acc_env, sig, original)
end)
end
defp maybe_restore_transform(env, nil), do: env
defp maybe_restore_transform(env, transform), do: Env.with_transform_suspend(env, transform)
defp default_output(result, log), do: {result, log}
#############################################################################
## Replay Handler
#############################################################################
@doc """
Wrap a handler for replay mode.
During replay, if the next entry in the log queue matches this effect
and can be short-circuited, returns the logged value directly.
Otherwise, executes the handler normally.
## Parameters
- `sig` - The effect signature (module)
- `handler` - The original handler function
## Example
replay_handler = EffectLogger.wrap_replay_handler(State, &State.handle/3)
{{result, _log}, _env} =
my_comp
|> Comp.with_handler(State, replay_handler)
|> EffectLogger.with_logging(previous_log)
|> Comp.run()
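
The four branches of the replay decision can be summarised as a pure function. This is a sketch, not the handler itself: `ToyReplay` and the returned atoms are hypothetical names, entries are plain maps, and it assumes an entry can be short-circuited exactly when its state is `:executed`.

```elixir
defmodule ToyReplay do
  # Hypothetical model: a logged entry is a map with :sig, :args, :state, :value.

  # Empty queue: nothing to replay, run the handler with fresh logging.
  def decide(nil, _sig, _args), do: :execute_fresh

  # Same effect and already executed: return the logged value.
  def decide(%{sig: sig, args: args, state: :executed, value: value}, sig, args),
    do: {:short_circuit, value}

  # Same effect but not executed (e.g. :discarded): pop and run it again.
  def decide(%{sig: sig, args: args}, sig, args), do: :pop_and_reexecute

  # Different effect: divergence, run the handler without popping.
  def decide(_entry, _sig, _args), do: :diverged_execute_without_pop
end

executed = %{sig: State, args: :get, state: :executed, value: 0}
discarded = %{sig: State, args: :get, state: :discarded, value: nil}

decisions = [
  ToyReplay.decide(executed, State, :get),
  ToyReplay.decide(discarded, State, :get),
  ToyReplay.decide(executed, State, {:put, 1}),
  ToyReplay.decide(nil, State, :get)
]
```

The clause order mirrors the `cond` in the handler: the short-circuit check comes first, then the plain match, then the divergence fallback.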
"""
@spec wrap_replay_handler(module(), Types.handler()) :: Types.handler()
def wrap_replay_handler(sig, handler) do
fn args, env, k ->
# Skip any loop marks at front of queue, validating state consistency
env_after_marks = skip_loop_marks(env)
log = get_log(env_after_marks) || Log.new()
case Log.peek_queue(log) do
%EffectLogEntry{} = entry ->
cond do
EffectLogEntry.matches?(entry, sig, args) and
EffectLogEntry.can_short_circuit?(entry) ->
# Short-circuit: pop entry from queue and return logged value
{_entry, updated_log} = Log.pop_queue(log)
env_with_updated_log = put_log(env_after_marks, updated_log)
k.(entry.value, env_with_updated_log)
EffectLogEntry.matches?(entry, sig, args) ->
# Matches but can't short-circuit (e.g., :discarded) - pop and re-execute
{_entry, updated_log} = Log.pop_queue(log)
env_with_updated_log = put_log(env_after_marks, updated_log)
wrap_handler(sig, handler).(args, env_with_updated_log, k)
true ->
# Doesn't match - divergence, execute normally without popping
wrap_handler(sig, handler).(args, env_after_marks, k)
end
nil ->
# No entries in queue - execute normally with logging
wrap_handler(sig, handler).(args, env_after_marks, k)
end
end
end
#############################################################################
## Cold Resume
#############################################################################
@resume_value_key "Elixir.Skuld.Effects.EffectLogger::resume_value"
# Resume value helpers - use wrapper tuple to distinguish nil from unset
defp get_resume_value(env) do
case Env.get_state(env, @resume_value_key) do
{:resume_value, value} -> {:ok, value}
nil -> :not_set
end
end
defp put_resume_value(env, value) do
Env.put_state(env, @resume_value_key, {:resume_value, value})
end
defp clear_resume_value(env) do
%{env | state: Map.delete(env.state, @resume_value_key)}
end
@doc """
Resume a suspended computation from a cold (deserialized) log.
Re-runs the computation from scratch, short-circuiting completed effects with
their logged values. When reaching the suspension point (the `:started` Yield
entry), injects `resume_value` instead of actually suspending, then continues
with fresh execution.
## Parameters
- `comp` - The computation to run
- `log` - The log ending with a `:started` Yield entry (from suspension)
- `resume_value` - The value to inject at the suspension point
- `opts` - Options (same as `with_logging/3`, except `:suspend`, which `with_resume/4` does not read)
## Example
alias Skuld.Effects.EffectLogger.Log
# Original run - suspended; by default the finalized log is attached
# to the suspend under data[EffectLogger]
{%Comp.ExternalSuspend{value: yielded, data: data}, _env} =
my_comp
|> EffectLogger.with_logging()
|> Yield.with_handler()
|> State.with_handler(0)
|> Comp.run()
log = data[EffectLogger]
# Serialize and later deserialize
json = Jason.encode!(log)
cold_log = json |> Jason.decode!() |> Log.from_json()
# Cold resume with injected value
{{result, new_log}, _} =
my_comp
|> EffectLogger.with_resume(cold_log, :my_input)
|> Yield.with_handler()
|> State.with_handler(0)
|> Comp.run()
"""
@spec with_resume(Types.computation(), Log.t(), term()) :: Types.computation()
def with_resume(comp, log, resume_value), do: with_resume(comp, log, resume_value, [])
@spec with_resume(Types.computation(), Log.t(), term(), keyword()) :: Types.computation()
def with_resume(comp, %Log{} = log, resume_value, opts) when is_list(opts) do
effects_to_log = Keyword.get(opts, :effects, :all)
allow_divergence = Keyword.get(opts, :allow_divergence, false)
prune_loops = Keyword.get(opts, :prune_loops, true)
state_keys = Keyword.get(opts, :state_keys, :all)
output = Keyword.get(opts, :output, &default_output/2)
Comp.scoped(comp, fn env ->
# Install EffectLogger's own handler (for mark_loop operations) and store
# the state_keys filter for use in snapshots
env_with_config = setup_logging_env(env, state_keys)
# Determine which handlers to wrap (including our own handler)
sigs_to_wrap = sigs_to_wrap(effects_to_log, env_with_config)
# Wrap each handler with resume logging and install
{env_with_wrapped, original_handlers} =
install_wrapped_handlers(env_with_config, sigs_to_wrap, &wrap_resume_handler/2)
# Initialize with existing log (prepared for replay), enabling
# prune_on_mark and divergence according to opts
replay_log = init_replay_log(log, prune_loops, allow_divergence)
env_with_log = put_log(env_with_wrapped, replay_log)
# Restore env.state from the most recent checkpoint in the log
env_with_restored_state = restore_checkpoint_state(env_with_log)
env_with_resume = put_resume_value(env_with_restored_state, resume_value)
finally_k = fn value, final_env ->
# Extract and finalize the log
finalized_log = Log.finalize(get_log(final_env) || Log.new())
# Final prune (may be redundant if eager pruning is enabled, but ensures consistency)
output_log = maybe_prune_log(finalized_log, prune_loops)
# Clean up log, state_keys filter, and resume value from env state
cleaned_env = final_env |> cleanup_logger_state() |> clear_resume_value()
# Restore original handlers
restored_env = restore_original_handlers(cleaned_env, original_handlers)
# Return result paired with log
{output.(value, output_log), restored_env}
end
{env_with_resume, finally_k}
end)
end
# Predicate: entry matches and can be short-circuited (return cached value)
defp can_short_circuit_entry?(entry, sig, args) do
EffectLogEntry.matches?(entry, sig, args) and EffectLogEntry.can_short_circuit?(entry)
end
# Predicate: entry is a Yield suspension point that should be resumed
defp yield_suspension_to_resume?(entry, sig, args, env) do
EffectLogEntry.matches?(entry, sig, args) and
sig == Skuld.Effects.Yield and
entry.state == :started and
has_resume_value?(env)
end
# Predicate: entry matches but needs re-execution (can't short-circuit)
defp matches_but_needs_reexecution?(entry, sig, args) do
EffectLogEntry.matches?(entry, sig, args)
end
defp has_resume_value?(env), do: get_resume_value(env) != :not_set
@doc """
Wrap a handler for resume mode.
Like `wrap_replay_handler/2`, but when encountering a `:started` Yield entry
(the suspension point), injects the stored resume value instead of suspending.
The resume value is stored in env by `with_resume/4` and cleared after use.
## Parameters
- `sig` - The effect signature (module)
- `handler` - The original handler function
## Returns
A wrapped handler function.
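
The "stored, then cleared after use" behaviour relies on telling a stored `nil` apart from no stored value. A minimal sketch of that idea with a plain map follows; `ToyResumeSlot` and its `:resume_value` key are hypothetical, and the real module keeps the value in `env.state` under its own namespaced key.

```elixir
defmodule ToyResumeSlot do
  # Hypothetical key; the real module uses its own namespaced state key.
  @key :resume_value

  # Wrap the value so that a stored nil differs from "nothing stored".
  def put(state, value), do: Map.put(state, @key, {:resume_value, value})

  def fetch(state) do
    case Map.get(state, @key) do
      {:resume_value, value} -> {:ok, value}
      nil -> :not_set
    end
  end

  # After the suspended Yield has been resumed, later Yields must not see it.
  def clear(state), do: Map.delete(state, @key)
end

state = ToyResumeSlot.put(%{}, nil)
before_clear = ToyResumeSlot.fetch(state)
after_clear = state |> ToyResumeSlot.clear() |> ToyResumeSlot.fetch()
```

With the wrapper tuple, resuming with `nil` is a valid input, and clearing the slot lets any subsequent Yield suspend normally.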
"""
@spec wrap_resume_handler(module(), Types.handler()) :: Types.handler()
def wrap_resume_handler(sig, handler) do
fn args, env, k ->
# Skip any loop marks at front of queue, validating state consistency
env_after_marks = skip_loop_marks(env)
log = get_log(env_after_marks) || Log.new()
case Log.peek_queue(log) do
%EffectLogEntry{} = entry ->
cond do
can_short_circuit_entry?(entry, sig, args) ->
short_circuit_entry(entry, log, env_after_marks, k)
yield_suspension_to_resume?(entry, sig, args, env_after_marks) ->
resume_yield_suspension(entry, sig, args, log, env_after_marks, k)
matches_but_needs_reexecution?(entry, sig, args) ->
reexecute_entry(sig, handler, args, log, env_after_marks, k)
true ->
# Doesn't match - divergence, execute normally without popping
wrap_handler(sig, handler).(args, env_after_marks, k)
end
nil ->
# No entries in queue - execute normally with logging
wrap_handler(sig, handler).(args, env_after_marks, k)
end
end
end
# Short-circuit: pop entry and return its cached value
defp short_circuit_entry(entry, log, env, k) do
{_entry, updated_log} = Log.pop_queue(log)
env_with_updated_log = put_log(env, updated_log)
k.(entry.value, env_with_updated_log)
end
# Resume a Yield suspension: inject the resume value and log the resumed entry
defp resume_yield_suspension(_entry, sig, args, log, env, k) do
{:ok, resume_value} = get_resume_value(env)
{_entry, updated_log} = Log.pop_queue(log)
env_with_updated_log = put_log(env, updated_log)
# Clear resume value so subsequent Yields execute normally
env_cleared = clear_resume_value(env_with_updated_log)
# Log this as a new executed entry (the resumed Yield)
entry_id = generate_id()
resumed_entry =
EffectLogEntry.new(entry_id, sig, args)
|> EffectLogEntry.set_executed(resume_value)
env_with_entry = update_log(env_cleared, &Log.push_entry(&1, resumed_entry))
k.(resume_value, env_with_entry)
end
# Re-execute: pop entry and call the wrapped handler
defp reexecute_entry(sig, handler, args, log, env, k) do
{_entry, updated_log} = Log.pop_queue(log)
env_with_updated_log = put_log(env, updated_log)
wrap_handler(sig, handler).(args, env_with_updated_log, k)
end
#############################################################################
## Private
#############################################################################
defp generate_id do
Uniq.UUID.uuid4()
end
# Ensure the root mark exists in the log (lazy insertion on first effect)
# Always inserts root mark to capture initial env.state for cold resume
defp ensure_root_mark(env) do
log = get_log(env) || Log.new()
if Log.has_root_mark?(log) do
env
else
root_entry =
EffectLogEntry.new(
generate_id(),
__MODULE__,
%MarkLoop{loop_id: @root_loop_id, env_state: capture_state_snapshot(env)}
)
|> EffectLogEntry.set_executed(:ok)
update_log(env, &Log.push_entry(&1, root_entry))
end
end
# Restore env.state from the most recent checkpoint in the log (for cold resume)
defp restore_checkpoint_state(env) do
log = get_log(env)
case Log.find_latest_checkpoint(log) do
nil ->
env
%EffectLogEntry{data: %{env_state: %EnvStateSnapshot{} = snapshot}} ->
restored_state = EnvStateSnapshot.restore(snapshot)
%{env | state: Map.merge(env.state, restored_state)}
_ ->
env
end
end
# Skip any MarkLoop entries at the front of the queue during replay.
# These are checkpoints for state restoration, not user effects.
# If divergence is not allowed, validates that current state matches checkpoint.
defp skip_loop_marks(env) do
log = get_log(env) || Log.new()
case Log.peek_queue(log) do
%EffectLogEntry{sig: sig, data: %MarkLoop{env_state: checkpoint_state}} = entry
when sig == __MODULE__ ->
# Pop the mark entry
{_entry, updated_log} = Log.pop_queue(log)
# Validate state consistency if not allowing divergence
if not updated_log.allow_divergence? and checkpoint_state != nil do
validate_state_consistency(env, checkpoint_state, entry)
end
# Update env and recurse to skip any additional marks
env_with_updated_log = put_log(env, updated_log)
skip_loop_marks(env_with_updated_log)
_ ->
# Not a mark entry, return env unchanged
env
end
end
# Validate that current env.state is consistent with the checkpoint snapshot.
# For now, just compare the user state (excluding EffectLogger internal state).
defp validate_state_consistency(env, %EnvStateSnapshot{} = checkpoint_state, entry) do
current_snapshot = capture_state_snapshot(env)
if current_snapshot.entries != checkpoint_state.entries do
raise """
State divergence detected during replay.
Loop mark: #{inspect(entry.data.loop_id)}
Expected state (from log):
#{inspect(checkpoint_state.entries, pretty: true)}
Actual state:
#{inspect(current_snapshot.entries, pretty: true)}
Use `allow_divergence: true` option if state changes are expected.
"""
end
end
defp validate_state_consistency(_env, _checkpoint_state, _entry), do: :ok
end