# lib/agent_sea/mcp/transport/stdio.ex

defmodule AgentSea.MCP.Transport.Stdio do
  @moduledoc """
  MCP stdio transport: spawns an MCP server subprocess and speaks newline-
  delimited JSON-RPC 2.0 over its stdin/stdout.

  It's a `GenServer` that owns the `Port`, assigns request ids, buffers incoming
  bytes into lines, and replies to each caller when the matching response id
  arrives. The client's `ref` is this process.

      {:ok, transport} =
        AgentSea.MCP.Transport.Stdio.start_link(command: ["node", "my-mcp-server.js"])

      {:ok, client} = AgentSea.MCP.connect({AgentSea.MCP.Transport.Stdio, transport})

  Only responses are routed back to callers. Lines that carry a `"method"`
  (notifications and requests initiated by the server) and lines that are not
  valid JSON are ignored.
  """

  use GenServer

  @behaviour AgentSea.MCP.Transport

  @request_timeout 15_000

  # --- Transport callback ---

  @doc """
  Sends a JSON-RPC request and blocks until the response with the matching id
  arrives.

  Returns `{:ok, result}` on success, or `{:error, reason}` where `reason` is one of:

    * `{:rpc_error, error}` - the server answered with a JSON-RPC error object
    * `:invalid_response` - the response had neither `"result"` nor `"error"`
    * `{:server_exited, status}` - the subprocess exited while the request was in flight
    * `{:encode_error, exception}` - the request could not be encoded as JSON

  The calling process exits (standard `GenServer.call/3` behaviour) when no
  reply arrives within #{@request_timeout} ms.
  """
  @spec request(GenServer.server(), term(), term()) :: {:ok, term()} | {:error, term()}
  @impl AgentSea.MCP.Transport
  def request(server, method, params) do
    GenServer.call(server, {:request, method, params}, @request_timeout)
  end

  # --- Client API ---

  @doc "Start the transport. Options: `:command` (`[executable | args]`), `:name`."
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    {gen_opts, init_opts} = Keyword.split(opts, [:name])
    GenServer.start_link(__MODULE__, init_opts, gen_opts)
  end

  # --- Server ---

  # State:
  #   port    - the Port connected to the subprocess' stdin/stdout
  #   buffer  - bytes received after the last newline (an incomplete line)
  #   next_id - id to assign to the next outgoing request
  #   pending - %{id => {from, expiry_timer}} for requests awaiting a response
  @impl GenServer
  def init(opts) do
    [executable | args] = Keyword.fetch!(opts, :command)

    # :spawn_executable needs a full path, so resolve the name against PATH.
    path =
      System.find_executable(executable) ||
        raise ArgumentError, "executable not found on PATH: #{executable}"

    port =
      Port.open(
        {:spawn_executable, path},
        [:binary, :exit_status, :use_stdio, :hide, args: args]
      )

    {:ok, %{port: port, buffer: "", next_id: 1, pending: %{}}}
  end

  @impl GenServer
  def handle_call({:request, method, params}, from, state) do
    id = state.next_id
    message = %{"jsonrpc" => "2.0", "id" => id, "method" => method, "params" => params}

    # Encode without raising: one caller's unencodable params must not crash
    # the transport and take every other in-flight request down with it.
    case Jason.encode(message) do
      {:ok, payload} ->
        Port.command(state.port, [payload, "\n"])

        # The caller gives up after @request_timeout; drop the entry at that
        # point so requests the server never answers don't pile up in `pending`.
        timer = Process.send_after(self(), {:request_expired, id}, @request_timeout)

        # The reply is deferred until the matching response line arrives.
        pending = Map.put(state.pending, id, {from, timer})
        {:noreply, %{state | next_id: id + 1, pending: pending}}

      {:error, reason} ->
        {:reply, {:error, {:encode_error, reason}}, state}
    end
  end

  @impl GenServer
  def handle_info({port, {:data, data}}, %{port: port} = state) do
    # Chunks arrive at arbitrary boundaries: process every complete line and
    # keep the trailing partial line for the next chunk.
    {lines, buffer} = take_lines(state.buffer <> data)
    state = Enum.reduce(lines, %{state | buffer: buffer}, &handle_line/2)
    {:noreply, state}
  end

  def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
    # Fail any in-flight requests so callers don't hang.
    Enum.each(state.pending, fn {_id, {from, _timer}} ->
      GenServer.reply(from, {:error, {:server_exited, status}})
    end)

    {:stop, :normal, %{state | pending: %{}}}
  end

  def handle_info({:request_expired, id}, state) do
    # No-op when the response already arrived and removed the entry.
    {:noreply, %{state | pending: Map.delete(state.pending, id)}}
  end

  def handle_info(_message, state), do: {:noreply, state}

  # --- Helpers ---

  # Decodes one line and, when it is the response to a pending request,
  # replies to the waiting caller and removes the request from `pending`.
  defp handle_line(line, state) do
    case Jason.decode(line) do
      # Requests and notifications initiated by the server carry a "method".
      # Their ids belong to the server's own id space and may collide with
      # ours, so they must never be matched against `pending`.
      {:ok, %{"method" => _method}} ->
        state

      {:ok, %{"id" => id} = message} ->
        case Map.pop(state.pending, id) do
          {nil, _pending} -> state
          {{from, timer}, pending} -> reply_and_remove(state, from, timer, message, pending)
        end

      # Non-JSON lines and anything else without an id are ignored.
      _ ->
        state
    end
  end

  defp reply_and_remove(state, from, timer, message, pending) do
    Process.cancel_timer(timer)
    GenServer.reply(from, rpc_reply(message))
    %{state | pending: pending}
  end

  # Maps a JSON-RPC response object to the tagged tuple returned to callers.
  defp rpc_reply(%{"result" => result}), do: {:ok, result}
  defp rpc_reply(%{"error" => error}), do: {:error, {:rpc_error, error}}
  defp rpc_reply(_message), do: {:error, :invalid_response}

  # Split a buffer into complete lines + trailing remainder.
  # String.split/2 always returns at least one part, so the last part is the
  # (possibly empty) text after the final newline. Blank lines are dropped.
  defp take_lines(buffer) do
    {lines, [rest]} = buffer |> String.split("\n") |> Enum.split(-1)
    {Enum.reject(lines, &(&1 == "")), rest}
  end
end