Skip to main content

lib/bloccs.ex

defmodule Bloccs do
  @moduledoc """
  A typed declarative IR for Agentic Computation Graphs (ACGs) that compiles
  to a Broadway supervision tree on the BEAM.

  See `Bloccs.Manifest.Node`, `Bloccs.Manifest.Network`, `Bloccs.Parser`,
  `Bloccs.Validator`, and `Bloccs.Compiler` for the v0.1 surface.

  ## Request/response

  A network is otherwise fire-and-forget: a message goes in and side effects come
  out, but the caller gets nothing back. `call/4` and `cast/4` add request/response
  on top — without making the pipeline synchronous — by correlating a **reply**
  back to the caller via the message `trace_id` (`Bloccs.Lineage`, `Bloccs.Collector`).

  To use them, a terminal node must be declared `reply = true` in its manifest and
  emit the response on an out-port:

      [node]
      id = "price"
      kind = "transform"
      reply = true

  Then:

      {:ok, %{"total" => 42}} = Bloccs.call(:checkout, :request, %{"items" => [...]})

  `call/4` blocks until that node reports (or the timeout fires); `cast/4` returns
  the `trace_id` immediately and, with `send_result: true`, later messages the
  caller `{:bloccs_reply, trace_id, result}`. Both target an **exposed input port**
  (`[expose].in` in the network manifest). See `Bloccs.Collector` for limitations
  (first-wins aggregation; correlation does not survive a `[batch]`/`[join]`).
  """

  alias Bloccs.{Collector, Discovery, Lineage, Producer, Router}

  @type network_id :: atom()

  @doc """
  Returns the current bloccs library version.

  ## Examples

      iex> is_binary(Bloccs.version())
      true

  """
  @spec version() :: String.t()
  def version do
    Application.spec(:bloccs, :vsn) |> to_string()
  end

  @doc """
  Push `payload` into the exposed input port `in_port` of running network
  `network_id` and **block** until a `reply = true` node responds.

  Returns `{:ok, reply_payload}` on success, or `{:error, reason}` where reason
  is one of:

  - `%Bloccs.EffectError{}` — a node on the request's trace failed terminally
    (bad inbound schema, the node raised / returned `{:error, _}` / timed out, or
    downstream delivery failed). The struct names the node and phase.
  - `:timeout` — no reply *and* no error within `:timeout` ms (default `5_000`),
    e.g. the request was filtered/dropped before reaching the reply node.
  - `:no_producer` | `:unknown_network` | `{:unknown_port, in_port}` — the
    request could not be admitted.

  The network keeps running asynchronously; only the calling process waits.
  """
  @spec call(network_id(), atom(), term(), keyword()) ::
          {:ok, term()} | {:error, Bloccs.EffectError.t() | term()}
  def call(network_id, in_port, payload, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 5_000)

    with {:ok, producer, trace_id, meta} <- prepare(network_id, in_port),
         :ok <- Collector.register(network_id, trace_id, timeout),
         :ok <- push(producer, payload, meta) do
      Collector.await(network_id, trace_id)
    end
  end

  @doc """
  Push `payload` into the exposed input port `in_port` of running network
  `network_id` and return immediately with `{:ok, trace_id}` — the correlation id
  for the request.

  With `send_result: true`, the calling process is later sent
  `{:bloccs_reply, trace_id, result}` where `result` is `{:ok, payload}`,
  `{:error, %Bloccs.EffectError{}}`, or `{:error, :timeout}` (after `:timeout` ms,
  default `5_000`). Without it, the request is fire-and-forget and the returned
  `trace_id` is useful only for correlating telemetry / traces.
  """
  @spec cast(network_id(), atom(), term(), keyword()) ::
          {:ok, Lineage.id()} | {:error, term()}
  def cast(network_id, in_port, payload, opts \\ []) do
    with {:ok, producer, trace_id, meta} <- prepare(network_id, in_port) do
      if Keyword.get(opts, :send_result, false) do
        Collector.register_async(network_id, trace_id, self(), Keyword.get(opts, :timeout, 5_000))
      end

      case push(producer, payload, meta) do
        :ok -> {:ok, trace_id}
        {:error, _} = err -> err
      end
    end
  end

  # Resolve the entry producer for an exposed input port, and mint a fresh root
  # lineage whose trace_id is the correlation key the reply is matched on.
  defp prepare(network_id, in_port) do
    with {:ok, %{supervisor: sup}} <- lookup_network(network_id),
         {:ok, {node, port}} <- fetch_exposed_in(sup.__bloccs_introspect__(), in_port) do
      lineage = Lineage.root()
      producer = Router.producer_name(network_id, node, port)
      {:ok, producer, lineage.trace_id, %{Lineage.key() => lineage}}
    end
  end

  defp lookup_network(network_id) do
    case Discovery.lookup(network_id) do
      {:ok, entry} -> {:ok, entry}
      :error -> {:error, :unknown_network}
    end
  end

  defp fetch_exposed_in(%{expose: %{in: ins}}, in_port) do
    case Map.fetch(ins, in_port) do
      {:ok, {_node, _port} = endpoint} -> {:ok, endpoint}
      :error -> {:error, {:unknown_port, in_port}}
    end
  end

  defp push(producer, payload, meta) do
    case Producer.push(producer, payload, meta) do
      :ok -> :ok
      :no_producer -> {:error, :no_producer}
      {:error, _} = err -> err
    end
  end
end