lib/faktory_worker/batch.ex

defmodule FaktoryWorker.Batch do
  @moduledoc """
  Supports Faktory Batch operations

  [Batch support](https://github.com/contribsys/faktory/wiki/Ent-Batches) is a
  Faktory Enterprise feature. It allows jobs to be pushed as part of a batch. When
  all jobs in a batch have completed, Faktory will queue a callback job. This
  allows building complex job workflows with dependencies.

  ## Creating a batch

  A batch is created using `new!/1` and must declare at least one of the
  success or complete callbacks; a description is optional. On success,
  `new!/1` returns `{:ok, bid}`, where `bid` is the batch ID that identifies
  the batch for future commands.

  Once created, jobs can be pushed to the batch by providing the `bid` in the
  `custom` payload. These jobs must be pushed synchronously.

  ```
  alias FaktoryWorker.Batch

  {:ok, bid} = Batch.new!(on_success: {MyApp.EmailReportJob, [], []})
  MyApp.Job.perform_async([1, 2], custom: %{"bid" => bid})
  MyApp.Job.perform_async([3, 4], custom: %{"bid" => bid})
  MyApp.Job.perform_async([5, 6], custom: %{"bid" => bid})
  Batch.commit(bid)
  ```

  ## Opening a batch

  In order to open a batch, you must know the batch ID. Since FaktoryWorker
  doesn't currently pass the job itself as a parameter to `perform` functions,
  you must explicitly pass the batch ID as a job argument in order to open the
  batch as part of a job.

  ```
  defmodule MyApp.Job do
    use FaktoryWorker.Job

    def perform(arg1, arg2, bid) do
      Batch.open(bid)

      MyApp.OtherJob.perform_async([1, 2], custom: %{"bid" => bid})

      Batch.commit(bid)
    end
  end
  ```

  """
  alias FaktoryWorker.Job

  @type bid :: String.t()

  @doc """
  Creates a new Faktory batch.

  Builds the batch payload from `opts` and sends it to Faktory as a
  `:batch_new` command. The batch ID (`bid`) in the `{:ok, bid}` result needs to
  be passed in the `:custom` parameters of every job that should be part of this
  batch, and to `commit/2` in order to commit the batch.

  ## Opts

  A batch must define a success callback, a complete callback, or both. If
  neither is provided, an error is raised before any command is sent.

  ### `:on_success` and `:on_complete`

  Each callback is a `{mod, args, opts}` tuple, where `mod` is a module with a
  `perform` function whose arity corresponds to the length of `args`.

  Any `opts` that can be passed to `perform_async/2` can be provided as `opts`
  to the callback except for `:faktory_worker`.

  ### `:description`

  Optional. If provided, the description is shown in Faktory's Web UI on the
  batch listing tab.

  ### `:parent_bid`

  The parent batch ID. Only used when creating a child batch.

  ### `:faktory_name`

  The name of the `FaktoryWorker` instance (determines which connection pool
  will be used).

  ### `:timeout`

  How long to wait for a response, in ms.
  """
  @spec new!(Keyword.t()) :: {:ok, bid()} | {:error, any()}
  def new!(opts \\ []) do
    on_success = Keyword.get(opts, :on_success)
    on_complete = Keyword.get(opts, :on_complete)
    parent_bid = Keyword.get(opts, :parent_bid)
    description = Keyword.get(opts, :description)

    # Each step leaves the payload untouched when its option is absent;
    # validate!/1 then raises if no callback ended up in the payload.
    batch =
      %{}
      |> maybe_put_description(description)
      |> maybe_put_parent_bid(parent_bid)
      |> maybe_put_callback(:success, on_success)
      |> maybe_put_callback(:complete, on_complete)
      |> validate!()

    # Only these two options are forwarded along with the command.
    command_opts = Keyword.take(opts, [:faktory_name, :timeout])

    FaktoryWorker.send_command({:batch_new, batch}, command_opts)
  end

  @doc """
  Commits the batch identified by `bid`.

  Sends a `:batch_commit` command for `bid`, forwarding `opts` unchanged to
  `FaktoryWorker.send_command/2`.

  Faktory will begin scheduling jobs that are part of the batch before the batch
  is committed, but the batch's callbacks will not be queued until the batch has
  been committed. Commit the batch once all of its jobs have been pushed.
  """
  def commit(bid, opts \\ []) do
    command = {:batch_commit, bid}
    FaktoryWorker.send_command(command, opts)
  end

  @doc """
  Opens the batch identified by `bid`.

  Sends a `:batch_open` command for `bid`, forwarding `opts` unchanged to
  `FaktoryWorker.send_command/2`.

  An existing batch has to be re-opened before more jobs or a child batch can be
  added to it. Once the additions are done, the batch must be committed again
  using `commit/2`.
  """
  def open(bid, opts \\ []) do
    command = {:batch_open, bid}
    FaktoryWorker.send_command(command, opts)
  end

  @doc """
  Gets the status of the batch identified by `bid`.

  Sends a `:batch_status` command for `bid`, forwarding `opts` unchanged to
  `FaktoryWorker.send_command/2`.

  Returns a map representing the status.
  """
  def status(bid, opts \\ []) do
    command = {:batch_status, bid}
    FaktoryWorker.send_command(command, opts)
  end

  # Adds `:description` to the payload unless the description is nil.
  # An existing `:description` entry is never overwritten.
  defp maybe_put_description(payload, description) do
    if is_nil(description) do
      payload
    else
      Map.put_new(payload, :description, description)
    end
  end

  # Adds `:parent_bid` to the payload unless the parent batch ID is nil.
  # An existing `:parent_bid` entry is never overwritten.
  defp maybe_put_parent_bid(payload, bid) do
    case bid do
      nil -> payload
      parent -> Map.put_new(payload, :parent_bid, parent)
    end
  end

  # Stores the job payload for a callback under `type` in the batch payload.
  # A nil callback leaves the payload unchanged; otherwise the callback must be
  # a `{mod, args, opts}` tuple, which is turned into a job payload by
  # `Job.build_payload/3`. An existing entry under `type` is never overwritten.
  defp maybe_put_callback(payload, _type, nil), do: payload

  defp maybe_put_callback(payload, type, {mod, args, job_opts}),
    do: Map.put_new(payload, type, Job.build_payload(mod, args, job_opts))

  # Returns the payload unchanged when it holds a non-nil `:success` or
  # `:complete` callback; raises when it holds neither.
  defp validate!(payload) do
    if is_nil(Map.get(payload, :success)) and is_nil(Map.get(payload, :complete)) do
      raise("Faktory batch jobs must declare a success or complete callback")
    else
      payload
    end
  end
end