Skip to main content

lib/air_play/cast.ex

defmodule AirPlay.Cast do
  @moduledoc """
  Play a PCM buffer to an AirPlay/RAOP receiver, end to end — a small session
  GenServer exposing `play/3`, `stop/1`, `set_volume/2`.

  It owns the RTSP control connection (`AirPlay.Session`) and the RTP audio
  streamer (`AirPlay.Player`): establishes the handshake, streams the track,
  and tears the session down when the audio finishes or is stopped. Targets
  classic unencrypted-ALAC RAOP receivers (e.g. shairport-sync); genuine
  encrypted AirPlay 2 devices that require pairing aren't supported.

  Volume is given 0–100 and mapped to AirPlay's dB range internally.
  """

  use GenServer

  alias AirPlay.{Decoder, Player, Session, Source}

  # How often to poke the RTSP control connection so the receiver doesn't time
  # out the session and fade the audio (see `Session.keepalive/1`).
  @keepalive_ms 2_000
  @default_prebuffer_frames 16
  @default_prebuffer_timeout_ms 5_000

  @spec play(String.t(), binary(), keyword()) :: {:ok, pid()} | {:error, term()}
  def play(host, pcm, opts \\ []), do: GenServer.start(__MODULE__, {:pcm, host, pcm, opts}, [])

  @doc """
  Stream the audio file at `path` to `host`, decoding incrementally so memory
  stays bounded regardless of track length. Same options as `play/3`, plus
  `:start_seconds` (seek offset) and `:ffmpeg` (binary path).
  """
  @spec play_file(String.t(), Path.t(), keyword()) :: {:ok, pid()} | {:error, term()}
  def play_file(host, path, opts \\ []),
    do: GenServer.start(__MODULE__, {:file, host, path, opts}, [])

  @spec stop(pid()) :: :ok
  def stop(pid), do: GenServer.stop(pid, :normal)

  @doc "Set volume 0–100 (mapped to AirPlay dB). No-op if RTSP rejects it."
  @spec set_volume(pid(), non_neg_integer()) :: :ok
  def set_volume(pid, value), do: GenServer.cast(pid, {:set_volume, value})

  @impl true
  def init({:pcm, host, pcm, opts}) do
    start(host, opts, fn -> {:ok, Source.frames(pcm), nil} end)
  end

  def init({:file, host, path, opts}) do
    start(host, opts, fn ->
      args = Source.stream_args(path, opts)

      case Decoder.start_link(args: args, ffmpeg: Keyword.get(opts, :ffmpeg)) do
        {:ok, decoder} -> {:ok, {:decoder, decoder}, decoder}
        err -> err
      end
    end)
  end

  # Bring up a session, then begin streaming from whatever source `source_fun`
  # produces (an in-memory frame list or a `{:decoder, pid}`). The Player must
  # bind + answer the timing port BEFORE SETUP (else "520 Origin Error"), so it
  # starts first, then the handshake, then streaming begins.
  defp start(host, opts, source_fun) do
    Process.flag(:trap_exit, true)
    volume = Keyword.get(opts, :volume, 25)

    with {:ok, audio} <- Player.start_link(host, opts),
         local_ports <- Player.local_ports(audio),
         {:ok, source, decoder} <- source_fun.(),
         :ok <- await_source_ready(source, opts),
         {:ok, rtsp, ports} <- Session.establish(host, establish_opts(opts, volume, local_ports)),
         :ok <- begin_audio(audio, source, ports) do
      Process.send_after(self(), :keepalive, @keepalive_ms)
      {:ok, %{rtsp: rtsp, audio: audio, decoder: decoder}}
    else
      err -> {:stop, {:airplay_failed, err}}
    end
  end

  defp begin_audio(player, source, ports) do
    Player.begin(player, source,
      server_port: ports.server,
      control_port: ports.control,
      start_seq: ports.seq,
      start_ts: ports.rtptime
    )
  end

  defp await_source_ready({:decoder, decoder}, opts) do
    ready_frames = Keyword.get(opts, :prebuffer_frames, @default_prebuffer_frames)
    timeout = Keyword.get(opts, :prebuffer_timeout_ms, @default_prebuffer_timeout_ms)

    case Decoder.await_ready(decoder, ready_frames, timeout) do
      {:ok, _frames} -> :ok
      {:error, reason} -> {:error, {:decoder_not_ready, reason}}
    end
  end

  defp await_source_ready(_source, _opts), do: :ok

  defp establish_opts(opts, volume, local_ports) do
    [
      volume: to_db(volume),
      local_control_port: local_ports.control,
      local_timing_port: local_ports.timing
    ] ++ Keyword.take(opts, [:port])
  end

  # The RTP audio streamer is linked; when it finishes (or dies) we're done.
  @impl true
  def handle_info({:EXIT, audio, _reason}, %{audio: audio} = st), do: {:stop, :normal, st}

  # Keep the RTSP control channel warm so the receiver doesn't drop us at ~30s.
  # A failed poke leaves the old state; the next tick retries.
  def handle_info(:keepalive, st) do
    st =
      case Session.keepalive(st.rtsp) do
        {:ok, rtsp} -> %{st | rtsp: rtsp}
        _ -> st
      end

    Process.send_after(self(), :keepalive, @keepalive_ms)
    {:noreply, st}
  end

  def handle_info(_msg, st), do: {:noreply, st}

  @impl true
  def handle_cast({:set_volume, value}, st) do
    case Session.set_volume(st.rtsp, to_db(value)) do
      {:ok, rtsp} -> {:noreply, %{st | rtsp: rtsp}}
      _ -> {:noreply, st}
    end
  end

  @impl true
  def terminate(_reason, st) do
    if is_pid(st[:audio]) and Process.alive?(st.audio), do: Player.stop(st.audio)
    if is_pid(st[:decoder]) and Process.alive?(st.decoder), do: Decoder.stop(st.decoder)
    if st[:rtsp], do: Session.teardown(st.rtsp)
    :ok
  end

  # 0 → mute; otherwise a linear ramp across AirPlay's useful -30..0 dB range.
  defp to_db(0), do: -144.0
  defp to_db(v) when is_integer(v), do: -30.0 + v / 100 * 30.0
end