%%%-------------------------------------------------------------------
%%% @doc MCP protocol implementation over JSON-RPC 2.0.
%%%
%%% Handles encoding/decoding and routing of MCP methods.
%%% @end
%%%-------------------------------------------------------------------
-module(barrel_mcp_protocol).
-include("barrel_mcp.hrl").
%% API
-export([
decode/1,
encode/1,
handle/1,
handle/2,
error_response/3,
error_response/4,
success_response/2,
success_response/3,
notification_response/0
]).
%% JSON-RPC envelope helpers (shared by client + server)
-export([
encode_request/3,
encode_notification/2,
encode_response/2,
encode_error/3,
decode_envelope/1,
format_tool_result_external/1,
drive_async_plan/2,
drive_async_plan/3
]).
%%====================================================================
%% API
%%====================================================================
%% @doc Decode a JSON-RPC request body. The spec includes `list()'
%% in the success type so the HTTP transport can detect (and reject)
%% JSON-RPC batches.
-spec decode(binary()) -> {ok, map() | list()} | {error, term()}.
decode(Binary) ->
try
{ok, json:decode(Binary)}
catch
_:_ ->
{error, parse_error}
end.
%% @doc Encode a JSON-RPC response.
-spec encode(map()) -> binary().
encode(Response) ->
iolist_to_binary(json:encode(Response)).
%% @doc Handle a JSON-RPC request with default state.
-spec handle(map() | list()) -> map() | no_response | {async, map()}.
handle(Request) ->
handle(Request, #{}).
%% @doc Handle a JSON-RPC request with state.
%%
%% Returns one of:
%% <ul>
%% <li>`map()' — a JSON-RPC response envelope ready to encode.</li>
%% <li>`no_response' — for inbound notifications.</li>
%% <li>`{async, AsyncPlan}' — for `tools/call'. The transport
%% spawns the worker via `(maps:get(spawn, AsyncPlan))(Ctx)'
%% and waits on its mailbox for a `tool_result' / `tool_error' /
%% `tool_failed' / `tool_validation_failed' / `cancelled'
%% message.</li>
%% </ul>
%%
%% MCP forbids JSON-RPC batches (a top-level JSON array) — they are
%% rejected here with `Invalid Request' so non-HTTP callers see the
%% same error as the HTTP transport.
-spec handle(map() | list(), map()) -> map() | no_response | {async, map()}.
handle(L, _State) when is_list(L) ->
error_response(
null,
?JSONRPC_INVALID_REQUEST,
<<"Batch requests are not supported">>
);
handle(#{<<"jsonrpc">> := <<"2.0">>, <<"method">> := Method} = Request, State) ->
Params = maps:get(<<"params">>, Request, #{}),
case maps:find(<<"id">>, Request) of
error ->
%% No id: this is a notification — no response.
handle_notification(Method, Params, State),
no_response;
{ok, Id} when is_binary(Id); is_integer(Id) ->
handle_request(Method, Params, Id, State);
{ok, _BadId} ->
%% MCP requires id to be a string or integer (and not
%% null). Anything else is an Invalid Request.
error_response(
null,
?JSONRPC_INVALID_REQUEST,
<<"Invalid Request: id must be a string or integer">>
)
end;
handle(#{<<"id">> := Id}, _State) when is_binary(Id); is_integer(Id) ->
error_response(Id, ?JSONRPC_INVALID_REQUEST, <<"Invalid Request">>);
handle(_, _State) ->
error_response(null, ?JSONRPC_INVALID_REQUEST, <<"Invalid Request">>).
%% @doc Create an error response.
-spec error_response(term(), integer(), binary()) -> map().
error_response(Id, Code, Message) ->
error_response(Id, Code, Message, #{}).
%% Optional `_meta' map on the error envelope. Empty map is
%% omitted from the wire; the spec's `_meta' is the
%% extensibility hook that lets server / client extensions thread
%% state without changing the JSON-RPC surface.
-spec error_response(term(), integer(), binary(), map()) -> map().
error_response(Id, Code, Message, Meta) ->
Err = #{
<<"code">> => Code,
<<"message">> => Message
},
Env = #{
<<"jsonrpc">> => <<"2.0">>,
<<"id">> => Id,
<<"error">> => Err
},
add_meta(Env, Meta).
%% @doc Return a marker for no response (notifications).
-spec notification_response() -> no_response.
notification_response() ->
no_response.
%%====================================================================
%% Request Handlers
%%====================================================================
handle_request(<<"initialize">>, Params, Id, State) ->
ServerName = application:get_env(barrel_mcp, server_name, <<"barrel">>),
ServerVersion = application:get_env(barrel_mcp, server_version, <<"1.0.0">>),
NegotiatedVersion = negotiate_protocol_version(
maps:get(<<"protocolVersion">>, Params, undefined)
),
%% Persist client capabilities (notably `sampling') so the server can
%% later issue server-to-client requests via barrel_mcp_session.
%% Also persist the negotiated protocol_version on the session.
_ =
case maps:find(session_id, State) of
{ok, SessionId} when is_binary(SessionId) ->
ClientCaps = maps:get(<<"capabilities">>, Params, #{}),
_ = barrel_mcp_session:set_client_capabilities(SessionId, ClientCaps),
_ = barrel_mcp_session:set_protocol_version(SessionId, NegotiatedVersion),
ok;
_ ->
ok
end,
BaseCaps = #{
<<"tools">> => #{<<"listChanged">> => true},
<<"resources">> => #{
<<"subscribe">> => true,
<<"listChanged">> => true
},
<<"prompts">> => #{<<"listChanged">> => true},
<<"logging">> => #{},
%% Per the MCP tasks SEP (and as enforced by the
%% reference Python SDK), each operation key is an
%% object whose presence advertises support; only
%% `listChanged' is a bare boolean.
<<"tasks">> => #{
<<"list">> => #{},
<<"get">> => #{},
<<"cancel">> => #{},
<<"result">> => #{},
<<"listChanged">> => true
}
},
Caps = maybe_advertise_completions(BaseCaps),
success_response(Id, #{
<<"protocolVersion">> => NegotiatedVersion,
<<"capabilities">> => Caps,
<<"serverInfo">> => #{
<<"name">> => ServerName,
<<"version">> => ServerVersion
}
});
handle_request(<<"ping">>, _Params, Id, _State) ->
success_response(Id, #{});
%% Tools
handle_request(<<"tools/list">>, Params, Id, _State) ->
Cursor = maps:get(<<"cursor">>, Params, undefined),
{Page, Next} = paginate(
barrel_mcp_registry:all(tool),
Cursor,
fun({N, _}) -> N end
),
Tools = lists:map(
fun({Name, Handler}) ->
Base = #{
<<"name">> => Name,
<<"description">> => maps:get(description, Handler, <<>>),
<<"inputSchema">> => maps:get(input_schema, Handler, #{<<"type">> => <<"object">>})
},
with_optional_fields(Base, Handler, [
{<<"outputSchema">>, output_schema},
{<<"title">>, title},
{<<"icons">>, icons},
{<<"annotations">>, annotations}
])
end,
Page
),
success_response(Id, with_next_cursor(#{<<"tools">> => Tools}, Next));
handle_request(<<"tools/call">>, Params, Id, _State) ->
Name = maps:get(<<"name">>, Params, <<>>),
Args = maps:get(<<"arguments">>, Params, #{}),
%% Tool dispatch is asynchronous. The transport drives the
%% lifecycle: it builds `Ctx', invokes the spawn closure, records
%% the in-flight entry, and waits on its mailbox for one of
%% `{tool_result, _, _}', `{tool_error, _, _}',
%% `{tool_failed, _, _}', `{tool_validation_failed, _, _}', or
%% `{cancelled, _}' (sent by `barrel_mcp_session:cancel_in_flight/2').
Meta = maps:get(<<"_meta">>, Params, #{}),
Plan = #{
request_id => Id,
meta => Meta,
spawn => fun(Ctx) ->
case barrel_mcp_registry:run_tool(Name, Args, Ctx) of
{ok, Pid} ->
Pid;
{error, _} = Err ->
%% Surface as if the worker reported it: the
%% transport then maps the error.
ReplyTo = maps:get(reply_to, Ctx),
RequestId = maps:get(request_id, Ctx),
ReplyTo ! {tool_failed, RequestId, Err},
%% Return a transient pid so the in-flight
%% record has something monitorable.
spawn(fun() -> ok end)
end
end
},
{async, Plan};
%% Resources
handle_request(<<"resources/list">>, Params, Id, _State) ->
Cursor = maps:get(<<"cursor">>, Params, undefined),
{Page, Next} = paginate(
barrel_mcp_registry:all(resource),
Cursor,
fun({N, _}) -> N end
),
Resources = lists:map(
fun({_Name, Handler}) ->
Base = #{
<<"uri">> => maps:get(uri, Handler, <<>>),
<<"name">> => maps:get(name, Handler, <<>>),
<<"description">> => maps:get(description, Handler, <<>>),
<<"mimeType">> => maps:get(mime_type, Handler, <<"text/plain">>)
},
with_optional_fields(Base, Handler, [
{<<"title">>, title},
{<<"icons">>, icons},
{<<"annotations">>, annotations}
])
end,
Page
),
success_response(
Id,
with_next_cursor(
#{<<"resources">> => Resources},
Next
)
);
handle_request(<<"resources/read">>, Params, Id, _State) ->
Uri = maps:get(<<"uri">>, Params, <<>>),
%% Exact-URI lookup first.
Resources = barrel_mcp_registry:all(resource),
case lists:keyfind(Uri, 1, [{maps:get(uri, H, <<>>), N, H} || {N, H} <- Resources]) of
{Uri, Name, _Handler} ->
run_resource_read(resource, Name, Params, Uri, Id);
false ->
%% Fall back to RFC 6570 template matching against
%% registered `resource_template' entries.
case match_resource_template(Uri) of
{ok, TplName, Vars} ->
Args = maps:merge(Params, Vars),
run_resource_read(resource_template, TplName, Args, Uri, Id);
nomatch ->
error_response(
Id,
?JSONRPC_METHOD_NOT_FOUND,
<<"Resource not found">>
)
end
end;
handle_request(<<"resources/templates/list">>, Params, Id, _State) ->
Cursor = maps:get(<<"cursor">>, Params, undefined),
{Page, Next} = paginate(
barrel_mcp_registry:all(resource_template),
Cursor,
fun({N, _}) -> N end
),
Templates = lists:map(
fun({_Name, Handler}) ->
Base = #{
<<"uriTemplate">> => maps:get(uri_template, Handler, <<>>),
<<"name">> => maps:get(name, Handler, <<>>),
<<"description">> => maps:get(description, Handler, <<>>),
<<"mimeType">> => maps:get(mime_type, Handler, <<"text/plain">>)
},
Compact = maps:filter(fun(_K, V) -> V =/= <<>> end, Base),
with_optional_fields(Compact, Handler, [
{<<"title">>, title},
{<<"icons">>, icons},
{<<"annotations">>, annotations}
])
end,
Page
),
success_response(
Id,
with_next_cursor(
#{<<"resourceTemplates">> => Templates},
Next
)
);
handle_request(<<"resources/subscribe">>, Params, Id, State) ->
Uri = maps:get(<<"uri">>, Params, <<>>),
case maps:find(session_id, State) of
{ok, SessionId} when is_binary(SessionId), Uri =/= <<>> ->
barrel_mcp_session:subscribe_resource(SessionId, Uri),
success_response(Id, #{});
_ ->
error_response(
Id,
?JSONRPC_INVALID_PARAMS,
<<"Subscribe requires a session and a uri">>
)
end;
handle_request(<<"resources/unsubscribe">>, Params, Id, State) ->
Uri = maps:get(<<"uri">>, Params, <<>>),
case maps:find(session_id, State) of
{ok, SessionId} when is_binary(SessionId), Uri =/= <<>> ->
barrel_mcp_session:unsubscribe_resource(SessionId, Uri),
success_response(Id, #{});
_ ->
error_response(
Id,
?JSONRPC_INVALID_PARAMS,
<<"Unsubscribe requires a session and a uri">>
)
end;
%% Prompts
handle_request(<<"prompts/list">>, Params, Id, _State) ->
Cursor = maps:get(<<"cursor">>, Params, undefined),
{Page, Next} = paginate(
barrel_mcp_registry:all(prompt),
Cursor,
fun({N, _}) -> N end
),
Prompts = lists:map(
fun({Name, Handler}) ->
Base = #{
<<"name">> => Name,
<<"description">> => maps:get(description, Handler, <<>>),
<<"arguments">> => lists:map(
fun(Arg) ->
#{
<<"name">> => maps:get(name, Arg, <<>>),
<<"description">> => maps:get(description, Arg, <<>>),
<<"required">> => maps:get(required, Arg, false)
}
end,
maps:get(arguments, Handler, [])
)
},
with_optional_fields(Base, Handler, [
{<<"title">>, title},
{<<"icons">>, icons},
{<<"annotations">>, annotations}
])
end,
Page
),
success_response(
Id,
with_next_cursor(
#{<<"prompts">> => Prompts},
Next
)
);
handle_request(<<"prompts/get">>, Params, Id, _State) ->
Name = maps:get(<<"name">>, Params, <<>>),
Args = maps:get(<<"arguments">>, Params, #{}),
case barrel_mcp_registry:run(prompt, Name, Args) of
{ok, Result} ->
success_response(Id, #{
<<"description">> => maps:get(description, Result, <<>>),
<<"messages">> => maps:get(messages, Result, [])
});
{error, {not_found, _, _}} ->
error_response(Id, ?JSONRPC_METHOD_NOT_FOUND, <<"Prompt not found">>);
{error, Crash} ->
log_handler_crash(prompt, Name, Id, Crash),
error_response(Id, ?MCP_PROMPT_ERROR, <<"Internal prompt error">>)
end;
%% Tasks
handle_request(<<"tasks/list">>, Params, Id, State) ->
SessionId = maps:get(session_id, State, undefined),
Cursor = maps:get(<<"cursor">>, Params, undefined),
{ok, AllTasks} = barrel_mcp_tasks:list(SessionId, #{}),
{Page, Next} = paginate(
AllTasks,
Cursor,
fun(T) -> maps:get(<<"taskId">>, T) end
),
success_response(Id, with_next_cursor(#{<<"tasks">> => Page}, Next));
handle_request(<<"tasks/get">>, Params, Id, State) ->
SessionId = maps:get(session_id, State, undefined),
TaskId = maps:get(<<"taskId">>, Params, <<>>),
case barrel_mcp_tasks:get(SessionId, TaskId) of
{ok, Task} -> success_response(Id, Task);
{error, not_found} -> error_response(Id, ?JSONRPC_INVALID_PARAMS, <<"Task not found">>)
end;
handle_request(<<"tasks/cancel">>, Params, Id, State) ->
SessionId = maps:get(session_id, State, undefined),
TaskId = maps:get(<<"taskId">>, Params, <<>>),
case barrel_mcp_tasks:cancel(SessionId, TaskId) of
ok ->
%% Spec / reference SDK expect the cancelled Task back,
%% not an empty object.
case barrel_mcp_tasks:get(SessionId, TaskId) of
{ok, Task} -> success_response(Id, Task);
{error, not_found} -> success_response(Id, #{})
end;
{error, not_found} ->
error_response(Id, ?JSONRPC_INVALID_PARAMS, <<"Task not found">>)
end;
handle_request(<<"tasks/result">>, Params, Id, State) ->
SessionId = maps:get(session_id, State, undefined),
TaskId = maps:get(<<"taskId">>, Params, <<>>),
case barrel_mcp_tasks:get(SessionId, TaskId) of
{ok, #{<<"status">> := <<"completed">>} = T} ->
Result = maps:get(<<"result">>, T, #{}),
success_response(Id, Result);
{ok, #{<<"status">> := <<"failed">>} = T} ->
Err = maps:get(<<"error">>, T, <<"Task failed">>),
error_response(Id, ?MCP_TOOL_ERROR, Err);
{ok, #{<<"status">> := <<"cancelled">>}} ->
error_response(
Id,
?JSONRPC_INVALID_PARAMS,
<<"Task cancelled">>
);
{ok, #{<<"status">> := _}} ->
error_response(
Id,
?JSONRPC_INVALID_PARAMS,
<<"Task not yet complete">>
);
{error, not_found} ->
error_response(Id, ?JSONRPC_INVALID_PARAMS, <<"Task not found">>)
end;
%% Completions
handle_request(<<"completion/complete">>, Params, Id, _State) ->
Ref = maps:get(<<"ref">>, Params, #{}),
Argument = maps:get(<<"argument">>, Params, #{}),
ArgName = maps:get(<<"name">>, Argument, <<>>),
Value = maps:get(<<"value">>, Argument, <<>>),
case completion_lookup_key(Ref, ArgName) of
undefined ->
success_response(Id, #{<<"completion">> => empty_completion()});
Key ->
case barrel_mcp_registry:run_completion(Key, Value, #{}) of
{ok, {ok, Values}} ->
success_response(Id, #{
<<"completion">> =>
completion_payload(Values, false)
});
{ok, {ok, Values, #{has_more := HasMore}}} ->
success_response(Id, #{
<<"completion">> =>
completion_payload(Values, HasMore)
});
{error, {not_found, _, _}} ->
success_response(Id, #{<<"completion">> => empty_completion()});
{error, Crash} ->
log_handler_crash(completion, Key, Id, Crash),
error_response(
Id,
?JSONRPC_INTERNAL_ERROR,
<<"Internal completion error">>
)
end
end;
%% Logging
handle_request(<<"logging/setLevel">>, Params, Id, State) ->
Level = maps:get(<<"level">>, Params, undefined),
case {Level, maps:find(session_id, State)} of
{undefined, _} ->
error_response(
Id,
?JSONRPC_INVALID_PARAMS,
<<"Missing required parameter: level">>
);
{_, error} ->
%% Stdio / no session — accept but no per-session storage.
case barrel_mcp_session:log_level_priority(Level) of
error ->
error_response(
Id,
?JSONRPC_INVALID_PARAMS,
<<"Invalid log level">>
);
_ ->
success_response(Id, #{})
end;
{_, {ok, undefined}} ->
case barrel_mcp_session:log_level_priority(Level) of
error ->
error_response(
Id,
?JSONRPC_INVALID_PARAMS,
<<"Invalid log level">>
);
_ ->
success_response(Id, #{})
end;
{_, {ok, SessionId}} ->
case barrel_mcp_session:set_log_level(SessionId, Level) of
ok ->
success_response(Id, #{});
{error, invalid_level} ->
error_response(
Id,
?JSONRPC_INVALID_PARAMS,
<<"Invalid log level">>
);
{error, not_found} ->
success_response(Id, #{})
end
end;
%% Unknown method
handle_request(Method, _Params, Id, _State) ->
error_response(
Id,
?JSONRPC_METHOD_NOT_FOUND,
<<"Method not found: ", Method/binary>>
).
%%====================================================================
%% Notification Handlers
%%====================================================================
%% Spec name (2025-03-26+).
handle_notification(<<"notifications/initialized">>, _Params, _State) ->
ok;
%% Legacy bare name kept for one release; older clients still send this.
handle_notification(<<"initialized">>, _Params, _State) ->
ok;
handle_notification(<<"notifications/cancelled">>, Params, State) ->
case maps:find(session_id, State) of
{ok, SessionId} when is_binary(SessionId) ->
case maps:find(<<"requestId">>, Params) of
{ok, RequestId} ->
barrel_mcp_session:cancel_in_flight(SessionId, RequestId);
error ->
ok
end;
_ ->
ok
end;
handle_notification(<<"notifications/progress">>, _Params, _State) ->
%% The server doesn't currently emit anything special on inbound
%% client-side progress notifications (used for client→server
%% requests, which we don't have). Acknowledge silently.
ok;
handle_notification(<<"notifications/roots/list_changed">>, Params, State) ->
case application:get_env(barrel_mcp, roots_changed_handler) of
{ok, {Mod, Fun}} ->
try
Mod:Fun(Params, State)
catch
_:_ -> ok
end;
_ ->
ok
end;
handle_notification(_, _Params, _State) ->
ok.
%%====================================================================
%% Internal Functions
%%====================================================================
success_response(Id, Result) ->
success_response(Id, Result, #{}).
success_response(Id, Result, Meta) ->
Env = #{
<<"jsonrpc">> => <<"2.0">>,
<<"id">> => Id,
<<"result">> => Result
},
add_meta(Env, Meta).
%% Add `_meta' to a JSON-RPC envelope only when the supplied map
%% is non-empty. Keeps wire payloads compact.
add_meta(Env, Meta) when is_map(Meta), map_size(Meta) > 0 ->
Env#{<<"_meta">> => Meta};
add_meta(Env, _) ->
Env.
%% @doc Format a tool handler's plain return value into the MCP
%% content-block list shape. Public so transports driving async
%% tool calls (HTTP / stdio) can produce identical envelopes.
-spec format_tool_result_external(term()) -> [map()].
format_tool_result_external(Result) ->
format_tool_result(Result).
%% @doc Drive an `{async, AsyncPlan}' from `handle/2' to completion
%% on the calling process and return a JSON-RPC response map.
%%
%% Used by transports that don't have their own request/wait
%% machinery (stdio, legacy HTTP). The Streamable HTTP transport
%% drives async plans itself because it needs to record per-session
%% in-flight entries for cancellation routing.
-spec drive_async_plan(map(), timeout()) -> map().
drive_async_plan(Plan, Timeout) ->
drive_async_plan(Plan, Timeout, undefined).
%% @doc As {@link drive_async_plan/2}, but threads the authenticated
%% principal (`AuthInfo', the auth provider's `authenticate/2' map) into
%% the tool `Ctx' under `auth_info'. Transports that authenticate before
%% driving the plan (the simple HTTP transport) pass it here; callers
%% with no auth provider use the `/2' form and `auth_info' is `undefined'.
-spec drive_async_plan(map(), timeout(), term()) -> map().
drive_async_plan(Plan, Timeout, AuthInfo) ->
Self = self(),
RequestId = maps:get(request_id, Plan),
Spawn = maps:get(spawn, Plan),
Meta = maps:get(meta, Plan, #{}),
Ctx = #{
request_id => RequestId,
session_id => undefined,
progress_token => undefined,
meta => Meta,
emit_progress => fun(_, _, _) -> ok end,
reply_to => Self,
auth_info => AuthInfo
},
_Pid = Spawn(Ctx),
receive
{tool_result, RequestId, Result} ->
success_response(
RequestId,
#{<<"content">> => format_tool_result_external(Result)}
);
{tool_result_meta, RequestId, Result, RespMeta} ->
success_response(
RequestId,
#{<<"content">> => format_tool_result_external(Result)},
RespMeta
);
{tool_structured, RequestId, Data, Content} ->
success_response(
RequestId,
#{
<<"content">> => Content,
<<"structuredContent">> => Data
}
);
{tool_structured_meta, RequestId, Data, Content, RespMeta} ->
success_response(
RequestId,
#{
<<"content">> => Content,
<<"structuredContent">> => Data
},
RespMeta
);
{tool_error, RequestId, Content} ->
success_response(
RequestId,
#{
<<"content">> => Content,
<<"isError">> => true
}
);
{tool_error_meta, RequestId, Content, RespMeta} ->
success_response(
RequestId,
#{
<<"content">> => Content,
<<"isError">> => true
},
RespMeta
);
{tool_validation_failed, RequestId, Errors} ->
Msg = iolist_to_binary(
io_lib:format(
"Invalid tool input: ~p", [Errors]
)
),
success_response(
RequestId,
#{
<<"content">> =>
[#{<<"type">> => <<"text">>, <<"text">> => Msg}],
<<"isError">> => true
}
);
{tool_failed, RequestId, _Reason} ->
%% Crash details are logged server-side by the registry; do
%% not echo `Reason' back to the wire (it can carry module
%% paths, file paths, or secret-bearing exception terms).
error_response(
RequestId,
?MCP_TOOL_ERROR,
<<"Internal tool error">>
)
after Timeout ->
error_response(RequestId, ?MCP_TOOL_ERROR, <<"Tool timed out">>)
end.
format_tool_result(Result) when is_binary(Result) ->
[#{<<"type">> => <<"text">>, <<"text">> => Result}];
format_tool_result(Result) when is_map(Result) ->
case maps:get(<<"type">>, Result, undefined) of
undefined ->
[#{<<"type">> => <<"text">>, <<"text">> => iolist_to_binary(json:encode(Result))}];
_ ->
[Result]
end;
format_tool_result(Result) when is_list(Result) ->
Result;
format_tool_result(Result) ->
[#{<<"type">> => <<"text">>, <<"text">> => io_lib:format("~p", [Result])}].
%% Run a resource handler (exact match or template) and shape
%% the response.
run_resource_read(Type, Name, Args, Uri, Id) ->
case barrel_mcp_registry:run(Type, Name, Args) of
{ok, Result} ->
Content = format_resource_result(Uri, Result),
success_response(Id, #{<<"contents">> => Content});
{error, Crash} ->
log_handler_crash(resource, Name, Id, Crash),
error_response(Id, ?MCP_RESOURCE_ERROR, <<"Internal resource error">>)
end.
%% Walk the registered resource_template entries and return the
%% first whose `uri_template' matches `Uri'.
match_resource_template(Uri) ->
Templates = barrel_mcp_registry:all(resource_template),
do_match_template(Uri, Templates).
do_match_template(_Uri, []) ->
nomatch;
do_match_template(Uri, [{Name, Handler} | Rest]) ->
Tpl = maps:get(uri_template, Handler, <<>>),
case Tpl of
<<>> ->
do_match_template(Uri, Rest);
_ ->
case barrel_mcp_uri_template:match(Uri, Tpl) of
{ok, Vars} -> {ok, Name, Vars};
nomatch -> do_match_template(Uri, Rest)
end
end.
format_resource_result(Uri, Result) when is_list(Result) ->
[add_resource_uri(Uri, B) || B <- Result];
format_resource_result(Uri, Result) when is_binary(Result) ->
[#{<<"uri">> => Uri, <<"text">> => Result}];
format_resource_result(Uri, #{text := Text} = M) ->
Block = #{<<"uri">> => Uri, <<"text">> => Text},
[decorate_block(Block, M)];
format_resource_result(Uri, #{blob := Blob, mimeType := MimeType} = M) ->
Block = #{
<<"uri">> => Uri,
<<"blob">> => base64:encode(Blob),
<<"mimeType">> => MimeType
},
[decorate_block(Block, M)];
format_resource_result(Uri, Result) when is_map(Result) ->
[#{<<"uri">> => Uri, <<"text">> => iolist_to_binary(json:encode(Result))}];
format_resource_result(Uri, Result) ->
[#{<<"uri">> => Uri, <<"text">> => io_lib:format("~p", [Result])}].
%% Pass `annotations' / `mimeType' through onto an already-built block.
decorate_block(Block, M) ->
Block1 =
case maps:find(mimeType, M) of
{ok, Mime} -> Block#{<<"mimeType">> => Mime};
error -> Block
end,
case maps:find(annotations, M) of
{ok, Ann} -> Block1#{<<"annotations">> => Ann};
error -> Block1
end.
%% Inject `uri' into a pre-built content block (binary-keyed map).
add_resource_uri(Uri, Block) when is_map(Block) ->
case maps:is_key(<<"uri">>, Block) of
true -> Block;
false -> Block#{<<"uri">> => Uri}
end.
%% Log a crashed resource/prompt/completion handler server-side and
%% never surface the exception term to the client: a caught `Reason'
%% can carry internal paths, argument values or secret-bearing terms.
%% The wire layer returns a generic message; operators cross-reference
%% via the request id. Mirrors the tool path in barrel_mcp_registry.
log_handler_crash(Kind, Name, Id, Crash) ->
logger:error(
"~p handler crashed: ~p (request_id=~p, name=~p)",
[Kind, Crash, Id, Name]
).
maybe_advertise_completions(Caps) ->
case barrel_mcp_registry:all(completion) of
[] -> Caps;
_ -> Caps#{<<"completions">> => #{}}
end.
completion_lookup_key(
#{<<"type">> := <<"ref/prompt">>, <<"name">> := Name},
ArgName
) when is_binary(Name) ->
<<"prompt:", Name/binary, ":", ArgName/binary>>;
completion_lookup_key(
#{<<"type">> := <<"ref/resource">>, <<"uri">> := Uri},
ArgName
) when is_binary(Uri) ->
<<"resource_template:", Uri/binary, ":", ArgName/binary>>;
completion_lookup_key(_, _) ->
undefined.
empty_completion() ->
#{<<"values">> => [], <<"hasMore">> => false}.
completion_payload(Values, HasMore) when is_list(Values) ->
#{
<<"values">> => Values,
<<"hasMore">> => HasMore =:= true,
<<"total">> => length(Values)
}.
%% Add optional fields from a Handler map to a wire envelope. Each
%% pair `{WireKey, HandlerKey}' becomes `WireKey => Value' in the
%% envelope only when the value is present and not the empty
%% binary; this keeps wire payloads compact and back-compat.
with_optional_fields(Envelope, Handler, Fields) ->
lists:foldl(
fun({WireKey, HandlerKey}, Acc) ->
case maps:get(HandlerKey, Handler, undefined) of
undefined -> Acc;
<<>> -> Acc;
V -> Acc#{WireKey => V}
end
end,
Envelope,
Fields
).
%%====================================================================
%% Cursor pagination for `*/list' handlers
%%====================================================================
-define(PAGE_SIZE, 50).
%% Sort `Items' by `KeyFn(Item)', drop everything up to and
%% including `Cursor', take up to `?PAGE_SIZE'. Returns
%% `{Page, NextCursor}' where `NextCursor' is the last key of the
%% page when more items remain, or `undefined' otherwise.
%% `Cursor' is the opaque last-seen key from a prior response.
paginate(Items, Cursor, KeyFn) ->
Sorted = lists:sort(fun(A, B) -> KeyFn(A) =< KeyFn(B) end, Items),
AfterCursor = drop_until_after(Sorted, Cursor, KeyFn),
case
lists:split(
min(?PAGE_SIZE, length(AfterCursor)),
AfterCursor
)
of
{Page, []} ->
{Page, undefined};
{Page, _Rest} ->
Last = lists:last(Page),
{Page, KeyFn(Last)}
end.
drop_until_after(Items, undefined, _) ->
Items;
drop_until_after(Items, Cursor, KeyFn) ->
lists:dropwhile(fun(I) -> KeyFn(I) =< Cursor end, Items).
with_next_cursor(Resp, undefined) -> Resp;
with_next_cursor(Resp, Cursor) -> Resp#{<<"nextCursor">> => Cursor}.
%% Pick the protocol version to advertise in the `initialize'
%% response. If the client's requested version is one we speak, echo
%% it; otherwise return our preferred version and let the client
%% decide.
negotiate_protocol_version(undefined) ->
?MCP_PROTOCOL_VERSION;
negotiate_protocol_version(Requested) when is_binary(Requested) ->
case lists:member(Requested, ?MCP_SUPPORTED_VERSIONS) of
true -> Requested;
false -> ?MCP_PROTOCOL_VERSION
end.
%%====================================================================
%% JSON-RPC envelope helpers
%%====================================================================
%% @doc Build a JSON-RPC request envelope.
-spec encode_request(term(), binary(), map()) -> map().
encode_request(Id, Method, Params) ->
#{
<<"jsonrpc">> => <<"2.0">>,
<<"id">> => Id,
<<"method">> => Method,
<<"params">> => Params
}.
%% @doc Build a JSON-RPC notification envelope (no id).
-spec encode_notification(binary(), map()) -> map().
encode_notification(Method, Params) ->
#{
<<"jsonrpc">> => <<"2.0">>,
<<"method">> => Method,
<<"params">> => Params
}.
%% @doc Build a JSON-RPC success response.
-spec encode_response(term(), term()) -> map().
encode_response(Id, Result) ->
#{
<<"jsonrpc">> => <<"2.0">>,
<<"id">> => Id,
<<"result">> => Result
}.
%% @doc Build a JSON-RPC error response. Alias of `error_response/3'.
-spec encode_error(term(), integer(), binary()) -> map().
encode_error(Id, Code, Message) ->
error_response(Id, Code, Message).
%% @doc Classify a decoded JSON-RPC envelope.
%%
%% Returns the kind so client and server agree on routing without each
%% having to peek at the same keys.
-spec decode_envelope(map()) ->
{request, Id :: term(), Method :: binary(), Params :: map()}
| {notification, Method :: binary(), Params :: map()}
| {response, Id :: term(), Result :: term()}
| {error, Id :: term(), Code :: integer(), Message :: binary(), Data :: term()}
| {invalid, term()}.
decode_envelope(L) when is_list(L) ->
{invalid, batch_unsupported};
decode_envelope(#{<<"jsonrpc">> := <<"2.0">>} = Msg) ->
case
{
maps:find(<<"method">>, Msg),
maps:find(<<"id">>, Msg),
maps:find(<<"result">>, Msg),
maps:find(<<"error">>, Msg)
}
of
{{ok, Method}, {ok, Id}, error, error} when
is_binary(Id) orelse is_integer(Id)
->
{request, Id, Method, maps:get(<<"params">>, Msg, #{})};
{{ok, _Method}, {ok, _BadId}, error, error} ->
{invalid, bad_id};
{{ok, Method}, error, error, error} ->
{notification, Method, maps:get(<<"params">>, Msg, #{})};
{error, {ok, Id}, {ok, Result}, error} when
is_binary(Id) orelse is_integer(Id)
->
{response, Id, Result};
{error, {ok, _BadId}, {ok, _Result}, error} ->
{invalid, bad_id};
{error, {ok, Id}, error, {ok, Err}} when
is_binary(Id) orelse is_integer(Id)
->
Code = maps:get(<<"code">>, Err, ?JSONRPC_INTERNAL_ERROR),
Message = maps:get(<<"message">>, Err, <<>>),
Data = maps:get(<<"data">>, Err, undefined),
{error, Id, Code, Message, Data};
{error, {ok, _BadId}, error, {ok, _Err}} ->
{invalid, bad_id};
_ ->
{invalid, malformed}
end;
decode_envelope(Other) ->
{invalid, Other}.