defmodule AirPlay.Player do
  @moduledoc """
  Streams ALAC-over-RTP audio to an AirPlay/RAOP receiver after the RTSP
  handshake.

  Owns three UDP sockets:

    * audio — sends RTP audio (PT 0x60) to `device:server_port`
    * control — bound on our `control_port`, sends periodic `sync` (0xd4)
    * timing — bound on our `timing_port`, answers the device's timing
      requests (0xd2 → 0xd3)

  Packet formats follow the RAOP wire format (see `AirPlay.Rtp`). Audio is paced
  against the wall clock: each tick sends however many 352-sample frames are now
  due, staying a small lead (88 frames, ~0.7 s) ahead of real time. The receiver
  delays playback by the 88200-sample (~2 s) latency carried in the sync packets,
  so that lead lets it start cleanly without overflowing its frame buffer.
  """
  use GenServer
  require Logger
  import Bitwise

  alias AirPlay.{Alac, Decoder, Ntp, Rtp}

  @sample_rate 44_100
  @frame_samples 352
  # Latency in samples (2 s at 44.1 kHz) passed to every sync packet.
  @latency 88_200
  @ns_per_sec 1_000_000_000
  # 0 = let the OS pick an ephemeral port; read the result back via local_ports/1.
  @default_control_port 0
  @default_timing_port 0
  @tick_ms 100
  @sync_ms 1_000
  # Frames to stay ahead of real time (~0.7s) — enough to absorb tick jitter
  # without overflowing the receiver's frame buffer.
  @prebuffer_frames 88
  # RTP sequence numbers are 16-bit; SSRC and timestamps are 32-bit.
  @u16 0xFFFF
  @u32 0xFFFFFFFF

  @typedoc "Frame source accepted by `begin/3`."
  @type source :: [binary()] | {:decoder, pid()}

  @doc """
  Open the UDP sockets and start answering the receiver's timing requests,
  WITHOUT streaming yet.

  `host` is the receiver's address (converted with `to_string/1`). `opts`:

    * `:local_control_port` — port for the control socket (default `0`,
      i.e. an OS-assigned ephemeral port)
    * `:local_timing_port` — port for the timing socket (default `0`,
      i.e. an OS-assigned ephemeral port)

  Use `local_ports/1` to read back the ports actually bound.

  This must run *before* the RTSP `SETUP`: AirPlay receivers (HomePods especially)
  probe our timing port with an NTP `0xd2` request during SETUP and return
  "520 Origin Error" if it's unreachable or unanswered. Binding + answering here
  is what lets SETUP succeed. Call `begin/3` once the handshake yields the ports.
  """
  @spec start_link(String.Chars.t(), keyword()) :: GenServer.on_start()
  def start_link(host, opts \\ []) do
    GenServer.start_link(__MODULE__, {host, opts})
  end

  @doc """
  Start streaming over the already-open sockets.

  `source` is where frames come from, either:

    * a list of 352-sample s16le stereo chunks (whole track held in memory), or
    * `{:decoder, pid}` — pull lazily from an `AirPlay.Decoder` (bounded memory).

  `opts`:

    * `:server_port` (required) — the device's audio port
    * `:control_port` — the device's control port; no sync packets are sent
      when it is absent
    * `:start_seq` — first RTP sequence number (random by default)
    * `:start_ts` — first RTP timestamp (random by default)

  Returns `:ok`, or `{:error, :already_streaming}` if `begin/3` was already
  called on this player. The player stops itself (reason `:normal`) once the
  source is exhausted.
  """
  @spec begin(GenServer.server(), source(), keyword()) :: :ok | {:error, :already_streaming}
  def begin(pid, source, opts), do: GenServer.call(pid, {:begin, source, opts})

  @doc "Return the local UDP ports that must be advertised in RTSP SETUP."
  @spec local_ports(GenServer.server()) :: %{
          control: :inet.port_number(),
          timing: :inet.port_number()
        }
  def local_ports(pid), do: GenServer.call(pid, :local_ports)

  @doc "Stop the player and close its sockets."
  @spec stop(GenServer.server()) :: :ok
  def stop(pid), do: GenServer.stop(pid)

  @impl true
  def init({host, opts}) do
    host_ip = host |> to_string() |> String.to_charlist()
    lcontrol = Keyword.get(opts, :local_control_port, @default_control_port)
    ltiming = Keyword.get(opts, :local_timing_port, @default_timing_port)

    {:ok, audio} = :gen_udp.open(0, [:binary, active: false])
    {:ok, control} = :gen_udp.open(lcontrol, [:binary, active: false])
    # Only the timing socket is active: its requests arrive as {:udp, ...}
    # messages handled in handle_info/2.
    {:ok, timing} = :gen_udp.open(ltiming, [:binary, active: true])
    {:ok, control_port} = :inet.port(control)
    {:ok, timing_port} = :inet.port(timing)

    {:ok,
     %{
       host: host_ip,
       audio: audio,
       control: control,
       timing: timing,
       local_control_port: control_port,
       local_timing_port: timing_port,
       streaming?: false
     }}
  end

  @impl true
  def handle_call(:local_ports, _from, st) do
    {:reply, %{control: st.local_control_port, timing: st.local_timing_port}, st}
  end

  def handle_call({:begin, _source, _opts}, _from, %{streaming?: true} = st) do
    # A second begin would reset the frame counter and schedule duplicate
    # tick timers, corrupting the RTP timestamps; refuse it instead.
    {:reply, {:error, :already_streaming}, st}
  end

  def handle_call({:begin, source, opts}, _from, st) do
    st =
      Map.merge(st, %{
        source: normalize_source(source),
        server_port: Keyword.fetch!(opts, :server_port),
        control_port: Keyword.get(opts, :control_port),
        ssrc: :rand.uniform(@u32),
        seq: Keyword.get(opts, :start_seq, :rand.uniform(@u16)),
        base_ts: Keyword.get(opts, :start_ts, :rand.uniform(@u32)),
        sent: 0,
        first_sync?: true,
        start_ns: System.monotonic_time(:nanosecond),
        streaming?: true
      })

    send(self(), :tick_audio)
    send(self(), :tick_sync)
    {:reply, :ok, st}
  end

  @impl true
  def handle_info(:tick_audio, st) do
    # Send at ~real time with a small lead (~0.7s). The receiver delays playback
    # by the announced latency (~2s) and fills its buffer from this steady stream,
    # so we must NOT dump the whole 2s at once (that overflows its frame buffer).
    due = div(elapsed_samples(st), @frame_samples) + @prebuffer_frames
    {frames, source, eos?} = pull(st.source, max(due - st.sent, 0))
    st = Enum.reduce(frames, %{st | source: source}, &send_frame(&2, &1))

    if eos? do
      Logger.info("[airplay] stream complete (#{st.sent} frames)")
      {:stop, :normal, st}
    else
      Process.send_after(self(), :tick_audio, @tick_ms)
      {:noreply, st}
    end
  end

  def handle_info(:tick_sync, st) do
    # rtp_now tracks WALL-CLOCK elapsed (the live network position), so the
    # receiver can anchor playback: sample `rtp_now - latency` plays at `ntp`.
    rtp_now = st.base_ts + elapsed_samples(st) &&& @u32
    pkt = Rtp.sync(rtp_now, @latency, Ntp.now(), st.first_sync?)
    # Best-effort UDP: the send result is deliberately ignored.
    if st.control_port, do: :gen_udp.send(st.control, st.host, st.control_port, pkt)
    Process.send_after(self(), :tick_sync, @sync_ms)
    {:noreply, %{st | first_sync?: false}}
  end

  # Device timing request on our timing socket → reply with our receive/transmit
  # NTP times. Reply straight to the request's source address rather than
  # st.host (a charlist gen_udp may have to resolve on each send), keeping the
  # transmit timestamp close to the actual send.
  def handle_info({:udp, sock, ip, port, data}, %{timing: sock} = st) do
    recv = Ntp.now()

    case Rtp.timing_request_origin(data) do
      {:ok, origin} ->
        resp = Rtp.timing_response(origin, recv, Ntp.now())
        :gen_udp.send(sock, ip, port, resp)

      :error ->
        :ok
    end

    {:noreply, st}
  end

  # Mirror GenServer's default: log and ignore stray messages instead of
  # crashing the stream with a FunctionClauseError.
  def handle_info(msg, st) do
    Logger.debug("[airplay] ignoring unexpected message: #{inspect(msg)}")
    {:noreply, st}
  end

  @impl true
  def terminate(_reason, st) do
    Enum.each([st.audio, st.control, st.timing], &:gen_udp.close/1)
    :ok
  end

  # Samples of wall-clock time elapsed since begin/3, computed with exact integer
  # math so the audio schedule and the sync packets agree on the stream position.
  defp elapsed_samples(st) do
    elapsed_ns = System.monotonic_time(:nanosecond) - st.start_ns
    div(elapsed_ns * @sample_rate, @ns_per_sec)
  end

  # A list source is the whole track in memory; wrap it so `pull/2` is uniform.
  defp normalize_source(frames) when is_list(frames), do: {:buffer, frames}
  defp normalize_source({:decoder, _pid} = source), do: source

  # Pull up to `n` frames from the source, returning `{frames, source, eos?}`.
  # `eos?` is true only once the source is permanently exhausted.
  defp pull(source, 0), do: {[], source, false}

  defp pull({:buffer, frames}, n) do
    {take, rest} = Enum.split(frames, n)
    {take, {:buffer, rest}, rest == []}
  end

  defp pull({:decoder, pid} = source, n) do
    {frames, eos?} = Decoder.take(pid, n)
    {frames, source, eos?}
  end

  # Send one audio frame, advancing the sequence/sample counters. The first
  # packet of the stream (sent == 0) is flagged via Rtp.audio's last argument.
  defp send_frame(st, frame) do
    payload = Alac.encode_stereo16(frame)
    ts = st.base_ts + st.sent * @frame_samples &&& @u32
    pkt = Rtp.audio(st.seq &&& @u16, ts, st.ssrc, payload, st.sent == 0)
    :gen_udp.send(st.audio, st.host, st.server_port, pkt)
    %{st | sent: st.sent + 1, seq: st.seq + 1}
  end
end