Skip to main content

lib/omni/ui/handlers.ex

defmodule Omni.UI.Handlers do
  @moduledoc false

  # Central event dispatch for `use Omni.UI`. The macro injects clauses
  # that delegate here — this module is not called directly by consumers.
  #
  # Three entry points:
  #
  #   handle_event/3         — "omni:*" browser events (returns {:noreply, socket})
  #   handle_info/2          — {Omni.UI, ...} component messages (returns {:noreply, socket})
  #   handle_session_event/3 — {:session, ...} streaming events (returns bare socket)

  import Phoenix.LiveView
  import Phoenix.Component

  require Logger

  alias Omni.Session.Tree

  @notification_cap 5

  # ── Events ───────────────────────────────────────────────────────

  @doc false
  def handle_event("omni:select", %{"name" => "model", "value" => value}, socket) do
    [provider, model_id] = String.split(value, ":", parts: 2)
    socket = Omni.UI.update_session(socket, model: {String.to_existing_atom(provider), model_id})
    {:noreply, socket}
  end

  def handle_event("omni:select", %{"name" => "thinking", "value" => value}, socket) do
    thinking = String.to_existing_atom(value)
    socket = Omni.UI.update_session(socket, thinking: thinking)
    {:noreply, socket}
  end

  def handle_event("omni:dismiss", %{"id" => id}, socket) do
    handle_info({Omni.UI, :dismiss_notification, String.to_integer(id)}, socket)
  end

  def handle_event("omni:navigate", %{"node_id" => node_id}, socket) do
    case Omni.Session.navigate(socket.assigns.session, node_id) do
      :ok ->
        {:noreply, push_event(socket, "omni:updated", %{})}

      {:error, reason} ->
        notify_branch_error(reason)
        {:noreply, socket}
    end
  end

  def handle_event("omni:retry", _params, socket) do
    turn = socket.assigns.current_turn

    if turn && turn.status == :error do
      content = turn.user_text ++ turn.user_attachments
      :ok = Omni.Session.prompt(socket.assigns.session, content)
      message = Omni.message(role: :user, content: content)

      {:noreply,
       socket
       |> assign(:current_turn, streaming_turn(nil, message))
       |> push_event("omni:updated", %{})}
    else
      {:noreply, socket}
    end
  end

  def handle_event("omni:regenerate", %{"turn_id" => turn_id}, socket) do
    case Omni.Session.branch(socket.assigns.session, turn_id) do
      :ok ->
        message = socket.assigns.tree.nodes[turn_id].message

        {:noreply,
         socket
         |> assign(:current_turn, streaming_turn(turn_id, message))
         |> push_event("omni:updated", %{})}

      {:error, reason} ->
        notify_branch_error(reason)
        {:noreply, socket}
    end
  end

  # ── Messages ─────────────────────────────────────────────────────

  @doc false
  def handle_info({Omni.UI, :new_message, message}, socket) do
    socket = Omni.UI.ensure_session(socket)
    :ok = Omni.Session.prompt(socket.assigns.session, message.content)
    {:noreply, assign(socket, :current_turn, streaming_turn(nil, message))}
  end

  # Editing a user message branches from its parent. Per `Omni.Session.branch/3`
  # semantics, the target is the assistant above the edited user (or `nil` for
  # a root user, which creates a new disjoint root). The new user + assistant
  # turn appends as children — opposite asymmetry from the old tree-owned-state
  # model, which navigated to the user's parent and pushed a sibling user.
  def handle_info({Omni.UI, :edit_message, turn_id, message}, socket) do
    parent_id = socket.assigns.tree.nodes[turn_id].parent_id

    case Omni.Session.branch(socket.assigns.session, parent_id, message.content) do
      :ok ->
        {:noreply,
         socket
         |> assign(:current_turn, streaming_turn(nil, message))
         |> push_event("omni:updated", %{})}

      {:error, reason} ->
        notify_branch_error(reason)
        {:noreply, socket}
    end
  end

  # ── Notifications ────────────────────────────────────────────────

  def handle_info({Omni.UI, :notify, notification}, socket) do
    Process.send_after(
      self(),
      {Omni.UI, :dismiss_notification, notification.id},
      notification.timeout
    )

    ids = socket.assigns.notification_ids ++ [notification.id]
    {evicted, kept} = split_evicted(ids)

    socket =
      Enum.reduce(evicted, socket, fn id, s ->
        stream_delete(s, :notifications, %{id: id})
      end)

    {:noreply,
     socket
     |> stream_insert(:notifications, notification, at: 0)
     |> assign(:notification_ids, kept)}
  end

  def handle_info({Omni.UI, :dismiss_notification, id}, socket) do
    {:noreply,
     socket
     |> stream_delete(:notifications, %{id: id})
     |> update(:notification_ids, &List.delete(&1, id))}
  end

  defp split_evicted(ids) when length(ids) > @notification_cap do
    Enum.split(ids, length(ids) - @notification_cap)
  end

  defp split_evicted(ids), do: {[], ids}

  # ── Session events (return socket, not {:noreply, socket}) ───────

  # Streaming-deltas accumulate into @current_turn.

  @doc false
  def handle_session_event(:thinking_start, _data, socket) do
    update(socket, :current_turn, fn turn ->
      Omni.UI.Turn.push_content(turn, %Omni.Content.Thinking{text: ""})
    end)
  end

  def handle_session_event(:text_start, _data, socket) do
    update(socket, :current_turn, fn turn ->
      Omni.UI.Turn.push_content(turn, %Omni.Content.Text{text: ""})
    end)
  end

  def handle_session_event(delta_type, %{delta: delta}, socket)
      when delta_type in [:thinking_delta, :text_delta] do
    update(socket, :current_turn, fn turn ->
      Omni.UI.Turn.push_delta(turn, delta)
    end)
  end

  # Push a stub ToolUse on start so the header (icon, tool name) renders
  # immediately. The fully-formed struct replaces it on :tool_use_end.
  def handle_session_event(:tool_use_start, %{id: id, name: name} = data, socket) do
    stub = %Omni.Content.ToolUse{id: id, name: name, input: Map.get(data, :input, %{})}

    update(socket, :current_turn, fn turn ->
      Omni.UI.Turn.push_content(turn, stub)
    end)
  end

  def handle_session_event(:tool_use_end, %{content: tool_use}, socket) do
    update(socket, :current_turn, fn turn ->
      Omni.UI.Turn.replace_content(turn, tool_use)
    end)
  end

  def handle_session_event(:tool_result, tool_result, socket) do
    update(socket, :current_turn, fn turn ->
      Omni.UI.Turn.put_tool_result(turn, tool_result)
    end)
  end

  # Turn boundary. Nil out @current_turn so the committed turn appears
  # in the :turns stream via the :tree event that follows. For
  # continuations, a subsequent :message event sets up a fresh
  # @current_turn for the new turn the agent kicks off.
  def handle_session_event(:turn, {_kind, _response}, socket) do
    assign(socket, :current_turn, nil)
  end

  # Continuation user message. The agent emits :message for the new user
  # prompt after :turn {:continue} commits. When no turn is in flight,
  # start a fresh streaming turn from this message.
  def handle_session_event(:message, %Omni.Message{role: :user} = message, socket) do
    if socket.assigns.current_turn == nil do
      assign(socket, :current_turn, streaming_turn(nil, message))
    else
      socket
    end
  end

  # Tree mirror. Session emits this after every tree mutation: turn commit
  # (with new node ids), navigate (empty new_nodes), and the apply_navigation
  # step inside branch ops (also empty new_nodes). Rebuild the turn list from
  # the new tree on each event — Turn.all walks the active path so navigates
  # and branches naturally drop turns that have left the path.
  #
  # The in-flight streaming turn (@current_turn) is rendered separately and
  # is not yet in the tree, so no filtering is needed — :turn always nils
  # @current_turn before :tree fires.
  def handle_session_event(:tree, %{tree: tree}, socket) do
    turns = Omni.UI.Turn.all(tree)

    socket
    |> assign(tree: tree, usage: Tree.usage(tree))
    |> stream(:turns, turns, reset: true)
  end

  # Persistence acks. The first save after starting a fresh session is the
  # signal that the session id is real — patch the URL so the user can
  # bookmark/share/reload.
  def handle_session_event(:store, {:saved, _kind}, socket) do
    if socket.assigns[:url_synced] do
      socket
    else
      socket
      |> push_patch(to: "/?session_id=#{socket.assigns.session_id}")
      |> assign(:url_synced, true)
    end
  end

  def handle_session_event(:store, {:error, kind, reason}, socket) do
    Logger.error("Session store error (#{kind}): #{inspect(reason)}")
    Omni.UI.notify(:error, "Couldn't save your changes.")
    socket
  end

  # Title changes (set via Omni.Session.set_title/2). Mirror to assigns; the
  # consumer can render @title if it surfaces a title bar.
  def handle_session_event(:title, title, socket) do
    assign(socket, :title, title)
  end

  # Best-effort sync if the session's agent state changes underneath us
  # (e.g. via a future Manager). The explicit update_session/2 paths already
  # keep model/thinking aligned, so this is defensive.
  def handle_session_event(:state, %_{model: model, opts: opts}, socket) do
    thinking = Keyword.get(opts, :thinking, false)

    socket
    |> assign(:model, model)
    |> assign(:thinking, thinking)
  end

  def handle_session_event(:status, _status, socket), do: socket

  def handle_session_event(:error, reason, socket) do
    Logger.error("Session error: #{inspect(reason)}")
    Omni.UI.notify(:error, format_error(reason))

    case socket.assigns.current_turn do
      nil ->
        socket

      turn ->
        assign(socket, :current_turn, %{turn | status: :error, error: format_error(reason)})
    end
  end

  # Catch-all for unhandled session events
  def handle_session_event(_event, _data, socket), do: socket

  # ── Helpers ──────────────────────────────────────────────────────

  defp streaming_turn(id, %Omni.Message{} = message) do
    %Omni.UI.Turn{
      id: id,
      status: :streaming,
      user_text: Enum.filter(message.content, &match?(%Omni.Content.Text{}, &1)),
      user_attachments: Enum.filter(message.content, &match?(%Omni.Content.Attachment{}, &1)),
      user_timestamp: message.timestamp
    }
  end

  defp format_error(%{message: message}) when is_binary(message), do: message
  defp format_error(reason) when is_binary(reason), do: reason
  defp format_error(_reason), do: "Something went wrong. Please try again."

  defp notify_branch_error(:busy),
    do: Omni.UI.notify(:warning, "Wait for the current turn to finish.")

  defp notify_branch_error(:paused),
    do: Omni.UI.notify(:warning, "Resume the paused turn before branching.")

  defp notify_branch_error(:not_found),
    do: Omni.UI.notify(:warning, "Couldn't find that point in the conversation.")

  defp notify_branch_error(reason) do
    Logger.warning("Branch op failed: #{inspect(reason)}")
    Omni.UI.notify(:error, "Couldn't switch branches.")
  end
end