defmodule AirPlay.Decoder do
@moduledoc """
Stream-decode an audio file to AirPlay frames with bounded memory.
Runs `ffmpeg` as a port, reads its interleaved s16le stdout, slices it into
352-sample stereo frames, and hands them to `AirPlay.Player` on demand via
`take/2`. The alternative — decoding the whole file to one PCM buffer (see
`AirPlay.Source.pcm/1`) — costs ~2.4 GB for a 4-hour book; this keeps only a
few seconds of frames around at a time.
Flow control comes from ffmpeg's `-re` flag (read input at its native rate),
set by `AirPlay.Source.stream_args/2`: ffmpeg only produces PCM at ~1× real
time, which is the same rate `Player` drains it, so the buffer neither floods
the mailbox nor grows without bound.
"""
use GenServer
require Logger
@frame_samples 352
@channels 2
@bytes_per_sample 2
@frame_bytes @frame_samples * @channels * @bytes_per_sample
@default_ready_frames 16
@default_ready_timeout 5_000
@doc """
Start a decoder. Options:
* `:args` — full ffmpeg argument list (see `AirPlay.Source.stream_args/2`)
* `:ffmpeg` — ffmpeg binary (defaults to one found on `PATH`)
"""
def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
@doc """
Take up to `n` buffered frames. Returns `{frames, eos?}`. `frames` may be
shorter than `n` (or empty) if decoding is briefly behind; `eos?` is true once
ffmpeg has finished and the buffer is drained.
"""
@spec take(pid(), non_neg_integer()) :: {[binary()], boolean()}
def take(pid, n), do: GenServer.call(pid, {:take, n})
@doc """
Wait until the decoder has buffered at least `min_frames` frames, or until
ffmpeg exits before enough audio is available.
"""
@spec await_ready(pid(), pos_integer(), timeout()) ::
{:ok, non_neg_integer()} | {:error, term()}
def await_ready(pid, min_frames \\ @default_ready_frames, timeout \\ @default_ready_timeout) do
GenServer.call(pid, {:await_ready, min_frames}, timeout)
end
def stop(pid), do: GenServer.stop(pid)
@impl true
def init(opts) do
ffmpeg = opts[:ffmpeg] || System.find_executable("ffmpeg") || "ffmpeg"
args = Keyword.fetch!(opts, :args)
port =
Port.open(
{:spawn_executable, ffmpeg_executable(ffmpeg)},
[:binary, :exit_status, :use_stdio, args: args]
)
{:ok,
%{
port: port,
buf: :queue.new(),
leftover: <<>>,
done?: false,
exit_status: nil,
waiters: []
}}
end
# Port.open needs an absolute path; resolve a bare name on PATH.
defp ffmpeg_executable(ffmpeg) do
cond do
String.contains?(ffmpeg, "/") -> ffmpeg
found = System.find_executable(ffmpeg) -> found
true -> ffmpeg
end
end
@impl true
def handle_call({:take, n}, _from, st) do
{frames, buf} = pop(st.buf, n, [])
eos? = st.done? and :queue.is_empty(buf)
{:reply, {frames, eos?}, %{st | buf: buf}}
end
def handle_call({:await_ready, min_frames}, from, st) do
case readiness(st, min_frames) do
{:wait, st} -> {:noreply, %{st | waiters: [{from, min_frames} | st.waiters]}}
reply -> {:reply, reply, st}
end
end
@impl true
def handle_info({port, {:data, bin}}, %{port: port} = st) do
{frames, leftover} = chunk(st.leftover <> bin)
buf = Enum.reduce(frames, st.buf, &:queue.in/2)
{:noreply, %{st | buf: buf, leftover: leftover} |> reply_ready_waiters()}
end
def handle_info({port, {:exit_status, status}}, %{port: port} = st) do
if status != 0, do: Logger.warning("[airplay] ffmpeg exited #{status}")
# Pad and emit any trailing partial frame so the last fraction of a second
# isn't dropped.
buf =
if st.leftover == <<>> do
st.buf
else
pad = @frame_bytes - byte_size(st.leftover)
:queue.in(st.leftover <> :binary.copy(<<0>>, pad), st.buf)
end
{:noreply,
%{st | buf: buf, leftover: <<>>, done?: true, exit_status: status} |> reply_ready_waiters()}
end
def handle_info(_msg, st), do: {:noreply, st}
defp pop(q, 0, acc), do: {Enum.reverse(acc), q}
defp pop(q, n, acc) do
case :queue.out(q) do
{{:value, frame}, q2} -> pop(q2, n - 1, [frame | acc])
{:empty, q2} -> {Enum.reverse(acc), q2}
end
end
# Split a byte buffer into whole frames, returning the trailing remainder.
defp chunk(bin) do
whole = div(byte_size(bin), @frame_bytes) * @frame_bytes
<<frames_bin::binary-size(^whole), rest::binary>> = bin
frames = for <<frame::binary-size(@frame_bytes) <- frames_bin>>, do: frame
{frames, rest}
end
defp readiness(st, min_frames) do
frame_count = :queue.len(st.buf)
cond do
frame_count >= min_frames ->
{:ok, frame_count}
st.done? and frame_count > 0 ->
{:ok, frame_count}
st.done? and st.exit_status not in [nil, 0] ->
{:error, {:ffmpeg_exit, st.exit_status}}
st.done? ->
{:error, :decoder_ended}
true ->
{:wait, st}
end
end
defp reply_ready_waiters(st) do
{ready, waiting} =
Enum.split_with(st.waiters, fn {_from, min_frames} ->
not match?({:wait, _}, readiness(st, min_frames))
end)
Enum.each(ready, fn {from, min_frames} ->
GenServer.reply(from, readiness(st, min_frames))
end)
%{st | waiters: waiting}
end
end