defmodule AirPlay.V2.PtpBmca do
@moduledoc """
AirPlay 2 gPTP BMCA-yield worker.
This follows the audible airplay2-rs/HomePod flow:
* bind PTP event/general sockets on 319/320
* send Mac-style Sync + Announce + Signaling with priority 250
* yield to the receiver's better clock
* act as a PTP slave and derive the receiver-clock offset
HomePods never answer our `Delay_Req`, so the classic two-way exchange cannot
complete. Instead the offset is taken **one-way** from each `Sync`/`Follow_Up`
pair (`t1 - t2`), which is what lets `receiver_time_ns/2` track the receiver's
own PTP clock — a HomePod's clock is its uptime, nowhere near the host's
wall clock, so an un-converged (zero) offset stamps audio decades into the
receiver's future and it plays nothing.
"""
alias AirPlay.V2.Ptp
require Logger
# Session handle returned by `start_link/2`: `pid` is the spawned worker
# process, `state` is the Agent holding the offset map, and `clock_id` is the
# remote grandmaster identity (nil when no Announce was seen before the handle
# was built).
defstruct [:pid, :state, :clock_id]
# UDP port used here for PTP event messages (Sync, Delay_Req).
@event_port 319
# UDP port used here for PTP general messages (Follow_Up, Announce, Signaling).
@general_port 320
@type t :: %__MODULE__{pid: pid(), state: pid(), clock_id: binary() | nil}
@doc """
Start the BMCA-yield worker for a receiver host.

`host` is an IP address tuple or a string containing a literal IP address.
`opts` is forwarded to the worker; this function itself reads `:ready_timeout`
(milliseconds, default `5_000`).

Returns `{:ok, session}` once the worker reports that the initial BMCA exchange
finished. If the worker has not reported within `:ready_timeout`, a session with
`clock_id: nil` is returned and the worker keeps running. Returns
`{:error, reason}` when `host` cannot be parsed or the worker reports a startup
failure; in the latter case the state Agent is stopped before returning.
"""
@spec start_link(String.t() | :inet.ip_address(), keyword()) :: {:ok, t()} | {:error, term()}
def start_link(host, opts \\ []) do
  with {:ok, host_ip} <- parse_host(host),
       {:ok, state} <- Agent.start_link(fn -> initial_state() end) do
    parent = self()
    pid = spawn_link(fn -> run(parent, state, host_ip, opts) end)
    timeout = Keyword.get(opts, :ready_timeout, 5_000)

    receive do
      {:ptp_bmca_ready, ^pid, clock_id} ->
        {:ok, %__MODULE__{pid: pid, state: state, clock_id: clock_id}}

      {:ptp_bmca_failed, ^pid, reason} ->
        # No session handle is returned on this path, so nothing else could
        # ever stop the Agent: shut it down here instead of leaking it.
        Agent.stop(state)
        {:error, reason}
    after
      # The worker may still be mid-handshake; hand back a usable session anyway.
      timeout -> {:ok, %__MODULE__{pid: pid, state: state, clock_id: nil}}
    end
  end
end
@doc """
Ask the worker process of `session` to shut down.

Only sends the `:stop` message; it does not wait for the worker to act on it.
"""
@spec stop(t()) :: :ok
def stop(%__MODULE__{pid: worker}) when is_pid(worker) do
  send(worker, :stop)

  :ok
end
@doc """
Return the current offset map held by the session's state Agent.

The map carries `:offset_ns`, `:error_ns`, `:rtt_ns`, `:synchronized?` and
`:clock_id`.
"""
@spec offset(t()) :: map()
def offset(%__MODULE__{state: agent}), do: Agent.get(agent, fn snapshot -> snapshot end)
@doc """
Translate a local wall-clock timestamp (nanoseconds) onto the receiver's PTP
timeline by adding the last stored offset. The result is clamped at zero.

`local_ns` defaults to the current local time.
"""
@spec receiver_time_ns(t(), integer()) :: non_neg_integer()
def receiver_time_ns(%__MODULE__{} = session, local_ns \\ now_ns()) do
  %{offset_ns: delta_ns} = offset(session)
  shifted_ns = local_ns + delta_ns
  max(shifted_ns, 0)
end
@doc """
Wait for the offset to be marked synchronized, giving up after `timeout_ms`.

Timestamps produced before convergence are on the host's wall clock instead of
the receiver's clock; the receiver then schedules the audio far in its own
future and never plays it. The return value is always `:ok`, so callers carry
on whether or not convergence happened; the final offset is logged.
"""
@spec await_sync(t(), non_neg_integer()) :: :ok
def await_sync(%__MODULE__{} = session, timeout_ms) do
  started_ms = System.monotonic_time(:millisecond)
  outcome = do_await_sync(session, started_ms + timeout_ms)

  snapshot = offset(session)
  %{offset_ns: final_offset, synchronized?: converged} = snapshot

  Logger.info(
    "[airplay] PTP sync before stream: offset=#{final_offset}ns synchronized=#{converged}"
  )

  outcome
