defmodule AirPlay.V2.GroupSession do
@moduledoc """
Persistent AirPlay 2 multi-room group session.
The group analogue of `AirPlay.V2.Session`: it pairs + sets up every receiver,
starts one BMCA-yield PTP clock on the primary, sends SETPEERS, and then keeps
that whole arrangement warm across tracks. Each `play/3` flushes (continuing the
shared RTP timeline) and streams one decode to every receiver, encrypting each
packet with that receiver's stream key.
Same owner-notification contract as `AirPlay.V2.Session`:
* `{#{inspect(__MODULE__)}, :ended, play_gen}` — current track finished
* `{#{inspect(__MODULE__)}, :error, play_gen, reason}` — a track failed to start
"""
alias AirPlay.{Alac, Decoder, Rtp, Source}
alias AirPlay.V2.{Crypto, Pairing, Plist, PtpBmca, Rtsp2, Setup}
# Audio timeline constants: samples per second and samples carried per RTP packet.
# Together they give the per-frame pacing interval (see frame_duration_us).
@sample_rate 44_100
@samples_per_packet 352
# Minimum time between feedback rounds while streaming, in microseconds.
@feedback_interval_us 2_000_000
# SSRC written into every RTP header and passed to the audio encryption call.
@ssrc 0
# Default frame count handed to Decoder.await_ready/3 before a track starts.
@default_prebuffer_frames 125
# Small prebuffer once warm so album track changes stay ~gapless (see Session).
@default_warm_prebuffer_frames 16
# Default time allowed for the decoder prebuffer, in milliseconds.
@default_prebuffer_timeout_ms 5_000
# Number of frames requested from the decoder per Decoder.take/2 call.
@take_batch 32
# Sleep between polls when the decoder has no frames ready, in milliseconds.
@idle_poll_ms 5
# Idle receive timeout, in milliseconds, after which a keepalive feedback is sent.
@keepalive_ms 1_000
@doc """
Connect to a group of AirPlay 2 receivers and keep the session open.

`receivers` is a list of maps, each with `:host`/`"host"` and optionally
`:port`/`"port"`. Options mirror `AirPlay.V2.Session.connect/2` (`:owner`,
`:render_delay_ms`, `:ptp_settle_ms`, `:connect_timeout_ms`).

Blocks until the spawned session process reports ready, reports a setup error,
exits, or `:connect_timeout_ms` (default 20 seconds) passes. On timeout the
session process is killed and `{:error, :connect_timeout}` is returned.
"""
@spec connect([map()], keyword()) :: {:ok, pid()} | {:error, term()}
def connect(receivers, opts \\ []) when is_list(receivers) do
  parent = self()
  notify = Keyword.get(opts, :owner, parent)

  session = spawn(fn -> init(receivers, opts, parent, notify) end)
  monitor = Process.monitor(session)
  timeout_ms = Keyword.get(opts, :connect_timeout_ms, 20_000)

  receive do
    {__MODULE__, :ready, ^session} ->
      Process.demonitor(monitor, [:flush])
      {:ok, session}

    {__MODULE__, :connect_error, ^session, reason} ->
      Process.demonitor(monitor, [:flush])
      {:error, reason}

    # The session process died before reporting either outcome.
    {:DOWN, ^monitor, :process, ^session, reason} ->
      {:error, {:session_down, reason}}
  after
    timeout_ms ->
      Process.demonitor(monitor, [:flush])
      Process.exit(session, :kill)
      {:error, :connect_timeout}
  end
end
@doc """
Ask an open group session to play `path`, replacing any track in progress.

The request is sent asynchronously; the RTP timeline continues from where the
session currently is. Always returns `:ok`.
"""
@spec play(pid(), String.t(), keyword()) :: :ok
def play(pid, path, opts \\ []) when is_pid(pid) do
  request = {__MODULE__, :play, path, opts}
  _ = send(pid, request)
  :ok
end
@doc """
Ask the session to stop the current track while leaving the group connected.

The request is sent asynchronously. Always returns `:ok`.
"""
@spec stop(pid()) :: :ok
def stop(pid) when is_pid(pid) do
  _ = send(pid, {__MODULE__, :stop})
  :ok
