Skip to main content

lib/jido/chat/telegram/polling_worker.ex

defmodule Jido.Chat.Telegram.PollingWorker do
  @moduledoc """
  Bridge-ingress polling worker for Telegram `getUpdates`.

  This worker is adapter-owned and runtime-agnostic. It emits raw update payloads
  via `sink_mfa` so host runtimes can route through their own ingress pipelines.
  """

  use GenServer

  alias Jido.Chat.Telegram.Transport.ExGramClient

  @type sink_mfa :: {module(), atom(), [term()]}

  @type state :: %{
          bridge_id: String.t(),
          sink_mfa: sink_mfa(),
          sink_opts: keyword(),
          token: String.t(),
          transport: module(),
          transport_opts: keyword(),
          timeout_s: pos_integer(),
          poll_interval_ms: pos_integer(),
          max_backoff_ms: pos_integer(),
          backoff_ms: pos_integer(),
          allowed_updates: [String.t()] | nil,
          offset: integer() | nil
        }

  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl true
  def init(opts) do
    bridge_id = Keyword.fetch!(opts, :bridge_id)
    sink_mfa = Keyword.fetch!(opts, :sink_mfa)

    token =
      opts[:token] || Application.get_env(:jido_chat_telegram, :telegram_bot_token) ||
        raise(ArgumentError, "missing Telegram bot token for polling worker")

    poll_interval_ms = normalize_pos_integer(opts[:poll_interval_ms], 250)

    state = %{
      bridge_id: bridge_id,
      sink_mfa: sink_mfa,
      sink_opts: Keyword.get(opts, :sink_opts, []),
      token: token,
      transport: Keyword.get(opts, :transport, ExGramClient),
      transport_opts: Keyword.get(opts, :transport_opts, []),
      timeout_s: normalize_pos_integer(opts[:timeout_s], 20),
      poll_interval_ms: poll_interval_ms,
      max_backoff_ms: normalize_pos_integer(opts[:max_backoff_ms], 5_000),
      backoff_ms: poll_interval_ms,
      allowed_updates: normalize_allowed_updates(opts[:allowed_updates]),
      offset: normalize_offset(opts[:offset])
    }

    send(self(), :poll)
    {:ok, state}
  end

  @impl true
  def handle_info(:poll, state) do
    payload = get_updates_payload(state)

    case state.transport.call(state.token, "getUpdates", payload, state.transport_opts) do
      {:ok, updates} when is_list(updates) ->
        case emit_updates(updates, state) do
          {:ok, next_state} ->
            schedule_poll(next_state.poll_interval_ms)
            {:noreply, %{next_state | backoff_ms: next_state.poll_interval_ms}}

          {:error, _reason, next_state} ->
            delay = min(next_state.backoff_ms, next_state.max_backoff_ms)
            schedule_poll(delay)

            {:noreply,
             %{
               next_state
               | backoff_ms: min(max(delay * 2, next_state.poll_interval_ms), next_state.max_backoff_ms)
             }}
        end

      {:ok, _invalid} ->
        delay = min(state.backoff_ms, state.max_backoff_ms)
        schedule_poll(delay)

        {:noreply, %{state | backoff_ms: min(max(delay * 2, state.poll_interval_ms), state.max_backoff_ms)}}

      {:error, _reason} ->
        delay = min(state.backoff_ms, state.max_backoff_ms)
        schedule_poll(delay)

        {:noreply, %{state | backoff_ms: min(max(delay * 2, state.poll_interval_ms), state.max_backoff_ms)}}
    end
  end

  defp emit_updates(updates, state) do
    Enum.reduce_while(updates, {:ok, state.offset}, fn update, {:ok, current_offset} ->
      case emit_update(state, update) do
        :ok ->
          {:cont, {:ok, next_offset(update, current_offset)}}

        {:error, reason} ->
          {:halt, {:error, reason}}
      end
    end)
    |> case do
      {:ok, next_offset} ->
        {:ok, %{state | offset: next_offset}}

      {:error, reason} ->
        {:error, reason, state}
    end
  end

  defp emit_update(state, update) when is_map(update) do
    case invoke_sink(state.sink_mfa, update, state.sink_opts) do
      {:ok, _result} -> :ok
      :ok -> :ok
      {:error, _reason} = error -> error
      other -> {:error, {:invalid_sink_result, other}}
    end
  end

  defp emit_update(_state, _update), do: {:error, :invalid_update_payload}

  defp invoke_sink({module, function, base_args}, payload, opts)
       when is_atom(module) and is_atom(function) and is_list(base_args) and is_list(opts) do
    apply(module, function, base_args ++ [payload, Keyword.put(opts, :mode, :payload)])
  end

  defp invoke_sink(_sink_mfa, _payload, _opts), do: {:error, :invalid_sink_mfa}

  defp get_updates_payload(state) do
    %{"timeout" => state.timeout_s}
    |> maybe_put("offset", state.offset)
    |> maybe_put("allowed_updates", state.allowed_updates)
  end

  defp next_offset(update, current_offset) do
    update_id =
      Map.get(update, "update_id") ||
        Map.get(update, :update_id)

    case update_id do
      id when is_integer(id) ->
        max(current_offset || 0, id + 1)

      _ ->
        current_offset
    end
  end

  defp schedule_poll(delay_ms) when is_integer(delay_ms) and delay_ms >= 0 do
    Process.send_after(self(), :poll, delay_ms)
  end

  defp normalize_allowed_updates(nil), do: nil

  defp normalize_allowed_updates(list) when is_list(list) do
    values =
      list
      |> Enum.map(&to_string/1)
      |> Enum.reject(&(&1 == ""))

    if values == [], do: nil, else: values
  end

  defp normalize_allowed_updates(_), do: nil

  defp normalize_offset(nil), do: nil
  defp normalize_offset(value) when is_integer(value) and value >= 0, do: value

  defp normalize_offset(value) when is_binary(value) do
    case Integer.parse(value) do
      {parsed, ""} when parsed >= 0 -> parsed
      _ -> nil
    end
  end

  defp normalize_offset(_), do: nil

  defp normalize_pos_integer(value, default)
  defp normalize_pos_integer(nil, default), do: default
  defp normalize_pos_integer(value, _default) when is_integer(value) and value > 0, do: value

  defp normalize_pos_integer(value, default) when is_binary(value) do
    case Integer.parse(value) do
      {parsed, ""} when parsed > 0 -> parsed
      _ -> default
    end
  end

  defp normalize_pos_integer(_value, default), do: default

  defp maybe_put(map, _key, nil), do: map
  defp maybe_put(map, key, value), do: Map.put(map, key, value)
end