end
# Poll the offset every 20 ms until it is synchronized or `deadline` (monotonic
# milliseconds) has passed. Returns `:ok` in both cases; a missed deadline is
# only logged.
defp do_await_sync(session, deadline) do
  if offset(session).synchronized? do
    :ok
  else
    now_ms = System.monotonic_time(:millisecond)

    if now_ms >= deadline do
      Logger.warning("[airplay] PTP offset did not converge before timeout; audio may not play")
      :ok
    else
      Process.sleep(20)
      do_await_sync(session, deadline)
    end
  end
end
# Worker entry point: opens the two PTP sockets, performs the initial BMCA
# exchange with the receiver, records the outcome in the state Agent, notifies
# `parent`, and then enters the packet loop.
#
# Options read here: `:clock_id` (defaults to an identity derived from the
# current time) and `:priority1` (defaults to 250). `opts` is also handed to
# `maybe_join_multicast/2`.
#
# Sends `{:ptp_bmca_ready, self(), remote_clock_id}` to `parent` on success, or
# `{:ptp_bmca_failed, self(), reason}` when a socket cannot be opened.
defp run(parent, state, host_ip, opts) do
  with {:ok, event_sock} <- open_ptp_socket(@event_port),
       {:ok, general_sock} <- open_ptp_socket(@general_port) do
    maybe_join_multicast(event_sock, opts)
    maybe_join_multicast(general_sock, opts)
    clock_id = Keyword.get(opts, :clock_id, Ptp.clock_identity(now_ns()))
    priority1 = Keyword.get(opts, :priority1, 250)
    send_initial_bmca(event_sock, general_sock, host_ip, clock_id, priority1)
    {remote_clock_id, remote_priority1} = wait_for_remote_announce(host_ip, 3_000)

    # A numerically LOWER priority1 is the better clock. When the receiver's
    # value is lower than ours we yield and tell it so.
    yield? = remote_priority1 < priority1

    if yield? do
      :ok = :gen_udp.send(general_sock, host_ip, @general_port, Ptp.stop_signaling(clock_id, 1))
    end

    # If no receiver announced a better grandmaster (no Announce, or ours wins),
    # WE are the de-facto PTP timeline master: the host clock is authoritative and
    # the sender-stamped PT=87 sync packets carry it. A zero offset is then the
    # final, correct value — not an un-converged one — so mark synchronized now.
    # This mirrors the group player, which streams on the host clock without ever
    # waiting on offset convergence. A real grandmaster (HomePod) still announces,
    # leaving `synchronized?` false until its Sync/Follow_Up converges the offset.
    #
    # "Ours wins" means the remote priority1 is numerically HIGHER than ours.
    # The previous `remote_priority1 < priority1` test was true exactly when we
    # yield to the receiver, which marked a zero offset as converged for the very
    # receivers whose clock we must follow. A tie stays un-synchronized until a
    # Sync/Follow_Up arrives (or the caller's await times out), as before.
    master? = is_nil(remote_clock_id) or remote_priority1 > priority1

    Agent.update(state, fn s ->
      %{s | clock_id: remote_clock_id, synchronized?: s.synchronized? or master?}
    end)

    send(parent, {:ptp_bmca_ready, self(), remote_clock_id})

    loop(%{
      event_sock: event_sock,
      general_sock: general_sock,
      host_ip: host_ip,
      local_clock_id: clock_id,
      state: state,
      t1: nil,
      t2: nil,
      t3: nil,
      delay_seq: 0
    })
  else
    # Forward the bare reason so the caller's `{:error, reason}` is not
    # double-wrapped as `{:error, {:error, reason}}`.
    {:error, reason} ->
      send(parent, {:ptp_bmca_failed, self(), reason})
  end
end
# Worker receive loop. `:stop` closes both sockets and ends the loop with `:ok`;
# datagrams from the receiver are fed through `handle_packet/2`; datagrams from
# any other sender are dropped. The loop also wakes once a second when idle.
defp loop(%{host_ip: receiver_ip} = ctx) do
  receive do
    :stop ->
      Enum.each([ctx.event_sock, ctx.general_sock], &:gen_udp.close/1)
      :ok

    {:udp, _sock, sender, _port, payload} when sender == receiver_ip ->
      loop(handle_packet(ctx, payload))

    {:udp, _sock, _sender, _port, _payload} ->
      loop(ctx)
  after
    1_000 -> loop(ctx)
  end