end
@doc """
Ask the session to apply `volume` (`0.0..1.0`) to every receiver in the group.

The request is sent asynchronously. Always returns `:ok`.
"""
@spec set_volume(pid(), number()) :: :ok
def set_volume(pid, volume) when is_pid(pid) and is_number(volume) do
  request = {__MODULE__, :set_volume, volume}
  _ = send(pid, request)
  :ok
end
@doc """
Ask the session to shut down and close every receiver connection.

The request is sent asynchronously. Always returns `:ok`.
"""
@spec close(pid()) :: :ok
def close(pid) when is_pid(pid) do
  _ = send(pid, {__MODULE__, :close})
  :ok
end
# --- process bootstrap -----------------------------------------------------
# Entry point of the spawned session process.
#
# Runs group setup, reports the outcome to `caller`, and on success monitors
# `owner` (when it is a pid) before entering the idle loop. On failure the
# function returns after reporting, which ends the process.
defp init(receivers, opts, caller, owner) do
  case establish(receivers, opts) do
    {:error, reason} ->
      send(caller, {__MODULE__, :connect_error, self(), reason})

    {:ok, state} ->
      owner_ref =
        case owner do
          pid when is_pid(pid) -> Process.monitor(pid)
          _not_a_pid -> nil
        end

      send(caller, {__MODULE__, :ready, self()})
      loop_idle(%{state | owner: owner, owner_ref: owner_ref})
  end
end
# Set up the whole group and build the initial session state.
#
# Receivers that cannot be normalized are dropped; at least two must remain.
# Steps: prepare every target, start the PTP clock against the first target,
# wait `:ptp_settle_ms`, await PTP sync, then send SETPEERS to every target.
# Returns `{:ok, state}` or `{:error, reason}`; any non-error mismatch in the
# chain is wrapped as `{:group_setup_failed, value}`.
defp establish(receivers, opts) do
  usable =
    Enum.flat_map(receivers, fn raw ->
      case normalize_receiver(raw) do
        nil -> []
        receiver -> [receiver]
      end
    end)

  case usable do
    [_, _ | _] ->
      with {:ok, prepared} <- prepare_targets(usable, opts),
           [leader | _] <- prepared,
           {:ok, clock} <-
             PtpBmca.start_link(leader.host_ip, local_ip: local_ip_tuple(leader.local_ip)),
           :ok <- wait_for_ptp(Keyword.get(opts, :ptp_settle_ms, 500)),
           :ok <- PtpBmca.await_sync(clock, Keyword.get(opts, :ptp_sync_timeout_ms, 3_000)),
           {:ok, peered} <- send_setpeers(prepared, peer_addresses(prepared)) do
        started_us = System.monotonic_time(:microsecond)

        initial = %{
          owner: nil,
          owner_ref: nil,
          targets: peered,
          ptp: clock,
          # Prefer the PTP clock's id, then the one from the first target's
          # session reply, then an all-zero id.
          clock_id: clock.clock_id || session_clock_id(leader.session_body) || <<0::64>>,
          frame_duration_us: div(@samples_per_packet * 1_000_000, @sample_rate),
          render_delay_ns: Keyword.get(opts, :render_delay_ms, 200) * 1_000_000,
          seq: 0,
          rtp: 0,
          sync_seq: 0,
          last_sync: nil,
          last_feedback_us: started_us,
          streamed?: false,
          decoder: nil,
          play_gen: nil,
          track_started_us: started_us,
          track_index: 0
        }

        {:ok, initial}
      else
        {:error, _reason} = failure ->
          failure

        unexpected ->
          {:error, {:group_setup_failed, unexpected}}
      end

    _fewer_than_two ->
      {:error, :need_at_least_two_airplay2_receivers}
  end
end
# --- idle / play loops -----------------------------------------------------
# Idle loop: no track is playing.
#
# Blocks on the mailbox for up to @keepalive_ms; on timeout it sends a keepalive
# feedback round. Any :DOWN message closes the session, and unrecognized
# messages are dropped.
defp loop_idle(state) do
  receive do
    {__MODULE__, :close} ->
      do_close(state)

    {:DOWN, _ref, :process, _pid, _reason} ->
      do_close(state)

    {__MODULE__, :play, path, opts} ->
      resume(start_track(state, path, opts))

    {__MODULE__, :set_volume, volume} ->
      state |> apply_volume(volume) |> loop_idle()

    # Nothing is playing, so a stop request is a no-op.
    {__MODULE__, :stop} ->
      loop_idle(state)

    _ignored ->
      loop_idle(state)
  after
    @keepalive_ms -> state |> keepalive() |> loop_idle()
  end
