# lib/jido_chat.ex

defmodule Jido.Chat do
  @moduledoc """
  Core adapter-contract facade and lightweight event-loop state container.

  `jido_chat` owns canonical chat types, adapter contracts, typed handles, and
  deterministic fallback behavior. It does not define the supervised runtime or
  process tree for production messaging systems; that responsibility belongs in
  `jido_messaging`.
  """

  alias Jido.Chat.{
    ActionEvent,
    Adapter,
    AdapterRegistry,
    AssistantContextChangedEvent,
    AssistantThreadStartedEvent,
    Author,
    CapabilityMatrix,
    ChannelRef,
    Concurrency,
    Emoji,
    EventRouter,
    EventEnvelope,
    Errors,
    IngressResult,
    Incoming,
    Message,
    ModalCloseEvent,
    ModalSubmitEvent,
    Participant,
    ReactionEvent,
    Room,
    Serialization,
    SlashCommandEvent,
    StateAdapter,
    Thread,
    WebhookPipeline,
    WebhookRequest,
    WebhookResponse
  }

  alias Jido.Chat.Errors.Ingress, as: IngressError

  alias Jido.Chat.Content.Text

  @dialyzer {:nowarn_function, normalize_ingress_event: 1}
  @dialyzer {:nowarn_function, normalize_ingress_response: 1}
  @dialyzer {:nowarn_function, normalize_ingress_request: 3}

  # Message-style handlers come in two arities: a 2-arity form receiving the
  # thread and the incoming message, and a 3-arity form that additionally
  # receives the chat struct as its first argument.
  @typedoc "Mention handler callback."
  @type mention_handler ::
          (Thread.t(), Incoming.t() -> term()) | (t(), Thread.t(), Incoming.t() -> t() | term())
  @typedoc "Regex-routed message handler callback."
  @type message_handler :: mention_handler()
  @typedoc "Subscribed-thread handler callback."
  @type subscribed_handler :: mention_handler()

  # Event-style handlers follow the same convention: a 1-arity form receiving
  # only the event, or a 2-arity form receiving the chat struct and the event.
  @typedoc "Reaction event handler callback."
  @type reaction_handler ::
          (ReactionEvent.t() -> term()) | (t(), ReactionEvent.t() -> t() | term())
  @typedoc "Action event handler callback."
  @type action_handler :: (ActionEvent.t() -> term()) | (t(), ActionEvent.t() -> t() | term())

  @typedoc "Modal submit handler callback."
  @type modal_submit_handler ::
          (ModalSubmitEvent.t() -> term()) | (t(), ModalSubmitEvent.t() -> t() | term())

  @typedoc "Modal close handler callback."
  @type modal_close_handler ::
          (ModalCloseEvent.t() -> term()) | (t(), ModalCloseEvent.t() -> t() | term())

  @typedoc "Slash command handler callback."
  @type slash_command_handler ::
          (SlashCommandEvent.t() -> term()) | (t(), SlashCommandEvent.t() -> t() | term())

  @typedoc "Assistant thread started handler callback."
  @type assistant_thread_started_handler ::
          (AssistantThreadStartedEvent.t() -> term())
          | (t(), AssistantThreadStartedEvent.t() -> t() | term())

  @typedoc "Assistant context changed handler callback."
  @type assistant_context_changed_handler ::
          (AssistantContextChangedEvent.t() -> term())
          | (t(), AssistantContextChangedEvent.t() -> t() | term())

  # Handler registry stored on the chat struct: one list per event kind.
  # Message handlers are stored as `{regex, handler}` pairs; every other kind
  # is typed as a plain list of handler functions.
  @type handlers :: %{
          mention: [mention_handler()],
          message: [{Regex.t(), message_handler()}],
          subscribed: [subscribed_handler()],
          reaction: [reaction_handler()],
          action: [action_handler()],
          modal_submit: [modal_submit_handler()],
          modal_close: [modal_close_handler()],
          slash_command: [slash_command_handler()],
          assistant_thread_started: [assistant_thread_started_handler()],
          assistant_context_changed: [assistant_context_changed_handler()]
        }

  @typedoc "Payload-style webhook handler taking the chat, a payload map and options."
  @type webhook_handler ::
          (t(), map(), keyword() -> {:ok, t(), Incoming.t()} | {:error, term()})

  @typedoc """
  Request-first webhook handler as produced by `webhooks/1`.

  The closure delegates to `handle_webhook_request/4`, which passes through the
  `{:error, exception}` tuple returned by `route_request/4` on failure.
  """
  @type webhook_request_handler ::
          (WebhookRequest.t() | map(), keyword() ->
             {:ok, t(), EventEnvelope.t() | nil, WebhookResponse.t()}
             | {:error, Exception.t()})

  @typedoc """
  Webhook handler as produced by `webhooks_with_chat/1`, taking the chat explicitly.

  Has the same return contract as `t:webhook_request_handler/0`.
  """
  @type webhook_request_handler_with_chat ::
          (t(), WebhookRequest.t() | map(), keyword() ->
             {:ok, t(), EventEnvelope.t() | nil, WebhookResponse.t()}
             | {:error, Exception.t()})

  @typedoc "Request router returning a typed ingress result, mirroring `route_request/4`."
  @type route_request_handler ::
          (WebhookRequest.t() | map(), keyword() ->
             {:ok, IngressResult.t()} | {:error, Exception.t()})

  # Chat state struct.
  #
  # `state` is the opaque value owned by `state_adapter`. In `new/1` the
  # `subscriptions`, `dedupe`, `dedupe_order`, `thread_state` and
  # `channel_state` fields are populated from that adapter's snapshot.
  @type t :: %__MODULE__{
          id: String.t(),
          user_name: String.t(),
          adapters: %{optional(atom()) => module()},
          state_adapter: module(),
          state: term(),
          subscriptions: MapSet.t(String.t()),
          dedupe: MapSet.t({atom(), String.t()}),
          dedupe_order: [{atom(), String.t()}],
          handlers: handlers(),
          metadata: map(),
          thread_state: %{optional(String.t()) => map()},
          channel_state: %{optional(String.t()) => map()},
          initialized: boolean()
        }

  # Empty handler registry used as the schema default for `:handlers`; it has
  # one (initially empty) list per event kind listed in the `handlers` type.
  @default_handlers %{
    mention: [],
    message: [],
    subscribed: [],
    reaction: [],
    action: [],
    modal_submit: [],
    modal_close: [],
    slash_command: [],
    assistant_thread_started: [],
    assistant_context_changed: []
  }

  # Zoi schema describing the struct fields and their defaults. `:id` is the
  # only field declared without a default or `nullish` modifier.
  @schema Zoi.struct(
            __MODULE__,
            %{
              id: Zoi.string(),
              user_name: Zoi.string() |> Zoi.default("bot"),
              adapters: Zoi.map() |> Zoi.default(%{}),
              state_adapter: Zoi.any() |> Zoi.default(Jido.Chat.StateAdapters.Memory),
              state: Zoi.any() |> Zoi.nullish(),
              subscriptions: Zoi.any() |> Zoi.default(MapSet.new()),
              dedupe: Zoi.any() |> Zoi.default(MapSet.new()),
              dedupe_order: Zoi.list() |> Zoi.default([]),
              handlers: Zoi.map() |> Zoi.default(@default_handlers),
              metadata: Zoi.map() |> Zoi.default(%{}),
              thread_state: Zoi.map() |> Zoi.default(%{}),
              channel_state: Zoi.map() |> Zoi.default(%{}),
              initialized: Zoi.boolean() |> Zoi.default(false)
            },
            coerce: true
          )

  # The struct definition is derived from the schema rather than written by
  # hand, so the schema above is the single source of the field list.
  # NOTE(review): presumably `enforce_keys/1` yields the fields without a
  # default — confirm against the Zoi documentation.
  @enforce_keys Zoi.Struct.enforce_keys(@schema)
  defstruct Zoi.Struct.struct_fields(@schema)

  @doc "Returns the Zoi schema for Chat."
  def schema, do: @schema

  @doc """
  Builds a chat state struct from a keyword list or a map of options.

  Each option may be supplied under an atom key or the equivalent string key;
  the atom key is consulted first.

  ## Options

    * `:id` - chat identifier; generated via `Jido.Chat.ID.generate!/0` when absent
    * `:user_name` - defaults to `"bot"`
    * `:adapters` - map `%{telegram: Jido.Chat.Telegram.Adapter, ...}`
    * `:metadata` - defaults to `%{}`
    * `:state_adapter` - state backend module, defaults to `Jido.Chat.StateAdapters.Memory`
    * `:state_opts` - adapter-specific initialization options
    * `:state` - explicit adapter state, overrides legacy snapshot inputs

  The resulting attributes are validated against `schema/0`.
  """
  @spec new(keyword() | map()) :: t()
  def new(opts \\ [])

  def new(opts) when is_list(opts) do
    new(Map.new(opts))
  end

  def new(opts) when is_map(opts) do
    # Reads an option by atom key, falling back to the string form of the key.
    lookup = fn key -> opts[key] || opts[Atom.to_string(key)] end
    default_adapter = Jido.Chat.StateAdapters.Memory

    requested_adapter = lookup.(:state_adapter) || default_adapter
    state_adapter = Jido.Chat.Wire.decode_module(requested_adapter) || default_adapter
    state_opts = lookup.(:state_opts) || []
    explicit_state = lookup.(:state)

    seed_snapshot =
      maybe_replace_with_explicit_state(
        initial_state_snapshot(opts),
        state_adapter,
        explicit_state
      )

    # An explicit state wins; otherwise the adapter builds one from the snapshot.
    state =
      case explicit_state do
        nil -> StateAdapter.init(state_adapter, seed_snapshot, state_opts)
        given -> given
      end

    snapshot = StateAdapter.snapshot(state_adapter, state)

    attrs = %{
      id: lookup.(:id) || Jido.Chat.ID.generate!(),
      user_name: lookup.(:user_name) || "bot",
      adapters: AdapterRegistry.normalize_adapters(lookup.(:adapters) || %{}),
      state_adapter: state_adapter,
      state: state,
      metadata: lookup.(:metadata) || %{},
      subscriptions: snapshot.subscriptions,
      dedupe: snapshot.dedupe,
      dedupe_order: snapshot.dedupe_order,
      thread_state: snapshot.thread_state,
      channel_state: snapshot.channel_state
    }

    Jido.Chat.Schema.parse!(__MODULE__, @schema, attrs)
  end

  @doc """
  Calls `Adapter.initialize/2` for every registered adapter and flags the chat as initialized.

  Adapter options are read from `chat.metadata[:adapter_opts]` (default `[]`).
  The return value of each adapter call is ignored.
  """
  @spec initialize(t()) :: t()
  def initialize(%__MODULE__{} = chat) do
    adapter_opts = chat.metadata[:adapter_opts] || []

    chat.adapters
    |> Map.values()
    |> Enum.each(fn adapter -> Adapter.initialize(adapter, adapter_opts) end)

    %{chat | initialized: true}
  end

  @doc """
  Calls `Adapter.shutdown/2` for every registered adapter and clears the initialized flag.

  Adapter options are read from `chat.metadata[:adapter_opts]` (default `[]`).
  The return value of each adapter call is ignored.
  """
  @spec shutdown(t()) :: t()
  def shutdown(%__MODULE__{} = chat) do
    adapter_opts = chat.metadata[:adapter_opts] || []

    chat.adapters
    |> Map.values()
    |> Enum.each(fn adapter -> Adapter.shutdown(adapter, adapter_opts) end)

    %{chat | initialized: false}
  end

  @doc "Appends `handler` to the list of new-mention handlers."
  @spec on_new_mention(t(), mention_handler()) :: t()
  def on_new_mention(%__MODULE__{} = chat, handler) when is_function(handler) do
    handlers = Map.update!(chat.handlers, :mention, fn existing -> existing ++ [handler] end)
    %{chat | handlers: handlers}
  end

  @doc """
  Appends a `{pattern, handler}` pair to the list of new-message handlers.

  `pattern` may be a compiled `Regex` or a string; strings are compiled with
  `Regex.compile!/1`, which raises on an invalid pattern.
  """
  @spec on_new_message(t(), Regex.t() | String.t(), message_handler()) :: t()
  def on_new_message(%__MODULE__{} = chat, %Regex{} = pattern, handler)
      when is_function(handler) do
    handlers =
      Map.update!(chat.handlers, :message, fn existing -> existing ++ [{pattern, handler}] end)

    %{chat | handlers: handlers}
  end

  def on_new_message(%__MODULE__{} = chat, pattern, handler)
      when is_binary(pattern) and is_function(handler) do
    compiled = Regex.compile!(pattern)
    on_new_message(chat, compiled, handler)
  end

  @doc "Appends `handler` to the list of subscribed-thread handlers."
  @spec on_subscribed_message(t(), subscribed_handler()) :: t()
  def on_subscribed_message(%__MODULE__{} = chat, handler) when is_function(handler) do
    handlers = Map.update!(chat.handlers, :subscribed, fn existing -> existing ++ [handler] end)
    %{chat | handlers: handlers}
  end

  @doc "Appends `handler` to the list of reaction-event handlers."
  @spec on_reaction(t(), reaction_handler()) :: t()
  def on_reaction(%__MODULE__{} = chat, handler) when is_function(handler) do
    handlers = Map.update!(chat.handlers, :reaction, fn existing -> existing ++ [handler] end)
    %{chat | handlers: handlers}
  end

  @doc """
  Registers a reaction-event handler restricted by `selector`.

  The selector may be a string, an atom, a list of those, or a regex.
  """
  @spec on_reaction(
          t(),
          String.t() | atom() | [String.t() | atom()] | Regex.t(),
          reaction_handler()
        ) :: t()
  def on_reaction(%__MODULE__{} = chat, selector, handler) when is_function(handler),
    do: register_filtered_handler(chat, :reaction, selector, handler)

  @doc "Appends `handler` to the list of action-event handlers."
  @spec on_action(t(), action_handler()) :: t()
  def on_action(%__MODULE__{} = chat, handler) when is_function(handler) do
    handlers = Map.update!(chat.handlers, :action, fn existing -> existing ++ [handler] end)
    %{chat | handlers: handlers}
  end

  @doc """
  Registers an action-event handler restricted by `selector`.

  The selector may be a string, an atom, a list of those, or a regex.
  """
  @spec on_action(t(), String.t() | atom() | [String.t() | atom()] | Regex.t(), action_handler()) ::
          t()
  def on_action(%__MODULE__{} = chat, selector, handler) when is_function(handler),
    do: register_filtered_handler(chat, :action, selector, handler)

  @doc "Appends `handler` to the list of modal-submit handlers."
  @spec on_modal_submit(t(), modal_submit_handler()) :: t()
  def on_modal_submit(%__MODULE__{} = chat, handler) when is_function(handler) do
    handlers = Map.update!(chat.handlers, :modal_submit, fn existing -> existing ++ [handler] end)
    %{chat | handlers: handlers}
  end

  @doc """
  Registers a modal-submit handler restricted by `selector`.

  The selector may be a string, an atom, a list of those, or a regex.
  """
  @spec on_modal_submit(
          t(),
          String.t() | atom() | [String.t() | atom()] | Regex.t(),
          modal_submit_handler()
        ) :: t()
  def on_modal_submit(%__MODULE__{} = chat, selector, handler) when is_function(handler),
    do: register_filtered_handler(chat, :modal_submit, selector, handler)

  @doc "Appends `handler` to the list of modal-close handlers."
  @spec on_modal_close(t(), modal_close_handler()) :: t()
  def on_modal_close(%__MODULE__{} = chat, handler) when is_function(handler) do
    handlers = Map.update!(chat.handlers, :modal_close, fn existing -> existing ++ [handler] end)
    %{chat | handlers: handlers}
  end

  @doc """
  Registers a modal-close handler restricted by `selector`.

  The selector may be a string, an atom, a list of those, or a regex.
  """
  @spec on_modal_close(
          t(),
          String.t() | atom() | [String.t() | atom()] | Regex.t(),
          modal_close_handler()
        ) :: t()
  def on_modal_close(%__MODULE__{} = chat, selector, handler) when is_function(handler),
    do: register_filtered_handler(chat, :modal_close, selector, handler)

  @doc "Appends `handler` to the list of slash-command handlers."
  @spec on_slash_command(t(), slash_command_handler()) :: t()
  def on_slash_command(%__MODULE__{} = chat, handler) when is_function(handler) do
    handlers =
      Map.update!(chat.handlers, :slash_command, fn existing -> existing ++ [handler] end)

    %{chat | handlers: handlers}
  end

  @doc """
  Registers a slash-command handler restricted by `selector`.

  The selector may be a string, an atom, a list of those, or a regex.
  """
  @spec on_slash_command(
          t(),
          String.t() | atom() | [String.t() | atom()] | Regex.t(),
          slash_command_handler()
        ) :: t()
  def on_slash_command(%__MODULE__{} = chat, selector, handler) when is_function(handler),
    do: register_filtered_handler(chat, :slash_command, selector, handler)

  @doc "Appends `handler` to the list of assistant-thread-started handlers."
  @spec on_assistant_thread_started(t(), assistant_thread_started_handler()) :: t()
  def on_assistant_thread_started(%__MODULE__{} = chat, handler) when is_function(handler) do
    handlers =
      Map.update!(chat.handlers, :assistant_thread_started, fn existing ->
        existing ++ [handler]
      end)

    %{chat | handlers: handlers}
  end

  @doc "Appends `handler` to the list of assistant-context-changed handlers."
  @spec on_assistant_context_changed(t(), assistant_context_changed_handler()) :: t()
  def on_assistant_context_changed(%__MODULE__{} = chat, handler) when is_function(handler) do
    handlers =
      Map.update!(chat.handlers, :assistant_context_changed, fn existing ->
        existing ++ [handler]
      end)

    %{chat | handlers: handlers}
  end

  @doc "Looks up the adapter module registered under `adapter_name` via `AdapterRegistry.resolve/2`."
  @spec get_adapter(t(), atom()) :: {:ok, module()} | {:error, term()}
  def get_adapter(%__MODULE__{} = chat, adapter_name) when is_atom(adapter_name),
    do: AdapterRegistry.resolve(chat, adapter_name)

  @doc """
  Returns a map of request-first webhook handlers keyed by adapter name.

  Each handler closes over the chat given here and forwards to
  `handle_webhook_request/4` for its adapter.
  """
  @spec webhooks(t()) :: %{optional(atom()) => webhook_request_handler()}
  def webhooks(%__MODULE__{} = chat) do
    chat.adapters
    |> Map.keys()
    |> Map.new(fn adapter_name ->
      handler = fn request_or_payload, opts ->
        handle_webhook_request(chat, adapter_name, request_or_payload, opts)
      end

      {adapter_name, handler}
    end)
  end

  @doc """
  Compatibility helper returning adapter-keyed webhook handlers that take the chat explicitly.

  When the first argument passed to a handler is not a chat struct, the chat
  captured at construction time is used instead.
  """
  @spec webhooks_with_chat(t()) :: %{optional(atom()) => webhook_request_handler_with_chat()}
  def webhooks_with_chat(%__MODULE__{} = chat) do
    chat.adapters
    |> Map.keys()
    |> Map.new(fn adapter_name ->
      handler = fn current_chat, request_or_payload, opts ->
        base_chat =
          case current_chat do
            %__MODULE__{} -> current_chat
            _other -> chat
          end

        handle_webhook_request(base_chat, adapter_name, request_or_payload, opts)
      end

      {adapter_name, handler}
    end)
  end

  @doc """
  Handles a webhook payload for the given adapter.

  The adapter is resolved through `AdapterRegistry.resolve/2`; a resolution
  failure is returned unchanged. When the adapter module exports
  `handle_webhook/3` it is called directly with the chat, payload and options;
  otherwise the call is delegated to `Adapter.handle_webhook/4`.
  """
  @spec handle_webhook(t(), atom(), map(), keyword()) ::
          {:ok, t(), Incoming.t()} | {:error, term()}
  def handle_webhook(%__MODULE__{} = chat, adapter_name, payload, opts \\ [])
      when is_atom(adapter_name) and is_map(payload) do
    with {:ok, adapter_module} <- AdapterRegistry.resolve(chat, adapter_name) do
      # `function_exported?/3` does not load modules, so make sure the adapter is
      # loaded first; otherwise a not-yet-loaded adapter would silently take the
      # fallback path even though it implements the callback.
      if Code.ensure_loaded?(adapter_module) and
           function_exported?(adapter_module, :handle_webhook, 3) do
        adapter_module.handle_webhook(chat, payload, opts)
      else
        Adapter.handle_webhook(adapter_module, chat, payload, opts)
      end
    end
  end

  @doc """
  Routes a request-style inbound input through `WebhookPipeline.handle_request/6`.

  This is transport-agnostic. On success the routed chat, event, response and
  request are wrapped in an `IngressResult` with `mode: :request`. Error tuples
  from the pipeline and any exception raised while routing are both converted
  into `{:error, exception}`.
  """
  @spec route_request(
          t(),
          atom(),
          WebhookRequest.t() | map(),
          keyword()
        ) :: {:ok, IngressResult.t()} | {:error, Exception.t()}
  def route_request(%__MODULE__{} = chat, adapter_name, request_or_payload, opts \\ [])
      when is_atom(adapter_name) and is_list(opts) do
    pipeline_result =
      WebhookPipeline.handle_request(
        chat,
        adapter_name,
        request_or_payload,
        opts,
        &AdapterRegistry.resolve/2,
        &process_event/4
      )

    with {:ok, routed_chat, envelope, response} <- pipeline_result do
      event = normalize_ingress_event(envelope)
      normalized_response = normalize_ingress_response(response)
      normalized_request = normalize_ingress_request(adapter_name, request_or_payload, opts)

      result =
        IngressResult.new(%{
          chat: routed_chat,
          adapter_name: adapter_name,
          event: event,
          response: normalized_response,
          request: normalized_request,
          mode: :request,
          metadata: %{transport: :request}
        })

      {:ok, result}
    else
      {:error, reason} ->
        {:error, ingress_error(:request, adapter_name, reason)}
    end
  rescue
    # Boundary: anything raised while routing is reported as an ingress error.
    exception ->
      {:error, ingress_error(:request, adapter_name, {:exception, exception})}
  end

  @doc """
  Routes an event-style inbound input (polling/gateway/listener) through `process_event/4`.

  This is transport-agnostic and returns an `IngressResult` with `mode: :event`.
  A `:noop` event is wrapped as-is without invoking `process_event/4`. Error
  tuples and raised exceptions are converted into `{:error, exception}`.
  """
  @spec route_event(t(), atom(), EventEnvelope.t() | map() | :noop, keyword()) ::
          {:ok, IngressResult.t()} | {:error, Exception.t()}
  def route_event(chat, adapter_name, event, opts \\ [])

  def route_event(%__MODULE__{} = chat, adapter_name, :noop, _opts)
      when is_atom(adapter_name) do
    result =
      IngressResult.new(%{
        chat: chat,
        adapter_name: adapter_name,
        event: :noop,
        response: nil,
        request: nil,
        mode: :event,
        metadata: %{transport: :event}
      })

    {:ok, result}
  end

  def route_event(%__MODULE__{} = chat, adapter_name, event, opts)
      when is_atom(adapter_name) and is_list(opts) do
    case process_event(chat, adapter_name, event, opts) do
      {:ok, routed_chat, envelope} ->
        result =
          IngressResult.new(%{
            chat: routed_chat,
            adapter_name: adapter_name,
            event: envelope,
            response: nil,
            request: nil,
            mode: :event,
            metadata: %{transport: :event}
          })

        {:ok, result}

      {:error, reason} ->
        {:error, ingress_error(:event, adapter_name, reason, %{event: event})}
    end
  rescue
    # Boundary: anything raised while routing is reported as an ingress error
    # that carries the offending event.
    exception ->
      {:error, ingress_error(:event, adapter_name, {:exception, exception}, %{event: event})}
  end

  @doc """
  Handles a typed webhook request for the given adapter.

  Delegates to `route_request/4`. On success it returns the updated chat state,
  the normalized event envelope (or `nil`) and a typed webhook response. When
  routing fails, the `{:error, exception}` tuple from `route_request/4` is
  returned unchanged.
  """
  @spec handle_webhook_request(
          t(),
          atom(),
          WebhookRequest.t() | map(),
          keyword()
        ) ::
          {:ok, t(), EventEnvelope.t() | nil, WebhookResponse.t()}
          | {:error, Exception.t()}
  def handle_webhook_request(%__MODULE__{} = chat, adapter_name, request_or_payload, opts \\ [])
      when is_atom(adapter_name) and is_list(opts) do
    # No `else` clause on purpose: a non-matching result (the error tuple) is
    # passed through to the caller as-is.
    with {:ok, %IngressResult{} = result} <-
           route_request(chat, adapter_name, request_or_payload, opts) do
      {:ok, result.chat, envelope_or_nil(result.event), response_or_default(result.response)}
    end
  end

  @doc """
  Opens a DM thread with an adapter when supported.

  Two calling forms are accepted:

    * `open_dm(chat, adapter_name, external_user_id)` - `adapter_name` is an atom.
      The adapter is resolved and its `open_dm/2` callback is invoked with the
      user id and an empty option list. `{:ok, external_room_id}` is turned into
      a DM `Thread`; any other adapter result is returned unchanged. Adapters
      that do not export `open_dm/2` yield `{:error, :unsupported}`.
    * `open_dm(chat, target, opts)` - `target` is not an atom and `opts` is a
      list or map. The adapter name and user id are derived from the target and
      the first form is then applied.
  """
  @spec open_dm(
          t(),
          atom() | Author.t() | map() | String.t() | integer(),
          String.t() | integer() | keyword() | map()
        ) :: {:ok, Thread.t()} | {:error, term()}
  def open_dm(%__MODULE__{} = chat, adapter_name, external_user_id) when is_atom(adapter_name) do
    with {:ok, adapter_module} <- AdapterRegistry.resolve(chat, adapter_name) do
      # `function_exported?/3` does not load modules; ensure the adapter is loaded
      # so a lazily-loaded adapter is not misreported as unsupported.
      if Code.ensure_loaded?(adapter_module) and function_exported?(adapter_module, :open_dm, 2) do
        case adapter_module.open_dm(external_user_id, []) do
          {:ok, external_room_id} ->
            {:ok, thread(chat, adapter_name, external_room_id, is_dm: true)}

          other ->
            other
        end
      else
        {:error, :unsupported}
      end
    end
  end

  # The `is_list/1` guard also covers `opts == []`, so no separate clause is
  # needed for an empty option list.
  def open_dm(%__MODULE__{} = chat, target, opts) when is_list(opts) or is_map(opts) do
    with {:ok, adapter_name, external_user_id} <- resolve_dm_target(chat, target, opts) do
      open_dm(chat, adapter_name, external_user_id)
    end
  end

  @doc """
  Opens a native platform thread from an existing room message when supported.

  Resolves the adapter and delegates to `Adapter.open_thread/4`; a failed
  adapter resolution is returned unchanged.
  """
  @spec open_thread(t(), atom(), String.t() | integer(), String.t() | integer(), keyword()) ::
          {:ok, Thread.t()} | {:error, term()}
  def open_thread(
        %__MODULE__{} = chat,
        adapter_name,
        external_room_id,
        external_message_id,
        opts \\ []
      )
      when is_atom(adapter_name) and is_list(opts) do
    case AdapterRegistry.resolve(chat, adapter_name) do
      {:ok, adapter_module} ->
        Adapter.open_thread(adapter_module, external_room_id, external_message_id, opts)

      unresolved ->
        unresolved
    end
  end

  @doc """
  Builds a channel reference from an adapter name and an external channel id.

  The reference id has the form `"<adapter_name>:<external_id>"`. The adapter
  is resolved with `AdapterRegistry.resolve!/2`.
  """
  @spec channel(t(), atom(), String.t() | integer()) :: ChannelRef.t()
  def channel(%__MODULE__{} = chat, adapter_name, external_id) when is_atom(adapter_name) do
    adapter_module = AdapterRegistry.resolve!(chat, adapter_name)
    channel_id = "#{adapter_name}:#{external_id}"

    attrs = %{
      id: channel_id,
      adapter_name: adapter_name,
      adapter: adapter_module,
      external_id: external_id
    }

    ChannelRef.new(attrs)
  end

  @doc """
  Builds a thread reference from an adapter name and an external room id.

  ## Options

    * `:external_thread_id` / `:thread_id` - external thread id; the former wins
    * `:id` - explicit thread id, otherwise one is derived from the adapter,
      room and thread ids
    * `:is_dm` - defaults to `false`
    * `:metadata` - defaults to `%{}`
  """
  @spec thread(t(), atom(), String.t() | integer(), keyword()) :: Thread.t()
  def thread(%__MODULE__{} = chat, adapter_name, external_room_id, opts \\ [])
      when is_atom(adapter_name) and is_list(opts) do
    adapter_module = AdapterRegistry.resolve!(chat, adapter_name)
    external_thread_id = opts[:external_thread_id] || opts[:thread_id]
    id = opts[:id] || thread_id(adapter_name, external_room_id, external_thread_id)

    attrs = %{
      id: id,
      adapter_name: adapter_name,
      adapter: adapter_module,
      external_room_id: external_room_id,
      external_thread_id: external_thread_id,
      channel_id: "#{adapter_name}:#{external_room_id}",
      is_dm: opts[:is_dm] || false,
      metadata: opts[:metadata] || %{}
    }

    Thread.new(attrs)
  end

  @doc """
  Adapter-internal entrypoint for processing normalized incoming message events.

  Delegates to `EventRouter.process_message/5`, handing it a callback that
  builds the `Thread` for the normalized incoming message via `thread/4`.
  """
  @spec process_message(t(), atom(), String.t(), Incoming.t() | map(), keyword()) ::
          {:ok, t(), Incoming.t()} | {:error, term()}
  def process_message(%__MODULE__{} = chat, adapter_name, thread_id, incoming, opts \\ [])
      when is_atom(adapter_name) and is_binary(thread_id) and is_list(opts) do
    build_thread = fn current_chat, normalized_incoming, resolved_thread_id ->
      thread(
        current_chat,
        adapter_name,
        normalized_incoming.external_room_id,
        thread_id: normalized_incoming.external_thread_id,
        id: resolved_thread_id
      )
    end

    EventRouter.process_message(chat, adapter_name, thread_id, incoming, build_thread)
  end

  @doc """
  Normalizes a reaction event, enriches it with chat context and runs the reaction handlers.

  A failed normalization result is returned unchanged.
  """
  @spec process_reaction(t(), atom(), ReactionEvent.t() | map(), keyword()) ::
          {:ok, t(), ReactionEvent.t()} | {:error, term()}
  def process_reaction(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
      when is_atom(adapter_name) and is_list(opts) do
    case EventRouter.ensure_reaction_event(event, adapter_name) do
      {:ok, normalized} ->
        reaction = enrich_event_context(chat, adapter_name, normalized)
        updated_chat = EventRouter.run_event_handlers(chat, chat.handlers.reaction, reaction)
        {:ok, updated_chat, reaction}

      failure ->
        failure
    end
  end

  @doc """
  Normalizes an action event, enriches it with chat context and runs the action handlers.

  A failed normalization result is returned unchanged.
  """
  @spec process_action(t(), atom(), ActionEvent.t() | map(), keyword()) ::
          {:ok, t(), ActionEvent.t()} | {:error, term()}
  def process_action(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
      when is_atom(adapter_name) and is_list(opts) do
    case EventRouter.ensure_action_event(event, adapter_name) do
      {:ok, normalized} ->
        action = enrich_event_context(chat, adapter_name, normalized)
        updated_chat = EventRouter.run_event_handlers(chat, chat.handlers.action, action)
        {:ok, updated_chat, action}

      failure ->
        failure
    end
  end

  @doc """
  Normalizes a modal submit event, enriches it with chat context and runs the
  modal-submit handlers.

  A failed normalization result is returned unchanged.
  """
  @spec process_modal_submit(t(), atom(), ModalSubmitEvent.t() | map(), keyword()) ::
          {:ok, t(), ModalSubmitEvent.t()} | {:error, term()}
  def process_modal_submit(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
      when is_atom(adapter_name) and is_list(opts) do
    case EventRouter.ensure_modal_submit_event(event, adapter_name) do
      {:ok, normalized} ->
        modal_submit = enrich_event_context(chat, adapter_name, normalized)

        updated_chat =
          EventRouter.run_event_handlers(chat, chat.handlers.modal_submit, modal_submit)

        {:ok, updated_chat, modal_submit}

      failure ->
        failure
    end
  end

  @doc """
  Normalizes a modal close event, enriches it with chat context and runs the
  modal-close handlers.

  A failed normalization result is returned unchanged.
  """
  @spec process_modal_close(t(), atom(), ModalCloseEvent.t() | map(), keyword()) ::
          {:ok, t(), ModalCloseEvent.t()} | {:error, term()}
  def process_modal_close(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
      when is_atom(adapter_name) and is_list(opts) do
    case EventRouter.ensure_modal_close_event(event, adapter_name) do
      {:ok, normalized} ->
        modal_close = enrich_event_context(chat, adapter_name, normalized)

        updated_chat =
          EventRouter.run_event_handlers(chat, chat.handlers.modal_close, modal_close)

        {:ok, updated_chat, modal_close}

      failure ->
        failure
    end
  end

  @doc """
  Normalizes a slash command event, enriches it with chat context and runs the
  slash-command handlers.

  A failed normalization result is returned unchanged.
  """
  @spec process_slash_command(t(), atom(), SlashCommandEvent.t() | map(), keyword()) ::
          {:ok, t(), SlashCommandEvent.t()} | {:error, term()}
  def process_slash_command(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
      when is_atom(adapter_name) and is_list(opts) do
    case EventRouter.ensure_slash_command_event(event, adapter_name) do
      {:ok, normalized} ->
        slash_command = enrich_event_context(chat, adapter_name, normalized)

        updated_chat =
          EventRouter.run_event_handlers(chat, chat.handlers.slash_command, slash_command)

        {:ok, updated_chat, slash_command}

      failure ->
        failure
    end
  end

  @doc """
  Canonical typed event router used by webhook and gateway ingestion.

  The event is first coerced into an `EventEnvelope`, then handed to
  `EventRouter.route_event/5` together with a table of this module's
  `process_*` functions. On success the envelope is returned with the routed
  payload attached; any non-matching result from either step is returned as-is.
  """
  @spec process_event(t(), atom(), EventEnvelope.t() | map(), keyword()) ::
          {:ok, t(), EventEnvelope.t()} | {:error, term()}
  def process_event(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
      when is_atom(adapter_name) and is_list(opts) do
    # Dispatch table handed to the router, keyed by processor name.
    dispatch_table = %{
      process_message: &process_message/5,
      process_reaction: &process_reaction/4,
      process_action: &process_action/4,
      process_modal_submit: &process_modal_submit/4,
      process_modal_close: &process_modal_close/4,
      process_slash_command: &process_slash_command/4,
      process_assistant_thread_started: &process_assistant_thread_started/3,
      process_assistant_context_changed: &process_assistant_context_changed/3
    }

    with {:ok, envelope} <- EventRouter.ensure_event_envelope(event, adapter_name),
         {:ok, routed_chat, routed_payload} <-
           EventRouter.route_event(chat, adapter_name, envelope, opts, dispatch_table) do
      routed_envelope = EventRouter.with_envelope_payload(envelope, routed_payload)
      {:ok, routed_chat, routed_envelope}
    end
  end

  @doc """
  Normalizes an assistant-thread-started event, enriches it with chat context
  and runs the matching handlers.

  A failed normalization result is returned unchanged.
  """
  @spec process_assistant_thread_started(
          t(),
          atom(),
          AssistantThreadStartedEvent.t() | map()
        ) ::
          {:ok, t(), AssistantThreadStartedEvent.t()} | {:error, term()}
  def process_assistant_thread_started(%__MODULE__{} = chat, adapter_name, event)
      when is_atom(adapter_name) do
    case EventRouter.ensure_assistant_thread_started_event(event, adapter_name) do
      {:ok, normalized} ->
        assistant_event = enrich_event_context(chat, adapter_name, normalized)
        handlers = chat.handlers.assistant_thread_started
        updated_chat = EventRouter.run_event_handlers(chat, handlers, assistant_event)
        {:ok, updated_chat, assistant_event}

      failure ->
        failure
    end
  end

  @doc """
  Normalizes an assistant-context-changed event, enriches it with chat context
  and runs the matching handlers.

  A failed normalization result is returned unchanged.
  """
  @spec process_assistant_context_changed(
          t(),
          atom(),
          AssistantContextChangedEvent.t() | map()
        ) ::
          {:ok, t(), AssistantContextChangedEvent.t()} | {:error, term()}
  def process_assistant_context_changed(%__MODULE__{} = chat, adapter_name, event)
      when is_atom(adapter_name) do
    case EventRouter.ensure_assistant_context_changed_event(event, adapter_name) do
      {:ok, normalized} ->
        assistant_event = enrich_event_context(chat, adapter_name, normalized)
        handlers = chat.handlers.assistant_context_changed
        updated_chat = EventRouter.run_event_handlers(chat, handlers, assistant_event)
        {:ok, updated_chat, assistant_event}

      failure ->
        failure
    end
  end

  @doc """
  Returns the adapter's capabilities wrapped in a `CapabilityMatrix`.

  A failed adapter resolution is returned unchanged.
  """
  @spec adapter_capabilities(t(), atom()) :: {:ok, CapabilityMatrix.t()} | {:error, term()}
  def adapter_capabilities(%__MODULE__{} = chat, adapter_name) when is_atom(adapter_name) do
    case AdapterRegistry.resolve(chat, adapter_name) do
      {:ok, adapter_module} ->
        matrix =
          CapabilityMatrix.new(%{
            adapter_name: adapter_name,
            capabilities: Adapter.capabilities(adapter_module)
          })

        {:ok, matrix}

      unresolved ->
        unresolved
    end
  end

  @doc "Asks the state adapter whether `thread_id` is currently subscribed."
  @spec subscribed?(t(), String.t()) :: boolean()
  def subscribed?(%__MODULE__{state_adapter: adapter, state: state}, thread_id)
      when is_binary(thread_id),
      do: StateAdapter.subscribed?(adapter, state, thread_id)

  @doc "Subscribes `thread_id` through the state adapter and returns the synced chat."
  @spec subscribe(t(), String.t()) :: t()
  def subscribe(%__MODULE__{} = chat, thread_id) when is_binary(thread_id) do
    next_state = StateAdapter.subscribe(chat.state_adapter, chat.state, thread_id)
    sync_state(next_state, chat)
  end

  @doc """
  Returns the normalized overlapping-message concurrency config.

  The raw config is read from `chat.metadata` under `:concurrency`, then
  `"concurrency"`, falling back to an empty map.
  """
  @spec concurrency(t()) :: Concurrency.t()
  def concurrency(%__MODULE__{metadata: metadata}) do
    raw_config = metadata[:concurrency] || metadata["concurrency"] || %{}
    Concurrency.new(raw_config)
  end

  @doc """
  Updates chat-level concurrency configuration.

  `opts` is normalized through `Concurrency.new/1` and stored as a plain map
  under the `:concurrency` key of the chat metadata.
  """
  @spec configure_concurrency(t(), keyword() | map()) :: t()
  def configure_concurrency(%__MODULE__{} = chat, opts) when is_list(opts) or is_map(opts) do
    settings = opts |> Concurrency.new() |> Map.from_struct()
    %{chat | metadata: Map.put(chat.metadata || %{}, :concurrency, settings)}
  end

  @doc "Returns the `locks` and `pending_locks` entries of the current state snapshot."
  @spec lock_snapshot(t()) :: %{locks: map(), pending_locks: map()}
  def lock_snapshot(%__MODULE__{} = chat) do
    chat.state_adapter
    |> StateAdapter.snapshot(chat.state)
    |> then(fn snap -> %{locks: snap.locks, pending_locks: snap.pending_locks} end)
  end

  @doc """
  Attempts to acquire a concurrency lock for a message-processing key.

  ## Options (keyword list or map, atom or string keys)

    * `:concurrency` - concurrency config; defaults to `concurrency/1` of the chat
    * `:metadata` - metadata passed to the state adapter, defaults to `%{}`

  Returns the adapter's lock result together with the synced chat.
  """
  @spec acquire_lock(t(), String.t(), String.t(), keyword() | map()) ::
          {:acquired | :queued | :debounced | :busy, t()}
  def acquire_lock(%__MODULE__{} = chat, key, owner, opts \\ [])
      when is_binary(key) and is_binary(owner) do
    opts_map =
      case opts do
        list when is_list(list) -> Map.new(list)
        other -> other
      end

    lock_config =
      Concurrency.new(opts_map[:concurrency] || opts_map["concurrency"] || concurrency(chat))

    lock_metadata = opts_map[:metadata] || opts_map["metadata"] || %{}

    {outcome, next_state} =
      StateAdapter.lock(
        chat.state_adapter,
        chat.state,
        key,
        owner,
        lock_config.strategy,
        lock_metadata
      )

    {outcome, sync_state(next_state, chat)}
  end

  @doc """
  Releases a held concurrency lock and returns queued/debounced entries.

  Returns the state adapter's result together with the synced chat.
  """
  @spec release_lock(t(), String.t(), String.t()) ::
          {{:released, [map()]} | {:error, :not_owner}, t()}
  def release_lock(%__MODULE__{} = chat, key, owner) when is_binary(key) and is_binary(owner) do
    {outcome, next_state} =
      StateAdapter.release_lock(chat.state_adapter, chat.state, key, owner)

    {outcome, sync_state(next_state, chat)}
  end

  @doc """
  Force-releases a concurrency lock regardless of owner.

  Returns the state adapter's result together with the synced chat.
  """
  @spec force_release_lock(t(), String.t()) :: {{:released, [map()]}, t()}
  def force_release_lock(%__MODULE__{} = chat, key) when is_binary(key) do
    {outcome, next_state} =
      StateAdapter.force_release_lock(chat.state_adapter, chat.state, key)

    {outcome, sync_state(next_state, chat)}
  end

  @doc "Unsubscribes `thread_id` through the state adapter and returns the synced chat."
  @spec unsubscribe(t(), String.t()) :: t()
  def unsubscribe(%__MODULE__{} = chat, thread_id) when is_binary(thread_id) do
    next_state = StateAdapter.unsubscribe(chat.state_adapter, chat.state, thread_id)
    sync_state(next_state, chat)
  end

  @doc """
  Gets thread state map by id.

  The lookup is delegated to the configured state adapter.
  """
  @spec thread_state(t(), String.t()) :: map()
  def thread_state(%__MODULE__{state_adapter: adapter, state: adapter_state}, thread_id)
      when is_binary(thread_id) do
    StateAdapter.thread_state(adapter, adapter_state, thread_id)
  end

  @doc """
  Sets thread state map by id.

  Stores `state` through the state adapter and returns the chat rebuilt from
  the resulting adapter state.
  """
  @spec put_thread_state(t(), String.t(), map()) :: t()
  def put_thread_state(
        %__MODULE__{state_adapter: adapter, state: adapter_state} = chat,
        thread_id,
        state
      )
      when is_map(state) do
    next_state = StateAdapter.put_thread_state(adapter, adapter_state, thread_id, state)
    sync_state(next_state, chat)
  end

  @doc """
  Gets channel state map by id.

  The lookup is delegated to the configured state adapter.
  """
  @spec channel_state(t(), String.t()) :: map()
  def channel_state(%__MODULE__{state_adapter: adapter, state: adapter_state}, channel_id)
      when is_binary(channel_id) do
    StateAdapter.channel_state(adapter, adapter_state, channel_id)
  end

  @doc """
  Sets channel state map by id.

  Stores `state` through the state adapter and returns the chat rebuilt from
  the resulting adapter state.
  """
  @spec put_channel_state(t(), String.t(), map()) :: t()
  def put_channel_state(
        %__MODULE__{state_adapter: adapter, state: adapter_state} = chat,
        channel_id,
        state
      )
      when is_map(state) do
    next_state = StateAdapter.put_channel_state(adapter, adapter_state, channel_id, state)
    sync_state(next_state, chat)
  end

  # Reports whether a `{adapter_name, message_id}` dedupe key has already been
  # recorded by the state adapter. A `nil` key is never a duplicate.
  @doc false
  @spec duplicate?(t(), {atom(), String.t()} | nil) :: boolean()
  def duplicate?(%__MODULE__{}, nil) do
    false
  end

  def duplicate?(
        %__MODULE__{state_adapter: adapter, state: adapter_state},
        {adapter_name, message_id} = dedupe_key
      )
      when is_atom(adapter_name) and is_binary(message_id) do
    StateAdapter.duplicate?(adapter, adapter_state, dedupe_key)
  end

  # Records a `{adapter_name, message_id}` dedupe key through the state adapter,
  # passing along the limit computed by `dedupe_limit/1`. A `nil` key leaves the
  # chat untouched.
  @doc false
  @spec mark_dedupe(t(), {atom(), String.t()} | nil) :: t()
  def mark_dedupe(%__MODULE__{} = chat, nil) do
    chat
  end

  def mark_dedupe(%__MODULE__{} = chat, {adapter_name, message_id} = dedupe_key)
      when is_atom(adapter_name) and is_binary(message_id) do
    limit = dedupe_limit(chat)
    next_state = StateAdapter.mark_dedupe(chat.state_adapter, chat.state, dedupe_key, limit)
    sync_state(next_state, chat)
  end

  @doc """
  Creates a normalized Chat SDK-style message.

  Delegates to `Jido.Chat.Message.new/1`.
  """
  @spec message(map()) :: Message.t()
  def message(attrs) do
    Message.new(attrs)
  end

  @doc """
  Builds a room from `attrs`.

  Delegates to `Jido.Chat.Room.new/1`.
  """
  @spec new_room(map()) :: Room.t()
  def new_room(attrs) do
    Room.new(attrs)
  end

  @doc """
  Builds a participant from `attrs`.

  Delegates to `Jido.Chat.Participant.new/1`.
  """
  @spec new_participant(map()) :: Participant.t()
  def new_participant(attrs) do
    Participant.new(attrs)
  end

  @doc """
  Builds text content from `value`.

  Delegates to `Jido.Chat.Content.Text.new/1`.
  """
  @spec text(String.t()) :: Text.t()
  def text(value) do
    Text.new(value)
  end

  @doc """
  Resolves a cross-platform emoji token into a rendered value.

  Delegates to `Jido.Chat.Emoji.render/2`; `opts` are passed through unchanged.
  """
  @spec emoji(String.t() | atom(), keyword()) :: String.t()
  def emoji(value, opts \\ []) do
    Emoji.render(value, opts)
  end

  @doc """
  Serializes chat state to a revivable map.

  Delegates to `Jido.Chat.Serialization.to_map/1`.
  """
  @spec to_map(t()) :: map()
  def to_map(%__MODULE__{} = chat) do
    Serialization.to_map(chat)
  end

  @doc """
  Builds chat state from serialized map.

  Delegates to `Jido.Chat.Serialization.from_map/1`.
  """
  @spec from_map(map()) :: t()
  def from_map(map) when is_map(map) do
    Serialization.from_map(map)
  end

  @doc """
  Returns a reviver function for serialized core structs.

  Delegates to `Jido.Chat.Serialization.reviver/0`.
  """
  @spec reviver() :: (map() -> term())
  def reviver do
    Serialization.reviver()
  end

  # Revives a serialized map by delegating to `Serialization.revive/1`.
  @doc false
  @spec revive(map()) :: term()
  def revive(map) do
    Serialization.revive(map)
  end

  # Appends a `{selector, handler}` entry to the handler list stored under `key`,
  # after normalizing the selector. New entries go at the end, so handlers keep
  # their registration order.
  defp register_filtered_handler(%__MODULE__{} = chat, key, selector, handler) do
    entry = {normalize_handler_selector(selector), handler}
    update_in(chat.handlers[key], fn registered -> registered ++ [entry] end)
  end

  # Normalizes a handler selector:
  #   * lists are normalized element by element;
  #   * atoms other than `:all` become their string form;
  #   * anything else (including `:all`) is returned unchanged.
  defp normalize_handler_selector(selector) do
    cond do
      is_list(selector) -> Enum.map(selector, &normalize_handler_selector/1)
      is_atom(selector) and selector != :all -> Atom.to_string(selector)
      true -> selector
    end
  end

  # Resolves a DM target into `{:ok, adapter_name, external_user_id}` or an
  # error tuple.
  #
  # Accepted targets:
  #   * an `%Author{}` - the adapter comes from `opts[:adapter_name]`, then the
  #     author's metadata, then the chat's only adapter (if it has exactly one);
  #   * any other map - passed through `Author.new/1` and resolved as above; any
  #     exception raised along the way is reported as `:invalid_dm_target`;
  #   * a binary or integer - an `"adapter:user"` prefixed form is tried first,
  #     otherwise the adapter comes from `opts[:adapter_name]` or the chat's
  #     only adapter, and the target is stringified as the user id.
  defp resolve_dm_target(%__MODULE__{} = chat, %Author{} = author, opts) do
    candidate =
      opts[:adapter_name] ||
        author.metadata[:adapter_name] ||
        author.metadata["adapter_name"] ||
        infer_single_adapter(chat)

    case normalize_adapter_name(chat, candidate) do
      {:ok, resolved} -> {:ok, resolved, author.user_id}
      failure -> failure
    end
  end

  defp resolve_dm_target(%__MODULE__{} = chat, target, opts) when is_map(target) do
    try do
      author = Author.new(target)
      resolve_dm_target(chat, author, opts)
    rescue
      _ -> {:error, :invalid_dm_target}
    end
  end

  defp resolve_dm_target(%__MODULE__{} = chat, target, opts)
       when is_binary(target) or is_integer(target) do
    # A successful prefixed parse does not match `:error` and is returned as-is.
    with :error <- parse_adapter_prefixed_target(chat, target),
         {:ok, resolved} <-
           normalize_adapter_name(chat, opts[:adapter_name] || infer_single_adapter(chat)) do
      {:ok, resolved, to_string(target)}
    end
  end

  defp resolve_dm_target(_chat, _target, _opts) do
    {:error, :invalid_dm_target}
  end

  # Normalizes an adapter identifier into `{:ok, atom}`.
  #
  #   * `nil` means no adapter was supplied or could be inferred, and yields
  #     `{:error, :ambiguous_adapter}`;
  #   * any other atom is accepted as-is;
  #   * a binary is converted with `String.to_existing_atom/1`, so unknown names
  #     never create new atoms and are reported as `:ambiguous_adapter`.
  #
  # The `nil` clause must come first: `nil` is itself an atom, so placing it
  # after the `is_atom/1` clause would make it unreachable and let `{:ok, nil}`
  # leak out to callers.
  defp normalize_adapter_name(_chat, nil), do: {:error, :ambiguous_adapter}

  defp normalize_adapter_name(_chat, adapter_name) when is_atom(adapter_name),
    do: {:ok, adapter_name}

  defp normalize_adapter_name(chat, adapter_name) when is_binary(adapter_name) do
    adapter_atom = String.to_existing_atom(adapter_name)
    normalize_adapter_name(chat, adapter_atom)
  rescue
    ArgumentError -> {:error, :ambiguous_adapter}
  end

  # Returns the name of the chat's only registered adapter, or `nil` when the
  # adapter map does not contain exactly one entry.
  defp infer_single_adapter(%__MODULE__{adapters: adapters}) when map_size(adapters) == 1 do
    [only_name] = Map.keys(adapters)
    only_name
  end

  defp infer_single_adapter(_chat) do
    nil
  end

  # Parses an `"adapter:external_user_id"` target.
  #
  # Returns `{:ok, adapter_atom, external_user_id}` when the text before the
  # first `:` names a registered adapter and the remainder is non-empty;
  # otherwise returns `:error`. Non-binary targets always yield `:error`.
  #
  # The prefix is compared against the string form of the registered adapter
  # keys rather than converted with `String.to_atom/1`: the target is
  # caller-supplied, and atoms are never garbage collected, so creating atoms
  # from it would allow unbounded atom-table growth.
  defp parse_adapter_prefixed_target(%__MODULE__{adapters: adapters}, target)
       when is_binary(target) do
    case String.split(target, ":", parts: 2) do
      [adapter_name, external_user_id] when external_user_id != "" ->
        Enum.find_value(adapters, :error, fn {registered_name, _adapter} ->
          if is_atom(registered_name) and Atom.to_string(registered_name) == adapter_name do
            {:ok, registered_name, external_user_id}
          end
        end)

      _ ->
        :error
    end
  end

  defp parse_adapter_prefixed_target(_chat, _target), do: :error

  # Fills in the context handles of an event struct.
  #
  # Each handle already present on the event is kept; a missing one is built
  # from the event's ids (or, for the related_* handles, from ids stored in the
  # event metadata). `thread_id`, `channel_id` and `message_id` likewise keep
  # the event's value and otherwise fall back to the id of the resolved handle.
  defp enrich_event_context(%__MODULE__{} = chat, adapter_name, event) do
    adapter_module = Map.get(chat.adapters, adapter_name)

    channel_handle =
      Map.get(event, :channel) || build_channel_handle(chat, adapter_name, event)

    thread_handle =
      Map.get(event, :thread) || build_thread_handle(chat, adapter_name, event, channel_handle)

    message_handle =
      Map.get(event, :message) || build_message_handle(event, thread_handle, channel_handle)

    related_channel_handle =
      Map.get(event, :related_channel) || build_related_channel_handle(chat, adapter_name, event)

    related_thread_handle =
      Map.get(event, :related_thread) ||
        build_related_thread_handle(chat, adapter_name, event, related_channel_handle)

    related_message_handle =
      Map.get(event, :related_message) ||
        build_related_message_handle(event, related_thread_handle, related_channel_handle)

    overrides = [
      adapter: adapter_module,
      thread: thread_handle,
      channel: channel_handle,
      message: message_handle,
      related_thread: related_thread_handle,
      related_channel: related_channel_handle,
      related_message: related_message_handle,
      thread_id: Map.get(event, :thread_id) || (thread_handle && thread_handle.id),
      channel_id: Map.get(event, :channel_id) || (channel_handle && channel_handle.id),
      message_id: Map.get(event, :message_id) || (message_handle && message_handle.id)
    ]

    struct(event, overrides)
  end

  # Builds a channel handle from the event's `:channel_id`, falling back to its
  # `:thread_id`. Returns `nil` when no external channel id can be extracted.
  defp build_channel_handle(%__MODULE__{} = chat, adapter_name, event) do
    source_id = Map.get(event, :channel_id) || Map.get(event, :thread_id)

    with external_id when not is_nil(external_id) <-
           channel_external_id(adapter_name, source_id) do
      channel(chat, adapter_name, external_id)
    end
  end

  # Builds a thread handle for an event.
  #
  # A binary `:thread_id` on the event takes precedence: it is parsed into room
  # and thread parts, and an unparseable id yields `nil` without falling back to
  # the channel. With no binary thread id, a `%ChannelRef{}` produces a thread
  # addressed by the channel's external id. Anything else yields `nil`.
  defp build_thread_handle(%__MODULE__{} = chat, adapter_name, event, channel) do
    case {Map.get(event, :thread_id), channel} do
      {event_thread_id, _} when is_binary(event_thread_id) ->
        with {external_room_id, external_thread_id} <-
               parse_thread_identifier(adapter_name, event_thread_id) do
          thread(chat, adapter_name, external_room_id,
            id: event_thread_id,
            external_thread_id: external_thread_id,
            is_dm: is_dm_channel?(channel)
          )
        else
          _ -> nil
        end

      {_, %ChannelRef{}} ->
        thread(chat, adapter_name, channel.external_id,
          id: "#{adapter_name}:#{channel.external_id}",
          is_dm: is_dm_channel?(channel)
        )

      _ ->
        nil
    end
  end

  # Builds a message handle from the event's `:message_id`, or returns `nil`
  # when the event has none. The external room id is taken from the thread when
  # one is available, otherwise from the channel.
  defp build_message_handle(event, thread, channel) do
    event
    |> Map.get(:message_id)
    |> case do
      nil ->
        nil

      raw_id ->
        id = to_string(raw_id)

        room_id =
          case {thread, channel} do
            {%Thread{}, _} -> thread.external_room_id
            {_, %ChannelRef{}} -> channel.external_id
            _ -> nil
          end

        Message.new(%{
          id: id,
          thread_id: thread && thread.id,
          channel_id: channel && channel.id,
          external_room_id: room_id,
          external_message_id: id
        })
    end
  end

  # Builds a channel handle from the `related_channel_id` stored in the event
  # metadata (atom or string key). Returns `nil` when no external channel id
  # can be extracted.
  defp build_related_channel_handle(%__MODULE__{} = chat, adapter_name, event) do
    metadata = event.metadata

    related_id =
      Map.get(metadata, :related_channel_id) || Map.get(metadata, "related_channel_id")

    with external_id when not is_nil(external_id) <-
           channel_external_id(adapter_name, related_id) do
      channel(chat, adapter_name, external_id)
    end
  end

  # Builds a thread handle from the `related_thread_id` stored in the event
  # metadata (atom or string key).
  #
  # A binary related thread id takes precedence: it is parsed into room and
  # thread parts, and an unparseable id yields `nil` without falling back to the
  # channel. With no binary id, a `%ChannelRef{}` produces a thread addressed by
  # the related channel's external id. Anything else yields `nil`.
  defp build_related_thread_handle(%__MODULE__{} = chat, adapter_name, event, related_channel) do
    metadata = event.metadata
    related_id = Map.get(metadata, :related_thread_id) || Map.get(metadata, "related_thread_id")

    case {related_id, related_channel} do
      {id, _} when is_binary(id) ->
        with {external_room_id, external_thread_id} <-
               parse_thread_identifier(adapter_name, id) do
          thread(chat, adapter_name, external_room_id,
            id: id,
            external_thread_id: external_thread_id,
            is_dm: is_dm_channel?(related_channel)
          )
        else
          _ -> nil
        end

      {_, %ChannelRef{}} ->
        thread(chat, adapter_name, related_channel.external_id,
          id: "#{adapter_name}:#{related_channel.external_id}",
          is_dm: is_dm_channel?(related_channel)
        )

      _ ->
        nil
    end
  end

  # Builds a message handle from the `related_message_id` stored in the event
  # metadata (atom or string key). Returns `nil` unless that id is a binary.
  # The external room id is taken from the thread when one is available,
  # otherwise from the channel.
  defp build_related_message_handle(event, thread, channel) do
    metadata = event.metadata

    case Map.get(metadata, :related_message_id) || Map.get(metadata, "related_message_id") do
      related_id when is_binary(related_id) ->
        room_id =
          case {thread, channel} do
            {%Thread{}, _} -> thread.external_room_id
            {_, %ChannelRef{}} -> channel.external_id
            _ -> nil
          end

        Message.new(%{
          id: related_id,
          thread_id: thread && thread.id,
          channel_id: channel && channel.id,
          external_room_id: room_id,
          external_message_id: related_id
        })

      _ ->
        nil
    end
  end

  # Splits a canonical thread id of the form
  # `"<adapter>:<external_room_id>[:<external_thread_id>]"` into
  # `{external_room_id, external_thread_id}`; the second element is `nil` when
  # the id has no thread part.
  #
  # Returns `nil` when the id is not a binary or does not start with the
  # `"<adapter>:"` prefix.
  defp parse_thread_identifier(adapter_name, thread_id) when is_binary(thread_id) do
    prefix = "#{adapter_name}:"

    if String.starts_with?(thread_id, prefix) do
      # Strip the prefix exactly once. `String.trim_leading/2` would remove every
      # repeated occurrence and mangle ids whose room part equals the adapter
      # name (e.g. "slack:slack:T1").
      rest = String.replace_prefix(thread_id, prefix, "")

      # With `parts: 2` the split always yields one or two elements.
      case String.split(rest, ":", parts: 2) do
        [external_room_id, external_thread_id] -> {external_room_id, external_thread_id}
        [external_room_id] -> {external_room_id, nil}
      end
    else
      nil
    end
  end

  defp parse_thread_identifier(_adapter_name, _thread_id), do: nil

  # Extracts the external channel id (the segment right after the
  # `"<adapter>:"` prefix) from a canonical channel or thread id.
  #
  # Returns `nil` when the id is not a binary or does not start with the
  # adapter prefix.
  defp channel_external_id(adapter_name, channel_id) when is_binary(channel_id) do
    prefix = "#{adapter_name}:"

    if String.starts_with?(channel_id, prefix) do
      # Strip the prefix exactly once. `String.trim_leading/2` would remove every
      # repeated occurrence and mangle ids whose channel part equals the adapter
      # name (e.g. "slack:slack:T1").
      channel_id
      |> String.replace_prefix(prefix, "")
      |> String.split(":", parts: 2)
      |> List.first()
    else
      nil
    end
  end

  defp channel_external_id(_adapter_name, _channel_id), do: nil

  # Reads the DM flag from a channel's metadata (atom or string key). Anything
  # that is not a `%ChannelRef{}` is treated as not a DM.
  defp is_dm_channel?(channel) do
    case channel do
      %ChannelRef{metadata: metadata} -> metadata[:is_dm] || metadata["is_dm"] || false
      _ -> false
    end
  end

  # Stores the new adapter state on the chat and refreshes the chat fields that
  # mirror the adapter snapshot (subscriptions, dedupe data, thread and channel
  # state).
  defp sync_state(state, %__MODULE__{state_adapter: adapter} = chat) do
    snap = StateAdapter.snapshot(adapter, state)

    %{
      chat
      | state: state,
        subscriptions: snap.subscriptions,
        dedupe: snap.dedupe,
        dedupe_order: snap.dedupe_order,
        thread_state: snap.thread_state,
        channel_state: snap.channel_state
    }
  end

  # Builds the initial state snapshot from construction options, accepting each
  # entry under its atom or string key and falling back to an empty default,
  # then normalizes it via `StateAdapter.normalize_snapshot/1`.
  defp initial_state_snapshot(opts) do
    pick = fn atom_key, string_key, default ->
      opts[atom_key] || opts[string_key] || default
    end

    StateAdapter.normalize_snapshot(%{
      subscriptions: pick.(:subscriptions, "subscriptions", MapSet.new()),
      dedupe: pick.(:dedupe, "dedupe", MapSet.new()),
      dedupe_order: pick.(:dedupe_order, "dedupe_order", []),
      thread_state: pick.(:thread_state, "thread_state", %{}),
      channel_state: pick.(:channel_state, "channel_state", %{}),
      locks: pick.(:locks, "locks", %{}),
      pending_locks: pick.(:pending_locks, "pending_locks", %{})
    })
  end

  # Keeps `snapshot` when no explicit adapter state was supplied; otherwise
  # returns the snapshot the state adapter derives from that explicit state.
  defp maybe_replace_with_explicit_state(snapshot, state_adapter, explicit_state) do
    if is_nil(explicit_state) do
      snapshot
    else
      StateAdapter.snapshot(state_adapter, explicit_state)
    end
  end

  # Returns the dedupe limit configured in the chat metadata under
  # `:dedupe_limit` / `"dedupe_limit"`. Falls back to 1_000 when the value is
  # missing, is not a positive integer, or the metadata is not a map.
  defp dedupe_limit(chat) do
    configured =
      with %{} = meta <- Map.get(chat, :metadata, %{}) do
        meta[:dedupe_limit] || meta["dedupe_limit"]
      else
        _ -> nil
      end

    case configured do
      limit when is_integer(limit) and limit > 0 -> limit
      _ -> 1_000
    end
  end

  # Identity pass-through for adapter ingress events: an `%EventEnvelope{}`,
  # `nil`, `:noop`, or any other term is returned unchanged.
  defp normalize_ingress_event(event) do
    event
  end

  # Normalizes an adapter's ingress response:
  #   * a `%WebhookResponse{}` and `nil` pass through unchanged;
  #   * any other map is converted with `WebhookResponse.new/1`;
  #   * everything else becomes `WebhookResponse.accepted/0`.
  defp normalize_ingress_response(response) do
    case response do
      %WebhookResponse{} -> response
      nil -> nil
      %{} -> WebhookResponse.new(response)
      _ -> WebhookResponse.accepted()
    end
  end

  # Normalizes ingress input into a `%WebhookRequest{}`.
  #
  #   * an existing `%WebhookRequest{}` is returned unchanged;
  #   * a map is unpacked field by field, accepting atom or string keys. Method,
  #     path and query prefer the payload and fall back to `opts`; headers
  #     prefer `opts` and fall back to the payload. The original map is kept
  #     under `:raw`;
  #   * anything else yields a request with an empty payload.
  defp normalize_ingress_request(_adapter_name, %WebhookRequest{} = request, _opts) do
    request
  end

  defp normalize_ingress_request(adapter_name, payload, opts) when is_map(payload) do
    from_payload = fn atom_key, string_key -> payload[atom_key] || payload[string_key] end

    body = from_payload.(:payload, "payload") || payload
    request_headers = opts[:headers] || from_payload.(:headers, "headers") || %{}

    WebhookRequest.new(%{
      adapter_name: adapter_name,
      method: from_payload.(:method, "method") || opts[:method] || "POST",
      path: from_payload.(:path, "path") || opts[:path],
      headers: request_headers,
      payload: body,
      query: from_payload.(:query, "query") || opts[:query] || %{},
      raw: payload,
      metadata: from_payload.(:metadata, "metadata") || %{}
    })
  end

  defp normalize_ingress_request(adapter_name, _other, _opts) do
    WebhookRequest.new(%{adapter_name: adapter_name, payload: %{}})
  end

  # Returns the value when it is an `%EventEnvelope{}`, otherwise `nil`.
  defp envelope_or_nil(candidate) do
    if match?(%EventEnvelope{}, candidate), do: candidate, else: nil
  end

  # Coerces a value into a `%WebhookResponse{}`: an existing response is kept,
  # any other map goes through `WebhookResponse.new/1`, and everything else
  # becomes `WebhookResponse.accepted/0`.
  defp response_or_default(candidate) do
    case candidate do
      %WebhookResponse{} -> candidate
      %{} -> WebhookResponse.new(candidate)
      _ -> WebhookResponse.accepted()
    end
  end

  # Wraps ingress failure details in an `IngressError` struct and converts it
  # with `Errors.to_error/1`. `context` defaults to an empty map.
  defp ingress_error(transport, adapter_name, reason, context \\ %{}) do
    failure = %IngressError{
      transport: transport,
      adapter_name: adapter_name,
      reason: reason,
      context: context
    }

    Errors.to_error(failure)
  end

  # Builds a canonical thread id: `"<adapter>:<room>"`, with `":<thread>"`
  # appended when an external thread id is given (i.e. it is not `nil`).
  defp thread_id(adapter_name, external_room_id, external_thread_id) do
    room_scoped = "#{adapter_name}:#{external_room_id}"

    if is_nil(external_thread_id) do
      room_scoped
    else
      "#{room_scoped}:#{external_thread_id}"
    end
  end
end