# lib/allm/providers/fake.ex

defmodule ALLM.Providers.Fake do
  @moduledoc """
  Deterministic, scripted adapter for testing. Implements both `ALLM.Adapter`
  and `ALLM.StreamAdapter`. See spec §31, §7.1, §7.2, §8.

  Layer B — runtime. Fake is the canonical testing adapter that every
  orchestration phase (5, 6, 7, 8) tests against. It carries serializable
  plain data in `adapter_opts` and passes both the Phase 3 adapter- and
  stream-adapter conformance harnesses.

  ## What Fake is (and isn't)

  Fake **ignores the `%ALLM.Request{}`** passed to `generate/2` and `stream/2`.
  It never inspects the request's `:messages`, `:tools`, `:tool_choice`,
  `:temperature`, `:max_tokens`, or any other field. The scripted response is
  produced irrespective of the request. This is intentional: Fake is for
  testing orchestration, not provider-wire fidelity. Tool orchestration is
  tested at Phase 6 (ToolRunner) and Phase 7 (Chat). Fake's scripts emit
  `{:tool_call, _}` entries directly to simulate what the model would return.
  The caller sets up an engine with `tools: [...]` and a `{:tool_call, ...}`
  script, and the orchestrator dispatches to the tool executor exactly as it
  would with a real provider.

  ## Script shapes

  Fake accepts two disjoint script shapes on `adapter_opts`. See
  `ALLM.Providers.Fake.Script` for the full tag-to-shape table.

  ### Spec §31 (user-facing)

      adapter_opts: [
        script: [
          {:text, "Hello "},
          {:text, "world"},
          {:finish, :stop}
        ]
      ]

  Entry tags: `:text`, `:tool_call`, `:tool_call_delta`, `:usage`,
  `:raw_chunk`, `:finish`, `:error` (2-tuple), `:delay`, `:sleep`
  (deprecated alias of `:delay`).

  ### Phase 3 harness

      adapter_opts: [
        script: [{:ok, %{output_text: "hi"}}],
        stream_script: [[{:text_delta, "hel"}, {:text_delta, "lo"}, {:finish, :stop}]]
      ]

  Entry tags (non-streaming, on `:script`): `{:ok, map}`, `{:error, reason_atom, keyword}`.
  Entry tags (streaming, on `:stream_script`): `:text_delta`, `:preflight_error`,
  `:error_event`, `:stream_error`, plus the shared-semantics `:tool_call` and
  `:finish` tags.

  ### Per-entry-point key precedence

  | Entry point | Key precedence |
  |-------------|----------------|
  | `generate/2` | `:scripts` > `:script` (wrapped as `[script]`). `:stream_script` is not consulted. If only `:stream_script` is set, `generate/2` fails with an `%AdapterError{reason: :no_scripted_response}`. |
  | `stream/2`   | `:stream_script` > `:scripts` > `:script` (wrapped as `[script]`). |

  Multi-call scripting uses `:scripts` / `:stream_script` as a list of lists.
  Each call consumes one inner list and advances the cursor. `:stream_script`
  also accepts a flat entry list, which is treated as a single call.

  ### Disambiguation

  For `generate/2`, both shapes share the `:script` key and are disambiguated
  by leading entry tag. The `:error` tag disambiguates by `tuple_size/1`
  (2 → §31, 3 → harness). For `stream/2`, distinct keys disambiguate.
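
  The rule can be pictured as a classifier over the leading entry. `ShapeSketch`
  is hypothetical. It is not part of ALLM, and `ALLM.Providers.Fake.Script` may
  implement the check differently. It assumes that only the first entry matters:

  ```elixir
  defmodule ShapeSketch do
    # Hypothetical helper for illustration only; not part of ALLM.
    # Assumes the shape is decided by the leading entry alone.
    def shape([]), do: :empty

    def shape([first | _]) when is_tuple(first) and tuple_size(first) > 0 do
      case {elem(first, 0), tuple_size(first)} do
        # `{:ok, map}` only exists in the harness shape.
        {:ok, 2} -> :harness
        # `{:error, reason, opts}` (3-tuple) is the harness shape...
        {:error, 3} -> :harness
        # ...while `{:error, reason}` and every other tag is spec §31.
        _ -> :spec31
      end
    end
  end
  ```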

  ## Cursor behaviour

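  Every call advances a per-process cursor. This includes single-call `:script`
  options, which are wrapped as `[script]`. A second call with the same
  `:script` in the same process therefore returns `:no_scripted_response`. By
  default the cursor lives in the process dictionary at
  `{:allm_fake_cursor, :erlang.phash2(scripts)}`. That keeps it isolated per
  ExUnit test process (safe under `async: true`). It is discarded when the
  process exits, and the common case needs no setup.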

  **Two engines built with content-equal `:scripts` values in the same
  process share a cursor.** This is a documented footgun: the cursor key is
  `:erlang.phash2(scripts)`, so identical script contents collide. A test
  that constructs two Fake engines simulating two distinct providers with the
  same fixture script finds the second engine's first call already at index 1.
  Workaround: pass distinct `adapter_opts[:script_cursor]` Agent pids,
  obtained from `start_script_cursor/0`.

  The explicit-Agent override also supports cross-process cursor sharing
  (`Task.async/1` over the adapter call) and mitigates the rare hash-collision
  case (`:erlang.phash2/1` is a 27-bit hash).
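
  The collision follows directly from the key. This sketch copies the body of
  the private `advance_process_dict_cursor/1` into a hypothetical `CursorSketch`
  module. It then contrasts that with two Agents holding the same
  `fn -> 0 end` state that `start_script_cursor/0` creates:

  ```elixir
  defmodule CursorSketch do
    # Hypothetical stand-alone copy of the default keying. The key is derived
    # from the script contents, not from the engine that owns them.
    def advance(scripts) do
      key = {:allm_fake_cursor, :erlang.phash2(scripts)}
      current = Process.get(key, 0)
      Process.put(key, current + 1)
      current
    end
  end

  fixture = [[{:text, "a"}, {:finish, :stop}], [{:text, "b"}, {:finish, :stop}]]

  # Two "engines" built from content-equal copies of the same fixture.
  engine_one = fixture
  engine_two = Enum.map(fixture, &Enum.to_list/1)

  first = CursorSketch.advance(engine_one)
  second = CursorSketch.advance(engine_two)
  # first == 0, second == 1: engine two starts where engine one left off.

  # One Agent per engine (as start_script_cursor/0 returns) keeps them apart.
  {:ok, agent_one} = Agent.start_link(fn -> 0 end)
  {:ok, agent_two} = Agent.start_link(fn -> 0 end)
  one_index = Agent.get_and_update(agent_one, fn i -> {i, i + 1} end)
  two_index = Agent.get_and_update(agent_two, fn i -> {i, i + 1} end)
  # one_index == 0 and two_index == 0
  ```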

  ## Testing patterns

  **Use `start_script_cursor/0` for multi-call tests.** Reach for the explicit
  Agent cursor whenever a test does either of the following:

  - (a) It makes multiple Fake calls against a `:scripts` or `:stream_script`
    list while another engine in the same process carries content-equal
    scripts.
  - (b) It dispatches the adapter call across processes, for example via
    `Task.async/1`.

  The default process-dictionary cursor is fine for one-shot `:script` calls
  and for multi-call tests whose script content is unique within the test
  process. For everything else, the explicit cursor is load-bearing:

      cursor = ALLM.Providers.Fake.start_script_cursor()
      opts = [adapter_opts: [scripts: [...], script_cursor: cursor]]

  Worked examples: `test/allm/providers/fake_test.exs:143` and
  `test/allm/providers/fake/fixtures_test.exs:66`.

  ## Cleanup observation

  When `adapter_opts[:cleanup_observer]` is a `:counters` ref (as created by
  `:counters.new(1, [:atomics])`), the `Stream.resource/3` `after_fun`
  increments index 1 on every normal termination path:

  - full consumption (`Enum.to_list/1`, `Stream.run/1`);
  - an early halt by the consumer (`Enum.take/2`, `Stream.take_while/2`
    returning false);
  - a `throw`, `raise`, or `exit/1` from inside the reducer.

  The counter increments once per enumeration of the stream, not once per
  event.

  Halt-safety test shape:

      ref = :counters.new(1, [:atomics])
      {:ok, stream} = ALLM.Providers.Fake.stream(req,
        adapter_opts: [script: [...], cleanup_observer: ref])
      _ = Enum.take(stream, 2)
      # :counters.get(ref, 1) == 1 once Enum.take/2 returns (after_fun runs synchronously)
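
  The mechanism is plain `Stream.resource/3`. This stand-alone sketch is not
  Fake itself, and its four-atom resource is made up. It counts `after_fun`
  runs the same way a `:cleanup_observer` does:

  ```elixir
  ref = :counters.new(1, [:atomics])

  # A made-up four-event resource whose after_fun bumps index 1 of the
  # counter, the same way Fake's after_fun treats :cleanup_observer.
  stream =
    Stream.resource(
      fn -> [:a, :b, :c, :d] end,
      fn
        [] -> {:halt, []}
        [event | rest] -> {[event], rest}
      end,
      fn _acc -> :counters.add(ref, 1, 1) end
    )

  # Early halt: after_fun has already run by the time Enum.take/2 returns.
  [:a, :b] = Enum.take(stream, 2)
  1 = :counters.get(ref, 1)

  # Each enumeration re-runs start_fun and, at the end, after_fun.
  [:a, :b, :c, :d] = Enum.to_list(stream)
  2 = :counters.get(ref, 1)
  ```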

  **Brutal-kill caveat.** `Stream.resource/3`'s `after_fun` does NOT run when
  the consumer is killed with `Process.exit(pid, :kill)` — brutal exits skip
  all cleanup by OTP design. Real provider adapters (Phase 10–11) address this
  via Finch's own monitor-based connection cleanup. Fake has no HTTP ref to
  leak, so the caveat is purely documentary. Tests assert cleanup on normal
  halts only; no test should simulate `:kill`.

  ## Adapter event vocabulary (streaming)

  **Emitted** (spec §8 subset that belongs to an adapter):
  `:message_started`, `:text_delta`, `:text_completed`, `:tool_call_started`,
  `:tool_call_delta`, `:tool_call_completed`, `:message_completed`,
  `:raw_chunk`, `:error`.

  **Not emitted** (orchestrator-owned — Phase 6/7):
  `:tool_execution_started`, `:tool_execution_completed`, `:tool_result_encoded`,
  `:ask_user_requested`, `:tool_halt`, `:step_completed`, `:chat_completed`.

  ## Backpressure and delays

  `{:delay, ms}` entries call `Process.sleep/1` inside the `next_fun` of the
  `Stream.resource/3`. The delay therefore blocks the consumer's reducing
  process, not a simulated provider.

  `{:delay, _}` is **front-loaded**: the interpreter sleeps before consuming
  the NEXT entry. `start_fun` queues `:message_started`, and it is emitted on
  the first pull before any entry is read. A `{:delay, ms}` placed as the
  FIRST entry therefore delays the first scripted event, not
  `:message_started`.

  Timing tests measure the wall-clock interval between the emit preceding the
  `{:delay, _}` entry and the emit following it.
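
  `DelaySketch` is a hypothetical, stripped-down version of the relevant part
  of `next_fun/1`. A pending `:message_started` is emitted on the first pull,
  and `{:delay, ms}` sleeps and recurses without emitting anything.
  Timestamping each event as the consumer receives it shows where the delay
  lands:

  ```elixir
  defmodule DelaySketch do
    # Hypothetical interpreter mirroring the shape of Fake's next_fun/1;
    # it is not Fake itself and only understands :delay and :text entries.
    def stream(entries) do
      Stream.resource(
        fn -> %{pending: [:message_started], entries: entries} end,
        &next/1,
        fn _acc -> :ok end
      )
    end

    # Pending events go out first, before any entry is read.
    defp next(%{pending: [_ | _] = pending} = acc), do: {pending, %{acc | pending: []}}
    defp next(%{entries: []} = acc), do: {:halt, acc}

    # A delay sleeps inside next_fun and recurses without emitting anything.
    defp next(%{entries: [{:delay, ms} | rest]} = acc) do
      Process.sleep(ms)
      next(%{acc | entries: rest})
    end

    defp next(%{entries: [{:text, text} | rest]} = acc) do
      {[{:text_delta, text}], %{acc | entries: rest}}
    end
  end

  # Record a monotonic timestamp as each event reaches the consumer.
  stamped =
    [{:delay, 50}, {:text, "hi"}]
    |> DelaySketch.stream()
    |> Enum.map(fn event -> {event, System.monotonic_time(:millisecond)} end)

  [{:message_started, t0}, {{:text_delta, "hi"}, t1}] = stamped
  # The sleep falls between :message_started and the text event.
  gap = t1 - t0
  ```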

  `{:sleep, ms}` is a deprecated alias for `{:delay, ms}` — one `Logger.warning/1`
  fires per BEAM lifetime when `{:sleep, _}` is used. Deletion target: v0.3.

  ## Examples

      iex> req = ALLM.Request.new([%ALLM.Message{role: :user, content: "hi"}])
      iex> opts = [adapter_opts: [script: [{:text, "hi"}, {:finish, :stop}]]]
      iex> {:ok, resp} = ALLM.Providers.Fake.generate(req, opts)
      iex> {resp.output_text, resp.finish_reason}
      {"hi", :stop}
  """

  @behaviour ALLM.Adapter
  @behaviour ALLM.StreamAdapter

  alias ALLM.Error.AdapterError
  alias ALLM.Message
  alias ALLM.Providers.Fake.Script

  # ---------------------------------------------------------------------------
  # ALLM.Adapter — generate/2
  # ---------------------------------------------------------------------------

  @doc """
  Execute a scripted non-streaming request. See spec §7.1.

  Reads the script from `opts[:adapter_opts]` and validates it via
  `ALLM.Providers.Fake.Script.validate!/1`. It then reads and advances the
  cursor and folds the current call's entries into a `%ALLM.Response{}` via
  `ALLM.Providers.Fake.Script.fold_to_response/1`. Each attempt runs inside
  `ALLM.Retry.run/3` under `opts[:retry]` (default `:default`).

  Ignores the `%Request{}` entirely. The scripted response is produced
  irrespective of the request's messages, tools, or params (see "What Fake is
  (and isn't)" in the moduledoc).

  Returns `{:ok, %ALLM.Response{}}` on a scripted success (including
  harness-shape `{:ok, _}` entries). Returns `{:error, %AdapterError{}}` on a
  scripted failure or a script-exhausted cursor. `adapter_opts[:request_id]` is
  propagated onto `response.request_id` verbatim. An empty entry list yields a
  response whose `metadata` has `empty_script: true`.
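
  `adapter_opts[:retry_until_call]` drives a per-process countdown. The
  hypothetical `CountdownSketch` copies the logic of the private
  `decrement_retry_counter/1`. With `n = 3`, two attempts report `:retry`. The
  third proceeds and deletes the counter, so the next call starts a fresh
  countdown:

  ```elixir
  defmodule CountdownSketch do
    # Hypothetical copy of the countdown; Fake's real key is private.
    @key {:countdown_sketch, :retry_until_call}

    def attempt(n) when is_integer(n) and n >= 1 do
      current = Process.get(@key, n)

      if current > 1 do
        Process.put(@key, current - 1)
        :retry
      else
        # Success: forget the countdown so the next call starts over at n.
        Process.delete(@key)
        :proceed
      end
    end
  end

  results = for _ <- 1..4, do: CountdownSketch.attempt(3)
  # [:retry, :retry, :proceed, :retry]
  ```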

  ## Examples

      iex> req = ALLM.Request.new([%ALLM.Message{role: :user, content: "x"}])
      iex> opts = [adapter_opts: [script: [{:text, "hello"}, {:finish, :stop}]]]
      iex> {:ok, resp} = ALLM.Providers.Fake.generate(req, opts)
      iex> resp.output_text
      "hello"
  """
  @impl ALLM.Adapter
  @spec generate(ALLM.Request.t(), keyword()) ::
          {:ok, ALLM.Response.t()} | {:error, AdapterError.t()}
  def generate(_request, opts) do
    adapter_opts = Keyword.get(opts, :adapter_opts, [])
    :ok = Script.validate!(adapter_opts)

    # Phase 9.3: wrap the per-call work in `ALLM.Retry.run/3` so the
    # `retry_until_call: n` adapter opt drives a real retry loop and emits
    # `[:allm, :adapter, :retry]` per attempt. Without that opt the closure
    # runs once and returns the scripted result (the absent-counter path is
    # a no-op).
    retry_policy = Keyword.get(opts, :retry, :default)
    telemetry_meta = build_retry_telemetry_meta(opts)

    ALLM.Retry.run(retry_policy, telemetry_meta, fn ->
      generate_attempt(adapter_opts)
    end)
  end

  # ---------------------------------------------------------------------------
  # ALLM.StreamAdapter — stream/2
  # ---------------------------------------------------------------------------

  @doc """
  Open a scripted streaming request. See spec §7.2, §8.

  Reads the script from `opts[:adapter_opts]`, validates via
  `ALLM.Providers.Fake.Script.validate!/1`, resolves the cursor, and returns
  a lazy `Enumerable.t()` of `ALLM.Event` values. No event fires until the
  consumer reduces.

  Key precedence: `:stream_script` > `:scripts` > `:script` (wrapped as
  `[script]`). Empty opts return `{:error, script_exhausted_error()}` (no
  stream opened).
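
  `:stream_script` may be either a flat entry list (one call) or a list of
  lists (one inner list per call). This illustrative copy of the private
  `normalize_multi_call/1`, written as an anonymous function, shows how the
  two forms are told apart:

  ```elixir
  # Illustrative copy of the private normalize_multi_call/1, as an anonymous fn.
  normalize = fn
    [] -> [[]]
    list when is_list(list) -> if Enum.all?(list, &is_list/1), do: list, else: [list]
  end

  flat = [{:text_delta, "hel"}, {:text_delta, "lo"}, {:finish, :stop}]
  nested = [flat, [{:finish, :stop}]]

  # A flat list becomes a single call.
  [^flat] = normalize.(flat)
  # A list of lists is already one inner list per call.
  ^nested = normalize.(nested)
  # An empty list is one empty call, not an exhausted script.
  [[]] = normalize.([])
  ```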

  When the first entry of the current call is a harness-shape
  `{:preflight_error, reason, opts}`, returns synchronously as
  `{:error, %AdapterError{reason: ^reason, ...opts}}` — no stream is opened.

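  The returned stream emits `:message_started` on the first pull. It then
  emits per-entry events via `ALLM.Providers.Fake.Script.interpret/1` and
  closes with `:message_completed`, preceded by `:text_completed` if any
  `:text`/`:text_delta` was emitted. A `{:finish, reason}` entry closes the
  stream, and any entries after it are dropped. Without one, the stream still
  closes with `:message_completed` carrying `finish_reason: nil`.
  `{:delay, ms}` / `{:sleep, ms}` entries call `Process.sleep/1` and yield no
  events. `after_fun` increments `adapter_opts[:cleanup_observer]` (a
  `:counters` ref, when present).

  Streaming calls are never retried. When `adapter_opts[:retry_until_call]`
  is `n`, the first `n - 1` calls (counted together with `generate/2`) return
  `{:ok, stream}`. That stream emits `:message_started` followed by a terminal
  `{:error, %AdapterError{}}` event.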

  Ignores the `%Request{}` (see "What Fake is (and isn't)" in the moduledoc).

  ## Examples

      iex> req = ALLM.Request.new([%ALLM.Message{role: :user, content: "x"}])
      iex> opts = [adapter_opts: [script: [{:text, "hi"}, {:finish, :stop}]]]
      iex> {:ok, stream} = ALLM.Providers.Fake.stream(req, opts)
      iex> events = Enum.to_list(stream)
      iex> Enum.any?(events, &match?({:text_delta, %{delta: "hi"}}, &1))
      true
  """
  @impl ALLM.StreamAdapter
  @spec stream(ALLM.Request.t(), keyword()) ::
          {:ok, Enumerable.t()} | {:error, AdapterError.t()}
  def stream(_request, opts) do
    adapter_opts = Keyword.get(opts, :adapter_opts, [])
    :ok = Script.validate!(adapter_opts)

    # Phase 9.3: spec §6.1 — streaming calls are NOT retried. We do
    # NOT call `ALLM.Retry.run/3` here. Instead, when
    # `retry_until_call: n` is set we share the same per-process
    # counter as `generate/2` and, on the first n-1 calls, surface the
    # transient failure as a terminal `{:error, _}` event mid-stream.
    # The consumer reduces it to `%Response{finish_reason: :error}`
    # per CLAUDE.md's "mid-stream errors fold into the response, not
    # the call-site tuple" invariant.
    case maybe_retry_transient(adapter_opts) do
      :retry ->
        {:ok, transient_error_stream()}

      :proceed ->
        case resolve_scripts_for_stream(adapter_opts) do
          :empty ->
            {:error, script_exhausted_error()}

          {:ok, scripts} ->
            open_stream(scripts, adapter_opts)
        end
    end
  end

  # A minimal stream that emits `:message_started` and a terminal
  # `{:error, %AdapterError{}}` event. Used when `retry_until_call:` is
  # set and the counter says this call should fail transiently; the
  # downstream collector folds the terminal error into
  # `%Response{finish_reason: :error}`.
  defp transient_error_stream do
    err = AdapterError.new(:timeout, message: "scripted transient stream failure")

    Stream.resource(
      fn ->
        [{:message_started, %{message: %Message{role: :assistant, content: ""}}}, {:error, err}]
      end,
      fn
        [] -> {:halt, []}
        events -> {events, []}
      end,
      fn _ -> :ok end
    )
  end

  # ---------------------------------------------------------------------------
  # Public helpers
  # ---------------------------------------------------------------------------

  @doc """
  Return the canonical `%AdapterError{}` for a script-exhausted call.

  Spec §31 phrases this as `{:error, :no_scripted_response}`; the atom is
  preserved as the struct's `:reason` field via the Phase 1 enum amendment.

  ## Examples

      iex> err = ALLM.Providers.Fake.script_exhausted_error()
      iex> err.reason
      :no_scripted_response
      iex> err.message
      "no scripted response"
  """
  @spec script_exhausted_error() :: AdapterError.t()
  def script_exhausted_error do
    %AdapterError{reason: :no_scripted_response, message: "no scripted response"}
  end

  @doc """
  Start an Agent-backed script cursor for cross-process multi-call scripting
  and for disambiguating content-equal scripts in the same process (see
  moduledoc "Cursor behaviour").

  Pass the returned pid as `adapter_opts[:script_cursor]`; subsequent calls
  increment the cursor on the Agent rather than on the process dictionary.

  ## Examples

      iex> pid = ALLM.Providers.Fake.start_script_cursor()
      iex> ALLM.Providers.Fake.cursor_index(pid)
      0
  """
  @spec start_script_cursor() :: pid()
  def start_script_cursor do
    {:ok, pid} = Agent.start_link(fn -> 0 end)
    pid
  end

  @doc """
  Read the current cursor index for an Agent-backed cursor. Used in tests to
  assert how many calls have been consumed.
  """
  @spec cursor_index(pid()) :: non_neg_integer()
  def cursor_index(pid) when is_pid(pid), do: Agent.get(pid, & &1)

  # ---------------------------------------------------------------------------
  # Internals — generate/2 + retry plumbing
  # ---------------------------------------------------------------------------

  # One attempt of the scripted generate path. Returns one of the
  # `ALLM.Retry.closure_result/1` shapes (`{:ok, _} | {:retry, _, _}
  # | {:error, _}`). When `retry_until_call: n` is set on
  # `adapter_opts`, the first n-1 attempts return `{:retry, 0, :timeout}`.
  # Attempt n returns the scripted `{:ok, response}` (or `{:error, _}` if
  # the script is configured to fail there).
  defp generate_attempt(adapter_opts) do
    case maybe_retry_transient(adapter_opts) do
      :retry ->
        # `:timeout` is in the default `retry_on` set (spec §6.1) so the
        # bare-atom error matches under the default policy. Tests
        # exercising custom `retry_on` lists pass a policy that
        # includes `:timeout`.
        {:retry, 0, :timeout}

      :proceed ->
        case resolve_scripts_for_generate(adapter_opts) do
          {:ok, scripts} -> wrap_generate_result(run_generate_call(scripts, adapter_opts))
          :empty -> {:error, script_exhausted_error()}
        end
    end
  end

  # Convert the raw `run_generate_call/2` return into the
  # `closure_result/1` shape consumed by `ALLM.Retry.run/3`. The Fake
  # uses non-retryable `{:error, _}` for scripted errors (a real adapter
  # would inspect the status code and decide).
  defp wrap_generate_result({:ok, _} = ok), do: ok
  defp wrap_generate_result({:error, %AdapterError{} = err}), do: {:error, err}

  # Decrement the per-process retry counter. The counter lives under a
  # single process-dictionary key shared by generate/2 and stream/2. It is
  # seeded from the `retry_until_call:` value on first use. Returns `:retry`
  # while the counter is > 1, and `:proceed` once it reaches 1 (the call
  # that should succeed). Absent opt → always `:proceed`.
  defp maybe_retry_transient(adapter_opts) do
    case Keyword.get(adapter_opts, :retry_until_call) do
      nil ->
        :proceed

      n when is_integer(n) and n >= 1 ->
        decrement_retry_counter(n)
    end
  end

  # The counter lives in the calling process's process dictionary so the
  # streaming and non-streaming arms share state — the test plan
  # specifies that `Fake.stream/2` honours `retry_until_call:` from the
  # SAME counter as `Fake.generate/2`. Test isolation is per-PID
  # (`async: true` is safe — each ExUnit test runs in its own process).
  @retry_counter_key {__MODULE__, :retry_until_call}

  defp decrement_retry_counter(initial) do
    current = Process.get(@retry_counter_key, initial)

    if current > 1 do
      Process.put(@retry_counter_key, current - 1)
      :retry
    else
      # The successful attempt: delete the counter so the next call in this
      # process starts a fresh countdown.
      Process.delete(@retry_counter_key)
      :proceed
    end
  end

  # Telemetry metadata attached to every `[:allm, :adapter, :retry]`
  # event from this adapter. `:request_id` is propagated from the
  # surrounding span (set by `Runner.run/3` / `StreamRunner.run/3` per
  # Phase 9.1) so retry events correlate with their parent generate
  # / step / chat span.
  defp build_retry_telemetry_meta(opts) do
    %{provider: :fake}
    |> maybe_put_meta(:request_id, Keyword.get(opts, :request_id))
  end

  defp maybe_put_meta(meta, _key, nil), do: meta
  defp maybe_put_meta(meta, key, value), do: Map.put(meta, key, value)

  # generate/2 reads :scripts > :script. :stream_script is ignored per
  # key-precedence table in the moduledoc.
  defp resolve_scripts_for_generate(adapter_opts) do
    cond do
      Keyword.has_key?(adapter_opts, :scripts) ->
        {:ok, Keyword.fetch!(adapter_opts, :scripts)}

      Keyword.has_key?(adapter_opts, :script) ->
        {:ok, [Keyword.fetch!(adapter_opts, :script)]}

      true ->
        :empty
    end
  end

  defp run_generate_call(scripts, adapter_opts) do
    cursor = advance_cursor(scripts, adapter_opts)

    case Enum.at(scripts, cursor) do
      nil ->
        {:error, script_exhausted_error()}

      entries ->
        entries
        |> Script.fold_to_response()
        |> wrap_fold_result(adapter_opts, entries)
    end
  end

  # Wrap the fold output in an {:ok, _} | {:error, _} tuple and propagate
  # request_id / empty-script metadata onto the Response.
  defp wrap_fold_result({:error, %AdapterError{}} = err, _adapter_opts, _entries), do: err

  defp wrap_fold_result(%ALLM.Response{} = resp, adapter_opts, entries) do
    resp =
      resp
      |> maybe_attach_request_id(adapter_opts)
      |> maybe_attach_empty_metadata(entries)

    {:ok, resp}
  end

  defp maybe_attach_request_id(%ALLM.Response{} = resp, adapter_opts) do
    case Keyword.get(adapter_opts, :request_id) do
      nil -> resp
      req_id when is_binary(req_id) -> %{resp | request_id: req_id}
    end
  end

  defp maybe_attach_empty_metadata(%ALLM.Response{metadata: meta} = resp, []) do
    %{resp | metadata: Map.put(meta, :empty_script, true)}
  end

  defp maybe_attach_empty_metadata(%ALLM.Response{} = resp, _entries), do: resp

  # ---------------------------------------------------------------------------
  # Internals — stream/2
  # ---------------------------------------------------------------------------

  # stream/2 reads :stream_script > :scripts > :script.
  #
  # :stream_script is polymorphic — either a list-of-lists (multi-call, one
  # inner list per call) or a flat list of entries (single-call). We
  # normalize by wrapping a flat list as `[list]`. Detection: if every
  # element is itself a list, it's list-of-lists. The Phase 3 harness
  # passes both forms (single-call pre-flight tests pass a flat list;
  # multi-call tests pass a list-of-lists).
  defp resolve_scripts_for_stream(adapter_opts) do
    cond do
      Keyword.has_key?(adapter_opts, :stream_script) ->
        {:ok, normalize_multi_call(Keyword.fetch!(adapter_opts, :stream_script))}

      Keyword.has_key?(adapter_opts, :scripts) ->
        {:ok, Keyword.fetch!(adapter_opts, :scripts)}

      Keyword.has_key?(adapter_opts, :script) ->
        {:ok, [Keyword.fetch!(adapter_opts, :script)]}

      true ->
        :empty
    end
  end

  # Normalize a possibly-flat script list into a list-of-lists. An empty
  # list is treated as "one empty call" so the cursor advances and the
  # stream closes well-formed rather than being misread as exhausted.
  defp normalize_multi_call([]), do: [[]]

  defp normalize_multi_call(list) when is_list(list) do
    if Enum.all?(list, &is_list/1), do: list, else: [list]
  end

  defp open_stream(scripts, adapter_opts) do
    cursor = advance_cursor(scripts, adapter_opts)

    case Enum.at(scripts, cursor) do
      nil ->
        {:error, script_exhausted_error()}

      entries ->
        case preflight_error(entries) do
          {:preflight, err} ->
            {:error, err}

          :no_preflight ->
            observer = Keyword.get(adapter_opts, :cleanup_observer)
            {:ok, build_stream(entries, observer)}
        end
    end
  end

  defp preflight_error([{:preflight_error, reason, opts} | _])
       when is_atom(reason) and is_list(opts) do
    {:preflight, AdapterError.new(reason, opts)}
  end

  defp preflight_error(_entries), do: :no_preflight

  # Build the Stream.resource/3 — the heart of stream/2.
  #
  # Stream.resource/3's start_fun returns the initial acc (no events). We
  # stash a `:message_started` event in the acc's `:pending` list so the
  # first next_fun call emits it before consuming any entry. This keeps
  # `:message_started` synchronous with the consumer's first reduce.
  defp build_stream(entries, observer) do
    Stream.resource(
      fn -> start_fun(entries, observer) end,
      &next_fun/1,
      &after_fun/1
    )
  end

  defp start_fun(entries, observer) do
    msg = %Message{role: :assistant, content: ""}

    %{
      entries: entries,
      emitted_text?: false,
      cleanup_observer: observer,
      accumulated_text: "",
      closed?: false,
      finish_reason: nil,
      pending: [{:message_started, %{message: msg}}]
    }
  end

  # next_fun — emit pending events first, then pop one entry. Handles:
  # - :delay / :sleep : Process.sleep and recurse without emitting.
  # - :finish : emit :text_completed (if text was seen) + :message_completed,
  #   then halt after the next pull.
  # - every other entry : delegate to Script.interpret/1 (tracking
  #   emitted_text? when the interpreted events include :text_delta).
  # - empty entries list : emit :message_completed (with optional
  #   :text_completed prefix), mark closed, halt next pull.
  defp next_fun(%{pending: [_ | _] = pending} = acc) do
    {pending, %{acc | pending: []}}
  end

  defp next_fun(%{closed?: true} = acc), do: {:halt, acc}

  defp next_fun(%{entries: []} = acc), do: close_stream(acc)

  defp next_fun(%{entries: [{:delay, ms} | rest]} = acc)
       when is_integer(ms) and ms >= 0 do
    Process.sleep(ms)
    next_fun(%{acc | entries: rest})
  end

  defp next_fun(%{entries: [{:sleep, ms} | rest]} = acc)
       when is_integer(ms) and ms >= 0 do
    # Script.interpret/1 fires the one-time deprecation log for :sleep; we
    # reuse that path by delegating here. The returned `[]` is discarded
    # (sleep entries emit no events). The sleep itself must happen here, not
    # inside interpret/1, because interpret/1 only maps entries to events
    # (its one side effect is that log) and never blocks.
    _ = Script.interpret({:sleep, ms})
    Process.sleep(ms)
    next_fun(%{acc | entries: rest})
  end

  defp next_fun(%{entries: [{:finish, reason} | rest]} = acc) do
    acc = %{acc | finish_reason: reason}
    closing = closing_events(acc)
    {closing, %{acc | entries: rest, closed?: true}}
  end

  defp next_fun(%{entries: [entry | rest]} = acc) do
    events = Script.interpret(entry)
    acc = update_emitted_text(acc, events)
    {events, %{acc | entries: rest}}
  end

  # No finish entry remained — synthesize the terminal :message_completed
  # (with optional :text_completed) so every stream closes well-formed.
  defp close_stream(acc) do
    closing = closing_events(acc)
    {closing, %{acc | closed?: true}}
  end

  defp closing_events(%{emitted_text?: true, accumulated_text: text, finish_reason: reason}) do
    [
      {:text_completed, %{id: nil, text: text}},
      {:message_completed,
       %{message: %Message{role: :assistant, content: text}, finish_reason: reason}}
    ]
  end

  defp closing_events(%{emitted_text?: false, finish_reason: reason}) do
    [
      {:message_completed,
       %{message: %Message{role: :assistant, content: ""}, finish_reason: reason}}
    ]
  end

  # Track whether any :text_delta has been emitted so we know whether to
  # prepend :text_completed at close. Also accumulate the text so
  # :text_completed and :message_completed carry the full content.
  defp update_emitted_text(acc, events) do
    Enum.reduce(events, acc, fn
      {:text_delta, %{delta: d}}, a when is_binary(d) ->
        %{a | emitted_text?: true, accumulated_text: a.accumulated_text <> d}

      _, a ->
        a
    end)
  end

  # Cleanup observer increment. Runs on normal termination (including halt
  # via Enum.take/2). Does not run on brutal :kill (documented caveat).
  defp after_fun(%{cleanup_observer: nil}), do: :ok

  defp after_fun(%{cleanup_observer: observer}) do
    :counters.add(observer, 1, 1)
    :ok
  rescue
    # If the observer isn't a counters ref, swallow silently — this is a
    # test-only hook and a bogus observer value shouldn't mask the real
    # test failure.
    _ -> :ok
  end

  # ---------------------------------------------------------------------------
  # Internals — cursor management (shared by generate/2 and stream/2)
  # ---------------------------------------------------------------------------

  # Returns the CURRENT index and advances the stored cursor by one. When
  # `adapter_opts[:script_cursor]` is a pid, the Agent is the source of
  # truth; otherwise the process dictionary is.
  defp advance_cursor(scripts, adapter_opts) do
    case Keyword.get(adapter_opts, :script_cursor) do
      nil ->
        advance_process_dict_cursor(scripts)

      pid when is_pid(pid) ->
        Agent.get_and_update(pid, fn i -> {i, i + 1} end)
    end
  end

  defp advance_process_dict_cursor(scripts) do
    key = {:allm_fake_cursor, :erlang.phash2(scripts)}
    current = Process.get(key, 0)
    Process.put(key, current + 1)
    current
  end
end