Skip to main content

lib/bb/jido/pub_sub_bridge.ex

# SPDX-FileCopyrightText: 2026 James Harton
#
# SPDX-License-Identifier: Apache-2.0

defmodule BB.Jido.PubSubBridge do
  @moduledoc """
  GenServer that bridges `BB.PubSub` messages into a Jido agent as signals.

  Subscribes to configured BB topics on behalf of a Jido agent process and
  forwards each `{:bb, source_path, %BB.Message{}}` delivery as a
  `Jido.Signal` via `Jido.AgentServer.cast/2`.

  ## Options

  - `:robot` — robot module to subscribe against (required).
  - `:agent` — pid or registered name of the Jido agent server that should
    receive the forwarded signals (required).
  - `:topics` — list of paths to subscribe to (default `[[:state_machine]]`).
    Each path is a list of atoms as accepted by `BB.PubSub.subscribe/3`.
  - `:message_types` — list of payload modules to filter on at subscription
    time (default `[]`, meaning no filter). Applied to every topic.
  - `:throttle_ms` — optional minimum interval, in milliseconds, between
    signals of the same type. Repeated signals arriving within the window are
    dropped. Defaults to no throttling (`nil`). A zero or negative number also
    disables throttling. Any non-numeric value is rejected at start-up.

  ## Start-up failures

  The server stops during `init/1` (so `start_link/1` returns
  `{:error, reason}`) with one of the following reasons:

  - `{:invalid_option, {:throttle_ms, value}}` — `:throttle_ms` was neither
    `nil` nor a number.
  - `{:subscribe_failed, {topic, reason}}` — `BB.PubSub.subscribe/3` returned
    `{:error, reason}` for `topic`. Remaining topics are not attempted.

  A missing `:robot` or `:agent` option raises `KeyError` inside `init/1`.

  ## Filtering

  Topic and message-type filtering happen at the PubSub layer (cheap). The
  bridge additionally enforces an optional per-type throttle to limit signal
  volume for high-frequency topics (e.g. joint states at 100Hz).
  """

  use GenServer

  alias BB.Jido.Signal, as: SignalMap
  alias BB.Jido.Telemetry
  alias BB.Message

  require Logger

  @type option ::
          {:robot, module()}
          | {:agent, GenServer.server()}
          | {:topics, [[atom()]]}
          | {:message_types, [module()]}
          | {:throttle_ms, pos_integer()}
          | GenServer.option()

  @doc """
  Starts the bridge linked to the calling process.

  `opts` may mix bridge options (see the module documentation) with the
  standard `GenServer` start options `:name`, `:timeout`, `:debug`,
  `:spawn_opt` and `:hibernate_after`; the latter are passed to
  `GenServer.start_link/3` and everything else is handed to `init/1`.
  """
  @spec start_link([option()]) :: GenServer.on_start()
  def start_link(opts) do
    {server_opts, init_opts} =
      Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after])

    GenServer.start_link(__MODULE__, init_opts, server_opts)
  end

  @impl GenServer
  def init(opts) do
    robot = Keyword.fetch!(opts, :robot)
    agent = Keyword.fetch!(opts, :agent)
    topics = Keyword.get(opts, :topics, default_topics())
    message_types = Keyword.get(opts, :message_types, [])

    # Validate before subscribing so a bad option never leaves subscriptions
    # behind and the caller gets a descriptive stop reason.
    case validate_throttle(Keyword.get(opts, :throttle_ms)) do
      {:ok, throttle_ms} ->
        subscribe_and_build_state(robot, agent, topics, message_types, throttle_ms)

      {:error, reason} ->
        {:stop, reason}
    end
  end

  @impl GenServer
  def handle_info({:bb, path, %Message{} = message}, state) do
    signal = SignalMap.from_pubsub(state.robot, path, message)

    case maybe_emit(signal, state) do
      {:emit, new_state} ->
        Telemetry.emit_signal(state.robot, signal.type)
        dispatch(state.agent, signal)
        {:noreply, new_state}

      {:drop, new_state} ->
        {:noreply, new_state}
    end
  end

  # Anything that is not a BB PubSub delivery is ignored so stray messages
  # cannot crash the bridge.
  def handle_info(_msg, state), do: {:noreply, state}

  # Subscribes to every topic and, on success, returns the `init/1` result
  # carrying the initial server state.
  defp subscribe_and_build_state(robot, agent, topics, message_types, throttle_ms) do
    case subscribe_all(robot, topics, message_types) do
      :ok ->
        {:ok,
         %{
           robot: robot,
           agent: agent,
           topics: topics,
           throttle_ms: throttle_ms,
           last_emitted: %{}
         }}

      {:error, reason} ->
        {:stop, {:subscribe_failed, reason}}
    end
  end

  # Normalises the `:throttle_ms` option.
  #
  # `maybe_emit/2` compares an integer elapsed time against this value. In
  # Erlang term ordering any number sorts below every atom, binary, list or
  # map, so a non-numeric value would make that comparison permanently false
  # and silently drop every signal after the first of each type. Such values
  # are rejected here instead.
  defp validate_throttle(nil), do: {:ok, nil}

  defp validate_throttle(ms) when is_number(ms) and ms > 0, do: {:ok, ms}

  # A zero or negative window can never cause a drop, so treat it as
  # "no throttling" rather than failing callers that pass `0`.
  defp validate_throttle(ms) when is_number(ms), do: {:ok, nil}

  defp validate_throttle(other), do: {:error, {:invalid_option, {:throttle_ms, other}}}

  # Casts the signal to the agent. A failed cast is logged and otherwise
  # ignored so one undeliverable signal does not take the bridge down.
  defp dispatch(agent, signal) do
    case Jido.AgentServer.cast(agent, signal) do
      :ok ->
        :ok

      {:error, reason} ->
        Logger.warning("BB.Jido.PubSubBridge could not cast signal to agent: #{inspect(reason)}")
    end
  end

  # Decides whether `signal` should be forwarded.
  #
  # Returns `{:emit, state}` or `{:drop, state}`. When a signal is emitted
  # under throttling, the returned state records the emission time for the
  # signal's type.
  defp maybe_emit(_signal, %{throttle_ms: nil} = state), do: {:emit, state}

  defp maybe_emit(signal, %{throttle_ms: throttle_ms} = state) do
    # Monotonic time is used so wall-clock adjustments cannot stretch or
    # shrink the throttle window.
    now = System.monotonic_time(:millisecond)
    last = Map.get(state.last_emitted, signal.type)

    if is_nil(last) or now - last >= throttle_ms do
      {:emit, %{state | last_emitted: Map.put(state.last_emitted, signal.type, now)}}
    else
      {:drop, state}
    end
  end

  # Subscribes the calling process to each topic in order, stopping at the
  # first failure. Returns `:ok` or `{:error, {topic, reason}}`.
  defp subscribe_all(robot, topics, message_types) do
    # Only pass the filter option when a filter was actually requested.
    opts = if message_types == [], do: [], else: [message_types: message_types]

    Enum.reduce_while(topics, :ok, fn topic, :ok ->
      case BB.PubSub.subscribe(robot, topic, opts) do
        {:ok, _pid} -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, {topic, reason}}}
      end
    end)
  end

  defp default_topics, do: [[:state_machine]]
end