defmodule Mongo.Repo do
@moduledoc """
Defines a repository.
A repository serves as a convenience module for a mongodb instance.
To include the `Mongo.Repo` module in your application, you can put the use macro in your
app's Repo module.
defmodule MyApp.Repo do
use Mongo.Repo,
otp_app: :my_app,
topology: :mongo
end
With that in place we can configure the Repo:
config :my_app, MyApp.Repo,
url: "mongodb://localhost:27017/my-app-dev",
timeout: 60_000,
idle_interval: 10_000,
queue_target: 5_000
For a complete list of configuration options take a look at `Mongo`.
Finally we can add the `Mongo.Repo` instance to our application supervision tree
children = [
# ...
MyApp.Repo,
# ...
]
## Read-only repositories
To explicitly set a repository as read-only, we can pass in the `:read_only` flag to `use`:
use Mongo.Repo,
otp_app: :my_app,
topology: :mongo,
read_only: true
The read-only option will not include any write operation related functions in the module.
"""
@type t() :: module()
@doc false
defmacro __using__(opts) do
quote bind_quoted: [opts: opts] do
@behaviour Mongo.Repo
@topology opts[:topology] || :mongo
@otp_app opts[:otp_app]
@read_only opts[:read_only] || false
def config() do
@otp_app
|> Application.get_env(__MODULE__, [])
|> Keyword.put_new(:name, @topology)
end
def child_spec(_opts) do
Mongo.child_spec(config())
end
unless @read_only do
def insert(%{__struct__: module} = doc, opts \\ []) do
collection = module.__collection__(:collection)
doc = module.timestamps(doc)
case Mongo.insert_one(@topology, collection, module.dump(doc), opts) do
{:error, reason} -> {:error, reason}
{:ok, %{inserted_id: id}} -> {:ok, %{doc | _id: id}}
end
end
def update(%{__struct__: module, _id: id} = doc, opts \\ []) do
collection = module.__collection__(:collection)
doc = module.timestamps(doc)
case Mongo.update_one(@topology, collection, %{_id: id}, %{"$set" => module.dump(doc)}, opts) do
{:error, reason} -> {:error, reason}
{:ok, %{modified_count: _}} -> {:ok, doc}
end
end
def insert_or_update(%{__struct__: module, _id: id} = doc, opts \\ []) do
collection = module.__collection__(:collection)
doc = module.timestamps(doc)
opts = Keyword.put(opts, :upsert, true)
case Mongo.update_one(@topology, collection, %{_id: id}, %{"$set" => module.dump(doc)}, opts) do
{:error, reason} -> {:error, reason}
{:ok, %{upserted_ids: [id]}} -> {:ok, %{doc | _id: id}}
{:ok, %{modified_count: _}} -> {:ok, doc}
end
end
def delete(%{__struct__: module, _id: id} = doc, opts \\ []) do
collection = module.__collection__(:collection)
case Mongo.delete_one(@topology, collection, %{_id: id}, opts) do
{:ok, %{deleted_count: 1}} -> {:ok, doc}
{:ok, %{deleted_count: 0}} -> {:error, :not_found}
{:error, reason} -> {:error, reason}
end
end
def delete_by(module, filter \\ %{}, opts \\ []) do
collection = module.__collection__(:collection)
case Mongo.delete_one(@topology, collection, module.dump_part(filter), opts) do
{:ok, %{deleted_count: 1}} -> {:ok, filter}
{:ok, %{deleted_count: 0}} -> {:error, :not_found}
{:error, reason} -> {:error, reason}
end
end
def insert!(%{__struct__: module} = doc, opts \\ []) do
collection = module.__collection__(:collection)
doc = module.timestamps(doc)
%{inserted_id: id} = Mongo.insert_one!(@topology, collection, module.dump(doc), opts)
%{doc | _id: id}
end
def update!(%{__struct__: module, _id: id} = doc, opts \\ []) do
collection = module.__collection__(:collection)
doc = module.timestamps(doc)
Mongo.update_one!(@topology, collection, %{_id: id}, %{"$set" => module.dump(doc)}, opts)
doc
end
def insert_or_update!(%{__struct__: module, _id: id} = doc, opts \\ []) do
collection = module.__collection__(:collection)
doc = module.timestamps(doc)
opts = Keyword.put(opts, :upsert, true)
update_one_result = Mongo.update_one!(@topology, collection, %{_id: id}, %{"$set" => module.dump(doc)}, opts)
case update_one_result do
%{upserted_ids: [id]} -> %{doc | _id: id}
%{modified_count: _} -> doc
end
end
def delete!(%{__struct__: module, _id: id} = doc, opts \\ []) do
collection = module.__collection__(:collection)
delete_result = Mongo.delete_one!(@topology, collection, %{_id: id}, opts)
case delete_result do
%{deleted_count: 1} -> doc
%{deleted_count: 0} -> raise Mongo.Error.exception("Document not found")
end
end
def insert_all(module, entries, opts \\ []) do
collection = module.__collection__(:collection)
case Mongo.insert_many(@topology, collection, entries, opts) do
{:ok, %{inserted_ids: ids}} -> {:ok, length(ids), ids}
error -> error
end
end
def update_all(module, filter \\ %{}, update, opts \\ []) do
collection = module.__collection__(:collection)
Mongo.update_many(@topology, collection, module.dump_part(filter), update, opts)
end
def delete_all(module, filter \\ %{}, opts \\ []) do
collection = module.__collection__(:collection)
Mongo.delete_many(@topology, collection, module.dump_part(filter), opts)
end
end
def transaction(fun, opts \\ []) do
Mongo.transaction(@topology, fun, opts)
end
def all(module, filter \\ %{}, opts \\ []) do
collection = module.__collection__(:collection)
case Mongo.find(@topology, collection, filter, opts) do
{:error, _reason} = error -> error
cursor -> Enum.map(cursor, &module.load/1)
end
end
def stream(module, filter \\ %{}, opts \\ []) do
collection = module.__collection__(:collection)
case Mongo.find(@topology, collection, module.dump_part(filter), opts) do
{:error, _reason} = error -> error
cursor -> Stream.map(cursor, &module.load/1)
end
end
def aggregate(module, pipeline, opts \\ []) do
collection = module.__collection__(:collection)
case Mongo.aggregate(@topology, collection, pipeline, opts) do
{:error, _reason} = error -> error
cursor -> Enum.map(cursor, &module.load/1)
end
end
def get(module, id, opts \\ []) do
collection = module.__collection__(:collection)
case Mongo.find_one(@topology, collection, %{_id: id}, opts) do
{:error, _reason} = error -> error
value -> module.load(value)
end
end
def get_by(module, filter \\ %{}, opts \\ []) do
collection = module.__collection__(:collection)
case Mongo.find_one(@topology, collection, module.dump_part(filter), opts) do
{:error, _reason} = error -> error
value -> module.load(value)
end
end
def fetch(module, id, opts \\ []) do
case get(module, id, opts) do
{:error, _reason} = error -> error
nil -> {:error, :not_found}
doc -> {:ok, doc}
end
end
def fetch_by(module, filter \\ %{}, opts \\ []) do
case get_by(module, module.dump_part(filter), opts) do
{:error, _reason} = error -> error
nil -> {:error, :not_found}
doc -> {:ok, doc}
end
end
def count(module, filter \\ %{}, opts \\ []) do
collection = module.__collection__(:collection)
Mongo.count_documents(@topology, collection, filter, opts)
end
def exists?(module, filter \\ %{}, opts \\ []) do
opts = Keyword.put(opts, :limit, 1)
with {:ok, count} <- count(module, filter, opts) do
count > 0
end
end
end
end
@doc """
Returns the mongo configuration stored in the `:otp_app` environment.
"""
@callback config() :: Keyword.t()
@optional_callbacks get: 3, get_by: 3, aggregate: 3, exists?: 3, all: 3, stream: 3, update_all: 4, delete_all: 3
@doc """
Returns a single document struct for the collection defined in the given module and bson object id.
Returns `nil` if no result was found.
If multiple documents satisfy the query, this method returns the first document
according to the natural order which reflects the order of documents on the disk.
For all options see [Options](https://docs.mongodb.com/manual/reference/command/find/#dbcmd.find)
## Example
MyApp.Repo.get(Post, id)
MyApp.Repo.get(Post, id, read_concern: %{level: "local"})
"""
@callback get(module :: module(), id :: BSON.ObjectId.t(), opts :: Keyword.t()) ::
Mongo.Collection.t() | nil | {:error, any()}
@doc """
Returns a single document struct for the collection defined in the given module and query.
Returns `nil` if no result was found.
If multiple documents satisfy the query, this method returns the first document
according to the natural order which reflects the order of documents on the disk.
For all options see [Options](https://docs.mongodb.com/manual/reference/command/find/#dbcmd.find)
## Example
MyApp.Repo.get_by(Post, %{title: title})
MyApp.Repo.get_by(Post, %{title: title}, read_concern: %{level: "local"})
"""
@callback get_by(module :: module(), query :: BSON.document(), opts :: Keyword.t()) ::
Mongo.Collection.t() | nil | {:error, any()}
@doc """
Selects documents for the collection defined in the given module and returns a list of collection
structs for the given filter
For all options see [Options](https://docs.mongodb.com/manual/reference/command/find/#dbcmd.find)
## Example
MyApp.Repo.all(Post, %{title: title})
MyApp.Repo.all(Post, %{title: title}, batch_size: 2)
"""
@callback all(module :: module(), filter :: BSON.document(), opts :: Keyword.t()) ::
list(Mongo.Collection.t()) | {:error, any()}
@doc """
Selects documents for the collection defined in the given module and returns a stream of collection
structs for the given filter
For all options see [Options](https://docs.mongodb.com/manual/reference/command/find/#dbcmd.find)
## Example
MyApp.Repo.stream(Post, %{title: title})
MyApp.Repo.stream(Post, %{title: title}, batch_size: 2)
"""
@callback stream(module :: module(), filter :: BSON.document(), opts :: Keyword.t()) ::
Enumerable.t() | {:error, any()}
@doc """
Performs aggregation operation using the aggregation pipeline on the given collection module and returns
a list of collection structs.
For all options see [Options](https://docs.mongodb.com/manual/reference/command/aggregate/#aggregate)
## Example
MyApp.Repo.aggregate(Post, [
%{"$match" => %{title: title}},
%{"$sort" => [{"title", -1}]},
%{"$limit" => 10}
])
"""
@callback aggregate(module :: module(), pipeline :: BSON.document(), opts :: Keyword.t()) ::
list(Mongo.Collection.t()) | {:error, any()}
@doc """
Returns the count of documents in the given collection module for the given filter.
## Options
* `:limit` - Maximum number of documents to fetch with the cursor
* `:skip` - Number of documents to skip before returning the first
## Example
MyApp.Repo.count(Post)
"""
@callback count(module :: module(), filter :: BSON.document(), opts :: Keyword.t()) :: {:ok, integer()}
@doc """
Checks whether there are any documents in the given collection module for the given filter.
Returns a boolean.
## Example
MyApp.Repo.exists?(Post, %{title: title})
"""
@callback exists?(module :: module(), filter :: BSON.document(), opts :: Keyword.t()) :: boolean()
@doc """
Applies the updates for the documents in the given collection module and filter.
Uses MongoDB update operators to specify the updates. For more information and all options
please refer to the [MongoDB documentation](https://docs.mongodb.com/manual/reference/command/update/#dbcmd.update)
## Example
MyApp.Repo.update_all(Post, %{}, %{"$set" => %{title: "updated"}})
MyApp.Repo.update_all(Post, %{title: "old"}, %{"$set" => %{title: "updated"}})
"""
@callback update_all(module :: module(), filter :: BSON.document(), update :: BSON.document(), opts :: Keyword.t()) ::
{:ok, Mongo.result(Mongo.UpdateResult.t())}
@doc """
Deletes all documents for the given collection module and filter.
For all options see [Options](https://www.mongodb.com/docs/manual/reference/command/delete/#dbcmd.delete)
## Example
MyApp.Repo.delete_all(Post, %{})
MyApp.Repo.delete_all(Post, %{title: "todelete"})
"""
@callback delete_all(module :: module(), filter :: BSON.document(), opts :: Keyword.t()) ::
{:ok, Mongo.DeleteResult.t()}
@optional_callbacks fetch: 3, fetch_by: 3
@doc """
Returns a single document struct for the collection defined in the given module and bson object id as
a tuple of `{:ok, document}`.
Returns `{:error, :not_found}` if no result was found.
If multiple documents satisfy the query, this method returns the first document
according to the natural order which reflects the order of documents on the disk.
For all options see [Options](https://docs.mongodb.com/manual/reference/command/find/#dbcmd.find)
## Example
MyApp.Repo.fetch(Post, id)
MyApp.Repo.fetch(Post, id, read_concern: %{level: "local"})
"""
@callback fetch(module :: module(), id :: BSON.ObjectId.t(), opts :: Keyword.t()) ::
{:ok, Mongo.Collection.t()} | {:error, :not_found} | {:error, any()}
@doc """
Returns a single document struct for the collection defined in the given module and query as
a tuple of `{:ok, document}`.
Returns `{:error, :not_found}` if no result was found.
If multiple documents satisfy the query, this method returns the first document
according to the natural order which reflects the order of documents on the disk.
For all options see [Options](https://docs.mongodb.com/manual/reference/command/find/#dbcmd.find)
## Example
MyApp.Repo.fetch_by(Post, %{title: title})
MyApp.Repo.fetch_by(Post, %{title: title}, read_concern: %{level: "local"})
"""
@callback fetch_by(module :: module(), query :: BSON.document(), opts :: Keyword.t()) ::
{:ok, Mongo.Collection.t()} | {:error, :not_found} | {:error, any()}
@optional_callbacks insert: 2, insert!: 2, update: 2, update!: 2, insert_or_update: 2, insert_or_update!: 2, delete: 2, delete!: 2, insert_all: 3
@doc """
Inserts a new document struct into the database and returns a `{:ok, doc}` tuple.
For all options see [Options](https://www.mongodb.com/docs/manual/reference/method/db.collection.insertOne/#mongodb-method-db.collection.insertOne)
## Example
MyApp.Repo.insert(Post.new())
"""
@callback insert(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}
@doc """
Same as `c:insert/2` but raises an error.
"""
@callback insert!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()
@doc """
Updates a document struct and returns a `{:ok, doc}` tuple.
## Example
post = MyApp.Repo.insert!(Post.new())
MyApp.Repo.update(%{post | title: "new"})
"""
@callback update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}
@doc """
Same as `c:update/2` but raises an error.
"""
@callback update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()
@doc """
Upserts a document struct and returns a `{:ok, doc}` tuple.
## Example
MyApp.Repo.insert_or_update(Post.new())
"""
@callback insert_or_update(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}
@doc """
Same as `c:insert_or_update/2` but raises an error.
"""
@callback insert_or_update!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()
@doc """
Deletes a document struct from the database and returns a `{:ok, doc}` tuple.
For all options see [Options](https://www.mongodb.com/docs/manual/reference/method/db.collection.deleteOne/)
## Example
MyApp.Repo.delete(post)
"""
@callback delete(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: {:ok, Mongo.Collection.t()} | {:error, any()}
@doc """
Same as `c:delete/2` but raises an error.
"""
@callback delete!(doc :: Mongo.Collection.t(), opts :: Keyword.t()) :: Mongo.Collection.t()
@doc """
Inserts all given documents into the document in one write operation and returns an `:ok` tuple
with the count of inserted documents as second element and the inserted ids as third element.
Returns `{:error, reason}` on failure.
## Example
MyApp.Repo.insert_all(Post, [%{title: "a"}, %{title: "b"}])
"""
@callback insert_all(module :: module(), entries :: list(BSON.document()), opts :: Keyword.t()) :: {:ok, integer(), list(BSON.ObjectId.t())} | {:error, any()}
@doc """
Convenient function for running multiple write commands in a transaction.
In case of `TransientTransactionError` or `UnknownTransactionCommitResult` the function will retry the whole transaction or
the commit of the transaction. You can specify a timeout (`:transaction_retry_timeout_s`) to limit the time of repeating.
The default value is 120 seconds. If you don't wait so long, you call `transaction` with the
option `transaction_retry_timeout_s: 10`. In this case after 10 seconds of retrying, the function will return
an error.
You can nest the function calls. In this case the first session will be reused.
"""
@callback transaction(fun :: (... -> {:ok, any()} | :error), opts :: Keyword.t()) :: {:ok, any()} | :error | {:error, any()}
end