lib/planck/agent/compactor.ex

defmodule Planck.Agent.Compactor do
  @moduledoc """
  Behaviour and default implementation for context compaction in `Planck.Agent`.

  ## Behaviour

  Use `use Planck.Agent.Compactor` to implement a custom compaction strategy.
  The only required callback is `compact/2`; `compact_timeout/0` has a default
  implementation that returns #{120_000} ms.

      defmodule MySidecar.Compactors.Builder do
        use Planck.Agent.Compactor

        @impl true
        def compact(_model, messages) do
          summary = Message.new({:custom, :summary}, [{:text, summarise(messages)}])
          kept    = Enum.take(messages, -5)
          {:compact, summary, kept}
        end

        # Optional — override to declare a custom RPC timeout.
        @impl true
        def compact_timeout, do: 60_000
      end

  ## Building an on_compact function

  `build/2` is the single entry point for both the local LLM-based compactor and
  remote sidecar compactors. When `sidecar_node:` and `compactor:` options are
  absent it runs locally; when both are present it calls the remote module via
  `:rpc.call/5` and falls back to the local compactor if the RPC fails.

      # Local (default):
      on_compact = Planck.Agent.Compactor.build(model)

      # Remote sidecar:
      on_compact = Planck.Agent.Compactor.build(model,
        sidecar_node: :planck_sidecar@localhost,
        compactor:    "MySidecar.Compactors.Builder"
        # The string is converted to the atom :"Elixir.MySidecar.Compactors.Builder"
        # before the RPC call. Always use the bare Elixir module name (no "Elixir." prefix).
      )

  Both return a `fn messages ->` closure of arity 1, as expected by `Planck.Agent`.

  ## Compaction strategy (local default)

  When triggered, the oldest messages (everything except the last `keep_recent`
  messages) are summarised into a single `{:custom, :summary}` message. On LLM
  failure the local compactor falls back to the original message list unchanged
  (`:skip`). On remote failure the local compactor is used as the fallback.
  """

  require Logger

  alias Planck.Agent.{AIBehaviour, Message}
  alias Planck.AI.{Context, Model}

  @default_compact_timeout_ms 120_000

  @doc """
  Compact the message list.

  Return `{:compact, summary_msg, kept}` to replace older messages with a summary,
  or `:skip` to leave the list unchanged.

  - `summary_msg` — a `{:custom, :summary}` `Message` containing the summary text
  - `kept` — recent messages retained verbatim after the summary
  """
  @callback compact(model :: Model.t(), messages :: [Message.t()]) ::
              {:compact, summary :: Message.t(), kept :: [Message.t()]} | :skip

  @doc """
  RPC call timeout in milliseconds when this compactor is invoked remotely.

  Defaults to #{@default_compact_timeout_ms} ms. Override to declare a custom
  expected latency for the compactor — the module knows its own logic better
  than any caller default.
  """
  @callback compact_timeout() :: pos_integer()

  @doc false
  defmacro __using__(_opts) do
    quote do
      @behaviour unquote(__MODULE__)

      # Inject a default `compact_timeout/0` that delegates to the shared
      # module-level default, so the timeout value lives in exactly one place.
      @impl unquote(__MODULE__)
      def compact_timeout, do: unquote(__MODULE__).default_compact_timeout()

      # Implementors may override to declare their own expected latency
      # (see the moduledoc example).
      defoverridable compact_timeout: 0
    end
  end

  @doc "Default RPC timeout used when a compactor module omits `compact_timeout/0`."
  @spec default_compact_timeout() :: pos_integer()
  def default_compact_timeout do
    @default_compact_timeout_ms
  end

  @default_ratio 0.8
  @default_keep_recent 10

  @typedoc """
  Options accepted by `build/2`.

  - `:ratio` — fraction of `model.context_window` that triggers compaction
    (default `#{@default_ratio}`)
  - `:keep_recent` — number of recent messages to keep verbatim (default `#{@default_keep_recent}`)
  - `:sidecar_node` — node name of a connected sidecar (enables remote compaction)
  - `:compactor` — fully-qualified module name string in the sidecar,
    e.g. `"MySidecar.Compactors.Builder"`. Required when `:sidecar_node` is set.
  """
  @type opts :: [
          ratio: float(),
          keep_recent: pos_integer(),
          sidecar_node: atom() | nil,
          compactor: String.t() | nil
        ]

  @summary_prompt """
  Summarize the conversation below to reduce context length.
  Your summary must:
  - Describe completed work and resolved decisions briefly
  - State clearly what is currently being worked on and the most recent requests
  - Preserve any key facts, file paths, decisions, or constraints still relevant
  - Be written as context for an AI agent continuing this conversation

  Prioritize recency — the active task and latest requests take priority over earlier history.
  """

  @doc """
  Build an `on_compact` function for the given model.

  Returns a `fn messages ->` closure of arity 1. When both `sidecar_node:` and
  `compactor:` are provided, the closure performs compaction remotely via RPC,
  using the local LLM-based compactor as a fallback when the call fails.

  ## Examples

      # Local:
      on_compact = Planck.Agent.Compactor.build(model, ratio: 0.75)

      # Remote sidecar with local fallback:
      on_compact = Planck.Agent.Compactor.build(model,
        sidecar_node: :planck_sidecar@localhost,
        compactor: "MySidecar.Compactors.Builder"
      )

  """
  @spec build(Model.t(), opts()) ::
          ([Message.t()] -> :skip | {:compact, Message.t(), [Message.t()]})
  def build(%Model{} = model, opts \\ []) do
    local = build_local(model, opts)
    sidecar = Keyword.get(opts, :sidecar_node)
    compactor = Keyword.get(opts, :compactor)

    # Remote mode requires both options; anything less stays local.
    if sidecar == nil or compactor == nil do
      local
    else
      build_remote(model, sidecar, compactor, local)
    end
  end

  # ---------------------------------------------------------------------------
  # Private
  # ---------------------------------------------------------------------------

  @spec build_local(Model.t(), opts()) ::
          ([Message.t()] -> :skip | {:compact, Message.t(), [Message.t()]})
  defp build_local(model, opts) do
    ratio = Keyword.get(opts, :ratio, @default_ratio)
    keep_recent = Keyword.get(opts, :keep_recent, @default_keep_recent)

    # Compaction triggers once estimated tokens reach this fraction of the
    # model's context window. Computed once, captured by the closure.
    token_limit = trunc(model.context_window * ratio)

    fn messages ->
      over_limit? = Message.estimate_tokens(messages) >= token_limit

      if over_limit? do
        compact_local(messages, model, keep_recent)
      else
        :skip
      end
    end
  end

  # Builds a closure that compacts via a compactor module on a sidecar node,
  # falling back to `local_fallback` when the RPC fails or the remote module
  # returns a value outside the `compact/2` callback contract.
  @spec build_remote(Model.t(), atom(), String.t(), function()) ::
          ([Message.t()] -> :skip | {:compact, Message.t(), [Message.t()]})
  defp build_remote(model, sidecar_node, compactor_name, local_fallback) do
    # Dynamic atom creation is safe here only because `compactor_name` comes
    # from trusted configuration — never route user input into this option
    # (atoms are not garbage-collected).
    module = :"Elixir.#{compactor_name}"

    # Best-effort preload so the first compaction doesn't pay module-load
    # latency; if this fails, the actual compact call below surfaces the
    # problem as a `:badrpc` and we fall back to local.
    :rpc.call(sidecar_node, :code, :ensure_loaded, [module], 5_000)
    timeout = remote_compact_timeout(module, sidecar_node)

    fn messages ->
      case :rpc.call(sidecar_node, module, :compact, [model, messages], timeout) do
        # Well-formed callback results pass through unchanged.
        :skip ->
          :skip

        {:compact, _summary, _kept} = result ->
          result

        {:badrpc, reason} ->
          Logger.warning(
            "[Planck.Agent.Compactor] sidecar RPC failed (#{compactor_name}): #{inspect(reason)}, falling back to local"
          )

          local_fallback.(messages)

        # Robustness fix: a malformed remote return value used to be handed
        # verbatim to Planck.Agent (which only understands :skip / {:compact, ...}).
        # Treat it as a failure and use the local compactor instead.
        other ->
          Logger.warning(
            "[Planck.Agent.Compactor] sidecar compactor #{compactor_name} returned unexpected value: #{inspect(other)}, falling back to local"
          )

          local_fallback.(messages)
      end
    end
  end

  # Ask the remote compactor for its declared RPC timeout; any failure
  # (badrpc, missing callback, bad value) falls back to the module default.
  @spec remote_compact_timeout(module(), atom()) :: pos_integer()
  defp remote_compact_timeout(module, sidecar_node) do
    with timeout when is_integer(timeout) and timeout > 0 <-
           :rpc.call(sidecar_node, module, :compact_timeout, [], 5_000) do
      timeout
    else
      _other -> @default_compact_timeout_ms
    end
  end

  # Summarize everything except the most recent `keep_recent` messages into a
  # single `{:custom, :summary}` checkpoint; on any failure, leave the message
  # list untouched by returning `:skip`.
  @spec compact_local([Message.t()], Model.t(), pos_integer()) ::
          :skip | {:compact, Message.t(), [Message.t()]}
  defp compact_local(messages, model, keep_recent) do
    # Negative split index: `old` is everything before the last `keep_recent`
    # messages, `kept` is the tail retained verbatim.
    {old, kept} = Enum.split(messages, -keep_recent)

    # Filter out summary checkpoints — only summarize messages since the last
    # checkpoint, not the checkpoints themselves, to keep the request small.
    to_summarize = Enum.reject(old, &match?(%Message{role: {:custom, :summary}}, &1))

    # Bug fix: `summarize/2` was previously evaluated eagerly inside a tuple
    # before the empty-list clause matched, issuing a pointless LLM request
    # when there was nothing to summarize. Short-circuit first.
    case to_summarize do
      [] ->
        :skip

      _nonempty ->
        case summarize(to_summarize, model) do
          {:ok, text} ->
            summary_msg = Message.new({:custom, :summary}, [{:text, text}])
            {:compact, summary_msg, kept}

          # LLM failure: keep the original list unchanged, per the moduledoc.
          {:error, _reason} ->
            :skip
        end
    end
  end

  # Stream a summary of `messages` from the LLM, accumulating text deltas.
  # The stream is always consumed in full; an `{:error, _}` event sticks in
  # the accumulator and wins over any later deltas.
  @spec summarize([Message.t()], Model.t()) :: {:ok, String.t()} | {:error, term()}
  defp summarize(messages, model) do
    context = %Context{
      system: @summary_prompt,
      messages: [
        %Planck.AI.Message{role: :user, content: [{:text, format_history(messages)}]}
      ],
      tools: []
    }

    AIBehaviour.client().stream(model, context, [])
    |> Enum.reduce({:ok, ""}, fn event, acc ->
      case {event, acc} do
        {{:text_delta, chunk}, {:ok, buffer}} -> {:ok, buffer <> chunk}
        {{:error, reason}, _any} -> {:error, reason}
        _other -> acc
      end
    end)
    |> case do
      # An empty summary is useless — report it as an error so the caller skips.
      {:ok, ""} -> {:error, :empty_response}
      ok_or_error -> ok_or_error
    end
  end

  # Render the message list as a plain-text transcript, one "<Role>: <text>"
  # entry per message, separated by blank lines.
  @spec format_history([Message.t()]) :: String.t()
  defp format_history(messages) do
    Enum.map_join(messages, "\n\n", fn %Message{role: role, content: content} ->
      format_role(role) <> ": " <> extract_text(content)
    end)
  end

  # Human-readable label for a message role, used in the transcript sent to
  # the summarizing LLM.
  @spec format_role(Message.role()) :: String.t()
  defp format_role(role) do
    case role do
      :user -> "User"
      :assistant -> "Assistant"
      :tool_result -> "Tool result"
      {:custom, kind} -> kind |> Atom.to_string() |> String.capitalize()
    end
  end

  # Concatenate the textual parts of a message's content; non-text parts
  # (images, tool calls, etc.) contribute nothing.
  @spec extract_text([Planck.AI.Message.content_part()]) :: String.t()
  defp extract_text(content) do
    for part <- content, into: "" do
      case part do
        {:text, text} -> text
        {:tool_result, _id, value} -> value
        _other -> ""
      end
    end
  end
end