lib/planck/headless/sidecar_manager.ex

defmodule Planck.Headless.SidecarManager do
  @moduledoc """
  Manages the optional sidecar OTP application.

  At startup `SidecarManager` checks whether a sidecar directory is configured
  (`Config.sidecar!/0`). When it exists on disk, it:

  1. Runs `mix deps.get` and `mix compile` synchronously (fast-fail on error).
  2. Spawns the sidecar as a long-lived OS process via erlexec using:
     `elixir --sname planck_sidecar --cookie <cookie> -S mix run --no-halt`
     The following env var is injected so the sidecar can connect back:
     - `PLANCK_HEADLESS_NODE` — `Node.self()` stringified
  3. Monitors node connections. When the sidecar's Erlang node connects
     (name must begin with `"planck_sidecar"`), `SidecarManager` fetches the
     tool list via RPC and stores it in `ResourceStore`.
  4. On node-down or OS-process exit, clears tools from `ResourceStore`.

  ## Progress events

  Subscribe with `subscribe/0` to receive messages on the `"planck:sidecar"`
  PubSub topic. Events are one of:

  - `{:building, sidecar_dir}` — running `mix deps.get` / `mix compile`
  - `{:starting, sidecar_dir}` — OS process spawned, waiting for node
  - `{:connected, node}` — sidecar node is up and tools are loaded
  - `{:disconnected, node}` — sidecar node went down, tools cleared
  - `{:exited, reason}` — OS process exited unexpectedly
  - `{:error, step, reason}` — build/startup step failed
  """

  use GenServer

  require Logger

  alias Planck.Agent.Tool
  alias Planck.Headless.{Config, ResourceStore}

  @pubsub Planck.Agent.PubSub
  @topic "planck:sidecar"
  @sname "planck_sidecar"

  @type status :: :idle | :building | :starting | :connected | :failed

  @type state :: %{
          sidecar_dir: Path.t() | nil,
          sidecar_node: atom() | nil,
          os_pid: pos_integer() | nil,
          status: status()
        }

  # ---------------------------------------------------------------------------
  # Public API
  # ---------------------------------------------------------------------------

  @doc "Start the SidecarManager under its supervisor."
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Return the connected sidecar node name, or `nil` if not connected."
  @spec node() :: atom() | nil
  def node, do: GenServer.call(__MODULE__, :node)

  @doc "Return the current lifecycle status."
  @spec status() :: status()
  def status, do: GenServer.call(__MODULE__, :status)

  @doc "Subscribe the calling process to sidecar lifecycle events on `#{@topic}`."
  @spec subscribe() :: :ok | {:error, term()}
  def subscribe, do: Phoenix.PubSub.subscribe(@pubsub, @topic)

  @doc "Unsubscribe the calling process from sidecar lifecycle events."
  @spec unsubscribe() :: :ok
  def unsubscribe, do: Phoenix.PubSub.unsubscribe(@pubsub, @topic)

  # ---------------------------------------------------------------------------
  # GenServer callbacks
  # ---------------------------------------------------------------------------

  @impl true
  def init(_opts) do
    Process.flag(:trap_exit, true)

    state = %{
      sidecar_dir: Config.sidecar!(),
      sidecar_node: nil,
      os_pid: nil,
      status: :idle
    }

    {:ok, state, {:continue, :maybe_start}}
  end

  @impl true
  def handle_continue(:maybe_start, %{sidecar_dir: nil} = state) do
    {:noreply, state}
  end

  def handle_continue(:maybe_start, state) do
    expanded = Path.expand(state.sidecar_dir)

    if File.dir?(expanded) do
      {:noreply, %{state | sidecar_dir: expanded}, {:continue, :build}}
    else
      {:noreply, state}
    end
  end

  def handle_continue(:build, state) do
    broadcast({:building, state.sidecar_dir})
    state = %{state | status: :building}

    with :ok <- run_step("deps.get", state.sidecar_dir),
         :ok <- run_step("compile", state.sidecar_dir) do
      {:noreply, state, {:continue, :spawn}}
    else
      {:error, step, reason} ->
        broadcast({:error, step, reason})
        Logger.error("[SidecarManager] #{step} failed in #{state.sidecar_dir}: #{reason}")
        {:noreply, %{state | status: :failed}}
    end
  end

  def handle_continue(:spawn, state) do
    case spawn_sidecar(state.sidecar_dir) do
      {:ok, os_pid} ->
        :net_kernel.monitor_nodes(true)
        broadcast({:starting, state.sidecar_dir})
        {:noreply, %{state | os_pid: os_pid, status: :starting}}

      {:error, reason} ->
        broadcast({:error, :spawn, reason})
        Logger.error("[SidecarManager] failed to spawn sidecar: #{inspect(reason)}")
        {:noreply, %{state | status: :failed}}
    end
  end

  @impl true
  def handle_call(:node, _from, state) do
    {:reply, state.sidecar_node, state}
  end

  def handle_call(:status, _from, state) do
    {:reply, state.status, state}
  end

  @impl true
  def handle_info({:nodeup, node}, %{sidecar_node: nil} = state) do
    if sidecar_node?(node) do
      tools = fetch_tools(node)
      ResourceStore.put_tools(tools)
      broadcast({:connected, node})
      {:noreply, %{state | sidecar_node: node, status: :connected}}
    else
      {:noreply, state}
    end
  end

  def handle_info({:nodedown, node}, %{sidecar_node: node} = state) do
    ResourceStore.clear_tools()
    broadcast({:disconnected, node})
    Logger.warning("[SidecarManager] sidecar node #{node} disconnected")
    {:noreply, %{state | sidecar_node: nil, status: :starting}}
  end

  # erlexec monitor message — OS process exited
  def handle_info({:DOWN, _os_pid, :process, _pid, reason}, state) do
    ResourceStore.clear_tools()
    broadcast({:exited, reason})
    Logger.warning("[SidecarManager] sidecar OS process exited: #{inspect(reason)}")
    {:noreply, %{state | os_pid: nil, sidecar_node: nil, status: :failed}}
  end

  # Ignore nodeup/nodedown for other nodes we don't care about
  def handle_info({:nodeup, _node}, state), do: {:noreply, state}
  def handle_info({:nodedown, _node}, state), do: {:noreply, state}

  @impl true
  def terminate(_reason, %{os_pid: os_pid}) when not is_nil(os_pid) do
    :exec.stop(os_pid)
  end

  def terminate(_reason, _state), do: :ok

  # ---------------------------------------------------------------------------
  # Private
  # ---------------------------------------------------------------------------

  @spec run_step(String.t(), Path.t()) :: :ok | {:error, String.t(), String.t()}
  defp run_step(task, dir) do
    opts = [:sync, :stdout, :stderr, cd: to_charlist(dir), env: env([])]

    case :exec.run("mix #{task}", opts) do
      {:ok, _} ->
        :ok

      {:error, details} ->
        output =
          (Keyword.get_values(details, :stdout) ++ Keyword.get_values(details, :stderr))
          |> List.flatten()
          |> Enum.join()

        {:error, task, output}
    end
  end

  @spec spawn_sidecar(Path.t()) :: {:ok, pos_integer()} | {:error, term()}
  defp spawn_sidecar(dir) do
    cookie =
      Node.get_cookie()
      |> Atom.to_string()

    headless_node =
      Node.self()
      |> Atom.to_string()
      |> to_charlist()

    extra = [{~c"PLANCK_HEADLESS_NODE", headless_node}]
    opts = [:monitor, cd: to_charlist(dir), env: env(extra)]

    cmd = "elixir --sname #{@sname} --cookie #{cookie} -S mix run --no-halt"

    case :exec.run(cmd, opts) do
      {:ok, _pid, os_pid} -> {:ok, os_pid}
      {:error, reason} -> {:error, reason}
    end
  end

  @spec env([{charlist(), charlist()}]) :: [{charlist(), charlist()}]
  defp env(extra) do
    ["PATH", "MIX_ENV", "PLANCK_LOCAL"]
    |> Stream.map(&{&1, System.get_env(&1)})
    |> Stream.reject(fn {_, v} -> is_nil(v) end)
    |> Stream.map(fn {k, v} -> {to_charlist(k), to_charlist(v)} end)
    |> Map.new()
    |> Map.merge(Map.new(extra))
    |> Map.to_list()
  end

  @spec sidecar_node?(atom()) :: boolean()
  defp sidecar_node?(node) do
    node
    |> Atom.to_string()
    |> String.starts_with?(@sname)
  end

  @default_tool_timeout_ms 300_000

  @spec fetch_tools(atom()) :: [Tool.t()]
  defp fetch_tools(node) do
    case :rpc.call(node, Planck.Agent.Sidecar, :list_tools, [], 10_000) do
      {:badrpc, reason} ->
        Logger.warning("[SidecarManager] list_tools RPC failed: #{inspect(reason)}")
        []

      ai_tools ->
        Enum.map(ai_tools, &wrap_tool(&1, node))
    end
  end

  @timeout_param %{
    "type" => "integer",
    "description" =>
      "Maximum milliseconds to wait for this tool call (default #{@default_tool_timeout_ms})."
  }

  @spec wrap_tool(Planck.AI.Tool.t(), atom()) :: Tool.t()
  defp wrap_tool(ai_tool, node) do
    Tool.new(
      name: ai_tool.name,
      description: ai_tool.description,
      parameters: inject_timeout_param(ai_tool.parameters),
      execute_fn: fn agent_id, tool_call_id, args ->
        timeout = Map.get(args, "timeout_ms", @default_tool_timeout_ms)

        case :rpc.call(
               node,
               Planck.Agent.Sidecar,
               :execute_tool,
               [ai_tool.name, agent_id, tool_call_id, args],
               timeout
             ) do
          {:badrpc, reason} -> {:error, reason}
          result -> result
        end
      end
    )
  end

  @spec inject_timeout_param(map()) :: map()
  defp inject_timeout_param(%{"properties" => props} = parameters) do
    if Map.has_key?(props, "timeout_ms") do
      parameters
    else
      put_in(parameters, ["properties", "timeout_ms"], @timeout_param)
    end
  end

  defp inject_timeout_param(parameters), do: parameters

  @spec broadcast(term()) :: :ok
  defp broadcast(event) do
    Phoenix.PubSub.broadcast(@pubsub, @topic, event)
  end
end