Skip to main content

lib/air_play/v2/ptp.ex

defmodule AirPlay.V2.Ptp do
  @moduledoc """
  AirPlay 2 gPTP packet and clock helpers.

  AirPlay 2 receivers use IEEE 1588 / 802.1AS timing instead of the classic
  RAOP NTP clock. This module contains the packet-level pieces needed by an AP2
  sender:

    * PTP header and timestamp parsing/encoding
    * Announce / Signaling / Sync + Follow_Up packet builders
    * HomePod clock sample extraction
    * SETRATEANCHORTIME anchor construction

  The transport loop that binds UDP ports 319/320 and maintains a live clock
  sample can be built on top of these primitives.
  """

  import Bitwise, only: [&&&: 2, |||: 2, <<<: 2, >>>: 2]

  alias AirPlay.V2.Plist

  # UDP port numbers returned by event_port/0 and general_port/0.
  @event_port 319
  @general_port 320
  # Size in bytes of the common PTP header produced by encode_header/3.
  @header_size 34
  # Size in bytes of an encoded timestamp (48-bit seconds + 32-bit nanoseconds).
  @timestamp_size 10
  # Default source port number written into outgoing headers.
  @port_number 1
  # Header flag value set on the Sync packet built by sync_follow_up/4.
  @two_step_flag 0x0200
  # TLV type written in front of every TLV produced by tlv/3.
  @organization_extension_tlv 0x0003
  # 3-byte organization identifiers used as TLV prefixes.
  @gptp_org <<0x00, 0x80, 0xC2>>
  @apple_org <<0x00, 0x0D, 0x93>>
  # 3-byte organization sub-types used together with @gptp_org.
  @gptp_follow_up_info <<0x00, 0x00, 0x01>>
  @gptp_message_interval_request <<0x00, 0x00, 0x02>>

  # Message type atom -> type code stored in the low nibble of the first header byte.
  @message_types %{
    sync: 0x00,
    delay_request: 0x01,
    follow_up: 0x08,
    delay_response: 0x09,
    announce: 0x0B,
    signaling: 0x0C
  }

  # Message type atom -> default value of the header's control field
  # (used by encode_header/3 unless :control_field is given).
  @controls %{
    sync: 0,
    delay_request: 1,
    follow_up: 2,
    delay_response: 3,
    announce: 5,
    signaling: 5
  }

  # The message kinds this module can encode and recognise (keys of @message_types).
  @type message_type ::
          :sync | :delay_request | :follow_up | :delay_response | :announce | :signaling
  # A timestamp split into whole seconds and the nanoseconds within that second.
  @type timestamp :: %{seconds: non_neg_integer(), nanoseconds: non_neg_integer()}
  # The decoded common header as returned by parse_header/1.
  @type header :: %{
          message_type: message_type(),
          transport_specific: non_neg_integer(),
          version: non_neg_integer(),
          message_length: non_neg_integer(),
          domain_number: non_neg_integer(),
          flags: non_neg_integer(),
          correction_field: integer(),
          source_clock_identity: binary(),
          source_port: non_neg_integer(),
          sequence_id: non_neg_integer(),
          control_field: non_neg_integer(),
          log_message_interval: integer()
        }
  # A remote clock reading (device_time_ns) paired with the local time at which
  # it was observed (local_time_ns); produced by sample_from_follow_up/2.
  @type clock_sample :: %{
          clock_identity: binary(),
          device_time_ns: non_neg_integer(),
          local_time_ns: integer(),
          sequence_id: non_neg_integer()
        }

  @doc """
  Report which part of the PTP stack this module implements.

  Always returns `{:ok, :packet_timing_primitives}`.
  """
  @spec status() :: {:ok, :packet_timing_primitives}
  def status do
    {:ok, :packet_timing_primitives}
  end

  @doc """
  UDP port for IEEE 1588 event messages (Sync and Delay_Req are sent here).
  """
  @spec event_port() :: 319
  def event_port do
    @event_port
  end

  @doc """
  UDP port for IEEE 1588 general messages (Announce, Signaling, Follow_Up and
  Delay_Resp use this port).
  """
  @spec general_port() :: 320
  def general_port do
    @general_port
  end

  @doc """
  Normalize a seed into an 8-byte gPTP clock identity.

    * an 8-byte binary is returned unchanged
    * a non-negative integer is encoded as its low 64 bits, big-endian
    * `nil` (the default) seeds the identity from `System.system_time(:nanosecond)`

  Any other argument raises `FunctionClauseError`.
  """
  @spec clock_identity(non_neg_integer() | binary() | nil) :: binary()
  def clock_identity(seed \\ nil)

  def clock_identity(nil) do
    :nanosecond |> System.system_time() |> clock_identity()
  end

  def clock_identity(seed) when is_integer(seed) and seed >= 0 do
    # A 64-bit integer segment keeps only the low 64 bits of the value, so
    # larger seeds wrap around without an explicit mask.
    <<seed::unsigned-big-integer-size(64)>>
  end

  def clock_identity(<<_::64>> = identity), do: identity

  @doc """
  Split a non-negative nanosecond count into a PTP timestamp map.

  Returns `%{seconds: s, nanoseconds: n}` where `n` is the remainder below one
  second.
  """
  @spec from_nanoseconds(non_neg_integer()) :: timestamp()
  def from_nanoseconds(ns) when is_integer(ns) and ns >= 0 do
    whole_seconds = div(ns, 1_000_000_000)
    leftover = ns - whole_seconds * 1_000_000_000

    %{seconds: whole_seconds, nanoseconds: leftover}
  end

  @doc """
  Collapse a PTP timestamp map back into a single nanosecond count.

  Inverse of `from_nanoseconds/1`.
  """
  @spec to_nanoseconds(timestamp()) :: non_neg_integer()
  def to_nanoseconds(%{seconds: secs, nanoseconds: nanos}),
    do: nanos + secs * 1_000_000_000

  @doc """
  Encode a PTP timestamp as 10 bytes: 48-bit seconds followed by 32-bit nanoseconds.

  Accepts either a timestamp map or a non-negative nanosecond count, which is
  first split with `from_nanoseconds/1`.
  """
  @spec encode_timestamp(timestamp() | non_neg_integer()) :: binary()
  def encode_timestamp(ns) when is_integer(ns) do
    encode_timestamp(from_nanoseconds(ns))
  end

  def encode_timestamp(%{seconds: secs, nanoseconds: nanos}) do
    # Only the low 48 bits of the seconds value fit on the wire.
    wire_seconds = secs &&& 0xFFFFFFFFFFFF
    <<wire_seconds::48, nanos::32>>
  end

  @doc """
  Decode the 10-byte PTP timestamp at the start of a binary.

  Bytes after the timestamp are ignored. Returns `:error` when fewer than ten
  bytes are available or when the nanoseconds field is one second or more.
  """
  @spec parse_timestamp(binary()) :: {:ok, timestamp()} | :error
  def parse_timestamp(<<secs::48, nanos::32, _trailing::binary>>) do
    if nanos < 1_000_000_000 do
      {:ok, %{seconds: secs, nanoseconds: nanos}}
    else
      :error
    end
  end

  def parse_timestamp(_other), do: :error

  @doc """
  Encode the common 34-byte PTP header for `type` and `sequence_id`.

  Recognised options (with defaults): `:transport_specific` (0), `:version` (2),
  `:message_length` (header plus one timestamp), `:domain_number` (0),
  `:flags` (0), `:correction_field` (0), `:clock_identity` (all zeros),
  `:port_number` (1), `:control_field` (per message type) and
  `:log_message_interval` (0).

  Raises `KeyError` for an unknown message type.
  """
  @spec encode_header(message_type(), non_neg_integer(), keyword()) :: binary()
  def encode_header(type, sequence_id, opts \\ []) do
    type_code = Map.fetch!(@message_types, type)
    default_control = Map.fetch!(@controls, type)
    identity = opts |> Keyword.get(:clock_identity, <<0::64>>) |> clock_identity()

    transport = Keyword.get(opts, :transport_specific, 0)
    ptp_version = Keyword.get(opts, :version, 2)
    total_length = Keyword.get(opts, :message_length, @header_size + @timestamp_size)
    domain = Keyword.get(opts, :domain_number, 0)
    flag_bits = Keyword.get(opts, :flags, 0)
    correction = Keyword.get(opts, :correction_field, 0)
    port = Keyword.get(opts, :port_number, @port_number)
    control = Keyword.get(opts, :control_field, default_control)
    interval = Keyword.get(opts, :log_message_interval, 0)

    # High nibble carries transport_specific, low nibble the message type code.
    first_byte = (transport &&& 0x0F) <<< 4 ||| type_code
    version_byte = ptp_version &&& 0x0F
    wire_sequence = sequence_id &&& 0xFFFF

    <<
      first_byte::8,
      version_byte::8,
      total_length::16,
      domain::8,
      0::8,
      flag_bits::16,
      correction::signed-big-integer-size(64),
      0::32,
      identity::binary-size(8),
      port::16,
      wire_sequence::16,
      control::8,
      interval::signed-big-integer-size(8)
    >>
  end

  @doc """
  Decode the common 34-byte PTP header at the start of a binary.

  Bytes after the header are ignored. Returns `:error` when the input is too
  short or the message type code is not one this module knows.
  """
  @spec parse_header(binary()) :: {:ok, header()} | :error
  def parse_header(
        <<transport_specific::4, type_code::4, _reserved_nibble::4, version::4,
          message_length::16, domain_number::8, _reserved::8, flags::16,
          correction_field::signed-big-integer-size(64), _reserved2::32,
          clock_id::binary-size(8), source_port::16, sequence_id::16, control_field::8,
          log_message_interval::signed-big-integer-size(8), _rest::binary>>
      ) do
    type = message_type(type_code)

    if is_nil(type) do
      :error
    else
      header = %{
        message_type: type,
        transport_specific: transport_specific,
        version: version,
        message_length: message_length,
        domain_number: domain_number,
        flags: flags,
        correction_field: correction_field,
        source_clock_identity: clock_id,
        source_port: source_port,
        sequence_id: sequence_id,
        control_field: control_field,
        log_message_interval: log_message_interval
      }

      {:ok, header}
    end
  end

  def parse_header(_other), do: :error

  @doc """
  Parse a PTP packet into header, body and convenience fields.

  On success returns `{:ok, map}` where `map` always contains `:header` (see
  `parse_header/1`) and `:body` (the bytes between the header and the declared
  `message_length`), plus per-type extras:

    * Sync / Follow_Up / Delay_Req / Delay_Resp: `:timestamp` and `:timestamp_ns`
      when the body starts with a valid timestamp
    * Announce: the fields decoded by the Announce body parser
    * Signaling: `:target_port_identity` and `:tlvs`

  Returns `:error` when the packet is shorter than a header, has an unknown
  message type, declares a `message_length` smaller than the header itself, or
  declares more bytes than were actually received.
  """
  @spec parse(binary()) :: {:ok, map()} | :error
  def parse(<<header_bytes::binary-size(@header_size), body::binary>> = packet) do
    with {:ok, header} <- parse_header(header_bytes),
         # A declared length below the header size would make the body length
         # negative and crash binary_part/3; treat such packets as malformed.
         true <- header.message_length >= @header_size,
         true <- byte_size(packet) >= header.message_length do
      packet_body = binary_part(body, 0, header.message_length - @header_size)

      parsed =
        case header.message_type do
          type when type in [:sync, :follow_up, :delay_request, :delay_response] ->
            case parse_timestamp(packet_body) do
              {:ok, timestamp} -> %{timestamp: timestamp, timestamp_ns: to_nanoseconds(timestamp)}
              # A body without a usable timestamp still yields header and body.
              :error -> %{}
            end

          :announce ->
            parse_announce_body(packet_body)

          :signaling ->
            %{
              target_port_identity: target_identity(packet_body),
              tlvs: parse_tlvs(drop_target(packet_body))
            }
        end

      {:ok, Map.merge(%{header: header, body: packet_body}, parsed)}
    else
      _ -> :error
    end
  end

  def parse(_), do: :error

  @doc """
  Build a 64-byte Announce packet for BMCA.

  Lower `:priority1` wins. A Mac-style sender that should yield to HomePod-class
  receivers uses priority 250; HomePod commonly advertises 248.

  Options (with defaults): `:clock_class` (248), `:priority1` (250),
  `:priority2` (128), `:time_source` (0xA0) and `:log_message_interval` (1).
  The clock identity is used both as the header source and as the grandmaster
  identity in the body.
  """
  @spec announce(binary() | non_neg_integer(), non_neg_integer(), keyword()) :: binary()
  def announce(clock_id, sequence_id, opts \\ []) do
    identity = clock_identity(clock_id)
    class = Keyword.get(opts, :clock_class, 248)
    prio1 = Keyword.get(opts, :priority1, 250)
    prio2 = Keyword.get(opts, :priority2, 128)
    source = Keyword.get(opts, :time_source, 0xA0)
    interval = Keyword.get(opts, :log_message_interval, 1)

    header =
      encode_header(:announce, sequence_id,
        clock_identity: identity,
        message_length: 64,
        log_message_interval: interval
      )

    body = <<
      # zeroed 10-byte timestamp field
      0::80,
      37::signed-big-integer-size(16),
      0::8,
      prio1::8,
      class::8,
      0xFE::8,
      0xFFFF::16,
      prio2::8,
      identity::binary-size(8),
      0::16,
      source::8
    >>

    <<header::binary, body::binary>>
  end

  @doc """
  Build a gPTP Signaling packet with interval-request and Apple extension TLVs.

  The body is a 10-byte target port identity followed by three TLVs: one
  message-interval request and two Apple organization TLVs (sub-types
  `00 00 01` and `00 00 05`).

  Options:

    * `:sync_interval_log` - sync interval placed in the interval request (default `-3`)
    * `:announce_interval_log` - announce interval placed in the interval request (default `-2`)
    * `:mac_style` - when truthy (default) the Apple TLVs carry fixed payloads;
      otherwise their payloads embed the two interval values
  """
  @spec signaling(binary() | non_neg_integer(), non_neg_integer(), keyword()) :: binary()
  def signaling(clock_id, sequence_id, opts \\ []) do
    clock_id = clock_identity(clock_id)
    sync_interval = Keyword.get(opts, :sync_interval_log, -3)
    announce_interval = Keyword.get(opts, :announce_interval_log, -2)

    tlvs =
      if Keyword.get(opts, :mac_style, true) do
        # Fixed Apple TLV payloads (16 and 26 bytes); the intervals only appear
        # in the message-interval TLV.
        # NOTE(review): the meaning of these payload bytes is not visible in this
        # file — presumably captured from a reference sender; confirm before editing.
        [
          message_interval_tlv(sync_interval, announce_interval),
          apple_tlv(
            <<0x00, 0x00, 0x01>>,
            <<0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x27, 0x10, 0x00, 0x00, 0x27, 0x10, 0x00, 0x00,
              0x00, 0x00>>
          ),
          apple_tlv(
            <<0x00, 0x00, 0x05>>,
            <<0x00, 0x0F, 0x00, 0x00, 0x00, 0x00, 0x27, 0x10, 0x00, 0x00, 0x27, 0x10, 0x00, 0x00,
              0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00>>
          )
        ]
      else
        # Alternative Apple TLV payloads: a 2-byte prefix, the two intervals as
        # single bytes, then zero padding (payload sizes again 16 and 26 bytes).
        [
          message_interval_tlv(sync_interval, announce_interval),
          apple_tlv(
            <<0x00, 0x00, 0x01>>,
            <<0x00, 0x04, signed8(sync_interval)::8, signed8(announce_interval)::8, 0::96>>
          ),
          apple_tlv(
            <<0x00, 0x00, 0x05>>,
            <<0x00, 0x10, signed8(sync_interval)::8, signed8(announce_interval)::8, 0::176>>
          )
        ]
      end

    # 10-byte target port identity: the integer 0xFF written in 80 bits, i.e. nine
    # zero bytes followed by one 0xFF byte.
    # NOTE(review): if an all-ones wildcard identity was intended this would need
    # ten 0xFF bytes — confirm against the reference sender before changing.
    body = <<0xFF::80>> <> IO.iodata_to_binary(tlvs)

    encode_header(:signaling, sequence_id,
      clock_identity: clock_id,
      message_length: @header_size + byte_size(body)
    ) <> body
  end

  @doc """
  Build a stop Signaling packet asking the receiver to stop sending timing messages.

  The body is the 10-byte target identity followed by a single message-interval
  TLV whose interval bytes are `0x7E` and whose flags byte is `0x00`.
  """
  @spec stop_signaling(binary() | non_neg_integer(), non_neg_integer()) :: binary()
  def stop_signaling(clock_id, sequence_id) do
    identity = clock_identity(clock_id)
    interval_tlv = message_interval_tlv(0x7E, 0x7E, 0x00)
    body = <<0xFF::80, interval_tlv::binary>>

    header =
      encode_header(:signaling, sequence_id,
        clock_identity: identity,
        message_length: @header_size + byte_size(body)
      )

    <<header::binary, body::binary>>
  end

  @doc """
  Build a two-step Sync packet and its matching Follow_Up packet.

  Returns `{sync, follow_up}`. The Sync carries the two-step flag and a zeroed
  timestamp; the Follow_Up carries `timestamp_ns` (default: current system time)
  followed by a follow-up information TLV. Both share `sequence_id` and the
  `:log_message_interval` option (default `0`).
  """
  @spec sync_follow_up(
          binary() | non_neg_integer(),
          non_neg_integer(),
          non_neg_integer(),
          keyword()
        ) ::
          {binary(), binary()}
  def sync_follow_up(
        clock_id,
        sequence_id,
        timestamp_ns \\ System.system_time(:nanosecond),
        opts \\ []
      ) do
    identity = clock_identity(clock_id)
    interval = Keyword.get(opts, :log_message_interval, 0)

    sync_header =
      encode_header(:sync, sequence_id,
        clock_identity: identity,
        flags: @two_step_flag,
        message_length: @header_size + @timestamp_size,
        log_message_interval: interval
      )

    info_tlv = follow_up_info_tlv()

    follow_up_header =
      encode_header(:follow_up, sequence_id,
        clock_identity: identity,
        message_length: @header_size + @timestamp_size + byte_size(info_tlv),
        log_message_interval: interval
      )

    precise_origin = encode_timestamp(timestamp_ns)

    {
      <<sync_header::binary, 0::80>>,
      <<follow_up_header::binary, precise_origin::binary, info_tlv::binary>>
    }
  end

  @doc """
  Build a Delay_Req packet used for offset calculation.

  The source port identity (clock identity and port number) is deliberately
  written as all zeros to match the reference sender: HomePods answer a
  zero-identity Delay_Req with a Delay_Resp, but silently drop one that carries
  our real clock identity, which would leave the PTP offset stuck at 0.

  `clock_id` is therefore ignored; it is accepted only to keep the builder
  signatures uniform. `timestamp_ns` defaults to the current system time.
  """
  @spec delay_request(binary() | non_neg_integer(), non_neg_integer(), non_neg_integer()) ::
          binary()
  def delay_request(_clock_id, sequence_id, timestamp_ns \\ System.system_time(:nanosecond)) do
    header =
      encode_header(:delay_request, sequence_id,
        clock_identity: <<0::64>>,
        port_number: 0,
        message_length: @header_size + @timestamp_size
      )

    origin = encode_timestamp(timestamp_ns)
    <<header::binary, origin::binary>>
  end

  @doc """
  Turn a Follow_Up packet into a remote clock sample.

  The sample pairs the timestamp carried by the packet (`:device_time_ns`) with
  `local_time_ns` (default: the current monotonic time). Returns `:error` when
  the packet does not parse, is not a Follow_Up, or has no valid timestamp.
  """
  @spec sample_from_follow_up(binary(), integer()) :: {:ok, clock_sample()} | :error
  def sample_from_follow_up(packet, local_time_ns \\ System.monotonic_time(:nanosecond)) do
    with {:ok, %{header: %{message_type: :follow_up} = header, timestamp_ns: device_ns}} <-
           parse(packet) do
      sample = %{
        clock_identity: header.source_clock_identity,
        device_time_ns: device_ns,
        local_time_ns: local_time_ns,
        sequence_id: header.sequence_id
      }

      {:ok, sample}
    else
      _no_sample -> :error
    end
  end

  @doc """
  Extrapolate the receiver's current PTP time from an earlier sample.

  Adds the local time elapsed since the sample was taken to the sampled device
  time. A `local_time_ns` earlier than the sample counts as zero elapsed time.
  """
  @spec estimate_device_time(clock_sample(), integer()) :: non_neg_integer()
  def estimate_device_time(sample, local_time_ns \\ System.monotonic_time(:nanosecond)) do
    since_sample = local_time_ns - sample.local_time_ns
    # Never move the estimate backwards when the local clock reads earlier.
    elapsed = if since_sample < 0, do: 0, else: since_sample

    sample.device_time_ns + elapsed
  end

  @doc """
  Derive clock offset and path delay from the four timestamps of a Delay_Req exchange.

  Returns a map with `:offset_ns`, `:delay_ns` (always non-negative) and
  `:rtt_ns` (twice the delay). Halves are computed with integer division.
  """
  @spec offset(non_neg_integer(), non_neg_integer(), non_neg_integer(), non_neg_integer()) ::
          map()
  def offset(t1_ns, t2_ns, t3_ns, t4_ns) do
    forward = t2_ns - t1_ns
    backward = t4_ns - t3_ns

    one_way = abs(div(forward + backward, 2))

    %{
      offset_ns: div(forward - backward, 2),
      delay_ns: one_way,
      rtt_ns: one_way * 2
    }
  end

  @doc """
  Build the AP2 SETRATEANCHORTIME body map from a remote clock sample.

  The anchor's network time is the estimated current device time plus
  `:delay_ms` (default 2000), so the render point lies in the future. The RTP
  timestamp should match the first audio packet intended to render at that
  network time.

  Options: `:delay_ms`, `:rate` (default `1.0`) and `:network_time_flags`
  (default `0`).
  """
  @spec anchor(clock_sample(), non_neg_integer(), keyword()) :: map()
  def anchor(sample, rtp_time, opts \\ []) do
    lead_ns = Keyword.get(opts, :delay_ms, 2_000) * 1_000_000
    playback_rate = Keyword.get(opts, :rate, 1.0)
    render_at = from_nanoseconds(estimate_device_time(sample) + lead_ns)
    timeline_id = clock_identity_to_integer(sample.clock_identity)

    %{
      "networkTimeTimelineID" => timeline_id,
      "networkTimeSecs" => render_at.seconds,
      "networkTimeFrac" => fraction64(render_at.nanoseconds),
      "networkTimeFlags" => Keyword.get(opts, :network_time_flags, 0),
      "rtpTime" => rtp_time,
      "rate" => playback_rate
    }
  end

  @doc """
  Encode an AP2 SETRATEANCHORTIME body from a remote clock sample.

  Builds the map with `anchor/3` and serialises it with `Plist.encode!/1`.
  """
  @spec anchor_plist(clock_sample(), non_neg_integer(), keyword()) :: binary()
  def anchor_plist(sample, rtp_time, opts \\ []) do
    body = anchor(sample, rtp_time, opts)
    Plist.encode!(body)
  end

  @doc """
  Convert nanoseconds within a second into AP2's 32-bit fixed-point fraction.

  Only the low 32 bits of `nanoseconds` are used; the result is
  `nanoseconds * 2^32 / 10^9`, rounded towards zero.
  """
  @spec fraction32(non_neg_integer()) :: non_neg_integer()
  def fraction32(nanoseconds) do
    low_bits = nanoseconds &&& 0xFFFFFFFF
    scaled = low_bits <<< 32

    div(scaled, 1_000_000_000)
  end

  @doc """
  Convert nanoseconds within a second into AP2's 64-bit fixed-point fraction.

  The result is `nanoseconds * 2^64 / 10^9`, rounded towards zero.
  """
  @spec fraction64(non_neg_integer()) :: non_neg_integer()
  def fraction64(nanoseconds) do
    scaled = nanoseconds <<< 64
    div(scaled, 1_000_000_000)
  end

  @doc """
  Read an 8-byte clock identity as the unsigned big-endian integer used by AP2
  plist bodies.

  Raises `FunctionClauseError` for anything that is not exactly 8 bytes.
  """
  @spec clock_identity_to_integer(binary()) :: non_neg_integer()
  def clock_identity_to_integer(identity) when is_binary(identity) and byte_size(identity) == 8 do
    :binary.decode_unsigned(identity)
  end

  @doc """
  Parse consecutive TLVs from a PTP body section.

  Each TLV is a 16-bit type, a 16-bit length and `length` payload bytes. Parsing
  stops silently at the first point where a complete TLV can no longer be read;
  the TLVs decoded so far are returned in wire order.
  """
  @spec parse_tlvs(binary()) :: [
          %{type: non_neg_integer(), length: non_neg_integer(), payload: binary()}
        ]
  def parse_tlvs(data) do
    data |> collect_tlvs([]) |> Enum.reverse()
  end

  # Accumulates TLVs newest-first; the caller restores wire order.
  defp collect_tlvs(<<type::16, length::16, payload::binary-size(length), rest::binary>>, acc) do
    entry = %{type: type, length: length, payload: payload}
    collect_tlvs(rest, [entry | acc])
  end

  defp collect_tlvs(_leftover, acc), do: acc

  # Decodes the fixed 30-byte Announce body that follows the common header.
  #
  # Returns a map with the priority, clock-quality, grandmaster identity,
  # steps-removed and time-source fields, or an empty map when the body is
  # shorter than the fixed fields.
  #
  # Bytes after the fixed fields are ignored: Announce messages may carry TLVs
  # after the body, and requiring an exact length would discard every field of
  # such packets.
  defp parse_announce_body(
         <<_origin_timestamp::binary-size(@timestamp_size),
           _utc_offset::signed-big-integer-size(16), _reserved::8, priority1::8, clock_class::8,
           clock_accuracy::8, offset_scaled_log_variance::16, priority2::8,
           grandmaster_identity::binary-size(8), steps_removed::16, time_source::8,
           _trailing_tlvs::binary>>
       ) do
    %{
      priority1: priority1,
      clock_class: clock_class,
      clock_accuracy: clock_accuracy,
      offset_scaled_log_variance: offset_scaled_log_variance,
      priority2: priority2,
      grandmaster_identity: grandmaster_identity,
      steps_removed: steps_removed,
      time_source: time_source
    }
  end

  defp parse_announce_body(_), do: %{}

  # Builds the follow-up information TLV appended to Follow_Up packets.
  # The payload is 22 zero bytes (fields of 32, 16, 96 and 32 bits).
  # NOTE(review): the field widths look like the 802.1AS Follow_Up information
  # TLV layout — confirm against the standard before assigning non-zero values.
  defp follow_up_info_tlv do
    zero_payload = <<0::176>>
    tlv(@gptp_org, @gptp_follow_up_info, zero_payload)
  end

  # Builds the gPTP message-interval-request TLV.
  # Payload bytes: sync interval, sync interval again, announce interval, flags.
  # NOTE(review): the first byte reuses the sync interval — presumably it stands
  # in for a separate link-delay interval; confirm against the reference sender.
  defp message_interval_tlv(sync_interval, announce_interval, flags \\ 0x02) do
    sync_byte = signed8(sync_interval)
    announce_byte = signed8(announce_interval)
    payload = <<sync_byte::8, sync_byte::8, announce_byte::8, flags::8>>

    tlv(@gptp_org, @gptp_message_interval_request, payload)
  end

  # Wraps `payload` in an organization-extension TLV under the Apple organization id.
  defp apple_tlv(subtype, payload) do
    tlv(@apple_org, subtype, payload)
  end

  # Encodes an organization-extension TLV: 16-bit type, 16-bit length, then the
  # organization id, sub-type and payload (the length covers all three).
  defp tlv(org, subtype, payload) do
    value = <<org::binary, subtype::binary, payload::binary>>
    value_size = byte_size(value)

    <<@organization_extension_tlv::16, value_size::16, value::binary>>
  end

  # Maps a small signed integer onto its unsigned single-byte representation:
  # negative values are shifted up by 256, others are masked to 8 bits.
  defp signed8(value) do
    if value < 0 do
      value + 256
    else
      value &&& 0xFF
    end
  end

  # Returns the first 10 bytes of a Signaling body (the target port identity),
  # or nil when the body is shorter than that.
  defp target_identity(body) when is_binary(body) and byte_size(body) >= 10 do
    binary_part(body, 0, 10)
  end

  defp target_identity(_too_short), do: nil

  # Returns what follows the 10-byte target port identity of a Signaling body,
  # or an empty binary when the body is shorter than that.
  defp drop_target(body) when is_binary(body) and byte_size(body) >= 10 do
    binary_part(body, 10, byte_size(body) - 10)
  end

  defp drop_target(_too_short), do: <<>>

  # Reverse lookup in @message_types: returns the message type atom for a wire
  # type code, or nil when the code is unknown.
  defp message_type(type_code) do
    case Enum.find(@message_types, fn {_type, code} -> code == type_code end) do
      {type, _code} -> type
      nil -> nil
    end
  end
end