%% src/barrel_mcp_http_engine.erl

%%%-------------------------------------------------------------------
%%% @author Benoit Chesneau
%%% @copyright 2024-2026 Benoit Chesneau
%%% @doc Transport-neutral MCP HTTP engine.
%%%
%%% Holds the protocol logic for both the simple HTTP transport and
%%% the Streamable HTTP transport (POST/GET/DELETE/OPTIONS, SSE,
%%% sessions, CORS, Origin validation, authentication, the OAuth
%%% protected-resource-metadata endpoint and async tool calls)
%%% WITHOUT any dependency on a concrete HTTP server.
%%%
%%% A binding (the built-in `barrel_mcp_http_listener' h1/h2 server,
%%% or an external adapter such as Livery's) reads the request line
%%% and body, then calls {@link handle/6} with:
%%% <ul>
%%%   <li>`Method' — the request method binary (`<<"POST">>' …).</li>
%%%   <li>`Path' — the request target (query string allowed; it is
%%%       stripped here).</li>
%%%   <li>`Headers' — a `[{binary(), binary()}]' proplist.
%%%       Lookups are case-insensitive.</li>
%%%   <li>`Body' — the full request body (`<<>>' when none).</li>
%%%   <li>`Responder' — a map of I/O closures (see below).</li>
%%%   <li>`Config' — the engine configuration (see the `config()' type).</li>
%%% </ul>
%%%
%%% The `Responder' abstracts response delivery so the engine never
%%% touches a socket:
%%% ```
%%% #{reply        => fun((Status, Headers, Body) -> ok),
%%%   stream_start => fun((Status, Headers) -> ok),
%%%   stream_chunk => fun((iodata()) -> ok | {error, term()}),
%%%   stream_end   => fun(() -> ok)}
%%% '''
%%% `Headers' passed to the closures is a `[{binary(), binary()}]'
%%% proplist (lowercase names). A streaming (SSE) response is
%%% `stream_start' then repeated `stream_chunk' then `stream_end'.
%%%
%%% `handle/6' runs in the calling (per-request) process. For a
%%% long-lived GET SSE stream it blocks in a receive loop until the
%%% session is terminated or the binding signals a client disconnect
%%% by sending the calling process the message `mcp_disconnect'.
%%% @end
%%%-------------------------------------------------------------------
-module(barrel_mcp_http_engine).

-include("barrel_mcp.hrl").

-export([handle/6]).

%% Shared helpers reused by the transport start/1 functions.
-export([
    is_loopback/1,
    resolve_allowed_origins/2,
    normalize_resource_metadata/1,
    inject_resource_metadata_url/2,
    init_auth/1,
    ensure_session_manager/0
]).

%% I/O closures supplied by the binding. The engine delivers every
%% response through them and never touches a socket itself:
%%   reply        - send a complete response (status, headers, body).
%%   stream_start - open a streaming (SSE) response.
%%   stream_chunk - write one chunk; may report `{error, Reason}'.
%%   stream_end   - close the streaming response.
-type responder() :: #{
    reply := fun((non_neg_integer(), [{binary(), binary()}], iodata()) -> ok),
    stream_start := fun((non_neg_integer(), [{binary(), binary()}]) -> ok),
    stream_chunk := fun((iodata()) -> ok | {error, term()}),
    stream_end := fun(() -> ok)
}.

%% Engine configuration map.
-type config() :: #{
    %% Transport flavour used for method dispatch. `handle/6' reads it
    %% with a fallback of `stream'.
    mode := stream | simple,
    %% Authentication settings. The POST paths read it with a fallback
    %% of `#{provider => barrel_mcp_auth_none}'.
    auth_config := map(),
    %% Whether Mcp-Session-Id sessions are used; read with a fallback
    %% of `true'.
    session_enabled => boolean(),
    %% Origin validation inputs.
    %% NOTE(review): consumed by validate_origin/2, which is outside
    %% this view - confirm exact semantics there.
    allowed_origins => any | [term()],
    allow_missing_origin => boolean(),
    %% SSE buffer maximum applied to a session created by an
    %% `initialize' request; read with a fallback of 256.
    sse_buffer_size => pos_integer(),
    %% When this is a map holding a `document' key, that document is
    %% served at /.well-known/oauth-protected-resource.
    resource_metadata => undefined | map(),
    _ => _
}.

-export_type([responder/0, config/0]).

%% Cap on the size of an incoming SSE response (16 MiB); it mirrors
%% the client-side cap.
-define(MAX_RESP_BYTES, 16 * 1024 * 1024).

%%====================================================================
%% Entry point
%%====================================================================

%% Entry point called by the binding for every HTTP request.
%%
%% The query string is stripped from `RawPath' first. A request for the
%% OAuth protected-resource-metadata path is answered immediately -
%% before Origin validation - when the configuration carries a
%% `resource_metadata' map with a `document' key. Any other request
%% must pass Origin validation and is then dispatched according to the
%% transport mode (`stream' when `mode' is not configured).
-spec handle(
    binary(),
    binary(),
    [{binary(), binary()}],
    binary(),
    responder(),
    config()
) -> ok.
handle(Method, RawPath, Headers, Body, Responder, Config) ->
    Target = strip_query(RawPath),
    Metadata = maps:get(resource_metadata, Config, undefined),
    case {Target, Metadata} of
        {<<"/.well-known/oauth-protected-resource">>, #{document := Doc}} ->
            serve_prm(Responder, Doc);
        _NotMetadata ->
            Mode = maps:get(mode, Config, stream),
            case validate_origin(Headers, Config) of
                {error, _Reason} ->
                    %% The 403 deliberately carries no CORS header so
                    %% the browser surfaces the rejection instead of
                    %% retrying.
                    reply(Responder, 403, #{}, <<>>);
                ok ->
                    dispatch(Mode, Method, Headers, Body, Responder, Config)
            end
    end.

%% Serve the protected-resource-metadata document `Doc' as a JSON
%% response with status 200 and a five-minute public cache lifetime.
serve_prm(Responder, Doc) ->
    Payload = iolist_to_binary(json:encode(Doc)),
    RespHeaders = #{
        <<"cache-control">> => <<"public, max-age=300">>,
        <<"content-type">> => <<"application/json">>
    },
    reply(Responder, 200, RespHeaders, Payload).

%%====================================================================
%% Method dispatch
%%====================================================================

%% Route a request by transport mode and HTTP method.
%%
%% Simple transport: POST and OPTIONS only (no sessions, no SSE).
%% Streamable transport: POST, GET, DELETE and OPTIONS.
%% OPTIONS is answered with 204 plus CORS headers in both modes; any
%% other method yields a 405 whose `allow' header lists the methods
%% the mode supports.
dispatch(simple, <<"POST">>, Headers, Body, Responder, Config) ->
    simple_post(Headers, Body, Responder, Config);
dispatch(stream, <<"POST">>, Headers, Body, Responder, Config) ->
    stream_post(Headers, Body, Responder, Config);
dispatch(stream, <<"GET">>, Headers, _Body, Responder, Config) ->
    stream_get_sse(Headers, Responder, Config);
dispatch(stream, <<"DELETE">>, Headers, _Body, Responder, Config) ->
    stream_delete(Headers, Responder, Config);
dispatch(Mode, <<"OPTIONS">>, Headers, _Body, Responder, Config) when
    Mode =:= simple; Mode =:= stream
->
    Preflight = cors_headers(Headers, Config, #{}),
    reply(Responder, 204, Preflight, <<>>);
dispatch(simple, _Method, Headers, _Body, Responder, Config) ->
    Base = #{
        <<"allow">> => <<"POST, OPTIONS">>,
        <<"content-type">> => <<"application/json">>
    },
    Hdrs = cors_headers(Headers, Config, Base),
    reply(Responder, 405, Hdrs, <<"{\"error\":\"Method not allowed\"}">>);
dispatch(stream, _Method, Headers, _Body, Responder, Config) ->
    Base = #{
        <<"allow">> => <<"POST, GET, DELETE, OPTIONS">>,
        <<"content-type">> => <<"application/json">>
    },
    Hdrs = cors_headers(Headers, Config, Base),
    reply(Responder, 405, Hdrs, <<"{\"error\":\"Method not allowed\"}">>).

%%====================================================================
%% Simple transport
%%====================================================================

%% Simple-transport POST: authenticate the request, then hand the body
%% to the protocol layer. An authentication failure is answered by
%% auth_error/4.
simple_post(Headers, Body, Responder, Config) ->
    Fallback = #{provider => barrel_mcp_auth_none},
    AuthConfig = maps:get(auth_config, Config, Fallback),
    AuthHeaders = extract_headers(Headers, AuthConfig),
    case authenticate(AuthConfig, #{headers => AuthHeaders}) of
        {error, Reason} ->
            auth_error(Headers, Responder, AuthConfig, Reason);
        {ok, AuthInfo} ->
            simple_post_authenticated(Headers, Body, Responder, Config, AuthInfo)
    end.

%% Decode and execute an authenticated simple-transport request.
%%
%% A body that fails to parse gets a 400 JSON-RPC parse error. A
%% request that yields no response gets 204. An async plan is driven
%% to completion here (60000 passed as the second argument) and its
%% result returned with 200, as is any direct result.
simple_post_authenticated(Headers, Body, Responder, Config, AuthInfo) ->
    case barrel_mcp_protocol:decode(Body) of
        {error, parse_error} ->
            ParseError = barrel_mcp_protocol:error_response(
                null,
                ?JSONRPC_PARSE_ERROR,
                <<"Parse error">>
            ),
            reply_json(Headers, Responder, Config, 400, ParseError);
        {ok, Request} ->
            Tagged = with_auth(Request, AuthInfo),
            case barrel_mcp_protocol:handle(Tagged) of
                no_response ->
                    NoContent = cors_headers(Headers, Config, #{}),
                    reply(Responder, 204, NoContent, <<>>);
                {async, Plan} ->
                    Driven = barrel_mcp_protocol:drive_async_plan(Plan, 60000, AuthInfo),
                    reply_json(Headers, Responder, Config, 200, Driven);
                Envelope ->
                    reply_json(Headers, Responder, Config, 200, Envelope)
            end
    end.

%% Encode `Envelope' with the protocol encoder and send it as an
%% `application/json' response with the given status and CORS headers.
reply_json(Headers, Responder, Config, Status, Envelope) ->
    Payload = barrel_mcp_protocol:encode(Envelope),
    ContentType = #{<<"content-type">> => <<"application/json">>},
    Hdrs = cors_headers(Headers, Config, ContentType),
    reply(Responder, Status, Hdrs, Payload).

%%====================================================================
%% Streamable transport — POST
%%====================================================================

%% Streamable-transport POST: check the Accept header first (406 with
%% a JSON error body on failure), then authenticate, then continue with
%% the authenticated request.
stream_post(Headers, Body, Responder, Config) ->
    case validate_accept_header(Headers) of
        ok ->
            Fallback = #{provider => barrel_mcp_auth_none},
            AuthConfig = maps:get(auth_config, Config, Fallback),
            AuthHeaders = extract_headers(Headers, AuthConfig),
            case authenticate(AuthConfig, #{headers => AuthHeaders}) of
                {error, AuthReason} ->
                    auth_error(Headers, Responder, AuthConfig, AuthReason);
                {ok, AuthInfo} ->
                    stream_post_authed(Headers, Body, Responder, Config, AuthInfo)
            end;
        {error, AcceptReason} ->
            Hdrs = cors_headers(Headers, Config, #{}),
            Payload = json_encode(#{<<"error">> => AcceptReason}),
            reply(Responder, 406, Hdrs, Payload)
    end.

%% Decode the body of an authenticated streamable POST.
%%
%% Batches (a decoded list) and unparsable bodies are rejected with a
%% 400 JSON-RPC error carrying a null id and no session header. A
%% decoded map is passed on untouched: `_auth' is attached only on the
%% request path (handle_dispatch), never on inbound responses.
stream_post_authed(Headers, Body, Responder, Config, AuthInfo) ->
    SessionEnabled = maps:get(session_enabled, Config, true),
    Reject = fun(Code, Message) ->
        reply_jsonrpc_error(
            Headers, Responder, Config, undefined, 400, null, Code, Message
        )
    end,
    case barrel_mcp_protocol:decode(Body) of
        {error, parse_error} ->
            Reject(?JSONRPC_PARSE_ERROR, <<"Parse error">>);
        {ok, Batch} when is_list(Batch) ->
            Reject(?JSONRPC_INVALID_REQUEST, <<"Batch requests are not supported">>);
        {ok, Request} when is_map(Request) ->
            stream_post_request(
                Headers, Responder, Config, SessionEnabled, Request, AuthInfo
            )
    end.

%% Split inbound traffic: a JSON-RPC response from the client goes to
%% handle_inbound_response/4, anything else is treated as a request.
%% The untagged request is used for the detection; authentication info
%% travels alongside it for the request path.
stream_post_request(Headers, Responder, Config, SessionEnabled, Request, AuthInfo) ->
    case is_jsonrpc_response(Request) of
        false ->
            handle_inbound_request(
                Headers, Responder, Config, SessionEnabled, Request, AuthInfo
            );
        true ->
            handle_inbound_response(Headers, Responder, Config, Request)
    end.

%% True when the map looks like a JSON-RPC response: it has an `id',
%% has a `result' or an `error', and has no `method'.
is_jsonrpc_response(R) ->
    HasId = is_map_key(<<"id">>, R),
    HasOutcome = is_map_key(<<"result">>, R) orelse is_map_key(<<"error">>, R),
    HasMethod = is_map_key(<<"method">>, R),
    HasId andalso HasOutcome andalso not HasMethod.

%% Hand a client-sent JSON-RPC response to the session layer, keyed by
%% its id, and acknowledge the POST with an empty 202. The delivery
%% result is ignored.
handle_inbound_response(Headers, Responder, Config, #{<<"id">> := RespId} = Request) ->
    _Delivery = barrel_mcp_session:deliver_response(RespId, Request),
    AckHeaders = cors_headers(Headers, Config, #{}),
    reply(Responder, 202, AckHeaders, <<>>).

%% Resolve the session for an inbound request and dispatch it.
%%
%% A missing session header is answered with 400 and an unknown session
%% id with 404; both use a JSON-RPC invalid-request error with a null
%% id and no session header on the reply.
handle_inbound_request(Headers, Responder, Config, SessionEnabled, Request, AuthInfo) ->
    Method = maps:get(<<"method">>, Request, undefined),
    Reject = fun(Status, Message) ->
        reply_jsonrpc_error(
            Headers,
            Responder,
            Config,
            undefined,
            Status,
            null,
            ?JSONRPC_INVALID_REQUEST,
            Message
        )
    end,
    case lookup_session(Headers, Config, SessionEnabled, Method) of
        {error, unknown_session} ->
            Reject(404, <<"Unknown Mcp-Session-Id">>);
        {error, missing_session_id} ->
            Reject(400, <<"Mcp-Session-Id header required">>);
        {ok, SessionId} ->
            handle_dispatch(Headers, Responder, Config, SessionId, Request, AuthInfo)
    end.

%% Run one request through the protocol core and deliver its outcome.
%%
%% Steps, in order:
%%   1. touch the session's activity marker (when there is a session);
%%   2. validate the MCP-Protocol-Version header (400 on failure);
%%   3. call the protocol handler with the auth-tagged request and a
%%      state map holding `auth_info' plus `session_id' when present;
%%   4. reply: 202 when there is nothing to send back, the async path
%%      for an async plan, otherwise the result as a single SSE event
%%      or as plain JSON depending on wants_sse_response/1.
handle_dispatch(Headers, Responder, Config, SessionId, Request, AuthInfo) ->
    HasSession = SessionId =/= undefined,
    _ = HasSession andalso barrel_mcp_session:update_activity(SessionId),
    Method = maps:get(<<"method">>, Request, undefined),
    case validate_protocol_version(Headers, SessionId, Method) of
        ok ->
            AuthState = #{auth_info => AuthInfo},
            ProtocolState =
                case HasSession of
                    true -> AuthState#{session_id => SessionId};
                    false -> AuthState
                end,
            Tagged = with_auth(Request, AuthInfo),
            case barrel_mcp_protocol:handle(Tagged, ProtocolState) of
                {async, AsyncPlan} ->
                    handle_async_tool_call(
                        Headers, Responder, Config, SessionId, Request, AsyncPlan, AuthInfo
                    );
                no_response ->
                    Accepted = add_session_header(
                        cors_headers(Headers, Config, #{}), SessionId
                    ),
                    reply(Responder, 202, Accepted, <<>>);
                Result ->
                    _ = maybe_capture_initialize_version(SessionId, Method, Result),
                    case wants_sse_response(Headers) of
                        false ->
                            Payload = barrel_mcp_protocol:encode(Result),
                            JsonType = #{<<"content-type">> => <<"application/json">>},
                            JsonHdrs = add_session_header(
                                cors_headers(Headers, Config, JsonType), SessionId
                            ),
                            reply(Responder, 200, JsonHdrs, Payload);
                        true ->
                            stream_sse_response(
                                Headers, Responder, Config, SessionId, Result
                            )
                    end
            end;
        {error, ProtoErr} ->
            reply_jsonrpc_error(
                Headers,
                Responder,
                Config,
                SessionId,
                400,
                null,
                ?JSONRPC_INVALID_REQUEST,
                ProtoErr
            )
    end.

%%====================================================================
%% Async tool calls
%%====================================================================

%% Execute an async tool-call plan.
%%
%% A tool flagged as long-running is turned into a task and answered
%% immediately (see handle_long_running_call/10). Any other tool is
%% spawned with this process as `reply_to'; the call is recorded as
%% in-flight on the session (when there is one) while this process
%% waits up to the plan's timeout (60000 when the plan has none), and
%% the outcome is then delivered as the HTTP response.
handle_async_tool_call(
    Headers,
    Responder,
    Config,
    SessionId,
    Request,
    AsyncPlan,
    AuthInfo
) ->
    RequestId = maps:get(request_id, AsyncPlan),
    Spawn = maps:get(spawn, AsyncPlan),
    Timeout = maps:get(timeout, AsyncPlan, 60000),
    Params = maps:get(<<"params">>, Request, #{}),
    ToolName = maps:get(<<"name">>, Params, <<>>),
    LongRunning = is_long_running_tool(ToolName),
    Meta = maps:get(<<"_meta">>, Params, #{}),
    ProgressToken = maps:get(<<"progressToken">>, Meta, undefined),
    Caller = self(),
    case LongRunning of
        false ->
            ToolCtx = #{
                session_id => SessionId,
                request_id => RequestId,
                progress_token => ProgressToken,
                meta => Meta,
                emit_progress => emit_progress_fun(SessionId, ProgressToken),
                reply_to => Caller,
                auth_info => AuthInfo
            },
            Worker = Spawn(ToolCtx),
            Tracked = SessionId =/= undefined,
            case Tracked of
                true ->
                    ok = barrel_mcp_session:record_in_flight(
                        SessionId, RequestId, Worker, Caller
                    );
                false ->
                    ok
            end,
            Outcome = wait_for_tool(RequestId, Timeout),
            case Tracked of
                true -> ok = barrel_mcp_session:clear_in_flight(SessionId, RequestId);
                false -> ok
            end,
            deliver_tool_outcome(
                Headers, Responder, Config, SessionId, RequestId, Outcome
            );
        true ->
            handle_long_running_call(
                Headers,
                Responder,
                Config,
                SessionId,
                RequestId,
                ToolName,
                ProgressToken,
                Meta,
                Spawn,
                AuthInfo
            )
    end.

%% Look the tool up in the registry and return its `long_running' flag;
%% false when the flag is absent or the tool is not registered.
is_long_running_tool(Name) ->
    case barrel_mcp_registry:find(tool, Name) of
        error -> false;
        {ok, Handler} -> maps:get(long_running, Handler, false)
    end.

%% Start a long-running tool as a task and answer right away.
%%
%% A task is created, a collector process is spawned to receive the
%% worker's outcome, the worker is spawned with the collector as its
%% `reply_to', and the worker is attached to the task. The HTTP reply
%% carries the task as currently stored, or a minimal
%% `taskId'/`working' stand-in when the lookup does not return it.
handle_long_running_call(
    Headers,
    Responder,
    Config,
    SessionId,
    RequestId,
    ToolName,
    ProgressToken,
    Meta,
    Spawn,
    AuthInfo
) ->
    {ok, TaskId} = barrel_mcp_tasks:create(SessionId, ToolName, #{}),
    Collector = spawn_task_collector(SessionId, TaskId),
    Emit = emit_progress_fun(SessionId, ProgressToken),
    Worker = Spawn(#{
        session_id => SessionId,
        request_id => RequestId,
        progress_token => ProgressToken,
        meta => Meta,
        emit_progress => Emit,
        reply_to => Collector,
        auth_info => AuthInfo
    }),
    WorkerInfo = #{worker => Worker, request_id => RequestId},
    _ = barrel_mcp_tasks:set_worker(SessionId, TaskId, WorkerInfo),
    Task =
        case barrel_mcp_tasks:get(SessionId, TaskId) of
            {ok, Stored} -> Stored;
            _ -> #{<<"taskId">> => TaskId, <<"status">> => <<"working">>}
        end,
    Envelope = #{<<"task">> => Task},
    send_tool_envelope(
        Headers, Responder, Config, SessionId, RequestId, Envelope, #{}
    ).

%% Start an unlinked process that runs task_collector_loop/2 for the
%% given task and return its pid.
spawn_task_collector(SessionId, TaskId) ->
    CollectorBody = fun() -> task_collector_loop(SessionId, TaskId) end,
    erlang:spawn(CollectorBody).

%% Body of the collector process spawned for a long-running tool call.
%%
%% Waits for the worker's terminal outcome message, records it on the
%% task through `barrel_mcp_tasks' (finish / fail / cancel) and then
%% returns, which ends the collector process. Any other message is
%% discarded and the wait continues.
%%
%% The `*_meta' message variants are the ones wait_for_tool/2 accepts
%% on the synchronous path, and the same worker fun is used on both
%% paths. They therefore have to be recognised here too: without these
%% clauses such a message would hit the catch-all, the outcome would be
%% thrown away and this collector would never finish or fail the task.
%%
%% NOTE(review): the metadata element of the `*_meta' variants is not
%% stored on the task; only the outcome is. Confirm whether the task
%% record should carry it.
task_collector_loop(SessionId, TaskId) ->
    receive
        {tool_result, _ReqId, Result} ->
            Content = barrel_mcp_protocol:format_tool_result_external(Result),
            barrel_mcp_tasks:finish(SessionId, TaskId, #{<<"content">> => Content});
        {tool_result_meta, _ReqId, Result, _Meta} ->
            Content = barrel_mcp_protocol:format_tool_result_external(Result),
            barrel_mcp_tasks:finish(SessionId, TaskId, #{<<"content">> => Content});
        {tool_structured, _ReqId, Data, Content} ->
            barrel_mcp_tasks:finish(
                SessionId,
                TaskId,
                #{
                    <<"content">> => Content,
                    <<"structuredContent">> => Data
                }
            );
        {tool_structured_meta, _ReqId, Data, Content, _Meta} ->
            barrel_mcp_tasks:finish(
                SessionId,
                TaskId,
                #{
                    <<"content">> => Content,
                    <<"structuredContent">> => Data
                }
            );
        {tool_error, _ReqId, Content} ->
            barrel_mcp_tasks:fail(SessionId, TaskId, {tool_error, Content});
        {tool_error_meta, _ReqId, Content, _Meta} ->
            barrel_mcp_tasks:fail(SessionId, TaskId, {tool_error, Content});
        {tool_failed, _ReqId, Reason} ->
            barrel_mcp_tasks:fail(SessionId, TaskId, Reason);
        {tool_validation_failed, _ReqId, Errors} ->
            barrel_mcp_tasks:fail(SessionId, TaskId, {validation_failed, Errors});
        {cancelled, _ReqId} ->
            barrel_mcp_tasks:cancel(SessionId, TaskId);
        _Other ->
            %% Not a terminal outcome: drop it and keep waiting.
            task_collector_loop(SessionId, TaskId)
    end.

%% Build the 3-ary progress callback handed to a tool worker.
%% Without a session or without a progress token the callback is a
%% no-op returning `ok'; otherwise it forwards progress and total to
%% the session (the message argument is not forwarded).
emit_progress_fun(SessionId, Token) when
    SessionId =:= undefined; Token =:= undefined
->
    fun(_Progress, _Total, _Message) -> ok end;
emit_progress_fun(SessionId, Token) ->
    fun(Progress, Total, _Message) ->
        barrel_mcp_session:notify_progress(SessionId, Token, Progress, Total)
    end.

%% Block until the worker for `RequestId' reports an outcome or
%% `Timeout' milliseconds pass.
%%
%% Returns `timeout', `cancelled', or a normalised outcome tuple; the
%% non-`_meta' message variants are normalised with an empty metadata
%% map. After a non-cancel outcome the mailbox is watched for another
%% 50 ms, and a cancel for the same request arriving in that window
%% wins over the outcome.
wait_for_tool(RequestId, Timeout) ->
    First =
        receive
            {cancelled, RequestId} ->
                cancelled;
            {tool_failed, RequestId, Reason} ->
                {failed, Reason};
            {tool_validation_failed, RequestId, Errors} ->
                {validation_failed, Errors};
            {tool_result, RequestId, Result} ->
                {result, Result, #{}};
            {tool_result_meta, RequestId, Result, ResultMeta} ->
                {result, Result, ResultMeta};
            {tool_structured, RequestId, Data, Content} ->
                {structured, Data, Content, #{}};
            {tool_structured_meta, RequestId, Data, Content, StructMeta} ->
                {structured, Data, Content, StructMeta};
            {tool_error, RequestId, ErrContent} ->
                {tool_error, ErrContent, #{}};
            {tool_error_meta, RequestId, ErrContent, ErrMeta} ->
                {tool_error, ErrContent, ErrMeta}
        after Timeout ->
            timeout
        end,
    if
        First =:= cancelled; First =:= timeout ->
            First;
        true ->
            %% Cancellation race: a cancel that is already pending (or
            %% lands within the grace window) takes precedence.
            receive
                {cancelled, RequestId} -> cancelled
            after 50 -> First
            end
    end.

%% Turn the outcome returned by wait_for_tool/2 into the HTTP reply.
%%
%%   {result, ..} / {structured, ..} / {tool_error, ..}
%%       -> JSON-RPC success envelope (tool errors carry `isError').
%%   {validation_failed, Errors}
%%       -> success envelope with `isError' and a text item that
%%          describes the errors.
%%   {failed, _} / timeout
%%       -> JSON-RPC error envelope with the tool-error code.
%%   cancelled
%%       -> 200 with an empty body.
deliver_tool_outcome(
    Headers,
    Responder,
    Config,
    SessionId,
    RequestId,
    {result, Result, Meta}
) ->
    Payload = #{
        <<"content">> => barrel_mcp_protocol:format_tool_result_external(Result)
    },
    send_tool_envelope(
        Headers, Responder, Config, SessionId, RequestId, Payload, Meta
    );
deliver_tool_outcome(
    Headers,
    Responder,
    Config,
    SessionId,
    RequestId,
    {structured, Data, Content, Meta}
) ->
    Payload = #{
        <<"structuredContent">> => Data,
        <<"content">> => Content
    },
    send_tool_envelope(
        Headers, Responder, Config, SessionId, RequestId, Payload, Meta
    );
deliver_tool_outcome(
    Headers,
    Responder,
    Config,
    SessionId,
    RequestId,
    {tool_error, Content, Meta}
) ->
    Payload = #{<<"isError">> => true, <<"content">> => Content},
    send_tool_envelope(
        Headers, Responder, Config, SessionId, RequestId, Payload, Meta
    );
deliver_tool_outcome(
    Headers,
    Responder,
    Config,
    SessionId,
    RequestId,
    {validation_failed, Errors}
) ->
    Text = iolist_to_binary(io_lib:format("Invalid tool input: ~p", [Errors])),
    Item = #{<<"type">> => <<"text">>, <<"text">> => Text},
    Payload = #{<<"isError">> => true, <<"content">> => [Item]},
    send_tool_envelope(
        Headers, Responder, Config, SessionId, RequestId, Payload, #{}
    );
deliver_tool_outcome(Headers, Responder, Config, SessionId, RequestId, {failed, _}) ->
    send_jsonrpc_error_envelope(
        Headers,
        Responder,
        Config,
        SessionId,
        RequestId,
        ?MCP_TOOL_ERROR,
        <<"Internal tool error">>
    );
deliver_tool_outcome(Headers, Responder, Config, SessionId, RequestId, timeout) ->
    send_jsonrpc_error_envelope(
        Headers,
        Responder,
        Config,
        SessionId,
        RequestId,
        ?MCP_TOOL_ERROR,
        <<"Tool timed out">>
    );
deliver_tool_outcome(Headers, Responder, Config, SessionId, _RequestId, cancelled) ->
    Cors = cors_headers(Headers, Config, #{}),
    reply(Responder, 200, add_session_header(Cors, SessionId), <<>>).

%% Convenience arity: send a tool result with empty metadata.
send_tool_envelope(Headers, Responder, Config, SessionId, RequestId, Result) ->
    NoMeta = #{},
    send_tool_envelope(
        Headers, Responder, Config, SessionId, RequestId, Result, NoMeta
    ).

%% Wrap `Result' and `Meta' in a JSON-RPC success response for
%% `RequestId' and send it as a 200 `application/json' reply with CORS
%% and session headers.
send_tool_envelope(Headers, Responder, Config, SessionId, RequestId, Result, Meta) ->
    Envelope = barrel_mcp_protocol:success_response(RequestId, Result, Meta),
    Payload = barrel_mcp_protocol:encode(Envelope),
    Cors = cors_headers(
        Headers, Config, #{<<"content-type">> => <<"application/json">>}
    ),
    reply(Responder, 200, add_session_header(Cors, SessionId), Payload).

%% Send a JSON-RPC error response with HTTP status 200. This is the
%% same reply reply_jsonrpc_error/8 builds, with the status fixed.
send_jsonrpc_error_envelope(Headers, Responder, Config, SessionId, Id, Code, Message) ->
    reply_jsonrpc_error(
        Headers, Responder, Config, SessionId, 200, Id, Code, Message
    ).

%% Build a JSON-RPC error response from `Id', `Code' and `Message' and
%% send it as `application/json' with the given HTTP status, CORS
%% headers and session header.
reply_jsonrpc_error(Headers, Responder, Config, SessionId, Status, Id, Code, Message) ->
    Envelope = barrel_mcp_protocol:error_response(Id, Code, Message),
    Payload = barrel_mcp_protocol:encode(Envelope),
    Cors = cors_headers(
        Headers, Config, #{<<"content-type">> => <<"application/json">>}
    ),
    reply(Responder, Status, add_session_header(Cors, SessionId), Payload).

%%====================================================================
%% Session resolution / protocol version
%%====================================================================

%% Resolve the session a request belongs to.
%%
%% With sessions disabled the answer is always `{ok, undefined}'.
%% Otherwise:
%%   - `initialize' without a session header creates a session and
%%     applies the configured SSE buffer maximum (256 by default);
%%   - any other method without a header is `missing_session_id';
%%   - a supplied header must name an existing session, else
%%     `unknown_session'.
lookup_session(_Headers, _Config, false, _Method) ->
    {ok, undefined};
lookup_session(Headers, Config, true, Method) ->
    case session_header(Headers) of
        undefined when Method =:= <<"initialize">> ->
            {ok, NewId} = barrel_mcp_session:create(#{}),
            BufferMax = maps:get(sse_buffer_size, Config, 256),
            _ = barrel_mcp_session:set_sse_buffer_max(NewId, BufferMax),
            {ok, NewId};
        undefined ->
            {error, missing_session_id};
        SessionId ->
            case barrel_mcp_session:get(SessionId) of
                {error, not_found} -> {error, unknown_session};
                {ok, _Session} -> {ok, SessionId}
            end
    end.

%% Check the MCP-Protocol-Version request header.
%%
%% `initialize' requests are never checked. For other methods a missing
%% header is accepted; a supported version is accepted and, when there
%% is a session, stored on it; an unsupported version produces
%% `{error, Message}' with a binary listing the supported versions.
validate_protocol_version(_Headers, _Sid, <<"initialize">>) ->
    ok;
validate_protocol_version(Headers, SessionId, _Method) ->
    Version = header(<<"mcp-protocol-version">>, Headers, undefined),
    Supported =
        Version =/= undefined andalso
            lists:member(Version, ?MCP_SUPPORTED_VERSIONS),
    case {Version, Supported} of
        {undefined, _} ->
            ok;
        {_, true} when SessionId =:= undefined ->
            ok;
        {_, true} ->
            _ = barrel_mcp_session:set_protocol_version(SessionId, Version),
            ok;
        {_, false} ->
            Listing = lists:join(<<", ">>, ?MCP_SUPPORTED_VERSIONS),
            {error,
                iolist_to_binary([
                    <<"Bad MCP-Protocol-Version: ">>,
                    Version,
                    <<". Supported: ">>,
                    Listing
                ])}
    end.

%% After an `initialize' call on a session with a binary id, store the
%% `protocolVersion' found in the result on that session. Every other
%% combination is a no-op. Always returns `ok'.
maybe_capture_initialize_version(SessionId, <<"initialize">>, Response) when
    is_binary(SessionId)
->
    case Response of
        #{<<"result">> := #{<<"protocolVersion">> := Version}} ->
            _ = barrel_mcp_session:set_protocol_version(SessionId, Version),
            ok;
        _NoVersion ->
            ok
    end;
maybe_capture_initialize_version(_, _, _) ->
    ok.

%%====================================================================
%% Streamable transport — GET (long-lived SSE) and DELETE
%%====================================================================

%% GET on the streamable transport: run the SSE handler through
%% with_authenticated/4.
stream_get_sse(Headers, Responder, Config) ->
    OpenStream = fun() -> stream_get_sse_authed(Headers, Responder, Config) end,
    with_authenticated(Headers, Responder, Config, OpenStream).

%% Preconditions for opening the long-lived SSE stream: sessions must
%% be enabled and the request must carry a session header. Either
%% failure is answered with a 400 and a JSON error body.
stream_get_sse_authed(Headers, Responder, Config) ->
    BadRequest = fun(Message) ->
        reply(
            Responder,
            400,
            cors_headers(Headers, Config, #{}),
            json_encode(#{<<"error">> => Message})
        )
    end,
    case maps:get(session_enabled, Config, true) of
        true ->
            case session_header(Headers) of
                undefined ->
                    BadRequest(<<"Mcp-Session-Id header required">>);
                SessionId ->
                    stream_get_sse_session(Headers, Responder, Config, SessionId)
            end;
        false ->
            BadRequest(<<"Sessions not enabled">>)
    end.

%% Open the long-lived SSE stream for an existing session, or answer
%% 404 when the session id is unknown.
%%
%% Order of operations on success: start the stream, replay events
%% relative to the Last-Event-ID header, register this process as the
%% session's SSE pid, then enter the pump loop.
stream_get_sse_session(Headers, Responder, Config, SessionId) ->
    case barrel_mcp_session:get(SessionId) of
        {error, not_found} ->
            reply(
                Responder,
                404,
                cors_headers(Headers, Config, #{}),
                json_encode(#{<<"error">> => <<"Unknown Mcp-Session-Id">>})
            );
        {ok, _Session} ->
            SseHeaders = #{
                <<"connection">> => <<"keep-alive">>,
                <<"cache-control">> => <<"no-cache">>,
                <<"content-type">> => <<"text/event-stream">>
            },
            Cors = cors_headers(Headers, Config, SseHeaders),
            stream_start(Responder, 200, add_session_header(Cors, SessionId)),
            LastEventId = header(<<"last-event-id">>, Headers, undefined),
            replay_sse_events(Responder, SessionId, LastEventId),
            _ = barrel_mcp_session:set_sse_pid(SessionId, self()),
            sse_loop(Responder, SessionId)
    end.

%% Long-lived SSE pump. It runs in the per-request process and ends
%% when the session is terminated, when the binding reports a client
%% disconnect (`mcp_disconnect'), or when writing a chunk fails.
%%
%% Each iteration takes the next mailbox message and classifies it:
%% stop, push an event (generating an id for `sse_send_message'), or
%% ignore. A successfully pushed event is recorded on the session
%% before the loop continues.
sse_loop(Responder, SessionId) ->
    Action =
        receive
            session_terminated -> stop;
            mcp_disconnect -> stop;
            {sse_event, GivenId, Data} -> {push, GivenId, Data};
            {sse_send_message, Message} -> {push, generate_event_id(), Message};
            _Other -> ignore
        end,
    case Action of
        stop ->
            sse_cleanup(Responder, SessionId);
        ignore ->
            sse_loop(Responder, SessionId);
        {push, EventId, Payload} ->
            case push_sse_event(Responder, EventId, Payload) of
                {error, _} ->
                    sse_cleanup(Responder, SessionId);
                ok ->
                    _ = barrel_mcp_session:record_sse_event(
                        SessionId, EventId, Payload
                    ),
                    sse_loop(Responder, SessionId)
            end
    end.

%% Tear down the SSE stream: clear the session's SSE pid (best effort,
%% any exception from that call is ignored) and end the stream.
%% Always returns `ok'.
sse_cleanup(Responder, SessionId) ->
    try barrel_mcp_session:set_sse_pid(SessionId, undefined) of
        _Cleared -> ok
    catch
        _:_ -> ok
    end,
    _ = stream_end(Responder),
    ok.

%% DELETE on the streamable transport: run the session-delete handler
%% through with_authenticated/4.
stream_delete(Headers, Responder, Config) ->
    DeleteSession = fun() -> stream_delete_authed(Headers, Responder, Config) end,
    with_authenticated(Headers, Responder, Config, DeleteSession).

%% Delete the session named by the `mcp-session-id' header.
%% Replies 400 when the header is missing, 404 when the session is
%% unknown and 204 (empty body) once the session has been deleted.
stream_delete_authed(Headers, Responder, Config) ->
    Lookup =
        case session_header(Headers) of
            undefined -> missing_header;
            Id -> {Id, barrel_mcp_session:get(Id)}
        end,
    case Lookup of
        missing_header ->
            reply(
                Responder,
                400,
                cors_headers(Headers, Config, #{}),
                json_encode(#{<<"error">> => <<"Mcp-Session-Id header required">>})
            );
        {SessionId, {ok, _}} ->
            barrel_mcp_session:delete(SessionId),
            reply(Responder, 204, cors_headers(Headers, Config, #{}), <<>>);
        {_SessionId, {error, not_found}} ->
            reply(
                Responder,
                404,
                cors_headers(Headers, Config, #{}),
                json_encode(#{<<"error">> => <<"Unknown Mcp-Session-Id">>})
            )
    end.

%%====================================================================
%% SSE helpers
%%====================================================================

%% Answer a POST with a one-shot SSE stream: start the stream with the
%% SSE/CORS/session headers, emit `Result' as a single event, then end
%% the stream. A failed chunk write is ignored; the stream is ended
%% regardless. Returns whatever the responder's `stream_end' returns.
stream_sse_response(Headers, Responder, Config, SessionId, Result) ->
    SseHeaders = #{
        <<"content-type">> => <<"text/event-stream">>,
        <<"cache-control">> => <<"no-cache">>
    },
    WithCors = cors_headers(Headers, Config, SseHeaders),
    ResponseHeaders = add_session_header(WithCors, SessionId),
    stream_start(Responder, 200, ResponseHeaders),
    EventId = generate_event_id(),
    _ = push_sse_event(Responder, EventId, Result),
    stream_end(Responder).

%% Write one SSE event (an `id:' line followed by a `data:' line holding
%% the JSON-encoded `Data') through the responder's `stream_chunk'.
%% Returns the result of the chunk write.
push_sse_event(Responder, EventId, Data) ->
    Payload = json_encode(Data),
    IdLine = [<<"id: ">>, EventId, <<"\n">>],
    %% The trailing blank line terminates the event.
    DataLine = [<<"data: ">>, Payload, <<"\n\n">>],
    Frame = iolist_to_binary([IdLine, DataLine]),
    stream_chunk(Responder, Frame).

%% Event id for a new SSE event: the current system time in
%% microseconds, rendered as a decimal binary.
generate_event_id() ->
    NowMicros = erlang:system_time(microsecond),
    integer_to_binary(NowMicros).

%% Re-send the events recorded after `LastId' (the client's
%% `Last-Event-ID') on a resumed stream. With no `LastId' nothing is
%% replayed. When the session reports `truncated', a single
%% `notifications/replay_truncated' event is sent instead. Chunk write
%% failures are ignored here. Always returns `ok'.
replay_sse_events(_Responder, _SessionId, undefined) ->
    ok;
replay_sse_events(Responder, SessionId, LastId) ->
    Resend = fun({EventId, Payload}) ->
        _ = push_sse_event(Responder, EventId, Payload)
    end,
    case barrel_mcp_session:events_since(SessionId, LastId) of
        {error, not_found} ->
            ok;
        truncated ->
            Notice = #{
                <<"jsonrpc">> => <<"2.0">>,
                <<"method">> => <<"notifications/replay_truncated">>,
                <<"params">> => #{}
            },
            _ = push_sse_event(Responder, generate_event_id(), Notice),
            ok;
        {ok, Events} ->
            lists:foreach(Resend, Events),
            ok
    end.

%%====================================================================
%% Validation helpers
%%====================================================================

%% Check that the `Accept' header allows both response formats the
%% Streamable HTTP transport may use.
%%
%% Returns `ok' when the header is absent (treated as `*/*'), contains
%% `*/*', or lists both `application/json' and `text/event-stream';
%% otherwise `{error, Message}' with a human-readable binary.
%%
%% Media type names are case-insensitive, so the header is lowercased
%% before matching (`Application/JSON' is accepted).
validate_accept_header(Headers) ->
    Accept = string:lowercase(header(<<"accept">>, Headers, <<"*/*">>)),
    Accepts = fun(MediaType) -> binary:match(Accept, MediaType) =/= nomatch end,
    HasWildcard = Accepts(<<"*/*">>),
    HasJson = Accepts(<<"application/json">>),
    HasSse = Accepts(<<"text/event-stream">>),
    case HasWildcard orelse (HasJson andalso HasSse) of
        true ->
            ok;
        false ->
            {error, <<
                "Accept header must include both application/json"
                " and text/event-stream"
            >>}
    end.

%% Decide whether a POST should be answered as an SSE stream rather
%% than a plain JSON body, based on the `Accept' header.
%%
%% Returns `true' when only `text/event-stream' is listed, or when both
%% types are listed and `text/event-stream' appears first; `false'
%% otherwise (including a missing header). Position in the header is
%% the only preference signal used.
%%
%% Media type names are case-insensitive, so the header is lowercased
%% before matching.
wants_sse_response(Headers) ->
    Accept = string:lowercase(header(<<"accept">>, Headers, <<>>)),
    JsonPos = match_pos(Accept, <<"application/json">>),
    SsePos = match_pos(Accept, <<"text/event-stream">>),
    case {JsonPos, SsePos} of
        {_, infinity} ->
            %% SSE is not acceptable at all.
            false;
        {infinity, _} ->
            %% SSE is the only listed type.
            true;
        {_, _} ->
            SsePos < JsonPos
    end.

%% Byte offset of the first occurrence of `Needle' in `Bin', or the
%% atom `infinity' when it does not occur.
match_pos(Bin, Needle) ->
    case binary:match(Bin, Needle) of
        {Offset, _Length} ->
            Offset;
        nomatch ->
            infinity
    end.

%%====================================================================
%% Authentication
%%====================================================================

%% Initialise the auth provider named in `AuthOpts' and return the
%% options extended with `provider_state'.
%%
%% When the provider exports `init/1' it is called with the
%% `provider_opts' map (default `#{}'); `{ok, State}' stores `State',
%% any other return stores `undefined'. Providers without `init/1' also
%% get `undefined'. Options without a `provider' key default to
%% `barrel_mcp_auth_none'.
init_auth(#{provider := Provider} = AuthOpts) ->
    ProviderOpts = maps:get(provider_opts, AuthOpts, #{}),
    %% erlang:function_exported/3 only reports functions of modules that
    %% are already loaded. In interactive mode the provider module may
    %% not have been loaded yet, which would silently skip its init/1,
    %% so make sure it is loaded first. A load failure is ignored here:
    %% function_exported/3 then returns false as before.
    _ = code:ensure_loaded(Provider),
    ProviderState =
        case erlang:function_exported(Provider, init, 1) of
            true ->
                case Provider:init(ProviderOpts) of
                    {ok, S} -> S;
                    _ -> undefined
                end;
            false ->
                undefined
        end,
    AuthOpts#{provider_state => ProviderState};
init_auth(AuthOpts) ->
    init_auth(AuthOpts#{provider => barrel_mcp_auth_none}).

%% Authenticate `Request' with the configured provider. The
%% `barrel_mcp_auth_none' provider is called directly with an empty
%% request; every other provider goes through `barrel_mcp_auth'.
authenticate(AuthConfig, Request) ->
    case AuthConfig of
        #{provider := barrel_mcp_auth_none} ->
            barrel_mcp_auth_none:authenticate(#{}, undefined);
        _ ->
            barrel_mcp_auth:authenticate(AuthConfig, Request, AuthConfig)
    end.

%% Gate `Fun' behind the configured auth provider: it runs only when
%% the request authenticates, otherwise the provider's challenge
%% response is sent. The GET (SSE) and DELETE verbs use this so they
%% enforce the same credential as POST rather than trusting the session
%% id alone. With `barrel_mcp_auth_none' every request is admitted.
with_authenticated(Headers, Responder, Config, Fun) ->
    DefaultAuth = #{provider => barrel_mcp_auth_none},
    AuthConfig = maps:get(auth_config, Config, DefaultAuth),
    Credentials = extract_headers(Headers, AuthConfig),
    case authenticate(AuthConfig, #{headers => Credentials}) of
        {error, Reason} ->
            auth_error(Headers, Responder, AuthConfig, Reason);
        {ok, _AuthInfo} ->
            Fun()
    end.

%% Send the provider's challenge response for a failed authentication,
%% with the CORS headers merged in.
auth_error(Headers, Responder, AuthConfig, Reason) ->
    {StatusCode, AuthHeaders, Body} =
        barrel_mcp_auth:challenge_response(AuthConfig, Reason),
    CorsHeaders = cors_headers(Headers, #{auth_config => AuthConfig}, #{}),
    %% Both are maps; on a name clash the CORS value wins. `reply/4'
    %% turns the merged map into the header list.
    ResponseHeaders = maps:merge(AuthHeaders, CorsHeaders),
    reply(Responder, StatusCode, ResponseHeaders, Body).

%% Build the `#{Name => Value}' map of credential headers handed to the
%% auth provider. The names are those declared by the provider via
%% `barrel_mcp_auth:auth_headers/1'; when there is no auth config or
%% the provider declares none, `authorization' and `x-api-key' are
%% used. Headers absent from the request are left out of the map.
extract_headers(Headers, AuthConfig) ->
    Fallback = [<<"authorization">>, <<"x-api-key">>],
    Declared =
        case AuthConfig of
            undefined -> [];
            _ -> barrel_mcp_auth:auth_headers(AuthConfig)
        end,
    Names =
        case Declared of
            [] -> Fallback;
            _ -> Declared
        end,
    Looked = [{Name, header(Name, Headers, undefined)} || Name <- Names],
    maps:from_list([Pair || {_, Value} = Pair <- Looked, Value =/= undefined]).

%% Normalise the user-facing `resource_metadata' option into
%% `#{document => Doc, url => MetaUrl}'. `Doc' is the option map
%% without `metadata_url'; `MetaUrl' is the explicit `metadata_url'
%% binary when given, otherwise it is derived from `resource'.
%% `undefined' is passed through unchanged.
normalize_resource_metadata(undefined) ->
    undefined;
normalize_resource_metadata(#{resource := ResourceUrl} = M) ->
    Document = maps:remove(metadata_url, M),
    Url =
        case maps:find(metadata_url, M) of
            {ok, Explicit} when is_binary(Explicit) -> Explicit;
            {ok, undefined} -> derive_prm_url(ResourceUrl);
            error -> derive_prm_url(ResourceUrl)
        end,
    #{document => Document, url => Url}.

%% Derive the protected-resource-metadata URL for `Resource': the
%% resource's scheme, host and optional port followed by
%% `/.well-known/oauth-protected-resource'. Any path, query or userinfo
%% of the resource is dropped. When the resource does not parse into a
%% scheme and host, the suffix is appended to it verbatim.
derive_prm_url(Resource) when is_binary(Resource) ->
    Suffix = <<"/.well-known/oauth-protected-resource">>,
    case uri_string:parse(Resource) of
        #{scheme := Scheme, host := Host} = Parsed ->
            %% uri_string:parse/1 returns IPv6 literals without their
            %% brackets; put them back so the result is a valid URL
            %% (`http://[::1]:8080/...', not `http://::1:8080/...').
            HostPart =
                case Host of
                    <<"[", _/binary>> ->
                        Host;
                    _ ->
                        case binary:match(Host, <<":">>) of
                            nomatch -> Host;
                            _ -> [<<"[">>, Host, <<"]">>]
                        end
                end,
            PortPart =
                case Parsed of
                    #{port := P} when is_integer(P) ->
                        [<<":">>, integer_to_binary(P)];
                    _ ->
                        <<>>
                end,
            iolist_to_binary([Scheme, <<"://">>, HostPart, PortPart, Suffix]);
        _ ->
            <<Resource/binary, Suffix/binary>>
    end.

%% Store the resource-metadata URL in the provider state under
%% `resource_metadata_url'. This happens only when the provider state
%% is a map and the metadata carries a `url'; in every other case
%% (including `undefined' metadata) `AuthConfig' is returned unchanged.
inject_resource_metadata_url(AuthConfig, Metadata) ->
    case {AuthConfig, Metadata} of
        {#{provider_state := State}, #{url := Url}} when is_map(State) ->
            NewState = State#{resource_metadata_url => Url},
            AuthConfig#{provider_state => NewState};
        _ ->
            AuthConfig
    end.

%%====================================================================
%% CORS
%%====================================================================

%% Build the CORS response headers as a map, merged with `Extra'
%% (entries in `Extra' win on a name clash).
%%
%% The allow-headers list is the fixed MCP set plus whatever the auth
%% provider declares. When the request carries an `Origin' header it is
%% echoed back in `access-control-allow-origin' together with
%% `vary: Origin'.
cors_headers(Headers, Config, Extra) ->
    ProviderHeaders =
        case maps:get(auth_config, Config, undefined) of
            undefined -> [];
            AuthConfig -> barrel_mcp_auth:auth_headers(AuthConfig)
        end,
    Allowed =
        [
            <<"content-type">>,
            <<"accept">>,
            <<"mcp-session-id">>,
            <<"mcp-protocol-version">>,
            <<"last-event-id">>
        ] ++ ProviderHeaders,
    Static = #{
        <<"access-control-allow-methods">> => <<"POST, GET, DELETE, OPTIONS">>,
        <<"access-control-allow-headers">> =>
            iolist_to_binary(lists:join(<<", ">>, Allowed)),
        <<"access-control-expose-headers">> =>
            <<"www-authenticate, mcp-session-id, mcp-protocol-version">>
    },
    OriginHeaders =
        case header(<<"origin">>, Headers, undefined) of
            undefined ->
                #{};
            Origin ->
                #{
                    <<"access-control-allow-origin">> => Origin,
                    <<"vary">> => <<"Origin">>
                }
        end,
    maps:merge(maps:merge(Static, OriginHeaders), Extra).

%%====================================================================
%% Origin validation + bind helpers
%%====================================================================

%% Resolve the `allowed_origins' option into the form used for
%% matching. The first argument says whether the bind address is
%% loopback.
%%   `any'                    -> `{ok, any}'
%%   `undefined' on loopback  -> `{ok, DefaultLoopbackOrigins}'
%%   `undefined' otherwise    -> `{error, allowed_origins_required}'
%%   a list of origins        -> `{ok, ParsedOrigins}'
resolve_allowed_origins(_IsLoopback, any) ->
    {ok, any};
resolve_allowed_origins(true, undefined) ->
    {ok, default_loopback_origins()};
resolve_allowed_origins(false, undefined) ->
    {error, allowed_origins_required};
resolve_allowed_origins(_IsLoopback, Origins) when is_list(Origins) ->
    {ok, lists:map(fun parse_origin/1, Origins)}.

%% Origins accepted by default when `allowed_origins' is not configured
%% on a loopback bind: plain-HTTP `localhost', `127.0.0.1' and the IPv6
%% loopback address, on any port.
%%
%% Request origins are parsed with uri_string:parse/1, which returns
%% IPv6 literals WITHOUT brackets (`http://[::1]:3000' has host
%% `::1'). The IPv6 entry is therefore listed in that unbracketed form;
%% the bracketed entry is kept only for backward compatibility.
default_loopback_origins() ->
    [
        #{scheme => <<"http">>, host => <<"localhost">>, port => any},
        #{scheme => <<"http">>, host => <<"127.0.0.1">>, port => any},
        #{scheme => <<"http">>, host => <<"::1">>, port => any},
        #{scheme => <<"http">>, host => <<"[::1]">>, port => any}
    ].

%% Parse an origin binary into the `#{scheme, host, port}' map used for
%% matching. The literal `null' origin becomes the atom `null'. A
%% missing port is recorded as `any'. Input that does not parse into a
%% scheme and host is kept whole as `host' with scheme `undefined'.
parse_origin(<<"null">>) ->
    null;
parse_origin(Bin) when is_binary(Bin) ->
    Parsed = uri_string:parse(Bin),
    case Parsed of
        #{scheme := Scheme, host := Host} ->
            Port = maps:get(port, Parsed, any),
            #{scheme => to_bin(Scheme), host => to_bin(Host), port => Port};
        _NoSchemeOrHost ->
            #{scheme => undefined, host => Bin, port => any}
    end.

%% True for a loopback bind address: any `127.x.y.z' IPv4 tuple, the
%% IPv6 `::1' tuple, or `localhost' as a string or binary. Everything
%% else is false.
is_loopback(Address) ->
    case Address of
        {127, _, _, _} -> true;
        {0, 0, 0, 0, 0, 0, 0, 1} -> true;
        "localhost" -> true;
        <<"localhost">> -> true;
        _ -> false
    end.

%% Validate the request's `Origin' header against the configured
%% `allowed_origins' (default `any'). A request without an `Origin'
%% header passes only when `allow_missing_origin' is `true' (the
%% default); otherwise `{error, missing_origin}' is returned.
validate_origin(Headers, Config) ->
    Allowed = maps:get(allowed_origins, Config, any),
    AllowMissing = maps:get(allow_missing_origin, Config, true),
    case {header(<<"origin">>, Headers, undefined), AllowMissing} of
        {undefined, true} -> ok;
        {undefined, _} -> {error, missing_origin};
        {Origin, _} -> match_origin(Origin, Allowed)
    end.

%% Check one `Origin' value against the resolved allow-list.
%% `any' admits everything. The literal `null' origin is admitted only
%% when the list contains the atom `null'. Any other origin is parsed
%% and must match at least one allow-list entry.
match_origin(Origin, Allowed) ->
    case {Allowed, Origin} of
        {any, _} ->
            ok;
        {_, <<"null">>} ->
            case lists:member(null, Allowed) of
                true -> ok;
                false -> {error, origin_null_not_allowed}
            end;
        {_, _} ->
            Candidate = parse_origin(Origin),
            Matches = fun(Entry) -> origin_matches(Entry, Candidate) end,
            case lists:any(Matches, Allowed) of
                true -> ok;
                false -> {error, origin_not_allowed}
            end
    end.

%% True when allow-list entry `Allowed' admits the parsed origin.
%% Hosts must be equal. The scheme must be equal unless the entry's
%% scheme is `undefined'; the port must be equal unless the entry's
%% port is `any'. Entries that are not `#{scheme, host, port}' maps
%% (such as `null') never match.
origin_matches(#{scheme := AllowedScheme, host := AllowedHost, port := AllowedPort}, Parsed) ->
    SchemeOk = AllowedScheme =:= undefined orelse maps:get(scheme, Parsed) =:= AllowedScheme,
    HostOk = maps:get(host, Parsed) =:= AllowedHost,
    PortOk = AllowedPort =:= any orelse maps:get(port, Parsed) =:= AllowedPort,
    lists:all(fun(Ok) -> Ok end, [SchemeOk, HostOk, PortOk]);
origin_matches(_NotAnEntry, _Parsed) ->
    false.

%%====================================================================
%% Session manager bootstrap
%%====================================================================

%% Start the session manager when neither it nor the `barrel_mcp_sup'
%% supervisor is registered. Returns `ok' when nothing had to be
%% started, otherwise the result of `barrel_mcp_session:start_link/0'.
ensure_session_manager() ->
    case {whereis(barrel_mcp_session), whereis(barrel_mcp_sup)} of
        {undefined, undefined} -> barrel_mcp_session:start_link();
        {_, _} -> ok
    end.

%%====================================================================
%% Request/response plumbing
%%====================================================================

%% Attach the authenticated principal to the decoded request under the
%% `_auth' key before it is handed to the protocol core.
%% Authentication always yields an info map (auth_none included), so
%% there is no untagged path.
with_auth(Request, AuthInfo) ->
    maps:put(<<"_auth">>, AuthInfo, Request).

%% Value of the `mcp-session-id' request header, or `undefined' when
%% the request does not carry one.
session_header(Headers) ->
    HeaderName = <<"mcp-session-id">>,
    header(HeaderName, Headers, undefined).

%% Add `mcp-session-id' to a response header map. With an `undefined'
%% session id the map is returned unchanged.
add_session_header(Headers, SessionId) ->
    case SessionId of
        undefined -> Headers;
        _ -> maps:put(<<"mcp-session-id">>, SessionId, Headers)
    end.

%% Look up `Name' in a `[{binary(), binary()}]' header list, comparing
%% names case-insensitively. Returns the value of the first matching
%% entry as a binary, or `Default' when no entry matches.
header(Name, Headers, Default) ->
    Wanted = string:lowercase(Name),
    IsOther = fun({Key, _}) -> string:lowercase(to_bin(Key)) =/= Wanted end,
    %% Skip the entries in front of the first match, if there is one.
    case lists:dropwhile(IsOther, Headers) of
        [{_, Value} | _] -> to_bin(Value);
        [] -> Default
    end.

%% Send a complete (non-streaming) response through the responder's
%% `reply' closure. The header map is converted to a list first.
%% Always returns `ok'.
reply(Responder, Status, HeadersMap, Body) ->
    Send = maps:get(reply, Responder),
    HeaderList = headers_list(HeadersMap),
    _ = Send(Status, HeaderList, Body),
    ok.

%% Open a streaming response through the responder's `stream_start'
%% closure. The header map is converted to a list first. Always
%% returns `ok'.
stream_start(Responder, Status, HeadersMap) ->
    Start = maps:get(stream_start, Responder),
    HeaderList = headers_list(HeadersMap),
    _ = Start(Status, HeaderList),
    ok.

%% Write one chunk of a streaming response. Returns whatever the
%% responder's `stream_chunk' closure returns.
stream_chunk(Responder, Data) ->
    (maps:get(stream_chunk, Responder))(Data).

%% Close a streaming response. Returns whatever the responder's
%% `stream_end' closure returns.
stream_end(Responder) ->
    (maps:get(stream_end, Responder))().

%% Convert a response header map into the `[{Name, Value}]' list that
%% the responder closures take.
headers_list(HeaderMap) ->
    maps:to_list(HeaderMap).

%% Encode a term as JSON and return it as a single binary.
json_encode(Data) ->
    Encoded = json:encode(Data),
    iolist_to_binary(Encoded).

%% Drop the query string from a request target: everything from the
%% first `?' onwards is removed.
strip_query(Path) ->
    [Base | _Query] = binary:split(Path, <<"?">>),
    Base.

%% Coerce a binary, list (iodata) or atom to a binary. Binaries are
%% returned as they are; atoms are rendered as UTF-8. Any other term
%% raises `function_clause'.
to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
to_bin(List) when is_list(List) -> iolist_to_binary(List);
to_bin(Bin) when is_binary(Bin) -> Bin.