lib/membrane_stream_plugin/deserializer.ex

defmodule Membrane.Stream.Deserializer do
  @moduledoc """
  Element restoring recorded data in Membrane.Stream format, as captured by `Membrane.Stream.Serializer`
  """
  use Membrane.Filter

  alias Membrane.{Buffer, RemoteStream}
  alias Membrane.Caps.Matcher

  alias Membrane.Stream.Format.Header
  alias Membrane.Stream.Utils

  def_input_pad :input,
    caps: {RemoteStream, content_format: Matcher.one_of([nil, Membrane.Stream])},
    demand_unit: :buffers,
    demand_mode: :auto

  def_output_pad :output,
    caps: :any,
    demand_mode: :auto

  @impl true
  def handle_init(_opts) do
    {:ok, %{partial: <<>>, header_read?: false, parser_fn: nil}}
  end

  @impl true
  def handle_caps(:input, _caps, _ctx, state), do: {:ok, state}

  @impl true
  def handle_process(:input, %Buffer{payload: payload}, ctx, %{header_read?: false} = state) do
    data = state.partial <> payload
    state = %{state | partial: data}

    case Header.parse(data) do
      {:ok, %Header{version: version}, leftover} ->
        {:ok, parser_fn} = Utils.get_parser(version)
        state = %{state | parser_fn: parser_fn, partial: leftover, header_read?: true}
        handle_process(:input, %Buffer{payload: ""}, ctx, state)

      {:error, :not_enough_data} ->
        {:ok, state}

      {:error, reason} ->
        raise "Failed to parse MSR header with reason: #{inspect(reason)}. Header: #{inspect(data, limit: :infinity)}"
    end
  end

  @impl true
  def handle_process(:input, %Buffer{payload: payload}, _ctx, %{header_read?: true} = state) do
    data = state.partial <> payload
    state = %{state | partial: data}

    case state.parser_fn.(data) do
      {:ok, actions, leftover} ->
        {{:ok, actions}, %{state | partial: leftover}}

      {:error, :not_enough_data} ->
        {:ok, state}

      {:error, reason} ->
        raise "Failed to parse Membrane Stream payload with reason: #{inspect(reason)}. Data: #{inspect(data, limit: :infinity)}"
    end
  end
end