lib/membrane_file/source.ex

defmodule Membrane.File.Source do
  @moduledoc """
  Element that reads chunks of data from given file and sends them as buffers
  through the output pad.
  """
  use Membrane.Source

  alias Membrane.{Buffer, RemoteStream}

  @common_file Membrane.File.CommonFileBehaviour.get_impl()

  def_options location: [
                spec: Path.t(),
                description: "Path to the file"
              ],
              chunk_size: [
                spec: pos_integer(),
                default: 2048,
                description: "Size of chunks being read"
              ]

  def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}

  @impl true
  def handle_init(_ctx, %__MODULE__{location: location, chunk_size: size}) do
    {[],
     %{
       location: Path.expand(location),
       chunk_size: size,
       fd: nil
     }}
  end

  @impl true
  def handle_setup(ctx, %{location: location} = state) do
    fd = @common_file.open!(location, :read)

    Membrane.ResourceGuard.register(
      ctx.resource_guard,
      fn -> @common_file.close!(fd) end
    )

    {[], %{state | fd: fd}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    {[stream_format: {:output, %RemoteStream{type: :bytestream}}], state}
  end

  @impl true
  def handle_demand(:output, _size, :buffers, _ctx, %{chunk_size: chunk_size} = state),
    do: supply_demand(chunk_size, [redemand: :output], state)

  def handle_demand(:output, size, :bytes, _ctx, state),
    do: supply_demand(size, [], state)

  defp supply_demand(size, redemand, %{fd: fd} = state) do
    actions =
      case @common_file.binread!(fd, size) do
        <<payload::binary>> when byte_size(payload) == size ->
          [buffer: {:output, %Buffer{payload: payload}}] ++ redemand

        <<payload::binary>> when byte_size(payload) < size ->
          [buffer: {:output, %Buffer{payload: payload}}, end_of_stream: :output]

        :eof ->
          [end_of_stream: :output]
      end

    {actions, state}
  end
end