end
# Playback loop: handle any pending commands, then stream one batch of frames.
defp loop_play(state) do
  case drain_commands(state) do
    {:cont, state} ->
      state |> stream_batch() |> continue_playback()

    {:skip, path, opts, state} ->
      state |> stop_track() |> start_track(path, opts) |> resume()

    {:stop, state} ->
      state |> stop_track() |> loop_idle()

    {:close, state} ->
      state |> stop_track() |> do_close()
  end
end

# Decide what follows a streamed batch: keep playing, or finish the track.
defp continue_playback({:cont, state}), do: loop_play(state)

defp continue_playback({:eos, state}) do
  stopped = stop_track(state)
  # Tell the owner which playback generation ended before forgetting it.
  notify_owner(stopped, {:ended, stopped.play_gen})
  loop_idle(%{stopped | play_gen: nil})
end
# Enter the play loop when a decoder is attached, otherwise go back to idle.
defp resume(state) do
  next_loop = if state.decoder, do: &loop_play/1, else: &loop_idle/1
  next_loop.(state)
end
# Non-blocking mailbox poll used between batches while a track is playing.
#
# Returns one of:
#   * `{:cont, state}`             - nothing pending, keep streaming
#   * `{:stop, state}`             - stop requested
#   * `{:close, state}`            - close requested, or a monitored process went down
#   * `{:skip, path, opts, state}` - a new play request replaces the current track
#
# Volume changes are applied immediately and polling continues.
defp drain_commands(state) do
  receive do
    {__MODULE__, :set_volume, volume} ->
      drain_commands(apply_volume(state, volume))

    {__MODULE__, :stop} ->
      {:stop, state}

    {__MODULE__, :close} ->
      {:close, state}

    {__MODULE__, :play, path, opts} ->
      {:skip, path, opts, state}

    # loop_idle/1 closes the session on any :DOWN; do the same mid-track so the
    # session does not keep streaming to the end of the track once its owner
    # has gone away.
    {:DOWN, _ref, :process, _pid, _reason} ->
      {:close, state}

    # Drop anything unrecognized, as loop_idle/1 does. Left in the mailbox these
    # would be rescanned by every selective receive for the rest of the track.
    _other ->
      drain_commands(state)
  after
    0 -> {:cont, state}
  end
end
# Pull up to @take_batch frames from the decoder and send each one.
#
# Returns `{:eos, state}` once the decoder is drained and finished, otherwise
# `{:cont, state}`. When no frames are ready yet it sleeps @idle_poll_ms first.
defp stream_batch(%{decoder: decoder} = state) do
  case Decoder.take(decoder, @take_batch) do
    {[], finished?} when is_boolean(finished?) ->
      if finished? do
        {:eos, state}
      else
        Process.sleep(@idle_poll_ms)
        {:cont, state}
      end

    {frames, _eos?} ->
      sent = Enum.reduce(frames, state, &send_one_frame/2)
      {:cont, sent}
  end
end
# Send one audio frame to the group and advance the shared timeline.
#
# Order: optional sync packet, the audio packets, optional feedback, pacing
# sleep. Afterwards `seq` wraps at 16 bits, `rtp` wraps at 32 bits, and the
# per-track frame index moves on by one.
defp send_one_frame(frame, state) do
  opening? = state.track_index == 0

  synced = maybe_send_sync(opening?, state)
  send_group_packets(frame, opening?, synced)

  current = maybe_send_feedback(synced)
  pace(current)

  next_seq = rem(current.seq + 1, 65_536)
  next_rtp = rem(current.rtp + @samples_per_packet, 4_294_967_296)

  %{current | seq: next_seq, rtp: next_rtp, track_index: current.track_index + 1}
