defmodule Jido.Chat do
@moduledoc """
Core adapter-contract facade and lightweight event-loop state container.
`jido_chat` owns canonical chat types, adapter contracts, typed handles, and
deterministic fallback behavior. It does not define the supervised runtime or
process tree for production messaging systems; that responsibility belongs in
`jido_messaging`.
"""
alias Jido.Chat.{
ActionEvent,
Adapter,
AdapterRegistry,
AssistantContextChangedEvent,
AssistantThreadStartedEvent,
Author,
CapabilityMatrix,
ChannelRef,
Concurrency,
Emoji,
EventRouter,
EventEnvelope,
Errors,
IngressResult,
Incoming,
Message,
ModalCloseEvent,
ModalSubmitEvent,
Participant,
ReactionEvent,
Room,
Serialization,
SlashCommandEvent,
StateAdapter,
Thread,
WebhookPipeline,
WebhookRequest,
WebhookResponse
}
alias Jido.Chat.Errors.Ingress, as: IngressError
alias Jido.Chat.Content.Text
@dialyzer {:nowarn_function, normalize_ingress_event: 1}
@dialyzer {:nowarn_function, normalize_ingress_response: 1}
@dialyzer {:nowarn_function, normalize_ingress_request: 3}
@typedoc "Mention handler callback."
@type mention_handler ::
(Thread.t(), Incoming.t() -> term()) | (t(), Thread.t(), Incoming.t() -> t() | term())
@typedoc "Regex-routed message handler callback."
@type message_handler :: mention_handler()
@typedoc "Subscribed-thread handler callback."
@type subscribed_handler :: mention_handler()
@typedoc "Reaction event handler callback."
@type reaction_handler ::
(ReactionEvent.t() -> term()) | (t(), ReactionEvent.t() -> t() | term())
@typedoc "Action event handler callback."
@type action_handler :: (ActionEvent.t() -> term()) | (t(), ActionEvent.t() -> t() | term())
@typedoc "Modal submit handler callback."
@type modal_submit_handler ::
(ModalSubmitEvent.t() -> term()) | (t(), ModalSubmitEvent.t() -> t() | term())
@typedoc "Modal close handler callback."
@type modal_close_handler ::
(ModalCloseEvent.t() -> term()) | (t(), ModalCloseEvent.t() -> t() | term())
@typedoc "Slash command handler callback."
@type slash_command_handler ::
(SlashCommandEvent.t() -> term()) | (t(), SlashCommandEvent.t() -> t() | term())
@typedoc "Assistant thread started handler callback."
@type assistant_thread_started_handler ::
(AssistantThreadStartedEvent.t() -> term())
| (t(), AssistantThreadStartedEvent.t() -> t() | term())
@typedoc "Assistant context changed handler callback."
@type assistant_context_changed_handler ::
(AssistantContextChangedEvent.t() -> term())
| (t(), AssistantContextChangedEvent.t() -> t() | term())
@type handlers :: %{
mention: [mention_handler()],
message: [{Regex.t(), message_handler()}],
subscribed: [subscribed_handler()],
reaction: [reaction_handler()],
action: [action_handler()],
modal_submit: [modal_submit_handler()],
modal_close: [modal_close_handler()],
slash_command: [slash_command_handler()],
assistant_thread_started: [assistant_thread_started_handler()],
assistant_context_changed: [assistant_context_changed_handler()]
}
@type webhook_handler ::
(t(), map(), keyword() -> {:ok, t(), Incoming.t()} | {:error, term()})
@type webhook_request_handler ::
(WebhookRequest.t() | map(), keyword() ->
{:ok, t(), EventEnvelope.t() | nil, WebhookResponse.t()})
@type webhook_request_handler_with_chat ::
(t(), WebhookRequest.t() | map(), keyword() ->
{:ok, t(), EventEnvelope.t() | nil, WebhookResponse.t()})
@type route_request_handler ::
(WebhookRequest.t() | map(), keyword() ->
{:ok, IngressResult.t()} | {:error, Exception.t()})
@type t :: %__MODULE__{
id: String.t(),
user_name: String.t(),
adapters: %{optional(atom()) => module()},
state_adapter: module(),
state: term(),
subscriptions: MapSet.t(String.t()),
dedupe: MapSet.t({atom(), String.t()}),
dedupe_order: [{atom(), String.t()}],
handlers: handlers(),
metadata: map(),
thread_state: %{optional(String.t()) => map()},
channel_state: %{optional(String.t()) => map()},
initialized: boolean()
}
@default_handlers %{
mention: [],
message: [],
subscribed: [],
reaction: [],
action: [],
modal_submit: [],
modal_close: [],
slash_command: [],
assistant_thread_started: [],
assistant_context_changed: []
}
@schema Zoi.struct(
__MODULE__,
%{
id: Zoi.string(),
user_name: Zoi.string() |> Zoi.default("bot"),
adapters: Zoi.map() |> Zoi.default(%{}),
state_adapter: Zoi.any() |> Zoi.default(Jido.Chat.StateAdapters.Memory),
state: Zoi.any() |> Zoi.nullish(),
subscriptions: Zoi.any() |> Zoi.default(MapSet.new()),
dedupe: Zoi.any() |> Zoi.default(MapSet.new()),
dedupe_order: Zoi.list() |> Zoi.default([]),
handlers: Zoi.map() |> Zoi.default(@default_handlers),
metadata: Zoi.map() |> Zoi.default(%{}),
thread_state: Zoi.map() |> Zoi.default(%{}),
channel_state: Zoi.map() |> Zoi.default(%{}),
initialized: Zoi.boolean() |> Zoi.default(false)
},
coerce: true
)
@enforce_keys Zoi.Struct.enforce_keys(@schema)
defstruct Zoi.Struct.struct_fields(@schema)
@doc "Returns the Zoi schema for Chat."
def schema, do: @schema
@doc """
Creates a new chat state struct.
Supported options:
* `:id`
* `:user_name`
* `:adapters` - map `%{telegram: Jido.Chat.Telegram.Adapter, ...}`
* `:metadata`
* `:state_adapter` - state backend module, defaults to `Jido.Chat.StateAdapters.Memory`
* `:state_opts` - adapter-specific initialization options
* `:state` - explicit adapter state, overrides legacy snapshot inputs
"""
@spec new(keyword() | map()) :: t()
def new(opts \\ [])
def new(opts) when is_list(opts), do: opts |> Map.new() |> new()
def new(opts) when is_map(opts) do
state_adapter =
opts[:state_adapter] || opts["state_adapter"] || Jido.Chat.StateAdapters.Memory
state_adapter = Jido.Chat.Wire.decode_module(state_adapter) || Jido.Chat.StateAdapters.Memory
state_opts = opts[:state_opts] || opts["state_opts"] || []
state_snapshot =
opts
|> initial_state_snapshot()
|> maybe_replace_with_explicit_state(state_adapter, opts[:state] || opts["state"])
state =
case opts[:state] || opts["state"] do
nil -> StateAdapter.init(state_adapter, state_snapshot, state_opts)
explicit_state -> explicit_state
end
normalized_state_snapshot = StateAdapter.snapshot(state_adapter, state)
attrs = %{
id: opts[:id] || opts["id"] || Jido.Chat.ID.generate!(),
user_name: opts[:user_name] || opts["user_name"] || "bot",
adapters: AdapterRegistry.normalize_adapters(opts[:adapters] || opts["adapters"] || %{}),
state_adapter: state_adapter,
state: state,
metadata: opts[:metadata] || opts["metadata"] || %{},
subscriptions: normalized_state_snapshot.subscriptions,
dedupe: normalized_state_snapshot.dedupe,
dedupe_order: normalized_state_snapshot.dedupe_order,
thread_state: normalized_state_snapshot.thread_state,
channel_state: normalized_state_snapshot.channel_state
}
Jido.Chat.Schema.parse!(__MODULE__, @schema, attrs)
end
@doc "Marks chat instance as initialized and initializes adapters when available."
@spec initialize(t()) :: t()
def initialize(%__MODULE__{} = chat) do
Enum.each(chat.adapters, fn {_name, adapter} ->
_ = Adapter.initialize(adapter, chat.metadata[:adapter_opts] || [])
end)
%{chat | initialized: true}
end
@doc "Marks chat instance as shut down and shuts down adapters when available."
@spec shutdown(t()) :: t()
def shutdown(%__MODULE__{} = chat) do
Enum.each(chat.adapters, fn {_name, adapter} ->
_ = Adapter.shutdown(adapter, chat.metadata[:adapter_opts] || [])
end)
%{chat | initialized: false}
end
@doc "Registers a new-mention handler."
@spec on_new_mention(t(), mention_handler()) :: t()
def on_new_mention(%__MODULE__{} = chat, handler) when is_function(handler) do
update_in(chat.handlers.mention, &(&1 ++ [handler]))
end
@doc "Registers a new-message regex handler."
@spec on_new_message(t(), Regex.t() | String.t(), message_handler()) :: t()
def on_new_message(%__MODULE__{} = chat, %Regex{} = pattern, handler)
when is_function(handler) do
update_in(chat.handlers.message, &(&1 ++ [{pattern, handler}]))
end
def on_new_message(%__MODULE__{} = chat, pattern, handler)
when is_binary(pattern) and is_function(handler) do
on_new_message(chat, Regex.compile!(pattern), handler)
end
@doc "Registers a subscribed-thread handler."
@spec on_subscribed_message(t(), subscribed_handler()) :: t()
def on_subscribed_message(%__MODULE__{} = chat, handler) when is_function(handler) do
update_in(chat.handlers.subscribed, &(&1 ++ [handler]))
end
@doc "Registers a reaction-event handler."
@spec on_reaction(t(), reaction_handler()) :: t()
def on_reaction(%__MODULE__{} = chat, handler) when is_function(handler) do
update_in(chat.handlers.reaction, &(&1 ++ [handler]))
end
@doc "Registers a filtered reaction-event handler."
@spec on_reaction(
t(),
String.t() | atom() | [String.t() | atom()] | Regex.t(),
reaction_handler()
) ::
t()
def on_reaction(%__MODULE__{} = chat, selector, handler) when is_function(handler) do
register_filtered_handler(chat, :reaction, selector, handler)
end
@doc "Registers an action-event handler."
@spec on_action(t(), action_handler()) :: t()
def on_action(%__MODULE__{} = chat, handler) when is_function(handler) do
update_in(chat.handlers.action, &(&1 ++ [handler]))
end
@doc "Registers a filtered action-event handler."
@spec on_action(t(), String.t() | atom() | [String.t() | atom()] | Regex.t(), action_handler()) ::
t()
def on_action(%__MODULE__{} = chat, selector, handler) when is_function(handler) do
register_filtered_handler(chat, :action, selector, handler)
end
@doc "Registers a modal-submit handler."
@spec on_modal_submit(t(), modal_submit_handler()) :: t()
def on_modal_submit(%__MODULE__{} = chat, handler) when is_function(handler) do
update_in(chat.handlers.modal_submit, &(&1 ++ [handler]))
end
@doc "Registers a filtered modal-submit handler."
@spec on_modal_submit(
t(),
String.t() | atom() | [String.t() | atom()] | Regex.t(),
modal_submit_handler()
) :: t()
def on_modal_submit(%__MODULE__{} = chat, selector, handler) when is_function(handler) do
register_filtered_handler(chat, :modal_submit, selector, handler)
end
@doc "Registers a modal-close handler."
@spec on_modal_close(t(), modal_close_handler()) :: t()
def on_modal_close(%__MODULE__{} = chat, handler) when is_function(handler) do
update_in(chat.handlers.modal_close, &(&1 ++ [handler]))
end
@doc "Registers a filtered modal-close handler."
@spec on_modal_close(
t(),
String.t() | atom() | [String.t() | atom()] | Regex.t(),
modal_close_handler()
) :: t()
def on_modal_close(%__MODULE__{} = chat, selector, handler) when is_function(handler) do
register_filtered_handler(chat, :modal_close, selector, handler)
end
@doc "Registers a slash-command handler."
@spec on_slash_command(t(), slash_command_handler()) :: t()
def on_slash_command(%__MODULE__{} = chat, handler) when is_function(handler) do
update_in(chat.handlers.slash_command, &(&1 ++ [handler]))
end
@doc "Registers a filtered slash-command handler."
@spec on_slash_command(
t(),
String.t() | atom() | [String.t() | atom()] | Regex.t(),
slash_command_handler()
) :: t()
def on_slash_command(%__MODULE__{} = chat, selector, handler) when is_function(handler) do
register_filtered_handler(chat, :slash_command, selector, handler)
end
@doc "Registers assistant thread started handlers."
@spec on_assistant_thread_started(t(), assistant_thread_started_handler()) :: t()
def on_assistant_thread_started(%__MODULE__{} = chat, handler) when is_function(handler) do
update_in(chat.handlers.assistant_thread_started, &(&1 ++ [handler]))
end
@doc "Registers assistant context changed handlers."
@spec on_assistant_context_changed(t(), assistant_context_changed_handler()) :: t()
def on_assistant_context_changed(%__MODULE__{} = chat, handler) when is_function(handler) do
update_in(chat.handlers.assistant_context_changed, &(&1 ++ [handler]))
end
@doc "Returns adapter module by name."
@spec get_adapter(t(), atom()) :: {:ok, module()} | {:error, term()}
def get_adapter(%__MODULE__{} = chat, adapter_name) when is_atom(adapter_name) do
AdapterRegistry.resolve(chat, adapter_name)
end
@doc "Returns adapter-keyed request-first webhook handlers."
@spec webhooks(t()) :: %{optional(atom()) => webhook_request_handler()}
def webhooks(%__MODULE__{} = chat) do
Enum.reduce(Map.keys(chat.adapters), %{}, fn adapter_name, acc ->
Map.put(acc, adapter_name, fn request_or_payload, opts ->
handle_webhook_request(chat, adapter_name, request_or_payload, opts)
end)
end)
end
@doc "Compatibility helper returning adapter-keyed webhook handlers with explicit chat argument."
@spec webhooks_with_chat(t()) :: %{optional(atom()) => webhook_request_handler_with_chat()}
def webhooks_with_chat(%__MODULE__{} = chat) do
Enum.reduce(Map.keys(chat.adapters), %{}, fn adapter_name, acc ->
Map.put(acc, adapter_name, fn current_chat, request_or_payload, opts ->
base_chat = if match?(%__MODULE__{}, current_chat), do: current_chat, else: chat
handle_webhook_request(base_chat, adapter_name, request_or_payload, opts)
end)
end)
end
@doc """
Handles a webhook payload for the given adapter.
"""
@spec handle_webhook(t(), atom(), map(), keyword()) ::
{:ok, t(), Incoming.t()} | {:error, term()}
def handle_webhook(%__MODULE__{} = chat, adapter_name, payload, opts \\ [])
when is_atom(adapter_name) and is_map(payload) do
with {:ok, adapter_module} <- AdapterRegistry.resolve(chat, adapter_name) do
if function_exported?(adapter_module, :handle_webhook, 3) do
adapter_module.handle_webhook(chat, payload, opts)
else
Adapter.handle_webhook(adapter_module, chat, payload, opts)
end
end
end
@doc """
Routes a request-style inbound input through verification/parsing/event dispatch.
This is transport-agnostic and returns a typed `IngressResult`.
"""
@spec route_request(
t(),
atom(),
WebhookRequest.t() | map(),
keyword()
) :: {:ok, IngressResult.t()} | {:error, Exception.t()}
def route_request(%__MODULE__{} = chat, adapter_name, request_or_payload, opts \\ [])
when is_atom(adapter_name) and is_list(opts) do
with {:ok, routed_chat, envelope, response} <-
WebhookPipeline.handle_request(
chat,
adapter_name,
request_or_payload,
opts,
&AdapterRegistry.resolve/2,
&process_event/4
) do
{:ok,
IngressResult.new(%{
chat: routed_chat,
adapter_name: adapter_name,
event: normalize_ingress_event(envelope),
response: normalize_ingress_response(response),
request: normalize_ingress_request(adapter_name, request_or_payload, opts),
mode: :request,
metadata: %{transport: :request}
})}
else
{:error, reason} ->
{:error, ingress_error(:request, adapter_name, reason)}
end
rescue
exception ->
{:error, ingress_error(:request, adapter_name, {:exception, exception})}
end
@doc """
Routes an event-style inbound input (polling/gateway/listener) through `process_event/4`.
This is transport-agnostic and returns a typed `IngressResult`.
"""
@spec route_event(t(), atom(), EventEnvelope.t() | map() | :noop, keyword()) ::
{:ok, IngressResult.t()} | {:error, Exception.t()}
def route_event(chat, adapter_name, event, opts \\ [])
def route_event(%__MODULE__{} = chat, adapter_name, :noop, _opts)
when is_atom(adapter_name) do
{:ok,
IngressResult.new(%{
chat: chat,
adapter_name: adapter_name,
event: :noop,
response: nil,
request: nil,
mode: :event,
metadata: %{transport: :event}
})}
end
def route_event(%__MODULE__{} = chat, adapter_name, event, opts)
when is_atom(adapter_name) and is_list(opts) do
case process_event(chat, adapter_name, event, opts) do
{:ok, routed_chat, envelope} ->
{:ok,
IngressResult.new(%{
chat: routed_chat,
adapter_name: adapter_name,
event: envelope,
response: nil,
request: nil,
mode: :event,
metadata: %{transport: :event}
})}
{:error, reason} ->
{:error, ingress_error(:event, adapter_name, reason, %{event: event})}
end
rescue
exception ->
{:error, ingress_error(:event, adapter_name, {:exception, exception}, %{event: event})}
end
@doc """
Handles a typed webhook request for the given adapter.
Returns the updated chat state, normalized event envelope, and typed webhook response.
"""
@spec handle_webhook_request(
t(),
atom(),
WebhookRequest.t() | map(),
keyword()
) ::
{:ok, t(), EventEnvelope.t() | nil, WebhookResponse.t()}
def handle_webhook_request(%__MODULE__{} = chat, adapter_name, request_or_payload, opts \\ [])
when is_atom(adapter_name) and is_list(opts) do
with {:ok, %IngressResult{} = result} <-
route_request(chat, adapter_name, request_or_payload, opts) do
{:ok, result.chat, envelope_or_nil(result.event), response_or_default(result.response)}
end
end
@doc """
Opens a DM thread with an adapter when supported.
"""
@spec open_dm(
t(),
atom() | Author.t() | map() | String.t() | integer(),
String.t() | integer() | keyword() | map()
) :: {:ok, Thread.t()} | {:error, term()}
def open_dm(%__MODULE__{} = chat, adapter_name, external_user_id) when is_atom(adapter_name) do
with {:ok, adapter_module} <- AdapterRegistry.resolve(chat, adapter_name) do
if function_exported?(adapter_module, :open_dm, 2) do
case adapter_module.open_dm(external_user_id, []) do
{:ok, external_room_id} ->
{:ok, thread(chat, adapter_name, external_room_id, is_dm: true)}
other ->
other
end
else
{:error, :unsupported}
end
end
end
def open_dm(%__MODULE__{} = chat, target, opts) when is_list(opts) or is_map(opts) do
with {:ok, adapter_name, external_user_id} <- resolve_dm_target(chat, target, opts) do
open_dm(chat, adapter_name, external_user_id)
end
end
def open_dm(%__MODULE__{} = chat, target, []) do
with {:ok, adapter_name, external_user_id} <- resolve_dm_target(chat, target, []) do
open_dm(chat, adapter_name, external_user_id)
end
end
@doc """
Opens a native platform thread from an existing room message when supported.
"""
@spec open_thread(t(), atom(), String.t() | integer(), String.t() | integer(), keyword()) ::
{:ok, Thread.t()} | {:error, term()}
def open_thread(
%__MODULE__{} = chat,
adapter_name,
external_room_id,
external_message_id,
opts \\ []
)
when is_atom(adapter_name) and is_list(opts) do
with {:ok, adapter_module} <- AdapterRegistry.resolve(chat, adapter_name) do
Adapter.open_thread(adapter_module, external_room_id, external_message_id, opts)
end
end
@doc "Builds a channel reference from adapter + external channel id."
@spec channel(t(), atom(), String.t() | integer()) :: ChannelRef.t()
def channel(%__MODULE__{} = chat, adapter_name, external_id) when is_atom(adapter_name) do
adapter_module = AdapterRegistry.resolve!(chat, adapter_name)
ChannelRef.new(%{
id: "#{adapter_name}:#{external_id}",
adapter_name: adapter_name,
adapter: adapter_module,
external_id: external_id
})
end
@doc "Builds a thread reference from adapter + external room id."
@spec thread(t(), atom(), String.t() | integer(), keyword()) :: Thread.t()
def thread(%__MODULE__{} = chat, adapter_name, external_room_id, opts \\ [])
when is_atom(adapter_name) and is_list(opts) do
adapter_module = AdapterRegistry.resolve!(chat, adapter_name)
external_thread_id = opts[:external_thread_id] || opts[:thread_id]
Thread.new(%{
id: opts[:id] || thread_id(adapter_name, external_room_id, external_thread_id),
adapter_name: adapter_name,
adapter: adapter_module,
external_room_id: external_room_id,
external_thread_id: external_thread_id,
channel_id: "#{adapter_name}:#{external_room_id}",
is_dm: opts[:is_dm] || false,
metadata: opts[:metadata] || %{}
})
end
@doc """
Adapter-internal entrypoint for processing normalized incoming message events.
"""
@spec process_message(t(), atom(), String.t(), Incoming.t() | map(), keyword()) ::
{:ok, t(), Incoming.t()} | {:error, term()}
def process_message(%__MODULE__{} = chat, adapter_name, thread_id, incoming, opts \\ [])
when is_atom(adapter_name) and is_binary(thread_id) and is_list(opts) do
EventRouter.process_message(
chat,
adapter_name,
thread_id,
incoming,
fn current_chat, normalized_incoming, resolved_thread_id ->
thread(
current_chat,
adapter_name,
normalized_incoming.external_room_id,
thread_id: normalized_incoming.external_thread_id,
id: resolved_thread_id
)
end
)
end
@doc "Processes normalized reaction events and dispatches handlers."
@spec process_reaction(t(), atom(), ReactionEvent.t() | map(), keyword()) ::
{:ok, t(), ReactionEvent.t()} | {:error, term()}
def process_reaction(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
when is_atom(adapter_name) and is_list(opts) do
with {:ok, reaction} <- EventRouter.ensure_reaction_event(event, adapter_name) do
reaction = enrich_event_context(chat, adapter_name, reaction)
{:ok, EventRouter.run_event_handlers(chat, chat.handlers.reaction, reaction), reaction}
end
end
@doc "Processes normalized action events and dispatches handlers."
@spec process_action(t(), atom(), ActionEvent.t() | map(), keyword()) ::
{:ok, t(), ActionEvent.t()} | {:error, term()}
def process_action(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
when is_atom(adapter_name) and is_list(opts) do
with {:ok, action} <- EventRouter.ensure_action_event(event, adapter_name) do
action = enrich_event_context(chat, adapter_name, action)
{:ok, EventRouter.run_event_handlers(chat, chat.handlers.action, action), action}
end
end
@doc "Processes normalized modal submit events and dispatches handlers."
@spec process_modal_submit(t(), atom(), ModalSubmitEvent.t() | map(), keyword()) ::
{:ok, t(), ModalSubmitEvent.t()} | {:error, term()}
def process_modal_submit(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
when is_atom(adapter_name) and is_list(opts) do
with {:ok, modal_submit} <- EventRouter.ensure_modal_submit_event(event, adapter_name) do
modal_submit = enrich_event_context(chat, adapter_name, modal_submit)
{:ok, EventRouter.run_event_handlers(chat, chat.handlers.modal_submit, modal_submit), modal_submit}
end
end
@doc "Processes normalized modal close events and dispatches handlers."
@spec process_modal_close(t(), atom(), ModalCloseEvent.t() | map(), keyword()) ::
{:ok, t(), ModalCloseEvent.t()} | {:error, term()}
def process_modal_close(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
when is_atom(adapter_name) and is_list(opts) do
with {:ok, modal_close} <- EventRouter.ensure_modal_close_event(event, adapter_name) do
modal_close = enrich_event_context(chat, adapter_name, modal_close)
{:ok, EventRouter.run_event_handlers(chat, chat.handlers.modal_close, modal_close), modal_close}
end
end
@doc "Processes normalized slash command events and dispatches handlers."
@spec process_slash_command(t(), atom(), SlashCommandEvent.t() | map(), keyword()) ::
{:ok, t(), SlashCommandEvent.t()} | {:error, term()}
def process_slash_command(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
when is_atom(adapter_name) and is_list(opts) do
with {:ok, slash_command} <- EventRouter.ensure_slash_command_event(event, adapter_name) do
slash_command = enrich_event_context(chat, adapter_name, slash_command)
{:ok, EventRouter.run_event_handlers(chat, chat.handlers.slash_command, slash_command), slash_command}
end
end
@doc """
Canonical typed event router used by webhook and gateway ingestion.
"""
@spec process_event(t(), atom(), EventEnvelope.t() | map(), keyword()) ::
{:ok, t(), EventEnvelope.t()} | {:error, term()}
def process_event(%__MODULE__{} = chat, adapter_name, event, opts \\ [])
when is_atom(adapter_name) and is_list(opts) do
dispatchers = %{
process_message: &process_message/5,
process_reaction: &process_reaction/4,
process_action: &process_action/4,
process_modal_submit: &process_modal_submit/4,
process_modal_close: &process_modal_close/4,
process_slash_command: &process_slash_command/4,
process_assistant_thread_started: &process_assistant_thread_started/3,
process_assistant_context_changed: &process_assistant_context_changed/3
}
with {:ok, envelope} <- EventRouter.ensure_event_envelope(event, adapter_name),
{:ok, routed_chat, routed_payload} <-
EventRouter.route_event(chat, adapter_name, envelope, opts, dispatchers) do
{:ok, routed_chat, EventRouter.with_envelope_payload(envelope, routed_payload)}
end
end
@doc "Processes assistant thread started events and dispatches handlers."
@spec process_assistant_thread_started(
t(),
atom(),
AssistantThreadStartedEvent.t() | map()
) ::
{:ok, t(), AssistantThreadStartedEvent.t()} | {:error, term()}
def process_assistant_thread_started(%__MODULE__{} = chat, adapter_name, event)
when is_atom(adapter_name) do
with {:ok, assistant_event} <-
EventRouter.ensure_assistant_thread_started_event(event, adapter_name) do
assistant_event = enrich_event_context(chat, adapter_name, assistant_event)
{:ok,
EventRouter.run_event_handlers(
chat,
chat.handlers.assistant_thread_started,
assistant_event
), assistant_event}
end
end
@doc "Processes assistant context changed events and dispatches handlers."
@spec process_assistant_context_changed(
t(),
atom(),
AssistantContextChangedEvent.t() | map()
) ::
{:ok, t(), AssistantContextChangedEvent.t()} | {:error, term()}
def process_assistant_context_changed(%__MODULE__{} = chat, adapter_name, event)
when is_atom(adapter_name) do
with {:ok, assistant_event} <-
EventRouter.ensure_assistant_context_changed_event(event, adapter_name) do
assistant_event = enrich_event_context(chat, adapter_name, assistant_event)
{:ok,
EventRouter.run_event_handlers(
chat,
chat.handlers.assistant_context_changed,
assistant_event
), assistant_event}
end
end
@doc "Returns adapter capability matrix wrapped in typed struct."
@spec adapter_capabilities(t(), atom()) :: {:ok, CapabilityMatrix.t()} | {:error, term()}
def adapter_capabilities(%__MODULE__{} = chat, adapter_name) when is_atom(adapter_name) do
with {:ok, adapter_module} <- AdapterRegistry.resolve(chat, adapter_name) do
{:ok,
CapabilityMatrix.new(%{
adapter_name: adapter_name,
capabilities: Adapter.capabilities(adapter_module)
})}
end
end
@doc "Returns true when a thread id is currently subscribed."
@spec subscribed?(t(), String.t()) :: boolean()
def subscribed?(%__MODULE__{} = chat, thread_id) when is_binary(thread_id) do
StateAdapter.subscribed?(chat.state_adapter, chat.state, thread_id)
end
@doc "Subscribes a thread id."
@spec subscribe(t(), String.t()) :: t()
def subscribe(%__MODULE__{} = chat, thread_id) when is_binary(thread_id) do
chat.state_adapter
|> StateAdapter.subscribe(chat.state, thread_id)
|> sync_state(chat)
end
@doc "Returns normalized overlapping-message concurrency config."
@spec concurrency(t()) :: Concurrency.t()
def concurrency(%__MODULE__{} = chat) do
Concurrency.new(chat.metadata[:concurrency] || chat.metadata["concurrency"] || %{})
end
@doc "Updates chat-level concurrency configuration."
@spec configure_concurrency(t(), keyword() | map()) :: t()
def configure_concurrency(%__MODULE__{} = chat, opts) when is_list(opts) or is_map(opts) do
config = Concurrency.new(opts)
metadata = Map.put(chat.metadata || %{}, :concurrency, Map.from_struct(config))
%{chat | metadata: metadata}
end
@doc "Returns the current concurrency lock snapshot."
@spec lock_snapshot(t()) :: %{locks: map(), pending_locks: map()}
def lock_snapshot(%__MODULE__{} = chat) do
snapshot = StateAdapter.snapshot(chat.state_adapter, chat.state)
%{locks: snapshot.locks, pending_locks: snapshot.pending_locks}
end
@doc "Attempts to acquire a concurrency lock for a message-processing key."
@spec acquire_lock(t(), String.t(), String.t(), keyword() | map()) ::
{:acquired | :queued | :debounced | :busy, t()}
def acquire_lock(%__MODULE__{} = chat, key, owner, opts \\ [])
when is_binary(key) and is_binary(owner) do
opts = if is_list(opts), do: Map.new(opts), else: opts
config = opts[:concurrency] || opts["concurrency"] || concurrency(chat)
config = Concurrency.new(config)
metadata = opts[:metadata] || opts["metadata"] || %{}
{result, state} =
StateAdapter.lock(
chat.state_adapter,
chat.state,
key,
owner,
config.strategy,
metadata
)
{result, sync_state(state, chat)}
end
@doc "Releases a held concurrency lock and returns queued/debounced entries."
@spec release_lock(t(), String.t(), String.t()) ::
{{:released, [map()]} | {:error, :not_owner}, t()}
def release_lock(%__MODULE__{} = chat, key, owner) when is_binary(key) and is_binary(owner) do
{result, state} = StateAdapter.release_lock(chat.state_adapter, chat.state, key, owner)
{result, sync_state(state, chat)}
end
@doc "Force-releases a concurrency lock regardless of owner."
@spec force_release_lock(t(), String.t()) :: {{:released, [map()]}, t()}
def force_release_lock(%__MODULE__{} = chat, key) when is_binary(key) do
{result, state} = StateAdapter.force_release_lock(chat.state_adapter, chat.state, key)
{result, sync_state(state, chat)}
end
@doc "Unsubscribes a thread id."
@spec unsubscribe(t(), String.t()) :: t()
def unsubscribe(%__MODULE__{} = chat, thread_id) when is_binary(thread_id) do
chat.state_adapter
|> StateAdapter.unsubscribe(chat.state, thread_id)
|> sync_state(chat)
end
@doc "Gets thread state map by id."
@spec thread_state(t(), String.t()) :: map()
def thread_state(%__MODULE__{} = chat, thread_id) when is_binary(thread_id) do
StateAdapter.thread_state(chat.state_adapter, chat.state, thread_id)
end
@doc "Sets thread state map by id."
@spec put_thread_state(t(), String.t(), map()) :: t()
def put_thread_state(%__MODULE__{} = chat, thread_id, state) when is_map(state) do
chat.state_adapter
|> StateAdapter.put_thread_state(chat.state, thread_id, state)
|> sync_state(chat)
end
@doc "Gets channel state map by id."
@spec channel_state(t(), String.t()) :: map()
def channel_state(%__MODULE__{} = chat, channel_id) when is_binary(channel_id) do
StateAdapter.channel_state(chat.state_adapter, chat.state, channel_id)
end
@doc "Sets channel state map by id."
@spec put_channel_state(t(), String.t(), map()) :: t()
def put_channel_state(%__MODULE__{} = chat, channel_id, state) when is_map(state) do
chat.state_adapter
|> StateAdapter.put_channel_state(chat.state, channel_id, state)
|> sync_state(chat)
end
@doc false
@spec duplicate?(t(), {atom(), String.t()} | nil) :: boolean()
def duplicate?(%__MODULE__{}, nil), do: false
def duplicate?(%__MODULE__{} = chat, {adapter_name, message_id})
when is_atom(adapter_name) and is_binary(message_id) do
StateAdapter.duplicate?(chat.state_adapter, chat.state, {adapter_name, message_id})
end
@doc false
@spec mark_dedupe(t(), {atom(), String.t()} | nil) :: t()
def mark_dedupe(%__MODULE__{} = chat, nil), do: chat
def mark_dedupe(%__MODULE__{} = chat, {adapter_name, message_id})
when is_atom(adapter_name) and is_binary(message_id) do
dedupe_limit = dedupe_limit(chat)
chat.state_adapter
|> StateAdapter.mark_dedupe(chat.state, {adapter_name, message_id}, dedupe_limit)
|> sync_state(chat)
end
@doc "Creates a normalized Chat SDK-style message."
@spec message(map()) :: Message.t()
def message(attrs), do: Message.new(attrs)
@spec new_room(map()) :: Room.t()
def new_room(attrs), do: Room.new(attrs)
@spec new_participant(map()) :: Participant.t()
def new_participant(attrs), do: Participant.new(attrs)
@spec text(String.t()) :: Text.t()
def text(value), do: Text.new(value)
@doc "Resolves a cross-platform emoji token into a rendered value."
@spec emoji(String.t() | atom(), keyword()) :: String.t()
def emoji(value, opts \\ []), do: Emoji.render(value, opts)
@doc "Serializes chat state to a revivable map."
@spec to_map(t()) :: map()
def to_map(%__MODULE__{} = chat), do: Serialization.to_map(chat)
@doc "Builds chat state from serialized map."
@spec from_map(map()) :: t()
def from_map(map) when is_map(map), do: Serialization.from_map(map)
@doc "Returns a reviver function for serialized core structs."
@spec reviver() :: (map() -> term())
def reviver, do: Serialization.reviver()
@doc false
@spec revive(map()) :: term()
def revive(map), do: Serialization.revive(map)
defp register_filtered_handler(%__MODULE__{} = chat, key, selector, handler) do
normalized = normalize_handler_selector(selector)
update_in(chat.handlers[key], &(&1 ++ [{normalized, handler}]))
end
defp normalize_handler_selector(selectors) when is_list(selectors),
do: Enum.map(selectors, &normalize_handler_selector/1)
defp normalize_handler_selector(selector) when is_atom(selector) and selector != :all,
do: Atom.to_string(selector)
defp normalize_handler_selector(selector), do: selector
defp resolve_dm_target(%__MODULE__{} = chat, %Author{} = author, opts) do
adapter_name =
opts[:adapter_name] ||
author.metadata[:adapter_name] ||
author.metadata["adapter_name"] ||
infer_single_adapter(chat)
with {:ok, adapter_name} <- normalize_adapter_name(chat, adapter_name) do
{:ok, adapter_name, author.user_id}
end
end
defp resolve_dm_target(%__MODULE__{} = chat, target, opts) when is_map(target) do
resolve_dm_target(chat, Author.new(target), opts)
rescue
_ -> {:error, :invalid_dm_target}
end
defp resolve_dm_target(%__MODULE__{} = chat, target, opts)
when is_binary(target) or is_integer(target) do
case parse_adapter_prefixed_target(chat, target) do
{:ok, adapter_name, external_user_id} ->
{:ok, adapter_name, external_user_id}
:error ->
with {:ok, adapter_name} <-
normalize_adapter_name(chat, opts[:adapter_name] || infer_single_adapter(chat)) do
{:ok, adapter_name, to_string(target)}
end
end
end
defp resolve_dm_target(_chat, _target, _opts), do: {:error, :invalid_dm_target}
defp normalize_adapter_name(_chat, adapter_name) when is_atom(adapter_name),
do: {:ok, adapter_name}
defp normalize_adapter_name(_chat, nil), do: {:error, :ambiguous_adapter}
defp normalize_adapter_name(chat, adapter_name) when is_binary(adapter_name) do
adapter_atom = String.to_existing_atom(adapter_name)
normalize_adapter_name(chat, adapter_atom)
rescue
ArgumentError -> {:error, :ambiguous_adapter}
end
defp infer_single_adapter(%__MODULE__{adapters: adapters}) when map_size(adapters) == 1 do
adapters |> Map.keys() |> List.first()
end
defp infer_single_adapter(_chat), do: nil
defp parse_adapter_prefixed_target(%__MODULE__{adapters: adapters}, target)
when is_binary(target) do
case String.split(target, ":", parts: 2) do
[adapter_name, external_user_id] when external_user_id != "" ->
adapter_atom = String.to_atom(adapter_name)
if Map.has_key?(adapters, adapter_atom) do
{:ok, adapter_atom, external_user_id}
else
:error
end
_ ->
:error
end
end
defp parse_adapter_prefixed_target(_chat, _target), do: :error
defp enrich_event_context(%__MODULE__{} = chat, adapter_name, event) do
adapter = Map.get(chat.adapters, adapter_name)
channel = Map.get(event, :channel) || build_channel_handle(chat, adapter_name, event)
thread = Map.get(event, :thread) || build_thread_handle(chat, adapter_name, event, channel)
message = Map.get(event, :message) || build_message_handle(event, thread, channel)
related_channel =
Map.get(event, :related_channel) || build_related_channel_handle(chat, adapter_name, event)
related_thread =
Map.get(event, :related_thread) ||
build_related_thread_handle(chat, adapter_name, event, related_channel)
related_message =
Map.get(event, :related_message) ||
build_related_message_handle(event, related_thread, related_channel)
struct(event,
adapter: adapter,
thread: thread,
channel: channel,
message: message,
related_thread: related_thread,
related_channel: related_channel,
related_message: related_message,
thread_id: Map.get(event, :thread_id) || (thread && thread.id),
channel_id: Map.get(event, :channel_id) || (channel && channel.id),
message_id: Map.get(event, :message_id) || (message && message.id)
)
end
defp build_channel_handle(%__MODULE__{} = chat, adapter_name, event) do
case channel_external_id(
adapter_name,
Map.get(event, :channel_id) || Map.get(event, :thread_id)
) do
nil -> nil
external_id -> channel(chat, adapter_name, external_id)
end
end
defp build_thread_handle(%__MODULE__{} = chat, adapter_name, event, channel) do
cond do
is_binary(Map.get(event, :thread_id)) ->
case parse_thread_identifier(adapter_name, Map.get(event, :thread_id)) do
{external_room_id, external_thread_id} ->
thread(chat, adapter_name, external_room_id,
id: Map.get(event, :thread_id),
external_thread_id: external_thread_id,
is_dm: is_dm_channel?(channel)
)
_ ->
nil
end
match?(%ChannelRef{}, channel) ->
thread(chat, adapter_name, channel.external_id,
id: "#{adapter_name}:#{channel.external_id}",
is_dm: is_dm_channel?(channel)
)
true ->
nil
end
end
defp build_message_handle(event, thread, channel) do
case Map.get(event, :message_id) do
nil ->
nil
message_id ->
external_room_id =
cond do
match?(%Thread{}, thread) -> thread.external_room_id
match?(%ChannelRef{}, channel) -> channel.external_id
true -> nil
end
Message.new(%{
id: to_string(message_id),
thread_id: thread && thread.id,
channel_id: channel && channel.id,
external_room_id: external_room_id,
external_message_id: to_string(message_id)
})
end
end
defp build_related_channel_handle(%__MODULE__{} = chat, adapter_name, event) do
related_channel_id =
Map.get(event.metadata, :related_channel_id) ||
Map.get(event.metadata, "related_channel_id")
case channel_external_id(adapter_name, related_channel_id) do
nil -> nil
external_id -> channel(chat, adapter_name, external_id)
end
end
defp build_related_thread_handle(%__MODULE__{} = chat, adapter_name, event, related_channel) do
related_thread_id =
Map.get(event.metadata, :related_thread_id) || Map.get(event.metadata, "related_thread_id")
cond do
is_binary(related_thread_id) ->
case parse_thread_identifier(adapter_name, related_thread_id) do
{external_room_id, external_thread_id} ->
thread(chat, adapter_name, external_room_id,
id: related_thread_id,
external_thread_id: external_thread_id,
is_dm: is_dm_channel?(related_channel)
)
_ ->
nil
end
match?(%ChannelRef{}, related_channel) ->
thread(chat, adapter_name, related_channel.external_id,
id: "#{adapter_name}:#{related_channel.external_id}",
is_dm: is_dm_channel?(related_channel)
)
true ->
nil
end
end
defp build_related_message_handle(event, thread, channel) do
related_message_id =
Map.get(event.metadata, :related_message_id) ||
Map.get(event.metadata, "related_message_id")
if is_binary(related_message_id) do
Message.new(%{
id: related_message_id,
thread_id: thread && thread.id,
channel_id: channel && channel.id,
external_room_id:
cond do
match?(%Thread{}, thread) -> thread.external_room_id
match?(%ChannelRef{}, channel) -> channel.external_id
true -> nil
end,
external_message_id: related_message_id
})
end
end
defp parse_thread_identifier(adapter_name, thread_id) when is_binary(thread_id) do
prefix = "#{adapter_name}:"
if String.starts_with?(thread_id, prefix) do
rest = String.trim_leading(thread_id, prefix)
case String.split(rest, ":", parts: 2) do
[external_room_id, external_thread_id] -> {external_room_id, external_thread_id}
[external_room_id] -> {external_room_id, nil}
_ -> nil
end
end
end
defp parse_thread_identifier(_adapter_name, _thread_id), do: nil
defp channel_external_id(adapter_name, channel_id) when is_binary(channel_id) do
prefix = "#{adapter_name}:"
if String.starts_with?(channel_id, prefix) do
String.trim_leading(channel_id, prefix)
|> String.split(":", parts: 2)
|> List.first()
end
end
defp channel_external_id(_adapter_name, _channel_id), do: nil
defp is_dm_channel?(%ChannelRef{metadata: metadata}) do
metadata[:is_dm] || metadata["is_dm"] || false
end
defp is_dm_channel?(_channel), do: false
defp sync_state(state, %__MODULE__{} = chat) do
snapshot = StateAdapter.snapshot(chat.state_adapter, state)
%{
chat
| state: state,
subscriptions: snapshot.subscriptions,
dedupe: snapshot.dedupe,
dedupe_order: snapshot.dedupe_order,
thread_state: snapshot.thread_state,
channel_state: snapshot.channel_state
}
end
defp initial_state_snapshot(opts) do
%{
subscriptions: opts[:subscriptions] || opts["subscriptions"] || MapSet.new(),
dedupe: opts[:dedupe] || opts["dedupe"] || MapSet.new(),
dedupe_order: opts[:dedupe_order] || opts["dedupe_order"] || [],
thread_state: opts[:thread_state] || opts["thread_state"] || %{},
channel_state: opts[:channel_state] || opts["channel_state"] || %{},
locks: opts[:locks] || opts["locks"] || %{},
pending_locks: opts[:pending_locks] || opts["pending_locks"] || %{}
}
|> StateAdapter.normalize_snapshot()
end
defp maybe_replace_with_explicit_state(snapshot, _state_adapter, nil), do: snapshot
defp maybe_replace_with_explicit_state(_snapshot, state_adapter, explicit_state) do
StateAdapter.snapshot(state_adapter, explicit_state)
end
defp dedupe_limit(chat) do
metadata = Map.get(chat, :metadata, %{})
value =
case metadata do
%{} -> metadata[:dedupe_limit] || metadata["dedupe_limit"]
_ -> nil
end
if is_integer(value) and value > 0, do: value, else: 1_000
end
defp normalize_ingress_event(%EventEnvelope{} = envelope), do: envelope
defp normalize_ingress_event(nil), do: nil
defp normalize_ingress_event(:noop), do: :noop
defp normalize_ingress_event(other), do: other
defp normalize_ingress_response(%WebhookResponse{} = response), do: response
defp normalize_ingress_response(nil), do: nil
defp normalize_ingress_response(other) when is_map(other), do: WebhookResponse.new(other)
defp normalize_ingress_response(_other), do: WebhookResponse.accepted()
defp normalize_ingress_request(_adapter_name, %WebhookRequest{} = request, _opts), do: request
defp normalize_ingress_request(adapter_name, payload, opts) when is_map(payload) do
payload_map = payload[:payload] || payload["payload"] || payload
headers = opts[:headers] || payload[:headers] || payload["headers"] || %{}
WebhookRequest.new(%{
adapter_name: adapter_name,
method: payload[:method] || payload["method"] || opts[:method] || "POST",
path: payload[:path] || payload["path"] || opts[:path],
headers: headers,
payload: payload_map,
query: payload[:query] || payload["query"] || opts[:query] || %{},
raw: payload,
metadata: payload[:metadata] || payload["metadata"] || %{}
})
end
defp normalize_ingress_request(adapter_name, _other, _opts),
do: WebhookRequest.new(%{adapter_name: adapter_name, payload: %{}})
defp envelope_or_nil(%EventEnvelope{} = envelope), do: envelope
defp envelope_or_nil(_), do: nil
defp response_or_default(%WebhookResponse{} = response), do: response
defp response_or_default(other) when is_map(other), do: WebhookResponse.new(other)
defp response_or_default(_), do: WebhookResponse.accepted()
defp ingress_error(transport, adapter_name, reason, context \\ %{}) do
Errors.to_error(%IngressError{
transport: transport,
adapter_name: adapter_name,
reason: reason,
context: context
})
end
defp thread_id(adapter_name, external_room_id, nil), do: "#{adapter_name}:#{external_room_id}"
defp thread_id(adapter_name, external_room_id, external_thread_id),
do: "#{adapter_name}:#{external_room_id}:#{external_thread_id}"
end