lib/membrane_file/source.ex

defmodule Membrane.File.Source do
  @moduledoc """
  Element that reads chunks of data from given file and sends them as buffers
  through the output pad.
  May also read from standard input by setting location to :stdin.

  Can work in two modes, determined by the `seekable?` option.
  Seekable mode is not supported when reading from standard input.
  """
  use Membrane.Source

  alias Membrane.{Buffer, RemoteStream}
  alias Membrane.File.NewSeekEvent
  alias Membrane.File.SeekSourceEvent

  @common_file Membrane.File.CommonFileBehaviour.get_impl()

  def_options location: [
                spec: Path.t() | :stdin,
                description: "Path to the file or :stdin"
              ],
              chunk_size: [
                spec: pos_integer(),
                default: 2048,
                description: "Size of chunks being read"
              ],
              seekable?: [
                spec: boolean(),
                default: false,
                description: """
                With `seekable?: false`, the source will start reading data from the file exactly the moment it starts
                playing and will read it till the end of file, setting the `end_of_stream` action on the `:output` pad
                when the reading is done.
                With `seekable?: true`, the process of reading is driven by receiving `Membrane.File.SeekSourceEvent` events.
                The source working in `seekable?: true` mode won't send any data before that event is received.
                For more information about how to steer reading in `seekable?: true` mode, see: `Membrane.File.SeekSourceEvent`.
                """
              ]

  def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, flow_control: :manual

  @impl true
  def handle_init(_ctx, %__MODULE__{location: :stdin, chunk_size: size, seekable?: seekable?}) do
    if seekable? do
      raise "Cannot seek when reading from :stdin"
    else
      {[],
       %{
         location: :stdin,
         chunk_size: size,
         should_send_eos: true,
         size_to_read: :infinity,
         seekable?: false
       }}
    end
  end

  @impl true
  def handle_init(_ctx, %__MODULE__{location: location, chunk_size: size, seekable?: seekable?}) do
    size_to_read = if seekable?, do: 0, else: :infinity

    {[],
     %{
       location: Path.expand(location),
       chunk_size: size,
       fd: nil,
       should_send_eos?: not seekable?,
       size_to_read: size_to_read,
       seekable?: seekable?
     }}
  end

  @impl true
  def handle_setup(_ctx, %{location: :stdin} = state) do
    {[], state}
  end

  @impl true
  def handle_setup(_ctx, %{location: location} = state) do
    fd = @common_file.open!(location, :read)

    {[], %{state | fd: fd}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    {[stream_format: {:output, %RemoteStream{type: :bytestream}}], state}
  end

  @impl true
  def handle_event(
        :output,
        %SeekSourceEvent{start: seek_start, size_to_read: size_to_read, last?: last?},
        _ctx,
        %{seekable?: true} = state
      ) do
    @common_file.seek!(state.fd, seek_start)

    {[event: {:output, %NewSeekEvent{}}, redemand: :output],
     %{state | should_send_eos?: last?, size_to_read: size_to_read}}
  end

  @impl true
  def handle_event(
        :output,
        %SeekSourceEvent{},
        _ctx,
        %{seekable?: false}
      ) do
    raise "Cannot handle `Membrane.File.SeekSourceEvent` in a `#{__MODULE__}` with `seekable?: false` option."
  end

  @impl true
  def handle_demand(:output, _size, :buffers, _ctx, %{chunk_size: chunk_size} = state),
    do: supply_demand(chunk_size, [redemand: :output], state)

  def handle_demand(:output, size, :bytes, _ctx, state),
    do: supply_demand(size, [], state)

  @impl true
  def handle_terminate_request(_ctx, %{location: :stdin} = state) do
    {[terminate: :normal], state}
  end

  @impl true
  def handle_terminate_request(_ctx, state) do
    @common_file.close!(state.fd)

    {[terminate: :normal], %{state | fd: nil}}
  end

  defp supply_demand(demand_size, redemand, %{size_to_read: :infinity} = state) do
    do_supply_demand(demand_size, redemand, state)
  end

  defp supply_demand(_demand_size, _redemand, %{size_to_read: 0} = state) do
    {[], state}
  end

  defp supply_demand(demand_size, redemand, %{size_to_read: size_to_read} = state) do
    do_supply_demand(min(demand_size, size_to_read), redemand, state)
  end

  defp do_supply_demand(to_supply_size, redemand, %{location: :stdin} = state) do
    {buffer_actions, supplied_size} =
      case IO.binread(to_supply_size) do
        <<payload::binary>> ->
          {[buffer: {:output, %Buffer{payload: payload}}], byte_size(payload)}

        :eof ->
          {[], 0}
      end

    actions =
      buffer_actions ++
        cond do
          supplied_size < to_supply_size ->
            [end_of_stream: :output]

          supplied_size == to_supply_size ->
            redemand

          true ->
            []
        end

    {actions, state}
  end

  defp do_supply_demand(to_supply_size, redemand, state) do
    {buffer_actions, supplied_size} =
      case @common_file.binread!(state.fd, to_supply_size) do
        <<payload::binary>> ->
          {[buffer: {:output, %Buffer{payload: payload}}], byte_size(payload)}

        :eof ->
          {[], 0}
      end

    new_size_to_read =
      if state.size_to_read == :infinity, do: :infinity, else: state.size_to_read - supplied_size

    state = %{state | size_to_read: new_size_to_read}

    actions =
      buffer_actions ++
        cond do
          state.should_send_eos? and (state.size_to_read == 0 or supplied_size < to_supply_size) ->
            [end_of_stream: :output]

          to_supply_size == supplied_size ->
            redemand

          true ->
            []
        end

    {actions, state}
  end
end