defmodule Jido.Messaging.Demo.Bridge do
@moduledoc """
Bridges messages between Telegram and Discord via Signal Bus subscription.
Subscribes to `jido.messaging.room.message_added` signals and forwards
to external platforms, skipping the origin platform to prevent loops.
## Architecture
Instead of being called directly by channel handlers, the Bridge now:
1. Subscribes to the Signal Bus for `message_added` events
2. Receives signals when any message is added to any room
3. Forwards to all bound platforms except the origin
This decouples the Bridge from channel handlers and makes it event-driven.
"""
use GenServer
require Logger
alias Jido.Messaging.AdapterBridge
alias Jido.Messaging.Supervisor, as: MessagingSupervisor
@schema Zoi.struct(
__MODULE__,
%{
instance_module: Zoi.module(),
bindings: Zoi.array(Zoi.any()) |> Zoi.default([]),
room_id: Zoi.string() |> Zoi.nullish(),
subscribed: Zoi.boolean() |> Zoi.default(false)
},
coerce: false
)
@type t :: unquote(Zoi.type_spec(@schema))
@enforce_keys Zoi.Struct.enforce_keys(@schema)
defstruct Zoi.Struct.struct_fields(@schema)
@type binding :: {:telegram | :discord, module(), String.t(), String.t()}
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@shared_room_id "demo:lobby"
@shared_room_name "Jido.Messaging Bridge Room"
@impl true
def init(opts) do
instance_module = Keyword.get(opts, :instance_module, Jido.Messaging.Demo.Messaging)
telegram_chat_id = Keyword.fetch!(opts, :telegram_chat_id)
discord_channel_id = Keyword.fetch!(opts, :discord_channel_id)
telegram_adapter =
resolve_adapter_module(
opts,
:telegram_adapter,
"JIDO_MESSAGING_DEMO_TELEGRAM_ADAPTER"
)
discord_adapter =
resolve_adapter_module(
opts,
:discord_adapter,
"JIDO_MESSAGING_DEMO_DISCORD_ADAPTER"
)
telegram_bridge_id = Keyword.get(opts, :telegram_bridge_id, to_string(telegram_adapter))
discord_bridge_id = Keyword.get(opts, :discord_bridge_id, to_string(discord_adapter))
# Bindings: {channel, adapter_module, bridge_id, external_id}
bindings = [
{:telegram, telegram_adapter, to_string(telegram_bridge_id), to_string(telegram_chat_id)},
{:discord, discord_adapter, to_string(discord_bridge_id), to_string(discord_channel_id)}
]
state = %__MODULE__{
instance_module: instance_module,
bindings: bindings,
room_id: @shared_room_id,
subscribed: false
}
# Schedule room setup and subscription
send(self(), :setup_shared_room)
Logger.info("[Bridge] Started - TG:#{telegram_chat_id} <-> DC:#{discord_channel_id}")
{:ok, state}
end
@impl true
def handle_info(:setup_shared_room, state) do
case setup_shared_room(state) do
:ok ->
send(self(), :subscribe)
{:noreply, state}
{:error, reason} ->
Logger.debug("[Bridge] Failed to setup shared room: #{inspect(reason)}, retrying in 100ms")
Process.send_after(self(), :setup_shared_room, 100)
{:noreply, state}
end
end
@impl true
def handle_info(:subscribe, state) do
bus_name = MessagingSupervisor.signal_bus_name(state.instance_module)
case Jido.Signal.Bus.subscribe(bus_name, "jido.messaging.room.message_added") do
{:ok, _subscription_id} ->
Logger.info("[Bridge] Subscribed to Signal Bus #{inspect(bus_name)}")
{:noreply, %{state | subscribed: true}}
{:error, reason} ->
Logger.debug("[Bridge] Failed to subscribe to #{inspect(bus_name)}: #{inspect(reason)}, retrying in 100ms")
Process.send_after(self(), :subscribe, 100)
{:noreply, state}
end
end
@impl true
def handle_info({:signal, signal}, state) do
handle_signal(signal, state)
{:noreply, state}
end
def handle_info(_msg, state) do
{:noreply, state}
end
# Signal handling
defp handle_signal(%{type: "jido.messaging.room.message_added"} = signal, state) do
message = signal.data.message
room_id = signal.data.room_id
# If room_id filter is set, only handle messages for that room
if state.room_id == nil or room_id == state.room_id do
origin = get_origin(message)
# Forward to all bindings except origin
for binding <- state.bindings, not_origin?(binding, origin) do
send_to_binding(binding, message)
end
end
end
defp handle_signal(_signal, _state), do: :ok
# Shared room setup
defp setup_shared_room(state) do
messaging = state.instance_module
# Create or get the shared room
case get_or_create_shared_room(messaging) do
{:ok, room} ->
# Create bindings for each platform
Enum.each(state.bindings, fn {channel, _adapter, bridge_id, external_id} ->
create_binding_if_missing(messaging, room.id, channel, bridge_id, external_id)
end)
Logger.info("[Bridge] Shared room #{room.id} ready with #{length(state.bindings)} bindings")
:ok
{:error, reason} ->
{:error, reason}
end
end
defp get_or_create_shared_room(messaging) do
case messaging.get_room(@shared_room_id) do
{:ok, room} ->
{:ok, room}
{:error, :not_found} ->
# Create room struct with our fixed ID
room = %Jido.Chat.Room{
id: @shared_room_id,
type: :group,
name: @shared_room_name,
external_bindings: %{},
metadata: %{},
inserted_at: DateTime.utc_now()
}
messaging.save_room(room)
end
end
defp create_binding_if_missing(messaging, room_id, channel, bridge_id, external_id) do
case messaging.get_room_by_external_binding(channel, bridge_id, external_id) do
{:ok, _existing} ->
Logger.debug("[Bridge] Binding already exists: #{channel}/#{bridge_id}/#{external_id}")
:ok
{:error, :not_found} ->
case messaging.create_room_binding(room_id, channel, bridge_id, external_id, %{
direction: :both,
enabled: true
}) do
{:ok, binding} ->
Logger.info("[Bridge] Created binding #{binding.id}: #{channel}/#{bridge_id}/#{external_id} -> #{room_id}")
:ok
{:error, reason} ->
Logger.warning("[Bridge] Failed to create binding: #{inspect(reason)}")
{:error, reason}
end
end
end
# Origin detection from message metadata
defp get_origin(%{metadata: metadata}) when is_map(metadata) do
channel = metadata[:channel] || metadata["channel"]
{channel, nil}
end
defp get_origin(_), do: {nil, nil}
defp not_origin?({channel, _adapter, _bridge_id, _external_id}, {origin_channel, _}) do
# Convert both to atoms for comparison
normalize_channel(channel) != normalize_channel(origin_channel)
end
defp normalize_channel(channel) when is_atom(channel), do: channel
defp normalize_channel("telegram"), do: :telegram
defp normalize_channel("discord"), do: :discord
defp normalize_channel(_), do: nil
# Delivery
defp send_to_binding({:telegram, adapter_module, _bridge_id, chat_id}, message) do
text = format_message(message, :telegram)
Logger.info("[Bridge] -> TG: #{text}")
try do
case AdapterBridge.send_message(adapter_module, chat_id, text, []) do
{:ok, _} -> :ok
{:error, reason} -> Logger.warning("[Bridge] Telegram send failed: #{inspect(reason)}")
end
rescue
e -> Logger.warning("[Bridge] Telegram send crashed: #{inspect(e)}")
end
end
defp send_to_binding({:discord, adapter_module, _bridge_id, channel_id}, message) do
text = format_message(message, :discord)
Logger.info("[Bridge] -> DC: #{text}")
try do
case AdapterBridge.send_message(adapter_module, channel_id, text, []) do
{:ok, _} -> :ok
{:error, reason} -> Logger.warning("[Bridge] Discord send failed: #{inspect(reason)}")
end
rescue
e -> Logger.warning("[Bridge] Discord send crashed: #{inspect(e)}")
end
end
defp format_message(message, _target) do
text = extract_text(message)
username = get_username(message)
source = get_source_tag(message)
"[#{source} #{username}] #{text}"
end
defp get_source_tag(%{metadata: %{channel: :telegram}}), do: "TG"
defp get_source_tag(%{metadata: %{channel: :discord}}), do: "DC"
defp get_source_tag(%{metadata: %{channel: :agent}}), do: "AG"
defp get_source_tag(%{metadata: %{"channel" => "telegram"}}), do: "TG"
defp get_source_tag(%{metadata: %{"channel" => "discord"}}), do: "DC"
defp get_source_tag(%{metadata: %{"channel" => "agent"}}), do: "AG"
defp get_source_tag(_), do: "??"
defp get_username(%{metadata: metadata}) when is_map(metadata) do
metadata[:username] || metadata["username"] ||
metadata[:display_name] || metadata["display_name"] ||
"unknown"
end
defp get_username(_), do: "unknown"
defp extract_text(%{content: [%{text: text} | _]}) when is_binary(text), do: text
defp extract_text(%{content: [%{"text" => text} | _]}) when is_binary(text), do: text
defp extract_text(%{content: content}) when is_list(content) do
Enum.find_value(content, fn
%{type: :text, text: text} -> text
%{"type" => "text", "text" => text} -> text
%{text: text} when is_binary(text) -> text
%Jido.Chat.Content.Text{text: text} -> text
_ -> nil
end)
end
defp extract_text(_), do: nil
defp resolve_adapter_module(opts, opt_key, env_key) do
module =
Keyword.get(opts, opt_key) ||
Application.get_env(:jido_messaging, opt_key) ||
System.get_env(env_key)
normalize_module(module)
end
defp normalize_module(module) when is_atom(module), do: module
defp normalize_module(module_name) when is_binary(module_name) and module_name != "" do
module_name
|> String.split(".")
|> Module.concat()
end
defp normalize_module(nil),
do: raise(ArgumentError, "missing adapter module configuration (set opts or env)")
defp normalize_module(other),
do: raise(ArgumentError, "invalid adapter module configuration: #{inspect(other)}")
end