defmodule Jido.Chat.Telegram.Adapter do
@moduledoc """
Telegram `Jido.Chat.Adapter` implementation using ExGram.
"""
use Jido.Chat.Adapter
alias Jido.Chat.{
ChannelInfo,
EphemeralMessage,
EventEnvelope,
FileUpload,
Incoming,
Response,
WebhookRequest,
WebhookResponse
}
alias Jido.Chat.Telegram.{
DeleteOptions,
EditOptions,
Extensions,
Ingress,
MetadataOptions,
PollingWorker,
ReactionOptions,
SendOptions,
StreamOptions,
TypingOptions
}
alias Jido.Chat.Telegram.Transport.ExGramClient
@impl true
def channel_type, do: :telegram
@impl true
@spec capabilities() :: map()
def capabilities,
do: %{
initialize: :fallback,
shutdown: :fallback,
send_message: :native,
send_file: :native,
edit_message: :native,
delete_message: :native,
start_typing: :native,
fetch_metadata: :native,
fetch_thread: :fallback,
fetch_message: :unsupported,
add_reaction: :native,
remove_reaction: :native,
post_ephemeral: :fallback,
open_dm: :native,
fetch_messages: :unsupported,
fetch_channel_messages: :unsupported,
list_threads: :unsupported,
open_thread: :native,
post_channel_message: :fallback,
stream: :native,
open_modal: :unsupported,
webhook: :native,
verify_webhook: :native,
parse_event: :native,
format_webhook_response: :native
}
@doc """
Returns Telegram extension capability statuses for features outside core adapter contract.
"""
@spec extension_capabilities() :: map()
def extension_capabilities, do: Jido.Chat.Telegram.Extensions.capabilities()
@impl true
def listener_child_specs(bridge_id, opts \\ []) when is_binary(bridge_id) and is_list(opts) do
ingress = Ingress.normalize_opts(opts)
case Ingress.mode(ingress) do
:webhook ->
{:ok, []}
:polling ->
with {:ok, sink_mfa} <- validate_sink_mfa(Keyword.get(opts, :sink_mfa)) do
worker_opts = polling_worker_opts(bridge_id, ingress, opts, sink_mfa)
{:ok,
[
Supervisor.child_spec(
{PollingWorker, worker_opts},
id: {:telegram_polling_worker, bridge_id}
)
]}
end
:invalid ->
{:error, :invalid_ingress_mode}
end
end
@doc """
Ensures Telegram webhook ingress for a messaging bridge.
Telegram supports one webhook per bot token, so the subscription id is
deterministic for the bridge and maps to the bot's current webhook.
"""
@spec ensure_ingress_subscription(String.t(), keyword()) :: {:ok, map()} | {:error, term()}
def ensure_ingress_subscription(bridge_id, opts \\ [])
when is_binary(bridge_id) and is_list(opts) do
Ingress.ensure_subscription(bridge_id, opts)
end
@doc """
Lists the Telegram webhook ingress subscription for a messaging bridge.
"""
@spec list_ingress_subscriptions(String.t(), keyword()) :: {:ok, [map()]} | {:error, term()}
def list_ingress_subscriptions(bridge_id, opts \\ []) when is_binary(bridge_id) and is_list(opts) do
Ingress.list_subscriptions(bridge_id, opts)
end
@doc """
Deletes the Telegram webhook ingress subscription for a messaging bridge.
"""
@spec delete_ingress_subscription(String.t(), String.t(), keyword()) ::
{:ok, map()} | {:error, term()}
def delete_ingress_subscription(bridge_id, subscription_id, opts \\ [])
when is_binary(bridge_id) and is_binary(subscription_id) and is_list(opts) do
Ingress.delete_subscription(bridge_id, subscription_id, opts)
end
@impl true
def transform_incoming(%{"message" => nil}), do: {:error, :no_message}
def transform_incoming(%{message: nil}), do: {:error, :no_message}
def transform_incoming(%{"message" => message}) when is_map(message),
do: transform_message(message)
def transform_incoming(%{message: message}) when is_map(message), do: transform_message(message)
def transform_incoming(%{"channel_post" => message}) when is_map(message),
do: transform_message(message)
def transform_incoming(%{channel_post: message}) when is_map(message),
do: transform_message(message)
def transform_incoming(%{"edited_channel_post" => message}) when is_map(message),
do: transform_message(message)
def transform_incoming(%{edited_channel_post: message}) when is_map(message),
do: transform_message(message)
def transform_incoming(_), do: {:error, :unsupported_update_type}
@impl true
def send_message(chat_id, text, opts \\ []) do
opts = SendOptions.new(opts)
token = fetch_token(opts.token)
payload =
Map.merge(%{"chat_id" => chat_id, "text" => text}, SendOptions.payload_opts(opts))
with {:ok, result} <-
transport(opts).call(token, "sendMessage", payload, SendOptions.transport_opts(opts)) do
{:ok,
Response.new(%{
message_id: map_get(result, [:message_id, "message_id"]),
chat_id: map_get(result, [:chat, "chat"]) |> map_get([:id, "id"]),
date: map_get(result, [:date, "date"]),
channel_type: :telegram,
status: :sent,
raw: result
})}
end
end
@impl true
def stream(chat_id, chunks, opts \\ []) do
opts = StreamOptions.new(opts)
token = fetch_token(opts.token)
draft_id = opts.draft_id || System.unique_integer([:positive])
interval_ms = normalize_stream_update_interval(opts.stream_update_interval_ms)
with {:ok, draft_chat_id} <- draft_chat_id(chat_id),
{:ok, state} <-
consume_stream_chunks(chunks, draft_chat_id, token, opts, draft_id, interval_ms) do
case state.text do
"" ->
{:error, :empty_stream}
text ->
send_message(chat_id, text, StreamOptions.send_opts(opts))
end
else
:fallback ->
fallback_stream_send(chat_id, chunks, opts)
{:error, :empty_stream} = error ->
error
{:error, _reason} = error ->
error
end
end
@impl true
def send_file(chat_id, file, opts \\ []) do
upload = FileUpload.normalize(file)
with {:ok, input} <- upload_input(upload),
{:ok, media} <- deliver_upload(chat_id, upload, input, opts) do
{:ok, upload_response(upload, media, chat_id)}
end
end
@impl true
def edit_message(chat_id, message_id, text, opts \\ []) do
opts = EditOptions.new(opts)
token = fetch_token(opts.token)
payload =
EditOptions.payload_opts(opts)
|> Map.merge(%{"chat_id" => chat_id, "message_id" => message_id, "text" => text})
with {:ok, result} <-
transport(opts).call(
token,
"editMessageText",
payload,
EditOptions.transport_opts(opts)
) do
{:ok,
Response.new(%{
message_id: map_get(result, [:message_id, "message_id"]) || message_id,
chat_id: map_get(result, [:chat, "chat"]) |> map_get([:id, "id"]) || chat_id,
date: map_get(result, [:date, "date"]),
channel_type: :telegram,
status: :edited,
raw: result
})}
end
end
@impl true
def delete_message(chat_id, message_id, opts \\ []) do
opts =
opts
|> pick_opts([:token, :transport, :debug, :check_params, :ex_gram_module, :ex_gram_adapter])
|> DeleteOptions.new()
token = fetch_token(opts.token)
with {:ok, _result} <-
transport(opts).call(
token,
"deleteMessage",
%{"chat_id" => chat_id, "message_id" => message_id},
DeleteOptions.transport_opts(opts)
) do
:ok
end
end
@impl true
def start_typing(chat_id, opts \\ []) do
status = Keyword.get(opts, :status) || Keyword.get(opts, :action)
opts =
opts
|> pick_opts([
:token,
:transport,
:thread_id,
:debug,
:check_params,
:ex_gram_module,
:ex_gram_adapter
])
|> maybe_put_action(status)
|> TypingOptions.new()
token = fetch_token(opts.token)
payload =
TypingOptions.payload_opts(opts)
|> Map.put("chat_id", chat_id)
with {:ok, _result} <-
transport(opts).call(
token,
"sendChatAction",
payload,
TypingOptions.transport_opts(opts)
) do
:ok
end
end
@impl true
def fetch_metadata(chat_id, opts \\ []) do
opts =
opts
|> pick_opts([:token, :transport, :debug, :check_params, :ex_gram_module, :ex_gram_adapter])
|> MetadataOptions.new()
token = fetch_token(opts.token)
with {:ok, result} <-
transport(opts).call(
token,
"getChat",
%{"chat_id" => chat_id},
MetadataOptions.transport_opts(opts)
) do
metadata = normalize_metadata(result)
chat_type = parse_chat_type(map_get(metadata, [:type, "type"]))
{:ok,
ChannelInfo.new(%{
id: to_string(map_get(metadata, [:id, "id"]) || chat_id),
name:
map_get(metadata, [:title, "title"]) ||
map_get(metadata, [:username, "username"]) ||
map_get(metadata, [:first_name, "first_name"]),
is_dm: chat_type == :private,
member_count: nil,
metadata: metadata
})}
end
end
@impl true
def add_reaction(chat_id, message_id, emoji, opts \\ []) when is_binary(emoji) do
opts =
opts
|> pick_opts([
:token,
:transport,
:is_big,
:debug,
:check_params,
:ex_gram_module,
:ex_gram_adapter
])
|> ReactionOptions.new()
token = fetch_token(opts.token)
payload =
ReactionOptions.payload_opts(opts)
|> Map.merge(%{
"chat_id" => chat_id,
"message_id" => message_id,
"reaction" => [%{"type" => "emoji", "emoji" => emoji}]
})
case transport(opts).call(
token,
"setMessageReaction",
payload,
ReactionOptions.transport_opts(opts)
) do
{:ok, _result} -> :ok
{:error, :unsupported_method} -> {:error, :unsupported}
{:error, {:unsupported_method, _method}} -> {:error, :unsupported}
{:error, _reason} = error -> error
end
end
@impl true
def remove_reaction(chat_id, message_id, _emoji, opts \\ []) do
opts =
opts
|> pick_opts([
:token,
:transport,
:is_big,
:debug,
:check_params,
:ex_gram_module,
:ex_gram_adapter
])
|> ReactionOptions.new()
token = fetch_token(opts.token)
payload =
ReactionOptions.payload_opts(opts)
|> Map.merge(%{"chat_id" => chat_id, "message_id" => message_id, "reaction" => []})
case transport(opts).call(
token,
"setMessageReaction",
payload,
ReactionOptions.transport_opts(opts)
) do
{:ok, _result} -> :ok
{:error, :unsupported_method} -> {:error, :unsupported}
{:error, {:unsupported_method, _method}} -> {:error, :unsupported}
{:error, _reason} = error -> error
end
end
@impl true
def open_dm(external_user_id, _opts \\ []), do: {:ok, external_user_id}
@impl true
def open_thread(chat_id, _message_id, opts \\ []) do
if Keyword.get(opts, :supports_forum_topics, true) do
token = fetch_token(opts[:token])
topic_name = Keyword.get(opts, :topic_name, "New thread")
transport_opts =
pick_opts(opts, [:transport, :debug, :check_params, :ex_gram_module, :ex_gram_adapter])
with {:ok, result} <-
transport(opts).call(
token,
"createForumTopic",
%{"chat_id" => chat_id, "name" => topic_name},
transport_opts
) do
{:ok,
%{
external_thread_id: stringify(map_get(result, [:message_thread_id, "message_thread_id"])),
delivery_external_room_id: stringify(chat_id)
}}
end
else
{:error, :unsupported}
end
end
@impl true
def post_ephemeral(_chat_id, user_id, text, opts \\ []) do
if Keyword.get(opts, :fallback_to_dm, false) do
send_opts = Keyword.drop(opts, [:fallback_to_dm])
with {:ok, dm_room_id} <- open_dm(user_id, send_opts),
{:ok, %Response{} = response} <- send_message(dm_room_id, text, send_opts) do
{:ok,
EphemeralMessage.new(%{
id: response.external_message_id || Jido.Chat.ID.generate!(),
thread_id: "telegram:#{dm_room_id}",
used_fallback: true,
raw: response.raw,
metadata: %{chat_id: dm_room_id}
})}
end
else
{:error, :unsupported}
end
end
@impl true
def fetch_messages(_chat_id, _opts), do: {:error, :unsupported}
@impl true
def fetch_channel_messages(_chat_id, _opts), do: {:error, :unsupported}
@impl true
def list_threads(_chat_id, _opts), do: {:error, :unsupported}
@impl true
def verify_webhook(%WebhookRequest{} = request, opts \\ []) do
verify_webhook_secret(opts, request.headers)
end
@impl true
def parse_event(%WebhookRequest{} = request, _opts \\ []) do
parse_payload_event(request.payload)
end
@impl true
def format_webhook_response(result, opts \\ [])
def format_webhook_response({:ok, _chat, _event}, _opts) do
WebhookResponse.accepted(%{ok: true})
end
def format_webhook_response({:error, :invalid_webhook_secret}, _opts) do
WebhookResponse.error(401, %{error: "invalid_webhook_secret"})
end
def format_webhook_response({:error, reason}, _opts) do
WebhookResponse.error(400, %{error: to_string(reason)})
end
@impl true
def handle_webhook(%Jido.Chat{} = chat, payload, opts \\ []) when is_map(payload) do
request =
WebhookRequest.new(%{
adapter_name: :telegram,
headers: opts[:headers] || %{},
payload: payload,
raw: opts[:raw_body] || payload,
metadata: %{raw_body: opts[:raw_body]}
})
with :ok <- verify_webhook(request, opts),
{:ok, parsed_event} <- parse_event(request, opts),
{:ok, updated_chat, incoming} <- route_parsed_event(chat, parsed_event, opts) do
{:ok, updated_chat, incoming}
end
end
defp route_parsed_event(_chat, :noop, _opts), do: {:error, :unsupported_update_type}
defp route_parsed_event(chat, %EventEnvelope{event_type: :slash_command} = envelope, opts) do
with {:ok, slash_chat, routed_envelope} <-
Jido.Chat.process_event(chat, :telegram, envelope, opts),
{:ok, incoming} <- incoming_from_event(routed_envelope),
thread_id <- thread_id(incoming),
{:ok, final_chat, _incoming} <-
Jido.Chat.process_message(slash_chat, :telegram, thread_id, incoming, opts) do
{:ok, final_chat, incoming}
end
end
defp route_parsed_event(chat, %EventEnvelope{} = envelope, opts) do
with {:ok, updated_chat, routed_envelope} <-
Jido.Chat.process_event(chat, :telegram, envelope, opts),
{:ok, incoming} <- incoming_from_event(routed_envelope) do
{:ok, updated_chat, incoming}
end
end
defp incoming_from_event(%EventEnvelope{event_type: :message, payload: %Incoming{} = incoming}),
do: {:ok, incoming}
defp incoming_from_event(%EventEnvelope{event_type: :slash_command, raw: raw}) do
case update_message(raw) do
message when is_map(message) -> transform_message(message)
_ -> {:error, :unsupported_update_type}
end
end
defp incoming_from_event(%EventEnvelope{event_type: :action, raw: raw}) do
callback_query = map_get(raw, [:callback_query, "callback_query"]) || %{}
message = map_get(callback_query, [:message, "message"]) || %{}
chat_map = map_get(message, [:chat, "chat"]) || %{}
from = map_get(callback_query, [:from, "from"]) || %{}
{:ok,
synthetic_incoming(
map_get(chat_map, [:id, "id"]),
map_get(from, [:id, "id"]),
map_get(callback_query, [:id, "id"]),
callback_query,
:action
)}
end
defp incoming_from_event(%EventEnvelope{event_type: :reaction, raw: raw}) do
reaction = map_get(raw, [:message_reaction, "message_reaction"]) || %{}
chat_map = map_get(reaction, [:chat, "chat"]) || %{}
user = map_get(reaction, [:user, "user"]) || %{}
{:ok,
synthetic_incoming(
map_get(chat_map, [:id, "id"]),
map_get(user, [:id, "id"]),
map_get(reaction, [:message_id, "message_id"]),
reaction,
:reaction
)}
end
defp incoming_from_event(_), do: {:error, :unsupported_update_type}
defp parse_payload_event(payload) when is_map(payload) do
cond do
is_map(map_get(payload, [:message_reaction, "message_reaction"])) ->
{:ok, reaction_envelope(payload)}
is_map(map_get(payload, [:callback_query, "callback_query"])) ->
{:ok, action_envelope(payload)}
is_map(update_message(payload)) ->
build_message_or_slash_envelope(update_message(payload), payload)
true ->
{:ok, :noop}
end
end
defp parse_payload_event(_payload), do: {:ok, :noop}
defp deliver_upload(chat_id, %FileUpload{kind: :image} = upload, input, opts) do
Extensions.send_photo(chat_id, input, upload_opts(upload, opts))
end
defp deliver_upload(chat_id, %FileUpload{} = upload, input, opts) do
Extensions.send_document(chat_id, input, upload_opts(upload, opts))
end
defp upload_opts(%FileUpload{} = upload, opts) do
opts
|> pick_opts([
:token,
:transport,
:caption,
:text,
:format,
:parse_mode,
:reply_to_id,
:reply_to_message_id,
:thread_id,
:external_thread_id,
:reply_markup,
:disable_notification,
:debug,
:check_params,
:ex_gram_module,
:ex_gram_adapter
])
|> maybe_put_option(:caption, upload_caption(upload))
end
defp upload_response(%FileUpload{} = upload, media, chat_id) do
delivered_kind =
case media.kind do
:photo -> :image
_ -> upload.kind
end
Response.new(%{
message_id: media.message_id,
chat_id: media.chat_id || chat_id,
date: media.date,
channel_type: :telegram,
status: :sent,
raw: media.raw,
metadata:
media.metadata
|> Map.put(:file_id, media.file_id)
|> Map.put(:upload_kind, upload.kind)
|> Map.put(:delivered_kind, delivered_kind)
})
end
defp upload_input(%FileUpload{url: url}) when is_binary(url) and url != "", do: {:ok, url}
defp upload_input(%FileUpload{path: path}) when is_binary(path) and path != "" do
{:ok, {:file, path}}
end
defp upload_input(%FileUpload{data: data, filename: filename})
when is_binary(data) and data != "" and is_binary(filename) and filename != "" do
{:ok, {:file_content, data, filename}}
end
defp upload_input(%FileUpload{data: data}) when is_binary(data) and data != "" do
{:error, :missing_filename}
end
defp upload_input(_upload), do: {:error, :missing_file_source}
defp upload_caption(%FileUpload{} = upload) do
metadata = upload.metadata
metadata[:caption] || metadata["caption"] || metadata[:alt_text] || metadata["alt_text"] ||
metadata[:transcript] || metadata["transcript"]
end
defp maybe_put_option(opts, _key, value) when value in [nil, ""], do: opts
defp maybe_put_option(opts, key, value), do: Keyword.put_new(opts, key, value)
defp update_message(payload) when is_map(payload) do
map_get(payload, [:message, "message"]) ||
map_get(payload, [:edited_message, "edited_message"]) ||
map_get(payload, [:channel_post, "channel_post"]) ||
map_get(payload, [:edited_channel_post, "edited_channel_post"])
end
defp reaction_envelope(payload) do
reaction = map_get(payload, [:message_reaction, "message_reaction"]) || %{}
chat_map = map_get(reaction, [:chat, "chat"]) || %{}
user = map_get(reaction, [:user, "user"]) || %{}
message_id = map_get(reaction, [:message_id, "message_id"])
new_reaction = map_get(reaction, [:new_reaction, "new_reaction"]) || []
old_reaction = map_get(reaction, [:old_reaction, "old_reaction"]) || []
added = new_reaction != []
emoji = extract_reaction_emoji(if(added, do: new_reaction, else: old_reaction))
room_id = map_get(chat_map, [:id, "id"])
thread_id = "telegram:#{room_id}"
EventEnvelope.new(%{
adapter_name: :telegram,
event_type: :reaction,
thread_id: thread_id,
channel_id: stringify(room_id),
message_id: stringify(message_id),
payload: %{
adapter_name: :telegram,
thread_id: thread_id,
message_id: stringify(message_id),
emoji: emoji,
added: added,
user: %{
user_id: stringify(map_get(user, [:id, "id"]) || "unknown"),
user_name: map_get(user, [:username, "username"]) || "unknown",
full_name: map_get(user, [:first_name, "first_name"])
},
raw: reaction,
metadata: %{chat_id: room_id}
},
raw: payload,
metadata: %{}
})
end
defp action_envelope(payload) do
callback_query = map_get(payload, [:callback_query, "callback_query"]) || %{}
message = map_get(callback_query, [:message, "message"]) || %{}
chat_map = map_get(message, [:chat, "chat"]) || %{}
from = map_get(callback_query, [:from, "from"]) || %{}
room_id = map_get(chat_map, [:id, "id"])
thread_id = "telegram:#{room_id}"
message_id = stringify(map_get(message, [:message_id, "message_id"]))
EventEnvelope.new(%{
adapter_name: :telegram,
event_type: :action,
thread_id: thread_id,
channel_id: stringify(room_id),
message_id: message_id,
payload: %{
adapter_name: :telegram,
thread_id: thread_id,
message_id: message_id,
action_id: stringify(map_get(callback_query, [:data, "data"]) || map_get(callback_query, [:id, "id"])),
value: map_get(callback_query, [:data, "data"]),
user: %{
user_id: stringify(map_get(from, [:id, "id"]) || "unknown"),
user_name:
map_get(from, [:username, "username"]) ||
stringify(map_get(from, [:id, "id"]) || "unknown"),
full_name: map_get(from, [:first_name, "first_name"])
},
raw: callback_query,
metadata: %{chat_id: room_id}
},
raw: payload,
metadata: %{}
})
end
defp build_message_or_slash_envelope(message, payload) do
with {:ok, incoming} <- transform_message(message) do
case parse_slash_command(map_get(message, [:text, "text"])) do
nil ->
{:ok,
EventEnvelope.new(%{
adapter_name: :telegram,
event_type: :message,
thread_id: thread_id(incoming),
channel_id: stringify(incoming.external_room_id),
message_id: stringify(incoming.external_message_id),
payload: incoming,
raw: payload,
metadata: %{}
})}
{command, arguments} ->
slash_payload = %{
adapter_name: :telegram,
channel_id: stringify(incoming.external_room_id),
command: command,
text: arguments,
user: incoming.author,
raw: message,
metadata: %{thread_id: thread_id(incoming)}
}
{:ok,
EventEnvelope.new(%{
adapter_name: :telegram,
event_type: :slash_command,
thread_id: thread_id(incoming),
channel_id: stringify(incoming.external_room_id),
message_id: stringify(incoming.external_message_id),
payload: slash_payload,
raw: payload,
metadata: %{}
})}
end
end
end
defp verify_webhook_secret(opts, headers) do
expected =
opts[:secret_token] || Application.get_env(:jido_chat_telegram, :telegram_webhook_secret)
if is_nil(expected) do
:ok
else
actual = header_value(headers, "x-telegram-bot-api-secret-token")
if actual == expected, do: :ok, else: {:error, :invalid_webhook_secret}
end
end
defp transform_message(message) do
chat = map_get(message, [:chat, "chat"]) || %{}
from = map_get(message, [:from, "from"]) || %{}
chat_type = parse_chat_type(map_get(chat, [:type, "type"]))
thread_id = map_get(message, [:message_thread_id, "message_thread_id"])
{:ok,
Incoming.new(%{
external_room_id: map_get(chat, [:id, "id"]),
external_user_id: map_get(from, [:id, "id"]),
text: map_get(message, [:text, "text"]),
media: extract_media(message),
username: map_get(from, [:username, "username"]),
display_name: map_get(from, [:first_name, "first_name"]),
external_message_id: map_get(message, [:message_id, "message_id"]),
timestamp: map_get(message, [:date, "date"]),
external_thread_id: stringify(thread_id),
chat_type: chat_type,
chat_title: map_get(chat, [:title, "title"]),
channel_meta: %{
adapter_name: :telegram,
external_room_id: map_get(chat, [:id, "id"]),
external_thread_id: stringify(thread_id),
chat_type: chat_type,
chat_title: map_get(chat, [:title, "title"]),
is_dm: chat_type == :private,
metadata: %{}
},
raw: message
})}
end
defp parse_slash_command(text) when is_binary(text) do
case Regex.run(~r/^\/(\w+)(?:@\w+)?(?:\s+(.*))?$/, text) do
[_, command] -> {"/" <> command, ""}
[_, command, arguments] -> {"/" <> command, String.trim(arguments)}
_ -> nil
end
end
defp parse_slash_command(_), do: nil
defp extract_reaction_emoji([first | _]) when is_map(first),
do: map_get(first, [:emoji, "emoji"]) || ""
defp extract_reaction_emoji(_), do: ""
defp synthetic_incoming(chat_id, user_id, message_id, raw, event_type) do
Incoming.new(%{
external_room_id: chat_id || "unknown",
external_user_id: user_id,
external_message_id: message_id,
text: nil,
raw: raw,
metadata: %{event_type: event_type}
})
end
defp thread_id(%Incoming{external_room_id: room_id, external_thread_id: nil}),
do: "telegram:#{room_id}"
defp thread_id(%Incoming{external_room_id: room_id, external_thread_id: thread_id}),
do: "telegram:#{room_id}:#{thread_id}"
defp fetch_token(token) do
token || Application.get_env(:jido_chat_telegram, :telegram_bot_token) ||
raise ArgumentError,
"missing Telegram bot token; pass :token option or configure :jido_chat_telegram, :telegram_bot_token"
end
defp transport(opts) when is_list(opts), do: transport(Map.new(opts))
defp transport(%{transport: transport}) when not is_nil(transport), do: transport
defp transport(_opts), do: ExGramClient
defp maybe_put_action(opts, nil), do: opts
defp maybe_put_action(opts, ""), do: opts
defp maybe_put_action(opts, status), do: Keyword.put(opts, :action, status)
defp consume_stream_chunks(chunks, draft_chat_id, token, opts, draft_id, interval_ms) do
initial = %{text: "", last_draft_text: nil, last_update_ms: nil}
Enum.reduce_while(chunks, {:ok, initial}, fn chunk, {:ok, state} ->
next_state = %{state | text: state.text <> to_string(chunk)}
case maybe_send_draft_update(next_state, draft_chat_id, token, opts, draft_id, interval_ms) do
{:ok, updated_state} -> {:cont, {:ok, updated_state}}
{:error, reason} -> {:halt, {:error, reason}}
:skip -> {:cont, {:ok, next_state}}
end
end)
rescue
Protocol.UndefinedError -> {:error, :invalid_stream_chunk}
end
defp maybe_send_draft_update(
%{text: ""},
_draft_chat_id,
_token,
_opts,
_draft_id,
_interval_ms
),
do: :skip
defp maybe_send_draft_update(
%{text: text, last_draft_text: text},
_draft_chat_id,
_token,
_opts,
_draft_id,
_interval_ms
),
do: :skip
defp maybe_send_draft_update(state, draft_chat_id, token, opts, draft_id, interval_ms) do
now_ms = System.monotonic_time(:millisecond)
if send_draft_now?(state.last_update_ms, now_ms, interval_ms) do
payload =
StreamOptions.draft_payload_opts(opts, draft_id)
|> Map.merge(%{"chat_id" => draft_chat_id, "text" => state.text})
case transport(opts).call(
token,
"sendMessageDraft",
payload,
StreamOptions.transport_opts(opts)
) do
{:ok, _result} ->
{:ok, %{state | last_draft_text: state.text, last_update_ms: now_ms}}
{:error, reason} ->
{:error, reason}
end
else
:skip
end
end
defp send_draft_now?(nil, _now_ms, _interval_ms), do: true
defp send_draft_now?(_last_update_ms, _now_ms, 0), do: true
defp send_draft_now?(last_update_ms, now_ms, interval_ms),
do: now_ms - last_update_ms >= interval_ms
defp fallback_stream_send(chat_id, chunks, opts) do
text = chunks |> Enum.map(&to_string/1) |> Enum.join("")
if text == "" do
{:error, :empty_stream}
else
send_message(chat_id, text, StreamOptions.send_opts(StreamOptions.new(opts)))
end
rescue
Protocol.UndefinedError -> {:error, :invalid_stream_chunk}
end
defp draft_chat_id(chat_id) when is_integer(chat_id) and chat_id > 0, do: {:ok, chat_id}
defp draft_chat_id(chat_id) when is_binary(chat_id) do
case Integer.parse(chat_id) do
{parsed, ""} when parsed > 0 -> {:ok, parsed}
_ -> :fallback
end
end
defp draft_chat_id(_chat_id), do: :fallback
defp normalize_stream_update_interval(nil), do: 250
defp normalize_stream_update_interval(value) when value < 0, do: 0
defp normalize_stream_update_interval(value), do: value
defp parse_chat_type("private"), do: :private
defp parse_chat_type("group"), do: :group
defp parse_chat_type("supergroup"), do: :supergroup
defp parse_chat_type("channel"), do: :channel
defp parse_chat_type(:private), do: :private
defp parse_chat_type(:group), do: :group
defp parse_chat_type(:supergroup), do: :supergroup
defp parse_chat_type(:channel), do: :channel
defp parse_chat_type(_), do: :unknown
defp extract_media(message) when is_map(message) do
photos = map_get(message, [:photo, "photo"])
audio = map_get(message, [:audio, "audio"])
voice = map_get(message, [:voice, "voice"])
video = map_get(message, [:video, "video"])
document = map_get(message, [:document, "document"])
[]
|> maybe_append_photo(photos)
|> maybe_append_media(:audio, audio)
|> maybe_append_media(:audio, voice)
|> maybe_append_media(:video, video)
|> maybe_append_media(:file, document)
end
defp maybe_append_photo(media, photos) when is_list(photos) and photos != [] do
photo = List.last(photos)
case normalize_telegram_media(:image, photo) do
nil -> media
entry -> media ++ [entry]
end
end
defp maybe_append_photo(media, _), do: media
defp maybe_append_media(media, kind, value) do
case normalize_telegram_media(kind, value) do
nil -> media
entry -> media ++ [entry]
end
end
defp normalize_telegram_media(_kind, nil), do: nil
defp normalize_telegram_media(kind, media) when is_map(media) do
media_type = map_get(media, [:mime_type, "mime_type"])
resolved_kind = resolve_kind(kind, media_type)
media_ref =
map_get(media, [:file_id, "file_id", :id, "id"])
|> normalize_file_ref()
if is_nil(media_ref) do
nil
else
%{
kind: resolved_kind,
url: media_ref,
media_type: media_type,
filename: map_get(media, [:file_name, "file_name"]),
size_bytes: map_get(media, [:file_size, "file_size"]),
width: map_get(media, [:width, "width"]),
height: map_get(media, [:height, "height"]),
duration: map_get(media, [:duration, "duration"])
}
|> Enum.reject(fn {_k, v} -> is_nil(v) end)
|> Map.new()
end
end
defp normalize_telegram_media(_kind, _), do: nil
defp normalize_file_ref(nil), do: nil
defp normalize_file_ref(value) when is_binary(value), do: "telegram://file/#{value}"
defp normalize_file_ref(value), do: "telegram://file/#{to_string(value)}"
defp resolve_kind(:file, media_type) when is_binary(media_type) do
cond do
String.starts_with?(media_type, "image/") -> :image
String.starts_with?(media_type, "audio/") -> :audio
String.starts_with?(media_type, "video/") -> :video
true -> :file
end
end
defp resolve_kind(kind, _), do: kind
defp header_value(headers, name) when is_map(headers) do
Enum.find_value(headers, fn
{key, value} when is_binary(key) -> if String.downcase(key) == name, do: value
{key, value} when is_atom(key) -> if String.downcase(to_string(key)) == name, do: value
_ -> nil
end)
end
defp header_value(headers, name) when is_list(headers) do
headers
|> Enum.into(%{})
|> header_value(name)
end
defp header_value(_, _), do: nil
defp map_get(nil, _keys), do: nil
defp map_get(map, keys) when is_map(map) and is_list(keys) do
Enum.find_value(keys, fn key -> Map.get(map, key) end)
end
defp map_get(_other, _keys), do: nil
defp stringify(nil), do: nil
defp stringify(value) when is_binary(value), do: value
defp stringify(value), do: to_string(value)
defp pick_opts(opts, allowed_keys) when is_list(opts), do: Keyword.take(opts, allowed_keys)
defp normalize_metadata(%_{} = struct) do
struct
|> Map.from_struct()
|> normalize_metadata()
end
defp normalize_metadata(map) when is_map(map) do
map
|> Enum.map(fn {key, value} -> {key, normalize_metadata(value)} end)
|> Map.new()
end
defp normalize_metadata(list) when is_list(list), do: Enum.map(list, &normalize_metadata/1)
defp normalize_metadata(other), do: other
defp validate_sink_mfa({module, function, args})
when is_atom(module) and is_atom(function) and is_list(args),
do: {:ok, {module, function, args}}
defp validate_sink_mfa(_), do: {:error, :invalid_sink_mfa}
defp polling_worker_opts(bridge_id, ingress, opts, sink_mfa) do
bridge_config = Keyword.get(opts, :bridge_config)
credentials = bridge_credentials(bridge_config)
transport_opts =
ingress
|> map_get([:transport_opts, "transport_opts"])
|> case do
value when is_list(value) -> value
value when is_map(value) -> Enum.into(value, [])
_ -> []
end
[
bridge_id: bridge_id,
sink_mfa: sink_mfa,
sink_opts: [bridge_id: bridge_id],
token: map_get(ingress, [:token, "token"]) || map_get(credentials, [:token, "token"]),
transport: map_get(ingress, [:transport, "transport"]) || ExGramClient,
transport_opts: transport_opts,
timeout_s: map_get(ingress, [:timeout_s, "timeout_s"]) || 20,
poll_interval_ms: map_get(ingress, [:poll_interval_ms, "poll_interval_ms"]) || 250,
allowed_updates: map_get(ingress, [:allowed_updates, "allowed_updates"]),
max_backoff_ms: map_get(ingress, [:max_backoff_ms, "max_backoff_ms"]) || 5_000
]
end
defp bridge_credentials(%{credentials: credentials}) when is_map(credentials), do: credentials
defp bridge_credentials(_), do: %{}
end