lib/mongo/grid_fs/upload_stream.ex

defmodule Mongo.GridFs.UploadStream do
  @moduledoc """
  This module implements the upload stream for writing streams of binary data into GridFS.

  First you need to create a bucket. The bucket contains the configuration for GridFS,
  in particular the chunk size.

  ## Example

  Streaming the file `./test/data/test.jpg` into GridFS using the upload stream:

      bucket = Bucket.new( pid )
      upload_stream = Upload.open_upload_stream(bucket, "test.jpg", j: true)

      src_filename = "./test/data/test.jpg"
      File.stream!(src_filename, [], 512) |> Stream.into(upload_stream) |> Stream.run()
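
  If you also need the generated file id, collect with `Enum.into/2` instead of
  running the stream: the `Collectable` implementation returns the upload stream
  struct when it is done, so its `id` field holds the `_id` of the new files
  document (a sketch building on the example above):

      upload_stream = File.stream!(src_filename, [], 512) |> Enum.into(upload_stream)
      file_id = upload_stream.id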

  """

  import Record, only: [defrecordp: 2]

  alias Mongo.GridFs.Bucket
  alias Mongo.GridFs.UploadStream

  @type file_id :: BSON.ObjectId.t() | binary()
  @type t :: %__MODULE__{
          bucket: Bucket.t(),
          id: file_id(),
          filename: String.t(),
          metadata: BSON.document() | nil
        }
  defstruct bucket: nil, id: nil, filename: nil, metadata: nil

  @doc """
  Creates a new upload stream for inserting a file into GridFS. If no file id is
  given, a new `ObjectId` is generated.
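
  ## Example

  A minimal sketch, assuming `bucket` was created as shown in the module
  documentation:

      # the metadata document is stored as-is in the files document
      stream = UploadStream.new(bucket, "test.jpg", %{"source" => "example"})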
  """
  @spec new(Bucket.t(), String.t(), BSON.document() | nil, file_id() | nil) :: UploadStream.t()
  def new(bucket, filename, metadata \\ nil, file_id \\ nil) do
    %UploadStream{bucket: bucket, filename: filename, id: file_id || Mongo.object_id(), metadata: metadata}
  end

  defimpl Collectable, for: UploadStream do
    ##
    # buffer is the binary data that has not yet been written as a chunk
    # number is the index of the next chunk to be written
    #
    defrecordp(:state, [:buffer, :number])

    def into(%UploadStream{} = stream) do
      {state(buffer: <<>>, number: 0), make_fun(stream)}
    end

    defp make_fun(%UploadStream{bucket: %{chunk_size: chunk_size}} = stream) do
      fn
        # case: the buffer holds at least one full chunk, write it out before appending
        state(buffer: bin) = s, {:cont, x} when byte_size(bin) >= chunk_size ->
          state(buffer: rest, number: next) = write_buffer(stream, s)
          state(buffer: rest <> x, number: next)

        # case: the buffer is not full yet, just append the new data
        state(buffer: bin, number: n), {:cont, x} ->
          state(buffer: bin <> x, number: n)

        # stream stops, write the rest
        state() = s, :done ->
          _ = flush_buffer(stream, s)
          stream

        # stream halts, write the rest
        state() = s, :halt ->
          _ = flush_buffer(stream, s)
          :ok
      end
    end

    ##
    # writes the remaining buffer as the last chunk and inserts the files document
    #
    defp flush_buffer(
           %UploadStream{bucket: %Bucket{topology_pid: topology_pid, chunk_size: chunk_size, opts: opts} = bucket, filename: filename, id: file_id, metadata: metadata},
           state(buffer: buffer, number: chunk_number)
         ) do
      collection = Bucket.chunks_collection_name(bucket)
      length = chunk_number * chunk_size + byte_size(buffer)
      insert_one_chunk_document(topology_pid, collection, file_id, buffer, chunk_number, opts)

      collection = Bucket.files_collection_name(bucket)
      insert_one_file_document(topology_pid, collection, file_id, length, chunk_size, filename, metadata, opts)
    end

    ##
    # if the buffer is smaller than the chunk size, there is nothing to write yet
    #
    defp write_buffer(
           %UploadStream{bucket: %Bucket{chunk_size: chunk_size}},
           state(buffer: buffer) = s
         )
         when byte_size(buffer) < chunk_size do
      s
    end

    ##
    # otherwise we write one chunk to the chunks collection and call the function
    # again with the rest of the buffer, because the remaining buffer may still
    # hold one or more full chunks
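    #
    # for example, with a chunk size of 3 and buffer <<1, 2, 3, 4, 5, 6, 7>> this
    # inserts the chunks <<1, 2, 3>> and <<4, 5, 6>> and returns
    # state(buffer: <<7>>, number: chunk_number + 2)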
    #
    defp write_buffer(
           %UploadStream{bucket: %Bucket{topology_pid: topology_pid, chunk_size: chunk_size, opts: opts} = bucket, id: file_id} = stream,
           state(buffer: buffer, number: chunk_number)
         ) do
      collection = Bucket.chunks_collection_name(bucket)

      fun = fn <<data::bytes-size(chunk_size), rest::binary>> ->
        next = insert_one_chunk_document(topology_pid, collection, file_id, data, chunk_number, opts)
        state(buffer: rest, number: next)
      end

      # write the buffer
      new_state = fun.(buffer)

      # try to write the rest of the buffer
      write_buffer(stream, new_state)
    end

    ##
    # inserts one chunk document
    #
    defp insert_one_chunk_document(_topology_pid, _collection, _file_id, data, chunk_number, _opts) when byte_size(data) == 0 do
      chunk_number
    end

    defp insert_one_chunk_document(topology_pid, collection, file_id, binary, chunk_number, opts) do
      bson_binary = %BSON.Binary{binary: binary}
      {:ok, _} = Mongo.insert_one(topology_pid, collection, %{files_id: file_id, n: chunk_number, data: bson_binary}, opts)
      chunk_number + 1
    end

    ##
    # inserts one file document
    #
    defp insert_one_file_document(topology_pid, collection, file_id, length, chunk_size, filename, metadata, opts) do
      doc =
        %{_id: file_id, length: length, filename: filename, chunkSize: chunk_size, uploadDate: now(), metadata: metadata}
        |> filter_nils()

      {:ok, _} = Mongo.insert_one(topology_pid, collection, doc, opts)
    end

    # returns the current UTC time as a DateTime, used for the uploadDate field
    defp now(), do: DateTime.from_unix!(:os.system_time(), :native)

    # drops nil values (e.g. a missing metadata document), so they are not stored
    defp filter_nils(map) when is_map(map) do
      map
      |> Enum.reject(fn {_key, value} -> is_nil(value) end)
      |> Enum.into(%{})
    end
  end
end