lib/mongo/grid_fs/bucket.ex

defmodule Mongo.GridFs.Bucket do
  @moduledoc """

  This is the MongoDB Bucket struct, which specifies the underlying collections that support GridFS. GridFS always
  uses two kinds of collections for storing files. The `fs.files` collection contains the meta information about
  the file and the `fs.chunks` collection contains the data of the file chunked into blocks of bytes. The names of the
  collections and the size of the blocks are defined by the following options:

  The bucket has some configuration options:
    * `:chunk_size` - The chunk size in bytes. Defaults to `255*1024`
    * `:name` - The bucket name. Defaults to `fs`

  The bucket checks whether the indexes already exist before attempting to create them. The names of the
  created indexes are "filename_1_uploadDate_1" and "files_id_1_n_1"

  """

  alias BSON.ObjectId
  alias Mongo.GridFs.Bucket

  ##
  # constants used in this module
  #
  # Names of the indexes maintained on the *.files and *.chunks collections.
  @files_index_name "filename_1_uploadDate_1"
  @chunks_index_name "files_id_1_n_1"
  # Default bucket name and chunk size; merged with the options passed to new/2.
  @defaults [name: "fs", chunk_size: 255 * 1024]

  # The `opts` field is part of the struct and therefore listed in the type as well.
  @type t :: %__MODULE__{
          name: String.t(),
          chunk_size: non_neg_integer,
          topology_pid: GenServer.server(),
          opts: Keyword.t()
        }

  defstruct name: "fs", chunk_size: 255 * 1024, topology_pid: nil, opts: []

  @doc """
  Creates a new bucket on top of an existing connection.

  ## Options

    * `:name` - the bucket name, defaults to `"fs"`
    * `:chunk_size` - the chunk size in bytes, defaults to `255 * 1024`

  Only `:name` and `:chunk_size` are copied into the corresponding struct fields.
  The complete `options` list is stored unchanged in the `opts` field of the bucket.

  The indexes of both collections are checked (and created when missing) as part of
  this call, so for multiple uploads or downloads create a single bucket and reuse it.
  If one of these steps does not return `:ok`, its result is returned in place of the
  bucket.
  """
  @spec new(GenServer.server(), Keyword.t()) :: Bucket.t()
  def new(topology_pid, options \\ []) do
    # Restrict the struct update to the declared configuration fields. Copying every
    # option with Map.put/3 would add undeclared keys to the struct and would allow
    # options such as `:topology_pid` or `:opts` to overwrite the fields set here.
    fields =
      @defaults
      |> Keyword.merge(options)
      |> Keyword.take([:name, :chunk_size])

    %Bucket{topology_pid: topology_pid, opts: options}
    |> struct!(fields)
    |> check_indexes()
  end

  @doc """
  Returns a copy of the bucket whose options additionally contain `session_opts`.

  When using a transaction, the session has to be added to the bucket with this
  function. Entries in `session_opts` replace entries with the same key that are
  already stored in the bucket, so a bucket that already carries a session can be
  given a new one.
  """
  @spec add_session(Bucket.t(), Keyword.t()) :: Bucket.t()
  def add_session(%Bucket{opts: opts} = bucket, session_opts) do
    # Keyword.merge/2 rather than `++`: keyword lookups return the first match, so
    # with a plain append an older entry for the same key would shadow the new one.
    %Bucket{bucket | opts: Keyword.merge(opts, session_opts)}
  end

  @doc """
  Builds the name of the files collection by appending `.files` to the bucket name.

  With the default bucket name this is `fs.files`.
  """
  @spec files_collection_name(Bucket.t()) :: String.t()
  def files_collection_name(%Bucket{name: bucket_name}) do
    bucket_name <> ".files"
  end

  @doc """
  Builds the name of the chunks collection by appending `.chunks` to the bucket name.

  With the default bucket name this is `fs.chunks`.
  """
  @spec chunks_collection_name(Bucket.t()) :: String.t()
  def chunks_collection_name(%Bucket{name: bucket_name}) do
    bucket_name <> ".chunks"
  end

  @doc """
  Renames the stored file with the specified `file_id`.

  Sets the `filename` field of the document with the given `_id` in the files
  collection to `new_filename` by calling `Mongo.find_one_and_update/5` with the
  options stored in the bucket.

  The result of that call is returned unchanged, so a failed update is handed to the
  caller as a value instead of raising a `MatchError`.
  """
  @spec rename(Bucket.t(), BSON.ObjectId.t(), String.t()) :: Mongo.result(BSON.document())
  def rename(%Bucket{topology_pid: topology_pid, opts: opts} = bucket, file_id, new_filename) do
    query = %{_id: file_id}
    update = %{"$set" => %{filename: new_filename}}
    collection = files_collection_name(bucket)

    # No assertive `{:ok, _} =` match: the spec promises a result value, which
    # includes the non-ok case, so the driver result is passed through as is.
    Mongo.find_one_and_update(topology_pid, collection, query, update, opts)
  end

  @doc """
  Deletes a stored file from the GridFS bucket.

  The file is identified by `file_id`, given either as a `BSON.ObjectId` or as a
  string that is decoded with `BSON.ObjectId.decode!/1`. The document in the files
  collection is removed first, then all documents in the chunks collection whose
  `files_id` equals the id.

  Returns the result of deleting the chunks. Both delete calls are matched against
  `{:ok, %Mongo.DeleteResult{}}`; any other result raises a `MatchError`.
  """
  @spec delete(Bucket.t(), String.t()) :: {:ok, Mongo.DeleteResult.t()}
  def delete(%Bucket{} = bucket, file_id) when is_binary(file_id) do
    oid = ObjectId.decode!(file_id)
    delete(bucket, oid)
  end

  @spec delete(Bucket.t(), BSON.ObjectId.t()) :: {:ok, Mongo.DeleteResult.t()}
  def delete(%Bucket{topology_pid: conn, opts: opts} = bucket, %ObjectId{} = oid) do
    files = files_collection_name(bucket)
    chunks = chunks_collection_name(bucket)

    # the files document goes first ...
    {:ok, %Mongo.DeleteResult{}} = Mongo.delete_one(conn, files, %{_id: oid}, opts)

    # ... followed by every chunk that belongs to it
    {:ok, %Mongo.DeleteResult{}} = Mongo.delete_many(conn, chunks, %{files_id: oid}, opts)
  end

  @doc """
  Drops both collections of this bucket.

  The files collection is dropped first. Only if that returns `:ok` is the chunks
  collection dropped as well, and the result of that second call is returned.
  Otherwise the result of the first call is returned as is.
  """
  @spec drop(Bucket.t()) :: :ok | {:error, Mongo.Error.t()}
  def drop(%Bucket{topology_pid: conn, opts: opts} = bucket) do
    case Mongo.drop_collection(conn, files_collection_name(bucket), opts) do
      :ok -> Mongo.drop_collection(conn, chunks_collection_name(bucket), opts)
      failure -> failure
    end
  end

  @doc """
  Returns a cursor from the files collection of the bucket (`fs.files` by default).

  `filter` is the query document and `opts` are additional options for this call.
  The options stored in the bucket are applied as well, as they are for the other
  operations of this module. An option given in `opts` takes precedence over a bucket
  option with the same key.
  """
  @spec find(Bucket.t(), BSON.document(), Keyword.t()) :: Mongo.cursor()
  def find(%Bucket{topology_pid: topology_pid, opts: bucket_opts} = bucket, filter, opts \\ []) do
    # Start from the bucket options so the query runs with the same settings as
    # find_one/2, rename/3, delete/2 and drop/1; per-call options override them.
    Mongo.find(topology_pid, files_collection_name(bucket), filter, Keyword.merge(bucket_opts, opts))
  end

  @doc """
  Looks up a single file document in the files collection by its id.

  `file_id` is either a `BSON.ObjectId` or a string, which is decoded with
  `BSON.ObjectId.decode!/1` before the lookup. The query uses the options stored in
  the bucket.
  """
  def find_one(bucket, file_id)

  @spec find_one(Bucket.t(), String.t()) :: BSON.document() | nil
  def find_one(%Bucket{} = bucket, file_id) when is_binary(file_id) do
    oid = ObjectId.decode!(file_id)
    find_one(bucket, oid)
  end

  @spec find_one(Bucket.t(), BSON.ObjectId.t()) :: BSON.document() | nil
  def find_one(%Bucket{topology_pid: conn, opts: opts} = bucket, %ObjectId{} = oid) do
    collection = files_collection_name(bucket)
    Mongo.find_one(conn, collection, %{"_id" => oid}, opts)
  end

  ##
  # Makes sure the indexes for the *.files and *.chunks collections exist.
  #
  # If files_collection_empty?/1 returns true, the collections and their indexes are
  # created by create_indexes/1. Otherwise each index is looked up and created only
  # when it is missing.
  #
  # Returns the bucket when every step returned :ok, otherwise the first result
  # that was not :ok.
  #
  defp check_indexes(bucket) do
    if files_collection_empty?(bucket) do
      create_indexes(bucket)
    else
      with :ok <- check_and_create_files_index(bucket),
           :ok <- check_and_chunks_files_index(bucket) do
        bucket
      end
    end
  end

  ##
  # Creates the files and chunks collections and afterwards the index on each of them.
  #
  # The steps run in order and stop at the first one that does not return :ok; that
  # result is then returned. If all steps succeed the bucket is returned.
  #
  defp create_indexes(%Bucket{topology_pid: conn, opts: opts} = bucket) do
    files = files_collection_name(bucket)
    chunks = chunks_collection_name(bucket)

    with :ok <- Mongo.create(conn, files, opts),
         :ok <- Mongo.create(conn, chunks, opts),
         :ok <- create_files_index(bucket),
         :ok <- create_chunks_index(bucket) do
      bucket
    end
  end

  @doc """
  Creates the index on the files collection unless it is already present.

  Returns `:ok` when the index exists, otherwise the result of creating it.
  """
  def check_and_create_files_index(bucket) do
    if check_files_index(bucket), do: :ok, else: create_files_index(bucket)
  end

  @doc """
  Creates the index on the chunks collection unless it is already present.

  Returns `:ok` when the index exists, otherwise the result of creating it.
  """
  def check_and_chunks_files_index(bucket) do
    if check_chunks_index(bucket), do: :ok, else: create_chunks_index(bucket)
  end

  ##
  # Returns true when the files collection of the bucket is not among the collection
  # names returned by Mongo.show_collections/2, false otherwise.
  #
  # The bucket options are forwarded so this lookup runs with the same options as the
  # create and list-index calls that follow it.
  #
  # NOTE: the GridFS spec suggests the equivalent of
  #
  #   db.fs.files.findOne({}, { _id : 1 })
  #
  # to determine whether the files collection is empty. This implementation checks
  # for the existence of the collection instead.
  #
  defp files_collection_empty?(%Bucket{topology_pid: topology_pid, opts: opts} = bucket) do
    coll_name = files_collection_name(bucket)

    collections = Mongo.show_collections(topology_pid, opts)

    not Enum.member?(collections, coll_name)
  end

  ##
  # Tells whether the files collection already has an index named @files_index_name.
  #
  defp check_files_index(%Bucket{topology_pid: conn, opts: opts} = bucket) do
    collection = files_collection_name(bucket)
    index_member?(conn, collection, @files_index_name, opts)
  end

  ##
  # Tells whether the chunks collection already has an index named @chunks_index_name.
  #
  defp check_chunks_index(%Bucket{topology_pid: conn, opts: opts} = bucket) do
    collection = chunks_collection_name(bucket)
    index_member?(conn, collection, @chunks_index_name, opts)
  end

  @doc """
  Returns `true` if the collection `coll` has an index whose name equals `index`.

  The index documents are read with `Mongo.list_indexes/3`. A document without a
  `"name"` key is treated as having the empty string as its name.
  """
  def index_member?(topology_pid, coll, index, opts) do
    names =
      for entry <- Mongo.list_indexes(topology_pid, coll, opts) do
        case entry do
          %{"name" => name} -> name
          _ -> ""
        end
      end

    index in names
  end

  ##
  # Creates the unique index over files_id and n on the chunks collection and
  # returns the result of Mongo.create_indexes/4.
  #
  defp create_chunks_index(%Bucket{topology_pid: conn, opts: opts} = bucket) do
    index = [key: [files_id: 1, n: 1], name: @chunks_index_name, unique: true]

    Mongo.create_indexes(conn, chunks_collection_name(bucket), [index], opts)
  end

  ##
  # Creates the index over filename and uploadDate on the files collection and
  # returns the result of Mongo.create_indexes/4.
  #
  defp create_files_index(%Bucket{topology_pid: conn, opts: opts} = bucket) do
    index = [key: [filename: 1, uploadDate: 1], name: @files_index_name]

    Mongo.create_indexes(conn, files_collection_name(bucket), [index], opts)
  end
end