# lib/noizu/mcp/server/event_store.ex

defmodule Noizu.MCP.Server.EventStore do
  @moduledoc """
  Buffer for Streamable HTTP messages that had no live stream to deliver to,
  enabling SSE resumability via `Last-Event-ID`.

  The default implementation is a bounded per-session ETS ring buffer owned by
  the server's supervision tree (node-local — multi-node deployments need
  sticky sessions or a custom store).

  ## Options

    * `:max_events_per_session` - how many events are retained per session
      before the oldest ones are evicted. Must be a positive integer.
      Defaults to `1000`.

  ## Event ids

  Event ids have the form `"s:<seq>"`, where `<seq>` is a counter kept by the
  store process and shared by all of its sessions. The counter starts at `1`
  each time the store process is started.
  """

  use GenServer

  @max_events_per_session 1_000

  @type event_id :: String.t()
  @type option :: {:max_events_per_session, pos_integer()}

  # ── API ───────────────────────────────────────────────────────────────────

  @doc """
  Builds the child spec for the store belonging to `opts[:server]`.

  `:server` is required. `:max_events_per_session`, when present, is forwarded
  to `start_link/2`; any other keys in `opts` are ignored.
  """
  @spec child_spec(keyword()) :: Supervisor.child_spec()
  def child_spec(opts) do
    server = Keyword.fetch!(opts, :server)

    # Keep the start arguments identical to the single-argument form when no
    # store option is supplied, so existing specs are unchanged.
    args =
      case Keyword.take(opts, [:max_events_per_session]) do
        [] -> [server]
        store_opts -> [server, store_opts]
      end

    %{id: __MODULE__, start: {__MODULE__, :start_link, args}}
  end

  @doc """
  Starts the store for `server`, registered under a name derived from `server`.

  Raises `ArgumentError` if `:max_events_per_session` is not a positive integer.
  """
  @spec start_link(module(), [option()]) :: GenServer.on_start()
  def start_link(server, opts \\ []) when is_list(opts) do
    max_events =
      opts
      |> Keyword.get(:max_events_per_session, @max_events_per_session)
      |> validate_max_events!()

    GenServer.start_link(__MODULE__, {server, max_events}, name: name(server))
  end

  defp name(server), do: Module.concat(server, EventStore)
  defp table(server), do: Module.concat(server, EventStore.Table)

  defp validate_max_events!(max) when is_integer(max) and max > 0, do: max

  defp validate_max_events!(other) do
    raise ArgumentError,
          ":max_events_per_session must be a positive integer, got: #{inspect(other)}"
  end

  @doc "Append a message; returns its event id."
  @spec append(module(), String.t(), binary()) :: event_id()
  def append(server, session_id, binary) do
    GenServer.call(name(server), {:append, session_id, binary})
  end

  @doc """
  All buffered `{event_id, binary}` for a session after `last_event_id` (nil = all).

  Results are ordered by ascending sequence number. A `last_event_id` that is
  not of the form `"s:<integer>"` is treated like `nil`. The table is read
  directly from the calling process, without going through the store process.
  """
  @spec replay_after(module(), String.t(), event_id() | nil) :: [{event_id(), binary()}]
  def replay_after(server, session_id, last_event_id) do
    last_seq = parse_seq(last_event_id)

    pending =
      for {_session, seq, binary} <- :ets.lookup(table(server), session_id),
          is_nil(last_seq) or seq > last_seq,
          do: {seq, binary}

    # Bag lookups carry no ordering we rely on; order explicitly by sequence.
    pending
    |> List.keysort(0)
    |> Enum.map(fn {seq, binary} -> {encode_id(seq), binary} end)
  end

  @doc "Drop a session's buffered events (e.g. on session termination)."
  @spec drop(module(), String.t()) :: :ok
  def drop(server, session_id) do
    GenServer.call(name(server), {:drop, session_id})
  end

  @doc false
  @spec encode_id(integer()) :: event_id()
  def encode_id(seq), do: "s:#{seq}"

  @doc false
  @spec parse_seq(term()) :: integer() | nil
  def parse_seq(nil), do: nil

  def parse_seq("s:" <> seq) do
    # Only a fully numeric suffix counts; trailing garbage yields nil.
    case Integer.parse(seq) do
      {n, ""} -> n
      _ -> nil
    end
  end

  def parse_seq(_), do: nil

  # ── GenServer ─────────────────────────────────────────────────────────────

  @impl true
  def init({server, max_events}) when is_integer(max_events) and max_events > 0 do
    table = :ets.new(table(server), [:bag, :named_table, :protected, read_concurrency: true])
    {:ok, %{server: server, table: table, seq: 0, max_events: max_events}}
  end

  # A bare `server` init argument is still accepted and uses the default cap.
  def init(server) when is_atom(server), do: init({server, @max_events_per_session})

  @impl true
  def handle_call({:append, session_id, binary}, _from, state) do
    seq = state.seq + 1
    :ets.insert(state.table, {session_id, seq, binary})
    prune(state.table, session_id, state.max_events)
    {:reply, encode_id(seq), %{state | seq: seq}}
  end

  def handle_call({:drop, session_id}, _from, state) do
    :ets.delete(state.table, session_id)
    {:reply, :ok, state}
  end

  # Evicts the lowest-sequence events of `session_id` until at most
  # `max_events` remain.
  defp prune(table, session_id, max_events) do
    events = :ets.lookup(table, session_id)
    overflow = length(events) - max_events

    if overflow > 0 do
      events
      |> List.keysort(1)
      |> Enum.take(overflow)
      |> Enum.each(&:ets.delete_object(table, &1))
    end

    :ok
  end
end