end
# Send an RTP sync packet to every target's control port when one is due.
#
# A sync is due on the first frame of a track, when `last_sync` is nil, or once
# at least @sample_rate samples of RTP time have passed since the previous sync.
# The packet carries the receiver PTP time plus the configured render delay.
# Returns the state with `sync_seq` advanced (16-bit wrap) and `last_sync` set
# to the current RTP timestamp, or the state unchanged when no sync was sent.
defp maybe_send_sync(first?, state) do
  if first? or sync_due?(state.rtp, state.last_sync) do
    ptp_time = PtpBmca.receiver_time_ns(state.ptp) + state.render_delay_ns
    # Wrap the same way send_one_frame/2 advances `rtp`, so the announced next
    # timestamp is the one the next packet will really carry.
    next_rtp = rem(state.rtp + @samples_per_packet, 4_294_967_296)

    sync =
      Rtp.ptp_sync(
        state.sync_seq,
        state.rtp,
        ptp_time,
        next_rtp,
        state.clock_id,
        state.sync_seq == 0
      )

    Enum.each(state.targets, fn target ->
      :ok = :gen_udp.send(target.control_sock, target.host_ip, target.remote_control_port, sync)
    end)

    %{state | sync_seq: rem(state.sync_seq + 1, 65_536), last_sync: state.rtp}
  else
    state
  end
end

# True when no sync has been sent yet or at least one second of samples has
# elapsed. `rtp` wraps at 2^32, so the distance is taken modulo 2^32; a plain
# subtraction goes negative after the wrap and would suppress every periodic
# sync until the timestamp climbed past `last_sync` again.
defp sync_due?(_rtp, nil), do: true

defp sync_due?(rtp, last_sync),
  do: Integer.mod(rtp - last_sync, 4_294_967_296) >= @sample_rate
# Encode one frame and send it to every target's data port.
#
# The payload is encoded once; each target gets its own ciphertext, produced
# with that target's audio key. The first packet of a track uses payload type
# 0xE0, all later ones 0x60.
defp send_group_packets(frame, first?, state) do
  %{seq: seq, rtp: rtp} = state
  encoded = Alac.encode_stereo16(frame)
  marker = if first?, do: 0xE0, else: 0x60
  rtp_header = <<0x80, marker, seq::16, rtp::32, @ssrc::32>>

  for target <- state.targets do
    ciphertext = Crypto.audio_encrypt(target.pairing.audio_key, rtp, @ssrc, seq, encoded)
    datagram = rtp_header <> ciphertext
    :ok = :gen_udp.send(target.audio_sock, target.host_ip, target.data_port, datagram)
  end

  :ok
end
# Send a feedback round to every target once @feedback_interval_us has passed
# since the last one; otherwise return the state untouched.
defp maybe_send_feedback(%{last_feedback_us: previous_us} = state) do
  current_us = System.monotonic_time(:microsecond)

  case current_us - previous_us >= @feedback_interval_us do
    true -> %{state | targets: feedback_all(state.targets), last_feedback_us: current_us}
    false -> state
  end
end
# Sleep until the next frame's deadline on the monotonic clock.
#
# The deadline is measured from the track start, so sleeps do not accumulate
# drift. Gaps of one millisecond or less (or a missed deadline) are skipped.
defp pace(state) do
  frames_due = state.track_index + 1
  deadline_us = state.track_started_us + frames_due * state.frame_duration_us

  case deadline_us - System.monotonic_time(:microsecond) do
    ahead_us when ahead_us > 1_000 -> Process.sleep(div(ahead_us, 1_000))
    _too_close -> nil
  end
end
# --- track lifecycle -------------------------------------------------------
# Begin a new track on the warm session.
#
# Flushes every target at the current seq/rtp position, applies the optional
# `:volume`, then starts the decoder. On success the per-track counters are
# reset and the session is marked as having streamed. On failure the owner
# receives an `:error` notification and the state is left without a decoder.
defp start_track(state, path, opts) do
  generation = Keyword.get(opts, :playback_gen)

  flushed = flush_all(state.targets, state.seq, state.rtp)
  state = apply_volume(%{state | targets: flushed}, Keyword.get(opts, :volume))

  case start_decoder(path, prebuffer_opts(state, opts)) do
    {:error, reason} ->
      notify_owner(state, {:error, generation, reason})
      %{state | decoder: nil, play_gen: nil}

    {:ok, decoder} ->
      %{
        state
        | decoder: decoder,
          play_gen: generation,
          track_started_us: System.monotonic_time(:microsecond),
          track_index: 0,
          last_sync: nil,
          streamed?: true
      }
  end
