# lib/air_play/player.ex

defmodule AirPlay.Player do
  @moduledoc """
  Streams ALAC-over-RTP audio to an AirPlay/RAOP receiver after the RTSP
  handshake. Owns three UDP sockets:

    * audio   — sends RTP audio (PT 0x60) to `device:server_port`
    * control — bound on our `control_port`, sends periodic `sync` (0xd4)
    * timing  — bound on our `timing_port`, answers the device's timing
      requests (0xd2 → 0xd3)

  Packet formats follow the RAOP wire format (see `AirPlay.Rtp`). Audio is paced
  against wall-clock: each tick sends however many 352-sample frames are now
  due, staying ~0.7 s (88 frames) ahead of real time. The receiver delays
  playback by the announced 88200-sample (~2 s) latency and fills its buffer
  from this steady stream, so it can start cleanly without being overrun.
  """

  use GenServer
  require Logger
  import Bitwise

  alias AirPlay.{Alac, Decoder, Ntp, Rtp}

  @sample_rate 44_100
  @frame_samples 352
  @latency 88_200
  @frame_ns div(@frame_samples * 1_000_000_000, @sample_rate)
  @default_control_port 0
  @default_timing_port 0
  @tick_ms 100
  @sync_ms 1_000
  # Frames to stay ahead of real time (~0.7s) — enough to absorb tick jitter
  # without overflowing the receiver's frame buffer.
  @prebuffer_frames 88

  @doc """
  Opens the UDP sockets and starts answering the receiver's timing requests,
  WITHOUT streaming yet.

  `host` is the receiver's address; it is converted with `to_string/1` and then
  to a charlist before being handed to `:gen_udp`.

  ## Options

    * `:local_control_port` — local UDP port for the control socket
      (default `0`, i.e. let the OS pick a free port)
    * `:local_timing_port` — local UDP port for the timing socket
      (default `0`, i.e. let the OS pick a free port)

  Because both ports default to an OS-assigned value, read the ports that were
  actually bound with `local_ports/1` and advertise those in RTSP `SETUP`.

  This must run *before* the RTSP `SETUP`: AirPlay receivers (HomePods especially)
  probe our timing port with an NTP `0xd2` request during SETUP and return
  "520 Origin Error" if it's unreachable or unanswered. Binding + answering here
  is what lets SETUP succeed. Call `begin/3` once the handshake yields the ports.

  Returns whatever `GenServer.start_link/2` returns.
  """
  @spec start_link(String.Chars.t(), keyword()) :: GenServer.on_start()
  def start_link(host, opts \\ []) do
    GenServer.start_link(__MODULE__, {host, opts})
  end

  @doc """
  Begins streaming `source` over the sockets that are already open.

  ## Options

    * `:server_port` — required; the device port RTP audio is sent to
    * `:control_port` — the device's control port
    * `:start_seq`
    * `:start_ts`

  ## Sources

  `source` is where frames come from, either:

    * a list of 352-sample s16le stereo chunks (whole track held in memory), or
    * `{:decoder, pid}` — frames are pulled lazily from an `AirPlay.Decoder`
      (bounded memory).
  """
  def begin(pid, source, opts) do
    request = {:begin, source, opts}
    GenServer.call(pid, request)
  end

  @doc """
  Returns the locally bound UDP ports (a map with `:control` and `:timing`)
  that must be advertised in RTSP SETUP.
  """
  def local_ports(pid) do
    GenServer.call(pid, :local_ports)
  end

  @doc "Stops the player process via `GenServer.stop/1`."
  def stop(pid) do
    GenServer.stop(pid)
  end

  # Server start-up: binds the audio, control and timing UDP sockets and builds
  # the idle (not yet streaming) state. Any failed open or port lookup fails the
  # `{:ok, _}` match and therefore crashes start-up.
  @impl true
  def init({host, opts}) do
    # Stored as a charlist, the form in which it is later passed to `:gen_udp.send/4`.
    peer = String.to_charlist(to_string(host))
    control_req = Keyword.get(opts, :local_control_port, @default_control_port)
    timing_req = Keyword.get(opts, :local_timing_port, @default_timing_port)

    {:ok, audio_sock} = :gen_udp.open(0, [:binary, active: false])
    {:ok, control_sock} = :gen_udp.open(control_req, [:binary, active: false])
    # Only the timing socket is active, so incoming timing requests arrive as
    # `{:udp, ...}` messages handled in `handle_info/2`.
    {:ok, timing_sock} = :gen_udp.open(timing_req, [:binary, active: true])

    # Read back the ports that were really bound (the requested value may be 0).
    {:ok, bound_control} = :inet.port(control_sock)
    {:ok, bound_timing} = :inet.port(timing_sock)

    state = %{
      host: peer,
      audio: audio_sock,
      control: control_sock,
      timing: timing_sock,
      local_control_port: bound_control,
      local_timing_port: bound_timing,
      streaming?: false
    }

    {:ok, state}
  end

  # Replies with the control/timing ports recorded in the state at start-up.
  @impl true
  def handle_call(:local_ports, _from, st) do
    control = st.local_control_port
    timing = st.local_timing_port
    ports = %{control: control, timing: timing}

    {:reply, ports, st}
  end

  # Switches the idle player into streaming mode: records the stream parameters
  # in the state, then kicks off the audio and sync tick loops. The SSRC is
  # always random; the first sequence number and base timestamp are random
  # unless the caller supplies `:start_seq` / `:start_ts`. `:server_port` is
  # mandatory (`Keyword.fetch!/2` raises when it is missing).
  @impl true
  def handle_call({:begin, source, opts}, _from, st) do
    frame_source = normalize_source(source)
    server_port = Keyword.fetch!(opts, :server_port)
    control_port = Keyword.get(opts, :control_port)
    ssrc = :rand.uniform(0xFFFFFFFF)
    first_seq = Keyword.get(opts, :start_seq, :rand.uniform(0xFFFF))
    first_ts = Keyword.get(opts, :start_ts, :rand.uniform(0xFFFFFFFF))
    # Pacing reference: both tick handlers measure elapsed time from here.
    started_ns = System.monotonic_time(:nanosecond)

    streaming =
      Map.merge(st, %{
        source: frame_source,
        server_port: server_port,
        control_port: control_port,
        ssrc: ssrc,
        seq: first_seq,
        base_ts: first_ts,
        sent: 0,
        first_sync?: true,
        start_ns: started_ns,
        streaming?: true
      })

    send(self(), :tick_audio)
    Process.send_after(self(), :tick_sync, 0)

    {:reply, :ok, streaming}
  end

  # Audio pacing tick. Works out how many frames should have gone out by now
  # (wall-clock time since `start_ns`, plus the prebuffer lead), sends the
  # shortfall, then either re-arms the timer or stops the server once the
  # source reports end-of-stream.
  @impl true
  def handle_info(:tick_audio, st) do
    # Stay at ~real time with a small lead (~0.7s). The receiver delays playback
    # by the announced latency (~2s) and fills its buffer from this steady
    # stream; dumping the whole 2s at once would overflow its frame buffer.
    now_ns = System.monotonic_time(:nanosecond)
    frames_due = div(now_ns - st.start_ns, @frame_ns) + @prebuffer_frames
    shortfall = max(frames_due - st.sent, 0)

    {batch, remaining_source, finished?} = pull(st.source, shortfall)

    st =
      Enum.reduce(batch, %{st | source: remaining_source}, fn frame, acc ->
        send_frame(acc, frame)
      end)

    cond do
      finished? ->
        Logger.info("[airplay] stream complete (#{st.sent} frames)")
        {:stop, :normal, st}

      true ->
        Process.send_after(self(), :tick_audio, @tick_ms)
        {:noreply, st}
    end
  end

  # Periodic sync tick: builds a sync packet for the current stream position,
  # sends it to the device's control port when one was supplied, re-arms the
  # timer and clears the first-sync flag.
  def handle_info(:tick_sync, st) do
    # rtp_now tracks WALL-CLOCK elapsed (the live network position), so the
    # receiver can anchor playback: sample `rtp_now - latency` plays at `ntp`.
    elapsed_ns = System.monotonic_time(:nanosecond) - st.start_ns
    elapsed_samples = div(elapsed_ns * @sample_rate, 1_000_000_000)
    # Wrap to the 32-bit RTP timestamp range.
    rtp_now = (st.base_ts + elapsed_samples) &&& 0xFFFFFFFF
    packet = Rtp.sync(rtp_now, @latency, Ntp.now(), st.first_sync?)

    if st.control_port do
      :gen_udp.send(st.control, st.host, st.control_port, packet)
    end

    Process.send_after(self(), :tick_sync, @sync_ms)
    {:noreply, %{st | first_sync?: false}}
  end

  # Device timing request → reply (from our timing socket) to its source port
  # with our receive/transmit NTP times. Datagrams for which
  # `Rtp.timing_request_origin/1` returns `:error` are dropped silently.
  def handle_info({:udp, _sock, _ip, port, data}, st) do
    # Take the receive timestamp before any parsing so it reflects arrival.
    recv = Ntp.now()

    case Rtp.timing_request_origin(data) do
      {:ok, origin} ->
        resp = Rtp.timing_response(origin, recv, Ntp.now())
        :gen_udp.send(st.timing, st.host, port, resp)

      :error ->
        :ok
    end

    {:noreply, st}
  end

  # Catch-all, deliberately the LAST clause. Defining any `handle_info/2` clause
  # replaces the default one injected by `use GenServer`, so without this an
  # unexpected message (a socket error notification, a stray reply, ...) would
  # raise FunctionClauseError and kill the stream mid-playback.
  def handle_info(msg, st) do
    Logger.debug("[airplay] ignoring unexpected message: #{inspect(msg)}")
    {:noreply, st}
  end

  # Shutdown hook: closes the audio, control and timing sockets.
  @impl true
  def terminate(_reason, st) do
    for socket <- [st.audio, st.control, st.timing] do
      :gen_udp.close(socket)
    end

    :ok
  end

  # Tags the caller-supplied source so `pull/2` can dispatch on it uniformly:
  # a `{:decoder, pid}` reference passes through untouched, while a list (the
  # whole track in memory) is wrapped as `{:buffer, frames}`.
  defp normalize_source({:decoder, _pid} = decoder_source) do
    decoder_source
  end

  defp normalize_source(chunks) when is_list(chunks) do
    {:buffer, chunks}
  end

  # Fetches at most `count` frames from the source and returns
  # `{frames, updated_source, eos?}`. `eos?` becomes true only when the source
  # is permanently exhausted; asking for zero frames never signals end-of-stream.
  defp pull(source, 0) do
    {[], source, false}
  end

  # In-memory source: end-of-stream is reached as soon as nothing is left over.
  defp pull({:buffer, queued}, count) do
    case Enum.split(queued, count) do
      {batch, []} -> {batch, {:buffer, []}, true}
      {batch, remaining} -> {batch, {:buffer, remaining}, false}
    end
  end

  # Decoder source: the decoder process reports end-of-stream itself.
  defp pull({:decoder, decoder} = source, count) do
    {batch, done?} = Decoder.take(decoder, count)
    {batch, source, done?}
  end

  # Encodes one frame, wraps it in an RTP audio packet and sends it to the
  # device's `server_port`, then returns the state with the frame and sequence
  # counters advanced by one. The counters are kept unbounded in the state and
  # masked to 16 bits (sequence) / 32 bits (timestamp) only when building the
  # packet.
  defp send_frame(st, frame) do
    encoded = Alac.encode_stereo16(frame)
    rtp_ts = (st.base_ts + st.sent * @frame_samples) &&& 0xFFFFFFFF
    wire_seq = st.seq &&& 0xFFFF
    # True only for the very first packet of the stream.
    first_packet? = st.sent == 0

    packet = Rtp.audio(wire_seq, rtp_ts, st.ssrc, encoded, first_packet?)
    :gen_udp.send(st.audio, st.host, st.server_port, packet)

    %{st | sent: st.sent + 1, seq: st.seq + 1}
  end
end