lib/mongo/bulk_write.ex

defmodule Mongo.BulkWrite do
  @moduledoc """

  The driver supports the so-called bulk writes ([Specification](https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#basic)):

  The motivation for bulk writes lies in the possibility of optimizing to group the same operations.  The driver supports

  * unordered and ordered bulk writes
  * in-memory and stream bulk writes

  ## Unordered bulk writes

  Unordered bulk writes have the highest optimization factor. Here all operations can be divided into
  three groups (inserts, updates and deletes).
  The order of execution within a group does not matter. However, the groups are executed in the
  order: inserts, updates and deletes. The following example creates three records, changes them, and then
  deletes all records. After execution, the collection is unchanged. It's valid, because of the execution order:

  1. inserts
  2. updates
  3. deletes

  ## Example:

  ```
  alias Mongo.BulkWrite
  alias Mongo.UnorderedBulk

  bulk = "bulk"
      |> UnorderedBulk.new()
      |> UnorderedBulk.insert_one(%{name: "Greta"})
      |> UnorderedBulk.insert_one(%{name: "Tom"})
      |> UnorderedBulk.insert_one(%{name: "Waldo"})
      |> UnorderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
      |> UnorderedBulk.update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
      |> UnorderedBulk.update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
      |> UnorderedBulk.delete_one(%{kind: "dog"})
      |> UnorderedBulk.delete_one(%{kind: "dog"})
      |> UnorderedBulk.delete_one(%{kind: "dog"})

  result = BulkWrite.write(:mongo, bulk, w: 1)
  ```

  ## Ordered bulk writes

  Sometimes the order of execution is important for successive operations to yield a correct result.
  In this case, one uses ordered bulk writes. The following example would not work with unordered bulk writes
  because the order within the update operations is undefined. The `update_many()` will only work, if it is
  executed after the `update_one()` functions.

  ```
  bulk = "bulk"
       |> OrderedBulk.new()
       |> OrderedBulk.insert_one(%{name: "Greta"})
       |> OrderedBulk.insert_one(%{name: "Tom"})
       |> OrderedBulk.insert_one(%{name: "Waldo"})
       |> OrderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
       |> OrderedBulk.update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
       |> OrderedBulk.update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
       |> OrderedBulk.update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
       |> OrderedBulk.delete_one(%{kind: "cat"})
       |> OrderedBulk.delete_one(%{kind: "cat"})
       |> OrderedBulk.delete_one(%{kind: "cat"})

  result = BulkWrite.write(:mongo, bulk, w: 1)
  ```

  ## Stream bulk writes

  The examples shown initially filled the bulk with a few operations and then the bulk is written to the database.
  This is all done in memory. For larger amounts of operations or imports of very long files, the main memory would
  be unnecessarily burdened. It could come to some resource problems.

  For such cases you could use streams. Unordered and ordered bulk writes can also be combined with Streams.
  You set the maximum size of the bulk. Once the number of bulk operations has been reached,
  it will be sent to the database. While streaming you can limit the memory consumption regarding the current task.

  In the following example we import 1.000.000 integers into the MongoDB using the stream api:

  We need to create an insert operation (`BulkOps.get_insert_one()`) for each number. Then we call the `UnorderedBulk.stream`
  function to import it. This function returns a stream function which accumulate
  all inserts operations until the limit `1000` is reached. In this case the operation group is written to
  MongoDB.

  ## Example

  ```
  1..1_000_000
    |> Stream.map(fn i -> BulkOps.get_insert_one(%{number: i}) end)
    |> UnorderedBulk.write(:mongo, "bulk", 1_000)
    |> Stream.run()
  ```

  ## Benchmark

  The following benchmark compares multiple `Mongo.insert_one()` calls with a stream using unordered bulk writes.
  Both tests inserts documents into a replica set with `w: 1`.

  ```
  Benchee.run(
      %{
        "inserts" => fn input ->
         input
         |> Enum.map(fn i -> %{number: i} end)
         |> Enum.each(fn doc -> Mongo.insert_one!(top, "bulk_insert", doc) end)
        end,
        "streams" => fn input ->
                        input
                        |> Stream.map(fn i -> get_insert_one(%{number: i}) end)
                        |> Mongo.UnorderedBulk.write(top, "bulk", 1_0000)
                        |> Stream.run()
        end,
      },
      inputs: %{
        "Small" => Enum.to_list(1..10_000),
        "Medium" => Enum.to_list(1..100_000),
        "Bigger" => Enum.to_list(1..1_000_000)
      }
    )
  ```

  Result:

  ```
  ##### With input Bigger #####
  Name              ips        average  deviation         median         99th %
  streams        0.0885      0.188 min     ±0.00%      0.188 min      0.188 min
  inserts       0.00777       2.14 min     ±0.00%       2.14 min       2.14 min

  Comparison:
  streams        0.0885
  inserts       0.00777 - 11.39x slower +1.96 min

  ##### With input Medium #####
  Name              ips        average  deviation         median         99th %
  streams          1.00         1.00 s     ±8.98%         0.99 s         1.12 s
  inserts        0.0764        13.09 s     ±0.00%        13.09 s        13.09 s

  Comparison:
  streams          1.00
  inserts        0.0764 - 13.12x slower +12.10 s

  ##### With input Small #####
  Name              ips        average  deviation         median         99th %
  streams          8.26        0.121 s    ±30.46%        0.112 s         0.23 s
  inserts          0.75         1.34 s     ±7.15%         1.29 s         1.48 s

  Comparison:
  streams          8.26
  inserts          0.75 - 11.07x slower +1.22 s
  ```

  The result is, that using bulk writes is much faster (about 15x faster at all).

  """

  import Keywords
  import Mongo.Utils
  import Mongo.WriteConcern
  import Mongo.Session, only: [in_write_session: 3]

  alias Mongo.UnorderedBulk
  alias Mongo.OrderedBulk
  alias Mongo.BulkWriteResult

  @doc """
  Executes unordered and ordered bulk writes.

  ## Unordered bulk writes
  The operation are grouped (inserts, updates, deletes). The order of execution is:

  1. inserts
  2. updates
  3. deletes

  The execution order within the group is not preserved.

  ## Ordered bulk writes
  Sequences of the same operations are grouped and sent as one command. The order is preserved.

  If a group (inserts, updates or deletes) exceeds the limit `maxWriteBatchSize` it will be split into chunks.
  Everything is done in memory, so this use case is limited by memory. A better approach seems to use streaming bulk writes.
  """
  @spec write(GenServer.server(), UnorderedBulk.t() | OrderedBulk.t(), Keyword.t()) :: Mongo.BulkWriteResult.t()
  def write(topology_pid, bulk, opts \\ [])

  def write(topology_pid, %UnorderedBulk{} = bulk, opts) do
    in_write_session(topology_pid, &one_bulk_write(&1, topology_pid, bulk, &2), opts)
  end

  def write(topology_pid, %OrderedBulk{} = bulk, opts) do
    in_write_session(topology_pid, &write_ordered_bulk(&1, topology_pid, bulk, &2), opts)
  end

  defp write_ordered_bulk(session, topology_pid, %OrderedBulk{coll: coll, ops: ops}, opts) do
    write_concern = write_concern(opts)

    empty = %BulkWriteResult{acknowledged: acknowledged?(write_concern)}

    with {:ok, limits} <- Mongo.limits(topology_pid) do
      max_batch_size = limits.max_write_batch_size

      ops
      |> get_op_sequence()
      |> Enum.reduce_while(empty, fn {cmd, docs}, acc ->
        temp_result = one_bulk_write_operation(session, cmd, coll, docs, max_batch_size, opts)

        case temp_result do
          %{errors: []} ->
            {:cont, BulkWriteResult.add(acc, temp_result)}

          _other ->
            {:halt, BulkWriteResult.add(acc, temp_result)}
        end
      end)
    end
  end

  ##
  # Executes one unordered bulk write. The execution order of operation groups is
  #
  # * inserts
  # * updates
  # * deletes
  #
  # The function returns a keyword list with the results of each operation group:
  # For the details see https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#results
  #
  defp one_bulk_write(session, topology_pid, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes}, opts) do
    with {:ok, limits} <- Mongo.limits(topology_pid) do
      max_batch_size = limits.max_write_batch_size

      results =
        case one_bulk_write_operation(session, :insert, coll, inserts, max_batch_size, opts) do
          %{errors: []} = insert_result ->
            case one_bulk_write_operation(session, :update, coll, updates, max_batch_size, opts) do
              %{errors: []} = update_result ->
                delete_result = one_bulk_write_operation(session, :delete, coll, deletes, max_batch_size, opts)
                [insert_result, update_result, delete_result]

              update_result ->
                [insert_result, update_result]
            end

          insert_result ->
            [insert_result]
        end

      BulkWriteResult.reduce(results, %BulkWriteResult{acknowledged: acknowledged?(opts)})
    end
  end

  ###
  # Executes the command `cmd` and collects the result.
  #
  defp one_bulk_write_operation(session, cmd, coll, docs, max_batch_size, opts) do
    session
    |> run_commands(get_cmds(cmd, coll, docs, max_batch_size, opts), opts)
    |> collect(cmd)
  end

  ##
  # Converts the list of operations into insert/update/delete commands
  #
  defp get_cmds(:insert, coll, docs, max_batch_size, opts), do: get_insert_cmds(coll, docs, max_batch_size, opts)
  defp get_cmds(:update, coll, docs, max_batch_size, opts), do: get_update_cmds(coll, docs, max_batch_size, opts)
  defp get_cmds(:delete, coll, docs, max_batch_size, opts), do: get_delete_cmds(coll, docs, max_batch_size, opts)

  ###
  # Converts the list of operations into list of lists with same operations.
  #
  # [inserts, inserts, updates] -> [[inserts, inserts],[updates]]
  #
  defp get_op_sequence(ops) do
    get_op_sequence(ops, [])
  end

  defp get_op_sequence([], acc), do: acc

  defp get_op_sequence(ops, acc) do
    [{kind, _doc} | _rest] = ops
    {docs, rest} = find_max_sequence(kind, ops)
    get_op_sequence(rest, [{kind, docs} | acc])
  end

  ###
  # Splits the sequence of operations into two parts
  # 1) sequence of operations of kind `kind`
  # 2) rest of operations
  #
  defp find_max_sequence(kind, rest) do
    find_max_sequence(kind, rest, [])
  end

  defp find_max_sequence(_kind, [], acc) do
    {acc, []}
  end

  defp find_max_sequence(kind, [{other, desc} | rest], acc) when kind == other do
    find_max_sequence(kind, rest, [desc | acc])
  end

  defp find_max_sequence(_kind, rest, acc) do
    {acc, rest}
  end

  ##
  # collects the returns values for each operation
  #
  # the update operation is more complex than insert or delete operation
  #
  defp collect({docs, ids}, :insert) do
    docs
    |> Enum.map(fn
      {:ok, %{"n" => n} = doc} -> BulkWriteResult.insert_result(n, ids, doc["writeErrors"] || [])
      {:ok, _other} -> BulkWriteResult.empty()
      {:error, reason} -> BulkWriteResult.error(reason)
    end)
    |> BulkWriteResult.reduce()
  end

  defp collect(docs, :update) do
    docs
    |> Enum.map(fn
      {:ok, %{"n" => n, "nModified" => modified, "upserted" => ids} = doc} ->
        l = length(ids)
        BulkWriteResult.update_result(n - l, modified, l, filter_upsert_ids(ids), doc["writeErrors"] || [])

      {:ok, %{"n" => matched, "nModified" => modified} = doc} ->
        BulkWriteResult.update_result(matched, modified, 0, [], doc["writeErrors"] || [])

      {:ok, _other} ->
        BulkWriteResult.empty()

      {:error, reason} ->
        BulkWriteResult.error(reason)
    end)
    |> BulkWriteResult.reduce()
  end

  defp collect(docs, :delete) do
    docs
    |> Enum.map(fn
      {:ok, %{"n" => n} = doc} -> BulkWriteResult.delete_result(n, doc["writeErrors"] || [])
      {:ok, _other} -> BulkWriteResult.empty()
      {:error, reason} -> BulkWriteResult.error(reason)
    end)
    |> BulkWriteResult.reduce()
  end

  defp filter_upsert_ids([_ | _] = upserted), do: Enum.map(upserted, fn doc -> doc["_id"] end)
  defp filter_upsert_ids(_), do: []

  defp run_commands(session, {cmds, ids}, opts) do
    {Enum.map(cmds, fn cmd -> Mongo.exec_command_session(session, cmd, opts) end), ids}
  end

  defp run_commands(session, cmds, opts) do
    Enum.map(cmds, fn cmd -> Mongo.exec_command_session(session, cmd, opts) end)
  end

  defp get_insert_cmds(coll, docs, max_batch_size, opts) do
    {ids, docs} = assign_ids(docs)

    cmds =
      docs
      |> Enum.chunk_every(max_batch_size)
      |> Enum.map(fn inserts -> get_insert_cmd(coll, inserts, opts) end)

    {cmds, ids}
  end

  defp get_insert_cmd(coll, inserts, opts) do
    [insert: coll, documents: inserts, writeConcern: write_concern(opts)] |> filter_nils()
  end

  defp get_delete_cmds(coll, docs, max_batch_size, opts) do
    docs
    |> Enum.chunk_every(max_batch_size)
    |> Enum.map(fn deletes -> get_delete_cmd(coll, deletes, opts) end)
  end

  defp get_delete_cmd(coll, deletes, opts) do
    [delete: coll, deletes: Enum.map(deletes, fn delete -> get_delete_doc(delete) end), ordered: Keyword.get(opts, :ordered), writeConcern: write_concern(opts)] |> filter_nils()
  end

  defp get_delete_doc({filter, opts}) do
    [q: filter, limit: Keyword.get(opts, :limit), collation: Keyword.get(opts, :collation)] |> filter_nils()
  end

  defp get_update_cmds(coll, docs, max_batch_size, opts) do
    docs
    |> Enum.chunk_every(max_batch_size)
    |> Enum.map(fn updates -> get_update_cmd(coll, updates, opts) end)
  end

  defp get_update_cmd(coll, updates, opts) do
    [update: coll, updates: Enum.map(updates, fn update -> get_update_doc(update) end), ordered: Keyword.get(opts, :ordered), writeConcern: write_concern(opts), bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)]
    |> filter_nils()
  end

  defp get_update_doc({filter, update, update_opts}) do
    [q: filter, u: update, upsert: Keyword.get(update_opts, :upsert), multi: Keyword.get(update_opts, :multi) || false, collation: Keyword.get(update_opts, :collation), arrayFilters: Keyword.get(update_opts, :array_filters)] |> filter_nils()
  end
end