Skip to main content

lib/air_play/v2/ptp_bmca.ex

defmodule AirPlay.V2.PtpBmca do
  @moduledoc """
  AirPlay 2 gPTP BMCA-yield worker.

  This follows the audible airplay2-rs/HomePod flow:

    * bind PTP event/general sockets on 319/320
    * send Mac-style Sync + Announce + Signaling with priority 250
    * yield to the receiver's better clock
    * act as a PTP slave and derive the receiver-clock offset

  HomePods never answer our `Delay_Req`, so the classic two-way exchange cannot
  complete. Instead the offset is taken **one-way** from each `Sync`/`Follow_Up`
  pair (`t1 - t2`), which is what lets `receiver_time_ns/2` track the receiver's
  own PTP clock — a HomePod's clock is its uptime, nowhere near the host's
  wall clock, so an un-converged (zero) offset stamps audio decades into the
  receiver's future and it plays nothing.
  """

  alias AirPlay.V2.Ptp

  require Logger

  defstruct [:pid, :state, :clock_id]

  @event_port 319
  @general_port 320

  @type t :: %__MODULE__{pid: pid(), state: pid(), clock_id: binary() | nil}

  @doc "Start the BMCA-yield worker for a receiver host."
  @spec start_link(String.t() | :inet.ip_address(), keyword()) :: {:ok, t()} | {:error, term()}
  def start_link(host, opts \\ []) do
    with {:ok, host_ip} <- parse_host(host),
         {:ok, state} <- Agent.start_link(fn -> initial_state() end) do
      parent = self()

      pid =
        spawn_link(fn ->
          run(parent, state, host_ip, opts)
        end)

      timeout = Keyword.get(opts, :ready_timeout, 5_000)

      receive do
        {:ptp_bmca_ready, ^pid, clock_id} ->
          {:ok, %__MODULE__{pid: pid, state: state, clock_id: clock_id}}

        {:ptp_bmca_failed, ^pid, reason} ->
          {:error, reason}
      after
        timeout -> {:ok, %__MODULE__{pid: pid, state: state, clock_id: nil}}
      end
    end
  end

  @doc "Stop a BMCA worker."
  @spec stop(t()) :: :ok
  def stop(%__MODULE__{pid: pid}) when is_pid(pid) do
    send(pid, :stop)
    :ok
  end

  @doc "Return the last measured clock offset."
  @spec offset(t()) :: map()
  def offset(%__MODULE__{state: state}) do
    Agent.get(state, & &1)
  end

  @doc "Convert local wall-clock time to the receiver's PTP timeline."
  @spec receiver_time_ns(t(), integer()) :: non_neg_integer()
  def receiver_time_ns(%__MODULE__{} = session, local_ns \\ now_ns()) do
    %{offset_ns: offset_ns} = offset(session)
    max(local_ns + offset_ns, 0)
  end

  @doc """
  Block until the one-way offset has converged, or `timeout_ms` elapses.

  Streaming before the offset converges stamps audio in the host's wall clock
  rather than the receiver's clock, which the receiver buffers far in its future
  and never plays. Always returns `:ok` (callers proceed either way).
  """
  @spec await_sync(t(), non_neg_integer()) :: :ok
  def await_sync(%__MODULE__{} = session, timeout_ms) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    result = do_await_sync(session, deadline)
    %{offset_ns: offset_ns, synchronized?: synchronized?} = offset(session)

    Logger.info(
      "[airplay] PTP sync before stream: offset=#{offset_ns}ns synchronized=#{synchronized?}"
    )

    result
  end

  defp do_await_sync(session, deadline) do
    cond do
      offset(session).synchronized? ->
        :ok

      System.monotonic_time(:millisecond) >= deadline ->
        Logger.warning("[airplay] PTP offset did not converge before timeout; audio may not play")
        :ok

      true ->
        Process.sleep(20)
        do_await_sync(session, deadline)
    end
  end

  defp run(parent, state, host_ip, opts) do
    with {:ok, event_sock} <- open_ptp_socket(@event_port),
         {:ok, general_sock} <- open_ptp_socket(@general_port) do
      maybe_join_multicast(event_sock, opts)
      maybe_join_multicast(general_sock, opts)

      clock_id = Keyword.get(opts, :clock_id, Ptp.clock_identity(now_ns()))
      priority1 = Keyword.get(opts, :priority1, 250)

      send_initial_bmca(event_sock, general_sock, host_ip, clock_id, priority1)
      {remote_clock_id, remote_priority1} = wait_for_remote_announce(host_ip, 3_000)

      if remote_priority1 >= priority1 do
        :ok
      else
        :ok = :gen_udp.send(general_sock, host_ip, @general_port, Ptp.stop_signaling(clock_id, 1))
      end

      # If no receiver announced a better grandmaster (no Announce, or ours wins),
      # WE are the de-facto PTP timeline master: the host clock is authoritative and
      # the sender-stamped PT=87 sync packets carry it. A zero offset is then the
      # final, correct value — not an un-converged one — so mark synchronized now.
      # This mirrors the group player, which streams on the host clock without ever
      # waiting on offset convergence. A real grandmaster (HomePod) still announces,
      # leaving `synchronized?` false until its Sync/Follow_Up converges the offset.
      master? = is_nil(remote_clock_id) or remote_priority1 < priority1

      Agent.update(state, fn s ->
        %{s | clock_id: remote_clock_id, synchronized?: s.synchronized? or master?}
      end)

      send(parent, {:ptp_bmca_ready, self(), remote_clock_id})

      loop(%{
        event_sock: event_sock,
        general_sock: general_sock,
        host_ip: host_ip,
        local_clock_id: clock_id,
        state: state,
        t1: nil,
        t2: nil,
        t3: nil,
        delay_seq: 0
      })
    else
      error ->
        send(parent, {:ptp_bmca_failed, self(), error})
    end
  end

  defp loop(ctx) do
    receive do
      :stop ->
        :gen_udp.close(ctx.event_sock)
        :gen_udp.close(ctx.general_sock)
        :ok

      {:udp, _sock, ip, _port, packet} when ip == ctx.host_ip ->
        ctx
        |> handle_packet(packet)
        |> loop()

      {:udp, _sock, _ip, _port, _packet} ->
        loop(ctx)
    after
      1_000 ->
        loop(ctx)
    end
  end

  defp handle_packet(ctx, packet) do
    case Ptp.parse(packet) do
      {:ok, %{header: %{message_type: :sync}}} ->
        %{ctx | t2: now_ns()}

      {:ok, %{header: %{message_type: :follow_up}, timestamp_ns: timestamp_ns}} ->
        # HomePods never answer our Delay_Req, so the two-way exchange can't
        # complete. But the Follow_Up carries the receiver's own clock time (t1),
        # and we recorded our receive time of the matching Sync (t2). The one-way
        # offset `t1 - t2` makes `receiver_time_ns = local + offset` track the
        # receiver's PTP clock directly — immune to host wall-clock skew. The LAN
        # path delay it ignores is sub-millisecond, far below the audio buffer.
        ctx = apply_oneway_offset(ctx, timestamp_ns)

        # Still send a best-effort Delay_Req in case a receiver does answer (the
        # two-way result refines the offset via :delay_response below).
        delay_seq = rem(ctx.delay_seq + 1, 65_536)
        t3 = now_ns()
        request = Ptp.delay_request(ctx.local_clock_id, delay_seq, t3)
        :ok = :gen_udp.send(ctx.event_sock, ctx.host_ip, @event_port, request)
        %{ctx | t1: timestamp_ns, t3: t3, delay_seq: delay_seq}

      {:ok, %{header: %{message_type: :delay_response}, timestamp_ns: t4}} ->
        maybe_update_offset(ctx, t4)

      _ ->
        ctx
    end
  end

  defp maybe_update_offset(%{t1: t1, t2: t2, t3: t3} = ctx, t4)
       when is_integer(t1) and is_integer(t2) and is_integer(t3) do
    offset_ns = div(t2 - t1 + t3 - t4, 2)
    delay_ns = abs(div(t2 - t1 - (t3 - t4), 2))

    Agent.update(ctx.state, fn state ->
      %{
        state
        | offset_ns: offset_ns,
          error_ns: div(delay_ns, 2),
          rtt_ns: delay_ns,
          synchronized?: true
      }
    end)

    %{ctx | t1: nil, t2: nil, t3: nil}
  end

  defp maybe_update_offset(ctx, _t4), do: ctx

  # One-way offset from a Sync/Follow_Up pair (t2 = our Sync receive, t1 = the
  # receiver's send time from the Follow_Up). Only the FIRST measurement logs.
  defp apply_oneway_offset(%{t2: t2} = ctx, t1) when is_integer(t2) do
    offset_ns = t1 - t2

    Agent.update(ctx.state, fn state ->
      unless state.synchronized? do
        Logger.debug("[airplay] PTP one-way offset converged: #{offset_ns} ns")
      end

      %{state | offset_ns: offset_ns, synchronized?: true}
    end)

    ctx
  end

  defp apply_oneway_offset(ctx, _t1), do: ctx

  defp send_initial_bmca(event_sock, general_sock, host_ip, clock_id, priority1) do
    Enum.each(0..2, fn index ->
      {sync, follow_up} = Ptp.sync_follow_up(clock_id, index, now_ns())
      :ok = :gen_udp.send(event_sock, host_ip, @event_port, sync)
      :ok = :gen_udp.send(general_sock, host_ip, @general_port, follow_up)

      if index < 2 do
        announce = Ptp.announce(clock_id, index, priority1: priority1, clock_class: 248)
        :ok = :gen_udp.send(general_sock, host_ip, @general_port, announce)
      end

      Process.sleep(125)
    end)

    :ok =
      :gen_udp.send(
        general_sock,
        host_ip,
        @general_port,
        Ptp.signaling(clock_id, 0, mac_style: true)
      )
  end

  defp wait_for_remote_announce(host_ip, timeout_ms) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    wait_for_remote_announce(host_ip, deadline, {nil, 255})
  end

  defp wait_for_remote_announce(host_ip, deadline, fallback) do
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    if remaining == 0 do
      fallback
    else
      receive do
        {:udp, _sock, ip, _port, packet} when ip == host_ip ->
          case Ptp.parse(packet) do
            {:ok,
             %{
               header: %{message_type: :announce},
               priority1: priority1,
               grandmaster_identity: clock_id
             }} ->
              {clock_id, priority1}

            _ ->
              wait_for_remote_announce(host_ip, deadline, fallback)
          end

        {:udp, _sock, _ip, _port, _packet} ->
          wait_for_remote_announce(host_ip, deadline, fallback)
      after
        remaining -> fallback
      end
    end
  end

  defp open_ptp_socket(port) do
    opts = [:binary, active: true, reuseaddr: true, ip: {0, 0, 0, 0}]

    case :gen_udp.open(port, opts) do
      {:ok, sock} -> {:ok, sock}
      {:error, _reason} -> :gen_udp.open(0, opts)
    end
  end

  defp maybe_join_multicast(sock, opts) do
    case Keyword.get(opts, :local_ip) do
      {_, _, _, _} = ip -> :inet.setopts(sock, add_membership: {{224, 0, 1, 129}, ip})
      _ -> :ok
    end
  end

  defp parse_host({_, _, _, _} = ip), do: {:ok, ip}
  defp parse_host({_, _, _, _, _, _, _, _} = ip), do: {:ok, ip}

  defp parse_host(host) when is_binary(host) do
    host |> String.to_charlist() |> :inet.parse_address()
  end

  defp initial_state do
    %{offset_ns: 0, error_ns: 0, rtt_ns: 0, synchronized?: false, clock_id: nil}
  end

  defp now_ns, do: System.system_time(:nanosecond)
end