Skip to main content

lib/jido_messaging.ex

defmodule Jido.Messaging do
  @moduledoc """
  Messaging and notification system for the Jido ecosystem.

  ## Usage

  Define a messaging module in your application:

      defmodule MyApp.Messaging do
        use Jido.Messaging,
          persistence: Jido.Messaging.Persistence.ETS
      end

  Add it to your supervision tree:

      children = [
        MyApp.Messaging
      ]

  Use the API:

      {:ok, room} = MyApp.Messaging.create_room(%{type: :direct, name: "Chat"})
      {:ok, message} = MyApp.Messaging.save_message(%{
        room_id: room.id,
        sender_id: "user_123",
        role: :user,
        content: [%{type: :text, text: "Hello!"}]
      })
      {:ok, messages} = MyApp.Messaging.list_messages(room.id)

  For app-level chat commands that should notify realtime consumers, use the
  eventful command APIs:

      {:ok, result} = MyApp.Messaging.post_message(%{
        room_id: room.id,
        sender_id: "user_123",
        role: :user,
        content: [%{type: :text, text: "Hello!"}]
      })

      [%Jido.Signal{type: "jido.messaging.room.message_added"}] = result.signals

  `save_message/1` remains the low-level persistence primitive. `post_message/2`
  persists and emits committed `jido.messaging.*` CloudEvents through the
  instance Signal Bus.
  """

  alias Jido.Chat.{Participant, Room}
  alias Jido.Messaging.BridgeRoomSpec
  alias Jido.Messaging.TopologyValidator

  alias Jido.Messaging.{
    AgentRunner,
    AgentSupervisor,
    ConfigStore,
    IngressSubscriptions,
    Message,
    Onboarding,
    RoomServer,
    RoomSupervisor,
    Runtime,
    Thread
  }

  defmacro __using__(opts) do
    quote bind_quoted: [opts: opts] do
      @persistence Keyword.get(opts, :persistence, Jido.Messaging.Persistence.ETS)
      @persistence_opts Keyword.get(opts, :persistence_opts, [])
      @runtime_profile Keyword.get(opts, :runtime_profile, :full)
      @runtime_features Keyword.get(opts, :runtime_features, [])
      @bridge_manifest_paths Keyword.get(opts, :bridge_manifest_paths, [])
      @required_bridges Keyword.get(opts, :required_bridges, [])
      @bridge_collision_policy Keyword.get(opts, :bridge_collision_policy, :prefer_last)
      @pubsub Keyword.get(opts, :pubsub)

      def child_spec(init_opts) do
        %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [init_opts]},
          type: :supervisor
        }
      end

      def start_link(opts \\ []) do
        persistence = Keyword.get(opts, :persistence, @persistence)
        persistence_opts = Keyword.get(opts, :persistence_opts, @persistence_opts)
        runtime_profile = Keyword.get(opts, :runtime_profile, @runtime_profile)
        runtime_features = Keyword.get(opts, :runtime_features, @runtime_features)
        bridge_manifest_paths = Keyword.get(opts, :bridge_manifest_paths, @bridge_manifest_paths)
        required_bridges = Keyword.get(opts, :required_bridges, @required_bridges)
        bridge_collision_policy = Keyword.get(opts, :bridge_collision_policy, @bridge_collision_policy)

        Jido.Messaging.Supervisor.start_link(
          name: __jido_messaging__(:supervisor),
          instance_module: __MODULE__,
          persistence: persistence,
          persistence_opts: persistence_opts,
          runtime_profile: runtime_profile,
          runtime_features: runtime_features,
          bridge_manifest_paths: bridge_manifest_paths,
          required_bridges: required_bridges,
          bridge_collision_policy: bridge_collision_policy
        )
      end

      @doc "Returns naming info for this instance"
      def __jido_messaging__(key) do
        case key do
          :supervisor -> Module.concat(__MODULE__, :Supervisor)
          :runtime -> Module.concat(__MODULE__, :Runtime)
          :room_registry -> Module.concat([__MODULE__, "Registry", "Rooms"])
          :room_supervisor -> Module.concat(__MODULE__, :RoomSupervisor)
          :agent_registry -> Module.concat([__MODULE__, "Registry", "Agents"])
          :agent_supervisor -> Module.concat(__MODULE__, :AgentSupervisor)
          :instance_registry -> Module.concat([__MODULE__, "Registry", "Instances"])
          :bridge_registry -> Module.concat([__MODULE__, "Registry", "Bridges"])
          :instance_supervisor -> Module.concat(__MODULE__, :InstanceSupervisor)
          :bridge_supervisor -> Module.concat(__MODULE__, :BridgeSupervisor)
          :onboarding_registry -> Module.concat([__MODULE__, "Registry", "Onboarding"])
          :onboarding_supervisor -> Module.concat(__MODULE__, :OnboardingSupervisor)
          :session_manager_supervisor -> Module.concat(__MODULE__, :SessionManagerSupervisor)
          :dead_letter -> Module.concat(__MODULE__, :DeadLetter)
          :dead_letter_replay_supervisor -> Module.concat(__MODULE__, :DeadLetterReplaySupervisor)
          :config_store -> Module.concat(__MODULE__, :ConfigStore)
          :deduper -> Module.concat(__MODULE__, :Deduper)
          :persistence -> @persistence
          :persistence_opts -> @persistence_opts
          :runtime_profile -> @runtime_profile
          :runtime_features -> @runtime_features
          :bridge_manifest_paths -> @bridge_manifest_paths
          :required_bridges -> @required_bridges
          :bridge_collision_policy -> @bridge_collision_policy
          :pubsub -> @pubsub
        end
      end

      # Delegated API functions

      @doc "Create a new room"
      def create_room(attrs) do
        Jido.Messaging.create_room(__jido_messaging__(:runtime), attrs)
      end

      @doc "Get a room by ID"
      def get_room(room_id) do
        Jido.Messaging.get_room(__jido_messaging__(:runtime), room_id)
      end

      @doc "List rooms with optional filters"
      def list_rooms(opts \\ []) do
        Jido.Messaging.list_rooms(__jido_messaging__(:runtime), opts)
      end

      @doc "Delete a room"
      def delete_room(room_id) do
        Jido.Messaging.delete_room(__jido_messaging__(:runtime), room_id)
      end

      @doc "Create a new participant"
      def create_participant(attrs) do
        Jido.Messaging.create_participant(__jido_messaging__(:runtime), attrs)
      end

      @doc "Get a participant by ID"
      def get_participant(participant_id) do
        Jido.Messaging.get_participant(__jido_messaging__(:runtime), participant_id)
      end

      @doc "Save a message"
      def save_message(attrs) do
        Jido.Messaging.save_message(__jido_messaging__(:runtime), attrs)
      end

      @doc "Persist a message and emit a committed `jido.messaging.room.message_added` signal"
      def post_message(attrs, opts \\ []) do
        Jido.Messaging.post_message(__MODULE__, __jido_messaging__(:runtime), attrs, opts)
      end

      @doc "Get a message by ID"
      def get_message(message_id) do
        Jido.Messaging.get_message(__jido_messaging__(:runtime), message_id)
      end

      @doc "List messages for a room"
      def list_messages(room_id, opts \\ []) do
        Jido.Messaging.list_messages(__jido_messaging__(:runtime), room_id, opts)
      end

      @doc "Return raw timeline/thread grouping for a room"
      def room_timeline(room_id, opts \\ []) do
        Jido.Messaging.room_timeline(__jido_messaging__(:runtime), room_id, opts)
      end

      @doc "Delete a message"
      def delete_message(message_id) do
        Jido.Messaging.delete_message(__jido_messaging__(:runtime), message_id)
      end

      @doc "Save a thread"
      def save_thread(attrs) do
        Jido.Messaging.save_thread(__jido_messaging__(:runtime), attrs)
      end

      @doc "Get a thread by ID"
      def get_thread(thread_id) do
        Jido.Messaging.get_thread(__jido_messaging__(:runtime), thread_id)
      end

      @doc "Get a thread by external thread ID within a room"
      def get_thread_by_external_id(room_id, external_thread_id) do
        Jido.Messaging.get_thread_by_external_id(
          __jido_messaging__(:runtime),
          room_id,
          external_thread_id
        )
      end

      @doc "Get a thread by root message ID"
      def get_thread_by_root_message(room_id, root_message_id) do
        Jido.Messaging.get_thread_by_root_message(
          __jido_messaging__(:runtime),
          room_id,
          root_message_id
        )
      end

      @doc "List threads in a room"
      def list_threads(room_id, opts \\ []) do
        Jido.Messaging.list_threads(__jido_messaging__(:runtime), room_id, opts)
      end

      @doc "Get or create room by external binding"
      def get_or_create_room_by_external_binding(channel, bridge_id, external_id, attrs \\ %{}) do
        Jido.Messaging.get_or_create_room_by_external_binding(
          __jido_messaging__(:runtime),
          channel,
          bridge_id,
          external_id,
          attrs
        )
      end

      @doc "Get or create participant by external ID"
      def get_or_create_participant_by_external_id(channel, external_id, attrs \\ %{}) do
        Jido.Messaging.get_or_create_participant_by_external_id(
          __jido_messaging__(:runtime),
          channel,
          external_id,
          attrs
        )
      end

      @doc "Get a message by its external ID within a channel/bridge context"
      def get_message_by_external_id(channel, bridge_id, external_id) do
        Jido.Messaging.get_message_by_external_id(
          __jido_messaging__(:runtime),
          channel,
          bridge_id,
          external_id
        )
      end

      @doc "Update a message's external_id"
      def update_message_external_id(message_id, external_id) do
        Jido.Messaging.update_message_external_id(
          __jido_messaging__(:runtime),
          message_id,
          external_id
        )
      end

      @doc "Save an already-constructed message struct (for updates)"
      def save_message_struct(message) do
        Jido.Messaging.save_message_struct(__jido_messaging__(:runtime), message)
      end

      @doc "Add a reaction to a message and emit a committed reaction signal"
      def add_reaction(message_id, participant_id, reaction, opts \\ []) do
        Jido.Messaging.add_reaction(
          __MODULE__,
          __jido_messaging__(:runtime),
          message_id,
          participant_id,
          reaction,
          opts
        )
      end

      @doc "Remove a reaction from a message and emit a committed reaction signal"
      def remove_reaction(message_id, participant_id, reaction, opts \\ []) do
        Jido.Messaging.remove_reaction(
          __MODULE__,
          __jido_messaging__(:runtime),
          message_id,
          participant_id,
          reaction,
          opts
        )
      end

      @doc "Save an already-constructed thread struct (for updates)"
      def save_thread_struct(thread) do
        Jido.Messaging.save_thread_struct(__jido_messaging__(:runtime), thread)
      end

      @doc "Save a room struct directly (for custom IDs)"
      def save_room(room) do
        Jido.Messaging.save_room(__jido_messaging__(:runtime), room)
      end

      @doc "Get room by external binding (without creating)"
      def get_room_by_external_binding(channel, bridge_id, external_id) do
        Jido.Messaging.get_room_by_external_binding(
          __jido_messaging__(:runtime),
          channel,
          bridge_id,
          external_id
        )
      end

      @doc "Create a binding between an internal room and an external platform"
      def create_room_binding(room_id, channel, bridge_id, external_id, attrs \\ %{}) do
        Jido.Messaging.create_room_binding(
          __jido_messaging__(:runtime),
          room_id,
          channel,
          bridge_id,
          external_id,
          attrs
        )
      end

      @doc "List all bindings for a room"
      def list_room_bindings(room_id) do
        Jido.Messaging.list_room_bindings(__jido_messaging__(:runtime), room_id)
      end

      @doc "Delete a room binding"
      def delete_room_binding(binding_id) do
        Jido.Messaging.delete_room_binding(__jido_messaging__(:runtime), binding_id)
      end

      # Directory functions

      @doc "Lookup a single directory entry."
      def directory_lookup(target, query, opts \\ []) do
        Jido.Messaging.directory_lookup(__jido_messaging__(:runtime), target, query, opts)
      end

      @doc "Search directory entries."
      def directory_search(target, query, opts \\ []) do
        Jido.Messaging.directory_search(__jido_messaging__(:runtime), target, query, opts)
      end

      # Onboarding functions

      @doc "Start (or resume) an onboarding flow."
      def start_onboarding(attrs, opts \\ []) do
        Jido.Messaging.start_onboarding(__MODULE__, attrs, opts)
      end

      @doc "Advance an onboarding flow."
      def advance_onboarding(onboarding_id, transition, metadata \\ %{}, opts \\ []) do
        Jido.Messaging.advance_onboarding(__MODULE__, onboarding_id, transition, metadata, opts)
      end

      @doc "Resume an onboarding flow."
      def resume_onboarding(onboarding_id) do
        Jido.Messaging.resume_onboarding(__MODULE__, onboarding_id)
      end

      @doc "Cancel an onboarding flow."
      def cancel_onboarding(onboarding_id, metadata \\ %{}, opts \\ []) do
        Jido.Messaging.cancel_onboarding(__MODULE__, onboarding_id, metadata, opts)
      end

      @doc "Complete an onboarding flow."
      def complete_onboarding(onboarding_id, metadata \\ %{}, opts \\ []) do
        Jido.Messaging.complete_onboarding(__MODULE__, onboarding_id, metadata, opts)
      end

      @doc "Fetch onboarding flow state."
      def get_onboarding(onboarding_id) do
        Jido.Messaging.get_onboarding(__MODULE__, onboarding_id)
      end

      @doc "Find the onboarding worker PID for a flow."
      def whereis_onboarding_worker(onboarding_id) do
        Jido.Messaging.whereis_onboarding_worker(__MODULE__, onboarding_id)
      end

      # Room Server functions

      @doc "Start a room server for the given room"
      def start_room_server(room, opts \\ []) do
        Jido.Messaging.RoomSupervisor.start_room(__MODULE__, room, opts)
      end

      @doc "Get or start a room server"
      def get_or_start_room_server(room, opts \\ []) do
        Jido.Messaging.RoomSupervisor.get_or_start_room(__MODULE__, room, opts)
      end

      @doc "Stop a room server"
      def stop_room_server(room_id) do
        Jido.Messaging.RoomSupervisor.stop_room(__MODULE__, room_id)
      end

      @doc "Find a running room server by room ID"
      def whereis_room_server(room_id) do
        Jido.Messaging.RoomServer.whereis(__MODULE__, room_id)
      end

      @doc "List all running room servers"
      def list_room_servers do
        Jido.Messaging.RoomSupervisor.list_rooms(__MODULE__)
      end

      @doc "Count running room servers"
      def count_room_servers do
        Jido.Messaging.RoomSupervisor.count_rooms(__MODULE__)
      end

      # Agent functions

      @doc "Register an agent with a room"
      def register_agent(room_id, agent_spec, opts \\ []) do
        Jido.Messaging.register_agent(__MODULE__, room_id, agent_spec, opts)
      end

      @doc "Unregister an agent from a room"
      def unregister_agent(room_id, agent_id) do
        Jido.Messaging.unregister_agent(__MODULE__, room_id, agent_id)
      end

      @doc "List registered agents in a room"
      def list_agents(room_id) do
        Jido.Messaging.list_agents(__MODULE__, room_id)
      end

      @doc "Assign a thread to an agent"
      def assign_thread(room_id, thread_id, agent_id) do
        Jido.Messaging.assign_thread(__MODULE__, room_id, thread_id, agent_id)
      end

      @doc "Unassign a thread"
      def unassign_thread(room_id, thread_id) do
        Jido.Messaging.unassign_thread(__MODULE__, room_id, thread_id)
      end

      @doc "Fetch thread assignment"
      def thread_assignment(room_id, thread_id) do
        Jido.Messaging.thread_assignment(__MODULE__, room_id, thread_id)
      end

      @doc "Find a running agent by room, thread, and agent ID"
      def whereis_agent(room_id, thread_id, agent_id) do
        Jido.Messaging.AgentRunner.whereis(__MODULE__, room_id, thread_id, agent_id)
      end

      @doc "Count running agents"
      def count_agents do
        Jido.Messaging.AgentSupervisor.count_agents(__MODULE__)
      end

      # Instance lifecycle functions

      @doc "Start a new channel instance"
      def start_instance(channel_type, attrs \\ %{}) do
        Jido.Messaging.InstanceSupervisor.start_instance(__MODULE__, channel_type, attrs)
      end

      @doc "Stop an instance"
      def stop_instance(instance_id) do
        Jido.Messaging.InstanceSupervisor.stop_instance(__MODULE__, instance_id)
      end

      @doc "Get instance status"
      def instance_status(instance_id) do
        Jido.Messaging.InstanceSupervisor.instance_status(__MODULE__, instance_id)
      end

      @doc "List all running instances"
      def list_instances do
        Jido.Messaging.InstanceSupervisor.list_instances(__MODULE__)
      end

      @doc "Count running instances"
      def count_instances do
        Jido.Messaging.InstanceSupervisor.count_instances(__MODULE__)
      end

      @doc "List running bridge workers"
      def list_bridges do
        Jido.Messaging.BridgeSupervisor.list_bridges(__MODULE__)
      end

      @doc "Get runtime status for a bridge worker."
      def bridge_status(bridge_id) do
        Jido.Messaging.bridge_status(__MODULE__, bridge_id)
      end

      @doc "List runtime status for all bridge workers."
      def list_bridge_status do
        Jido.Messaging.list_bridge_status(__MODULE__)
      end

      # Bridge control-plane functions

      @doc "Create or update bridge config."
      def put_bridge_config(attrs) do
        Jido.Messaging.put_bridge_config(__MODULE__, attrs)
      end

      @doc "Fetch bridge config by id."
      def get_bridge_config(bridge_id) do
        Jido.Messaging.get_bridge_config(__MODULE__, bridge_id)
      end

      @doc "List bridge configs."
      def list_bridge_configs(opts \\ []) do
        Jido.Messaging.list_bridge_configs(__MODULE__, opts)
      end

      @doc "Delete bridge config."
      def delete_bridge_config(bridge_id) do
        Jido.Messaging.delete_bridge_config(__MODULE__, bridge_id)
      end

      @doc "Ensure adapter-scoped provider ingress subscription for a bridge."
      def ensure_ingress_subscription(bridge_id, opts \\ []) do
        Jido.Messaging.ensure_ingress_subscription(__MODULE__, bridge_id, opts)
      end

      @doc "List adapter-scoped provider ingress subscriptions for a bridge."
      def list_ingress_subscriptions(bridge_id, opts \\ []) do
        Jido.Messaging.list_ingress_subscriptions(__MODULE__, bridge_id, opts)
      end

      @doc "Delete adapter-scoped provider ingress subscription for a bridge."
      def delete_ingress_subscription(bridge_id, subscription_id, opts \\ []) do
        Jido.Messaging.delete_ingress_subscription(__MODULE__, bridge_id, subscription_id, opts)
      end

      @doc "Create or update per-room routing policy."
      def put_routing_policy(room_id, attrs) do
        Jido.Messaging.put_routing_policy(__MODULE__, room_id, attrs)
      end

      @doc "Fetch routing policy for room."
      def get_routing_policy(room_id) do
        Jido.Messaging.get_routing_policy(__MODULE__, room_id)
      end

      @doc "Delete routing policy for room."
      def delete_routing_policy(room_id) do
        Jido.Messaging.delete_routing_policy(__MODULE__, room_id)
      end

      # Inbound routing functions

      @doc "Route webhook payload through bridge-config parse/verify path into ingest."
      def route_webhook(bridge_id, payload, opts \\ []) do
        Jido.Messaging.route_webhook(__MODULE__, bridge_id, payload, opts)
      end

      @doc "Route webhook request and return typed webhook response + ingest outcome."
      def route_webhook_request(bridge_id, request_meta, payload, opts \\ []) do
        Jido.Messaging.route_webhook_request(__MODULE__, bridge_id, request_meta, payload, opts)
      end

      @doc "Route direct payload through bridge-config transform path into ingest."
      def route_payload(bridge_id, payload, opts \\ []) do
        Jido.Messaging.route_payload(__MODULE__, bridge_id, payload, opts)
      end

      @doc "Create or ensure a bridge-backed room topology in one call."
      def create_bridge_room(attrs) do
        Jido.Messaging.create_bridge_room(__MODULE__, attrs)
      end

      # Outbound routing functions

      @doc "Resolve configured outbound adapter routes for a room."
      def resolve_outbound_routes(room_id, opts \\ []) do
        Jido.Messaging.resolve_outbound_routes(__MODULE__, room_id, opts)
      end

      @doc "Route outbound text through bridge bindings/policy for a room."
      def route_outbound(room_id, text, opts \\ []) do
        Jido.Messaging.route_outbound(__MODULE__, room_id, text, opts)
      end

      @doc "Get health snapshot for an instance"
      def instance_health(instance_id) do
        case Jido.Messaging.InstanceServer.whereis(__MODULE__, instance_id) do
          nil -> {:error, :not_found}
          pid -> Jido.Messaging.InstanceServer.health_snapshot(pid)
        end
      end

      @doc "Get health snapshots for all running instances"
      def list_instance_health do
        Jido.Messaging.InstanceSupervisor.list_instance_health(__MODULE__)
      end

      # Deduplication functions

      @doc "Check if a message key is a duplicate (and mark as seen if new)"
      def check_dedupe(key, ttl_ms \\ nil) do
        Jido.Messaging.Deduper.check_and_mark(__MODULE__, key, ttl_ms)
      end

      @doc "Check if a message key has been seen"
      def seen?(key) do
        Jido.Messaging.Deduper.seen?(__MODULE__, key)
      end

      @doc "Clear all dedupe keys"
      def clear_dedupe do
        Jido.Messaging.Deduper.clear(__MODULE__)
      end

      # Dead-letter functions

      @doc "List dead-letter records."
      def list_dead_letters(opts \\ []) do
        Jido.Messaging.DeadLetter.list(__MODULE__, opts)
      end

      @doc "Get a dead-letter record by ID."
      def get_dead_letter(dead_letter_id) do
        Jido.Messaging.DeadLetter.get(__MODULE__, dead_letter_id)
      end

      @doc "Replay a dead-letter record by ID."
      def replay_dead_letter(dead_letter_id, opts \\ []) do
        Jido.Messaging.DeadLetter.replay(__MODULE__, dead_letter_id, opts)
      end

      @doc "Archive a dead-letter record by ID."
      def archive_dead_letter(dead_letter_id) do
        Jido.Messaging.DeadLetter.archive(__MODULE__, dead_letter_id)
      end

      @doc "Purge dead-letter records by filter."
      def purge_dead_letters(opts \\ []) do
        Jido.Messaging.DeadLetter.purge(__MODULE__, opts)
      end

      # PubSub functions

      @doc "Subscribe to room events via PubSub"
      def subscribe(room_id) do
        Jido.Messaging.PubSub.subscribe(__MODULE__, room_id)
      end

      @doc "Unsubscribe from room events"
      def unsubscribe(room_id) do
        Jido.Messaging.PubSub.unsubscribe(__MODULE__, room_id)
      end

      @doc "Subscribe to Jido Signal events emitted by this messaging instance"
      def subscribe_signals(path \\ "jido.messaging.**", opts \\ []) do
        Jido.Messaging.subscribe_signals(__MODULE__, path, opts)
      end

      @doc "Unsubscribe from a Jido Signal Bus subscription"
      def unsubscribe_signals(subscription_id, opts \\ []) do
        Jido.Messaging.unsubscribe_signals(__MODULE__, subscription_id, opts)
      end

      @doc "Emit a custom room-scoped Jido Signal event"
      def dispatch_room_event(event_type, room_id, data \\ %{}, opts \\ []) do
        Jido.Messaging.dispatch_room_event(__MODULE__, event_type, room_id, data, opts)
      end

      @doc "Emit a room-scoped participant joined signal"
      def participant_joined(room_id, participant_id, opts \\ []) do
        Jido.Messaging.participant_joined(__MODULE__, room_id, participant_id, opts)
      end

      @doc "Emit a room-scoped participant left signal"
      def participant_left(room_id, participant_id, opts \\ []) do
        Jido.Messaging.participant_left(__MODULE__, room_id, participant_id, opts)
      end

      @doc "Emit a participant presence changed signal"
      def participant_presence_changed(room_id, participant_id, from, to, opts \\ []) do
        Jido.Messaging.participant_presence_changed(__MODULE__, room_id, participant_id, from, to, opts)
      end

      @doc "Emit a participant typing signal"
      def participant_typing(room_id, participant_id, is_typing, opts \\ []) do
        Jido.Messaging.participant_typing(__MODULE__, room_id, participant_id, is_typing, opts)
      end

      @doc "Return the instance Signal Bus name"
      def signal_bus_name do
        Jido.Messaging.Supervisor.signal_bus_name(__MODULE__)
      end
    end
  end

  # Core API implementations that work with the Runtime

  @doc "Create a new room"
  def create_room(runtime, attrs) when is_map(attrs) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    room = Room.new(attrs)
    persistence.save_room(persistence_state, room)
  end

  @doc "Get a room by ID"
  def get_room(runtime, room_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_room(persistence_state, room_id)
  end

  @doc "Save a room struct directly (for custom IDs)"
  def save_room(runtime, %Room{} = room) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.save_room(persistence_state, room)
  end

  @doc "List rooms"
  def list_rooms(runtime, opts \\ []) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.list_rooms(persistence_state, opts)
  end

  @doc "Delete a room"
  def delete_room(runtime, room_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.delete_room(persistence_state, room_id)
  end

  @doc "Create a new participant"
  def create_participant(runtime, attrs) when is_map(attrs) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    participant = Participant.new(attrs)
    persistence.save_participant(persistence_state, participant)
  end

  @doc "Get a participant by ID"
  def get_participant(runtime, participant_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_participant(persistence_state, participant_id)
  end

  @doc "Save a message"
  def save_message(runtime, attrs) when is_map(attrs) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    message = Message.new(attrs)
    persistence.save_message(persistence_state, message)
  end

  @doc """
  Persist a message and emit the committed `jido.messaging.room.message_added` signal.

  This is the eventful command API for chat apps. `save_message/2` remains the
  low-level persistence primitive for migrations, imports, and tests that need
  no side effects.
  """
  def post_message(instance_module, runtime, attrs, opts \\ [])
      when is_atom(instance_module) and is_map(attrs) do
    opts = normalize_event_opts(opts)

    with {:ok, message} <- save_message(runtime, attrs),
         {:ok, signal} <- Jido.Messaging.Events.message_added(instance_module, message, signal_opts(message, opts)),
         {:ok, signal} <-
           Jido.Messaging.Dispatch.emit(instance_module, signal,
             telemetry_event: Jido.Messaging.Events.telemetry_event_for(:message_added),
             telemetry_metadata:
               telemetry_metadata(instance_module, :message_added, message.room_id, %{
                 message: message,
                 sender_id: message.sender_id,
                 thread_id: message.thread_id
               }),
             room_id: message.room_id,
             legacy_event: {:message_added, message}
           ) do
      {:ok, Jido.Messaging.CommandResult.new(message, [signal])}
    end
  end

  @doc "Get a message by ID"
  def get_message(runtime, message_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_message(persistence_state, message_id)
  end

  @doc "List messages for a room"
  def list_messages(runtime, room_id, opts \\ []) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_messages(persistence_state, room_id, opts)
  end

  @doc """
  Return raw timeline and thread grouping for a room.

  This helper returns canonical message structs grouped into top-level
  timeline messages, thread replies, and reply counts. Apps still own their
  user-facing chat context and UI projections.
  """
  def room_timeline(runtime, room_id, opts \\ []) do
    with {:ok, messages} <- list_messages(runtime, room_id, opts) do
      {:ok, Jido.Messaging.Query.room_timeline(messages)}
    end
  end

  @doc "Delete a message"
  def delete_message(runtime, message_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.delete_message(persistence_state, message_id)
  end

  @doc "Get or create room by external binding"
  def get_or_create_room_by_external_binding(runtime, channel, bridge_id, external_id, attrs) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)

    persistence.get_or_create_room_by_external_binding(
      persistence_state,
      channel,
      bridge_id,
      external_id,
      attrs
    )
  end

  @doc "Get or create participant by external ID"
  def get_or_create_participant_by_external_id(runtime, channel, external_id, attrs) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_or_create_participant_by_external_id(persistence_state, channel, external_id, attrs)
  end

  @doc "Get a message by its external ID within a channel/instance context"
  def get_message_by_external_id(runtime, channel, bridge_id, external_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_message_by_external_id(persistence_state, channel, bridge_id, external_id)
  end

  @doc "Update a message's external_id"
  def update_message_external_id(runtime, message_id, external_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.update_message_external_id(persistence_state, message_id, external_id)
  end

  @doc "Save an already-constructed message struct (for updates)"
  def save_message_struct(runtime, %Message{} = message) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.save_message(persistence_state, message)
  end

  @doc "Add a participant reaction to a message and emit a committed reaction signal."
  def add_reaction(instance_module, runtime, message_id, participant_id, reaction, opts \\ [])
      when is_atom(instance_module) and is_binary(message_id) and is_binary(participant_id) and
             is_binary(reaction) do
    update_reaction(
      instance_module,
      runtime,
      message_id,
      participant_id,
      reaction,
      :reaction_added,
      normalize_event_opts(opts)
    )
  end

  @doc "Remove a participant reaction from a message and emit a committed reaction signal."
  def remove_reaction(instance_module, runtime, message_id, participant_id, reaction, opts \\ [])
      when is_atom(instance_module) and is_binary(message_id) and is_binary(participant_id) and
             is_binary(reaction) do
    update_reaction(
      instance_module,
      runtime,
      message_id,
      participant_id,
      reaction,
      :reaction_removed,
      normalize_event_opts(opts)
    )
  end

  @doc "Dispatch a custom room-scoped Jido Signal event."
  def dispatch_room_event(instance_module, event_type, room_id, data \\ %{}, opts \\ [])
      when is_atom(instance_module) and is_atom(event_type) and is_binary(room_id) and is_map(data) do
    opts = normalize_event_opts(opts)

    with {:ok, signal} <- Jido.Messaging.Events.room_event(instance_module, event_type, room_id, data, opts),
         {:ok, signal} <-
           Jido.Messaging.Dispatch.emit(instance_module, signal,
             telemetry_event: Jido.Messaging.Events.telemetry_event_for(event_type),
             telemetry_metadata: telemetry_metadata(instance_module, event_type, room_id, data),
             room_id: room_id,
             legacy_event: Keyword.get(opts, :legacy_event)
           ) do
      {:ok, signal}
    end
  end

  @doc """
  Emit a canonical participant joined signal for a room.

  This is a transport-agnostic helper for apps that receive participant lifecycle
  information from Phoenix Presence, adapters, polling, or any other source.
  """
  def participant_joined(instance_module, room_id, participant_id, opts \\ [])
      when is_atom(instance_module) and is_binary(room_id) and is_binary(participant_id) do
    opts = normalize_event_opts(opts)

    dispatch_room_event(
      instance_module,
      :participant_joined,
      room_id,
      participant_event_data(participant_id, opts),
      opts
    )
  end

  @doc """
  Emit a canonical participant left signal for a room.
  """
  def participant_left(instance_module, room_id, participant_id, opts \\ [])
      when is_atom(instance_module) and is_binary(room_id) and is_binary(participant_id) do
    opts = normalize_event_opts(opts)

    dispatch_room_event(
      instance_module,
      :participant_left,
      room_id,
      participant_event_data(participant_id, opts),
      opts
    )
  end

  @doc """
  Emit a canonical participant presence changed signal.
  """
  def participant_presence_changed(instance_module, room_id, participant_id, from, to, opts \\ [])
      when is_atom(instance_module) and is_binary(room_id) and is_binary(participant_id) do
    opts = normalize_event_opts(opts)

    data =
      participant_event_data(participant_id, opts)
      |> Map.put(:from, from)
      |> Map.put(:to, to)

    dispatch_room_event(instance_module, :presence_changed, room_id, data, opts)
  end

  @doc """
  Emit a canonical participant typing signal.
  """
  def participant_typing(instance_module, room_id, participant_id, is_typing, opts \\ [])
      when is_atom(instance_module) and is_binary(room_id) and is_binary(participant_id) and
             is_boolean(is_typing) do
    opts = normalize_event_opts(opts)

    data =
      participant_event_data(participant_id, opts)
      |> Map.put(:is_typing, is_typing)
      |> maybe_put_data(:thread_id, Keyword.get(opts, :thread_id))

    dispatch_room_event(instance_module, :typing, room_id, data, opts)
  end

  @doc "Subscribe to Jido Signal events for an instance module."
  def subscribe_signals(instance_module, path \\ "jido.messaging.**", opts \\ [])
      when is_atom(instance_module) and is_binary(path) and is_list(opts) do
    Jido.Messaging.Dispatch.subscribe(instance_module, path, opts)
  end

  @doc "Unsubscribe from a Jido Signal Bus subscription."
  def unsubscribe_signals(instance_module, subscription_id, opts \\ [])
      when is_atom(instance_module) and is_list(opts) do
    Jido.Messaging.Dispatch.unsubscribe(instance_module, subscription_id, opts)
  end

  @doc "Save a thread"
  def save_thread(runtime, attrs) when is_map(attrs) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    thread = Thread.new(attrs)
    persistence.save_thread(persistence_state, thread)
  end

  @doc "Save an already-constructed thread struct (for updates)"
  def save_thread_struct(runtime, %Thread{} = thread) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.save_thread(persistence_state, thread)
  end

  @doc "Get a thread by ID"
  def get_thread(runtime, thread_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_thread(persistence_state, thread_id)
  end

  @doc "Get a thread by external thread ID"
  def get_thread_by_external_id(runtime, room_id, external_thread_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_thread_by_external_id(persistence_state, room_id, external_thread_id)
  end

  @doc "Get a thread by root message ID"
  def get_thread_by_root_message(runtime, room_id, root_message_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_thread_by_root_message(persistence_state, room_id, root_message_id)
  end

  @doc "List threads for a room"
  def list_threads(runtime, room_id, opts \\ []) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.list_threads(persistence_state, room_id, opts)
  end

  @doc "Get room by external binding (without creating)"
  def get_room_by_external_binding(runtime, channel, bridge_id, external_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.get_room_by_external_binding(persistence_state, channel, bridge_id, external_id)
  end

  @doc "Create a binding between an internal room and an external platform"
  def create_room_binding(runtime, room_id, channel, bridge_id, external_id, attrs) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.create_room_binding(persistence_state, room_id, channel, bridge_id, external_id, attrs)
  end

  @doc "List all bindings for a room"
  def list_room_bindings(runtime, room_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.list_room_bindings(persistence_state, room_id)
  end

  @doc "Delete a room binding"
  def delete_room_binding(runtime, binding_id) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.delete_room_binding(persistence_state, binding_id)
  end

  @doc "Lookup a single directory entry."
  def directory_lookup(runtime, target, query, opts \\ [])
      when is_atom(target) and is_map(query) and is_list(opts) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.directory_lookup(persistence_state, target, query, opts)
  end

  @doc "Search directory entries."
  def directory_search(runtime, target, query, opts \\ [])
      when is_atom(target) and is_map(query) and is_list(opts) do
    {persistence, persistence_state} = Runtime.get_persistence(runtime)
    persistence.directory_search(persistence_state, target, query, opts)
  end

  @doc "Start (or resume) an onboarding flow."
  def start_onboarding(instance_module, attrs, opts \\ [])
      when is_atom(instance_module) and is_map(attrs) and is_list(opts) do
    Onboarding.start(instance_module, attrs, opts)
  end

  @doc "Advance an onboarding flow."
  def advance_onboarding(instance_module, onboarding_id, transition, metadata \\ %{}, opts \\ [])
      when is_atom(instance_module) and is_binary(onboarding_id) and is_atom(transition) and is_map(metadata) and
             is_list(opts) do
    Onboarding.advance(instance_module, onboarding_id, transition, metadata, opts)
  end

  @doc "Resume an onboarding flow."
  def resume_onboarding(instance_module, onboarding_id)
      when is_atom(instance_module) and is_binary(onboarding_id) do
    Onboarding.resume(instance_module, onboarding_id)
  end

  @doc "Cancel an onboarding flow."
  def cancel_onboarding(instance_module, onboarding_id, metadata \\ %{}, opts \\ [])
      when is_atom(instance_module) and is_binary(onboarding_id) and is_map(metadata) and is_list(opts) do
    Onboarding.cancel(instance_module, onboarding_id, metadata, opts)
  end

  @doc "Complete an onboarding flow."
  def complete_onboarding(instance_module, onboarding_id, metadata \\ %{}, opts \\ [])
      when is_atom(instance_module) and is_binary(onboarding_id) and is_map(metadata) and is_list(opts) do
    Onboarding.complete(instance_module, onboarding_id, metadata, opts)
  end

  @doc "Fetch an onboarding flow."
  def get_onboarding(instance_module, onboarding_id)
      when is_atom(instance_module) and is_binary(onboarding_id) do
    Onboarding.get(instance_module, onboarding_id)
  end

  @doc "Find onboarding worker PID."
  def whereis_onboarding_worker(instance_module, onboarding_id)
      when is_atom(instance_module) and is_binary(onboarding_id) do
    Onboarding.whereis_worker(instance_module, onboarding_id)
  end

  @doc "Create or update bridge config."
  def put_bridge_config(instance_module, attrs)
      when is_atom(instance_module) and is_map(attrs) do
    ConfigStore.put_bridge_config(instance_module, attrs)
  end

  @doc "Fetch runtime status for one bridge worker."
  def bridge_status(instance_module, bridge_id)
      when is_atom(instance_module) and is_binary(bridge_id) do
    case Jido.Messaging.BridgeServer.whereis(instance_module, bridge_id) do
      nil -> {:error, :not_found}
      pid -> Jido.Messaging.BridgeServer.status(pid)
    end
  end

  @doc "List runtime status for all bridge workers."
  def list_bridge_status(instance_module) when is_atom(instance_module) do
    Jido.Messaging.BridgeSupervisor.list_bridges(instance_module)
  end

  @doc "Fetch bridge config by id."
  def get_bridge_config(instance_module, bridge_id)
      when is_atom(instance_module) and is_binary(bridge_id) do
    ConfigStore.get_bridge_config(instance_module, bridge_id)
  end

  @doc "List bridge configs."
  def list_bridge_configs(instance_module, opts \\ [])
      when is_atom(instance_module) and is_list(opts) do
    ConfigStore.list_bridge_configs(instance_module, opts)
  end

  @doc "Delete bridge config."
  def delete_bridge_config(instance_module, bridge_id)
      when is_atom(instance_module) and is_binary(bridge_id) do
    ConfigStore.delete_bridge_config(instance_module, bridge_id)
  end

  @doc "Ensure adapter-scoped provider ingress subscription for a bridge."
  def ensure_ingress_subscription(instance_module, bridge_id, opts \\ [])
      when is_atom(instance_module) and is_binary(bridge_id) and is_list(opts) do
    IngressSubscriptions.ensure(instance_module, bridge_id, opts)
  end

  @doc "List adapter-scoped provider ingress subscriptions for a bridge."
  def list_ingress_subscriptions(instance_module, bridge_id, opts \\ [])
      when is_atom(instance_module) and is_binary(bridge_id) and is_list(opts) do
    IngressSubscriptions.list(instance_module, bridge_id, opts)
  end

  @doc "Delete adapter-scoped provider ingress subscription for a bridge."
  def delete_ingress_subscription(instance_module, bridge_id, subscription_id, opts \\ [])
      when is_atom(instance_module) and is_binary(bridge_id) and is_binary(subscription_id) and is_list(opts) do
    IngressSubscriptions.delete(instance_module, bridge_id, subscription_id, opts)
  end

  @doc "Create or update room routing policy."
  def put_routing_policy(instance_module, room_id, attrs)
      when is_atom(instance_module) and is_binary(room_id) and is_map(attrs) do
    ConfigStore.put_routing_policy(instance_module, room_id, attrs)
  end

  @doc "Fetch room routing policy."
  def get_routing_policy(instance_module, room_id)
      when is_atom(instance_module) and is_binary(room_id) do
    ConfigStore.get_routing_policy(instance_module, room_id)
  end

  @doc "Delete room routing policy."
  def delete_routing_policy(instance_module, room_id)
      when is_atom(instance_module) and is_binary(room_id) do
    ConfigStore.delete_routing_policy(instance_module, room_id)
  end

  @doc "Register an agent with a room."
  def register_agent(instance_module, room_id, agent_spec, opts \\ [])
      when is_atom(instance_module) and is_binary(room_id) and is_map(agent_spec) and is_list(opts) do
    normalized_spec = normalize_agent_spec(agent_spec, opts)

    with {:ok, room_server} <- ensure_room_server(instance_module, room_id),
         {:ok, registered_spec} <- RoomServer.register_agent(room_server, normalized_spec) do
      restore_agent_assignments(instance_module, room_id, room_server, registered_spec)
      {:ok, registered_spec}
    end
  end

  @doc "Unregister an agent from a room."
  def unregister_agent(instance_module, room_id, agent_id)
      when is_atom(instance_module) and is_binary(room_id) and is_binary(agent_id) do
    runtime = runtime_name(instance_module)

    with {:ok, room_server} <- ensure_room_server(instance_module, room_id) do
      runtime
      |> assigned_thread_ids_for_agent(room_server, room_id, agent_id)
      |> Enum.reduce_while(:ok, fn thread_id, :ok ->
        with :ok <- stop_thread_runner(instance_module, room_id, thread_id, agent_id),
             _ <- clear_persisted_thread_assignment(runtime, room_id, thread_id),
             :ok <- room_server_unassign_thread(room_server, thread_id) do
          {:cont, :ok}
        else
          {:error, reason} -> {:halt, {:error, reason}}
        end
      end)
      |> case do
        :ok -> room_server_unregister_agent(room_server, agent_id)
        {:error, reason} -> {:error, reason}
      end
    end
  end

  @doc "List registered agents for a room."
  def list_agents(instance_module, room_id)
      when is_atom(instance_module) and is_binary(room_id) do
    with {:ok, room_server} <- ensure_room_server(instance_module, room_id) do
      {:ok, RoomServer.list_agents(room_server)}
    end
  end

  @doc "Assign a thread to an agent."
  def assign_thread(instance_module, room_id, thread_id, agent_id)
      when is_atom(instance_module) and is_binary(room_id) and is_binary(thread_id) and is_binary(agent_id) do
    runtime = runtime_name(instance_module)

    with {:ok, thread} <- get_thread(runtime, thread_id),
         :ok <- ensure_thread_room(thread, room_id),
         {:ok, room_server} <- ensure_room_server(instance_module, room_id),
         {:ok, agent_spec} <- room_server_get_agent(room_server, agent_id) do
      previous_agent_id = room_server_thread_assignment(room_server, thread_id) || thread.assigned_agent_id

      previous_agent_spec =
        registered_agent_spec(instance_module, room_id, thread_id, room_server, previous_agent_id)

      case AgentRunner.whereis(instance_module, room_id, thread_id, agent_id) do
        pid when is_pid(pid) and previous_agent_id == agent_id ->
          updated_thread = maybe_touch_thread_assignment(thread, agent_id)

          with {:ok, persisted_thread} <- persist_thread_assignment(runtime, updated_thread),
               :ok <- room_server_assign_thread(room_server, thread_id, agent_id) do
            {:ok, persisted_thread}
          else
            {:error, reason} ->
              _ = save_thread_struct(runtime, thread)
              {:error, reason}
          end

        _ ->
          updated_thread = %{thread | assigned_agent_id: agent_id, updated_at: DateTime.utc_now()}

          case stop_thread_runner(instance_module, room_id, thread_id, previous_agent_id) do
            :ok ->
              case assign_thread_state(
                     runtime,
                     room_server,
                     instance_module,
                     room_id,
                     thread_id,
                     agent_id,
                     agent_spec,
                     updated_thread
                   ) do
                {:ok, persisted_thread} ->
                  {:ok, persisted_thread}

                {:error, reason} ->
                  case rollback_thread_assignment(
                         instance_module,
                         runtime,
                         room_server,
                         thread,
                         previous_agent_spec
                       ) do
                    :ok -> {:error, reason}
                    {:error, rollback_reason} -> {:error, {:assignment_failed, reason, rollback_reason}}
                  end
              end

            {:error, reason} ->
              {:error, reason}
          end
      end
    end
  end

  @doc "Unassign a thread."
  def unassign_thread(instance_module, room_id, thread_id)
      when is_atom(instance_module) and is_binary(room_id) and is_binary(thread_id) do
    runtime = runtime_name(instance_module)

    with {:ok, thread} <- get_thread(runtime, thread_id),
         :ok <- ensure_thread_room(thread, room_id),
         {:ok, room_server} <- ensure_room_server(instance_module, room_id) do
      assigned_agent_id = room_server_thread_assignment(room_server, thread_id) || thread.assigned_agent_id

      assigned_agent_spec =
        registered_agent_spec(instance_module, room_id, thread_id, room_server, assigned_agent_id)

      case stop_thread_runner(instance_module, room_id, thread_id, assigned_agent_id) do
        :ok ->
          updated_thread = %{thread | assigned_agent_id: nil, updated_at: DateTime.utc_now()}

          case unassign_thread_state(runtime, room_server, updated_thread) do
            {:ok, persisted_thread} ->
              {:ok, persisted_thread}

            {:error, reason} ->
              case rollback_thread_assignment(
                     instance_module,
                     runtime,
                     room_server,
                     thread,
                     assigned_agent_spec
                   ) do
                :ok -> {:error, reason}
                {:error, rollback_reason} -> {:error, {:unassignment_failed, reason, rollback_reason}}
              end
          end

        {:error, reason} ->
          {:error, reason}
      end
    end
  end

  @doc "Fetch thread assignment for a room thread."
  def thread_assignment(instance_module, room_id, thread_id)
      when is_atom(instance_module) and is_binary(room_id) and is_binary(thread_id) do
    runtime = runtime_name(instance_module)

    with {:ok, thread} <- get_thread(runtime, thread_id),
         :ok <- ensure_thread_room(thread, room_id) do
      {:ok, resolve_thread_assignment(instance_module, room_id, thread_id, thread)}
    end
  end

  @doc "Route webhook payload through bridge-config parse/verify path into ingest."
  def route_webhook(instance_module, bridge_id, payload, opts \\ [])
      when is_atom(instance_module) and is_binary(bridge_id) and is_map(payload) and is_list(opts) do
    Jido.Messaging.InboundRouter.route_webhook(instance_module, bridge_id, payload, opts)
  end

  @doc "Route webhook request and return typed response + ingest outcome."
  def route_webhook_request(instance_module, bridge_id, request_meta, payload, opts \\ [])
      when is_atom(instance_module) and is_binary(bridge_id) and is_map(request_meta) and is_map(payload) and
             is_list(opts) do
    Jido.Messaging.InboundRouter.route_webhook_request(instance_module, bridge_id, request_meta, payload, opts)
  end

  @doc "Route direct payload through bridge-config transform path into ingest."
  def route_payload(instance_module, bridge_id, payload, opts \\ [])
      when is_atom(instance_module) and is_binary(bridge_id) and is_map(payload) and is_list(opts) do
    Jido.Messaging.InboundRouter.route_payload(instance_module, bridge_id, payload, opts)
  end

  @doc false
  def restore_agent_runners(instance_module) when is_atom(instance_module) do
    runtime = runtime_name(instance_module)
    room_supervisor = Module.concat(instance_module, :RoomSupervisor)

    if is_nil(Process.whereis(runtime)) or is_nil(Process.whereis(room_supervisor)) do
      {:error, :runtime_unavailable}
    else
      RoomSupervisor.list_rooms(instance_module)
      |> Enum.each(fn {room_id, room_server} ->
        agent_specs =
          room_server
          |> room_server_list_agents()
          |> case do
            {:ok, agents} -> Map.new(agents, &{&1.agent_id, &1})
            {:error, _reason} -> %{}
          end

        case list_threads(runtime, room_id) do
          {:ok, threads} ->
            Enum.each(threads, fn
              %Thread{id: thread_id, assigned_agent_id: agent_id}
              when is_binary(agent_id) and map_size(agent_specs) > 0 ->
                case Map.fetch(agent_specs, agent_id) do
                  {:ok, agent_spec} ->
                    _ = ensure_agent_runner(instance_module, room_id, thread_id, agent_id, agent_spec)
                    :ok

                  :error ->
                    :ok
                end

              _thread ->
                :ok
            end)

          {:error, _reason} ->
            :ok
        end
      end)

      :ok
    end
  end

  @doc false
  def restore_room_server_state(instance_module, room_id, room_server)
      when is_atom(instance_module) and is_binary(room_id) and is_pid(room_server) do
    instance_module
    |> AgentSupervisor.list_agents(room_id)
    |> Enum.each(fn {{thread_id, agent_id}, pid} ->
      case runner_agent_spec(pid, agent_id) do
        {:ok, agent_spec} ->
          _ = room_server_register_agent(room_server, agent_spec)
          _ = room_server_assign_thread(room_server, thread_id, agent_id)
          :ok

        {:error, _reason} ->
          :ok
      end
    end)

    :ok
  end

  @doc """
  Create or ensure a bridge-backed room topology in one idempotent call.

  This helper ensures:
    * optional bridge configs are upserted
    * room exists
    * bridge-scoped room bindings exist
    * optional routing policy exists
  """
  def create_bridge_room(instance_module, attrs)
      when is_atom(instance_module) and is_map(attrs) do
    spec = BridgeRoomSpec.new(attrs)
    room_id = spec.room_id || "bridge:" <> Jido.Chat.ID.generate!()
    runtime = runtime_name(instance_module)

    with :ok <- TopologyValidator.validate_bridge_room_spec(instance_module, spec),
         :ok <- ensure_bridge_configs(instance_module, spec.bridge_configs),
         {:ok, room} <- ensure_room(runtime, room_id, spec),
         {:ok, _bindings} <- ensure_bindings(runtime, instance_module, room_id, spec.bindings),
         {:ok, _policy} <- ensure_routing_policy(instance_module, room_id, spec.routing_policy) do
      {:ok, room}
    end
  end

  defp update_reaction(instance_module, runtime, message_id, participant_id, reaction, event_type, opts) do
    with {:ok, message} <- get_message(runtime, message_id) do
      existing_reactions = message.reactions || %{}
      current_participants = existing_reactions |> Map.get(reaction, []) |> List.wrap() |> Enum.map(&to_string/1)

      participants =
        case event_type do
          :reaction_added -> [participant_id | current_participants] |> Enum.uniq() |> Enum.sort()
          :reaction_removed -> Enum.reject(current_participants, &(&1 == participant_id))
        end

      reactions =
        if participants == [] do
          Map.delete(existing_reactions, reaction)
        else
          Map.put(existing_reactions, reaction, participants)
        end

      if reactions == existing_reactions do
        {:ok, Jido.Messaging.CommandResult.new(message, [])}
      else
        updated_message = %{message | reactions: reactions, updated_at: DateTime.utc_now()}

        with {:ok, updated_message} <- save_message_struct(runtime, updated_message),
             {:ok, signal} <-
               reaction_signal(event_type, instance_module, updated_message, participant_id, reaction, opts),
             {:ok, signal} <-
               Jido.Messaging.Dispatch.emit(instance_module, signal,
                 telemetry_event: Jido.Messaging.Events.telemetry_event_for(event_type),
                 telemetry_metadata:
                   telemetry_metadata(instance_module, event_type, updated_message.room_id, %{
                     message_id: updated_message.id,
                     participant_id: participant_id,
                     reaction: reaction,
                     message: updated_message
                   }),
                 room_id: updated_message.room_id,
                 legacy_event:
                   {event_type,
                    %{
                      message_id: updated_message.id,
                      participant_id: participant_id,
                      reaction: reaction,
                      message: updated_message
                    }}
               ) do
          {:ok, Jido.Messaging.CommandResult.new(updated_message, [signal])}
        end
      end
    end
  end

  defp reaction_signal(:reaction_added, instance_module, message, participant_id, reaction, opts) do
    Jido.Messaging.Events.reaction_added(instance_module, message, participant_id, reaction, signal_opts(message, opts))
  end

  defp reaction_signal(:reaction_removed, instance_module, message, participant_id, reaction, opts) do
    Jido.Messaging.Events.reaction_removed(
      instance_module,
      message,
      participant_id,
      reaction,
      signal_opts(message, opts)
    )
  end

  defp signal_opts(%Message{} = message, opts) do
    opts
    |> Keyword.put_new(:message_id, message.id)
    |> Keyword.put_new(:correlation_id, message.id)
    |> Keyword.put_new(:external_message_id, message.external_id)
    |> Keyword.put_new(:external_thread_id, message.external_thread_id)
    |> Keyword.put_new(:delivery_external_room_id, message.delivery_external_room_id)
  end

  defp normalize_event_opts(opts), do: Jido.Messaging.EventOptions.normalize(opts, :command)

  defp telemetry_metadata(instance_module, event_type, room_id, data) do
    data
    |> Map.merge(%{
      room_id: room_id,
      instance_module: instance_module,
      timestamp: DateTime.utc_now(),
      correlation_id:
        data[:message_id] || data["message_id"] || data[:participant_id] || data["participant_id"] ||
          "corr_" <> Base.encode16(:crypto.strong_rand_bytes(8), case: :lower)
    })
    |> Map.put_new(:event_type, event_type)
  end

  defp participant_event_data(participant_id, opts) do
    %{
      participant_id: participant_id,
      session_id: Keyword.get(opts, :session_id),
      presence: Keyword.get(opts, :presence),
      source: Keyword.get(opts, :source)
    }
    |> maybe_put_data(:metadata, Keyword.get(opts, :metadata))
    |> maybe_put_data(:reason, Keyword.get(opts, :reason))
    |> Map.reject(fn {_key, value} -> is_nil(value) end)
  end

  defp maybe_put_data(data, _key, nil), do: data
  defp maybe_put_data(data, key, value), do: Map.put(data, key, value)

  defp ensure_bridge_configs(_instance_module, []), do: :ok

  defp ensure_bridge_configs(instance_module, bridge_configs) when is_list(bridge_configs) do
    Enum.reduce_while(bridge_configs, :ok, fn config, :ok ->
      attrs = normalize_bridge_config_attrs(config)

      case put_bridge_config(instance_module, attrs) do
        {:ok, _bridge} -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, {:bridge_config_failed, reason, config}}}
      end
    end)
  end

  defp ensure_room(runtime, room_id, %BridgeRoomSpec{} = spec) do
    case get_room(runtime, room_id) do
      {:ok, room} ->
        {:ok, room}

      {:error, :not_found} ->
        save_room(
          runtime,
          Room.new(%{
            id: room_id,
            type: spec.room_type,
            name: spec.room_name,
            metadata: spec.room_metadata
          })
        )

      {:error, reason} ->
        {:error, {:room_lookup_failed, reason}}
    end
  end

  defp ensure_bindings(_runtime, _instance_module, _room_id, []), do: {:ok, []}

  defp ensure_bindings(runtime, instance_module, room_id, bindings) when is_list(bindings) do
    Enum.reduce_while(bindings, {:ok, []}, fn binding, {:ok, acc} ->
      with {:ok, normalized} <- normalize_binding(binding),
           {:ok, _bridge} <- ensure_binding_bridge_enabled(instance_module, normalized.bridge_id),
           {:ok, result} <- ensure_binding(runtime, room_id, normalized) do
        {:cont, {:ok, [result | acc]}}
      else
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
    |> case do
      {:ok, results} -> {:ok, Enum.reverse(results)}
      {:error, _reason} = error -> error
    end
  end

  defp ensure_binding(runtime, room_id, normalized) do
    case get_room_by_external_binding(
           runtime,
           normalized.channel,
           normalized.bridge_id,
           normalized.external_room_id
         ) do
      {:ok, %Room{id: ^room_id}} ->
        {:ok, :existing}

      {:ok, %Room{id: existing_room_id}} ->
        {:error,
         {:binding_conflict,
          %{
            room_id: room_id,
            existing_room_id: existing_room_id,
            channel: normalized.channel,
            bridge_id: normalized.bridge_id,
            external_room_id: normalized.external_room_id
          }}}

      {:error, :not_found} ->
        create_room_binding(
          runtime,
          room_id,
          normalized.channel,
          normalized.bridge_id,
          normalized.external_room_id,
          normalized.attrs
        )

      {:error, reason} ->
        {:error, {:binding_lookup_failed, reason}}
    end
  end

  defp ensure_binding_bridge_enabled(instance_module, bridge_id) when is_binary(bridge_id) do
    case get_bridge_config(instance_module, bridge_id) do
      {:ok, %{enabled: true} = config} -> {:ok, config}
      {:ok, %{enabled: false}} -> {:error, {:bridge_disabled, bridge_id}}
      {:error, :not_found} -> {:error, {:bridge_not_found, bridge_id}}
    end
  end

  defp ensure_routing_policy(_instance_module, _room_id, policy) when policy in [%{}, nil], do: {:ok, nil}

  defp ensure_routing_policy(instance_module, room_id, policy) when is_map(policy) do
    attrs = Map.put(policy, :room_id, room_id)

    case put_routing_policy(instance_module, room_id, attrs) do
      {:ok, routing_policy} -> {:ok, routing_policy}
      {:error, reason} -> {:error, {:routing_policy_failed, reason}}
    end
  end

  defp normalize_bridge_config_attrs(config) when is_map(config) do
    normalized =
      Enum.reduce(config, %{}, fn
        {key, value}, acc when is_atom(key) -> Map.put(acc, key, value)
        {key, value}, acc when is_binary(key) -> put_bridge_config_string_key(acc, key, value)
        {_key, _value}, acc -> acc
      end)

    normalized
    |> Map.put_new(:enabled, true)
    |> Map.put_new(:opts, %{})
    |> Map.put_new(:credentials, %{})
  end

  defp put_bridge_config_string_key(acc, key, value) when is_map(acc) and is_binary(key) do
    case bridge_config_key_to_atom(key) do
      nil -> Map.put(acc, key, value)
      atom_key -> Map.put(acc, atom_key, value)
    end
  end

  defp bridge_config_key_to_atom("id"), do: :id
  defp bridge_config_key_to_atom("adapter_module"), do: :adapter_module
  defp bridge_config_key_to_atom("adapter"), do: :adapter_module
  defp bridge_config_key_to_atom("credentials"), do: :credentials
  defp bridge_config_key_to_atom("opts"), do: :opts
  defp bridge_config_key_to_atom("enabled"), do: :enabled
  defp bridge_config_key_to_atom("capabilities"), do: :capabilities
  defp bridge_config_key_to_atom("delivery_policy"), do: :delivery_policy
  defp bridge_config_key_to_atom("revision"), do: :revision
  defp bridge_config_key_to_atom("inserted_at"), do: :inserted_at
  defp bridge_config_key_to_atom("updated_at"), do: :updated_at
  defp bridge_config_key_to_atom(_), do: nil

  defp normalize_binding(binding) when is_map(binding) do
    channel = map_get(binding, :channel) |> normalize_channel()
    bridge_id = map_get(binding, :bridge_id) |> normalize_id()
    external_room_id = map_get(binding, :external_room_id) |> normalize_id()
    direction = map_get(binding, :direction) |> normalize_direction()
    enabled = map_get(binding, :enabled) |> normalize_bool()

    with true <- is_atom(channel),
         true <- is_binary(bridge_id),
         true <- is_binary(external_room_id) do
      attrs =
        binding
        |> drop_keys([
          :channel,
          "channel",
          :bridge_id,
          "bridge_id",
          :external_room_id,
          "external_room_id",
          :direction,
          "direction",
          :enabled,
          "enabled"
        ])
        |> put_attr(:direction, direction)
        |> put_attr(:enabled, enabled)

      {:ok,
       %{
         channel: channel,
         bridge_id: bridge_id,
         external_room_id: external_room_id,
         attrs: attrs
       }}
    else
      _ -> {:error, {:invalid_binding, binding}}
    end
  end

  defp map_get(map, atom_key) when is_map(map) and is_atom(atom_key),
    do: Map.get(map, atom_key, Map.get(map, Atom.to_string(atom_key)))

  defp drop_keys(map, keys), do: Enum.reduce(keys, map, &Map.delete(&2, &1))

  defp put_attr(attrs, key, value) when is_map(attrs), do: Map.put(attrs, key, value)

  defp normalize_channel(:telegram), do: :telegram
  defp normalize_channel(:discord), do: :discord
  defp normalize_channel("telegram"), do: :telegram
  defp normalize_channel("discord"), do: :discord
  defp normalize_channel(_), do: nil

  defp normalize_direction(:inbound), do: :inbound
  defp normalize_direction(:outbound), do: :outbound
  defp normalize_direction("inbound"), do: :inbound
  defp normalize_direction("outbound"), do: :outbound
  defp normalize_direction(_), do: :both

  defp normalize_bool(true), do: true
  defp normalize_bool(false), do: false
  defp normalize_bool("true"), do: true
  defp normalize_bool("false"), do: false
  defp normalize_bool("1"), do: true
  defp normalize_bool("0"), do: false
  defp normalize_bool(1), do: true
  defp normalize_bool(0), do: false
  defp normalize_bool(_), do: true

  defp normalize_id(value) when is_binary(value) and value != "", do: value
  defp normalize_id(value) when is_integer(value), do: Integer.to_string(value)
  defp normalize_id(value) when is_atom(value), do: Atom.to_string(value)
  defp normalize_id(_), do: nil

  defp normalize_agent_spec(agent_spec, opts) when is_map(agent_spec) and is_list(opts) do
    agent_id =
      map_get(agent_spec, :agent_id) ||
        map_get(agent_spec, :id) ||
        raise ArgumentError, "agent_spec requires :agent_id"

    name = map_get(agent_spec, :name) || agent_id

    mention_handles =
      agent_spec
      |> map_get(:mention_handles)
      |> case do
        handles when is_list(handles) -> handles
        _ -> [agent_id, name]
      end
      |> Enum.map(&normalize_id/1)
      |> Enum.reject(&is_nil/1)
      |> Enum.map(&String.downcase/1)
      |> Enum.uniq()

    agent_spec
    |> Map.put(:agent_id, normalize_id(agent_id))
    |> Map.put(:name, normalize_id(name) || to_string(name))
    |> Map.put(:mention_handles, mention_handles)
    |> Map.put_new(:trigger, Keyword.get(opts, :trigger, :thread))
  end

  defp restore_agent_assignments(instance_module, room_id, room_server, agent_spec) do
    runtime = runtime_name(instance_module)
    agent_id = agent_spec.agent_id

    with {:ok, threads} <- list_threads(runtime, room_id) do
      Enum.each(threads, fn
        %Thread{id: thread_id, assigned_agent_id: ^agent_id} ->
          case room_server_thread_assignment(room_server, thread_id) do
            nil ->
              _ = room_server_assign_thread(room_server, thread_id, agent_id)
              _ = ensure_agent_runner(instance_module, room_id, thread_id, agent_id, agent_spec)

            ^agent_id ->
              _ = ensure_agent_runner(instance_module, room_id, thread_id, agent_id, agent_spec)

            _other ->
              :ok
          end

        _thread ->
          :ok
      end)
    end

    :ok
  end

  defp assigned_thread_ids_for_agent(runtime, room_server, room_id, agent_id) do
    in_memory_thread_ids =
      case room_server_list_thread_assignments(room_server) do
        {:ok, assignments} ->
          Enum.flat_map(assignments, fn
            {thread_id, ^agent_id} -> [thread_id]
            {_thread_id, _assigned_agent_id} -> []
          end)

        {:error, _reason} ->
          []
      end

    persisted_thread_ids =
      case list_threads(runtime, room_id) do
        {:ok, threads} ->
          Enum.flat_map(threads, fn
            %Thread{id: thread_id, assigned_agent_id: ^agent_id} -> [thread_id]
            _thread -> []
          end)

        {:error, _reason} ->
          []
      end

    Enum.uniq(in_memory_thread_ids ++ persisted_thread_ids)
  end

  defp ensure_agent_runner(instance_module, room_id, thread_id, agent_id, agent_spec) do
    case AgentRunner.whereis(instance_module, room_id, thread_id, agent_id) do
      pid when is_pid(pid) ->
        {:ok, pid}

      nil ->
        start_thread_runner(instance_module, room_id, thread_id, agent_id, agent_spec)
    end
  end

  defp clear_persisted_thread_assignment(runtime, room_id, thread_id) do
    with {:ok, thread} <- get_thread(runtime, thread_id),
         :ok <- ensure_thread_room(thread, room_id) do
      updated_thread = %{thread | assigned_agent_id: nil, updated_at: DateTime.utc_now()}
      save_thread_struct(runtime, updated_thread)
    else
      _ -> :ok
    end
  end

  defp persist_thread_assignment(runtime, %Thread{} = thread) do
    save_thread_struct(runtime, thread)
  end

  defp resolve_thread_assignment(instance_module, room_id, thread_id, %Thread{} = thread) do
    case ensure_room_server(instance_module, room_id) do
      {:ok, room_server} ->
        case room_server_thread_assignment_result(room_server, thread_id) do
          {:ok, assignment} -> assignment || thread.assigned_agent_id
          {:error, _reason} -> thread.assigned_agent_id
        end

      {:error, _reason} ->
        thread.assigned_agent_id
    end
  end

  defp assign_thread_state(
         runtime,
         room_server,
         instance_module,
         room_id,
         thread_id,
         agent_id,
         agent_spec,
         updated_thread
       ) do
    with {:ok, persisted_thread} <- persist_thread_assignment(runtime, updated_thread),
         :ok <- room_server_assign_thread(room_server, thread_id, agent_id),
         {:ok, _pid} <- start_thread_runner(instance_module, room_id, thread_id, agent_id, agent_spec) do
      {:ok, persisted_thread}
    end
  end

  defp unassign_thread_state(runtime, room_server, updated_thread) do
    with {:ok, persisted_thread} <- save_thread_struct(runtime, updated_thread),
         :ok <- room_server_unassign_thread(room_server, updated_thread.id) do
      {:ok, persisted_thread}
    end
  end

  defp rollback_thread_assignment(
         instance_module,
         runtime,
         room_server,
         %Thread{} = original_thread,
         previous_agent_spec
       ) do
    with {:ok, _thread} <- save_thread_struct(runtime, original_thread),
         :ok <- restore_room_server_assignment(room_server, original_thread, previous_agent_spec),
         :ok <- restore_thread_runner(instance_module, original_thread, previous_agent_spec) do
      :ok
    end
  end

  defp restore_room_server_assignment(room_server, %Thread{assigned_agent_id: agent_id} = thread, agent_spec)
       when is_binary(agent_id) and is_map(agent_spec) do
    with {:ok, _registered_agent} <- room_server_register_agent(room_server, agent_spec),
         :ok <- restore_room_server_assignment(room_server, %{thread | assigned_agent_id: agent_id}, nil) do
      :ok
    end
  end

  defp restore_room_server_assignment(room_server, %Thread{id: thread_id, assigned_agent_id: agent_id}, _agent_spec)
       when is_binary(agent_id) do
    room_server_assign_thread(room_server, thread_id, agent_id)
  end

  defp restore_room_server_assignment(room_server, %Thread{id: thread_id}, _agent_spec) do
    room_server_unassign_thread(room_server, thread_id)
  end

  defp restore_thread_runner(_instance_module, %Thread{assigned_agent_id: nil}, _previous_agent_spec), do: :ok

  defp restore_thread_runner(
         instance_module,
         %Thread{room_id: room_id, id: thread_id, assigned_agent_id: agent_id},
         agent_spec
       )
       when is_binary(agent_id) and is_map(agent_spec) do
    case AgentRunner.whereis(instance_module, room_id, thread_id, agent_id) do
      pid when is_pid(pid) ->
        :ok

      nil ->
        case start_thread_runner(instance_module, room_id, thread_id, agent_id, agent_spec) do
          {:ok, _pid} -> :ok
          {:error, reason} -> {:error, reason}
        end
    end
  end

  defp restore_thread_runner(_instance_module, %Thread{assigned_agent_id: _agent_id}, _previous_agent_spec), do: :ok

  defp registered_agent_spec(_instance_module, _room_id, _thread_id, _room_server, nil), do: nil

  defp registered_agent_spec(instance_module, room_id, thread_id, room_server, agent_id) when is_binary(agent_id) do
    case room_server_get_agent(room_server, agent_id) do
      {:ok, agent_spec} ->
        agent_spec

      {:error, _reason} ->
        case AgentRunner.whereis(instance_module, room_id, thread_id, agent_id) do
          pid when is_pid(pid) ->
            case runner_agent_spec(pid, agent_id) do
              {:ok, agent_spec} -> agent_spec
              {:error, _reason} -> nil
            end

          nil ->
            nil
        end
    end
  end

  defp start_thread_runner(instance_module, room_id, thread_id, agent_id, agent_spec) do
    case AgentSupervisor.start_agent(instance_module, room_id, thread_id, agent_id, agent_spec) do
      {:ok, pid} -> {:ok, pid}
      {:error, reason} -> {:error, {:start_agent_failed, reason}}
    end
  end

  defp stop_thread_runner(_instance_module, _room_id, _thread_id, nil), do: :ok

  defp stop_thread_runner(instance_module, room_id, thread_id, agent_id) when is_binary(agent_id) do
    case AgentSupervisor.stop_agent(instance_module, room_id, thread_id, agent_id) do
      :ok -> :ok
      {:error, :not_found} -> :ok
      {:error, reason} -> {:error, {:stop_agent_failed, reason}}
    end
  end

  defp room_server_get_agent(room_server, agent_id) do
    try do
      RoomServer.get_agent(room_server, agent_id)
    catch
      :exit, reason -> {:error, {:room_server_unavailable, reason}}
    end
  end

  defp room_server_assign_thread(room_server, thread_id, agent_id) do
    try do
      RoomServer.assign_thread(room_server, thread_id, agent_id)
    catch
      :exit, reason -> {:error, {:room_server_unavailable, reason}}
    end
  end

  defp room_server_unassign_thread(room_server, thread_id) do
    try do
      RoomServer.unassign_thread(room_server, thread_id)
    catch
      :exit, reason -> {:error, {:room_server_unavailable, reason}}
    end
  end

  defp room_server_thread_assignment(room_server, thread_id) do
    case room_server_thread_assignment_result(room_server, thread_id) do
      {:ok, assignment} -> assignment
      {:error, _reason} -> nil
    end
  end

  defp room_server_thread_assignment_result(room_server, thread_id) do
    try do
      {:ok, RoomServer.thread_assignment(room_server, thread_id)}
    catch
      :exit, reason -> {:error, {:room_server_unavailable, reason}}
    end
  end

  defp room_server_list_thread_assignments(room_server) do
    try do
      {:ok, RoomServer.list_thread_assignments(room_server)}
    catch
      :exit, reason -> {:error, {:room_server_unavailable, reason}}
    end
  end

  defp room_server_list_agents(room_server) do
    try do
      {:ok, RoomServer.list_agents(room_server)}
    catch
      :exit, reason -> {:error, {:room_server_unavailable, reason}}
    end
  end

  defp room_server_register_agent(room_server, agent_spec) when is_map(agent_spec) do
    try do
      RoomServer.register_agent(room_server, agent_spec)
    catch
      :exit, reason -> {:error, {:room_server_unavailable, reason}}
    end
  end

  defp room_server_unregister_agent(room_server, agent_id) do
    try do
      RoomServer.unregister_agent(room_server, agent_id)
    catch
      :exit, reason -> {:error, {:room_server_unavailable, reason}}
    end
  end

  defp runner_agent_spec(pid, fallback_agent_id) when is_pid(pid) and is_binary(fallback_agent_id) do
    try do
      case AgentRunner.get_state(pid) do
        %AgentRunner{agent_id: agent_id, agent_config: agent_config} when is_map(agent_config) ->
          {:ok,
           agent_config
           |> Map.put(:agent_id, agent_id || fallback_agent_id)
           |> Map.put_new(:name, fallback_agent_id)}

        _other ->
          {:error, :runner_state_unavailable}
      end
    catch
      :exit, reason -> {:error, {:runner_unavailable, reason}}
    end
  end

  defp maybe_touch_thread_assignment(%Thread{assigned_agent_id: agent_id} = thread, agent_id), do: thread

  defp maybe_touch_thread_assignment(%Thread{} = thread, agent_id) do
    %{thread | assigned_agent_id: agent_id, updated_at: DateTime.utc_now()}
  end

  defp ensure_thread_room(%Thread{room_id: room_id}, room_id), do: :ok
  defp ensure_thread_room(%Thread{}, _room_id), do: {:error, :thread_room_mismatch}

  defp ensure_room_server(instance_module, room_id) do
    runtime = runtime_name(instance_module)

    with {:ok, room} <- get_room(runtime, room_id) do
      RoomSupervisor.get_or_start_room(instance_module, room)
    end
  end

  defp runtime_name(instance_module), do: Module.concat(instance_module, :Runtime)

  @doc "List running bridge workers for an instance module."
  def list_bridges(instance_module) when is_atom(instance_module) do
    Jido.Messaging.BridgeSupervisor.list_bridges(instance_module)
  end

  @doc "Resolve configured outbound adapter routes for a room."
  def resolve_outbound_routes(instance_module, room_id, opts \\ [])
      when is_atom(instance_module) and is_binary(room_id) and is_list(opts) do
    Jido.Messaging.OutboundRouter.resolve_routes(instance_module, room_id, opts)
  end

  @doc "Route outbound text through bridge bindings/policy for a room."
  def route_outbound(instance_module, room_id, text, opts \\ [])
      when is_atom(instance_module) and is_binary(room_id) and is_binary(text) and is_list(opts) do
    Jido.Messaging.OutboundRouter.route_outbound(instance_module, room_id, text, opts)
  end
end