defmodule Jido.Chat.Adapter do
@moduledoc """
Canonical adapter behavior for Chat SDK style integrations.
Thread-aware channel contract for Chat SDK integrations.
"""
alias Jido.Chat.{
Card,
CapabilityMatrix,
ChannelInfo,
EventEnvelope,
EphemeralMessage,
FileUpload,
FetchOptions,
Incoming,
Markdown,
Modal,
ModalResult,
Message,
MessagePage,
PostPayload,
Postable,
Response,
WebhookRequest,
WebhookResponse,
StreamChunk,
Thread,
ThreadPage
}
@type raw_payload :: map()
@type external_room_id :: String.t() | integer()
@type external_user_id :: String.t() | integer()
@type external_message_id :: String.t() | integer()
@type sink_mfa :: {module(), atom(), [term()]}
@type listener_opts :: keyword()
@type capability_status :: :native | :fallback | :unsupported
@type capability_matrix :: %{optional(atom()) => capability_status()}
@type send_result :: {:ok, Response.t()} | {:error, term()}
@type incoming_result :: {:ok, Incoming.t()} | {:error, term()}
@type delete_result :: :ok | {:error, term()}
@type typing_result :: :ok | {:error, term()}
@type reaction_result :: :ok | {:error, term()}
@type metadata_result :: {:ok, ChannelInfo.t()} | {:error, term()}
@type message_result :: {:ok, Message.t()} | {:error, term()}
@type message_page_result :: {:ok, MessagePage.t()} | {:error, term()}
@type thread_result :: {:ok, Thread.t()} | {:error, term()}
@type thread_page_result :: {:ok, ThreadPage.t()} | {:error, term()}
@type ephemeral_result :: {:ok, EphemeralMessage.t()} | {:error, term()}
@type modal_result :: {:ok, ModalResult.t()} | {:error, term()}
@type file_input :: FileUpload.input()
@callback channel_type() :: atom()
@callback transform_incoming(raw_payload()) :: incoming_result() | {:ok, map()}
@callback send_message(external_room_id(), text :: String.t(), opts :: keyword()) ::
send_result() | {:ok, map()} | {:error, term()}
@callback send_file(external_room_id(), file :: file_input(), opts :: keyword()) ::
send_result() | {:ok, map()} | {:error, term()}
@callback post_message(external_room_id(), payload :: PostPayload.t(), opts :: keyword()) ::
send_result() | {:ok, map()} | {:error, term()}
@callback edit_message(
external_room_id(),
external_message_id(),
text :: String.t(),
opts :: keyword()
) ::
send_result() | {:ok, map()} | {:error, term()}
@callback initialize(opts :: keyword()) :: :ok | {:ok, term()} | {:error, term()}
@callback shutdown(opts :: keyword()) :: :ok | {:ok, term()} | {:error, term()}
@callback delete_message(external_room_id(), external_message_id(), opts :: keyword()) ::
:ok | {:ok, term()} | {:error, term()}
@callback start_typing(external_room_id(), opts :: keyword()) ::
:ok | {:ok, term()} | {:error, term()}
@callback fetch_metadata(external_room_id(), opts :: keyword()) ::
{:ok, ChannelInfo.t() | map()} | {:error, term()}
@callback fetch_thread(external_room_id(), opts :: keyword()) ::
{:ok, Thread.t() | map()} | {:error, term()}
@callback fetch_message(external_room_id(), external_message_id(), opts :: keyword()) ::
{:ok, Message.t() | Incoming.t() | map()} | {:error, term()}
@callback add_reaction(
external_room_id(),
external_message_id(),
emoji :: String.t(),
opts :: keyword()
) ::
:ok | {:ok, term()} | {:error, term()}
@callback remove_reaction(
external_room_id(),
external_message_id(),
emoji :: String.t(),
opts :: keyword()
) ::
:ok | {:ok, term()} | {:error, term()}
@callback post_ephemeral(
external_room_id(),
external_user_id(),
text :: String.t(),
opts :: keyword()
) ::
{:ok, EphemeralMessage.t() | map()} | {:error, term()}
@callback post_channel_message(external_room_id(), text :: String.t(), opts :: keyword()) ::
send_result() | {:ok, map()} | {:error, term()}
@callback stream(external_room_id(), stream :: Enumerable.t(), opts :: keyword()) ::
send_result() | {:ok, map()} | {:error, term()}
@callback open_modal(external_room_id(), payload :: map(), opts :: keyword()) ::
{:ok, ModalResult.t() | map()} | {:error, term()}
@callback fetch_messages(external_room_id(), opts :: keyword()) ::
{:ok, MessagePage.t() | map()} | {:error, term()}
@callback fetch_channel_messages(external_room_id(), opts :: keyword()) ::
{:ok, MessagePage.t() | map()} | {:error, term()}
@callback list_threads(external_room_id(), opts :: keyword()) ::
{:ok, ThreadPage.t() | map()} | {:error, term()}
@callback open_thread(external_room_id(), external_message_id(), opts :: keyword()) ::
{:ok, Thread.t() | map()} | {:error, term()}
@callback open_dm(external_user_id(), opts :: keyword()) ::
{:ok, external_room_id()} | {:error, term()}
@callback handle_webhook(chat :: Jido.Chat.t(), raw_payload(), opts :: keyword()) ::
{:ok, Jido.Chat.t(), Incoming.t()} | {:error, term()}
@callback verify_webhook(WebhookRequest.t() | map(), opts :: keyword()) ::
:ok | {:error, term()}
@callback parse_event(WebhookRequest.t() | map(), opts :: keyword()) ::
{:ok, EventEnvelope.t() | map() | :noop | nil} | {:error, term()}
@callback format_webhook_response(term(), opts :: keyword()) ::
WebhookResponse.t() | map() | {:ok, WebhookResponse.t() | map()} | {:error, term()}
@doc """
Optional listener child-spec callback for adapter-owned ingress workers.
Listener workers should emit inbound payloads/events through a sink MFA provided
in `opts` to avoid coupling adapter packages to runtime implementations.
Expected listener opts keys:
* `:sink_mfa` - sink callback MFA, typically `{Module, :function, [base_args...]}`
* `:bridge_id` - configured bridge identifier
* `:bridge_config` - resolved bridge config struct/map
* `:instance_module` - runtime instance module (opaque to adapters)
* `:settings` - adapter-specific ingress settings map
* `:ingress` - normalized ingress mode/settings map
"""
@callback listener_child_specs(bridge_id :: String.t(), opts :: listener_opts()) ::
{:ok, [Supervisor.child_spec()]} | {:error, term()}
@callback capabilities() :: capability_matrix()
@optional_callbacks initialize: 1,
shutdown: 1,
send_file: 3,
post_message: 3,
edit_message: 4,
delete_message: 3,
start_typing: 2,
fetch_metadata: 2,
fetch_thread: 2,
fetch_message: 3,
add_reaction: 4,
remove_reaction: 4,
post_ephemeral: 4,
post_channel_message: 3,
stream: 3,
open_modal: 3,
fetch_messages: 2,
fetch_channel_messages: 2,
list_threads: 2,
open_thread: 3,
open_dm: 2,
handle_webhook: 3,
verify_webhook: 2,
parse_event: 2,
format_webhook_response: 2,
listener_child_specs: 2,
capabilities: 0
defmacro __using__(_opts) do
quote do
@behaviour Jido.Chat.Adapter
@impl true
def channel_type do
__MODULE__
|> Module.split()
|> List.last()
|> Macro.underscore()
|> String.to_atom()
end
defoverridable channel_type: 0
end
end
@doc "Initializes adapter resources when supported."
@spec initialize(module(), keyword()) :: :ok | {:error, term()}
def initialize(adapter_module, opts \\ []) do
if callback_exported?(adapter_module, :initialize, 1) do
case adapter_module.initialize(opts) do
:ok -> :ok
{:ok, _} -> :ok
{:error, _} = error -> error
_ -> {:error, :invalid_initialize_result}
end
else
:ok
end
end
@doc "Shuts down adapter resources when supported."
@spec shutdown(module(), keyword()) :: :ok | {:error, term()}
def shutdown(adapter_module, opts \\ []) do
if callback_exported?(adapter_module, :shutdown, 1) do
case adapter_module.shutdown(opts) do
:ok -> :ok
{:ok, _} -> :ok
{:error, _} = error -> error
_ -> {:error, :invalid_shutdown_result}
end
else
:ok
end
end
@doc "Returns capability matrix for adapter-native vs fallback support."
@spec capabilities(module()) :: capability_matrix()
def capabilities(adapter_module) do
if callback_exported?(adapter_module, :capabilities, 0) do
adapter_module.capabilities()
|> normalize_capability_matrix()
|> ensure_capability_defaults(adapter_module)
else
%{
initialize: support_status(adapter_module, :initialize, 1, :fallback),
shutdown: support_status(adapter_module, :shutdown, 1, :fallback),
send_message: :native,
send_file: support_status(adapter_module, :send_file, 3),
post_message: support_status(adapter_module, :post_message, 3),
edit_message: support_status(adapter_module, :edit_message, 4),
delete_message: support_status(adapter_module, :delete_message, 3),
start_typing: support_status(adapter_module, :start_typing, 2),
fetch_metadata: support_status(adapter_module, :fetch_metadata, 2, :fallback),
fetch_thread: support_status(adapter_module, :fetch_thread, 2, :fallback),
fetch_message: support_status(adapter_module, :fetch_message, 3, :fallback),
add_reaction: support_status(adapter_module, :add_reaction, 4),
remove_reaction: support_status(adapter_module, :remove_reaction, 4),
post_ephemeral: support_status(adapter_module, :post_ephemeral, 4),
open_dm: support_status(adapter_module, :open_dm, 2),
fetch_messages: support_status(adapter_module, :fetch_messages, 2),
fetch_channel_messages: support_status(adapter_module, :fetch_channel_messages, 2),
list_threads: support_status(adapter_module, :list_threads, 2),
open_thread: support_status(adapter_module, :open_thread, 3),
post_channel_message: support_status(adapter_module, :post_channel_message, 3, :fallback),
stream: support_status(adapter_module, :stream, 3, :fallback),
open_modal: support_status(adapter_module, :open_modal, 3),
webhook: support_status(adapter_module, :handle_webhook, 3, :fallback),
verify_webhook: support_status(adapter_module, :verify_webhook, 2, :fallback),
parse_event: support_status(adapter_module, :parse_event, 2, :fallback),
format_webhook_response: support_status(adapter_module, :format_webhook_response, 2, :fallback)
}
|> ensure_capability_defaults(adapter_module)
end
end
@doc "Normalizes adapter inbound transformation to `Jido.Chat.Incoming`."
@spec transform_incoming(module(), raw_payload()) :: incoming_result()
def transform_incoming(adapter_module, payload)
when is_atom(adapter_module) and is_map(payload) do
with {:ok, incoming} <- adapter_module.transform_incoming(payload) do
{:ok, normalize_incoming(incoming)}
end
end
@doc "Normalizes adapter send results to `Jido.Chat.Response`."
@spec send_message(module(), external_room_id(), String.t(), keyword()) :: send_result()
def send_message(adapter_module, external_room_id, text, opts \\ []) do
with {:ok, response} <- adapter_module.send_message(external_room_id, text, opts) do
{:ok, normalize_response(adapter_module, response)}
end
end
@doc "Uploads and sends a file when supported by the adapter."
@spec send_file(module(), external_room_id(), file_input(), keyword()) :: send_result()
def send_file(adapter_module, external_room_id, file, opts \\ []) do
if callback_exported?(adapter_module, :send_file, 3) do
with {:ok, response} <- adapter_module.send_file(external_room_id, file, opts) do
{:ok, normalize_response(adapter_module, response)}
end
else
{:error, :unsupported}
end
end
@doc "Posts a normalized outbound payload using adapter-native or core fallback behavior."
@spec post_message(module(), external_room_id(), PostPayload.t() | map(), keyword()) ::
send_result()
def post_message(adapter_module, external_room_id, payload, opts \\ [])
def post_message(adapter_module, external_room_id, %PostPayload{} = payload, opts) do
scope = Keyword.get(opts, :scope, :thread)
adapter_opts = Keyword.delete(opts, :scope)
upload_candidates = PostPayload.upload_candidates(payload)
cond do
callback_exported?(adapter_module, :post_message, 3) ->
with {:ok, response} <-
adapter_module.post_message(external_room_id, payload, adapter_opts) do
{:ok, normalize_response(adapter_module, response)}
end
upload_candidates in [nil, []] and scope == :channel ->
post_channel_message(
adapter_module,
external_room_id,
PostPayload.display_text(payload) || "",
adapter_opts
)
upload_candidates in [nil, []] ->
send_message(
adapter_module,
external_room_id,
PostPayload.display_text(payload) || "",
adapter_opts
)
match?([_single], upload_candidates) ->
[upload] = upload_candidates
file_opts =
adapter_opts
|> maybe_put_caption(payload)
|> maybe_put_metadata(payload.metadata)
send_file(adapter_module, external_room_id, upload, file_opts)
true ->
{:error, :multiple_attachments_unsupported}
end
end
def post_message(adapter_module, external_room_id, payload, opts)
when is_map(payload),
do: post_message(adapter_module, external_room_id, PostPayload.new(payload), opts)
@doc "Posts a channel-level message using adapter callback or send fallback."
@spec post_channel_message(module(), external_room_id(), String.t(), keyword()) :: send_result()
def post_channel_message(adapter_module, external_room_id, text, opts \\ []) do
if callback_exported?(adapter_module, :post_channel_message, 3) do
with {:ok, response} <- adapter_module.post_channel_message(external_room_id, text, opts) do
{:ok, normalize_response(adapter_module, response)}
end
else
send_message(adapter_module, external_room_id, text, opts)
end
end
@doc "Streams chunked text using adapter stream callback or send fallback."
@spec stream(module(), external_room_id(), Enumerable.t(), keyword()) :: send_result()
def stream(adapter_module, external_room_id, chunks, opts \\ []) do
if callback_exported?(adapter_module, :stream, 3) do
with {:ok, response} <- adapter_module.stream(external_room_id, chunks, opts) do
{:ok, normalize_response(adapter_module, response)}
end
else
fallback_chunks = Enum.to_list(chunks)
fallback_text = stream_fallback_text(fallback_chunks)
fallback_mode = Keyword.get(opts, :fallback_mode, default_stream_fallback(adapter_module))
placeholder_text = Keyword.get(opts, :placeholder_text, "Working...")
update_every = Keyword.get(opts, :update_every, 1)
stream_opts = Keyword.drop(opts, [:fallback_mode, :placeholder_text, :update_every])
cond do
fallback_mode == :post_edit and callback_exported?(adapter_module, :edit_message, 4) ->
with {:ok, initial_response} <-
send_message(adapter_module, external_room_id, placeholder_text, stream_opts),
{:ok, final_response} <-
stream_post_edit_fallback(
adapter_module,
external_room_id,
initial_response,
fallback_chunks,
stream_opts,
update_every
) do
{:ok, final_response}
end
true ->
send_message(adapter_module, external_room_id, fallback_text, stream_opts)
end
end
end
@doc "Normalizes adapter edit results to `Jido.Chat.Response`."
@spec edit_message(module(), external_room_id(), external_message_id(), String.t(), keyword()) ::
send_result()
def edit_message(adapter_module, external_room_id, external_message_id, text, opts \\ []) do
if callback_exported?(adapter_module, :edit_message, 4) do
with {:ok, response} <-
adapter_module.edit_message(external_room_id, external_message_id, text, opts) do
{:ok, normalize_response(adapter_module, Map.put(response, :status, :edited))}
end
else
{:error, :unsupported}
end
end
@doc "Deletes a previously-sent message when supported by adapter."
@spec delete_message(module(), external_room_id(), external_message_id(), keyword()) ::
delete_result()
def delete_message(adapter_module, external_room_id, external_message_id, opts \\ []) do
if callback_exported?(adapter_module, :delete_message, 3) do
case adapter_module.delete_message(external_room_id, external_message_id, opts) do
:ok -> :ok
{:ok, _} -> :ok
{:error, _} = error -> error
_other -> {:error, :invalid_delete_result}
end
else
{:error, :unsupported}
end
end
@doc "Starts typing indicator when supported by adapter."
@spec start_typing(module(), external_room_id(), keyword()) :: typing_result()
def start_typing(adapter_module, external_room_id, opts \\ []) do
if callback_exported?(adapter_module, :start_typing, 2) do
case adapter_module.start_typing(external_room_id, opts) do
:ok -> :ok
{:ok, _} -> :ok
{:error, _} = error -> error
_other -> {:error, :invalid_typing_result}
end
else
{:error, :unsupported}
end
end
@doc "Fetches channel metadata as `Jido.Chat.ChannelInfo`."
@spec fetch_metadata(module(), external_room_id(), keyword()) :: metadata_result()
def fetch_metadata(adapter_module, external_room_id, opts \\ []) do
if callback_exported?(adapter_module, :fetch_metadata, 2) do
with {:ok, info} <- adapter_module.fetch_metadata(external_room_id, opts) do
{:ok, normalize_channel_info(adapter_module, info, external_room_id)}
end
else
{:ok, default_channel_info(adapter_module, external_room_id)}
end
end
@doc "Fetches thread metadata as a normalized `Jido.Chat.Thread`."
@spec fetch_thread(module(), external_room_id(), keyword()) :: thread_result()
def fetch_thread(adapter_module, external_room_id, opts \\ []) do
if callback_exported?(adapter_module, :fetch_thread, 2) do
with {:ok, thread} <- adapter_module.fetch_thread(external_room_id, opts) do
{:ok, normalize_thread(adapter_module, thread, external_room_id, opts)}
end
else
{:ok,
Thread.new(%{
id: opts[:thread_id] || "#{adapter_type(adapter_module)}:#{external_room_id}",
adapter_name: adapter_type(adapter_module),
adapter: adapter_module,
external_room_id: external_room_id,
external_thread_id: opts[:external_thread_id],
metadata: %{}
})}
end
end
@doc "Fetches a normalized message by id when supported."
@spec fetch_message(module(), external_room_id(), external_message_id(), keyword()) ::
message_result()
def fetch_message(adapter_module, external_room_id, external_message_id, opts \\ []) do
if callback_exported?(adapter_module, :fetch_message, 3) do
with {:ok, message} <-
adapter_module.fetch_message(external_room_id, external_message_id, opts) do
{:ok, normalize_message(adapter_module, message, opts)}
end
else
{:error, :unsupported}
end
end
@doc "Adds a reaction when supported by adapter."
@spec add_reaction(module(), external_room_id(), external_message_id(), String.t(), keyword()) ::
reaction_result()
def add_reaction(adapter_module, external_room_id, external_message_id, emoji, opts \\ []) do
if callback_exported?(adapter_module, :add_reaction, 4) do
case adapter_module.add_reaction(external_room_id, external_message_id, emoji, opts) do
:ok -> :ok
{:ok, _} -> :ok
{:error, _} = error -> error
_other -> {:error, :invalid_reaction_result}
end
else
{:error, :unsupported}
end
end
@doc "Removes a reaction when supported by adapter."
@spec remove_reaction(
module(),
external_room_id(),
external_message_id(),
String.t(),
keyword()
) ::
reaction_result()
def remove_reaction(adapter_module, external_room_id, external_message_id, emoji, opts \\ []) do
if callback_exported?(adapter_module, :remove_reaction, 4) do
case adapter_module.remove_reaction(external_room_id, external_message_id, emoji, opts) do
:ok -> :ok
{:ok, _} -> :ok
{:error, _} = error -> error
_other -> {:error, :invalid_reaction_result}
end
else
{:error, :unsupported}
end
end
@doc "Posts an ephemeral message when supported, with optional DM fallback."
@spec post_ephemeral(module(), external_room_id(), external_user_id(), String.t(), keyword()) ::
ephemeral_result()
def post_ephemeral(adapter_module, external_room_id, external_user_id, text, opts \\ []) do
post_ephemeral_message(
adapter_module,
external_room_id,
external_user_id,
PostPayload.text(text),
opts
)
end
@doc "Posts an ephemeral payload using the canonical outbound payload contract."
@spec post_ephemeral_message(
module(),
external_room_id(),
external_user_id(),
String.t() | Postable.t() | PostPayload.t() | map(),
keyword()
) :: ephemeral_result()
def post_ephemeral_message(
adapter_module,
external_room_id,
external_user_id,
input,
opts \\ []
) do
with {:ok, payload} <- normalize_post_payload_input(input) do
upload_candidates = PostPayload.upload_candidates(payload)
text = PostPayload.display_text(payload) || ""
base_opts = maybe_put_metadata(opts, payload.metadata)
fallback_to_dm = Keyword.get(base_opts, :fallback_to_dm, false)
cond do
upload_candidates == [] and callback_exported?(adapter_module, :post_ephemeral, 4) ->
with {:ok, message} <-
adapter_module.post_ephemeral(
external_room_id,
external_user_id,
text,
base_opts
) do
{:ok,
normalize_ephemeral(
adapter_module,
message,
external_room_id,
false,
payload,
base_opts
)}
end
fallback_to_dm and callback_exported?(adapter_module, :open_dm, 2) ->
dm_opts = Keyword.delete(base_opts, :fallback_to_dm)
with {:ok, dm_room_id} <- adapter_module.open_dm(external_user_id, base_opts),
{:ok, response} <- post_message(adapter_module, dm_room_id, payload, dm_opts) do
{:ok,
EphemeralMessage.new(%{
id: response.external_message_id || Jido.Chat.ID.generate!(),
thread_id: fallback_thread_id(adapter_module, dm_room_id),
text: text,
formatted: payload.formatted || text,
used_fallback: true,
raw: response.raw,
attachments: PostPayload.outbound_attachments(payload),
metadata:
%{source_room_id: external_room_id, delivery: :dm_fallback}
|> Map.merge(payload.metadata)
})}
end
upload_candidates != [] ->
{:error, :ephemeral_attachments_unsupported}
true ->
{:error, :unsupported}
end
end
end
@doc "Opens adapter-native modal when supported."
@spec open_modal(module(), external_room_id(), Modal.t() | map(), keyword()) ::
modal_result()
def open_modal(adapter_module, external_room_id, payload, opts \\ [])
when is_map(payload) or is_struct(payload, Modal) do
payload = normalize_modal_payload(payload)
if callback_exported?(adapter_module, :open_modal, 3) do
with {:ok, result} <- adapter_module.open_modal(external_room_id, payload, opts) do
{:ok, normalize_modal_result(result, external_room_id)}
end
else
{:error, :unsupported}
end
end
@doc "Fetches thread-level history when supported by adapter."
@spec fetch_messages(module(), external_room_id(), keyword()) :: message_page_result()
def fetch_messages(adapter_module, external_room_id, opts \\ []) do
if callback_exported?(adapter_module, :fetch_messages, 2) do
fetch_opts = normalize_fetch_opts(opts)
with {:ok, page} <-
adapter_module.fetch_messages(external_room_id, FetchOptions.to_keyword(fetch_opts)) do
{:ok, normalize_message_page(adapter_module, page, fetch_opts, external_room_id, opts)}
end
else
{:error, :unsupported}
end
end
@doc "Fetches channel-level history when supported by adapter."
@spec fetch_channel_messages(module(), external_room_id(), keyword()) :: message_page_result()
def fetch_channel_messages(adapter_module, external_room_id, opts \\ []) do
if callback_exported?(adapter_module, :fetch_channel_messages, 2) do
fetch_opts = normalize_fetch_opts(opts)
with {:ok, page} <-
adapter_module.fetch_channel_messages(
external_room_id,
FetchOptions.to_keyword(fetch_opts)
) do
{:ok, normalize_message_page(adapter_module, page, fetch_opts, external_room_id, opts)}
end
else
{:error, :unsupported}
end
end
@doc "Lists channel thread summaries when supported by adapter."
@spec list_threads(module(), external_room_id(), keyword()) :: thread_page_result()
def list_threads(adapter_module, external_room_id, opts \\ []) do
if callback_exported?(adapter_module, :list_threads, 2) do
with {:ok, page} <- adapter_module.list_threads(external_room_id, opts) do
{:ok, normalize_thread_page(page)}
end
else
{:error, :unsupported}
end
end
@doc "Opens a native platform thread from an existing room message when supported."
@spec open_thread(module(), external_room_id(), external_message_id(), keyword()) ::
thread_result()
def open_thread(adapter_module, external_room_id, external_message_id, opts \\ []) do
if callback_exported?(adapter_module, :open_thread, 3) do
with {:ok, thread} <-
adapter_module.open_thread(external_room_id, external_message_id, opts) do
{:ok, normalize_thread(adapter_module, thread, external_room_id, opts)}
end
else
{:error, :unsupported}
end
end
@doc "Default helper to normalize webhook payload through `transform_incoming/1`."
@spec handle_webhook(module(), Jido.Chat.t(), raw_payload(), keyword()) ::
{:ok, Jido.Chat.t(), Incoming.t()} | {:error, term()}
def handle_webhook(adapter_module, %Jido.Chat{} = chat, payload, opts \\ []) do
with {:ok, incoming} <- transform_incoming(adapter_module, payload) do
thread_id = thread_id(adapter_module, incoming, opts)
Jido.Chat.process_message(chat, adapter_type(adapter_module), thread_id, incoming, opts)
end
end
@doc "Verifies webhook request integrity when adapter exposes validation callback."
@spec verify_webhook(module(), WebhookRequest.t() | map(), keyword()) ::
:ok | {:error, term()}
def verify_webhook(adapter_module, request, opts \\ []) do
request = normalize_webhook_request(request, opts)
if callback_exported?(adapter_module, :verify_webhook, 2) do
adapter_module.verify_webhook(request, opts)
else
:ok
end
end
@doc "Parses request into a normalized event envelope."
@spec parse_event(module(), WebhookRequest.t() | map(), keyword()) ::
{:ok, EventEnvelope.t() | :noop} | {:error, term()}
def parse_event(adapter_module, request, opts \\ []) do
request = normalize_webhook_request(request, opts)
cond do
callback_exported?(adapter_module, :parse_event, 2) ->
case adapter_module.parse_event(request, opts) do
{:ok, :noop} ->
{:ok, :noop}
{:ok, nil} ->
{:ok, :noop}
{:ok, parsed} ->
{:ok, normalize_event_envelope(adapter_module, parsed)}
{:error, _reason} = error ->
error
end
true ->
with {:ok, incoming} <- transform_incoming(adapter_module, request.payload) do
{:ok,
EventEnvelope.new(%{
adapter_name: adapter_type(adapter_module),
event_type: :message,
thread_id: thread_id(adapter_module, incoming, opts),
channel_id: stringify(incoming.external_room_id),
message_id: stringify(incoming.external_message_id),
payload: incoming,
raw: request.payload,
metadata: %{path: request.path, method: request.method}
})}
end
end
end
@doc "Formats a typed webhook response using adapter callback when available."
@spec format_webhook_response(module(), term(), keyword()) ::
{:ok, WebhookResponse.t()} | {:error, term()}
def format_webhook_response(adapter_module, result, opts \\ []) do
if callback_exported?(adapter_module, :format_webhook_response, 2) do
case adapter_module.format_webhook_response(result, opts) do
{:ok, response} ->
{:ok, normalize_webhook_response(response)}
%WebhookResponse{} = response ->
{:ok, response}
response when is_map(response) ->
{:ok, WebhookResponse.new(response)}
{:error, _} = error ->
error
_other ->
{:error, :invalid_webhook_response}
end
else
{:ok, default_webhook_response(result)}
end
end
@doc "Returns a normalized typed capability matrix."
@spec capability_matrix(module()) :: CapabilityMatrix.t()
def capability_matrix(adapter_module) do
CapabilityMatrix.new(%{
adapter_name: adapter_type(adapter_module),
capabilities: capabilities(adapter_module)
})
end
@doc "Validates capability declaration coherence with implemented callbacks."
@spec validate_capabilities(module()) :: :ok | {:error, term()}
def validate_capabilities(adapter_module) do
declared = capabilities(adapter_module)
invalid =
Enum.reduce(declared, [], fn {capability, status}, acc ->
callback = capability_callback(capability)
case callback do
nil ->
acc
{name, arity} ->
exported? = callback_exported?(adapter_module, name, arity)
case {status, exported?} do
{:native, false} -> [{capability, :missing_callback} | acc]
_ -> acc
end
end
end)
case invalid do
[] -> :ok
_ -> {:error, {:invalid_capability_matrix, Enum.reverse(invalid)}}
end
end
@doc "Returns adapter channel type with fallback to module name."
@spec adapter_type(module()) :: atom()
def adapter_type(adapter_module) do
if callback_exported?(adapter_module, :channel_type, 0) do
adapter_module.channel_type()
else
adapter_module
|> Module.split()
|> List.last()
|> Macro.underscore()
|> String.to_atom()
end
end
defp support_status(adapter_module, callback, arity, fallback \\ :unsupported) do
if callback_exported?(adapter_module, callback, arity), do: :native, else: fallback
end
defp supported_status?(status), do: status in [:native, :fallback]
defp normalize_capability_matrix(matrix) when is_map(matrix),
do: matrix |> then(&CapabilityMatrix.new(%{capabilities: &1})) |> CapabilityMatrix.as_map()
defp normalize_capability_matrix(_), do: %{}
defp normalize_incoming(%Incoming{} = incoming), do: incoming
defp normalize_incoming(map) when is_map(map), do: Incoming.new(map)
defp normalize_response(adapter_module, %Response{} = response) do
response
|> Map.put_new(:channel_type, adapter_type(adapter_module))
|> Response.new()
end
defp normalize_response(adapter_module, map) when is_map(map) do
map
|> Map.put_new(:channel_type, adapter_type(adapter_module))
|> Response.new()
end
defp normalize_channel_info(_adapter_module, %ChannelInfo{} = info, _external_room_id), do: info
defp normalize_channel_info(_adapter_module, info, external_room_id) when is_map(info) do
info
|> Map.put_new(:id, to_string(external_room_id))
|> Map.put_new(:metadata, %{})
|> Map.put_new(:name, nil)
|> Map.put_new(:is_dm, nil)
|> Map.put_new(:member_count, nil)
|> Map.drop([:adapter_name])
|> ChannelInfo.new()
end
defp normalize_channel_info(adapter_module, _info, external_room_id) do
default_channel_info(adapter_module, external_room_id)
end
defp normalize_thread(_adapter_module, %Thread{} = thread, _external_room_id, _opts), do: thread
defp normalize_thread(adapter_module, thread, external_room_id, opts) when is_map(thread) do
external_thread_id =
thread[:external_thread_id] || thread["external_thread_id"] || opts[:external_thread_id]
metadata =
(thread[:metadata] || thread["metadata"] || %{})
|> maybe_put_thread_metadata(
:delivery_external_room_id,
thread[:delivery_external_room_id] || thread["delivery_external_room_id"]
)
Thread.new(%{
id:
thread[:id] || thread["id"] ||
default_thread_id(adapter_module, external_room_id, external_thread_id),
adapter_name: thread[:adapter_name] || thread["adapter_name"] || adapter_type(adapter_module),
adapter: thread[:adapter] || thread["adapter"] || adapter_module,
external_room_id: thread[:external_room_id] || thread["external_room_id"] || external_room_id,
external_thread_id: external_thread_id,
channel_id: thread[:channel_id] || thread["channel_id"],
is_dm: thread[:is_dm] || thread["is_dm"] || false,
metadata: metadata
})
end
defp normalize_message(_adapter_module, %Message{} = message, _opts), do: message
defp normalize_message(adapter_module, %Incoming{} = incoming, opts),
do:
Message.from_incoming(incoming,
adapter_name: adapter_type(adapter_module),
thread_id: opts[:thread_id]
)
defp normalize_message(adapter_module, map, opts) when is_map(map) do
if Map.has_key?(map, :external_room_id) || Map.has_key?(map, "external_room_id") do
map
|> Incoming.new()
|> Message.from_incoming(
adapter_name: adapter_type(adapter_module),
thread_id: opts[:thread_id]
)
else
map
|> Map.put_new(:thread_id, opts[:thread_id])
|> Message.new()
end
end
defp normalize_message_page(
_adapter_module,
%MessagePage{} = page,
_fetch_opts,
_external_room_id,
_opts
),
do: page
defp normalize_message_page(
adapter_module,
page,
%FetchOptions{} = fetch_opts,
external_room_id,
opts
)
when is_map(page) do
thread_opt =
if is_list(opts) do
Keyword.get(opts, :thread_id)
else
opts[:thread_id] || opts["thread_id"]
end
thread_id =
thread_opt ||
"#{adapter_type(adapter_module)}:#{external_room_id}"
page
|> Map.put_new(:direction, fetch_opts.direction)
|> Map.put_new(:adapter_name, adapter_type(adapter_module))
|> Map.put_new(:thread_id, thread_id)
|> MessagePage.new()
end
defp normalize_thread_page(%ThreadPage{} = page), do: page
defp normalize_thread_page(page) when is_map(page), do: ThreadPage.new(page)
defp maybe_put_thread_metadata(metadata, _key, nil), do: metadata
defp maybe_put_thread_metadata(metadata, key, value), do: Map.put(metadata, key, value)
defp normalize_ephemeral(
_adapter_module,
%EphemeralMessage{} = message,
_external_room_id,
_used_fallback,
_payload,
_opts
),
do: message
defp normalize_ephemeral(
adapter_module,
message,
external_room_id,
used_fallback,
payload,
opts
)
when is_map(message) do
thread_id =
message[:thread_id] || message["thread_id"] ||
fallback_thread_id(adapter_module, external_room_id)
id =
message[:id] || message["id"] ||
message[:external_message_id] || message["external_message_id"] ||
Jido.Chat.ID.generate!()
EphemeralMessage.new(%{
id: to_string(id),
thread_id: to_string(thread_id),
text: message[:text] || message["text"] || PostPayload.display_text(payload),
formatted:
message[:formatted] || message["formatted"] || payload.formatted ||
PostPayload.display_text(payload),
used_fallback: message[:used_fallback] || message["used_fallback"] || used_fallback,
raw: message[:raw] || message["raw"],
attachments:
message[:attachments] || message["attachments"] ||
PostPayload.outbound_attachments(payload),
metadata:
(message[:metadata] || message["metadata"] || %{})
|> Map.merge(payload.metadata)
|> Map.merge(metadata_from_opts(opts))
})
end
defp normalize_modal_result(%ModalResult{} = result, _external_room_id), do: result
defp normalize_modal_result(result, external_room_id) when is_map(result) do
ModalResult.new(%{
id: result[:id] || result["id"] || Jido.Chat.ID.generate!(),
status: result[:status] || result["status"] || :opened,
external_room_id: result[:external_room_id] || result["external_room_id"] || external_room_id,
external_message_id: stringify(result[:external_message_id] || result["external_message_id"]),
raw: result[:raw] || result["raw"],
metadata: result[:metadata] || result["metadata"] || %{}
})
end
defp normalize_modal_result(result, external_room_id) do
ModalResult.new(%{
external_room_id: external_room_id,
raw: result,
metadata: %{coerced: true}
})
end
@doc "Returns a stable adapter-facing Markdown representation."
@spec render_markdown(Markdown.t() | map() | String.t(), keyword()) :: String.t()
def render_markdown(markdown, _opts \\ []) do
markdown
|> normalize_markdown_payload()
|> Markdown.stringify()
end
@doc "Returns a stable adapter-facing card payload."
@spec render_card(Card.t() | map(), keyword()) :: map()
def render_card(card, _opts \\ []) do
card
|> normalize_card_payload()
|> Card.to_adapter_payload()
end
@doc "Returns a stable adapter-facing modal payload."
@spec render_modal(Modal.t() | map(), keyword()) :: map()
def render_modal(modal, _opts \\ []) do
modal
|> normalize_modal_payload()
end
defp default_channel_info(adapter_module, external_room_id) do
ChannelInfo.new(%{
id: to_string(external_room_id),
metadata: %{adapter_name: adapter_type(adapter_module)}
})
end
defp default_thread_id(adapter_module, external_room_id, nil),
do: "#{adapter_type(adapter_module)}:#{external_room_id}"
defp default_thread_id(adapter_module, external_room_id, external_thread_id),
do: "#{adapter_type(adapter_module)}:#{external_room_id}:#{external_thread_id}"
defp normalize_fetch_opts(%FetchOptions{} = opts), do: opts
defp normalize_fetch_opts(opts) when is_list(opts), do: FetchOptions.new(opts)
defp normalize_fetch_opts(opts) when is_map(opts), do: FetchOptions.new(opts)
defp normalize_fetch_opts(_other), do: FetchOptions.new(%{})
defp thread_id(adapter_module, %Incoming{} = incoming, opts) do
opts[:thread_id] || incoming.external_thread_id ||
"#{adapter_type(adapter_module)}:#{incoming.external_room_id}"
end
defp fallback_thread_id(adapter_module, external_room_id),
do: "#{adapter_type(adapter_module)}:#{external_room_id}"
defp default_stream_fallback(adapter_module) do
if callback_exported?(adapter_module, :edit_message, 4), do: :post_edit, else: :final
end
defp stream_post_edit_fallback(
adapter_module,
external_room_id,
initial_response,
chunks,
stream_opts,
update_every
) do
total = length(chunks)
update_every = if is_integer(update_every) and update_every > 0, do: update_every, else: 1
chunks
|> Enum.with_index(1)
|> Enum.reduce_while({:ok, "", initial_response}, fn {chunk, index}, {:ok, acc_text, response} ->
next_text = acc_text <> render_stream_chunk(chunk)
should_update = rem(index, update_every) == 0 or index == total
if should_update do
case edit_message(
adapter_module,
external_room_id,
initial_response.external_message_id,
next_text,
stream_opts
) do
{:ok, next_response} ->
{:cont, {:ok, next_text, with_stream_metadata(next_response, :post_edit, total, next_text)}}
{:error, _reason} = error ->
{:halt, error}
end
else
{:cont, {:ok, next_text, response}}
end
end)
|> case do
{:ok, _text, response} ->
{:ok, response}
{:error, _reason} = error ->
error
end
end
defp with_stream_metadata(%Response{} = response, mode, chunk_count, final_text) do
metadata =
response.metadata
|> Map.put(:stream_fallback, mode)
|> Map.put(:chunk_count, chunk_count)
|> Map.put(:final_text, final_text)
%{response | metadata: metadata}
end
defp stream_fallback_text(chunks) do
chunks
|> Enum.map(&render_stream_chunk/1)
|> Enum.join("")
end
defp render_stream_chunk(%StreamChunk{} = chunk) do
case chunk.kind do
:text -> chunk.text || ""
:markdown -> chunk.text || ""
:status -> bracketed_chunk(chunk)
:plan -> plan_chunk(chunk)
:step_start -> step_chunk(chunk)
:step_finish -> "\n"
:data -> ""
end
end
defp render_stream_chunk(chunk) when is_map(chunk),
do: chunk |> StreamChunk.new() |> render_stream_chunk()
defp render_stream_chunk(chunk), do: to_string(chunk)
defp bracketed_chunk(%StreamChunk{} = chunk) do
case StreamChunk.fallback_text(chunk) do
"" -> ""
text -> "\n[#{text}]\n"
end
end
defp plan_chunk(%StreamChunk{} = chunk) do
case chunk.payload do
items when is_list(items) ->
"\n" <>
(items
|> Enum.map_join("\n", fn item -> "- " <> to_string(item) end)) <> "\n"
_other ->
bracketed_chunk(chunk)
end
end
defp step_chunk(%StreamChunk{} = chunk) do
case StreamChunk.fallback_text(chunk) do
"" -> ""
text -> "\n\n#{text}\n"
end
end
defp normalize_markdown_payload(%Markdown{} = markdown), do: markdown
defp normalize_markdown_payload(%{} = markdown), do: Markdown.new(markdown)
defp normalize_markdown_payload(value) when is_binary(value), do: Markdown.parse(value)
defp normalize_card_payload(%Card{} = card), do: card
defp normalize_card_payload(%{} = card), do: Card.new(card)
defp normalize_modal_payload(%Modal{} = modal), do: Modal.to_adapter_payload(modal)
defp normalize_modal_payload(%{} = modal), do: modal
defp ensure_capability_defaults(matrix, adapter_module) do
single_upload_supported? =
supported_status?(matrix[:send_file]) or supported_status?(matrix[:post_message])
multi_upload_supported? =
supported_status?(matrix[:multi_file]) or supported_status?(matrix[:post_message])
defaults = %{
initialize: support_status(adapter_module, :initialize, 1, :fallback),
shutdown: support_status(adapter_module, :shutdown, 1, :fallback),
send_message: :native,
send_file: support_status(adapter_module, :send_file, 3),
post_message: support_status(adapter_module, :post_message, 3),
edit_message: support_status(adapter_module, :edit_message, 4),
delete_message: support_status(adapter_module, :delete_message, 3),
start_typing: support_status(adapter_module, :start_typing, 2),
fetch_metadata: support_status(adapter_module, :fetch_metadata, 2, :fallback),
fetch_thread: support_status(adapter_module, :fetch_thread, 2, :fallback),
fetch_message: support_status(adapter_module, :fetch_message, 3, :fallback),
add_reaction: support_status(adapter_module, :add_reaction, 4),
remove_reaction: support_status(adapter_module, :remove_reaction, 4),
post_ephemeral: support_status(adapter_module, :post_ephemeral, 4),
open_dm: support_status(adapter_module, :open_dm, 2),
fetch_messages: support_status(adapter_module, :fetch_messages, 2),
fetch_channel_messages: support_status(adapter_module, :fetch_channel_messages, 2),
list_threads: support_status(adapter_module, :list_threads, 2),
open_thread: support_status(adapter_module, :open_thread, 3),
post_channel_message: support_status(adapter_module, :post_channel_message, 3, :fallback),
stream: support_status(adapter_module, :stream, 3, :fallback),
open_modal: support_status(adapter_module, :open_modal, 3),
webhook: support_status(adapter_module, :handle_webhook, 3, :fallback),
verify_webhook: support_status(adapter_module, :verify_webhook, 2, :fallback),
parse_event: support_status(adapter_module, :parse_event, 2, :fallback),
format_webhook_response: support_status(adapter_module, :format_webhook_response, 2, :fallback),
text: :native,
image: if(single_upload_supported?, do: :fallback, else: :unsupported),
audio: if(single_upload_supported?, do: :fallback, else: :unsupported),
video: if(single_upload_supported?, do: :fallback, else: :unsupported),
file: if(single_upload_supported?, do: :fallback, else: :unsupported),
multi_file: if(multi_upload_supported?, do: :fallback, else: :unsupported),
markdown: :unsupported,
cards: :unsupported,
modals: support_status(adapter_module, :open_modal, 3),
ephemeral:
cond do
callback_exported?(adapter_module, :post_ephemeral, 4) -> :native
callback_exported?(adapter_module, :open_dm, 2) -> :fallback
true -> :unsupported
end,
assistant_events: :unsupported
}
Map.merge(defaults, matrix)
end
defp normalize_webhook_request(%WebhookRequest{} = request, _opts), do: request
defp normalize_webhook_request(request, opts) when is_map(request) do
adapter_name = opts[:adapter_name]
request
|> Map.put_new(:adapter_name, adapter_name)
|> WebhookRequest.new()
end
defp normalize_webhook_request(other, _opts), do: WebhookRequest.new(%{payload: %{raw: other}})
defp normalize_post_payload_input(%PostPayload{} = payload), do: {:ok, payload}
defp normalize_post_payload_input(%Postable{} = postable),
do: {:ok, Postable.to_payload(postable)}
defp normalize_post_payload_input(input) when is_binary(input),
do: {:ok, PostPayload.text(input)}
defp normalize_post_payload_input(input) when is_map(input) do
try do
{:ok, input |> Postable.new() |> Postable.to_payload()}
rescue
_ -> {:error, :invalid_postable}
end
end
defp normalize_post_payload_input(_input), do: {:error, :invalid_postable}
defp metadata_from_opts(opts) when is_list(opts) do
case Keyword.get(opts, :metadata) do
metadata when is_map(metadata) -> metadata
_other -> %{}
end
end
defp normalize_event_envelope(_adapter_module, %EventEnvelope{} = envelope), do: envelope
defp normalize_event_envelope(adapter_module, map) when is_map(map) do
map
|> Map.put_new(:adapter_name, adapter_type(adapter_module))
|> EventEnvelope.new()
end
defp normalize_webhook_response(%WebhookResponse{} = response), do: response
defp normalize_webhook_response(map) when is_map(map), do: WebhookResponse.new(map)
defp default_webhook_response({:ok, _chat, _event}),
do: WebhookResponse.accepted(%{ok: true})
defp default_webhook_response({:error, {:invalid_webhook_secret, _}}),
do: WebhookResponse.error(401, %{error: "invalid_webhook_secret"})
defp default_webhook_response({:error, :invalid_webhook_secret}),
do: WebhookResponse.error(401, %{error: "invalid_webhook_secret"})
defp default_webhook_response({:error, _reason}),
do: WebhookResponse.error(400, %{error: "invalid_webhook_request"})
defp default_webhook_response(_), do: WebhookResponse.accepted(%{ok: true})
defp capability_callback(:initialize), do: {:initialize, 1}
defp capability_callback(:shutdown), do: {:shutdown, 1}
defp capability_callback(:send_message), do: {:send_message, 3}
defp capability_callback(:send_file), do: {:send_file, 3}
defp capability_callback(:post_message), do: {:post_message, 3}
defp capability_callback(:edit_message), do: {:edit_message, 4}
defp capability_callback(:delete_message), do: {:delete_message, 3}
defp capability_callback(:start_typing), do: {:start_typing, 2}
defp capability_callback(:fetch_metadata), do: {:fetch_metadata, 2}
defp capability_callback(:fetch_thread), do: {:fetch_thread, 2}
defp capability_callback(:fetch_message), do: {:fetch_message, 3}
defp capability_callback(:add_reaction), do: {:add_reaction, 4}
defp capability_callback(:remove_reaction), do: {:remove_reaction, 4}
defp capability_callback(:post_ephemeral), do: {:post_ephemeral, 4}
defp capability_callback(:open_dm), do: {:open_dm, 2}
defp capability_callback(:fetch_messages), do: {:fetch_messages, 2}
defp capability_callback(:fetch_channel_messages), do: {:fetch_channel_messages, 2}
defp capability_callback(:list_threads), do: {:list_threads, 2}
defp capability_callback(:open_thread), do: {:open_thread, 3}
defp capability_callback(:post_channel_message), do: {:post_channel_message, 3}
defp capability_callback(:stream), do: {:stream, 3}
defp capability_callback(:open_modal), do: {:open_modal, 3}
defp capability_callback(:webhook), do: {:handle_webhook, 3}
defp capability_callback(:verify_webhook), do: {:verify_webhook, 2}
defp capability_callback(:parse_event), do: {:parse_event, 2}
defp capability_callback(:format_webhook_response), do: {:format_webhook_response, 2}
defp capability_callback(_), do: nil
defp callback_exported?(adapter_module, callback, arity) do
Code.ensure_loaded?(adapter_module) and function_exported?(adapter_module, callback, arity)
end
defp maybe_put_caption(opts, %PostPayload{} = payload) do
case PostPayload.display_text(payload) do
nil ->
opts
"" ->
opts
text ->
opts
|> Keyword.put_new(:caption, text)
|> Keyword.put_new(:text, text)
end
end
defp maybe_put_metadata(opts, metadata) when metadata in [%{}, nil], do: opts
defp maybe_put_metadata(opts, metadata) when is_map(metadata) do
Keyword.update(opts, :metadata, metadata, &Map.merge(metadata, &1))
end
defp stringify(nil), do: nil
defp stringify(value) when is_binary(value), do: value
defp stringify(value), do: to_string(value)
end