Skip to main content

lib/skuld/effects/cell.ex

defmodule Skuld.Effects.Cell do
  @moduledoc """
  Single-writer, multi-reader mutable state for concurrent fibers.

  Any fiber can read a Cell; the first fiber to write to a tag claims
  ownership. Only that fiber may write again — writes from other fibers
  error.

  All operations require an explicit tag. There is no default tag.

  ## Usage

      comp do
        # Any fiber can read
        value <- Cell.get(:results)

        # First write claims ownership
        _ <- Cell.put(:results, data)

        # Re-writes by owner are fine
        _ <- Cell.put(:results, new_data)

        # Write from non-owner raises
      end
      |> Cell.with_handler()
      |> FiberPool.with_handler()
      |> Comp.run()
  """

  @behaviour Skuld.Comp.ITotalLinearHandler

  use Skuld.Comp.DefOp

  alias Skuld.Comp
  alias Skuld.Comp.Env
  alias Skuld.Comp.Types
  alias Skuld.Effects.Channel
  alias Skuld.Effects.FiberPool

  @cell_prefix :cell
  @owner_prefix :cell_owner
  @watcher_prefix :cell_watcher

  def_op get(tag)
  def_op put(tag, value)

  @get_op @__get_op__
  @put_op @__put_op__

  @doc """
  Install the Cell handler for a computation.

  Must be installed inside the FiberPool handler stack (after
  `FiberPool.with_handler/1`) so that `FiberPool.current_fiber_id/1`
  can identify the writing fiber.
  """
  @spec with_handler(Types.computation()) :: Types.computation()
  def with_handler(computation) do
    Comp.with_handler(computation, __MODULE__, &handle/3)
  end

  @doc """
  Watch a Cell tag, returning a Channel that will deliver the value.

  Returns a `Channel.Handle` for a capacity-1 channel:
  - If the Cell has already been written to, the channel contains the
    current value and is closed.
  - If the Cell has not been written to, the channel is empty. When
    `Cell.put/2` writes to the tag, the value is delivered to all
    watcher channels and each channel is closed.

  The returned channel never blocks the writer — capacity 1 with
  immediate close means `Cell.put` always succeeds without suspension.

  Requires `Channel.with_handler/1` to be installed.

  ## Example

      comp do
        watch_fiber <- FiberPool.fiber(comp do
          ch <- Cell.watch(:results)
          Channel.take(ch)
        end)

        writer <- FiberPool.fiber(comp do
          _ <- Cell.put(:results, [1, 2, 3])
          :ok
        end)

        result <- FiberPool.await!(watch_fiber)
        # result == {:ok, [1, 2, 3]}
      end
      |> Cell.with_handler()
      |> Channel.with_handler()
      |> FiberPool.with_handler()
      |> Comp.run()
  """
  @spec watch(term()) :: Types.computation()
  def watch(tag) do
    fn env, k ->
      {handle, env} = Channel.Ops.create(env, 1)

      if Map.has_key?(env.state, {@cell_prefix, tag}) do
        value = Env.get_state(env, {@cell_prefix, tag})
        env = Channel.Ops.put_and_close(env, handle, value)
        k.(handle, env)
      else
        watchers = Env.get_state(env, {@watcher_prefix, tag}, [])
        env = Env.put_state(env, {@watcher_prefix, tag}, [handle | watchers])
        k.(handle, env)
      end
    end
  end

  @impl Skuld.Comp.ITotalLinearHandler
  def handle({@get_op, tag}, env) do
    value = Env.get_state(env, {@cell_prefix, tag})
    {value, env}
  end

  @impl Skuld.Comp.ITotalLinearHandler
  def handle({@put_op, tag, value}, env) do
    owner = Env.get_state(env, {@owner_prefix, tag})
    current = FiberPool.current_fiber_id(env)

    env =
      case owner do
        nil ->
          Env.put_state(env, {@owner_prefix, tag}, current)

        ^current ->
          env

        _other ->
          raise "Cell tag #{inspect(tag)} is owned by fiber #{inspect(owner)}, " <>
                  "cannot write from fiber #{inspect(current)}"
      end

    old = Env.get_state(env, {@cell_prefix, tag})
    env = Env.put_state(env, {@cell_prefix, tag}, value)
    env = wake_watchers(env, tag, value)
    change = Skuld.Effects.State.Change.new(old, value)
    {change, env}
  end

  defp handle(args, env, k) do
    {value, env2} = handle(args, env)
    k.(value, env2)
  end

  defp wake_watchers(env, tag, value) do
    watchers = Env.get_state(env, {@watcher_prefix, tag}, [])

    if watchers == [] do
      env
    else
      env =
        Enum.reduce(watchers, env, fn handle, acc_env ->
          Channel.Ops.put_and_close(acc_env, handle, value)
        end)

      Env.put_state(env, {@watcher_prefix, tag}, [])
    end
  end
end