# examples/studio/orchestrator_agent.ex

defmodule Jido.Runic.Examples.Studio.OrchestratorAgent do
  @moduledoc """
  Orchestrator agent for the AI Research Studio.

  Provides two entrypoints for the same 5-node LLM pipeline:

  - `run/2` — Auto mode. Feeds a topic and awaits completion in one shot.
  - `run_step/2` — Step mode. Pauses between each node, providing full
    introspection (annotated graph, step history, input/output data) at
    every transition. Accepts an `:on_step` callback for UI integration.

  The workflow DAG is passed to the strategy via
  `strategy: {Jido.Runic.Strategy, workflow_fn: &build_workflow/0}` — the strategy
  receives it in `init/2` so no `set_workflow` signal is needed at runtime.

  Pipeline: PlanQueries → SimulateSearch → BuildOutline → DraftArticle → EditAndAssemble
  """

  require Logger

  use Jido.Agent,
    name: "studio_orchestrator",
    strategy: {Jido.Runic.Strategy, workflow_fn: &__MODULE__.build_workflow/0},
    schema: []

  @doc false
  @spec plugin_specs() :: [Jido.Plugin.Spec.t()]
  def plugin_specs do
    # No plugin specs are declared for this agent.
    []
  end

  alias Jido.Runic.Introspection
  alias Jido.Agent.Strategy.State, as: StratState
  alias Runic.Workflow

  alias Jido.Runic.Examples.Studio.Actions.{
    PlanQueries,
    SimulateSearch,
    BuildOutline,
    DraftArticle,
    EditAndAssemble
  }

  @doc """
  Build the 5-node linear Runic DAG for the research pipeline.

  `PlanQueries` is added first with no `to:` option; every later action is
  added with `to:` set to the atom given for the stage before it
  (`:plan_queries`, `:simulate_search`, `:build_outline`, `:draft_article`).
  """
  def build_workflow do
    root =
      [name: :research_studio]
      |> Workflow.new()
      |> Workflow.add(PlanQueries)

    # {action module, atom passed as `to:`} for each stage after the root, in order.
    downstream = [
      {SimulateSearch, :plan_queries},
      {BuildOutline, :simulate_search},
      {DraftArticle, :build_outline},
      {EditAndAssemble, :draft_article}
    ]

    Enum.reduce(downstream, root, fn {action, parent}, workflow ->
      Workflow.add(workflow, action, to: parent)
    end)
  end

  @doc """
  Run the full research pipeline for a topic through an AgentServer.

  Starts a linked `Jido.AgentServer` for this agent, casts a `runic.feed`
  signal carrying the topic, blocks on `await_completion`, and then reads the
  strategy state and recent events back from the server.

  ## Options

    * `:jido` - Name of a running Jido instance (required; fetched with
      `Keyword.fetch!/2`, so a missing key raises)
    * `:timeout` - Timeout in ms for `await_completion` (default: 120_000)
    * `:debug` - Enable debug event buffer (default: true)

  ## Returns

  A map with `:topic`, `:productions`, `:facts`, `:status`, `:events`, and `:pid`.

  `:status` is `:completed`, `:failed`, or `:error`. `:productions` is only
  populated when the status is `:completed`; `:facts` is empty when the status
  is `:error`. `:events` is `[]` when the events could not be fetched.
  """
  @spec run(String.t(), keyword()) :: map()
  def run(topic, opts \\ []) do
    jido_name = Keyword.fetch!(opts, :jido)
    wait_ms = Keyword.get(opts, :timeout, 120_000)
    debug_enabled = Keyword.get(opts, :debug, true)

    Logger.info("[Studio] Starting research pipeline for topic: #{inspect(topic)}")
    Logger.info("[Studio] Timeout: #{wait_ms}ms, Debug: #{debug_enabled}")

    {:ok, server} =
      Jido.AgentServer.start_link(agent: __MODULE__, jido: jido_name, debug: debug_enabled)

    Logger.info("[Studio] AgentServer started: #{inspect(server)}")

    Logger.info("[Studio] Workflow DAG: PlanQueries → SimulateSearch → BuildOutline → DraftArticle → EditAndAssemble")

    signal =
      Jido.Signal.new!("runic.feed", %{data: %{topic: topic}}, source: "/studio/orchestrator")

    Logger.info("[Studio] Feeding signal: runic.feed with topic=#{inspect(topic)}")
    Jido.AgentServer.cast(server, signal)

    Logger.info("[Studio] Awaiting completion (timeout: #{wait_ms}ms)...")
    started_at = System.monotonic_time(:millisecond)
    outcome = Jido.AgentServer.await_completion(server, timeout: wait_ms)
    took_ms = System.monotonic_time(:millisecond) - started_at

    Logger.info("[Studio] await_completion returned after #{took_ms}ms: #{inspect_completion(outcome)}")

    # Read the strategy state back regardless of how the wait ended.
    {:ok, snapshot} = Jido.AgentServer.state(server)
    strat = StratState.get(snapshot.agent)

    log_strategy_state(strat)

    recent = collect_debug_events(server)
    {final_status, productions, facts} = summarize_run_outcome(outcome, strat)

    %{
      topic: topic,
      productions: productions,
      facts: facts,
      status: final_status,
      events: recent,
      pid: server
    }
  end

  # Fetch the server's recent events and log them; returns [] when the fetch fails.
  defp collect_debug_events(server) do
    case Jido.AgentServer.recent_events(server) do
      {:ok, evts} ->
        Logger.debug("[Studio] Collected #{length(evts)} debug events")
        log_events(evts)
        evts

      {:error, reason} ->
        Logger.warning("[Studio] Could not fetch events: #{inspect(reason)}")
        []
    end
  end

  # Map the `await_completion` result to `{status, productions, facts}`, logging the outcome.
  defp summarize_run_outcome(outcome, strat) do
    case outcome do
      {:ok, %{status: :completed}} ->
        productions = Workflow.raw_productions(strat.workflow)
        facts = Workflow.facts(strat.workflow)
        Logger.info("[Studio] Pipeline COMPLETED — #{length(productions)} productions, #{length(facts)} facts")
        {:completed, productions, facts}

      {:ok, %{status: :failed}} ->
        facts = Workflow.facts(strat.workflow)
        Logger.error("[Studio] Pipeline FAILED — #{length(facts)} facts accumulated before failure")
        {:failed, [], facts}

      {:error, reason} ->
        Logger.error("[Studio] Pipeline ERROR: #{inspect(reason)}")
        {:error, [], []}
    end
  end

  @doc """
  Extract the final article from a result map that has a `:productions` list.

  Returns the last production that is a map containing a `:markdown` key
  (the whole production, not just the markdown value), or `nil` when no
  production has one.
  """
  def article(%{productions: productions}) do
    # Walking the reversed list and taking the first hit yields the last match.
    productions
    |> Enum.reverse()
    |> Enum.find(&match?(%{markdown: _}, &1))
  end

  @doc """
  Run the research pipeline in step mode with full introspection.

  Starts the same 5-node pipeline but pauses between each step, logging
  the workflow state, node inputs/outputs, and annotated graph at each
  transition. Useful for debugging, demos, and building low-code UIs.

  The server is put into step mode with a `runic.set_mode` signal before the
  topic is fed with `runic.feed`; the step loop then advances the pipeline
  until the strategy finishes or a step times out.

  ## Options

    * `:jido` - Name of a running Jido instance (required)
    * `:timeout` - Per-step timeout in ms (default: 120_000)
    * `:debug` - Enable debug event buffer (default: true)
    * `:on_step` - Optional callback `fn step_info -> :ok end` invoked
      after each step completes. `step_info` is a map with `:step_index`,
      `:nodes_dispatched`, `:completed_entries`, `:graph_before`,
      `:graph_after`, `:summary`, and `:status`. The callback's return value
      is ignored.

  ## Returns

  A map with `:topic`, `:productions`, `:status`, `:steps` (list of per-step
  info maps, oldest first), `:step_history`, `:summary`, `:graph`, and `:pid`.
  `:productions` is `[]` unless the final strategy status is `:success`.
  """
  @spec run_step(String.t(), keyword()) :: map()
  def run_step(topic, opts \\ []) do
    jido = Keyword.fetch!(opts, :jido)
    timeout = Keyword.get(opts, :timeout, 120_000)
    debug = Keyword.get(opts, :debug, true)
    on_step = Keyword.get(opts, :on_step)

    Logger.info("[Studio:Step] Starting step-mode pipeline for topic: #{inspect(topic)}")

    server_opts = [agent: __MODULE__, jido: jido, debug: debug]
    {:ok, pid} = Jido.AgentServer.start_link(server_opts)

    set_mode_signal = Jido.Signal.new!("runic.set_mode", %{mode: :step}, source: "/studio/step")
    Jido.AgentServer.cast(pid, set_mode_signal)
    # NOTE(review): the fixed 50ms pauses presumably give the server time to
    # process each cast before the next action — confirm they are required.
    Process.sleep(50)

    feed_signal =
      Jido.Signal.new!(
        "runic.feed",
        %{data: %{topic: topic}},
        source: "/studio/step"
      )

    Jido.AgentServer.cast(pid, feed_signal)
    Process.sleep(50)

    # The loop accumulates step info newest-first; it is reversed for the result below.
    steps = step_loop(pid, topic, timeout, on_step, 0, [])

    {:ok, server_state} = Jido.AgentServer.state(pid)
    agent = server_state.agent
    strat = StratState.get(agent)

    summary = Introspection.execution_summary(strat.workflow)
    graph = Introspection.annotated_graph(strat.workflow, strat)

    productions =
      if strat.status == :success,
        do: Workflow.raw_productions(strat.workflow),
        else: []

    # Status and step count are separated so the line reads "Pipeline success — 5 steps, ...".
    Logger.info(
      "[Studio:Step] Pipeline #{strat.status} — #{length(steps)} steps, " <>
        "#{length(productions)} productions"
    )

    %{
      topic: topic,
      productions: productions,
      status: strat.status,
      steps: Enum.reverse(steps),
      step_history: strat.step_history,
      summary: summary,
      graph: graph,
      pid: pid
    }
  end

  # -- Debug Helpers -----------------------------------------------------------

  # Render the result of `await_completion` as a short string for log lines.
  # A timeout error is reported separately from other errors.
  defp inspect_completion(completion) do
    case completion do
      {:ok, %{status: status}} -> "ok/#{status}"
      {:error, {:timeout, diag}} -> "timeout (#{inspect(diag)})"
      {:error, reason} -> "error: #{inspect(reason)}"
    end
  end

  # Log (info level) a one-line summary of the strategy state: its status, the
  # sizes of `pending`, `queued` and `ran_nodes`, and the workflow's production count.
  defp log_strategy_state(strat) do
    # The message is built inside a function so it is only computed when logged.
    Logger.info(fn ->
      "[Studio] Strategy state: status=#{strat.status}, " <>
        "pending=#{map_size(strat.pending)}, " <>
        "queued=#{length(strat.queued)}, " <>
        "ran_nodes=#{MapSet.size(strat.ran_nodes)}, " <>
        "productions=#{length(Workflow.raw_productions(strat.workflow))}"
    end)
  end

  # Log (debug level) a one-line summary for each of the first 20 events.
  defp log_events(events) do
    first_events = Enum.take(events, 20)
    Enum.each(first_events, &log_single_event/1)
  end

  # Log one event; a missing `:type` falls back to `:unknown` and a missing `:data` to `%{}`.
  defp log_single_event(event) do
    kind = Map.get(event, :type, :unknown)
    payload = Map.get(event, :data, %{})
    summary = summarize_event(kind, payload)
    Logger.debug("[Studio] Event: #{kind} #{summary}")
  end

  # Build the short detail string shown after an event's type in the debug log.
  # Two event shapes get a compact label; anything else is a truncated `inspect/2`
  # of its data.
  defp summarize_event(type, data) do
    case {type, data} do
      {:signal_received, %{signal_type: t}} -> "type=#{t}"
      {:directive_started, %{directive_type: t}} -> "directive=#{t}"
      {_other_type, _other_data} -> inspect(data, limit: 3, printable_limit: 80)
    end
  end

  # -- Step-Mode Loop ----------------------------------------------------------

  # Drive the step-mode pipeline: read the current strategy status and either
  # run the next step (`:paused`), wait for the strategy to settle (`:running`),
  # or stop and return the accumulated `steps` (any other status, including
  # `:success` and `:failure`). `steps` is accumulated newest-first.
  defp step_loop(pid, _topic, timeout, on_step, step_index, steps) do
    {:ok, server_state} = Jido.AgentServer.state(pid)
    current = StratState.get(server_state.agent)

    case current.status do
      :paused ->
        do_step(pid, current, timeout, on_step, step_index, steps)

      :running ->
        step_after_settling(pid, timeout, on_step, step_index, steps)

      _finished_or_other ->
        steps
    end
  end

  # Wait for a running strategy to pause or finish, then step if it paused.
  defp step_after_settling(pid, timeout, on_step, step_index, steps) do
    case await_paused(pid, timeout, 50) do
      {:ok, %{status: status}} when status in [:success, :failure] ->
        steps

      {:ok, paused} ->
        do_step(pid, paused, timeout, on_step, step_index, steps)

      {:error, :timeout} ->
        Logger.error("[Studio:Step] Timed out waiting for pause at step #{step_index}")
        steps
    end
  end

  # Execute one step of the step-mode pipeline.
  #
  # Captures the annotated graph, casts a `runic.step` signal to release the held
  # runnables, and waits (up to `timeout` ms, polling every 100 ms) for the
  # strategy to pause again or finish. On success it builds a step info map,
  # logs each newly completed history entry, invokes `on_step` (when given) and
  # continues the loop with the info map prepended to `steps`. On timeout it
  # logs an error and returns `steps` unchanged.
  defp do_step(pid, strat, timeout, on_step, step_index, steps) do
    graph_before = Introspection.annotated_graph(strat.workflow, strat)

    Logger.info("[Studio:Step] Step #{step_index}: #{length(strat.held_runnables)} runnable(s) held")

    held_names = Enum.map(strat.held_runnables, fn r -> r.node.name end)

    Logger.info("[Studio:Step]   Nodes: #{inspect(held_names)}")

    step_signal = Jido.Signal.new!("runic.step", %{}, source: "/studio/step")
    Jido.AgentServer.cast(pid, step_signal)

    case await_paused(pid, timeout, 100) do
      {:ok, new_strat} ->
        new_history = new_strat.step_history
        prev_history = strat.step_history

        # NOTE(review): taking from the front assumes `step_history` is kept
        # newest-first — confirm against the strategy implementation.
        new_entries =
          Enum.take(new_history, length(new_history) - length(prev_history))

        graph_after = Introspection.annotated_graph(new_strat.workflow, new_strat)
        summary = Introspection.execution_summary(new_strat.workflow)

        step_info = %{
          step_index: step_index,
          nodes_dispatched: held_names,
          completed_entries: new_entries,
          graph_before: graph_before,
          graph_after: graph_after,
          summary: summary,
          status: new_strat.status
        }

        Enum.each(new_entries, fn entry ->
          # Node and status are separated so the line reads "node → status".
          Logger.info(
            "[Studio:Step]   #{entry.node} → #{entry.status}" <>
              describe_step_output(entry.output)
          )
        end)

        # The callback's return value is intentionally ignored.
        if on_step, do: on_step.(step_info)

        step_loop(pid, nil, timeout, on_step, step_index + 1, [step_info | steps])

      {:error, :timeout} ->
        Logger.error("[Studio:Step] Timed out at step #{step_index}")
        steps
    end
  end

  # Log suffix listing an entry's output keys. Only maps have keys; any other
  # output (including `nil`) yields no suffix, so logging never raises.
  defp describe_step_output(output) when is_map(output),
    do: " (output keys: #{inspect(Map.keys(output))})"

  defp describe_step_output(_output), do: ""

  # Poll the agent server until the strategy status is `:paused`, `:success` or
  # `:failure`, returning `{:ok, strat}`. Between polls it sleeps `interval` ms;
  # if the monotonic clock is past the deadline (`timeout` ms after the call
  # started) once a sleep ends, it returns `{:error, :timeout}`.
  defp await_paused(pid, timeout, interval) do
    deadline = System.monotonic_time(:millisecond) + timeout
    poll_until_settled(pid, deadline, interval)
  end

  # One poll iteration: check the status first, then sleep, then check the deadline.
  defp poll_until_settled(pid, deadline, interval) do
    {:ok, server_state} = Jido.AgentServer.state(pid)
    strat = StratState.get(server_state.agent)

    if strat.status in [:paused, :success, :failure] do
      {:ok, strat}
    else
      Process.sleep(interval)

      if System.monotonic_time(:millisecond) > deadline do
        {:error, :timeout}
      else
        poll_until_settled(pid, deadline, interval)
      end
    end
  end
end