end
# Handle one datagram from the receiver and return the updated loop context.
#
#   * Sync           - record our receive time as `t2`.
#   * Follow_Up      - apply the one-way offset, then send a best-effort Delay_Req.
#   * Delay_Response - attempt the two-way offset update.
#   * anything else (including unparseable packets) - context unchanged.
defp handle_packet(ctx, packet) do
  case Ptp.parse(packet) do
    {:ok, %{header: %{message_type: :sync}}} ->
      %{ctx | t2: now_ns()}

    {:ok, %{header: %{message_type: :follow_up}, timestamp_ns: timestamp_ns}} ->
      # HomePods never answer our Delay_Req, so the two-way exchange can't
      # complete. But the Follow_Up carries the receiver's own clock time (t1),
      # and we recorded our receive time of the matching Sync (t2). The one-way
      # offset `t1 - t2` makes `receiver_time_ns = local + offset` track the
      # receiver's PTP clock directly — immune to host wall-clock skew. The LAN
      # path delay it ignores is sub-millisecond, far below the audio buffer.
      ctx = apply_oneway_offset(ctx, timestamp_ns)
      # Still send a best-effort Delay_Req in case a receiver does answer (the
      # two-way result refines the offset via :delay_response below).
      delay_seq = rem(ctx.delay_seq + 1, 65_536)
      sent_at = now_ns()
      request = Ptp.delay_request(ctx.local_clock_id, delay_seq, sent_at)

      # Best-effort really means best-effort: a transient send error must not
      # crash the worker (and, through the link, its owner). When the request
      # never left, keep `t3` nil so no Delay_Response can be paired with it.
      t3 =
        case :gen_udp.send(ctx.event_sock, ctx.host_ip, @event_port, request) do
          :ok ->
            sent_at

          {:error, reason} ->
            Logger.debug("[airplay] PTP Delay_Req send failed: #{inspect(reason)}")
            nil
        end

      %{ctx | t1: timestamp_ns, t3: t3, delay_seq: delay_seq}

    {:ok, %{header: %{message_type: :delay_response}, timestamp_ns: t4}} ->
      maybe_update_offset(ctx, t4)

    _ ->
      ctx
  end
end
# Two-way offset update from a completed Sync/Follow_Up + Delay_Req/Delay_Resp
# exchange. Only runs when t1, t2 and t3 are all recorded; otherwise the context
# is returned untouched.
#
#   t1 = receiver's Sync send time (receiver clock, from Follow_Up)
#   t2 = our Sync receive time     (local clock)
#   t3 = our Delay_Req send time   (local clock)
#   t4 = receiver's Delay_Req receive time (receiver clock)
#
# With theta = receiver - local and path delay d:
#   t1 - t2 = theta - d   and   t4 - t3 = theta + d
# so theta = ((t1 - t2) + (t4 - t3)) / 2. The stored offset must be theta
# (receiver minus local) because `receiver_time_ns/2` ADDS it to local time and
# the one-way path stores `t1 - t2`; the previous formula stored -theta.
#
# Writes offset_ns, error_ns, rtt_ns and synchronized? into the state Agent and
# clears t1/t2/t3 so the next exchange starts fresh.
defp maybe_update_offset(%{t1: t1, t2: t2, t3: t3} = ctx, t4)
     when is_integer(t1) and is_integer(t2) and is_integer(t3) do
  offset_ns = div(t1 - t2 + (t4 - t3), 2)
  # NOTE(review): this is the mean one-way path delay (half the round trip),
  # yet it is stored under `rtt_ns` — confirm what consumers expect.
  delay_ns = abs(div(t2 - t1 - (t3 - t4), 2))

  Agent.update(ctx.state, fn state ->
    %{
      state
      | offset_ns: offset_ns,
        error_ns: div(delay_ns, 2),
        rtt_ns: delay_ns,
        synchronized?: true
    }
  end)

  %{ctx | t1: nil, t2: nil, t3: nil}
end

defp maybe_update_offset(ctx, _t4), do: ctx
# Store the one-way offset for a Sync/Follow_Up pair: the Follow_Up timestamp
# (receiver's send time) minus `t2` (our receive time of the Sync). A debug line
# is logged only while the state is not yet marked synchronized. Without a
# recorded integer `t2` the context is returned unchanged and nothing is stored.
defp apply_oneway_offset(%{t2: sync_rx_ns} = ctx, follow_up_ns) when is_integer(sync_rx_ns) do
  measured_ns = follow_up_ns - sync_rx_ns

  Agent.update(ctx.state, fn previous ->
    if !previous.synchronized? do
      Logger.debug("[airplay] PTP one-way offset converged: #{measured_ns} ns")
    end

    %{previous | offset_ns: measured_ns, synchronized?: true}
  end)

  ctx
