# PubSub & Presence

This document covers real-time event streaming and viewer tracking in Sagents.

## Overview

Sagents uses Phoenix.PubSub for real-time event broadcasting, enabling:
- Multiple LiveView clients watching the same conversation
- Real-time streaming of LLM responses
- TODO list and state synchronization
- Debug event streams for development tools

## PubSub Configuration

### Setup

1. Ensure Phoenix.PubSub is started in your application:

```elixir
# application.ex
children = [
  {Phoenix.PubSub, name: MyApp.PubSub},
  # ... other children
]
```

2. Pass PubSub to AgentServer:

```elixir
AgentServer.start_link(
  agent: agent,
  pubsub: {Phoenix.PubSub, MyApp.PubSub}
)
```

### Topics

Each agent broadcasts on two topics:

| Topic | Purpose |
|-------|---------|
| `"agent_server:#{agent_id}"` | Main events (status, messages, todos) |
| `"agent_server:debug:#{agent_id}"` | Debug events (state snapshots, middleware actions) |
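The subscribe helpers shown in the following sections take the agent ID rather than a topic string. As a minimal sketch of the naming scheme in the table above, the hypothetical `TopicSketch` module (an illustration, not part of Sagents) builds both topic names from an agent ID:

```elixir
defmodule TopicSketch do
  # Hypothetical helper, not part of Sagents: mirrors the topic table above.

  # Topic carrying main events (status, messages, todos).
  def main(agent_id), do: "agent_server:#{agent_id}"

  # Topic carrying debug events (state snapshots, middleware actions).
  def debug(agent_id), do: "agent_server:debug:#{agent_id}"
end

IO.puts(TopicSketch.main("conversation-123"))
# prints "agent_server:conversation-123"
IO.puts(TopicSketch.debug("conversation-123"))
# prints "agent_server:debug:conversation-123"
```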

## Subscribing to Events

### Main Events

```elixir
# In your LiveView or GenServer
def mount(%{"id" => id}, _session, socket) do
  if connected?(socket) do
    AgentServer.subscribe("conversation-#{id}")
  end
  {:ok, socket}
end

def handle_info({:agent, event}, socket) do
  # Handle the event
  {:noreply, handle_agent_event(event, socket)}
end
```

### Debug Events

```elixir
# For development/debugging tools
AgentServer.subscribe_debug("conversation-123")

def handle_info({:agent, {:debug, debug_event}}, socket) do
  case debug_event do
    {:agent_state_update, state} ->
      # Full state snapshot
      {:noreply, assign(socket, debug_state: state)}

    {:middleware_action, module, data} ->
      # Middleware-specific event
      {:noreply, log_middleware_action(socket, module, data)}
  end
end
```

## Event Reference

### Status Events

```elixir
{:agent, {:status_changed, status, data}}
```

| Status | Data | Description |
|--------|------|-------------|
| `:idle` | `nil` | Agent ready, not executing |
| `:running` | `nil` | Execution in progress |
| `:interrupted` | `%InterruptData{}` | Waiting for HITL approval |
| `:cancelled` | `nil` | User cancelled execution |
| `:error` | `reason` | Execution failed |

Example handler:

```elixir
def handle_agent_event({:status_changed, status, data}, socket) do
  case {status, data} do
    {:running, nil} ->
      socket
      |> assign(status: :running)
      |> assign(streaming_content: "")

    {:idle, nil} ->
      assign(socket, status: :idle)

    {:interrupted, interrupt_data} ->
      socket
      |> assign(status: :interrupted)
      |> assign(interrupt: interrupt_data)
      |> push_event("show_approval_modal", %{})

    {:error, reason} ->
      socket
      |> assign(status: :error)
      |> put_flash(:error, "Agent error: #{inspect(reason)}")

    {:cancelled, nil} ->
      assign(socket, status: :cancelled)
  end
end
```

### LLM Message Events

```elixir
# Streaming tokens (during generation)
{:agent, {:llm_deltas, [%MessageDelta{}, ...]}}

# Complete message (after generation)
{:agent, {:llm_message, %Message{}}}

# Token usage info
{:agent, {:llm_token_usage, %TokenUsage{}}}
```

Example streaming handler:

```elixir
def handle_agent_event({:llm_deltas, deltas}, socket) do
  # Accumulate streaming content
  new_content =
    Enum.reduce(deltas, socket.assigns.streaming_content, fn delta, acc ->
      acc <> (delta.content || "")
    end)

  socket
  |> assign(streaming_content: new_content)
  |> push_event("scroll_to_bottom", %{})
end

def handle_agent_event({:llm_message, message}, socket) do
  # Complete message received - add to history
  socket
  |> update(:messages, &(&1 ++ [message]))
  |> assign(streaming_content: "")
end

def handle_agent_event({:llm_token_usage, usage}, socket) do
  # Update token counts for display
  assign(socket, token_usage: usage)
end
```

### TODO Events

```elixir
{:agent, {:todos_updated, [%Todo{}, ...]}}
```

The TODO list is a complete snapshot (not a diff):

```elixir
def handle_agent_event({:todos_updated, todos}, socket) do
  # Replace entire TODO list
  assign(socket, todos: todos)
end
```

### Tool Events

```elixir
# Tool call identified (LLM requested a tool)
{:agent, {:tool_call_identified, tool_info}}
# tool_info contains: %{call_id, name, display_text, arguments}

# Tool execution started
{:agent, {:tool_execution_started, tool_info}}
# tool_info contains: %{call_id, name, display_text, arguments}

# Tool execution completed successfully
{:agent, {:tool_execution_completed, call_id, tool_result}}
# call_id - matches the started event
# tool_result - %ToolResult{} struct with response

# Tool execution failed
{:agent, {:tool_execution_failed, call_id, error}}
# call_id - matches the started event
# error - error reason or message
```
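
Because the completed and failed events carry only a `call_id`, a subscriber usually needs to remember the earlier `tool_info` to show which tool finished. One possible approach is sketched below as a pure reducer. The `ToolTrackerSketch` module and its entry shape are hypothetical, and plain maps stand in for `tool_info` and `%ToolResult{}`:

```elixir
defmodule ToolTrackerSketch do
  # Hypothetical pure reducer, not part of Sagents. Plain maps stand in for
  # the tool_info map and the %ToolResult{} struct described above.

  # Folds one tool event into a map of call_id => tracked call.
  def apply_event(calls, {:tool_call_identified, info}), do: track(calls, info, :identified)
  def apply_event(calls, {:tool_execution_started, info}), do: track(calls, info, :running)

  def apply_event(calls, {:tool_execution_completed, call_id, result}),
    do: finish(calls, call_id, :completed, result)

  def apply_event(calls, {:tool_execution_failed, call_id, error}),
    do: finish(calls, call_id, :failed, error)

  # Any other event leaves the tracked calls untouched.
  def apply_event(calls, _other), do: calls

  defp track(calls, %{call_id: id} = info, status) do
    entry = %{name: info.name, display_text: info.display_text, status: status, outcome: nil}
    Map.put(calls, id, entry)
  end

  # Only update calls seen before; unknown call_ids are ignored.
  defp finish(calls, call_id, status, outcome) do
    case calls do
      %{^call_id => entry} -> Map.put(calls, call_id, %{entry | status: status, outcome: outcome})
      _ -> calls
    end
  end
end

info = %{call_id: "c1", name: "search", display_text: "Searching", arguments: %{}}

events = [
  {:tool_call_identified, info},
  {:tool_execution_started, info},
  {:tool_execution_completed, "c1", %{content: "3 results"}}
]

calls = Enum.reduce(events, %{}, &ToolTrackerSketch.apply_event(&2, &1))
IO.inspect(calls["c1"].status)
# prints :completed
```

In a LiveView, the same reducer could be called from `handle_agent_event/2` with the map kept in an assign.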

### Shutdown Event

```elixir
{:agent, {:agent_shutdown, %{reason: reason, metadata: map}}}
```

Reasons:
- `:inactivity` - Timeout expired
- `:no_viewers` - No presence after completion
- `:manual` - Explicitly stopped
- `:crash` - Process crashed (rare)

```elixir
def handle_agent_event({:agent_shutdown, %{reason: reason}}, socket) do
  case reason do
    :inactivity ->
      # Agent timed out - can restart on user action
      socket
      |> assign(agent_status: :inactive)
      |> put_flash(:info, "Conversation paused due to inactivity")

    :no_viewers ->
      # Normal cleanup after completion
      assign(socket, agent_status: :inactive)

    _ ->
      assign(socket, agent_status: :stopped)
  end
end
```

### Display Message Events

```elixir
# Message was persisted to database
{:agent, {:display_message_saved, display_message}}
```

Use this event to keep the UI in sync with database state.

## Debug Events

Debug events provide detailed insight into agent internals.

> **Note:** While you can subscribe to debug events directly, the `sagents_live_debugger` package already uses these events to provide a powerful real-time debugging dashboard. It visualizes agent state, middleware actions, and sub-agent hierarchies out of the box. See the [Live Debugger documentation](https://github.com/sagents-ai/sagents_live_debugger) for setup instructions.

### State Updates

```elixir
{:agent, {:debug, {:agent_state_update, %State{}}}}
```

Full state snapshot after each execution step.

### Middleware Actions

```elixir
{:agent, {:debug, {:middleware_action, module, data}}}
```

Each middleware can emit custom debug events:

| Middleware | Event | Data |
|------------|-------|------|
| `Summarization` | `:summarization_started` | Token count info |
| `Summarization` | `:summarization_completed` | Message count reduction |
| `PatchToolCalls` | `:dangling_tool_calls_patched` | Patch count |
| `ConversationTitle` | `:title_generated` | Generated title |
| `HumanInTheLoop` | `:interrupt_triggered` | Tool calls requiring approval |

Example debug handler:

```elixir
def handle_info({:agent, {:debug, event}}, socket) do
  case event do
    {:agent_state_update, state} ->
      {:noreply, update(socket, :debug_log, &[{:state, state} | &1])}

    {:middleware_action, Sagents.Middleware.Summarization, {:summarization_completed, info}} ->
      {:noreply, update(socket, :debug_log, &[{:summarization, info} | &1])}

    {:middleware_action, module, data} ->
      {:noreply, update(socket, :debug_log, &[{module, data} | &1])}
  end
end
```

## Publishing Custom Events

### From Middleware

Middleware can publish events using the AgentServer API:

```elixir
defmodule MyMiddleware do
  @behaviour Sagents.Middleware

  def after_model(state, _config) do
    # Publish custom event
    AgentServer.publish_event_from(
      state.agent_id,
      {:my_custom_event, %{data: "something"}}
    )

    # Publish debug event (separate topic)
    AgentServer.publish_debug_event_from(
      state.agent_id,
      {:middleware_action, __MODULE__, {:custom_action, "details"}}
    )

    {:ok, state}
  end
end
```

### Event Structure

Events are always wrapped in `{:agent, event}`:

```elixir
# What you publish
AgentServer.publish_event_from(agent_id, {:my_event, data})

# What subscribers receive
{:agent, {:my_event, data}}
```

## Phoenix.Presence Integration

### Overview

Presence tracking enables:
- Knowing how many clients are viewing a conversation
- Automatic shutdown when all viewers leave
- Displaying who else is viewing

### Setup

1. Create a Presence module:

```elixir
defmodule MyApp.Presence do
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub
end
```

2. Start it in your application:

```elixir
children = [
  MyApp.Presence,
  # ...
]
```

3. Configure AgentServer:

```elixir
AgentServer.start_link(
  agent: agent,
  pubsub: {Phoenix.PubSub, MyApp.PubSub},
  presence_tracking: [
    enabled: true,
    presence_module: MyApp.Presence,
    topic: "conversation:#{conversation_id}",
    grace_period: 5_000  # 5 seconds before shutdown
  ]
)
```

### Tracking Viewers

In your LiveView:

```elixir
def mount(%{"id" => conversation_id}, _session, socket) do
  topic = "conversation:#{conversation_id}"

  if connected?(socket) do
    # Subscribe to agent events
    AgentServer.subscribe("conversation-#{conversation_id}")

    # Track this viewer
    {:ok, _} = MyApp.Presence.track(
      self(),
      topic,
      socket.assigns.current_user.id,
      %{
        user_name: socket.assigns.current_user.name,
        joined_at: DateTime.utc_now()
      }
    )

    # Subscribe to presence changes
    Phoenix.PubSub.subscribe(MyApp.PubSub, topic)
  end

  # Assign an empty viewer map up front so the template can always read @viewers
  {:ok, assign(socket, topic: topic, viewers: %{})}
end

# Handle presence changes by re-reading the full viewer list (the diff payload is not needed)
def handle_info(%Phoenix.Socket.Broadcast{event: "presence_diff"}, socket) do
  presence = MyApp.Presence.list(socket.assigns.topic)
  {:noreply, assign(socket, viewers: presence)}
end
```

### Displaying Viewers

```elixir
def render(assigns) do
  ~H"""
  <div class="viewers">
    <%= for {_user_id, %{metas: metas}} <- @viewers do %>
      <span class="viewer" title={hd(metas).user_name}>
        <%= String.first(hd(metas).user_name) %>
      </span>
    <% end %>
  </div>
  """
end
```
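
By default, `Presence.list/1` returns a map keyed by the tracked key, where each value holds a `metas` list with one entry per tracked connection, so a user with two open tabs appears once with two metas. The sketch below summarizes that shape with plain data. `ViewerSummarySketch` is a hypothetical helper, and the `user_name` field matches the metadata tracked in the example above:

```elixir
defmodule ViewerSummarySketch do
  # Hypothetical helper, not part of Sagents or Phoenix. It works on the plain
  # map shape returned by Presence.list/1: %{key => %{metas: [meta, ...]}}.

  # Number of distinct tracked keys (users), regardless of open tabs.
  def viewer_count(presences), do: map_size(presences)

  # One display name per user, taken from the first meta, sorted for stable output.
  def viewer_names(presences) do
    presences
    |> Enum.map(fn {_key, %{metas: [first | _]}} -> first.user_name end)
    |> Enum.sort()
  end
end

presences = %{
  "1" => %{metas: [%{user_name: "Ada", phx_ref: "a"}, %{user_name: "Ada", phx_ref: "b"}]},
  "2" => %{metas: [%{user_name: "Grace", phx_ref: "c"}]}
}

IO.inspect(ViewerSummarySketch.viewer_count(presences))
# prints 2
IO.inspect(ViewerSummarySketch.viewer_names(presences))
# prints ["Ada", "Grace"]
```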

### How Presence Affects Shutdown

1. Agent completes execution (status: `:idle`)
2. AgentServer checks presence count
3. If no viewers: start grace period timer
4. During grace period: if viewer joins, cancel shutdown
5. After grace period: save state and terminate

```
Execution completes → Check presence → No viewers → Grace period → Shutdown
                           ↓
                      Has viewers → Stay running
```
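
The grace period logic in the steps above can be approximated with a timer that is cancelled when a viewer joins. The following is a minimal sketch using only OTP primitives. `GraceShutdownSketch`, its function names, and its messages are hypothetical and do not describe how AgentServer is actually implemented:

```elixir
defmodule GraceShutdownSketch do
  use GenServer

  # Hypothetical sketch, not Sagents code: models only the grace period timer.

  def start_link(grace_ms), do: GenServer.start_link(__MODULE__, grace_ms)

  def execution_completed(pid, viewer_count),
    do: GenServer.cast(pid, {:execution_completed, viewer_count})

  def viewer_joined(pid), do: GenServer.cast(pid, :viewer_joined)

  @impl true
  def init(grace_ms), do: {:ok, %{grace_ms: grace_ms, pending: nil}}

  @impl true
  # No viewers after completion: start the grace period.
  def handle_cast({:execution_completed, 0}, state), do: {:noreply, schedule(state)}
  # Viewers present, or a viewer joined during the grace period: stay running.
  def handle_cast({:execution_completed, _viewers}, state), do: {:noreply, cancel(state)}
  def handle_cast(:viewer_joined, state), do: {:noreply, cancel(state)}

  @impl true
  # Only the timer that is still pending may stop the server.
  def handle_info({:grace_period_expired, token}, %{pending: {_timer, token}} = state) do
    # A real server would save its state here before terminating.
    {:stop, :normal, state}
  end

  # A stale timer message that raced with a cancel is ignored.
  def handle_info({:grace_period_expired, _stale}, state), do: {:noreply, state}

  defp schedule(state) do
    state = cancel(state)
    token = make_ref()
    timer = Process.send_after(self(), {:grace_period_expired, token}, state.grace_ms)
    %{state | pending: {timer, token}}
  end

  defp cancel(%{pending: nil} = state), do: state

  defp cancel(%{pending: {timer, _token}} = state) do
    Process.cancel_timer(timer)
    %{state | pending: nil}
  end
end

{:ok, pid} = GraceShutdownSketch.start_link(50)

# No viewers, then a viewer joins within the grace period: shutdown is cancelled.
GraceShutdownSketch.execution_completed(pid, 0)
GraceShutdownSketch.viewer_joined(pid)
Process.sleep(150)
alive_after_join = Process.alive?(pid)

# No viewers and nobody joins: the server stops after the grace period.
GraceShutdownSketch.execution_completed(pid, 0)
Process.sleep(150)
alive_after_grace = Process.alive?(pid)

IO.inspect({alive_after_join, alive_after_grace})
```

The unique token guards against a timer message that was already in the mailbox when the timer was cancelled.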

## Multiple Subscribers

Multiple LiveView processes can subscribe to the same agent:

```elixir
# Tab 1
AgentServer.subscribe("conversation-123")

# Tab 2 (same or different user)
AgentServer.subscribe("conversation-123")

# Both receive all events
```

This enables:
- Multiple browser tabs with same conversation
- Shared conversations between users
- Admin dashboards monitoring user conversations
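
To illustrate the fan-out behaviour without a Phoenix dependency, the sketch below uses Elixir's built-in `Registry` as a stand-in for the PubSub layer. `FanOutSketch` is hypothetical and is not how Sagents delivers events. It only shows that every subscribed process receives its own copy of the wrapped `{:agent, event}` message:

```elixir
defmodule FanOutSketch do
  # Hypothetical stand-in for the PubSub layer, built on Elixir's Registry.
  # Not how Sagents is implemented; it only illustrates fan-out delivery.

  def start_link, do: Registry.start_link(keys: :duplicate, name: __MODULE__)

  # Registers the calling process under the agent's topic.
  def subscribe(agent_id), do: Registry.register(__MODULE__, topic(agent_id), nil)

  # Sends the wrapped event to every process registered under the topic.
  def publish(agent_id, event) do
    Registry.dispatch(__MODULE__, topic(agent_id), fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:agent, event})
    end)
  end

  defp topic(agent_id), do: "agent_server:#{agent_id}"
end

{:ok, _registry} = FanOutSketch.start_link()
parent = self()

# Two "tabs": each subscribes, reports readiness, then forwards the first event it gets.
for tab <- [:tab_1, :tab_2] do
  spawn_link(fn ->
    {:ok, _owner} = FanOutSketch.subscribe("conversation-123")
    send(parent, {:ready, tab})

    receive do
      {:agent, event} -> send(parent, {:received, tab, event})
    end
  end)
end

# Wait until both tabs are subscribed before publishing.
for tab <- [:tab_1, :tab_2] do
  receive do
    {:ready, ^tab} -> :ok
  end
end

FanOutSketch.publish("conversation-123", {:status_changed, :running, nil})

received =
  for tab <- [:tab_1, :tab_2] do
    receive do
      {:received, ^tab, event} -> {tab, event}
    after
      1_000 -> {tab, :timeout}
    end
  end

IO.inspect(received)
```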

## Event Ordering

Events from a single agent arrive in the order they were broadcast, with these caveats:
- There is no ordering guarantee across different agents
- A single `:llm_deltas` event may batch multiple tokens
- State updates are eventually consistent (not transactional)

For consistency:
- Use `{:llm_message, msg}` for final message content (not accumulated deltas)
- Use `{:todos_updated, todos}` snapshots (not diffs)
- Handle `{:agent_shutdown, _}` to detect agent going away
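
The guidance above can be sketched as a pure reducer over the event stream: deltas only feed a temporary preview, and the final message replaces it. `StreamViewSketch` and its field names are hypothetical, and plain maps with an assumed `:content` field stand in for `%MessageDelta{}` and `%Message{}`:

```elixir
defmodule StreamViewSketch do
  # Hypothetical pure reducer, not part of Sagents. Plain maps stand in for
  # %MessageDelta{} and %Message{}; only a :content field is assumed.

  def new, do: %{messages: [], streaming: "", agent_running: true}

  # Deltas only feed the in-progress preview.
  def apply_event(view, {:llm_deltas, deltas}) do
    text = Enum.map_join(deltas, "", &(&1.content || ""))
    %{view | streaming: view.streaming <> text}
  end

  # The final message is authoritative: store it and drop the preview.
  def apply_event(view, {:llm_message, message}) do
    %{view | messages: view.messages ++ [message], streaming: ""}
  end

  # The agent went away; the UI can offer to restart it.
  def apply_event(view, {:agent_shutdown, _info}), do: %{view | agent_running: false}

  # Unrelated events leave the view unchanged.
  def apply_event(view, _other), do: view
end

events = [
  {:llm_deltas, [%{content: "Hel"}, %{content: nil}]},
  {:llm_deltas, [%{content: "lo"}]},
  {:llm_message, %{content: "Hello!"}},
  {:agent_shutdown, %{reason: :no_viewers, metadata: %{}}}
]

view = Enum.reduce(events, StreamViewSketch.new(), &StreamViewSketch.apply_event(&2, &1))
IO.inspect(view)
```

Keeping the reducer free of socket logic makes it easy to unit test separately from the LiveView.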

## Best Practices

### 1. Always Check Connection

```elixir
def mount(%{"id" => id}, _session, socket) do
  if connected?(socket) do
    # Subscribe only on the connected (WebSocket) mount, not the initial static render
    AgentServer.subscribe("conversation-#{id}")
  end
  {:ok, socket}
end
```

### 2. Handle Agent Not Running

```elixir
def mount(%{"id" => id}, _session, socket) do
  case Coordinator.start_conversation_session(id) do
    {:ok, session} ->
      # Subscribe only once the socket is connected
      if connected?(socket), do: AgentServer.subscribe(session.agent_id)
      {:ok, assign(socket, agent_id: session.agent_id)}

    {:error, :not_found} ->
      {:ok, redirect(socket, to: ~p"/conversations")}
  end
end
```

### 3. Explicit Unsubscription

Subscriptions are automatically cleaned up when the LiveView process terminates. However, you can explicitly unsubscribe when your application needs to disconnect for other reasons:

```elixir
@impl true
def handle_event("close_conversation", _params, socket) do
  AgentServer.unsubscribe(socket.assigns.agent_id)
  # ... other logic ...
  {:noreply, socket}
end
```