# lib/membrane_stream_plugin/serializer.ex

defmodule Membrane.Stream.Serializer do
  @moduledoc """
  Element recording a stream of Membrane actions in a binary format suitable for saving to a file.

  Currently supported, as of V#{Membrane.Stream.Utils.get_current_version()}:
  - buffers
  - stream formats
  - events

  The output starts with a header buffer built for the configured version. Every
  supported action received on the input pad is then emitted as a single buffer
  carrying its serialized form.

  A recorded stream can be replayed using `Membrane.Stream.Deserializer`.
  """
  use Membrane.Filter
  alias Membrane.{Buffer, RemoteStream}
  alias Membrane.Stream.Format.Header

  alias Membrane.Stream.Utils

  def_input_pad :input, accepted_format: _any
  def_output_pad :output, accepted_format: %RemoteStream{content_format: Membrane.Stream}

  def_options version: [
                spec: Membrane.Stream.Format.Version.t(),
                default: Utils.get_current_version(),
                description: """
                Version of the serialization format. It selects both the header written
                at the beginning of the output and the function used to serialize actions.
                """
              ]

  # Resolves the serializer function once, so that it does not have to be looked
  # up for every action. The match is assertive on purpose: the element crashes
  # right at init if `Utils.get_serializer/1` returns anything but `{:ok, fun}`
  # (presumably for an unsupported version - verify against `Utils`).
  @impl true
  def handle_init(_ctx, %__MODULE__{version: version}) do
    {:ok, serializer_fn} = Utils.get_serializer(version)
    {[], %{serializer_fn: serializer_fn, version: version}}
  end

  # Announces the output stream format and emits the header buffer before any
  # serialized action is sent.
  @impl true
  def handle_playing(_ctx, state) do
    stream_format = %RemoteStream{content_format: Membrane.Stream}

    header =
      state.version
      |> Header.build()
      |> then(&%Buffer{payload: &1})

    {[stream_format: {:output, stream_format}, buffer: {:output, header}], state}
  end

  # The incoming stream format is recorded as data. The stream format of the
  # output pad itself is sent in `handle_playing/2`.
  @impl true
  def handle_stream_format(:input, stream_format, _ctx, state) do
    process({:stream_format, stream_format}, state)
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    process({:buffer, buffer}, state)
  end

  # Events arriving on the input pad are part of the recorded stream, so they are
  # serialized into a buffer instead of being passed on as events.
  @impl true
  def handle_event(:input, event, _ctx, state) do
    process({:event, event}, state)
  end

  # Events received on any other pad (i.e. `:output`) are not part of the recorded
  # stream. Delegate them to the default implementation injected by
  # `use Membrane.Filter` instead of crashing with a `FunctionClauseError`.
  def handle_event(pad, event, ctx, state) do
    super(pad, event, ctx, state)
  end

  @impl true
  def handle_end_of_stream(:input, _ctx, state), do: {[end_of_stream: :output], state}

  # Serializes a single `{type, data}` action with the function chosen in
  # `handle_init/2` and wraps the result in a buffer sent through `:output`.
  defp process(action, state) do
    serialized = state.serializer_fn.(action)
    {[buffer: {:output, %Buffer{payload: serialized}}], state}
  end
end