Skip to main content

lib/linx/netlink/rtnl/monitor.ex

defmodule Linx.Netlink.Rtnl.Monitor do
  @moduledoc """
  A GenServer that owns a multicast rtnetlink socket, decodes each broadcast
  into a `Linx.Netlink.Rtnl.Monitor.Event`, and forwards it to an owner pid —
  the `ip monitor` equivalent.

  ## Lifecycle

      {:ok, mon} = Linx.Netlink.Rtnl.Monitor.subscribe()
      # → the owner receives:
      #   {:linx_rtnl, :event, %Linx.Netlink.Rtnl.Monitor.Event{...}}
      #   {:linx_rtnl, :resync_needed}    (on ENOBUFS)
      :ok = Linx.Netlink.Rtnl.Monitor.unsubscribe(mon)

  ## Owner

  The Monitor is linked to the process that starts it and additionally
  monitors `:owner` (which may be a different process). When the owner exits,
  the Monitor stops normally and closes its socket, so an abandoned
  subscription does not keep polling forever.

  ## Events are wake-ups, not deltas

  Netlink multicast is lossy by design: on a busy system the kernel's send
  buffer fills, frames are dropped, and the next recv returns `ENOBUFS`. So the
  Monitor is a *latency* layer over reconcile, never a source of truth — a
  level-triggered consumer treats every event (and every `:resync_needed`) as
  "look now", then re-reads and re-diffs full state. It must not act on an
  event's `:resource` directly. `RTM_NEW*`/`RTM_DEL*` decode through the *same*
  codecs as `list/1`, so the structs are identical to what a re-read returns.

  Unlike `Linx.Netfilter.Monitor`, there is no generation counter or
  snapshot-then-tail handshake — rtnetlink has no transaction id. The
  level-triggered resync (re-`list`, diff, apply) is what makes a missed event
  harmless.

  ## Groups

  By default it joins links, neighbours, and IPv4/IPv6 addresses, routes, and
  rules. Override with `:groups` (a list of `RTNLGRP_*` numbers).

  ## ENOBUFS recovery

  On `ENOBUFS` the Monitor emits `{:linx_rtnl, :resync_needed}` and keeps
  reading; the owner re-syncs by reconciling. `SO_RCVBUF` defaults to 4 MiB to
  reduce overflow, and is applied before any group is joined.
  """

  use GenServer

  alias Linx.Netlink.{Message, Rtnl, Socket}
  alias Linx.Netlink.Rtnl.{Address, Link, Neighbour, Route, Rule}
  alias Linx.Netlink.Rtnl.Monitor.Event

  # RTNLGRP_* multicast group numbers — enum rtnetlink_groups,
  # include/uapi/linux/rtnetlink.h.
  @rtnlgrp_link 1
  @rtnlgrp_neigh 3
  @rtnlgrp_ipv4_ifaddr 5
  @rtnlgrp_ipv4_route 7
  @rtnlgrp_ipv4_rule 8
  @rtnlgrp_ipv6_ifaddr 9
  @rtnlgrp_ipv6_route 11
  @rtnlgrp_ipv6_rule 19

  @default_groups [
    @rtnlgrp_link,
    @rtnlgrp_neigh,
    @rtnlgrp_ipv4_ifaddr,
    @rtnlgrp_ipv4_route,
    @rtnlgrp_ipv4_rule,
    @rtnlgrp_ipv6_ifaddr,
    @rtnlgrp_ipv6_route,
    @rtnlgrp_ipv6_rule
  ]

  # RTM_* message types — include/uapi/linux/rtnetlink.h. Maps the netlink
  # message type to the Event op and the codec module that decodes its payload.
  @rtm %{
    16 => {:new_link, Link},
    17 => {:del_link, Link},
    20 => {:new_addr, Address},
    21 => {:del_addr, Address},
    24 => {:new_route, Route},
    25 => {:del_route, Route},
    28 => {:new_neigh, Neighbour},
    29 => {:del_neigh, Neighbour},
    32 => {:new_rule, Rule},
    33 => {:del_rule, Rule}
  }

  # 4 MiB — absorbs most monitoring bursts without bumping net.core.rmem_max.
  @default_rcvbuf 4 * 1024 * 1024

  # Short-timeout polling loop (same rationale as Linx.Netfilter.Monitor:
  # :nowait select notifications proved flaky for netlink multicast).
  @recv_timeout_ms 50
  @recv_size 65_536

  @type opt ::
          {:owner, pid()}
          | {:netns, Socket.netns()}
          | {:groups, [pos_integer()]}
          | {:rcvbuf, pos_integer()}

  @doc """
  Starts a Monitor linked to the caller and subscribed to the rtnetlink
  multicast groups.

  Options:

    * `:owner` (required) — pid that receives `{:linx_rtnl, _}` messages. The
      Monitor stops when this process exits.
    * `:netns` — namespace to monitor; defaults to `:host`.
    * `:groups` — `RTNLGRP_*` group numbers; defaults to links, neighbours,
      and IPv4/IPv6 addresses, routes, and rules.
    * `:rcvbuf` — `SO_RCVBUF` size in bytes; default 4 MiB.
  """
  @spec start_link([opt()]) :: GenServer.on_start()
  def start_link(opts) when is_list(opts), do: GenServer.start_link(__MODULE__, opts)

  @doc """
  Convenience: start a Monitor with `owner` (default the calling process) and
  the rest of `opts`.

  Returns whatever `start_link/1` returns — `{:ok, monitor}` on success.
  """
  @spec subscribe(pid(), [opt()]) :: GenServer.on_start()
  def subscribe(owner \\ self(), opts \\ []) when is_pid(owner) and is_list(opts) do
    start_link(Keyword.put(opts, :owner, owner))
  end

  @doc "Stops the Monitor (closes its socket)."
  @spec stop(pid()) :: :ok
  def stop(monitor) when is_pid(monitor), do: GenServer.stop(monitor)

  @doc "Alias for `stop/1`."
  @spec unsubscribe(pid()) :: :ok
  def unsubscribe(monitor), do: stop(monitor)

  @doc false
  # Decodes a framed multicast message into an Event. Known RTM_* types are
  # decoded by their codec module; any other type yields
  # `%Event{op: {:unknown, type}, resource: nil}`. Public for testing the
  # RTM-type dispatch without a live socket.
  @spec decode_event(Message.t()) :: Event.t()
  def decode_event(%Message{type: type, payload: payload}) do
    case Map.fetch(@rtm, type) do
      {:ok, {op, module}} -> %Event{op: op, resource: module.decode(payload)}
      :error -> %Event{op: {:unknown, type}, resource: nil}
    end
  end

  # === GenServer ============================================================

  @impl true
  def init(opts) do
    owner = Keyword.fetch!(opts, :owner)
    netns = Keyword.get(opts, :netns, :host)
    groups = Keyword.get(opts, :groups, @default_groups)
    rcvbuf = Keyword.get(opts, :rcvbuf, @default_rcvbuf)

    # NOTE(review): an `{:error, reason}` from Rtnl.open/join_all is returned
    # straight from init/1, as before; that relies on the OTP gen_server
    # accepting `{:error, reason}` from init — confirm the minimum OTP version.
    with {:ok, sock} <- Rtnl.open(netns) do
      # Best-effort (result deliberately ignored). Applied before joining so
      # the first burst after membership already lands in the larger buffer.
      _ = Socket.set_rcvbuf(sock, rcvbuf)

      case join_all(sock, groups) do
        :ok ->
          # Stop when the owner goes away; it may not be the linked process.
          owner_ref = Process.monitor(owner)
          send(self(), :recv)
          {:ok, %{sock: sock, owner: owner, owner_ref: owner_ref}}

        {:error, _reason} = error ->
          # terminate/2 does not run when init/1 fails, so release it here.
          _ = Socket.close(sock)
          error
      end
    end
  end

  @impl true
  def handle_info(:recv, state), do: do_recv(state)

  # The owner exited: nobody is left to deliver events to.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{owner_ref: ref} = state),
    do: {:stop, :normal, state}

  # Defensive: ignore stray :socket select notifications (we poll instead of
  # using :nowait) and any other unexpected message, rather than crashing with
  # a FunctionClauseError.
  def handle_info(_msg, state), do: {:noreply, state}

  @impl true
  def terminate(_reason, %{sock: sock}) when not is_nil(sock) do
    _ = Socket.close(sock)
    :ok
  end

  def terminate(_reason, _state), do: :ok

  # === Internals ============================================================

  # Joins every group in order; halts on (and returns) the first error.
  defp join_all(sock, groups) do
    Enum.reduce_while(groups, :ok, fn group, :ok ->
      case Socket.add_membership(sock, group) do
        :ok -> {:cont, :ok}
        {:error, _} = error -> {:halt, error}
      end
    end)
  end

  # One iteration of the polling loop: a recv with a short timeout, then
  # re-arm by sending :recv to self so other messages (stop, :DOWN) interleave.
  defp do_recv(%{sock: sock} = state) do
    case :socket.recv(sock.socket, @recv_size, @recv_timeout_ms) do
      {:ok, data} ->
        # A single datagram may carry several netlink messages.
        for %Message{} = msg <- Message.decode(data) do
          send(state.owner, {:linx_rtnl, :event, decode_event(msg)})
        end

        send(self(), :recv)
        {:noreply, state}

      {:error, :timeout} ->
        send(self(), :recv)
        {:noreply, state}

      {:error, :enobufs} ->
        # Frames were dropped; the owner must re-read full state.
        send(state.owner, {:linx_rtnl, :resync_needed})
        send(self(), :recv)
        {:noreply, state}

      {:error, :closed} ->
        {:stop, :normal, state}

      {:error, reason} ->
        {:stop, {:recv, reason}, state}
    end
  end
end