Skip to main content

lib/agent_sea/mcp/transport/http.ex

defmodule AgentSea.MCP.Transport.Http do
  @moduledoc """
  MCP "Streamable HTTP" transport. Each request is a JSON-RPC POST to the server
  endpoint; the response is parsed from either an `application/json` body or a
  `text/event-stream` (SSE) body.

  A `GenServer` so it can carry the `Mcp-Session-Id` the server hands back on
  `initialize` and replay it on subsequent requests.

      {:ok, transport} = AgentSea.MCP.Transport.Http.start_link(url: "https://host/mcp")
      {:ok, client} = AgentSea.MCP.connect({AgentSea.MCP.Transport.Http, transport})
  """

  use GenServer

  @behaviour AgentSea.MCP.Transport

  @request_timeout 30_000

  # --- Transport callback ---

  @impl AgentSea.MCP.Transport
  def request(server, method, params) do
    GenServer.call(server, {:request, method, params}, @request_timeout)
  end

  # --- Client API ---

  @doc "Start the transport. Options: `:url` (required), `:headers`, `:adapter`, `:name`."
  def start_link(opts) do
    {gen_opts, init_opts} = Keyword.split(opts, [:name])
    GenServer.start_link(__MODULE__, init_opts, gen_opts)
  end

  # --- Server ---

  @impl true
  def init(opts) do
    state = %{
      url: Keyword.fetch!(opts, :url),
      headers: Keyword.get(opts, :headers, []),
      adapter: opts[:adapter],
      session_id: nil
    }

    {:ok, state}
  end

  @impl true
  def handle_call({:request, method, params}, _from, state) do
    payload = %{
      "jsonrpc" => "2.0",
      "id" => System.unique_integer([:positive]),
      "method" => method,
      "params" => params
    }

    case Req.post(req(state), url: state.url, json: payload) do
      {:ok, %Req.Response{status: status} = response} when status in 200..299 ->
        {:reply, decode(response), capture_session(state, response)}

      {:ok, %Req.Response{status: status, body: body}} ->
        {:reply, {:error, {:http_error, status, body}}, state}

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  # --- Request building ---

  defp req(state) do
    headers =
      state.headers ++
        session_header(state.session_id) ++
        [{"accept", "application/json, text/event-stream"}]

    [headers: headers]
    |> maybe_put(:adapter, state.adapter)
    |> Req.new()
  end

  defp session_header(nil), do: []
  defp session_header(id), do: [{"mcp-session-id", id}]

  defp maybe_put(kw, _key, nil), do: kw
  defp maybe_put(kw, key, value), do: Keyword.put(kw, key, value)

  defp capture_session(state, response) do
    case Req.Response.get_header(response, "mcp-session-id") do
      [id | _] -> %{state | session_id: id}
      _ -> state
    end
  end

  # --- Response decoding ---

  # Req auto-decodes application/json into a map.
  defp decode(%Req.Response{body: %{"result" => result}}), do: {:ok, result}
  defp decode(%Req.Response{body: %{"error" => error}}), do: {:error, {:rpc_error, error}}

  # A raw body may be JSON or an SSE stream carrying the JSON-RPC message.
  defp decode(%Req.Response{body: body}) when is_binary(body) do
    case extract_message(body) do
      {:ok, %{"result" => result}} -> {:ok, result}
      {:ok, %{"error" => error}} -> {:error, {:rpc_error, error}}
      _ -> {:error, :invalid_response}
    end
  end

  defp decode(_response), do: {:error, :invalid_response}

  defp extract_message(body) do
    case Jason.decode(body) do
      {:ok, %{} = message} ->
        {:ok, message}

      _ ->
        # SSE: take the last `data:` line and decode it.
        body
        |> String.split("\n")
        |> Enum.filter(&String.starts_with?(&1, "data:"))
        |> Enum.map(&(&1 |> String.replace_prefix("data:", "") |> String.trim()))
        |> List.last()
        |> case do
          nil -> :error
          data -> Jason.decode(data)
        end
    end
  end
end