Skip to main content

lib/noizu/mcp/transport/streamable_http/sink.ex

defmodule Noizu.MCP.Transport.StreamableHTTP.Sink do
  @moduledoc false
  # Server-side sink for Streamable HTTP sessions. Routes outbound messages to
  # the right HTTP response stream:
  #
  #   1. the SSE stream of the request that caused the message
  #      (`routing.related_request_id`), else
  #   2. the session's general GET stream (with an event id for resumability),
  #      else
  #   3. the EventStore buffer, delivered on the next GET (with replay).
  #
  # Conn processes register themselves in the server's Registry as
  # `{:http_stream, session_id, request_id}` / `{:http_get, session_id}` and
  # receive `{:mcp_http, binary}` / `{:mcp_http_event, event_id, binary}`.

  @behaviour Noizu.MCP.Transport.Server

  alias Noizu.MCP.Server.EventStore

  @impl true
  def send_message({server, session_id}, iodata, routing) do
    binary = IO.iodata_to_binary(iodata)
    registry = Module.concat(server, Registry)

    request_stream =
      case routing[:related_request_id] do
        nil -> []
        request_id -> Registry.lookup(registry, {:http_stream, session_id, request_id})
      end

    case request_stream do
      [{pid, _} | _] ->
        send(pid, {:mcp_http, binary})
        :ok

      [] ->
        case Registry.lookup(registry, {:http_get, session_id}) do
          [{pid, _} | _] ->
            event_id = EventStore.append(server, session_id, binary)
            send(pid, {:mcp_http_event, event_id, binary})
            :ok

          [] ->
            EventStore.append(server, session_id, binary)
            :ok
        end
    end
  end

  @impl true
  def close_session({server, session_id}) do
    registry = Module.concat(server, Registry)

    for key <- [{:http_get, session_id}],
        {pid, _} <- Registry.lookup(registry, key) do
      send(pid, :mcp_http_close)
    end

    EventStore.drop(server, session_id)
    :ok
  end
end