# Internal state for a Channel.
#
# Channels are bounded buffers with suspend semantics:
# - `put` suspends when buffer is full (backpressure)
# - `take` suspends when buffer is empty (flow control)
# - Error state propagates to all consumers (sticky error)
#
# ## Fields
#
# - `id` - Unique channel identifier
# - `capacity` - Maximum buffer size (positive integer)
# - `buffer` - Queue of buffered items
# - `status` - `:open`, `:closed`, or `{:error, reason}`
# - `waiting_puts` - List of `{fiber_id, item}` waiting for space
# - `waiting_takes` - List of `fiber_id` waiting for items
#
# ## Status Transitions
#
# ```
# ┌─────────┐
# │ :open │ ───── put/take work normally
# └────┬────┘
# │
# close() or error()
# │
# ┌────┴─────────────────┐
# │ │
# ▼ ▼
# ┌─────────┐ ┌──────────────────┐
# │ :closed │ │ {:error, reason} │
# └─────────┘ └──────────────────┘
# │ │
# ▼ ▼
# take returns take ALWAYS returns
# :closed when {:error, reason}
# buffer empty (error is sticky!)
# ```
defmodule Skuld.Effects.Channel.ChannelState do
@moduledoc false
@type channel_id :: term()
@type fiber_id :: term()
@type status :: :open | :closed | {:error, term()}
@type waiting_put :: {fiber_id(), term()}
@type waiting_take :: fiber_id()
@type t :: %__MODULE__{
id: channel_id(),
capacity: non_neg_integer(),
buffer: :queue.queue(term()),
status: status(),
waiting_puts: [waiting_put()],
waiting_takes: [waiting_take()]
}
defstruct [
:id,
:capacity,
:buffer,
:status,
:waiting_puts,
:waiting_takes
]
@doc """
Create a new channel state with the given capacity.
A capacity of 0 creates a rendezvous/synchronous channel where put always
blocks until there is a matching take (direct handoff, no buffering).
"""
@spec new(non_neg_integer(), keyword()) :: t()
def new(capacity, opts \\ []) when is_integer(capacity) and capacity >= 0 do
%__MODULE__{
id: Keyword.get(opts, :id),
capacity: capacity,
buffer: :queue.new(),
status: :open,
waiting_puts: [],
waiting_takes: []
}
end
#############################################################################
## Buffer Operations
#############################################################################
@doc """
Check if the buffer is full.
"""
@spec buffer_full?(t()) :: boolean()
def buffer_full?(state) do
:queue.len(state.buffer) >= state.capacity
end
@doc """
Check if the buffer is empty.
"""
@spec buffer_empty?(t()) :: boolean()
def buffer_empty?(state) do
:queue.is_empty(state.buffer)
end
@doc """
Get the number of items in the buffer.
"""
@spec buffer_size(t()) :: non_neg_integer()
def buffer_size(state) do
:queue.len(state.buffer)
end
@doc """
Add an item to the buffer (back of queue).
"""
@spec enqueue(t(), term()) :: t()
def enqueue(state, item) do
%{state | buffer: :queue.in(item, state.buffer)}
end
@doc """
Remove and return the first item from the buffer.
Returns `{:ok, item, state}` or `:empty`.
"""
@spec dequeue(t()) :: {:ok, term(), t()} | :empty
def dequeue(state) do
case :queue.out(state.buffer) do
{{:value, item}, buffer} ->
{:ok, item, %{state | buffer: buffer}}
{:empty, _buffer} ->
:empty
end
end
@doc """
Peek at the first item without removing it.
Returns `{:ok, item}` or `:empty`.
"""
@spec peek(t()) :: {:ok, term()} | :empty
def peek(state) do
case :queue.peek(state.buffer) do
{:value, item} -> {:ok, item}
:empty -> :empty
end
end
#############################################################################
## Waiting Put Management
#############################################################################
@doc """
Add a fiber to the waiting puts list.
"""
@spec add_waiting_put(t(), fiber_id(), term()) :: t()
def add_waiting_put(state, fiber_id, item) do
%{state | waiting_puts: state.waiting_puts ++ [{fiber_id, item}]}
end
@doc """
Check if there are fibers waiting to put.
"""
@spec has_waiting_puts?(t()) :: boolean()
def has_waiting_puts?(state) do
state.waiting_puts != []
end
@doc """
Pop the first waiting put.
Returns `{:ok, {fiber_id, item}, state}` or `:empty`.
"""
@spec pop_waiting_put(t()) :: {:ok, waiting_put(), t()} | :empty
def pop_waiting_put(state) do
case state.waiting_puts do
[first | rest] ->
{:ok, first, %{state | waiting_puts: rest}}
[] ->
:empty
end
end
@doc """
Get all waiting puts and clear the list.
"""
@spec pop_all_waiting_puts(t()) :: {[waiting_put()], t()}
def pop_all_waiting_puts(state) do
{state.waiting_puts, %{state | waiting_puts: []}}
end
#############################################################################
## Waiting Take Management
#############################################################################
@doc """
Add a fiber to the waiting takes list.
"""
@spec add_waiting_take(t(), fiber_id()) :: t()
def add_waiting_take(state, fiber_id) do
%{state | waiting_takes: state.waiting_takes ++ [fiber_id]}
end
@doc """
Check if there are fibers waiting to take.
"""
@spec has_waiting_takes?(t()) :: boolean()
def has_waiting_takes?(state) do
state.waiting_takes != []
end
@doc """
Pop the first waiting take.
Returns `{:ok, fiber_id, state}` or `:empty`.
"""
@spec pop_waiting_take(t()) :: {:ok, waiting_take(), t()} | :empty
def pop_waiting_take(state) do
case state.waiting_takes do
[first | rest] ->
{:ok, first, %{state | waiting_takes: rest}}
[] ->
:empty
end
end
@doc """
Get all waiting takes and clear the list.
"""
@spec pop_all_waiting_takes(t()) :: {[waiting_take()], t()}
def pop_all_waiting_takes(state) do
{state.waiting_takes, %{state | waiting_takes: []}}
end
#############################################################################
## Status Management
#############################################################################
@doc """
Check if the channel is open.
"""
@spec open?(t()) :: boolean()
def open?(state) do
state.status == :open
end
@doc """
Check if the channel is closed.
"""
@spec closed?(t()) :: boolean()
def closed?(state) do
state.status == :closed
end
@doc """
Check if the channel is in error state.
"""
@spec errored?(t()) :: boolean()
def errored?(state) do
match?({:error, _}, state.status)
end
@doc """
Close the channel (normal completion).
Only transitions from `:open` state.
"""
@spec close(t()) :: t()
def close(state) do
case state.status do
:open -> %{state | status: :closed}
_ -> state
end
end
@doc """
Put the channel into error state.
Only transitions from `:open` state (first error wins).
"""
@spec error(t(), term()) :: t()
def error(state, reason) do
case state.status do
:open -> %{state | status: {:error, reason}}
_ -> state
end
end
#############################################################################
## Inspection
#############################################################################
@doc """
Get channel statistics for debugging/metrics.
"""
@spec stats(t()) :: map()
def stats(state) do
%{
id: state.id,
capacity: state.capacity,
buffer_size: buffer_size(state),
status: state.status,
waiting_puts: length(state.waiting_puts),
waiting_takes: length(state.waiting_takes)
}
end
end