end
# Choose the decoder prebuffer for the next track. A session that has never
# streamed keeps the caller's options (large cold prebuffer by default); a warm
# session overrides `:prebuffer_frames` with the small warm value so album
# track changes stay ~gapless.
defp prebuffer_opts(%{streamed?: warm?}, opts) when is_boolean(warm?) do
  if warm? do
    Keyword.put(opts, :prebuffer_frames, warm_prebuffer_frames(opts))
  else
    opts
  end
end

# Warm prebuffer size: the `:warm_prebuffer_frames` option or the module default.
defp warm_prebuffer_frames(opts) do
  Keyword.get(opts, :warm_prebuffer_frames, @default_warm_prebuffer_frames)
end
# Stop the current decoder, if any, and clear it from the state.
defp stop_track(%{decoder: decoder} = state) do
  case decoder do
    nil ->
      state

    running ->
      Decoder.stop(running)
      %{state | decoder: nil}
  end
end
# Final teardown: stop any running decoder, then release the PTP clock and
# every target's sockets. Returning ends the session process.
defp do_close(%{targets: targets, ptp: ptp} = state) do
  state.decoder && Decoder.stop(state.decoder)
  cleanup(targets, ptp)
  :ok
end
# Idle keepalive: send a feedback round to every target and record when.
defp keepalive(%{targets: targets} = state) do
  refreshed = feedback_all(targets)
  sent_at_us = System.monotonic_time(:microsecond)
  %{state | targets: refreshed, last_feedback_us: sent_at_us}
end
# --- RTSP control across all targets ---------------------------------------
# Send FLUSH at the given seq/rtp position to every target.
#
# Best effort: a target whose request fails or returns a non-2xx status is kept
# as it was; on success its pairing is replaced with the returned one.
defp flush_all(targets, seq, rtp) do
  for target <- targets do
    with {:ok, status, _headers, _body, pairing} when status in 200..299 <-
           Setup.flush(target.pairing, seq, rtp) do
      %{target | pairing: pairing}
    else
      _failed -> target
    end
  end
end
# Send a feedback request to every target.
#
# Best effort: only a 2xx reply updates the target's pairing; any other outcome
# leaves the target unchanged.
defp feedback_all(targets) do
  refresh = fn target ->
    case Setup.feedback(target.pairing) do
      {:ok, status, _headers, _body, updated} when status in 200..299 ->
        %{target | pairing: updated}

      _failed ->
        target
    end
  end

  Enum.map(targets, refresh)
end
# Apply `volume` to every target; `nil` means "leave the volume alone".
#
# Best effort per target: only a 2xx reply updates that target's pairing.
defp apply_volume(state, nil), do: state

defp apply_volume(%{targets: targets} = state, volume) do
  adjusted =
    for target <- targets do
      with {:ok, status, _headers, _body, pairing} when status in 200..299 <-
             Setup.set_volume(target.pairing, volume) do
        %{target | pairing: pairing}
      else
        _failed -> target
      end
    end

  %{state | targets: adjusted}
end
# Deliver a track notification to the owner, when the owner is a pid.
# Anything else (no owner, or an unknown event shape) is a silent no-op.
defp notify_owner(%{owner: owner}, event) when is_pid(owner) do
  case event do
    {:ended, gen} -> send(owner, {__MODULE__, :ended, gen})
    {:error, gen, reason} -> send(owner, {__MODULE__, :error, gen, reason})
    _unknown -> :ok
  end
end

defp notify_owner(_state, _msg), do: :ok
# --- target setup (mirrors AirPlay.V2.GroupPlayer) -------------------------
# Prepare every receiver in order, stopping at the first failure.
#
# On failure the targets prepared so far are cleaned up and the error names the
# receiver host that failed. On success the targets keep the receivers' order.
defp prepare_targets(receivers, opts), do: collect_targets(receivers, opts, [])

# Walk the receiver list, accumulating prepared targets in reverse.
defp collect_targets([], _opts, prepared), do: {:ok, Enum.reverse(prepared)}

