defmodule AirPlay.Cast do
@moduledoc """
Play a PCM buffer to an AirPlay/RAOP receiver, end to end — a small session
GenServer exposing `play/3`, `stop/1`, `set_volume/2`.
It owns the RTSP control connection (`AirPlay.Session`) and the RTP audio
streamer (`AirPlay.Player`): establishes the handshake, streams the track,
and tears the session down when the audio finishes or is stopped. Targets
classic unencrypted-ALAC RAOP receivers (e.g. shairport-sync); genuine
encrypted AirPlay 2 devices that require pairing aren't supported.
Volume is given 0–100 and mapped to AirPlay's dB range internally.
"""
use GenServer
alias AirPlay.{Decoder, Player, Session, Source}
# How often to poke the RTSP control connection so the receiver doesn't time
# out the session and fade the audio (see `Session.keepalive/1`).
@keepalive_ms 2_000
@default_prebuffer_frames 16
@default_prebuffer_timeout_ms 5_000
@spec play(String.t(), binary(), keyword()) :: {:ok, pid()} | {:error, term()}
def play(host, pcm, opts \\ []), do: GenServer.start(__MODULE__, {:pcm, host, pcm, opts}, [])
@doc """
Stream the audio file at `path` to `host`, decoding incrementally so memory
stays bounded regardless of track length. Same options as `play/3`, plus
`:start_seconds` (seek offset) and `:ffmpeg` (binary path).
"""
@spec play_file(String.t(), Path.t(), keyword()) :: {:ok, pid()} | {:error, term()}
def play_file(host, path, opts \\ []),
do: GenServer.start(__MODULE__, {:file, host, path, opts}, [])
@spec stop(pid()) :: :ok
def stop(pid), do: GenServer.stop(pid, :normal)
@doc "Set volume 0–100 (mapped to AirPlay dB). No-op if RTSP rejects it."
@spec set_volume(pid(), non_neg_integer()) :: :ok
def set_volume(pid, value), do: GenServer.cast(pid, {:set_volume, value})
@impl true
def init({:pcm, host, pcm, opts}) do
start(host, opts, fn -> {:ok, Source.frames(pcm), nil} end)
end
def init({:file, host, path, opts}) do
start(host, opts, fn ->
args = Source.stream_args(path, opts)
case Decoder.start_link(args: args, ffmpeg: Keyword.get(opts, :ffmpeg)) do
{:ok, decoder} -> {:ok, {:decoder, decoder}, decoder}
err -> err
end
end)
end
# Bring up a session, then begin streaming from whatever source `source_fun`
# produces (an in-memory frame list or a `{:decoder, pid}`). The Player must
# bind + answer the timing port BEFORE SETUP (else "520 Origin Error"), so it
# starts first, then the handshake, then streaming begins.
defp start(host, opts, source_fun) do
Process.flag(:trap_exit, true)
volume = Keyword.get(opts, :volume, 25)
with {:ok, audio} <- Player.start_link(host, opts),
local_ports <- Player.local_ports(audio),
{:ok, source, decoder} <- source_fun.(),
:ok <- await_source_ready(source, opts),
{:ok, rtsp, ports} <- Session.establish(host, establish_opts(opts, volume, local_ports)),
:ok <- begin_audio(audio, source, ports) do
Process.send_after(self(), :keepalive, @keepalive_ms)
{:ok, %{rtsp: rtsp, audio: audio, decoder: decoder}}
else
err -> {:stop, {:airplay_failed, err}}
end
end
defp begin_audio(player, source, ports) do
Player.begin(player, source,
server_port: ports.server,
control_port: ports.control,
start_seq: ports.seq,
start_ts: ports.rtptime
)
end
defp await_source_ready({:decoder, decoder}, opts) do
ready_frames = Keyword.get(opts, :prebuffer_frames, @default_prebuffer_frames)
timeout = Keyword.get(opts, :prebuffer_timeout_ms, @default_prebuffer_timeout_ms)
case Decoder.await_ready(decoder, ready_frames, timeout) do
{:ok, _frames} -> :ok
{:error, reason} -> {:error, {:decoder_not_ready, reason}}
end
end
defp await_source_ready(_source, _opts), do: :ok
defp establish_opts(opts, volume, local_ports) do
[
volume: to_db(volume),
local_control_port: local_ports.control,
local_timing_port: local_ports.timing
] ++ Keyword.take(opts, [:port])
end
# The RTP audio streamer is linked; when it finishes (or dies) we're done.
@impl true
def handle_info({:EXIT, audio, _reason}, %{audio: audio} = st), do: {:stop, :normal, st}
# Keep the RTSP control channel warm so the receiver doesn't drop us at ~30s.
# A failed poke leaves the old state; the next tick retries.
def handle_info(:keepalive, st) do
st =
case Session.keepalive(st.rtsp) do
{:ok, rtsp} -> %{st | rtsp: rtsp}
_ -> st
end
Process.send_after(self(), :keepalive, @keepalive_ms)
{:noreply, st}
end
def handle_info(_msg, st), do: {:noreply, st}
@impl true
def handle_cast({:set_volume, value}, st) do
case Session.set_volume(st.rtsp, to_db(value)) do
{:ok, rtsp} -> {:noreply, %{st | rtsp: rtsp}}
_ -> {:noreply, st}
end
end
@impl true
def terminate(_reason, st) do
if is_pid(st[:audio]) and Process.alive?(st.audio), do: Player.stop(st.audio)
if is_pid(st[:decoder]) and Process.alive?(st.decoder), do: Decoder.stop(st.decoder)
if st[:rtsp], do: Session.teardown(st.rtsp)
:ok
end
# 0 → mute; otherwise a linear ramp across AirPlay's useful -30..0 dB range.
defp to_db(0), do: -144.0
defp to_db(v) when is_integer(v), do: -30.0 + v / 100 * 30.0
end