defmodule Membrane.Ogg.Demuxer do
@moduledoc """
A Membrane Element for demuxing an Ogg.
For now it supports only Ogg containing a single Opus track.
All the tracks in the Ogg must have a corresponding output pad linked (`Pad.ref(:output, track_id)`).
"""
use Membrane.Filter
require Membrane.Logger
alias Membrane.Ogg.Parser
alias Membrane.Ogg.Parser.Packet
alias Membrane.{Buffer, Opus, RemoteStream}
def_input_pad :input,
flow_control: :auto,
accepted_format: %RemoteStream{content_format: format} when format in [nil, Ogg]
def_output_pad :output,
flow_control: :auto,
accepted_format: %RemoteStream{type: :packetized, content_format: Opus}
defmodule State do
@moduledoc false
@type t :: %__MODULE__{
parser_acc: binary(),
continued_packet: binary(),
received_bos_packet: boolean()
}
defstruct parser_acc: <<>>,
continued_packet: nil,
received_bos_packet: false
end
@impl true
def handle_init(_ctx, _options) do
{[], %State{}}
end
@impl true
def handle_playing(_ctx, state) do
{[stream_format: {:output, %RemoteStream{type: :packetized, content_format: Opus}}], state}
end
@impl true
def handle_stream_format(:input, _stream_format, _ctx, state) do
{[], state}
end
@impl true
def handle_buffer(:input, %Buffer{payload: bytes}, _ctx, state) do
rest = state.parser_acc <> bytes
{parsed, new_continued_packet, rest} =
Parser.parse(rest, state.continued_packet)
state = %State{
state
| parser_acc: rest,
continued_packet: new_continued_packet
}
get_packet_actions(parsed, state)
end
@spec get_packet_actions([Packet.t()], State.t()) :: {[Membrane.Element.Action.t()], State.t()}
defp get_packet_actions(packets_list, state) do
{actions, packets_containing_bos_packet} =
Enum.flat_map_reduce(packets_list, state.received_bos_packet, &get_packet_action/2)
{actions, %State{state | received_bos_packet: packets_containing_bos_packet}}
end
@spec get_packet_action(Packet.t(), boolean()) :: {[Membrane.Element.Action.t()], boolean()}
defp get_packet_action(packet, received_bos_packet) do
case packet do
%Packet{bos?: true, payload: <<"OpusHead", _rest::binary>>} ->
if received_bos_packet do
raise "Multiple Opus streams in the input Ogg stream, currently unsupported"
end
{[], true}
%Packet{bos?: true, payload: _not_opushead} ->
raise "Invalid bos packet, probably unsupported codec."
%Packet{eos?: true, payload: <<>>} ->
{[], received_bos_packet}
%Packet{payload: <<"OpusTags", _rest::binary>>} ->
{[], received_bos_packet}
%Packet{payload: data_payload} ->
{[buffer: {:output, %Buffer{payload: data_payload}}], received_bos_packet}
end
end
end