defp collect_targets([receiver | remaining], opts, prepared) do
  case prepare_target(receiver, opts) do
    {:ok, target} ->
      collect_targets(remaining, opts, [target | prepared])

    {:error, reason} ->
      cleanup(Enum.reverse(prepared), nil)
      {:error, {:prepare_group_receiver, receiver.host, reason}}
  end
end
# Bring a single receiver to the point where audio can be sent to it.
#
# Steps, each aborting the chain on its first non-matching result: parse the
# host address, transient pairing, stamp the DACP fields, read the local
# address of the RTSP socket, session SETUP, event-channel connect, open a UDP
# control socket, stream SETUP, RECORD, extract the stream ports, and open the
# UDP audio socket. Returns `{:ok, target_map}` or the failing step's result.
defp prepare_target(%{host: host, port: port} = receiver, opts) do
  with {:ok, remote_ip} <- parse_host(host),
       {:ok, paired} <- Pairing.transient(host, port: port),
       tagged = stamp_dacp(paired, opts),
       {:ok, own_ip} <- local_ip(tagged),
       {:ok, session_plist, in_session} <- setup_session(tagged, own_ip),
       {:ok, events} <- connect_event(host, session_plist),
       {:ok, control, control_port} <- open_control_socket(),
       {:ok, stream_plist, in_stream} <- setup_stream(in_session, control_port, opts),
       {:ok, recording} <- record(in_stream),
       {:ok, data_port, remote_control_port} <- stream_ports(stream_plist),
       {:ok, audio} <- :gen_udp.open(0, [:binary, active: false]) do
    target = %{
      host: host,
      port: port,
      receiver: receiver,
      host_ip: remote_ip,
      local_ip: own_ip,
      session_body: session_plist,
      stream_body: stream_plist,
      pairing: recording,
      event_sock: events,
      control_sock: control,
      audio_sock: audio,
      data_port: data_port,
      remote_control_port: remote_control_port
    }

    {:ok, target}
  end
end
# Reduce a receiver map to `%{host: host, port: port}`.
#
# Accepts atom or string keys and requires a binary host; anything else yields
# nil so the caller can discard it.
defp normalize_receiver(receiver) do
  case receiver do
    %{host: host} when is_binary(host) ->
      %{host: host, port: receiver |> Map.get(:port) |> normalize_port()}

    %{"host" => host} when is_binary(host) ->
      %{host: host, port: receiver |> Map.get("port") |> normalize_port()}

    _unusable ->
      nil
  end
end
# Coerce a receiver port to a positive integer, falling back to 7000.
#
# Positive integers pass through. Strings must be a complete decimal integer
# greater than zero. Everything else (nil, "0", "-5", "70x", "", other types)
# yields 7000.
defp normalize_port(port) when is_integer(port) and port > 0, do: port

defp normalize_port(port) when is_binary(port) do
  # Integer.parse/1 avoids raising-and-rescuing for control flow, and the guard
  # holds string ports to the same `> 0` rule as the integer clause above.
  case Integer.parse(port) do
    {parsed, ""} when parsed > 0 -> parsed
    _invalid -> 7000
  end
end

defp normalize_port(_port), do: 7000
# See AirPlay.V2.Session.stamp_dacp/2 — advertise the DACP remote on each member.
# Copies `:dacp_id` and `:active_remote` from the options (nil when absent) onto
# the pairing's RTSP connection.
defp stamp_dacp(pairing, opts) do
  rtsp = pairing.rtsp
  dacp_id = Keyword.get(opts, :dacp_id)
  active_remote = Keyword.get(opts, :active_remote)

  stamped = %{rtsp | dacp_id: dacp_id, active_remote: active_remote}
  %{pairing | rtsp: stamped}
end
# Session-level SETUP, advertising our local address and timing port 319.
# Only status 200 counts as success; other statuses become `:setup_session`
# errors and any other result is passed through unchanged.
defp setup_session(pairing, local_ip) do
  reply = Setup.session(pairing, local_addresses: [local_ip], timing_port: 319)

  case reply do
    {:ok, 200, _headers, body, updated} -> {:ok, body, updated}
    {:ok, status, _headers, body, _pairing} -> {:error, {:setup_session, status, body}}
    failure -> failure
  end
