defmodule FaktoryWorker.Job do
@moduledoc """
The `FaktoryWorker.Job` module is used to perform jobs in the background by sending to and fetching from Faktory.
To build a worker you must `use` the job module within a module in your application.
```elixir
defmodule MyApp.SomeWorker do
use FaktoryWorker.Job
end
```
This will bring in all of the functionality required to perform jobs via Faktory. The `MyApp.SomeWorker` will now
have a `perform_async/2` function available for sending jobs to Faktory. Before this function can be called, a
`perform` function must be defined. This function must accept the same number of arguments that are
being sent to Faktory via the `perform_async/2` function.
```elixir
defmodule MyApp.SomeWorker do
use FaktoryWorker.Job
def perform(job_arg) do
do_some_work(job_arg)
end
end
```
With this in place it is now possible to send work to Faktory and the `MyApp.SomeWorker` will fetch the job and call
the `perform/1` function with the same job arguments that we sent.
```elixir
> MyApp.SomeWorker.perform_async("job arg")
{:ok, %{jid: ..., jobtype: "MyApp.SomeWorker", args: ["job arg"]}}
```
It is also possible to send multiple arguments for a single job by passing in a list of values to the `perform_async/2`
function.
```elixir
> MyApp.SomeWorker.perform_async(["job arg1", "job arg2"])
{:ok, %{jid: ..., jobtype: "MyApp.SomeWorker", args: ["job arg1", "job arg2"]}}
```
In order for the job to be performed correctly, a `perform/2` function needs to be defined within the `MyApp.SomeWorker`
module.
```elixir
defmodule MyApp.SomeWorker do
use FaktoryWorker.Job
def perform(job_arg1, job_arg2) do
do_some_work(job_arg1, job_arg2)
end
end
```
When defining `perform` functions, they must always accept one argument for each item in the list of values passed into
`perform_async/2`.
## Synchronous job pushing
Previous versions used Broadway to send jobs, and the `:skip_pipeline` parameter was used to send them synchronously.
`:skip_pipeline` is no longer supported.
Since batch operations are a feature of Faktory Enterprise, this library now sends every single job synchronously
and makes an HTTP call to the Faktory server (see `FaktoryWorker.Batch`).
## Worker Configuration
A list of options can be specified when using the `FaktoryWorker.Job` module. These options will be used when sending
jobs to Faktory and will apply to all jobs sent with the `perform_async/2` function.
For a full list of configuration options see the [Worker Configuration](configuration.html#worker-configuration) documentation.
## Overriding Worker Configuration
The `perform_async/2` function accepts a keyword list as its second argument. This list has the same options
available that the `FaktoryWorker.Job` module accepts. Any options passed into this function override the options
that have been set on the worker module.
For a full list of configuration options see the [Worker Configuration](configuration.html#worker-configuration) documentation.
## Data Serialization
Faktory expects all values to be serialized in JSON format. FaktoryWorker uses `Jason` for serialization. This
means only values that implement the `Jason.Encoder` protocol are valid when calling the `perform_async/2` function.
"""
alias FaktoryWorker.{Random, Telemetry, Sandbox}
# TODO: Look at supporting the following optional fields when pushing a job
# priority
# backtrace
# created_at
# Option keys that `append_optional_fields/2` copies from the caller's opts into the
# job payload, once each value has passed `is_valid_field_value?/2`.
@optional_job_fields [:jobtype, :queue, :custom, :retry, :reserve_for, :at]
# Passed as the `:timeout` option to `FaktoryWorker.send_command/2` by `push/2`
# (presumably milliseconds - TODO confirm against `send_command`).
@default_push_timeout 5000
# Injects `perform_async/2` into the module that calls `use FaktoryWorker.Job`.
#
# `using_opts` are the options given to `use`; they act as defaults and are
# overridden key-by-key by the options passed to `perform_async/2` itself.
defmacro __using__(using_opts \\ []) do
  quote do
    def perform_async(job, opts \\ []) do
      # Call-site options take precedence over the options given to `use`.
      merged_opts = Keyword.merge(unquote(using_opts), opts)

      # `__MODULE__` expands in the using module, so it names the worker.
      payload = FaktoryWorker.Job.build_payload(__MODULE__, job, merged_opts)

      FaktoryWorker.Job.perform_async(payload, merged_opts)
    end
  end
end
@doc false
# Builds the job payload map (`jid`, `jobtype`, `args`) for `worker_module`
# and then adds any optional fields found in `opts`.
#
# Returns the payload map, or whatever `append_optional_fields/2` returns
# when one of the optional values is rejected.
def build_payload(worker_module, job, opts) when is_list(job) do
  base_payload = %{
    jid: Random.job_id(),
    jobtype: job_type_for_module(worker_module),
    args: normalize_job_args(job)
  }

  append_optional_fields(base_payload, opts)
end

