Skip to main content

lib/bandera/notifications/redis.ex

if Code.ensure_loaded?(Redix.PubSub) do
  defmodule Bandera.Notifications.Redis do
    @moduledoc """
    Redis PubSub cache-busting notifier (via Redix). Subscribes to a channel and,
    on a flag change published by ANOTHER node, busts the local cache entry for
    that flag. Self-published changes are ignored. Connection options are read at
    runtime from `config :bandera, cache_bust_notifications: [redis: <Redix opts>]`.

    Note: incoming change payloads come from a shared channel. The flag name is
    resolved with `String.to_existing_atom/1`, so notifications for flags this node
    has never referenced are ignored (and the atom table can't be exhausted by
    foreign publishers).

    The `subscribed?/0` flag tracks the live subscription state: it becomes `true`
    when Redis confirms the subscription and goes back to `false` when the PubSub
    connection reports a disconnection, until the subscription is confirmed again.
    """

    use GenServer
    @behaviour Bandera.Notifications

    require Logger

    alias Bandera.Config
    alias Bandera.Store.Cache

    # Single channel shared by every node; payloads are "<publisher_id>:<flag_name>".
    @channel "bandera:changes"

    @doc "Starts the notifier GenServer, which opens its own Redis pub and sub connections."
    @spec start_link(keyword) :: GenServer.on_start()
    def start_link(opts \\ []) do
      GenServer.start_link(__MODULE__, opts, name: __MODULE__)
    end

    # Publishes "<unique_id>:<flag_name>" on the shared channel.
    # Returns `:ok` or `{:error, reason}` (the error returned by `Redix.command/2`).
    @impl Bandera.Notifications
    def publish_change(flag_name) do
      GenServer.call(__MODULE__, {:publish_change, flag_name})
    end

    # Returns the id generated for this notifier process at init time; it is the
    # prefix used to recognise (and skip) our own published messages.
    @impl Bandera.Notifications
    def unique_id do
      GenServer.call(__MODULE__, :unique_id)
    end

    @doc "Whether the PubSub subscription is currently confirmed (useful in tests)."
    @spec subscribed?() :: boolean
    def subscribed? do
      GenServer.call(__MODULE__, :subscribed?)
    end

    # Opens one regular connection (for PUBLISH) and one PubSub connection (for the
    # subscription). Options passed to `start_link/1` override the configured ones.
    # The subscription is only *requested* here; `subscribed` flips to true once the
    # `:subscribed` confirmation message arrives in `handle_info/2`.
    @impl GenServer
    def init(opts) do
      redis_opts = Keyword.merge(redis_config(), opts)
      {:ok, pub} = Redix.start_link(redis_opts)
      {:ok, sub} = Redix.PubSub.start_link(redis_opts)
      {:ok, _ref} = Redix.PubSub.subscribe(sub, @channel, self())
      {:ok, %{unique_id: Config.build_unique_id(), pub: pub, sub: sub, subscribed: false}}
    end

    @impl GenServer
    def handle_call({:publish_change, flag_name}, _from, %{pub: pub, unique_id: id} = state) do
      result = Redix.command(pub, ["PUBLISH", @channel, "#{id}:#{flag_name}"])
      {:reply, normalize(result), state}
    end

    def handle_call(:unique_id, _from, %{unique_id: id} = state) do
      {:reply, id, state}
    end

    def handle_call(:subscribed?, _from, state) do
      {:reply, state.subscribed, state}
    end

    # A change notification published on our channel.
    @impl GenServer
    def handle_info(
          {:redix_pubsub, _pid, _ref, :message, %{channel: @channel, payload: payload}},
          state
        ) do
      handle_payload(payload, state.unique_id)
      {:noreply, state}
    end

    # Redis confirmed the subscription (also sent again after a reconnection).
    def handle_info({:redix_pubsub, _pid, _ref, :subscribed, %{channel: @channel}}, state) do
      {:noreply, %{state | subscribed: true}}
    end

    # The PubSub connection dropped: we are no longer receiving notifications, so
    # `subscribed?/0` must stop reporting true until the subscription is
    # confirmed again.
    def handle_info({:redix_pubsub, _pid, _ref, :disconnected, _meta}, state) do
      {:noreply, %{state | subscribed: false}}
    end

    # Any other PubSub event (e.g. messages for other channels) is irrelevant here.
    def handle_info({:redix_pubsub, _pid, _ref, _kind, _meta}, state) do
      {:noreply, state}
    end

    # Defining handle_info/2 replaces the default catch-all from `use GenServer`,
    # so without this clause any stray message would crash the notifier with a
    # FunctionClauseError. Log it for diagnosis and carry on.
    def handle_info(message, state) do
      Logger.debug(fn ->
        "#{inspect(__MODULE__)} ignoring unexpected message: #{inspect(message)}"
      end)

      {:noreply, state}
    end

    # Payload format is "<publisher_id>:<flag_name>". Only the first ":" separates
    # the two parts; messages from ourselves and malformed payloads are ignored.
    defp handle_payload(payload, own_id) do
      case String.split(payload, ":", parts: 2) do
        [^own_id, _flag] -> :ok
        [_other_id, flag] -> bust(flag)
        _ -> :ok
      end
    end

    # The flag name arrives over a shared channel that other (or misbehaving)
    # publishers could write to. An unknown flag name means this node has never
    # referenced that flag, so there is nothing cached to bust — drop it silently.
    # Only the atom lookup is guarded: errors raised by the cache itself are NOT
    # swallowed, so real cache failures stay visible.
    defp bust(flag) do
      case existing_atom(flag) do
        {:ok, flag_name} -> Cache.bust(flag_name)
        :error -> :ok
      end
    end

    # Resolves a binary to an already-existing atom without ever creating a new
    # one (avoids atom-table exhaustion). Returns `{:ok, atom}` or `:error`.
    defp existing_atom(name) do
      {:ok, String.to_existing_atom(name)}
    rescue
      ArgumentError -> :error
    end

    # Collapses the Redix reply into the `:ok | {:error, reason}` contract.
    defp normalize({:ok, _}), do: :ok
    defp normalize({:error, reason}), do: {:error, reason}

    # Redix connection options from the application's notification config.
    defp redis_config, do: Keyword.get(Config.notifications(), :redis, [])
  end
end