end
# Stream-level SETUP for the given local control port.
# Latency bounds come from `:latency_min` / `:latency_max` (defaults 22_050 and
# 88_200). Only status 200 counts as success.
defp setup_stream(pairing, control_port, opts) do
  latency = [
    latency_min: Keyword.get(opts, :latency_min, 22_050),
    latency_max: Keyword.get(opts, :latency_max, 88_200)
  ]

  case Setup.stream(pairing, control_port, latency) do
    {:ok, 200, _headers, body, updated} -> {:ok, body, updated}
    {:ok, status, _headers, body, _pairing} -> {:error, {:setup_stream, status, body}}
    failure -> failure
  end
end
# Send RECORD with arguments 0 and 0. Any 2xx status is success; other statuses
# become `:record` errors and any other result is passed through unchanged.
defp record(pairing) do
  with {:ok, status, _headers, body, updated} <- Setup.record(pairing, 0, 0) do
    if status in 200..299 do
      {:ok, updated}
    else
      {:error, {:record, status, body}}
    end
  end
end
# Send SETPEERS with the full address list to every target, stopping at the
# first failure. A non-2xx status becomes a `:set_peers` error naming the host.
defp send_setpeers(targets, addresses) do
  map_targets(targets, fn target ->
    with {:ok, status, _headers, body, pairing} <- Setup.set_peers(target.pairing, addresses) do
      if status in 200..299 do
        {:ok, %{target | pairing: pairing}}
      else
        {:error, {:set_peers, target.host, status, body}}
      end
    end
  end)
end
# Map `fun` over the targets, short-circuiting on the first `{:error, reason}`.
# Returns `{:ok, mapped}` in the original order, or that first error.
defp map_targets(targets, fun), do: map_targets_into(targets, fun, [])

# Accumulate mapped targets in reverse and restore the order at the end.
defp map_targets_into([], _fun, mapped), do: {:ok, Enum.reverse(mapped)}

defp map_targets_into([target | remaining], fun, mapped) do
  case fun.(target) do
    {:ok, updated} -> map_targets_into(remaining, fun, [updated | mapped])
    {:error, reason} -> {:error, reason}
  end
end
# Address list for SETPEERS: every target's host and our local address towards
# it, in target order, without nils or duplicates.
defp peer_addresses(targets) do
  addresses =
    for target <- targets,
        address <- [target.host, target.local_ip],
        not is_nil(address),
        do: address

  Enum.uniq(addresses)
end
# Open the TCP event channel named by "eventPort" in the session reply.
#
# Best effort: a missing or invalid port, or a failed connect (2 s timeout),
# yields `{:ok, nil}` instead of an error.
defp connect_event(host, session_body) do
  case session_body |> Plist.decode!() |> Map.get("eventPort") do
    port when is_integer(port) and port > 0 ->
      address = String.to_charlist(host)

      case :gen_tcp.connect(address, port, [:binary, active: false], 2_000) do
        {:ok, sock} -> {:ok, sock}
        {:error, _reason} -> {:ok, nil}
      end

    _no_event_port ->
      {:ok, nil}
  end
end
# Open a passive binary UDP socket on an OS-assigned port and report that port.
# Returns `{:ok, socket, port}`, or whichever step's result did not match.
defp open_control_socket do
  case :gen_udp.open(0, [:binary, active: false]) do
    {:ok, sock} ->
      case :inet.port(sock) do
        {:ok, port} -> {:ok, sock, port}
        port_failure -> port_failure
      end

    open_failure ->
      open_failure
  end
end
# Sleep for the PTP settle time when it is a positive integer; always `:ok`.
defp wait_for_ptp(milliseconds) do
  if is_integer(milliseconds) and milliseconds > 0 do
    Process.sleep(milliseconds)
  end

  :ok
end
# Read the data and control ports of the first stream in the stream SETUP reply.
# Both must be integers; otherwise the decoded body is returned inside a
# `:missing_stream_ports` error.
defp stream_ports(stream_body) do
  decoded = Plist.decode!(stream_body)

  with %{"streams" => [first_stream | _]} <- decoded,
       %{"dataPort" => data_port, "controlPort" => control_port}
       when is_integer(data_port) and is_integer(control_port) <- first_stream do
    {:ok, data_port, control_port}
  else
    _mismatch -> {:error, {:missing_stream_ports, decoded}}
  end