# A single non-list argument is treated as a one-element argument list.
def build_payload(worker_module, job, opts), do: build_payload(worker_module, [job], opts)
@doc false
# Sends a payload built by `build_payload/3`, or records it in the sandbox
# when the sandbox is active.
#
# Returns `{:ok, payload}` on success or `{:error, reason}` on failure.
#
# `build_payload/3` yields `{:error, message}` when an optional field has an
# invalid value. That tuple is returned untouched here so the result is the
# same whether or not the sandbox is active; without this clause the sandbox
# branch would evaluate `payload.jobtype` on a tuple and raise.
def perform_async({:error, _} = invalid_payload, _opts), do: invalid_payload

def perform_async(payload, opts) do
  if Sandbox.active?() do
    Sandbox.enqueue_job(
      # The jobtype is the module name without its "Elixir." prefix, so put
      # the prefix back to recover the module atom.
      String.to_existing_atom("Elixir." <> payload.jobtype),
      payload.args,
      opts
    )

    {:ok, payload}
  else
    opts
    |> faktory_name()
    |> push(payload)
  end
end
@doc false
# Converts every struct in the argument list into a plain map (top level
# only); all other values pass through unchanged.
def normalize_job_args(args) when is_list(args), do: Enum.map(args, &struct_to_map/1)

# Drops the `__struct__` key from structs and leaves everything else alone.
defp struct_to_map(%_{} = struct), do: Map.from_struct(struct)
defp struct_to_map(other), do: other
@doc false
# Pushes `job` through the Faktory instance registered as `faktory_name`.
#
# An `{:error, _}` tuple in place of a job (an invalid payload) is returned
# as-is without sending any command.
def push(_faktory_name, {:error, _} = invalid_payload), do: invalid_payload

def push(faktory_name, job) do
  command_opts = [faktory_name: faktory_name, timeout: @default_push_timeout]
  result = FaktoryWorker.send_command({:push, job}, command_opts)

  handle_push_result(result, job)
end
# Copies each supported optional field from `opts` into the payload.
#
# Fields that are absent (or `nil`) are skipped. The first field whose value
# fails validation stops the walk and makes the whole result
# `{:error, message}` instead of a payload map.
defp append_optional_fields(args, opts) do
  Enum.reduce_while(@optional_job_fields, args, fn field, payload ->
    value = Keyword.get(opts, field)

    cond do
      is_nil(value) ->
        {:cont, payload}

      is_valid_field_value?(field, value) ->
        {:cont, Map.put(payload, field, format_field_value(value))}

      true ->
        {:halt, {:error, field_error_message(field, value)}}
    end
  end)
end
# Returns `true` when `value` is acceptable for the given optional field.
# Any field/value pair not matched by a guarded clause is rejected.
defp is_valid_field_value?(:jobtype, value) when is_binary(value), do: true
defp is_valid_field_value?(:queue, value) when is_binary(value), do: true
defp is_valid_field_value?(:custom, value) when is_map(value), do: true
defp is_valid_field_value?(:retry, value) when is_integer(value), do: true

defp is_valid_field_value?(:reserve_for, value) when is_integer(value) and value >= 60,
  do: true

defp is_valid_field_value?(:at, %DateTime{}), do: true
defp is_valid_field_value?(_field, _value), do: false
# Serialises `DateTime` values as ISO 8601 strings; any other value is
# returned unchanged.
defp format_field_value(value) do
  case value do
    %DateTime{} -> DateTime.to_iso8601(value)
    _other -> value
  end
end
# Builds the error message reported when an optional field fails validation.
# The offending value is rendered with `inspect/1`.
defp field_error_message(field, value) do
  "The field '" <> Atom.to_string(field) <> "' has an invalid value '" <> inspect(value) <> "'"
end
# Reads the `:faktory_name` option, falling back to `FaktoryWorker` when the
# key is not present.
defp faktory_name(opts) do
  case Keyword.fetch(opts, :faktory_name) do
    {:ok, name} -> name
    :error -> FaktoryWorker
  end
end
# Translates the result of the push command into this module's return value.
#
# A success and a timeout each emit a `:push` telemetry event for the job;
# every other error is handed back without emitting an event.
defp handle_push_result({:ok, _response}, job) do
  Telemetry.execute(:push, :ok, job)
  {:ok, job}
end

defp handle_push_result({:error, :timeout} = timeout_error, job) do
  Telemetry.execute(:push, timeout_error, job)
  timeout_error
end

defp handle_push_result({:error, _reason} = error, _job), do: error
# Returns the jobtype string for a worker module: the module name without
# its "Elixir." prefix.
#
# Only a single leading "Elixir." is removed. `String.trim_leading/2` would
# strip every repeated occurrence, so the name could no longer be rebuilt by
# prepending "Elixir." again.
defp job_type_for_module(module) do
  module
  |> to_string()
  |> String.replace_prefix("Elixir.", "")
end
end