Skip to main content

lib/air_play/v2/player.ex

defmodule AirPlay.V2.Player do
  @moduledoc """
  Minimal AirPlay 2 file player.

  This follows the audible airplay2-rs `test_gptp` sequence for HomePod-class
  receivers:

    * transient pair
    * SETUP phase 1
    * connect event channel
    * bind local control socket before SETUP phase 2
    * SETUP phase 2 with random `shk`
    * RECORD `seq=0;rtptime=0`
    * run BMCA-yield PTP and use the receiver clock for PT=87 sync
    * FLUSH `seq=0;rtptime=0`
    * send ALAC RTP with SSRC=0, sequence=0, timestamp=0

  The RTSP connection, event and control sockets, PTP process and decoder are
  released when playback finishes, when any step of the sequence returns an
  error, and when the streaming phase raises.
  """

  alias AirPlay.{Alac, Decoder, Rtp, Source}
  alias AirPlay.V2.{Crypto, Pairing, Plist, PtpBmca, Rtsp2, Setup}

  @sample_rate 44_100
  @samples_per_packet 352
  @feedback_interval_us 2_000_000
  @ssrc 0

  # RTP sequence numbers are 16-bit and RTP timestamps are 32-bit; both wrap.
  @seq_modulus 65_536
  @rtp_modulus 4_294_967_296

  @default_port 7000
  @default_ptp_settle_ms 500
  @default_render_delay_ms 200

  # Streaming decode (see `AirPlay.Decoder`): buffer a little audio before the
  # first packet so the receiver starts cleanly, then pull frames on demand at
  # ~1× real time instead of decoding the whole file up front.
  @default_prebuffer_frames 125
  @default_prebuffer_timeout_ms 5_000
  @take_batch 32
  @idle_poll_ms 5

  defmodule Session do
    @moduledoc false
    # Values negotiated and resources acquired while setting up playback.
    # Resource fields (`event_sock`, `control_sock`, `ptp`, `decoder`) stay
    # `nil` until the step that acquires them succeeds, so teardown knows
    # exactly what to release.
    defstruct [
      :host,
      :host_ip,
      :pairing,
      :local_ip,
      :session_body,
      :stream_body,
      :event_sock,
      :control_sock,
      :control_port,
      :ptp,
      :decoder
    ]
  end

  @doc "Set volume on a running AP2 player process."
  @spec set_volume(pid(), number()) :: :ok
  def set_volume(pid, volume) when is_pid(pid) and is_number(volume) do
    send(pid, {__MODULE__, :set_volume, volume})
    :ok
  end

  @doc "Ask a running AP2 player process to stop."
  @spec stop(pid()) :: :ok
  def stop(pid) when is_pid(pid) do
    send(pid, {__MODULE__, :stop})
    :ok
  end

  @doc """
  Play a local audio file to an AirPlay 2 receiver.

  `host` must be an IP address literal (it is parsed with
  `:inet.parse_address/1`); anything else returns `{:error, :einval}`.

  Options:

    * `:port` - RTSP port, defaults to `7000`
    * `:seconds` - limit playback duration, useful for smoke tests (forwarded
      with the rest of `opts` to `AirPlay.Source.stream_args/2`)
    * `:volume` - optional AirPlay volume, `0.0..1.0`
    * `:render_delay_ms` - added to PT=87 sync timestamps, defaults to `200`
    * `:ptp_settle_ms` - milliseconds to sleep after starting PTP and before
      FLUSH, defaults to `500`; a non-positive value skips the wait
    * `:prebuffer_frames` - decoded frames required before the first packet,
      defaults to `125`
    * `:prebuffer_timeout_ms` - how long to wait for that prebuffer, defaults
      to `5_000`
    * `:ffmpeg` - forwarded to `AirPlay.Decoder.start_link/1` as `:ffmpeg`

  Returns `{:ok, stats}` where `stats` holds `:packets`, `:sync_packets`,
  `:stopped?`, `:data_port`, `:control_port` and `:ptp` (the result of
  `AirPlay.V2.PtpBmca.offset/1`), or the error from the first failing step.
  """
  @spec play_file(String.t(), String.t(), keyword()) :: {:ok, map()} | {:error, term()}
  def play_file(host, path, opts \\ []) do
    with {:ok, host_ip} <- parse_host(host),
         {:ok, pairing} <- Pairing.transient(host, port: Keyword.get(opts, :port, @default_port)) do
      case negotiate(%Session{host: host, host_ip: host_ip, pairing: pairing}, path, opts) do
        {:ok, session} ->
          # Stream-decode and send concurrently: ffmpeg (with `-re`) produces PCM
          # at ~1× real time while `send_audio` paces packets at the same rate,
          # so playback starts after a short prebuffer instead of waiting for the
          # whole file to decode. `after` guarantees teardown even if streaming
          # raises.
          try do
            send_audio(session, opts)
          after
            release_session(session)
          end

        {:failed, error, session} ->
          release_session(session)
          error
      end
    end
  end

  # Runs the setup sequence from the moduledoc, one step at a time. A step takes
  # the session and returns `{:ok, session}` (recording anything it acquired) or
  # an error term. On the first error, that term is returned together with the
  # session as it stood before the failing step, so the caller can release
  # exactly what was acquired so far.
  defp negotiate(session, path, opts) do
    steps = [
      &setup_session_step/1,
      &connect_event_step/1,
      &open_control_step/1,
      &setup_stream_step/1,
      &record_step/1,
      &start_ptp_step(&1, opts),
      &flush_step/1,
      &volume_step(&1, Keyword.get(opts, :volume)),
      &decoder_step(&1, path, opts)
    ]

    Enum.reduce_while(steps, {:ok, session}, fn step, {:ok, acquired} ->
      case step.(acquired) do
        {:ok, %Session{} = next} -> {:cont, {:ok, next}}
        error -> {:halt, {:failed, error, acquired}}
      end
    end)
  end

  # SETUP phase 1, advertising the local address of the RTSP connection.
  defp setup_session_step(session) do
    with {:ok, local_ip} <- local_ip(session.pairing),
         {:ok, body, pairing} <- setup_session(session.pairing, local_ip) do
      {:ok, %{session | local_ip: local_ip, session_body: body, pairing: pairing}}
    end
  end

  # Event channel; `connect_event/2` treats connection failure as best-effort.
  defp connect_event_step(session) do
    with {:ok, sock} <- connect_event(session.host, session.session_body) do
      {:ok, %{session | event_sock: sock}}
    end
  end

  # The local control socket must exist before SETUP phase 2 announces its port.
  defp open_control_step(session) do
    with {:ok, sock, port} <- open_control_socket() do
      {:ok, %{session | control_sock: sock, control_port: port}}
    end
  end

  # SETUP phase 2.
  defp setup_stream_step(session) do
    with {:ok, body, pairing} <- setup_stream(session.pairing, session.control_port) do
      {:ok, %{session | stream_body: body, pairing: pairing}}
    end
  end

  defp record_step(session), do: put_pairing(session, record(session.pairing))

  # PTP is recorded in the session before the settle wait so it is always
  # released by teardown once started.
  defp start_ptp_step(session, opts) do
    case PtpBmca.start_link(session.host_ip, local_ip: local_ip_tuple(session.local_ip)) do
      {:ok, ptp} ->
        session = %{session | ptp: ptp}
        :ok = wait_for_ptp(Keyword.get(opts, :ptp_settle_ms, @default_ptp_settle_ms))
        {:ok, session}

      error ->
        error
    end
  end

  defp flush_step(session), do: put_pairing(session, flush(session.pairing))

  defp volume_step(session, volume), do: put_pairing(session, maybe_set_volume(session.pairing, volume))

  defp decoder_step(session, path, opts) do
    case start_decoder(path, opts) do
      {:ok, decoder} -> {:ok, %{session | decoder: decoder}}
      error -> error
    end
  end

  # Stores the pairing returned by an RTSP request; other results pass through.
  defp put_pairing(session, {:ok, pairing}), do: {:ok, %{session | pairing: pairing}}
  defp put_pairing(_session, error), do: error

  # Releases everything recorded in `session`: decoder, event channel, PTP,
  # RTSP connection, then control socket. `nil` fields were never acquired.
  # `pairing` is always set because it comes from `Pairing.transient/2`.
  defp release_session(%Session{} = session) do
    if session.decoder, do: Decoder.stop(session.decoder)
    close_event(session.event_sock)
    if session.ptp, do: PtpBmca.stop(session.ptp)
    Rtsp2.close(session.pairing.rtsp)
    if session.control_sock, do: :gen_udp.close(session.control_sock)
    :ok
  end

  defp setup_session(pairing, local_ip) do
    case Setup.session(pairing, local_addresses: [local_ip], timing_port: 319) do
      {:ok, 200, _headers, body, pairing} -> {:ok, body, pairing}
      {:ok, status, _headers, body, _pairing} -> {:error, {:setup_session, status, body}}
      error -> error
    end
  end

  defp setup_stream(pairing, control_port) do
    case Setup.stream(pairing, control_port, latency_min: 22_050, latency_max: 88_200) do
      {:ok, 200, _headers, body, pairing} -> {:ok, body, pairing}
      {:ok, status, _headers, body, _pairing} -> {:error, {:setup_stream, status, body}}
      error -> error
    end
  end

  defp record(pairing) do
    case Setup.record(pairing, 0, 0) do
      {:ok, status, _headers, _body, pairing} when status in 200..299 -> {:ok, pairing}
      {:ok, status, _headers, body, _pairing} -> {:error, {:record, status, body}}
      error -> error
    end
  end

  defp flush(pairing) do
    case Setup.flush(pairing, 0, 0) do
      {:ok, status, _headers, _body, pairing} when status in 200..299 -> {:ok, pairing}
      {:ok, status, _headers, body, _pairing} -> {:error, {:flush, status, body}}
      error -> error
    end
  end

  defp maybe_set_volume(pairing, nil), do: {:ok, pairing}

  defp maybe_set_volume(pairing, volume) do
    case Setup.set_volume(pairing, volume) do
      {:ok, status, _headers, _body, pairing} when status in 200..299 -> {:ok, pairing}
      {:ok, status, _headers, body, _pairing} -> {:error, {:set_volume, status, body}}
      error -> error
    end
  end

  defp wait_for_ptp(milliseconds) when is_integer(milliseconds) and milliseconds > 0 do
    Process.sleep(milliseconds)
    :ok
  end

  defp wait_for_ptp(_milliseconds), do: :ok

  # Best-effort: a missing `eventPort` or a failed connect yields `{:ok, nil}`.
  defp connect_event(host, session_body) do
    event_port = session_body |> Plist.decode!() |> Map.get("eventPort")

    if is_integer(event_port) and event_port > 0 do
      case :gen_tcp.connect(String.to_charlist(host), event_port, [:binary, active: false], 2_000) do
        {:ok, sock} -> {:ok, sock}
        {:error, _reason} -> {:ok, nil}
      end
    else
      {:ok, nil}
    end
  end

  defp close_event(nil), do: :ok
  defp close_event(sock), do: :gen_tcp.close(sock)

  defp open_control_socket do
    with {:ok, sock} <- :gen_udp.open(0, [:binary, active: false]) do
      case :inet.port(sock) do
        {:ok, port} ->
          {:ok, sock, port}

        error ->
          # Don't leak the socket if its port can't be read back.
          :gen_udp.close(sock)
          error
      end
    end
  end

  # Opens the audio socket for the duration of streaming and always closes it.
  defp send_audio(%Session{} = session, opts) do
    with {:ok, data_port, remote_control_port} <- stream_ports(session.stream_body),
         {:ok, audio_sock} <- :gen_udp.open(0, [:binary, active: false]) do
      try do
        stream_frames(session, audio_sock, data_port, remote_control_port, opts)
      after
        :gen_udp.close(audio_sock)
      end
    end
  end

  # Pulls decoded frames and sends them as paced RTP until end of stream or a
  # stop request. The pairing updated by in-stream feedback/volume requests is
  # not propagated back to the session.
  defp stream_frames(session, audio_sock, data_port, remote_control_port, opts) do
    ptp = session.ptp
    render_delay_ms = Keyword.get(opts, :render_delay_ms, @default_render_delay_ms)
    clock_id = ptp.clock_id || session_clock_id(session.session_body) || <<0::64>>
    started_us = System.monotonic_time(:microsecond)

    context = %{
      audio_sock: audio_sock,
      clock_id: clock_id,
      control_sock: session.control_sock,
      data_port: data_port,
      host_ip: session.host_ip,
      ptp: ptp,
      remote_control_port: remote_control_port,
      render_delay_ns: render_delay_ms * 1_000_000,
      started_us: started_us
    }

    # State: {seq, rtp, sync_seq, last_sync, pairing, last_feedback_us, stopped?, count}
    initial = {0, 0, 0, nil, session.pairing, started_us, false, 0}

    {_seq, _rtp, sync_seq, _last_sync, _pairing, _last_feedback_us, stopped?, packets} =
      session.decoder
      |> decoder_frame_stream()
      |> Stream.with_index()
      |> Enum.reduce_while(initial, &send_audio_frame(&1, &2, context))

    {:ok,
     %{
       packets: packets,
       sync_packets: sync_seq,
       stopped?: stopped?,
       data_port: data_port,
       control_port: remote_control_port,
       ptp: PtpBmca.offset(ptp)
     }}
  end

  # Handles pending control messages, then either halts (stop requested) or
  # sends the frame.
  defp send_audio_frame(
         {frame, index},
         {seq, rtp, sync_seq, last_sync, pairing, last_feedback_us, _stopped?, count},
         context
       ) do
    case receive_controls(pairing) do
      {:stop, pairing} ->
        {:halt, {seq, rtp, sync_seq, last_sync, pairing, last_feedback_us, true, count}}

      {:cont, pairing} ->
        send_active_audio_frame(
          frame,
          index,
          {seq, rtp, sync_seq, last_sync, pairing, last_feedback_us, count},
          context
        )
    end
  end

  defp send_active_audio_frame(
         frame,
         index,
         {seq, rtp, sync_seq, last_sync, pairing, last_feedback_us, count},
         context
       ) do
    first? = index == 0
    {sync_seq, last_sync} = maybe_send_sync(first?, rtp, sync_seq, last_sync, context)
    send_audio_packet(frame, first?, seq, rtp, pairing.audio_key, context)
    {pairing, last_feedback_us} = maybe_send_feedback(pairing, last_feedback_us)
    pace_audio_frame(index, context)

    {:cont,
     {rem(seq + 1, @seq_modulus), rem(rtp + @samples_per_packet, @rtp_modulus), sync_seq,
      last_sync, pairing, last_feedback_us, false, count + 1}}
  end

  # Sends a feedback request at most every @feedback_interval_us; failures keep
  # the previous pairing (best-effort).
  defp maybe_send_feedback(pairing, last_feedback_us) do
    now_us = System.monotonic_time(:microsecond)

    if now_us - last_feedback_us >= @feedback_interval_us do
      pairing =
        case Setup.feedback(pairing) do
          {:ok, status, _headers, _body, pairing} when status in 200..299 -> pairing
          _error -> pairing
        end

      {pairing, now_us}
    else
      {pairing, last_feedback_us}
    end
  end

  # Sends a PT=87 sync on the first frame and then roughly once per second of
  # audio. The distance since the last sync is taken modulo 2^32 so the cadence
  # survives the RTP timestamp wrapping around.
  defp maybe_send_sync(first?, rtp, sync_seq, last_sync, context) do
    if first? or is_nil(last_sync) or
         Integer.mod(rtp - last_sync, @rtp_modulus) >= @sample_rate do
      ptp_time = PtpBmca.receiver_time_ns(context.ptp) + context.render_delay_ns
      next_rtp = rtp + @samples_per_packet
      sync = Rtp.ptp_sync(sync_seq, rtp, ptp_time, next_rtp, context.clock_id, sync_seq == 0)

      :ok =
        :gen_udp.send(context.control_sock, context.host_ip, context.remote_control_port, sync)

      {rem(sync_seq + 1, @seq_modulus), rtp}
    else
      {sync_seq, last_sync}
    end
  end

  # First packet carries payload type 0xE0 (marker bit set), later ones 0x60.
  defp send_audio_packet(frame, first?, seq, rtp, audio_key, context) do
    payload = Alac.encode_stereo16(frame)
    payload_type = if first?, do: 0xE0, else: 0x60
    header = <<0x80, payload_type, seq::16, rtp::32, @ssrc::32>>
    packet = header <> Crypto.audio_encrypt(audio_key, rtp, @ssrc, seq, payload)
    :ok = :gen_udp.send(context.audio_sock, context.host_ip, context.data_port, packet)
  end

  # Sleeps until frame `index + 1` is due. The deadline is derived from the total
  # sample count rather than a truncated per-frame duration
  # (352 / 44_100 s ≈ 7981.86 µs), so rounding does not accumulate into drift.
  defp pace_audio_frame(index, context) do
    elapsed_us = div((index + 1) * @samples_per_packet * 1_000_000, @sample_rate)
    sleep_us = context.started_us + elapsed_us - System.monotonic_time(:microsecond)
    if sleep_us > 1_000, do: Process.sleep(div(sleep_us, 1_000))
  end

  # Drains all pending control messages without blocking. Volume changes are
  # applied best-effort; a stop request ends the loop.
  defp receive_controls(pairing) do
    receive do
      {__MODULE__, :set_volume, volume} ->
        pairing =
          case Setup.set_volume(pairing, volume) do
            {:ok, status, _headers, _body, pairing} when status in 200..299 -> pairing
            _error -> pairing
          end

        receive_controls(pairing)

      {__MODULE__, :stop} ->
        {:stop, pairing}
    after
      0 -> {:cont, pairing}
    end
  end

  defp stream_ports(stream_body) do
    case Plist.decode!(stream_body) do
      %{"streams" => [%{"dataPort" => data_port, "controlPort" => control_port} | _]}
      when is_integer(data_port) and is_integer(control_port) ->
        {:ok, data_port, control_port}

      decoded ->
        {:error, {:missing_stream_ports, decoded}}
    end
  end

  defp session_clock_id(session_body) do
    case Plist.decode!(session_body) do
      %{"timingPeerInfo" => %{"ClockID" => clock_id}} when is_integer(clock_id) ->
        <<clock_id::64>>

      _ ->
        nil
    end
  end

  # Start a streaming ffmpeg decoder and wait for a small prebuffer, so the first
  # RTP packet can go out almost immediately. `Source.stream_args/2` adds `-re`
  # (read input at native rate), which is the backpressure that keeps the decoded
  # PCM buffer (and this process's mailbox) bounded for long files.
  defp start_decoder(path, opts) do
    args = Source.stream_args(path, opts)

    case Decoder.start_link(args: args, ffmpeg: Keyword.get(opts, :ffmpeg)) do
      {:ok, decoder} ->
        ready = Keyword.get(opts, :prebuffer_frames, @default_prebuffer_frames)
        timeout = Keyword.get(opts, :prebuffer_timeout_ms, @default_prebuffer_timeout_ms)

        case Decoder.await_ready(decoder, ready, timeout) do
          {:ok, _frames} ->
            {:ok, decoder}

          {:error, reason} ->
            Decoder.stop(decoder)
            {:error, {:decoder_not_ready, reason}}
        end

      error ->
        error
    end
  end

  # Lazily pull decoded frames so `send_audio` can pace them out one at a time.
  # `{[], false}` means the decoder is momentarily behind (not yet at end of
  # stream), so idle briefly and retry rather than spin.
  defp decoder_frame_stream(decoder) do
    Stream.resource(
      fn -> :ok end,
      fn :ok ->
        case Decoder.take(decoder, @take_batch) do
          {[], true} ->
            {:halt, :ok}

          {[], false} ->
            Process.sleep(@idle_poll_ms)
            {[], :ok}

          {frames, _eos?} ->
            {frames, :ok}
        end
      end,
      fn :ok -> :ok end
    )
  end

  # Local address of the RTSP connection, formatted as a string.
  defp local_ip(pairing) do
    case :inet.sockname(pairing.rtsp.sock) do
      {:ok, {ip, _port}} -> {:ok, ip_to_string(ip)}
      error -> error
    end
  end

  defp local_ip_tuple(ip) when is_binary(ip) do
    {:ok, tuple} = ip |> String.to_charlist() |> :inet.parse_address()
    tuple
  end

  # `:inet.ntoa/1` formats IPv4 as a dotted quad and IPv6 as hex groups; joining
  # tuple elements with "." would garble an IPv6 address.
  defp ip_to_string(ip), do: ip |> :inet.ntoa() |> List.to_string()

  defp parse_host({_, _, _, _} = ip), do: {:ok, ip}

  defp parse_host(host) when is_binary(host),
    do: host |> String.to_charlist() |> :inet.parse_address()
end