defmodule AirPlay.V2.Ptp do
@moduledoc """
AirPlay 2 gPTP packet and clock helpers.
AirPlay 2 receivers use IEEE 1588 / 802.1AS timing instead of the classic
RAOP NTP clock. This module contains the packet-level pieces needed by an AP2
sender:
* PTP header and timestamp parsing/encoding
* Announce / Signaling / Sync + Follow_Up packet builders
* HomePod clock sample extraction
* SETRATEANCHORTIME anchor construction
The transport loop that binds UDP ports 319/320 and maintains a live clock
sample can be built on top of these primitives.
"""
import Bitwise, only: [&&&: 2, |||: 2, <<<: 2, >>>: 2]
alias AirPlay.V2.Plist
@event_port 319
@general_port 320
@header_size 34
@timestamp_size 10
@port_number 1
@two_step_flag 0x0200
@organization_extension_tlv 0x0003
@gptp_org <<0x00, 0x80, 0xC2>>
@apple_org <<0x00, 0x0D, 0x93>>
@gptp_follow_up_info <<0x00, 0x00, 0x01>>
@gptp_message_interval_request <<0x00, 0x00, 0x02>>
@message_types %{
sync: 0x00,
delay_request: 0x01,
follow_up: 0x08,
delay_response: 0x09,
announce: 0x0B,
signaling: 0x0C
}
@controls %{
sync: 0,
delay_request: 1,
follow_up: 2,
delay_response: 3,
announce: 5,
signaling: 5
}
@type message_type ::
:sync | :delay_request | :follow_up | :delay_response | :announce | :signaling
@type timestamp :: %{seconds: non_neg_integer(), nanoseconds: non_neg_integer()}
@type header :: %{
message_type: message_type(),
transport_specific: non_neg_integer(),
version: non_neg_integer(),
message_length: non_neg_integer(),
domain_number: non_neg_integer(),
flags: non_neg_integer(),
correction_field: integer(),
source_clock_identity: binary(),
source_port: non_neg_integer(),
sequence_id: non_neg_integer(),
control_field: non_neg_integer(),
log_message_interval: integer()
}
@type clock_sample :: %{
clock_identity: binary(),
device_time_ns: non_neg_integer(),
local_time_ns: integer(),
sequence_id: non_neg_integer()
}
@doc "Return the current implementation status."
@spec status() :: {:ok, :packet_timing_primitives}
def status, do: {:ok, :packet_timing_primitives}
@doc "IEEE 1588 event port. Sync and Delay_Req are sent here."
@spec event_port() :: 319
def event_port, do: @event_port
@doc "IEEE 1588 general port. Announce, Signaling, Follow_Up and Delay_Resp use this port."
@spec general_port() :: 320
def general_port, do: @general_port
@doc "Create an 8-byte gPTP clock identity from a seed integer."
@spec clock_identity(non_neg_integer() | binary() | nil) :: binary()
def clock_identity(seed \\ nil)
def clock_identity(<<_::64>> = identity), do: identity
def clock_identity(nil), do: clock_identity(System.system_time(:nanosecond))
def clock_identity(seed) when is_integer(seed) and seed >= 0,
do: <<seed &&& 0xFFFFFFFFFFFFFFFF::64>>
@doc "Convert nanoseconds into a PTP timestamp map."
@spec from_nanoseconds(non_neg_integer()) :: timestamp()
def from_nanoseconds(ns) when is_integer(ns) and ns >= 0 do
%{seconds: div(ns, 1_000_000_000), nanoseconds: rem(ns, 1_000_000_000)}
end
@doc "Convert a PTP timestamp map into nanoseconds."
@spec to_nanoseconds(timestamp()) :: non_neg_integer()
def to_nanoseconds(%{seconds: seconds, nanoseconds: nanoseconds}) do
seconds * 1_000_000_000 + nanoseconds
end
@doc "Encode a PTP timestamp as 48-bit seconds plus 32-bit nanoseconds."
@spec encode_timestamp(timestamp() | non_neg_integer()) :: binary()
def encode_timestamp(ns) when is_integer(ns), do: ns |> from_nanoseconds() |> encode_timestamp()
def encode_timestamp(%{seconds: seconds, nanoseconds: nanoseconds}) do
<<seconds &&& 0xFFFFFFFFFFFF::48, nanoseconds::32>>
end
@doc "Parse a 10-byte PTP timestamp."
@spec parse_timestamp(binary()) :: {:ok, timestamp()} | :error
def parse_timestamp(<<seconds::48, nanoseconds::32, _rest::binary>>)
when nanoseconds < 1_000_000_000 do
{:ok, %{seconds: seconds, nanoseconds: nanoseconds}}
end
def parse_timestamp(_), do: :error
@doc "Encode the common 34-byte PTP header."
@spec encode_header(message_type(), non_neg_integer(), keyword()) :: binary()
def encode_header(type, sequence_id, opts \\ []) do
type_code = Map.fetch!(@message_types, type)
transport_specific = Keyword.get(opts, :transport_specific, 0)
version = Keyword.get(opts, :version, 2)
message_length = Keyword.get(opts, :message_length, @header_size + @timestamp_size)
domain_number = Keyword.get(opts, :domain_number, 0)
flags = Keyword.get(opts, :flags, 0)
correction_field = Keyword.get(opts, :correction_field, 0)
clock_id = opts |> Keyword.get(:clock_identity, <<0::64>>) |> clock_identity()
port_number = Keyword.get(opts, :port_number, @port_number)
control_field = Keyword.get(opts, :control_field, Map.fetch!(@controls, type))
log_message_interval = Keyword.get(opts, :log_message_interval, 0)
<<(transport_specific &&& 0x0F) <<< 4 ||| type_code::8, version &&& 0x0F::8,
message_length::16, domain_number::8, 0::8, flags::16,
correction_field::signed-big-integer-size(64), 0::32, clock_id::binary-size(8),
port_number::16, sequence_id &&& 0xFFFF::16, control_field::8,
log_message_interval::signed-big-integer-size(8)>>
end
@doc "Parse the common 34-byte PTP header."
@spec parse_header(binary()) :: {:ok, header()} | :error
def parse_header(
<<first, second, message_length::16, domain_number, _reserved, flags::16,
correction_field::signed-big-integer-size(64), _reserved2::32, clock_id::binary-size(8),
source_port::16, sequence_id::16, control_field,
log_message_interval::signed-big-integer-size(8), _rest::binary>>
) do
type_code = first &&& 0x0F
case message_type(type_code) do
nil ->
:error
type ->
{:ok,
%{
message_type: type,
transport_specific: first >>> 4,
version: second &&& 0x0F,
message_length: message_length,
domain_number: domain_number,
flags: flags,
correction_field: correction_field,
source_clock_identity: clock_id,
source_port: source_port,
sequence_id: sequence_id,
control_field: control_field,
log_message_interval: log_message_interval
}}
end
end
def parse_header(_), do: :error
@doc "Parse a PTP packet into header, body and convenience fields."
@spec parse(binary()) :: {:ok, map()} | :error
def parse(<<header_bytes::binary-size(@header_size), body::binary>> = packet) do
with {:ok, header} <- parse_header(header_bytes),
true <- byte_size(packet) >= header.message_length do
packet_body = binary_part(body, 0, header.message_length - @header_size)
parsed =
case header.message_type do
type when type in [:sync, :follow_up, :delay_request, :delay_response] ->
case parse_timestamp(packet_body) do
{:ok, timestamp} -> %{timestamp: timestamp, timestamp_ns: to_nanoseconds(timestamp)}
:error -> %{}
end
:announce ->
parse_announce_body(packet_body)
:signaling ->
%{
target_port_identity: target_identity(packet_body),
tlvs: parse_tlvs(drop_target(packet_body))
}
end
{:ok, %{header: header, body: packet_body} |> Map.merge(parsed)}
else
_ -> :error
end
end
def parse(_), do: :error
@doc """
Build an Announce packet for BMCA.
Lower `:priority1` wins. A Mac-style sender that should yield to HomePod-class
receivers uses priority 250; HomePod commonly advertises 248.
"""
@spec announce(binary() | non_neg_integer(), non_neg_integer(), keyword()) :: binary()
def announce(clock_id, sequence_id, opts \\ []) do
clock_id = clock_identity(clock_id)
clock_class = Keyword.get(opts, :clock_class, 248)
priority1 = Keyword.get(opts, :priority1, 250)
priority2 = Keyword.get(opts, :priority2, 128)
time_source = Keyword.get(opts, :time_source, 0xA0)
header =
encode_header(:announce, sequence_id,
clock_identity: clock_id,
message_length: 64,
log_message_interval: Keyword.get(opts, :log_message_interval, 1)
)
header <>
<<0::80, 37::signed-big-integer-size(16), 0::8, priority1::8, clock_class::8, 0xFE::8,
0xFFFF::16, priority2::8, clock_id::binary-size(8), 0::16, time_source::8>>
end
@doc "Build a gPTP Signaling packet with interval-request and Apple extension TLVs."
@spec signaling(binary() | non_neg_integer(), non_neg_integer(), keyword()) :: binary()
def signaling(clock_id, sequence_id, opts \\ []) do
clock_id = clock_identity(clock_id)
sync_interval = Keyword.get(opts, :sync_interval_log, -3)
announce_interval = Keyword.get(opts, :announce_interval_log, -2)
tlvs =
if Keyword.get(opts, :mac_style, true) do
[
message_interval_tlv(sync_interval, announce_interval),
apple_tlv(
<<0x00, 0x00, 0x01>>,
<<0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x27, 0x10, 0x00, 0x00, 0x27, 0x10, 0x00, 0x00,
0x00, 0x00>>
),
apple_tlv(
<<0x00, 0x00, 0x05>>,
<<0x00, 0x0F, 0x00, 0x00, 0x00, 0x00, 0x27, 0x10, 0x00, 0x00, 0x27, 0x10, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00>>
)
]
else
[
message_interval_tlv(sync_interval, announce_interval),
apple_tlv(
<<0x00, 0x00, 0x01>>,
<<0x00, 0x04, signed8(sync_interval)::8, signed8(announce_interval)::8, 0::96>>
),
apple_tlv(
<<0x00, 0x00, 0x05>>,
<<0x00, 0x10, signed8(sync_interval)::8, signed8(announce_interval)::8, 0::176>>
)
]
end
body = <<0xFF::80>> <> IO.iodata_to_binary(tlvs)
encode_header(:signaling, sequence_id,
clock_identity: clock_id,
message_length: @header_size + byte_size(body)
) <> body
end
@doc "Build a stop Signaling packet asking the receiver to stop sending timing messages."
@spec stop_signaling(binary() | non_neg_integer(), non_neg_integer()) :: binary()
def stop_signaling(clock_id, sequence_id) do
clock_id = clock_identity(clock_id)
body = <<0xFF::80>> <> message_interval_tlv(0x7E, 0x7E, 0x00)
encode_header(:signaling, sequence_id,
clock_identity: clock_id,
message_length: @header_size + byte_size(body)
) <> body
end
@doc "Build a two-step Sync packet and matching Follow_Up packet."
@spec sync_follow_up(
binary() | non_neg_integer(),
non_neg_integer(),
non_neg_integer(),
keyword()
) ::
{binary(), binary()}
def sync_follow_up(
clock_id,
sequence_id,
timestamp_ns \\ System.system_time(:nanosecond),
opts \\ []
) do
clock_id = clock_identity(clock_id)
sync =
encode_header(:sync, sequence_id,
clock_identity: clock_id,
flags: @two_step_flag,
message_length: @header_size + @timestamp_size,
log_message_interval: Keyword.get(opts, :log_message_interval, 0)
) <> <<0::80>>
tlv = follow_up_info_tlv()
follow_up =
encode_header(:follow_up, sequence_id,
clock_identity: clock_id,
message_length: @header_size + @timestamp_size + byte_size(tlv),
log_message_interval: Keyword.get(opts, :log_message_interval, 0)
) <> encode_timestamp(timestamp_ns) <> tlv
{sync, follow_up}
end
@doc """
Build a Delay_Req packet for offset calculation.
The `sourcePortIdentity` (clock id + port) is sent as all zeros to match the
reference sender: HomePods reply with a Delay_Resp to a zero-identity Delay_Req
but silently ignore one carrying our real clock identity, which leaves the PTP
offset stuck at 0. The `clock_id` argument is accepted for API symmetry but not
placed in the packet.
"""
@spec delay_request(binary() | non_neg_integer(), non_neg_integer(), non_neg_integer()) ::
binary()
def delay_request(_clock_id, sequence_id, timestamp_ns \\ System.system_time(:nanosecond)) do
encode_header(:delay_request, sequence_id,
clock_identity: <<0::64>>,
port_number: 0,
message_length: @header_size + @timestamp_size
) <> encode_timestamp(timestamp_ns)
end
@doc "Extract a remote clock sample from a Follow_Up packet."
@spec sample_from_follow_up(binary(), integer()) :: {:ok, clock_sample()} | :error
def sample_from_follow_up(packet, local_time_ns \\ System.monotonic_time(:nanosecond)) do
case parse(packet) do
{:ok, %{header: %{message_type: :follow_up} = header, timestamp_ns: timestamp_ns}} ->
{:ok,
%{
clock_identity: header.source_clock_identity,
device_time_ns: timestamp_ns,
local_time_ns: local_time_ns,
sequence_id: header.sequence_id
}}
_other ->
:error
end
end
@doc "Estimate the receiver's current PTP time from a previously observed sample."
@spec estimate_device_time(clock_sample(), integer()) :: non_neg_integer()
def estimate_device_time(sample, local_time_ns \\ System.monotonic_time(:nanosecond)) do
elapsed = max(local_time_ns - sample.local_time_ns, 0)
sample.device_time_ns + elapsed
end
@doc "Calculate PTP offset and one-way delay from a Delay_Req exchange."
@spec offset(non_neg_integer(), non_neg_integer(), non_neg_integer(), non_neg_integer()) ::
map()
def offset(t1_ns, t2_ns, t3_ns, t4_ns) do
left = t2_ns - t1_ns
right = t3_ns - t4_ns
offset_ns = div(left + right, 2)
delay_ns = abs(div(left - right, 2))
%{offset_ns: offset_ns, delay_ns: delay_ns, rtt_ns: delay_ns * 2}
end
@doc """
Build the AP2 SETRATEANCHORTIME body map from a remote clock sample.
`:delay_ms` places the render anchor in the future. The RTP timestamp should
match the first audio packet intended to render at that network time.
"""
@spec anchor(clock_sample(), non_neg_integer(), keyword()) :: map()
def anchor(sample, rtp_time, opts \\ []) do
delay_ns = Keyword.get(opts, :delay_ms, 2_000) * 1_000_000
rate = Keyword.get(opts, :rate, 1.0)
timestamp = sample |> estimate_device_time() |> Kernel.+(delay_ns) |> from_nanoseconds()
%{
"networkTimeTimelineID" => clock_identity_to_integer(sample.clock_identity),
"networkTimeSecs" => timestamp.seconds,
"networkTimeFrac" => fraction64(timestamp.nanoseconds),
"networkTimeFlags" => Keyword.get(opts, :network_time_flags, 0),
"rtpTime" => rtp_time,
"rate" => rate
}
end
@doc "Encode an AP2 SETRATEANCHORTIME body from a remote clock sample."
@spec anchor_plist(clock_sample(), non_neg_integer(), keyword()) :: binary()
def anchor_plist(sample, rtp_time, opts \\ []) do
sample |> anchor(rtp_time, opts) |> Plist.encode!()
end
@doc "Convert nanoseconds within a second into AP2's 32-bit fixed-point fraction."
@spec fraction32(non_neg_integer()) :: non_neg_integer()
def fraction32(nanoseconds), do: div((nanoseconds &&& 0xFFFFFFFF) <<< 32, 1_000_000_000)
@doc "Convert nanoseconds within a second into AP2's 64-bit fixed-point fraction."
@spec fraction64(non_neg_integer()) :: non_neg_integer()
def fraction64(nanoseconds), do: div(nanoseconds <<< 64, 1_000_000_000)
@doc "Convert an 8-byte clock identity to the integer used by AP2 plist bodies."
@spec clock_identity_to_integer(binary()) :: non_neg_integer()
def clock_identity_to_integer(<<identity::64>>), do: identity
@doc "Parse TLVs from a PTP body section."
@spec parse_tlvs(binary()) :: [
%{type: non_neg_integer(), length: non_neg_integer(), payload: binary()}
]
def parse_tlvs(data), do: parse_tlvs(data, [])
defp parse_tlvs(<<type::16, length::16, payload::binary-size(length), rest::binary>>, acc) do
parse_tlvs(rest, [%{type: type, length: length, payload: payload} | acc])
end
defp parse_tlvs(_data, acc), do: Enum.reverse(acc)
defp parse_announce_body(
<<_origin_timestamp::binary-size(@timestamp_size),
_utc_offset::signed-big-integer-size(16), _reserved::8, priority1::8, clock_class::8,
clock_accuracy::8, offset_scaled_log_variance::16, priority2::8,
grandmaster_identity::binary-size(8), steps_removed::16, time_source::8>>
) do
%{
priority1: priority1,
clock_class: clock_class,
clock_accuracy: clock_accuracy,
offset_scaled_log_variance: offset_scaled_log_variance,
priority2: priority2,
grandmaster_identity: grandmaster_identity,
steps_removed: steps_removed,
time_source: time_source
}
end
defp parse_announce_body(_), do: %{}
defp follow_up_info_tlv do
tlv(
@gptp_org,
@gptp_follow_up_info,
<<0::signed-big-integer-size(32), 0::16, 0::96, 0::signed-big-integer-size(32)>>
)
end
defp message_interval_tlv(sync_interval, announce_interval, flags \\ 0x02) do
tlv(
@gptp_org,
@gptp_message_interval_request,
<<signed8(sync_interval)::8, signed8(sync_interval)::8, signed8(announce_interval)::8,
flags::8>>
)
end
defp apple_tlv(subtype, payload), do: tlv(@apple_org, subtype, payload)
defp tlv(org, subtype, payload) do
data = org <> subtype <> payload
<<@organization_extension_tlv::16, byte_size(data)::16, data::binary>>
end
defp signed8(value) when value < 0, do: value + 256
defp signed8(value), do: value &&& 0xFF
defp target_identity(<<identity::binary-size(10), _rest::binary>>), do: identity
defp target_identity(_), do: nil
defp drop_target(<<_identity::binary-size(10), rest::binary>>), do: rest
defp drop_target(_), do: <<>>
defp message_type(type_code) do
Enum.find_value(@message_types, fn {type, code} -> if code == type_code, do: type end)
end
end