Skip to main content

lib/skuld/comp/internal_suspend.ex

defmodule Skuld.Comp.InternalSuspend do
  @moduledoc """
  Internal suspension for Env-aware code.

  Unlike `Comp.ExternalSuspend` (which closes over env for external callbacks),
  this suspension receives env at resume time, allowing callers to
  supply an updated env when resuming.

  ## Two Kinds of Suspension

  **External suspension (`Comp.ExternalSuspend`)**:
  - `resume :: (val) -> {result, env}`
  - Closes over env at suspension point
  - For passing to external code that doesn't understand Skuld's env threading

  **Internal suspension (`Comp.InternalSuspend`)**:
  - `resume :: (val, env) -> {result, env}`
  - Does NOT close over env - receives it at resume time
  - For schedulers/handlers that need to thread updated env through suspensions

  ## Payload Types

  The `payload` field contains a struct that identifies the kind of suspension:
  - `Batch` - batchable operations (DB fetches, etc.)
  - `Channel` - channel put/take operations
  - `Await` - fiber await operations
  """

  alias Skuld.Comp.Types
  alias Skuld.Coroutine.Handle

  @type payload ::
          __MODULE__.Batch.t()
          | __MODULE__.Channel.t()
          | __MODULE__.Await.t()
          | __MODULE__.FiberYield.t()

  @type t :: %__MODULE__{
          resume: (term(), Types.env() -> {term(), Types.env()}),
          payload: payload()
        }

  defstruct [:resume, :payload]

  #############################################################################
  ## Payload Structs
  #############################################################################

  # Payload for batch operation suspensions.
  #
  # When a fiber performs a batchable operation (like DB.fetch), it suspends
  # with this payload. The scheduler collects these suspensions, groups them
  # by batch_key, and executes them together.
  defmodule Batch do
    @moduledoc false

    @type t :: %__MODULE__{
            batch_key: term(),
            op: term(),
            request_id: reference()
          }

    defstruct [:batch_key, :op, :request_id]
  end

  # Payload for channel operation suspensions.
  #
  # When a fiber performs a blocking channel operation (put on full buffer,
  # take on empty buffer), it suspends with this payload. The scheduler
  # resumes the fiber when the channel state changes.
  defmodule Channel do
    @moduledoc false

    @type operation :: :put | :take

    @type t :: %__MODULE__{
            channel_id: reference(),
            operation: operation(),
            item: term() | nil
          }

    defstruct [:channel_id, :operation, :item]
  end

  # Payload for fiber await suspensions.
  #
  # When a fiber awaits other fibers, it suspends with this payload.
  # The scheduler resumes the fiber when the awaited fibers complete.
  defmodule Await do
    @moduledoc false

    @type mode :: :one | :all | :any

    @type t :: %__MODULE__{
            handles: [Handle.t()],
            mode: mode(),
            consume_ids: [reference()]
          }

    defstruct [:handles, :mode, consume_ids: []]
  end

  # Payload for fiber yield suspensions.
  #
  # When a fiber calls `Yield.yield(value)` in a FiberPool server context,
  # the yield is surfaced as an InternalSuspend with this payload. The
  # scheduler suspends just this fiber while other fibers continue running.
  # The server layer routes the yield value to the caller.
  #
  # When `notify: true`, the yield is fire-and-forget — the server forwards
  # the value to the caller but the fiber auto-resumes on the next scheduler
  # round without waiting for explicit caller input.
  defmodule FiberYield do
    @moduledoc false

    @type t :: %__MODULE__{value: term(), notify: boolean()}

    defstruct [:value, notify: false]
  end

  #############################################################################
  ## Constructors
  #############################################################################

  @doc """
  Create a batch operation suspension.
  """
  @spec batch(term(), term(), reference(), Types.k()) :: t()
  def batch(batch_key, op, request_id, resume) do
    %__MODULE__{
      resume: resume,
      payload: %Batch{batch_key: batch_key, op: op, request_id: request_id}
    }
  end

  @doc """
  Create a channel put suspension.
  """
  @spec channel_put(reference(), term(), Types.k()) :: t()
  def channel_put(channel_id, item, resume) do
    %__MODULE__{
      resume: resume,
      payload: %Channel{channel_id: channel_id, operation: :put, item: item}
    }
  end

  @doc """
  Create a channel take suspension.
  """
  @spec channel_take(reference(), Types.k()) :: t()
  def channel_take(channel_id, resume) do
    %__MODULE__{
      resume: resume,
      payload: %Channel{channel_id: channel_id, operation: :take, item: nil}
    }
  end

  @doc """
  Create an await-one suspension.
  """
  @spec await_one(Handle.t(), Types.k(), keyword()) :: t()
  def await_one(handle, resume, opts \\ []) do
    consume = Keyword.get(opts, :consume, false)

    %__MODULE__{
      resume: resume,
      payload: %Await{
        handles: [handle],
        mode: :one,
        consume_ids: if(consume, do: [handle.id], else: [])
      }
    }
  end

  @doc """
  Create an await-all suspension.
  """
  @spec await_all([Handle.t()], Types.k()) :: t()
  def await_all(handles, resume) do
    %__MODULE__{
      resume: resume,
      payload: %Await{handles: handles, mode: :all, consume_ids: []}
    }
  end

  @doc """
  Create an await-any suspension.
  """
  @spec await_any([Handle.t()], Types.k()) :: t()
  def await_any(handles, resume) do
    %__MODULE__{
      resume: resume,
      payload: %Await{handles: handles, mode: :any, consume_ids: []}
    }
  end

  @doc """
  Create a fiber yield suspension.
  """
  @spec fiber_yield(term(), Types.k()) :: t()
  def fiber_yield(value, resume) do
    %__MODULE__{
      resume: resume,
      payload: %FiberYield{value: value}
    }
  end

  @doc """
  Create a fire-and-forget fiber notification.

  Like `fiber_yield/2` but sets `notify: true` so the scheduler
  auto-resumes the fiber after forwarding the value to the caller.
  """
  @spec fiber_notify(term(), Types.k()) :: t()
  def fiber_notify(value, resume) do
    %__MODULE__{
      resume: resume,
      payload: %FiberYield{value: value, notify: true}
    }
  end
end

defimpl Skuld.Comp.ISentinel, for: Skuld.Comp.InternalSuspend do
  alias Skuld.Comp.InternalSuspend.Await
  alias Skuld.Comp.InternalSuspend.Batch
  alias Skuld.Comp.InternalSuspend.Channel
  alias Skuld.Comp.InternalSuspend.FiberYield

  def run(suspend, env) do
    {drained, drained_env} = Skuld.FiberPool.Main.drain_pending(suspend, env)
    Skuld.Comp.ISentinel.run(drained, drained_env)
  end

  def run!(_suspend) do
    raise "InternalSuspend must be handled by a scheduler"
  end

  def sentinel?(_), do: true
  def suspend?(_), do: true
  def error?(_), do: false

  def serializable_payload(%{payload: %Batch{batch_key: batch_key, op: op, request_id: id}}) do
    %{type: :batch, batch_key: batch_key, op: op, request_id: id}
  end

  def serializable_payload(%{payload: %Channel{channel_id: ch_id, operation: op, item: item}}) do
    %{type: :channel, channel_id: ch_id, operation: op, item: item}
  end

  def serializable_payload(%{payload: %Await{handles: handles, mode: mode}}) do
    %{type: :await, handle_ids: Enum.map(handles, & &1.id), mode: mode}
  end

  def serializable_payload(%{payload: %FiberYield{value: value, notify: notify}}) do
    %{type: :fiber_yield, value: value, notify: notify}
  end
end