end

defp apply_oneway_offset(ctx, _follow_up_ns), do: ctx
# Opening BMCA burst towards the receiver: three Sync/Follow_Up pairs spaced
# 125 ms apart (the first two rounds also carry an Announce with our priority1
# and clock class 248), followed by a single Mac-style Signaling message.
# Every send is asserted to succeed.
defp send_initial_bmca(event_sock, general_sock, host_ip, clock_id, priority1) do
  for seq <- 0..2 do
    {sync, follow_up} = Ptp.sync_follow_up(clock_id, seq, now_ns())
    :ok = :gen_udp.send(event_sock, host_ip, @event_port, sync)
    :ok = :gen_udp.send(general_sock, host_ip, @general_port, follow_up)

    if seq < 2 do
      announce = Ptp.announce(clock_id, seq, priority1: priority1, clock_class: 248)
      :ok = :gen_udp.send(general_sock, host_ip, @general_port, announce)
    end

    Process.sleep(125)
  end

  signaling = Ptp.signaling(clock_id, 0, mac_style: true)
  :ok = :gen_udp.send(general_sock, host_ip, @general_port, signaling)
end
# Wait up to `timeout_ms` for an Announce from `host_ip` and return
# `{grandmaster_identity, priority1}` from the first one seen. When none arrives
# in time the result is `{nil, 255}`.
defp wait_for_remote_announce(host_ip, timeout_ms) do
  wait_for_remote_announce(
    host_ip,
    System.monotonic_time(:millisecond) + timeout_ms,
    {nil, 255}
  )
end

# Deadline-based worker for the above. Datagrams from other senders, and
# datagrams from the receiver that are not Announce messages, are consumed and
# ignored while waiting.
defp wait_for_remote_announce(host_ip, deadline, fallback) do
  case deadline - System.monotonic_time(:millisecond) do
    left_ms when left_ms <= 0 ->
      fallback

    left_ms ->
      receive do
        {:udp, _sock, sender, _port, payload} when sender == host_ip ->
          with {:ok,
                %{
                  header: %{message_type: :announce},
                  priority1: remote_priority1,
                  grandmaster_identity: remote_clock_id
                }} <- Ptp.parse(payload) do
            {remote_clock_id, remote_priority1}
          else
            _not_announce -> wait_for_remote_announce(host_ip, deadline, fallback)
          end

        {:udp, _sock, _sender, _port, _payload} ->
          wait_for_remote_announce(host_ip, deadline, fallback)
      after
        left_ms -> fallback
      end
  end
end
# Open an active, binary UDP socket on `port`, bound to 0.0.0.0 with
# `reuseaddr`. If that fails for any reason, retry on an OS-assigned port
# (port 0) and return whatever the second attempt yields.
defp open_ptp_socket(port) do
  socket_opts = [:binary, active: true, reuseaddr: true, ip: {0, 0, 0, 0}]

  with {:error, _reason} <- :gen_udp.open(port, socket_opts) do
    :gen_udp.open(0, socket_opts)
  end
end
# Join multicast group 224.0.1.129 on `sock` via the interface given by the
# `:local_ip` option, when that option is an IPv4 tuple. Otherwise do nothing
# and return `:ok`.
defp maybe_join_multicast(sock, opts) do
  local_ip = Keyword.get(opts, :local_ip)

  if match?({_, _, _, _}, local_ip) do
    :inet.setopts(sock, add_membership: {{224, 0, 1, 129}, local_ip})
  else
    :ok
  end
end
# Normalise the receiver host: 4- and 8-element tuples pass through as
# `{:ok, tuple}`; a binary is parsed as a literal IP address, yielding
# `{:ok, ip}` or the `{:error, _}` from `:inet.parse_address/1`.
defp parse_host(host) when is_tuple(host) and tuple_size(host) in [4, 8], do: {:ok, host}

defp parse_host(host) when is_binary(host) do
  :inet.parse_address(String.to_charlist(host))
end
# Starting contents of the state Agent: zero offset/error/rtt, not yet
# synchronized, and no remote clock identity.
defp initial_state do
  Map.new(offset_ns: 0, error_ns: 0, rtt_ns: 0, synchronized?: false, clock_id: nil)
end
# Current system time in nanoseconds, as reported by `System.system_time/1`.
defp now_ns do
  System.system_time(:nanosecond)
end
end