lib/snap/bulk/bulk.ex

defmodule Snap.Bulk do
  @moduledoc """
  Supports streaming bulk operations against a `Snap.Cluster`.
  """

  @default_page_size 5000
  @default_page_wait 15_000

  alias Snap.Bulk.Actions
  alias Snap.Cluster.Namespace

  @doc """
  Performs a bulk operation.

  Takes an `Enumerable` of action structs, where each struct is one of:

  * `Snap.Bulk.Action.Create`
  * `Snap.Bulk.Action.Index`
  * `Snap.Bulk.Action.Update`
  * `Snap.Bulk.Action.Delete`

  ```
  actions = [
    %Snap.Bulk.Action.Create{_id: 1, doc: %{foo: "bar"}},
    %Snap.Bulk.Action.Create{_id: 2, doc: %{foo: "bar"}},
    %Snap.Bulk.Action.Create{_id: 3, doc: %{foo: "bar"}}
  ]

  actions
  |> Snap.Bulk.perform(Cluster, "index")
  ```

  It chunks the `Enumerable` into pages, and pauses between pages for
  Elasticsearch to catch up. Uses `Stream` under the hood, so you can lazily
  feed it a stream of actions, such as out of an `Ecto.Repo` to bulk load
  documents from an SQL database.

  If no errors occur on any page it returns `:ok`. If errors occur on any
  page, it returns `{:error, %Snap.BulkError{}}` containing a list of the
  errors. Unless `max_errors` is set, it continues to the end of the stream
  even if errors occur.

  Options:

  * `page_size` - defines the size of each page, defaulting to 5000 actions.
  * `page_wait` - defines wait period between pages in ms, defaulting to
    15000ms.
  * `max_errors` - aborts when the number of errors returned exceeds this
    count (defaults to `nil`, which will run to the end). The check happens
    before each subsequent page, so every error from the page that crossed
    the threshold is still reported.
  * `request_opts` - defines the options to be used with `Snap.Request`

  Any other options, such as `pipeline: "foo"` are passed through as query
  parameters to the [Bulk
  API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html)
  endpoint.
  """
  @spec perform(
          stream :: Enumerable.t(),
          cluster :: module(),
          index :: String.t(),
          opts :: Keyword.t()
        ) ::
          :ok | Snap.Cluster.error() | {:error, Snap.BulkError.t()}
  def perform(stream, cluster, index, opts \\ []) do
    {page_size, opts} = Keyword.pop(opts, :page_size, @default_page_size)
    {page_wait, opts} = Keyword.pop(opts, :page_wait, @default_page_wait)
    {max_errors, opts} = Keyword.pop(opts, :max_errors, nil)
    {request_opts, request_params} = Keyword.pop(opts, :request_opts, [])

    # Everything a page request needs that stays constant for the whole run.
    ctx = %{
      cluster: cluster,
      index: Namespace.add_namespace_to_index(index, cluster),
      params: request_params,
      request_opts: request_opts,
      max_errors: max_errors
    }

    stream
    |> Stream.chunk_every(page_size)
    # Wait markers only go *between* pages, so there is no trailing sleep.
    |> Stream.intersperse({:wait, page_wait})
    |> Stream.transform(0, &process_chunk(&1, &2, ctx))
    |> Enum.to_list()
    |> handle_result()
  end

  # Stops the stream once more than `max_errors` errors have been collected.
  # This clause must come first so it also matches `{:wait, _}` markers;
  # otherwise a full `page_wait` sleep would run only to halt on the next page.
  defp process_chunk(_element, error_count, %{max_errors: max_errors})
       when is_integer(max_errors) and error_count > max_errors do
    {:halt, error_count}
  end

  # A zero wait needs no call into :timer.
  defp process_chunk({:wait, 0}, error_count, _ctx), do: {[], error_count}

  # Pauses between pages; emits nothing.
  defp process_chunk({:wait, wait}, error_count, _ctx) do
    :ok = :timer.sleep(wait)

    {[], error_count}
  end

  # Sends one page of actions to the `_bulk` endpoint. It emits any errors
  # found, either per-item failures or a failed request, and adds them to
  # the running count.
  defp process_chunk(actions, error_count, ctx) do
    body = Actions.encode(actions)
    headers = [{"content-type", "application/x-ndjson"}]

    result =
      Snap.post(
        ctx.cluster,
        "/#{ctx.index}/_bulk",
        body,
        ctx.params,
        headers,
        ctx.request_opts
      )

    page_errors =
      case result do
        {:ok, %{"errors" => true, "items" => items}} ->
          process_errors(items)

        {:ok, _} ->
          []

        {:error, error} ->
          [error]
      end

    {page_errors, error_count + length(page_errors)}
  end

  defp handle_result([]), do: :ok

  defp handle_result(errors) do
    err = Snap.BulkError.exception(errors)

    {:error, err}
  end

  # Keeps only the response items that carry an error, converted to exceptions.
  defp process_errors(items) do
    items
    |> Enum.map(&process_item/1)
    |> Enum.reject(&is_nil/1)
  end

  # Each bulk response item is keyed by its action type. When the nested map
  # has an "error" map, that nested map (not the outer item) is turned into
  # a `Snap.ResponseError`. Successful items map to `nil`.
  defp process_item(%{"create" => %{"error" => error} = item}) when is_map(error) do
    Snap.ResponseError.exception_from_json(item)
  end

  defp process_item(%{"index" => %{"error" => error} = item}) when is_map(error) do
    Snap.ResponseError.exception_from_json(item)
  end

  defp process_item(%{"update" => %{"error" => error} = item}) when is_map(error) do
    Snap.ResponseError.exception_from_json(item)
  end

  defp process_item(%{"delete" => %{"error" => error} = item}) when is_map(error) do
    Snap.ResponseError.exception_from_json(item)
  end

  defp process_item(_), do: nil
end