# lib/air_play/decoder.ex

defmodule AirPlay.Decoder do
  @moduledoc """
  Stream-decode an audio file to AirPlay frames with bounded memory.

  Runs `ffmpeg` as a port, reads its interleaved s16le stdout, slices it into
  352-sample stereo frames, and hands them to `AirPlay.Player` on demand via
  `take/2`. The alternative — decoding the whole file to one PCM buffer (see
  `AirPlay.Source.pcm/1`) — costs ~2.4 GB for a 4-hour book; this keeps only a
  few seconds of frames around at a time.

  Flow control comes from ffmpeg's `-re` flag (read input at its native rate),
  set by `AirPlay.Source.stream_args/2`: ffmpeg only produces PCM at ~1× real
  time, which is the same rate `Player` drains it, so the buffer neither floods
  the mailbox nor grows without bound.
  """

  use GenServer
  require Logger

  # One frame = 352 samples per channel × 2 channels × 2 bytes (s16) = 1408 bytes.
  @frame_samples 352
  @channels 2
  @bytes_per_sample 2
  @frame_bytes @frame_samples * @channels * @bytes_per_sample
  @default_ready_frames 16
  @default_ready_timeout 5_000

  @doc """
  Start a decoder linked to the caller; ffmpeg is spawned during init.

  Options:
    * `:args` — full ffmpeg argument list (see `AirPlay.Source.stream_args/2`); required
    * `:ffmpeg` — ffmpeg binary: a path, or a bare name resolved on `PATH`
      (defaults to `"ffmpeg"`)
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @doc """
  Take up to `n` buffered frames. Returns `{frames, eos?}`.

  `frames` may be shorter than `n` (or empty) if decoding is briefly behind;
  `eos?` is true once ffmpeg has finished and the buffer is drained. A
  non-positive `n` takes no frames.
  """
  @spec take(pid(), non_neg_integer()) :: {[binary()], boolean()}
  def take(pid, n), do: GenServer.call(pid, {:take, n})

  @doc """
  Wait until the decoder has buffered at least `min_frames` frames, or until
  ffmpeg exits before enough audio is available.

  Returns:
    * `{:ok, buffered}` — at least `min_frames` are buffered, or ffmpeg has
      exited leaving at least one frame
    * `{:error, {:ffmpeg_exit, status}}` — ffmpeg exited non-zero with nothing buffered
    * `{:error, :decoder_ended}` — ffmpeg exited cleanly with nothing buffered

  `timeout` is passed to `GenServer.call/3`, so the caller exits if it elapses.
  """
  @spec await_ready(pid(), pos_integer(), timeout()) ::
          {:ok, non_neg_integer()} | {:error, term()}
  def await_ready(pid, min_frames \\ @default_ready_frames, timeout \\ @default_ready_timeout) do
    GenServer.call(pid, {:await_ready, min_frames}, timeout)
  end

  @doc "Stop the decoder process; the port it owns is closed when it exits."
  @spec stop(pid()) :: :ok
  def stop(pid), do: GenServer.stop(pid)

  @impl true
  def init(opts) do
    args = Keyword.fetch!(opts, :args)
    ffmpeg = ffmpeg_executable(opts[:ffmpeg] || "ffmpeg")

    # :exit_status makes the port deliver ffmpeg's exit code. stderr is not
    # redirected, so ffmpeg's log output never mixes into the PCM on stdout.
    port =
      Port.open(
        {:spawn_executable, ffmpeg},
        [:binary, :exit_status, :use_stdio, args: args]
      )

    {:ok,
     %{
       port: port,
       # FIFO of complete @frame_bytes-sized frames awaiting take/2.
       buf: :queue.new(),
       # Bytes received that do not yet fill a whole frame.
       leftover: <<>>,
       done?: false,
       exit_status: nil,
       # Parked await_ready/3 callers: [{from, min_frames}].
       waiters: []
     }}
  end

  # `{:spawn_executable, _}` does not search PATH, so resolve a bare name here.
  # Names containing "/" are used as-is; an unresolvable bare name is passed
  # through unchanged and Port.open/2 reports the failure.
  defp ffmpeg_executable(ffmpeg) do
    cond do
      String.contains?(ffmpeg, "/") -> ffmpeg
      found = System.find_executable(ffmpeg) -> found
      true -> ffmpeg
    end
  end

  @impl true
  def handle_call({:take, n}, _from, st) do
    {frames, buf} = pop(st.buf, n, [])
    eos? = st.done? and :queue.is_empty(buf)
    {:reply, {frames, eos?}, %{st | buf: buf}}
  end

  def handle_call({:await_ready, min_frames}, from, st) do
    case readiness(st, min_frames) do
      # Park the caller; reply_ready_waiters/1 answers it once state changes.
      :wait -> {:noreply, %{st | waiters: [{from, min_frames} | st.waiters]}}
      reply -> {:reply, reply, st}
    end
  end

  @impl true
  def handle_info({port, {:data, bin}}, %{port: port} = st) do
    {frames, leftover} = chunk(st.leftover <> bin)
    buf = Enum.reduce(frames, st.buf, &:queue.in/2)
    {:noreply, %{st | buf: buf, leftover: leftover} |> reply_ready_waiters()}
  end

  def handle_info({port, {:exit_status, status}}, %{port: port} = st) do
    if status != 0, do: Logger.warning("[airplay] ffmpeg exited #{status}")

    # Pad the trailing partial frame with zero bytes (silence in s16le) and
    # emit it, so the last fraction of a second isn't dropped.
    buf =
      case st.leftover do
        <<>> ->
          st.buf

        partial ->
          pad = @frame_bytes - byte_size(partial)
          :queue.in(partial <> :binary.copy(<<0>>, pad), st.buf)
      end

    {:noreply,
     %{st | buf: buf, leftover: <<>>, done?: true, exit_status: status} |> reply_ready_waiters()}
  end

  def handle_info(_msg, st), do: {:noreply, st}

  # Dequeue up to `n` frames in FIFO order, stopping early if the queue empties.
  # A non-positive `n` takes nothing (a negative count must not drain the queue).
  defp pop(q, n, acc) when n <= 0, do: {Enum.reverse(acc), q}

  defp pop(q, n, acc) do
    case :queue.out(q) do
      {{:value, frame}, q2} -> pop(q2, n - 1, [frame | acc])
      {:empty, q2} -> {Enum.reverse(acc), q2}
    end
  end

  # Split a byte buffer into whole frames, returning the trailing remainder
  # (always shorter than one frame).
  defp chunk(bin) do
    size = byte_size(bin)
    {frames_bin, rest} = :erlang.split_binary(bin, size - rem(size, @frame_bytes))
    frames = for <<frame::binary-size(@frame_bytes) <- frames_bin>>, do: frame
    {frames, rest}
  end

  # Decide how an await_ready/3 call for `min_frames` should be answered now:
  # a reply term, or :wait if ffmpeg is still running and too little is buffered.
  defp readiness(st, min_frames) do
    frame_count = :queue.len(st.buf)

    cond do
      frame_count >= min_frames -> {:ok, frame_count}
      not st.done? -> :wait
      frame_count > 0 -> {:ok, frame_count}
      st.exit_status not in [nil, 0] -> {:error, {:ffmpeg_exit, st.exit_status}}
      true -> {:error, :decoder_ended}
    end
  end

  # Reply to every parked waiter whose condition is now met and keep the rest.
  # Readiness is evaluated once per waiter; the reply is sent as a side effect
  # of the filter.
  defp reply_ready_waiters(st) do
    waiting =
      Enum.filter(st.waiters, fn {from, min_frames} ->
        case readiness(st, min_frames) do
          :wait ->
            true

          reply ->
            GenServer.reply(from, reply)
            false
        end
      end)

    %{st | waiters: waiting}
  end
end