Skip to main content

lib/air_play/v2/session.ex

defmodule AirPlay.V2.Session do
  @moduledoc """
  Persistent AirPlay 2 session for a single receiver.

  Unlike `AirPlay.V2.Player.play_file/3` (which pairs, sets up, runs PTP, streams
  one file, and tears everything down), a `Session` keeps the connection and clock
  warm across tracks:

    * `connect/2` performs the one-time handshake (transient pair, SETUP session +
      stream, RECORD, BMCA-yield PTP) and returns the session process pid.
    * `play/3` streams a file: `FLUSH` (continuing the RTP timeline, not resetting
      to zero) then ALAC RTP send. When the file ends the session notifies its owner
      with `{#{inspect(__MODULE__)}, :ended, play_gen}` and goes idle — the
      connection stays up.
    * while idle, the session keeps sending RTSP `FEEDBACK` so the receiver does not
      tear the stream down between tracks.
    * `close/1` stops any running decoder and PTP, and closes the RTSP connection
      and all sockets. The session is also closed when its owner process exits.

  This is what lets an album or audiobook change tracks without paying the ~5s
  pair + SETUP + PTP cold-start on every track.

  Owner notifications (sent to the `:owner` pid, defaulting to the caller of
  `connect/2`):

    * `{#{inspect(__MODULE__)}, :ended, play_gen}` — current track finished cleanly
    * `{#{inspect(__MODULE__)}, :error, play_gen, reason}` — a track failed to start
      (e.g. the decoder could not be prepared); the session stays connected and idle
  """

  alias AirPlay.{Alac, Decoder, Rtp, Source}
  alias AirPlay.V2.{Crypto, Pairing, Plist, PtpBmca, Rtsp2, Setup}

  @sample_rate 44_100
  @samples_per_packet 352
  @feedback_interval_us 2_000_000
  @ssrc 0

  # RTP sequence numbers are 16-bit and RTP timestamps 32-bit; both wrap.
  @seq_modulus 65_536
  @rtp_modulus 4_294_967_296

  @default_prebuffer_frames 125
  # Once the session has streamed a track the connection + PTP clock are warm, so a
  # small prebuffer keeps album/audiobook track changes ~gapless. The large cold
  # prebuffer above is only needed to prime the receiver on the very first stream.
  @default_warm_prebuffer_frames 16
  @default_prebuffer_timeout_ms 5_000
  @take_batch 32
  @idle_poll_ms 5

  # While idle (between tracks) send FEEDBACK at least this often so the receiver
  # keeps the session alive. Comfortably inside the active 2s feedback cadence.
  @keepalive_ms 1_000

  # `connect/2` options that become session-wide defaults for every `play/3`
  # call; options passed to `play/3` itself still take precedence.
  @play_default_keys [:prebuffer_frames, :warm_prebuffer_frames, :prebuffer_timeout_ms]

  @doc """
  Open a persistent session to an AirPlay 2 receiver.

  `host` may be a numeric IP address or a hostname (resolved to IPv4).

  Options:

    * `:port` - RTSP port, defaults to `7000`
    * `:owner` - pid to receive `:ended` / `:error` notifications, defaults to the
      calling process. The session closes itself if the owner exits.
    * `:render_delay_ms` - added to PT=87 sync timestamps so the receiver buffers
      before rendering, defaults to `200`
    * `:prebuffer_frames` - decoded frames (~8ms each) to buffer before the first
      packet of the first (cold) track, defaults to `125` (~1s); a near-empty buffer
      starves the receiver at the start of a stream
    * `:warm_prebuffer_frames` - frames to buffer before later (warm) tracks,
      defaults to `16`
    * `:prebuffer_timeout_ms` - max wait for the prebuffer to fill, defaults to
      `5_000`
    * `:ptp_settle_ms` - PTP settle delay after BMCA, defaults to `500`
    * `:ptp_sync_timeout_ms` - max wait for the PTP offset to converge before
      streaming, defaults to `3_000`
    * `:latency_min` / `:latency_max` - latency values passed to SETUP stream,
      default `22_050` / `88_200`
    * `:dacp_id` / `:active_remote` - DACP identity advertised to the receiver
    * `:connect_timeout_ms` - give up on the handshake after this long, default `20_000`

  The three prebuffer options can also be given per call to `play/3`, where they
  override the values given here.
  """
  @spec connect(String.t(), keyword()) :: {:ok, pid()} | {:error, term()}
  def connect(host, opts \\ []) do
    caller = self()
    owner = Keyword.get(opts, :owner, caller)
    pid = spawn(fn -> init(host, opts, caller, owner) end)
    ref = Process.monitor(pid)

    receive do
      {__MODULE__, :ready, ^pid} ->
        Process.demonitor(ref, [:flush])
        {:ok, pid}

      {__MODULE__, :connect_error, ^pid, reason} ->
        Process.demonitor(ref, [:flush])
        {:error, reason}

      {:DOWN, ^ref, :process, ^pid, reason} ->
        {:error, {:session_down, reason}}
    after
      Keyword.get(opts, :connect_timeout_ms, 20_000) ->
        abort_handshake(pid, ref)
        {:error, :connect_timeout}
    end
  end

  @doc """
  Play (or skip to) a file on an open session. Continues the RTP timeline.

  Options:

    * `:volume` - receiver volume (`0.0..1.0`) to apply before streaming
    * `:playback_gen` - opaque tag echoed back in the `:ended` / `:error`
      notifications for this track
    * `:prebuffer_frames`, `:warm_prebuffer_frames`, `:prebuffer_timeout_ms` -
      override the session defaults from `connect/2`
    * `:ffmpeg` - passed to the decoder

  The whole option list is also handed to `AirPlay.Source.stream_args/2`.
  """
  @spec play(pid(), String.t(), keyword()) :: :ok
  def play(pid, path, opts \\ []) when is_pid(pid) do
    send(pid, {__MODULE__, :play, path, opts})
    :ok
  end

  @doc "Stop the current track but keep the connection warm."
  @spec stop(pid()) :: :ok
  def stop(pid) when is_pid(pid) do
    send(pid, {__MODULE__, :stop})
    :ok
  end

  @doc "Set the receiver volume (`0.0..1.0`)."
  @spec set_volume(pid(), number()) :: :ok
  def set_volume(pid, volume) when is_pid(pid) and is_number(volume) do
    send(pid, {__MODULE__, :set_volume, volume})
    :ok
  end

  @doc "Tear down the session and close all sockets."
  @spec close(pid()) :: :ok
  def close(pid) when is_pid(pid) do
    send(pid, {__MODULE__, :close})
    :ok
  end

  # --- process bootstrap -----------------------------------------------------

  # Kill a session whose handshake overran, then discard any :ready /
  # :connect_error it managed to send first. Signals from one process arrive in
  # order, so once its :DOWN has been received every earlier reply is already in
  # our mailbox and can be flushed without a race.
  defp abort_handshake(pid, ref) do
    Process.exit(pid, :kill)

    receive do
      {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
    end

    receive do
      {__MODULE__, :ready, ^pid} -> :ok
      {__MODULE__, :connect_error, ^pid, _reason} -> :ok
    after
      0 -> :ok
    end
  end

  defp init(host, opts, caller, owner) do
    case establish(host, opts) do
      {:ok, state} ->
        owner_ref = if is_pid(owner), do: Process.monitor(owner), else: nil
        send(caller, {__MODULE__, :ready, self()})
        loop_idle(%{state | owner: owner, owner_ref: owner_ref})

      {:error, reason} ->
        send(caller, {__MODULE__, :connect_error, self(), reason})
    end
  end

  defp establish(host, opts) do
    with {:ok, host_ip} <- parse_host(host),
         {:ok, pairing0} <- Pairing.transient(host, port: Keyword.get(opts, :port, 7000)),
         pairing0 = stamp_dacp(pairing0, opts),
         {:ok, local_ip} <- local_ip(pairing0),
         {:ok, session_body, pairing1} <- setup_session(pairing0, local_ip),
         {:ok, event_sock} <- connect_event(host, session_body),
         {:ok, control_sock, control_port} <- open_control_socket(),
         {:ok, stream_body, pairing2} <- setup_stream(pairing1, control_port, opts),
         {:ok, pairing3} <- record(pairing2),
         {:ok, ptp} <- PtpBmca.start_link(host_ip, local_ip: local_ip_tuple(local_ip)) do
      case sync_and_open_audio(ptp, stream_body, opts) do
        {:ok, data_port, remote_control_port, audio_sock} ->
          now = System.monotonic_time(:microsecond)

          {:ok,
           %{
             owner: nil,
             owner_ref: nil,
             host_ip: host_ip,
             local_ip: local_ip,
             session_body: session_body,
             stream_body: stream_body,
             pairing: pairing3,
             event_sock: event_sock,
             control_sock: control_sock,
             audio_sock: audio_sock,
             ptp: ptp,
             clock_id: ptp.clock_id || session_clock_id(session_body) || <<0::64>>,
             data_port: data_port,
             remote_control_port: remote_control_port,
             render_delay_ns: Keyword.get(opts, :render_delay_ms, 200) * 1_000_000,
             # connect-time defaults merged under every play/3's options
             play_defaults: Keyword.take(opts, @play_default_keys),
             # RTP timeline counters — persist across tracks
             seq: 0,
             rtp: 0,
             sync_seq: 0,
             last_sync: nil,
             last_feedback_us: now,
             # current-track state
             streamed?: false,
             decoder: nil,
             play_gen: nil,
             track_started_us: now,
             track_index: 0
           }}

        error ->
          # PtpBmca is linked, but this process exits :normal after reporting the
          # failure, and a :normal exit does not take linked processes down — stop
          # it explicitly so a failed handshake doesn't leave PTP running.
          PtpBmca.stop(ptp)
          error
      end
    end
  end

  # Post-PTP half of the handshake: let the clock settle/converge, read the
  # receiver's stream ports and open the local audio socket.
  defp sync_and_open_audio(ptp, stream_body, opts) do
    with :ok <- wait_for_ptp(Keyword.get(opts, :ptp_settle_ms, 500)),
         :ok <- PtpBmca.await_sync(ptp, Keyword.get(opts, :ptp_sync_timeout_ms, 3_000)),
         {:ok, data_port, remote_control_port} <- stream_ports(stream_body),
         {:ok, audio_sock} <- :gen_udp.open(0, [:binary, active: false]) do
      {:ok, data_port, remote_control_port, audio_sock}
    end
  end

  # --- idle loop (connection warm, no audio) ---------------------------------

  defp loop_idle(%{owner_ref: owner_ref} = state) do
    receive do
      {__MODULE__, :play, path, opts} ->
        state |> start_track(path, opts) |> resume()

      {__MODULE__, :stop} ->
        loop_idle(state)

      {__MODULE__, :set_volume, volume} ->
        loop_idle(apply_volume(state, volume))

      {__MODULE__, :close} ->
        do_close(state)

      # Only the owner's monitor closes the session; other :DOWN messages are
      # strays and fall through to the catch-all below.
      {:DOWN, ^owner_ref, :process, _pid, _reason} ->
        # The owner went away — don't leave the receiver held.
        do_close(state)

      _other ->
        loop_idle(state)
    after
      # Measured from the last FEEDBACK rather than restarted on every message, so
      # a steady trickle of messages cannot starve the keepalive.
      keepalive_timeout_ms(state) -> loop_idle(keepalive(state))
    end
  end

  # Milliseconds until the next idle FEEDBACK is due (0 if already overdue).
  defp keepalive_timeout_ms(state) do
    elapsed_ms = div(System.monotonic_time(:microsecond) - state.last_feedback_us, 1_000)
    max(@keepalive_ms - elapsed_ms, 0)
  end

  # --- play loop (streaming the current track) -------------------------------

  defp loop_play(state) do
    case drain_commands(state) do
      {:stop, state} ->
        state |> stop_track() |> loop_idle()

      {:close, state} ->
        state |> stop_track() |> do_close()

      {:skip, path, opts, state} ->
        state |> stop_track() |> start_track(path, opts) |> resume()

      {:cont, state} ->
        case stream_batch(state) do
          {:cont, state} ->
            loop_play(state)

          {:eos, state} ->
            state = stop_track(state)
            notify_owner(state, {:ended, state.play_gen})
            loop_idle(%{state | play_gen: nil})
        end
    end
  end

  defp resume(state) do
    if state.decoder, do: loop_play(state), else: loop_idle(state)
  end

  # Non-blocking: apply any volume changes and report the first control command.
  # The owner exiting mid-track is treated like `close/1`.
  defp drain_commands(%{owner_ref: owner_ref} = state) do
    receive do
      {__MODULE__, :set_volume, volume} -> drain_commands(apply_volume(state, volume))
      {__MODULE__, :stop} -> {:stop, state}
      {__MODULE__, :close} -> {:close, state}
      {__MODULE__, :play, path, opts} -> {:skip, path, opts, state}
      {:DOWN, ^owner_ref, :process, _pid, _reason} -> {:close, state}
    after
      0 -> {:cont, state}
    end
  end

  defp stream_batch(state) do
    case Decoder.take(state.decoder, @take_batch) do
      {[], true} ->
        {:eos, state}

      {[], false} ->
        Process.sleep(@idle_poll_ms)
        {:cont, state}

      {frames, _eos?} ->
        {:cont, Enum.reduce(frames, state, &send_one_frame/2)}
    end
  end

  defp send_one_frame(frame, state) do
    first? = state.track_index == 0
    state = maybe_send_sync(first?, state)
    send_audio_packet(frame, first?, state)
    state = maybe_send_feedback(state)
    pace(state)

    %{
      state
      | seq: rem(state.seq + 1, @seq_modulus),
        rtp: rem(state.rtp + @samples_per_packet, @rtp_modulus),
        track_index: state.track_index + 1
    }
  end

  defp maybe_send_sync(first?, state) do
    if first? or sync_due?(state) do
      ptp_time = PtpBmca.receiver_time_ns(state.ptp) + state.render_delay_ns
      next_rtp = rem(state.rtp + @samples_per_packet, @rtp_modulus)

      sync =
        Rtp.ptp_sync(
          state.sync_seq,
          state.rtp,
          ptp_time,
          next_rtp,
          state.clock_id,
          state.sync_seq == 0
        )

      :ok = :gen_udp.send(state.control_sock, state.host_ip, state.remote_control_port, sync)
      %{state | sync_seq: rem(state.sync_seq + 1, @seq_modulus), last_sync: state.rtp}
    else
      state
    end
  end

  # A sync packet is due once a second of audio (in RTP samples) has been sent
  # since the last one. The difference is taken modulo 2^32 so it stays correct
  # after the RTP timestamp wraps (~27h of continuous streaming).
  defp sync_due?(%{last_sync: nil}), do: true

  defp sync_due?(%{rtp: rtp, last_sync: last_sync}),
    do: Integer.mod(rtp - last_sync, @rtp_modulus) >= @sample_rate

  defp send_audio_packet(frame, first?, state) do
    payload = Alac.encode_stereo16(frame)
    payload_type = if first?, do: 0xE0, else: 0x60
    header = <<0x80, payload_type, state.seq::16, state.rtp::32, @ssrc::32>>

    packet =
      header <>
        Crypto.audio_encrypt(state.pairing.audio_key, state.rtp, @ssrc, state.seq, payload)

    :ok = :gen_udp.send(state.audio_sock, state.host_ip, state.data_port, packet)
  end

  defp maybe_send_feedback(state) do
    now_us = System.monotonic_time(:microsecond)

    if now_us - state.last_feedback_us >= @feedback_interval_us do
      %{state | pairing: feedback(state.pairing), last_feedback_us: now_us}
    else
      state
    end
  end

  # Sleep until the wall-clock deadline for the frame after this one. The target
  # is derived from the total sample count rather than a truncated per-frame
  # duration, so rounding error does not accumulate over long tracks.
  defp pace(state) do
    samples_sent = (state.track_index + 1) * @samples_per_packet
    target_us = state.track_started_us + div(samples_sent * 1_000_000, @sample_rate)
    sleep_us = target_us - System.monotonic_time(:microsecond)
    if sleep_us > 1_000, do: Process.sleep(div(sleep_us, 1_000))
  end

  # --- track lifecycle -------------------------------------------------------

  defp start_track(state, path, opts) do
    # Per-call options win over the connect-time defaults.
    opts = Keyword.merge(state.play_defaults, opts)

    # FLUSH continuing the RTP timeline (seq/rtp), not resetting to zero — keeps
    # the receiver's RTP sequence monotonic across the warm connection.
    state = %{state | pairing: flush(state.pairing, state.seq, state.rtp)}
    state = apply_volume(state, Keyword.get(opts, :volume))

    case start_decoder(path, prebuffer_opts(state, opts)) do
      {:ok, decoder} ->
        %{
          state
          | decoder: decoder,
            play_gen: Keyword.get(opts, :playback_gen),
            track_started_us: System.monotonic_time(:microsecond),
            track_index: 0,
            last_sync: nil,
            streamed?: true
        }

      {:error, reason} ->
        notify_owner(state, {:error, Keyword.get(opts, :playback_gen), reason})
        %{state | decoder: nil, play_gen: nil}
    end
  end

  # Large prebuffer to prime the receiver on the first (cold) stream; a small one
  # on warm track changes so albums stay ~gapless.
  defp prebuffer_opts(%{streamed?: true}, opts),
    do: Keyword.put(opts, :prebuffer_frames, warm_prebuffer_frames(opts))

  defp prebuffer_opts(%{streamed?: false}, opts), do: opts

  defp warm_prebuffer_frames(opts),
    do: Keyword.get(opts, :warm_prebuffer_frames, @default_warm_prebuffer_frames)

  defp stop_track(%{decoder: nil} = state), do: state

  defp stop_track(%{decoder: decoder} = state) do
    Decoder.stop(decoder)
    %{state | decoder: nil}
  end

  # NOTE(review): no RTSP TEARDOWN request is sent here; the session ends by
  # closing the RTSP connection. Confirm that is sufficient for all receivers.
  defp do_close(state) do
    if state.decoder, do: Decoder.stop(state.decoder)
    close_event(state.event_sock)
    if state.ptp, do: PtpBmca.stop(state.ptp)
    if state.pairing, do: Rtsp2.close(state.pairing.rtsp)
    close_udp(state.control_sock)
    close_udp(state.audio_sock)
    :ok
  end

  defp keepalive(state) do
    %{
      state
      | pairing: feedback(state.pairing),
        last_feedback_us: System.monotonic_time(:microsecond)
    }
  end

  # --- RTSP control ----------------------------------------------------------

  # FLUSH / FEEDBACK / SET_PARAMETER(volume) are best-effort: on a non-2xx or an
  # error the previous pairing state is kept and streaming carries on.

  defp flush(pairing, seq, rtp) do
    case Setup.flush(pairing, seq, rtp) do
      {:ok, status, _headers, _body, pairing} when status in 200..299 -> pairing
      _other -> pairing
    end
  end

  defp feedback(pairing) do
    case Setup.feedback(pairing) do
      {:ok, status, _headers, _body, pairing} when status in 200..299 -> pairing
      _other -> pairing
    end
  end

  defp apply_volume(state, nil), do: state

  defp apply_volume(state, volume) do
    pairing =
      case Setup.set_volume(state.pairing, volume) do
        {:ok, status, _headers, _body, pairing} when status in 200..299 -> pairing
        _other -> state.pairing
      end

    %{state | pairing: pairing}
  end

  defp notify_owner(%{owner: owner}, {:ended, gen}) when is_pid(owner),
    do: send(owner, {__MODULE__, :ended, gen})

  defp notify_owner(%{owner: owner}, {:error, gen, reason}) when is_pid(owner),
    do: send(owner, {__MODULE__, :error, gen, reason})

  defp notify_owner(_state, _msg), do: :ok

  # Stamp the caller-supplied DACP identity onto the RTSP state so SETUP (and
  # every later request) tells the receiver where to send on-device transport
  # commands. A token the caller doesn't supply keeps whatever value the RTSP
  # state already had.
  defp stamp_dacp(%{rtsp: rtsp} = pairing, opts) do
    %{
      pairing
      | rtsp: %{
          rtsp
          | dacp_id: Keyword.get(opts, :dacp_id, rtsp.dacp_id),
            active_remote: Keyword.get(opts, :active_remote, rtsp.active_remote)
        }
    }
  end

  # --- handshake helpers (mirrors AirPlay.V2.Player) -------------------------

  defp setup_session(pairing, local_ip) do
    case Setup.session(pairing, local_addresses: [local_ip], timing_port: 319) do
      {:ok, status, _headers, body, pairing} when status in 200..299 -> {:ok, body, pairing}
      {:ok, status, _headers, body, _pairing} -> {:error, {:setup_session, status, body}}
      error -> error
    end
  end

  defp setup_stream(pairing, control_port, opts) do
    case Setup.stream(pairing, control_port,
           latency_min: Keyword.get(opts, :latency_min, 22_050),
           latency_max: Keyword.get(opts, :latency_max, 88_200)
         ) do
      {:ok, 200, _headers, body, pairing} -> {:ok, body, pairing}
      {:ok, status, _headers, body, _pairing} -> {:error, {:setup_stream, status, body}}
      error -> error
    end
  end

  defp record(pairing) do
    case Setup.record(pairing, 0, 0) do
      {:ok, status, _headers, _body, pairing} when status in 200..299 -> {:ok, pairing}
      {:ok, status, _headers, body, _pairing} -> {:error, {:record, status, body}}
      error -> error
    end
  end

  # The event channel is optional: a missing/zero eventPort or a failed connect
  # yields {:ok, nil} rather than aborting the handshake.
  defp connect_event(host, session_body) do
    event_port = session_body |> Plist.decode!() |> Map.get("eventPort")

    if is_integer(event_port) and event_port > 0 do
      case :gen_tcp.connect(String.to_charlist(host), event_port, [:binary, active: false], 2_000) do
        {:ok, sock} -> {:ok, sock}
        {:error, _reason} -> {:ok, nil}
      end
    else
      {:ok, nil}
    end
  end

  defp close_event(nil), do: :ok
  defp close_event(sock), do: :gen_tcp.close(sock)

  defp close_udp(nil), do: :ok
  defp close_udp(sock), do: :gen_udp.close(sock)

  defp open_control_socket do
    with {:ok, sock} <- :gen_udp.open(0, [:binary, active: false]),
         {:ok, port} <- :inet.port(sock) do
      {:ok, sock, port}
    end
  end

  defp wait_for_ptp(milliseconds) when is_integer(milliseconds) and milliseconds > 0 do
    Process.sleep(milliseconds)
    :ok
  end

  defp wait_for_ptp(_milliseconds), do: :ok

  defp stream_ports(stream_body) do
    case Plist.decode!(stream_body) do
      %{"streams" => [%{"dataPort" => data_port, "controlPort" => control_port} | _]}
      when is_integer(data_port) and is_integer(control_port) ->
        {:ok, data_port, control_port}

      decoded ->
        {:error, {:missing_stream_ports, decoded}}
    end
  end

  defp session_clock_id(session_body) do
    case Plist.decode!(session_body) do
      %{"timingPeerInfo" => %{"ClockID" => clock_id}} when is_integer(clock_id) ->
        <<clock_id::64>>

      _ ->
        nil
    end
  end

  # Start the decoder and block until `:prebuffer_frames` frames are buffered (or
  # `:prebuffer_timeout_ms` elapses); a decoder that never gets ready is stopped.
  defp start_decoder(path, opts) do
    args = Source.stream_args(path, opts)

    case Decoder.start_link(args: args, ffmpeg: Keyword.get(opts, :ffmpeg)) do
      {:ok, decoder} ->
        ready = Keyword.get(opts, :prebuffer_frames, @default_prebuffer_frames)
        timeout = Keyword.get(opts, :prebuffer_timeout_ms, @default_prebuffer_timeout_ms)

        case Decoder.await_ready(decoder, ready, timeout) do
          {:ok, _frames} ->
            {:ok, decoder}

          {:error, reason} ->
            Decoder.stop(decoder)
            {:error, {:decoder_not_ready, reason}}
        end

      error ->
        error
    end
  end

  # Local address of the RTSP socket, as a string (IPv4 dotted-quad or IPv6).
  defp local_ip(pairing) do
    case :inet.sockname(pairing.rtsp.sock) do
      {:ok, {ip, _port}} -> {:ok, ip_to_string(ip)}
      error -> error
    end
  end

  defp local_ip_tuple(ip) when is_binary(ip) do
    {:ok, tuple} = ip |> String.to_charlist() |> :inet.parse_address()
    tuple
  end

  # :inet.ntoa/1 formats both IPv4 and IPv6 tuples correctly (a plain "." join
  # would mangle an 8-element IPv6 tuple).
  defp ip_to_string(ip), do: ip |> :inet.ntoa() |> List.to_string()

  defp parse_host({_, _, _, _} = ip), do: {:ok, ip}

  # Numeric addresses are parsed directly; anything else is resolved as an IPv4
  # hostname.
  defp parse_host(host) when is_binary(host) do
    charlist = String.to_charlist(host)

    case :inet.parse_address(charlist) do
      {:ok, ip} -> {:ok, ip}
      {:error, _reason} -> :inet.getaddr(charlist, :inet)
    end
  end
end