Skip to main content

lib/skuld/async_coroutine.ex

defmodule Skuld.AsyncCoroutine do
  @moduledoc """
  Run a computation in a separate process, bridging yields, throws, and results
  back to the calling process via messages.

  This is for running effectful computations from non-effectful code (e.g., LiveView).
  If you're inside a computation and want concurrency, use `Skuld.Effects.FiberPool` instead.

  ## Messages

  The runner sends messages to the caller in the form `{AsyncCoroutine, tag, result}`:

  - `%ExternalSuspend{value: v, data: d, resume: nil}` - computation yielded, waiting for resume
  - `%Throw{error: e}` - computation threw an error
  - `%Cancelled{reason: r}` - computation was cancelled
  - Any other value - computation completed successfully

  The `ExternalSuspend.data` field contains any decorations added by scoped effects
  (e.g., EffectLogger attaches its log here). The `resume` field is always `nil`
  in IPC messages since resume functions can't be sent between processes.

  ## Example

      # Build computation with handlers
      computation =
        comp do
          result <- Command.execute(%CreateTodo{title: "Buy milk"})
          result
        end
        |> Command.with_handler(&DomainHandler.handle/1)
        |> Reader.with_handler(context, tag: CommandContext)
        |> EctoPersist.with_handler(Repo)

      # Start async - will add Yield and Throw handlers
      {:ok, runner} = AsyncCoroutine.run(computation, :create_todo)

      # Or start sync for fast-yielding computations
      {:ok, runner, %ExternalSuspend{value: :ready}} =
        AsyncCoroutine.run_sync(computation, :create_todo)

      # In handle_info - single clause handles all messages for a tag:
      def handle_info({AsyncCoroutine, :create_todo, result}, socket) do
        case result do
          %ExternalSuspend{value: value, data: data} ->
            handle_yield(value, data, socket)

          %Throw{error: error} ->
            handle_error(error, socket)

          %Cancelled{reason: reason} ->
            handle_cancelled(reason, socket)

          value ->
            handle_success(value, socket)
        end
      end

  ## With Yields

      # Computation that yields for user input
      computation =
        comp do
          name <- Yield.yield(:get_name)
          email <- Yield.yield(:get_email)
          create_user(name, email)
        end
        |> ...handlers...

      {:ok, runner} = AsyncCoroutine.run(computation, :create_user)

      # Handle yields
      def handle_info({AsyncCoroutine, :create_user, %ExternalSuspend{value: :get_name}}, socket) do
        # Maybe wait for user input, then:
        AsyncCoroutine.run(runner, "Alice")
        {:noreply, socket}
      end

      def handle_info({AsyncCoroutine, :create_user, %ExternalSuspend{value: :get_email}}, socket) do
        AsyncCoroutine.run(runner, "alice@example.com")
        {:noreply, socket}
      end

      def handle_info({AsyncCoroutine, :create_user, {:ok, user}}, socket) do
        {:noreply, assign(socket, user: user)}
      end
  """

  alias Skuld.Comp.Cancelled
  alias Skuld.Comp.Env
  alias Skuld.Comp.ExternalSuspend
  alias Skuld.Comp.Throw, as: ThrowStruct
  alias Skuld.Coroutine
  alias Skuld.Coroutine.Error
  alias Skuld.Effects.Throw
  alias Skuld.Effects.Yield

  defstruct [:tag, :ref, :pid, :monitor_ref, :caller]

  @type t :: %__MODULE__{
          tag: term(),
          ref: reference(),
          pid: pid(),
          monitor_ref: reference(),
          caller: pid()
        }

  @doc """
  Run a computation in a separate process.

  The computation will have `Throw.with_handler/1` and `Yield.with_handler/1`
  added automatically (outermost). Add your other handlers before calling run.

  ## Arguments

  - `computation` — the effectful computation to run
  - `tag` — identifier for messages, e.g. `:create_todo`
  - `opts`:
    - `:caller` — process to send messages to (default: `self()`)
    - `:link` — whether to link the runner process (default: `true`)

  ## Returns

  `{:ok, runner}` where runner is used with `run/3` and `cancel/1`.
  """
  @spec run(Skuld.Comp.Types.computation(), term()) :: {:ok, t()}
  def run(computation, tag) when is_function(computation, 2), do: run(computation, tag, [])

  @spec run(t(), term(), keyword()) :: :ok
  def run(%__MODULE__{} = runner, value), do: run(runner, value, [])

  @spec run(Skuld.Comp.Types.computation(), term(), keyword()) :: {:ok, t()}
  def run(computation, tag, opts) when is_function(computation, 2) do
    caller = Keyword.get(opts, :caller, self())
    link? = Keyword.get(opts, :link, true)
    ref = make_ref()

    spawn_fn = if link?, do: &spawn_link/1, else: &spawn/1

    pid =
      spawn_fn.(fn ->
        run_bridged(computation, caller, tag, ref)
      end)

    monitor_ref = Process.monitor(pid)

    {:ok, %__MODULE__{tag: tag, ref: ref, pid: pid, monitor_ref: monitor_ref, caller: caller}}
  end

  def run(%__MODULE__{ref: ref, pid: pid}, value, opts) do
    reply_to = Keyword.get(opts, :reply_to)
    send(pid, {:async_resume, ref, value, reply_to})
    :ok
  end

  @doc """
  Run a computation and wait synchronously for the first response.

  Use this when you know the computation will quickly yield after setup
  (e.g., a command processor that immediately yields waiting for commands).
  Avoids dealing with async messages for the initial handshake.

  ## Arguments

  - `computation` — the effectful computation to run
  - `tag` — identifier for messages
  - `opts`:
    - `:timeout` — maximum time to wait in ms (default: 5000)
    - `:caller`, `:link` — same as `run/3`

  ## Returns

  - `{:ok, runner, result}` where result is one of:
    - `%ExternalSuspend{value: v, data: d}` - computation yielded
    - `%Throw{error: e}` - computation threw
    - `%Cancelled{reason: r}` - computation cancelled
    - Any other value - computation completed
  - `{:error, :timeout}` - timed out waiting for first response

  ## Example

      # Command processor that yields immediately for commands
      {:ok, runner, %ExternalSuspend{value: :ready}} =
        command_processor
        |> Reader.with_handler(context)
        |> AsyncCoroutine.run_sync(:processor)

      # Now resume synchronously for quick commands
      %ExternalSuspend{value: :ready} = AsyncCoroutine.run_sync(runner, %QuickCommand{})
  """
  @spec run_sync(Skuld.Comp.Types.computation(), term()) ::
          {:ok, t(), term()}
          | {:error, :timeout}
  def run_sync(computation, tag) when is_function(computation, 2),
    do: run_sync(computation, tag, [])

  def run_sync(%__MODULE__{} = runner, value), do: run_sync(runner, value, [])

  @spec run_sync(Skuld.Comp.Types.computation(), term(), keyword()) ::
          {:ok, t(), term()}
          | {:error, :timeout}
  def run_sync(computation, tag, opts) when is_function(computation, 2) do
    timeout = Keyword.get(opts, :timeout, 5000)

    {:ok, runner} = run(computation, tag, opts)

    receive do
      {__MODULE__, ^tag, result} -> {:ok, runner, result}
    after
      timeout -> {:error, :timeout}
    end
  end

  @doc """
  Resume a yielded computation and wait synchronously for the next response.

  Blocks until the computation yields again, completes, throws, or times out.

  This can be called from any process - the response will be sent to the calling
  process, not necessarily the original caller from `run/2`.

  ## Options

  - `:timeout` - Maximum time to wait in ms (default: 5000)

  ## Returns

  - `%ExternalSuspend{value: v, data: d}` - computation yielded again
  - `%Throw{error: e}` - computation threw
  - `%Cancelled{reason: r}` - computation cancelled
  - Any other value - computation completed
  - `{:error, :timeout}` - timed out waiting for response

  ## Example

      {:ok, runner} = AsyncCoroutine.run(computation, :cmd)

      # First yield arrives via message
      receive do
        {AsyncCoroutine, :cmd, %ExternalSuspend{value: :ready}} -> :ok
      end

      # Now resume and wait synchronously
      case AsyncCoroutine.run_sync(runner, %SomeCommand{}) do
        %ExternalSuspend{value: :ready} -> # ready for next command
        %Throw{error: e} -> # something went wrong
        %Cancelled{reason: r} -> # was cancelled
        value -> # computation finished with value
      end
  """
  @spec run_sync(t(), term(), keyword()) ::
          ExternalSuspend.t()
          | ThrowStruct.t()
          | Cancelled.t()
          | term()
          | {:error, :timeout}
  def run_sync(%__MODULE__{ref: ref, pid: pid, tag: tag}, value, opts) do
    timeout = Keyword.get(opts, :timeout, 5000)
    send(pid, {:async_resume, ref, value, self()})

    receive do
      {__MODULE__, ^tag, result} -> result
    after
      timeout -> {:error, :timeout}
    end
  end

  @doc """
  Cancel a running computation (async).

  Sends a cancel signal to the computation. The computation will invoke `leave_scope`
  for all active scoped effects (allowing cleanup), then send a
  `{AsyncCoroutine, tag, %Cancelled{reason: :cancelled}}` message to the caller.

  Use `cancel_sync/2` if you need to wait for the cancellation to complete.
  """
  @spec cancel(t()) :: :ok
  def cancel(%__MODULE__{ref: ref, pid: pid, monitor_ref: monitor_ref}) do
    Process.demonitor(monitor_ref, [:flush])
    send(pid, {:async_cancel, ref})
    :ok
  end

  @doc """
  Cancel a running computation and wait for it to complete.

  Like `cancel/1`, but blocks until the computation has finished its cleanup
  (invoking `leave_scope` for all active scoped effects) and returns the result.

  This can be called from any process - the response will be sent to the calling
  process, not necessarily the original caller from `start/2`.

  ## Options

  - `:timeout` - Maximum time to wait in ms (default: 5000)

  ## Returns

  - `%Cancelled{reason: :cancelled}` - computation was cancelled successfully
  - `{:error, :timeout}` - timed out waiting for cancellation to complete

  ## Example

      {:ok, runner, %ExternalSuspend{value: :ready}} =
        AsyncCoroutine.run_sync(computation, :worker)

      # Cancel and wait for cleanup to finish
      %Cancelled{reason: :cancelled} = AsyncCoroutine.cancel_sync(runner)
  """
  @spec cancel_sync(t(), keyword()) :: Cancelled.t() | {:error, :timeout}
  def cancel_sync(%__MODULE__{ref: ref, pid: pid, monitor_ref: monitor_ref, tag: tag}, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 5000)

    Process.demonitor(monitor_ref, [:flush])
    # Send cancel with self() as the implicit recipient (via process dictionary override)
    # But actually we need to tell the runner to reply to us, not the original caller
    send(pid, {:async_cancel_sync, ref, self()})

    receive do
      {__MODULE__, ^tag, result} -> result
    after
      timeout -> {:error, :timeout}
    end
  end

  # Child process entry point
  defp run_bridged(computation, caller, tag, ref) do
    comp =
      computation
      |> Throw.with_handler()
      |> Yield.with_handler()

    case Coroutine.new(comp, Env.new()) |> Coroutine.run() do
      %Coroutine.ExternalSuspended{} = fiber ->
        run_yield_loop(fiber, caller, tag, ref, caller)

      other ->
        send(caller, {__MODULE__, tag, coroutine_to_ipc(other)})
    end
  end

  # Main yield/resume loop
  # - reply_to: where to send THIS yield's suspend message
  # - original_caller: default destination (reset target after override)
  defp run_yield_loop(
         %Coroutine.ExternalSuspended{value: value, data: data} = fiber,
         original_caller,
         tag,
         ref,
         reply_to
       ) do
    ipc_suspend = %ExternalSuspend{value: value, data: data, resume: nil}
    send(reply_to, {__MODULE__, tag, ipc_suspend})

    receive do
      {:async_resume, ^ref, value, nil} ->
        case Coroutine.run(fiber, value) do
          %Coroutine.ExternalSuspended{} = new_fiber ->
            run_yield_loop(new_fiber, original_caller, tag, ref, original_caller)

          other ->
            send(original_caller, {__MODULE__, tag, coroutine_to_ipc(other)})
        end

      {:async_resume, ^ref, value, override} when not is_nil(override) ->
        case Coroutine.run(fiber, value) do
          %Coroutine.ExternalSuspended{} = new_fiber ->
            run_yield_loop(new_fiber, original_caller, tag, ref, override)

          other ->
            send(override, {__MODULE__, tag, coroutine_to_ipc(other)})
        end

      {:async_cancel, ^ref} ->
        cancelled = Coroutine.cancel(fiber, :cancelled) |> coroutine_to_ipc()
        send(reply_to, {__MODULE__, tag, cancelled})

      {:async_cancel_sync, ^ref, sync_reply_to} ->
        cancelled = Coroutine.cancel(fiber, :cancelled) |> coroutine_to_ipc()
        send(sync_reply_to, {__MODULE__, tag, cancelled})
    end
  end

  #############################################################################
  ## IPC Conversion (Coroutine states → Comp sentinels)
  #############################################################################

  defp coroutine_to_ipc(%Coroutine.Completed{result: result}), do: result
  defp coroutine_to_ipc(%Coroutine.Errored{error: error}), do: error_to_throw(error)
  defp coroutine_to_ipc(%Coroutine.Cancelled{reason: reason}), do: %Cancelled{reason: reason}

  defp error_to_throw(%Error{type: :cancelled, error: reason}) do
    %Cancelled{reason: reason}
  end

  defp error_to_throw(%Error{type: :exception, error: exception, stacktrace: stacktrace}) do
    %ThrowStruct{error: %{kind: :error, payload: exception, stacktrace: stacktrace}}
  end

  defp error_to_throw(%Error{type: :throw, error: value, stacktrace: nil}) do
    %ThrowStruct{error: value}
  end

  defp error_to_throw(%Error{type: :throw, error: value, stacktrace: stacktrace}) do
    %ThrowStruct{error: %{kind: :throw, payload: value, stacktrace: stacktrace}}
  end

  defp error_to_throw(%Error{type: :exit, error: reason, stacktrace: stacktrace}) do
    %ThrowStruct{error: %{kind: :exit, payload: reason, stacktrace: stacktrace}}
  end
end