Skip to main content

lib/attesto_mcp/anubis/session_store/ecto.ex

if Code.ensure_loaded?(Anubis.Server.Session.Store) and Code.ensure_loaded?(Ecto.Schema) do
  defmodule AttestoMCP.Anubis.SessionStore.Ecto do
    @moduledoc """
    Postgres-backed `Anubis.Server.Session.Store` for the Anubis MCP transports.

    ## Why

    Anubis keeps session state in memory and recovers it only on a live
    reconnect, so a deploy (node replacement) drops every session and forces each
    connected client to re-initialize. Persisting the session map lets a client
    reconnect with its `Mcp-Session-Id` after a deploy and have its initialized
    state (client_info + `frame`) restored, with no re-initialization.

    Anubis ships only a Redis adapter; this implements the same behaviour against
    an `Ecto.Repo`, for a Postgres-only deployment that does not run Redis.

    ## Wiring

        # config/config.exs
        config :anubis_mcp, :session_store,
          enabled: true,
          adapter: AttestoMCP.Anubis.SessionStore.Ecto,
          repo: MyApp.Repo,
          ttl: to_timeout(minute: 30)

    The `:repo` may also be set as `config :attesto_mcp, repo: MyApp.Repo`. Run
    `mix attesto_mcp.gen.session_migration --repo MyApp.Repo` to create the
    backing `attesto_mcp_sessions` table.

    ## No process

    The store is stateless: every callback is a direct repo query and the repo is
    already supervised, so `start_link/1` returns `:ignore` instead of booting an
    idle GenServer. Anubis lists the adapter as a child (the `{adapter, config}`
    child-spec form when `enabled: true`); `:ignore` tells the supervisor there is
    nothing to start.

    Postgres has no native key TTL, so expired rows are reaped lazily on `load/2`
    and by `cleanup_expired/1` (call it on a schedule from the host).

    Compile-guarded on `Anubis.Server.Session.Store` and `Ecto.Schema`.
    """

    @behaviour Anubis.Server.Session.Store

    import Ecto.Query

    alias Anubis.Server.Session.Store
    alias AttestoMCP.Anubis.JSONSafe
    alias AttestoMCP.Anubis.Session

    require Logger

    # OTP application whose `:session_store` config entry is read for the
    # adapter's `:repo` and `:ttl` settings.
    @app :anubis_mcp

    # Fallback TTL used when the `:session_store` config carries no `:ttl`.
    # Matches the Anubis Redis adapter default: 30 minutes, in milliseconds.
    @default_ttl_ms 1_800_000

    # Anubis registers the configured store as a `{adapter, config}` child, which
    # requires the adapter to define `child_spec/1`. The spec points at
    # `start_link/1`, which answers `:ignore` because the store keeps no state of
    # its own and relies on the host repo that is already supervised.
    @doc false
    def child_spec(opts) do
      start = {__MODULE__, :start_link, [opts]}
      %{id: __MODULE__, start: start}
    end

    @impl Store
    def start_link(_opts) do
      # Looking the repo up now makes a missing repo configuration raise at boot
      # instead of showing up later as a quietly failing save or a load that
      # crashes a reconnect. The looked-up module itself is not needed here.
      _configured_repo = repo()

      # Nothing to supervise: every callback goes straight to the host repo.
      :ignore
    end

    @impl Store
    def save(session_id, state, opts) do
      # Upserts the session row keyed by `session_id`, refreshing its state and
      # expiry. Returns `:ok`, `{:error, changeset}` on a failed insert, or
      # `{:error, exception}` when anything in the body raises.
      saved_at = DateTime.utc_now()
      deadline = DateTime.add(saved_at, ttl_ms(opts), :millisecond)

      # Anubis folds the whole request `conn.assigns` into the persisted frame, so
      # the incoming state may hold values without a `JSON.Encoder` impl. They are
      # stripped here so the jsonb insert does not raise mid-`initialize`.
      attrs = %{
        session_id: session_id,
        state: JSONSafe.sanitize(state),
        expires_at: deadline
      }

      changeset = Session.changeset(%Session{}, attrs)

      # An existing row for the same id has only these columns overwritten.
      upsert_opts = [
        conflict_target: :session_id,
        on_conflict: {:replace, [:state, :expires_at, :updated_at]}
      ]

      case repo().insert(changeset, upsert_opts) do
        {:error, changeset} -> {:error, changeset}
        {:ok, _session} -> :ok
      end
    rescue
      # Deliberately broad: Anubis logs and carries on when it gets `{:error, _}`,
      # but a raise would kill the session process. Whatever the driver or Ecto
      # throws, persisting a session must not take the connection down.
      error ->
        Logger.error("MCP session persist failed for #{inspect(session_id)}: #{inspect(error)}")
        {:error, error}
    end

    @impl Store
    def load(session_id, _opts) do
      # Returns `{:ok, state}` for a stored, unexpired session and
      # `{:error, :not_found}` for a missing or expired one.
      checked_at = DateTime.utc_now()
      session = repo().get(Session, session_id)

      cond do
        is_nil(session) ->
          {:error, :not_found}

        DateTime.after?(session.expires_at, checked_at) ->
          {:ok, session.state}

        true ->
          # Lazy reap. The delete re-checks `expires_at <= checked_at`, so a row
          # that a concurrent `save/3` has just refreshed with a future expiry
          # survives this read path.
          Session
          |> where([s], s.session_id == ^session_id and s.expires_at <= ^checked_at)
          |> repo().delete_all()

          {:error, :not_found}
      end
    end

    @impl Store
    def delete(session_id, _opts) do
      # Removes the row for `session_id` if there is one; the number of deleted
      # rows is ignored, so the result is `:ok` either way.
      Session
      |> where([s], s.session_id == ^session_id)
      |> repo().delete_all()

      :ok
    end

    @impl Store
    def list_active(_opts) do
      # Ids of every session whose expiry is still in the future; expired rows
      # that have not been reaped yet are filtered out by the query.
      cutoff = DateTime.utc_now()

      active_ids =
        Session
        |> where([s], s.expires_at > ^cutoff)
        |> select([s], s.session_id)
        |> repo().all()

      {:ok, active_ids}
    end

    @impl Store
    def update_ttl(session_id, ttl_ms, _opts) when is_integer(ttl_ms) and ttl_ms > 0 do
      # Pushes the expiry of a live session to now + `ttl_ms` milliseconds.
      # Returns `:ok`, or `{:error, :not_found}` when no unexpired row matched.
      touched_at = DateTime.utc_now()
      new_deadline = DateTime.add(touched_at, ttl_ms, :millisecond)

      # Only unexpired rows qualify, so an expired row that has not been reaped
      # yet cannot be brought back (Redis would have dropped the key already).
      live_row = where(Session, [s], s.session_id == ^session_id and s.expires_at > ^touched_at)

      {touched, _returned} =
        repo().update_all(live_row, set: [expires_at: new_deadline, updated_at: touched_at])

      if touched == 0, do: {:error, :not_found}, else: :ok
    end

    @impl Store
    def update(session_id, updates, opts) when is_map(updates) do
      # The merge runs as read-modify-write under a row lock inside one
      # transaction: this adapter has no GenServer to serialize callers, so the
      # lock is what keeps concurrent updates of one session from overwriting
      # each other. Expired rows are never updated.
      txn_result = repo().transaction(fn -> locked_update(session_id, updates, opts) end)

      case txn_result do
        {:error, reason} -> {:error, reason}
        {:ok, :ok} -> :ok
      end
    end

    # Transaction body for `update/3`: selects the unexpired row with
    # `FOR UPDATE`, then merges and saves; rolls back with `:not_found` when no
    # such row exists.
    defp locked_update(session_id, updates, opts) do
      now = DateTime.utc_now()

      locked_row =
        Session
        |> where([s], s.session_id == ^session_id and s.expires_at > ^now)
        |> lock("FOR UPDATE")
        |> repo().one()

      case locked_row do
        %Session{state: state} -> merge_and_save(session_id, state, updates, opts)
        nil -> repo().rollback(:not_found)
      end
    end

    # Merges `updates` (top-level keys stringified) over the stored state and
    # persists the result with `save/3`; a failed save rolls the surrounding
    # transaction back with the failure reason.
    defp merge_and_save(session_id, state, updates, opts) do
      current = state || %{}
      merged = Map.merge(current, stringify_keys(updates))

      case save(session_id, merged, opts) do
        {:error, reason} -> repo().rollback(reason)
        :ok -> :ok
      end
    end

    @impl Store
    def cleanup_expired(_opts) do
      # Bulk-deletes every row whose expiry is at or before the current time and
      # reports how many rows were removed.
      cutoff = DateTime.utc_now()

      {reaped, _returned} =
        Session
        |> where([s], s.expires_at <= ^cutoff)
        |> repo().delete_all()

      {:ok, reaped}
    end

    # ----- internal -----

    # TTL for a save: the per-call `:ttl` option when it is set to a truthy
    # value, otherwise the configured store-wide TTL.
    defp ttl_ms(opts) do
      case Keyword.get(opts, :ttl) do
        unset when unset in [nil, false] -> config_ttl_ms()
        ttl -> ttl
      end
    end

    # Store-wide TTL read from the `:session_store` config of `@app`, falling
    # back to `@default_ttl_ms`.
    #
    # `Keyword.get/3` applies its default only when the key is absent, so an
    # explicit `ttl: nil` in config used to come back as `nil` and make the
    # `DateTime.add/3` in `save/3` raise on every call. Using `||` treats a nil
    # (or false) value like a missing key, the same way `ttl_ms/1` treats the
    # per-call option.
    defp config_ttl_ms do
      store_config = Application.get_env(@app, :session_store, [])
      Keyword.get(store_config, :ttl) || @default_ttl_ms
    end

    # The persisted state is string-keyed JSON, so top-level atom keys in a merge
    # payload are converted to strings; otherwise a later round-trip would leave
    # a map with both atom and string keys. Non-atom keys pass through untouched.
    defp stringify_keys(map) do
      Map.new(map, fn {key, value} -> {stringify_key(key), value} end)
    end

    # Converts a single atom key to its string form; leaves any other key as is.
    defp stringify_key(key) when is_atom(key), do: Atom.to_string(key)
    defp stringify_key(key), do: key

    # Resolves the repo module at call time: the `:repo` entry of the
    # `:session_store` config wins, then `config :attesto_mcp, repo: ...`.
    # Raises `ArgumentError` when neither is set.
    defp repo do
      store_repo =
        @app
        |> Application.get_env(:session_store, [])
        |> Keyword.get(:repo)

      resolved = store_repo || Application.get_env(:attesto_mcp, :repo)

      if is_nil(resolved) do
        raise ArgumentError,
              "AttestoMCP.Anubis.SessionStore.Ecto: no repo configured. Set " <>
                "`config :anubis_mcp, :session_store, repo: MyApp.Repo` (or " <>
                "`config :attesto_mcp, repo: MyApp.Repo`)."
      end

      resolved
    end
  end
end