Skip to main content

lib/agentix/chat/projection.ex

if Code.ensure_loaded?(Phoenix.LiveView) do
  defmodule Agentix.Chat.Projection do
    @moduledoc false
    # Projects a conversation's snapshot and its live-event stream onto LiveView assigns.

    import Phoenix.Component, only: [assign: 3, update: 3]

    import Phoenix.LiveView,
      only: [
        connected?: 1,
        stream: 3,
        stream_configure: 3,
        stream_insert: 3,
        stream_insert: 4,
        push_event: 3
      ]

    alias Agentix.Agent
    alias Agentix.Codec
    alias Agentix.Events.Publisher
    alias Agentix.Persistence
    alias Phoenix.LiveView.Socket
    alias ReqLLM.Message
    alias ReqLLM.Message.ContentPart

    # Assign key under which the bound conversation id is stored on the socket.
    @conversation_assign :agentix_conversation_id
    # Page size `attach/3` falls back to when the caller passes no `:page_size` option.
    @default_page_size 50

    # Tags (the first tuple element) of the live events this projection recognises;
    # `live_event?/1` checks membership against this list.
    @live_event_tags ~w(state_changed turn_started text_delta thinking_delta message_completed
                        tool_call_started tool_progress tool_call_resolved tool_call_errored
                        suspended turn_completed turn_halted cancelled)a

    @doc "Returns the assign key under which the bound conversation id is stored."
    @spec conversation_assign() :: atom()
    def conversation_assign do
      @conversation_assign
    end

    @doc """
    Attaches the socket to `conversation_id`.

    A connected socket first subscribes to the conversation's topic. The snapshot is
    then read (limited to the `:page_size` option) and used to seed the assigns and the
    `:messages` stream. When the snapshot carries a streaming message, the JS hook is
    seeded with its partial content through `agentix:seed` push events.

    The pub/sub server for the subscription is an explicit `:pubsub` opt when given,
    otherwise it is resolved from the conversation's settings, matching how the agent
    publishes so that subscriber and publisher share one bus.
    """
    @spec attach(Socket.t(), String.t(), keyword()) :: Socket.t()
    def attach(socket, conversation_id, opts \\ []) do
      # Only a connected socket subscribes; this happens before the snapshot is read.
      if connected?(socket) do
        bus = resolve_pubsub(conversation_id, opts)
        topic = Publisher.topic(conversation_id)
        Phoenix.PubSub.subscribe(bus, topic)
      end

      limit = Keyword.get(opts, :page_size, @default_page_size)
      snap = Agent.snapshot(conversation_id, limit: limit)

      seed_assigns = [
        {@conversation_assign, conversation_id},
        {:agentix_page_size, limit},
        {:agentix_oldest_seq, snap.history_cursor},
        {:agentix_more?, snap.more?},
        {:state, snap.state},
        {:streaming?, streaming?(snap.state)},
        {:in_flight_tools, snap.in_flight_tools},
        {:pending, snap.pending},
        # Structured-output object of the latest assistant message (nil when it was plain
        # text), exposed so a host can render it without walking the message stream.
        {:last_object, last_object(snap.messages)},
        # Tracks whether the current assistant turn has already rendered its header;
        # once it has, continuation rows (tools, later streaming, pending) render
        # headerless so a turn never shows a second "Assistant" header.
        {:agentix_assistant_open, false}
      ]

      seed_assigns
      |> Enum.reduce(socket, fn {key, value}, acc -> assign(acc, key, value) end)
      # `:dom_id` is set via `stream_configure/3` (it is undocumented as a `stream/4` opt).
      |> stream_configure(:messages, dom_id: &dom_id/1)
      |> stream(:messages, snap.messages)
      |> seed_streaming(snap.streaming_message)
    end

    @doc """
    Loads the next older page of history into the `:messages` stream.

    The page is prepended above the current oldest message, and `:agentix_oldest_seq`
    and `:agentix_more?` are updated from it. The socket is returned unchanged unless
    `:agentix_more?` is `true` and `:agentix_oldest_seq` is an integer cursor.
    """
    @spec load_older(Socket.t()) :: Socket.t()
    def load_older(%{assigns: %{agentix_more?: true, agentix_oldest_seq: cursor}} = socket)
        when is_integer(cursor) do
      conversation_id = Map.get(socket.assigns, @conversation_assign)
      page_size = socket.assigns.agentix_page_size
      page = Agent.history(conversation_id, before: cursor, limit: page_size)

      # Each message is inserted at the top, walking the page in reverse, so the page
      # ends up in its original order above the messages already in the stream.
      page.messages
      |> Enum.reverse()
      |> Enum.reduce(socket, fn message, acc ->
        stream_insert(acc, :messages, message, at: 0)
      end)
      |> assign(:agentix_oldest_seq, page.cursor || cursor)
      |> assign(:agentix_more?, page.more?)
    end

    def load_older(socket) do
      socket
    end

    @doc "Inserts a just-sent user message into the `:messages` stream (an optimistic insert)."
    @spec insert_user_message(Socket.t(), Message.t()) :: Socket.t()
    def insert_user_message(socket, %Message{} = message) do
      stream_insert(socket, :messages, message)
    end

    @doc """
    Returns `true` when `event` is a non-empty tuple whose first element is one of the
    live-event tags this projection consumes, and `false` for anything else.
    """
    @spec live_event?(term()) :: boolean()
    def live_event?(event)
        when is_tuple(event) and tuple_size(event) > 0 and elem(event, 0) in @live_event_tags do
      true
    end

    def live_event?(_other) do
      false
    end

    @doc """
    Applies one live event to the socket and returns the updated socket.

    Handling by event tag:

      * `:state_changed` — sets `:state` and recomputes `:streaming?`
      * `:turn_started` — leaves the socket unchanged
      * `:text_delta` / `:thinking_delta` — mark the message as streaming and push an
        `agentix:delta` event
      * `:message_completed` — inserts the message into `:messages` and clears
        `:streaming_message`
      * `:tool_call_started` / `:tool_progress` — maintain `:in_flight_tools`
      * `:tool_call_resolved` / `:tool_call_errored` — insert a finalized tool row
      * `:suspended` — moves the call from `:in_flight_tools` to `:pending`
      * `:turn_completed` / `:turn_halted` / `:cancelled` — reset the per-turn assigns
    """
    @spec apply_event(Socket.t(), Publisher.live_event()) :: Socket.t()
    def apply_event(socket, {:state_changed, new_state}) do
      socket
      |> assign(:state, new_state)
      |> assign(:streaming?, streaming?(new_state))
    end

    def apply_event(socket, {:turn_started, _turn_ref}) do
      socket
    end

    # Text and thinking deltas are handled alike: make sure `:streaming_message` points at
    # the message, then forward the chunk to the JS hook tagged with its kind.
    def apply_event(socket, {tag, _turn_ref, msg_id, chunk, seq})
        when tag in [:text_delta, :thinking_delta] do
      kind =
        case tag do
          :text_delta -> "text"
          :thinking_delta -> "thinking"
        end

      payload = %{id: msg_id, kind: kind, chunk: chunk, seq: seq}

      socket
      |> ensure_streaming(msg_id)
      |> push_event("agentix:delta", payload)
    end

    # A completed message joins the `:messages` stream and ends any in-progress streaming.
    # An assistant message also opens the turn's header flag and refreshes `:last_object`.
    def apply_event(socket, {:message_completed, _turn_ref, %Message{role: role} = message}) do
      header_shown? = role == :assistant or socket.assigns.agentix_assistant_open

      socket
      |> stream_insert(:messages, message)
      |> assign(:streaming_message, nil)
      |> assign(:agentix_assistant_open, header_shown?)
      |> maybe_assign_object(role, message)
    end

    # Registers the call in `:in_flight_tools` as running (the call's args are not stored).
    def apply_event(socket, {:tool_call_started, id, name, executor, _args}) do
      running = %{name: name, executor: executor, status: :running}

      update(socket, :in_flight_tools, fn tools -> Map.put(tools, id, running) end)
    end

    # Records progress on a tracked in-flight call; progress for an unknown id is ignored.
    def apply_event(socket, {:tool_progress, id, progress}) do
      update(socket, :in_flight_tools, fn
        %{^id => entry} = tools -> %{tools | id => Map.put(entry, :progress, progress)}
        tools -> tools
      end)
    end

    # A resolved call finalizes with status `:ok`, an errored one with `:error`; the third
    # element is the result or the failure reason respectively.
    def apply_event(socket, {tag, id, outcome})
        when tag in [:tool_call_resolved, :tool_call_errored] do
      status = if tag == :tool_call_resolved, do: :ok, else: :error

      finalize_tool(socket, id, status, outcome)
    end

    # Moves a suspended call out of `:in_flight_tools` and into `:pending`, keyed by its id.
    def apply_event(socket, {:suspended, id, executor, prompt}) do
      # `prompt` is not guaranteed to be a map (`prompt_args/1` passes other shapes through
      # untouched), so only look the kind up where key access is supported. Bracket access
      # raises on shapes such as binaries, and on lists when given a string key.
      kind =
        cond do
          is_map(prompt) -> prompt[:kind] || prompt["kind"]
          is_list(prompt) -> prompt[:kind]
          true -> nil
        end

      entry = %{
        # Carry the name across the suspend so a denied call (which never re-broadcasts
        # `tool_call_started`) can still finalize into a named tool row.
        name: get_in(socket.assigns.in_flight_tools, [id, :name]),
        executor: executor,
        kind: kind,
        prompt: prompt_args(prompt)
      }

      socket
      # A suspended call is awaiting input, not running — move it out of in_flight.
      |> update(:in_flight_tools, &Map.delete(&1, id))
      |> update(:pending, &Map.put(&1, id, entry))
    end

    # Every way a turn can end (completed, halted, cancelled) resets the per-turn assigns.
    def apply_event(socket, {:turn_completed, _turn_ref}) do
      reset_turn(socket)
    end

    def apply_event(socket, {:turn_halted, _turn_ref, _reason}) do
      reset_turn(socket)
    end

    def apply_event(socket, {:cancelled, _turn_ref}) do
      reset_turn(socket)
    end

    @doc """
    Returns `true` for the states in which an assistant message is being produced
    (`:preparing` and `:streaming`), and `false` for any other value.
    """
    @spec streaming?(atom()) :: boolean()
    def streaming?(:preparing), do: true
    def streaming?(:streaming), do: true
    def streaming?(_state), do: false

    @doc "Computes the DOM id used for a message in the `:messages` stream."
    @spec dom_id(Message.t()) :: String.t()
    def dom_id(%Message{} = message) do
      case message do
        # Tool rows are keyed on the tool-call id, so a row inserted live (on resolve) and
        # the same row seeded from history (on reconnect/reload) share one stream node.
        %Message{role: :tool, tool_call_id: call_id} when is_binary(call_id) ->
          "agentix-msg-tool-#{call_id}"

        %Message{metadata: %{"id" => message_id}} when is_binary(message_id) ->
          "agentix-msg-#{message_id}"

        # No usable id on the message: fall back to a fresh unique integer.
        %Message{} ->
          "agentix-msg-#{System.unique_integer([:positive])}"
      end
    end

    # Picks the pub/sub server to subscribe on: an explicit `:pubsub` option is used as-is;
    # otherwise the conversation is loaded and its settings decide.
    defp resolve_pubsub(conversation_id, opts) do
      Keyword.get_lazy(opts, :pubsub, fn ->
        conversation = Persistence.get_conversation(conversation_id)
        settings_pubsub(conversation)
      end)
    end

    # Resolves the pub/sub server from a conversation's `settings` map; anything without a
    # map under `:settings` resolves from empty settings instead.
    defp settings_pubsub(conversation) do
      case conversation do
        %{settings: %{} = settings} -> Publisher.resolve_pubsub(settings)
        _other -> Publisher.resolve_pubsub(%{})
      end
    end

    # Extracts the args from a prompt map (atom key first, then string key); a prompt
    # that is not a map is returned unchanged.
    defp prompt_args(prompt) do
      if is_map(prompt) do
        prompt[:args] || prompt["args"]
      else
        prompt
      end
    end

    # Refreshes `:last_object` from an assistant message; other roles leave it untouched.
    defp maybe_assign_object(socket, role, message) do
      if role == :assistant do
        assign(socket, :last_object, Agentix.object(message))
      else
        socket
      end
    end

    # Returns the structured object of the last assistant message in `messages`, or nil
    # when there is no assistant message at all.
    defp last_object(messages) do
      # Walk forward, remembering the latest assistant message seen so far.
      latest_assistant =
        Enum.reduce(messages, nil, fn
          %Message{role: :assistant} = message, _latest -> message
          _other, latest -> latest
        end)

      if latest_assistant, do: Agentix.object(latest_assistant)
    end

    # No message is streaming in the snapshot: just clear `:streaming_message`.
    defp seed_streaming(socket, nil) do
      assign(socket, :streaming_message, nil)
    end

    # A message is mid-stream: record its id and seed both content nodes (text first, then
    # thinking) from the snapshot. `seq` is the shared delta baseline, so a replayed delta
    # of either kind (seq below it) is dropped by the hook.
    defp seed_streaming(socket, %{id: id, text: text, thinking: thinking, seq: seq}) do
      seeded = assign(socket, :streaming_message, %{id: id})

      Enum.reduce([{"text", text}, {"thinking", thinking}], seeded, fn {kind, content}, acc ->
        push_event(acc, "agentix:seed", %{id: id, kind: kind, text: content, seq: seq})
      end)
    end

    # Points `:streaming_message` at `msg_id`, leaving the socket as-is when it already does.
    defp ensure_streaming(socket, msg_id) do
      if match?(%{id: ^msg_id}, socket.assigns[:streaming_message]) do
        socket
      else
        assign(socket, :streaming_message, %{id: msg_id})
      end
    end

    # Turns a resolved/errored call into a finalized tool row in the `:messages` stream — a
    # permanent timeline item under the assistant turn, like a completed message — and
    # drops the call from both the in-flight and pending sets. The live turn thereby
    # converges with what a reconnect/reload renders from history (`Agent.history/2`).
    defp finalize_tool(socket, id, status, result) do
      assigns = socket.assigns

      # A running call lives in `in_flight_tools`, a suspended one (e.g. a denied
      # approval) in `pending`; the name is taken from whichever set holds the id.
      name = get_in(assigns.in_flight_tools, [id, :name]) || get_in(assigns.pending, [id, :name])

      row = tool_message(id, name, status, result)
      drop_call = fn calls -> Map.delete(calls, id) end

      socket
      |> stream_insert(:messages, row)
      |> update(:in_flight_tools, drop_call)
      |> update(:pending, drop_call)
    end

    # Builds the tool-result message for a finalized call. It is meant to mirror what
    # `Agent.history/2` produces for the same result — same `dom_id` (keyed on the
    # tool-call id), same content (via the shared `Codec` encoder) and the same
    # `tool_name`/`tool_status` metadata — so the live row and a reconnect/reload converge.
    defp tool_message(id, name, status, result) do
      body = result |> Codec.encode_tool_result() |> ContentPart.text()
      meta = %{"tool_name" => name, "tool_status" => to_string(status)}

      %Message{role: :tool, tool_call_id: id, content: [body], metadata: meta}
    end

    # Clears every per-turn assign once a turn has ended: nothing is streaming, no tool is
    # in flight or pending, and the assistant header flag is closed again.
    defp reset_turn(socket) do
      cleared = [
        streaming_message: nil,
        streaming?: false,
        in_flight_tools: %{},
        pending: %{},
        agentix_assistant_open: false
      ]

      Enum.reduce(cleared, socket, fn {key, value}, acc -> assign(acc, key, value) end)
    end
  end
end