defmodule Membrane.Element.IVF.Deserializer do
@moduledoc """
Deserializer is capable of converting stream representing video in IVF format
into stream of Membrane.Buffer's with video frames with correct timestamps in
Membrane timebase (it is 1 nanosecond = 1/(10^9)[s])
"""
use Membrane.Filter
use Ratio
alias Membrane.{Time, RemoteStream, Buffer}
alias Membrane.{VP9, VP8}
alias Membrane.Element.IVF.Headers
alias Membrane.Element.IVF.Headers.FrameHeader
def_input_pad :input, caps: :any, demand_unit: :buffers
def_output_pad :output,
caps: {RemoteStream, content_format: one_of([VP9, VP8]), type: :packetized}
defmodule State do
@moduledoc false
@doc """
frame_acc is tuple of {bytes_left_to_accumulate, accumulated_binary}
When bytes_left_to_accumulate is equal to 0 it means that whole frame has been accumulated
"""
defstruct [:timebase, frame_acc: <<>>, start_of_stream?: true]
end
@impl true
def handle_init(_options) do
{:ok, %State{}}
end
@impl true
def handle_demand(:output, size, :buffers, _ctx, state) do
{{:ok, demand: {:input, size}}, state}
end
@impl true
def handle_process(:input, buffer, _ctx, %State{start_of_stream?: true} = state) do
state = %State{state | frame_acc: state.frame_acc <> buffer.payload}
with {:ok, file_header, rest} <- Headers.parse_ivf_header(state.frame_acc),
{:ok, buffer, rest} <- get_buffer(rest, file_header.scale <|> file_header.rate) do
caps =
case file_header.four_cc do
"VP90" -> %Membrane.RemoteStream{content_format: VP9, type: :packetized}
"VP80" -> %Membrane.RemoteStream{content_format: VP8, type: :packetized}
end
{{:ok, caps: {:output, caps}, buffer: {:output, buffer}, redemand: :output},
%State{
frame_acc: rest,
start_of_stream?: false,
timebase: file_header.scale <|> file_header.rate
}}
else
{:error_too_short, _payload} ->
{{:ok, redemand: :output}, state}
_error ->
{:ok, %State{}}
end
end
def handle_process(:input, buffer, _ctx, state) do
state = %State{state | frame_acc: state.frame_acc <> buffer.payload}
case flush_acc(state, []) do
{:ok, buffers, state} ->
{{:ok, buffer: {:output, buffers}, redemand: :output}, state}
{:error_too_short, payload} ->
{{:ok, redemand: :output}, %State{state | frame_acc: payload}}
error ->
error
end
end
defp flush_acc(state, buffers) do
case get_buffer(state.frame_acc, state.timebase) do
{:ok, buffer, rest} -> flush_acc(%State{state | frame_acc: rest}, [buffer | buffers])
_error -> {:ok, buffers |> Enum.reverse(), state}
end
end
defp get_buffer(payload, timebase) do
with {:ok, %FrameHeader{size_of_frame: size_of_frame, timestamp: timestamp}, rest} <-
Headers.parse_ivf_frame_header(payload),
<<frame::binary-size(size_of_frame), rest::binary()>> <- rest do
timestamp = timestamp * (timebase * Time.second())
{:ok, %Buffer{metadata: %{timestamp: timestamp}, payload: frame}, rest}
else
_error -> {:error_too_short, payload}
end
end
end