end
# Extract the integer "ClockID" under "timingPeerInfo" from the session reply
# as an 8-byte binary, or nil when it is absent or not an integer.
defp session_clock_id(session_body) do
  with %{"timingPeerInfo" => %{"ClockID" => id}} when is_integer(id) <-
         Plist.decode!(session_body) do
    <<id::64>>
  else
    _absent -> nil
  end
end
# Start a decoder for `path` and wait for its prebuffer to fill.
# A failed start is returned as-is; a failed prebuffer stops the decoder and
# returns `{:error, {:decoder_not_ready, reason}}`.
defp start_decoder(path, opts) do
  args = Source.stream_args(path, opts)

  case Decoder.start_link(args: args, ffmpeg: Keyword.get(opts, :ffmpeg)) do
    {:ok, decoder} -> await_prebuffer(decoder, opts)
    failure -> failure
  end
end

# Wait until `:prebuffer_frames` frames are ready, within `:prebuffer_timeout_ms`.
defp await_prebuffer(decoder, opts) do
  wanted = Keyword.get(opts, :prebuffer_frames, @default_prebuffer_frames)
  limit_ms = Keyword.get(opts, :prebuffer_timeout_ms, @default_prebuffer_timeout_ms)

  case Decoder.await_ready(decoder, wanted, limit_ms) do
    {:ok, _frames} ->
      {:ok, decoder}

    {:error, reason} ->
      Decoder.stop(decoder)
      {:error, {:decoder_not_ready, reason}}
  end
end
# Stop the PTP clock (when one is given), then release every target. Always `:ok`.
defp cleanup(targets, ptp) do
  ptp && PtpBmca.stop(ptp)

  for target <- targets do
    cleanup_target(target)
  end

  :ok
end
# Release one target's resources: event socket, audio socket, control socket,
# then the RTSP connection. Best effort: anything raised, thrown, or exited
# along the way is swallowed, and the remaining steps are skipped.
defp cleanup_target(target) do
  try do
    target |> Map.get(:event_sock) |> close_event()
    target |> Map.get(:audio_sock) |> close_udp()
    target |> Map.get(:control_sock) |> close_udp()

    case target do
      %{pairing: %{rtsp: rtsp}} -> Rtsp2.close(rtsp)
      _no_rtsp -> :ok
    end
  catch
    _kind, _reason -> :ok
  end
end
# Close the TCP event socket; a nil socket is a no-op.
defp close_event(sock) do
  if is_nil(sock), do: :ok, else: :gen_tcp.close(sock)
end

# Close a UDP socket; a nil socket is a no-op.
defp close_udp(sock) do
  if is_nil(sock), do: :ok, else: :gen_udp.close(sock)
end
# Local address of the pairing's RTSP socket, as a string.
# Any non-matching result from :inet.sockname/1 is returned unchanged.
defp local_ip(pairing) do
  with {:ok, {address, _port}} <- :inet.sockname(pairing.rtsp.sock) do
    {:ok, ip_to_string(address)}
  end
end
# Parse an address string into an address tuple; raises MatchError when the
# string is not a valid address.
defp local_ip_tuple(ip) when is_binary(ip) do
  chars = String.to_charlist(ip)
  {:ok, address} = :inet.parse_address(chars)
  address
end
# Render an address tuple as its textual form.
#
# Uses :inet.ntoa/1 so IPv6 tuples come out in colon-hex notation. Joining the
# tuple elements with "." is only right for IPv4, and local_ip/1 passes IPv6
# socket addresses here too; the result must stay parseable by
# :inet.parse_address/1 (see local_ip_tuple/1).
defp ip_to_string(ip), do: ip |> :inet.ntoa() |> to_string()
# Turn a host into an address tuple. A 4-tuple is accepted as-is; a string is
# parsed as a literal address, giving `{:ok, tuple}` or `{:error, :einval}`.
defp parse_host(ip) when is_tuple(ip) and tuple_size(ip) == 4, do: {:ok, ip}

defp parse_host(host) when is_binary(host) do
  :inet.parse_address(String.to_charlist(host))
end
end