Skip to main content

lib/attesto_mcp/test/dpop_assertions.ex

if Code.ensure_loaded?(ExUnit.Assertions) do
  defmodule AttestoMCP.Test.DPoPAssertions do
    @moduledoc """
    Shipped ExUnit assertions for the DPoP sender-constraint contract.

    RFC 9449 binds an access token to the client key that signed a DPoP proof
    via the token's `cnf.jkt` claim (RFC 7800). Two properties of that contract
    are worth a host MCP server proving on its own pipeline, because getting
    either wrong silently downgrades sender-constrained tokens to bearer tokens:

      1. A DPoP-bound token presented as a plain `Bearer` token (no proof) MUST
        be rejected. Otherwise a captured token is usable without the key.
      2. A DPoP-bound token presented as `DPoP` with a valid proof for the live
        request MUST be accepted.

    These helpers drive a host's already-wired authentication plug against both
    cases so the host's exact `:config`, `:replay_check`, `:htu`, and error
    rendering are exercised. They build the token and proof with
    `AttestoMCP.Test.Factory` and use only Attesto's published DPoP API
    (`Attesto.DPoP.compute_ath/1` via the factory), so they do not depend on any
    Attesto-internal test scaffolding.

    The module compiles only when `ExUnit` is loaded, so it adds nothing to a
    host's production build.

    ## Usage

        defmodule MyApp.MCPAuthTest do
          use ExUnit.Case
          import AttestoMCP.Test.DPoPAssertions

          setup do
            %{config: AttestoMCP.Test.Factory.config()}
          end

          test "MCP endpoint enforces DPoP binding", %{config: config} do
            plug = fn conn ->
              MyAppWeb.MCPAuth.call(conn, MyAppWeb.MCPAuth.init([]))
            end

            assert_dpop_bound_bearer_rejected(plug, config)
            assert_dpop_proof_accepted(plug, config)
          end
        end

    `plug_fun` is a one-arity function that runs the host's authentication
    pipeline on a `Plug.Conn` and returns the resulting conn. A host that
    requires `:replay_check` for DPoP requests must wire it inside `plug_fun`.
    """

    import ExUnit.Assertions
    import Plug.Conn, only: [put_req_header: 3]
    import Plug.Test, only: [conn: 3]

    alias AttestoMCP.Test.Factory

    @type plug_fun :: (Plug.Conn.t() -> Plug.Conn.t())

    @doc """
    Assert that a DPoP-bound token presented as a plain `Bearer` token (with no
    proof) is rejected by the host pipeline.

    Options:

      * `:path` - request path (default `"/mcp"`).
      * `:method` - request method (default `:post`).
      * `:scopes` - scopes minted into the token (default the factory default).

    Returns the resulting halted `Plug.Conn` for further assertions.
    """
    @spec assert_dpop_bound_bearer_rejected(plug_fun(), Attesto.Config.t(), keyword()) :: Plug.Conn.t()
    def assert_dpop_bound_bearer_rejected(plug_fun, config, opts \\ []) when is_function(plug_fun, 1) do
      {_proof, jkt} = Factory.dpop_proof("placeholder")
      token = Factory.access_token(config, token_opts(opts, dpop_jkt: jkt))

      conn =
        opts
        |> base_conn()
        |> put_req_header("authorization", "Bearer " <> token)
        |> plug_fun.()

      assert conn.halted,
             "expected a DPoP-bound token presented as Bearer to be rejected, but the pipeline let it through"

      conn
    end

    @doc """
    Assert that a DPoP-bound token presented as `DPoP` with a valid proof for
    the live request is accepted by the host pipeline.

    Options:

      * `:path` - request path (default `"/mcp"`).
      * `:method` - request method (default `:post`).
      * `:scopes` - scopes minted into the token (default the factory default).
      * `:htu` - the `htu` claim of the proof (default the factory audience). It
        must match the host's computed request URI.

    Returns the resulting non-halted `Plug.Conn` for further assertions.
    """
    @spec assert_dpop_proof_accepted(plug_fun(), Attesto.Config.t(), keyword()) :: Plug.Conn.t()
    def assert_dpop_proof_accepted(plug_fun, config, opts \\ []) when is_function(plug_fun, 1) do
      jwk = Factory.dpop_jwk()
      {_unused, jkt} = Factory.dpop_proof("placeholder", jwk: jwk)
      token = Factory.access_token(config, token_opts(opts, dpop_jkt: jkt))
      {proof, ^jkt} = Factory.dpop_proof(token, proof_opts(opts, jwk: jwk))

      conn =
        opts
        |> base_conn()
        |> put_req_header("authorization", "DPoP " <> token)
        |> put_req_header("dpop", proof)
        |> plug_fun.()

      refute conn.halted,
             "expected a DPoP-bound token presented with a valid proof to be accepted, but the pipeline rejected it"

      conn
    end

    defp base_conn(opts) do
      method = Keyword.get(opts, :method, :post)
      path = Keyword.get(opts, :path, "/mcp")
      conn(method, path, nil)
    end

    defp token_opts(opts, extra) do
      case Keyword.fetch(opts, :scopes) do
        {:ok, scopes} -> Keyword.put(extra, :scopes, scopes)
        :error -> extra
      end
    end

    defp proof_opts(opts, extra) do
      case Keyword.fetch(opts, :htu) do
        {:ok, htu} -> Keyword.put(extra, :htu, htu)
        :error -> extra
      end
    end
  end
end