defmodule Roger.Partition.Worker do
@moduledoc """
Handles the decoding and execution of a single job.
Besides running the job, various administrative tasks need to be
performed as well, namely:
- Check whether the job has not been cancelled in the meantime
- Check whether another job is currently running with the same
execution_key, and if so, delay this current job until the
currently running one finishes
- On job failure, the job needs to be queued in the retry queue, if
the job is marked retryable. By default, jobs are *not* retried.
"""
require Logger
alias Roger.{Job, GProc, Queue, Partition.Retry}
alias Roger.Partition.Global
# after how long the wait queue for execution_key-type jobs expires
@execution_waiting_expiry 1800 * 1000
use GenServer, restart: :transient
def start_link(worker_input) do
GenServer.start_link(__MODULE__, worker_input)
end
def name(job_id) do
{:roger_job_worker, job_id}
end
## Server interface
defmodule State do
@moduledoc false
@type t :: %__MODULE__{
partition_id: String.t(),
meta: map,
raw_payload: binary,
channel: AMQP.Channel.t(),
worker_task_pid: pid,
job: Job.t()
}
defstruct partition_id: nil, meta: nil, raw_payload: nil, channel: nil, worker_task_pid: nil, job: nil
end
def init([partition_id, channel, payload, meta]) do
state = %State{
partition_id: partition_id,
channel: channel,
meta: meta,
raw_payload: payload
}
Process.flag(:trap_exit, true)
{:ok, state, 0}
end
@doc """
This will make sure the worker task is killed when the worker get's stopped
"""
@spec terminate(any(), State.t()) :: any()
def terminate(_reason, state) do
if state.worker_task_pid do
Process.exit(state.worker_task_pid, :kill)
end
end
@doc """
This function starts processing the job as soon as the worker GenServer is started up.
"""
@spec handle_info(:timeout, State.t()) :: {:noreply, State.t()} | {:stop, :normal, State.t()}
def handle_info(:timeout, state) do
case Job.decode(state.raw_payload) do
{:ok, job} ->
job = %Job{job | started_at: Roger.now()}
cond do
Global.cancelled?(state.partition_id, job.id, :remove) ->
job_cancel(job, state)
{:stop, :normal, state}
job_waiting?(job, state) ->
job_waiting(job, state)
{:stop, :normal, state}
true ->
pid = job_startup(job, state)
{:noreply, %{state | worker_task_pid: pid, job: job}}
end
{:error, message} ->
# Decode error
Logger.debug("Job decoding error: #{inspect(message)} #{inspect(state.raw_payload)}")
job_done(nil, :ack, state)
{:stop, :normal, state}
end
end
# When job is finished it sends a message to the GenServer to finish off the worker task.
@spec handle_info(:job_finished, State.t()) :: {:stop, :normal, State.t()}
def handle_info(:job_finished, state) do
{:stop, :normal, state}
end
# When job has errors the async job task sends a message to this worker to correctly unregister and shutdown the worker.
@spec handle_info(:job_errored, State.t()) :: {:stop, :normal, State.t()}
def handle_info(:job_errored, state) do
state.job.id
|> name()
|> GProc.unregp()
GProc.unregp({:roger_job_worker_meta, state.partition_id, state.job.id})
{:stop, :normal, state}
end
# If a timeout is set on the job and the job exceeds the timeout this method is called and correctly shuts down the job.
@spec handle_info(:handle_job_timeout, State.t()) :: {:stop, :normal, State.t()}
def handle_info(:handle_job_timeout, %{worker_task_pid: pid, job: job} = state) when is_pid(pid) do
Process.exit(pid, :kill)
handle_error(job, {:timeout, "Job stopped because of timeout"}, nil, state, nil)
{:stop, :normal, state}
end
# This handle a hard crash
@spec handle_info({:DOWN, reference(), :process, pid(), String.t()}, State.t()) :: {:stop, :normal, State.t()}
def handle_info({:DOWN, _ref, :process, _child, reason}, state) do
handle_error(state.job, {:worker_crash, reason}, nil, state, nil)
{:stop, :normal, state}
end
# This is called when job needs to be cancelled it kills running job and runs the timeout task to correctly finish the job.
@spec handle_call(:cancel_job, any(), State.t()) :: {:reply, :ok, State.t(), 0}
def handle_call(:cancel_job, _source, state) do
Process.exit(state.worker_task_pid, :kill)
{:reply, :ok, state, 0}
end
defp execute_job(job, state, parent) do
before_run_state = callback(:before_run, [state.partition_id, job])
try do
result = Job.execute(job)
job_done(job, :ack, state)
callback(:after_run, [state.partition_id, job, result, before_run_state])
catch
type, exception ->
handle_error(job, {type, exception}, before_run_state, state, __STACKTRACE__)
send(parent, :job_errored)
end
end
defp handle_error(job, {type, exception}, before_run_state, state, stacktrace) do
cb =
with true <- Job.retryable?(job),
{:ok, :buried} <- Retry.retry(state.channel, state.partition_id, job) do
:on_buried
else
_ -> :on_error
end
job_done(job, :ack, state)
callback(cb, [state.partition_id, job, {type, exception}, stacktrace, before_run_state])
end
defp job_startup(job, state) do
GProc.regp(name(job.id))
GProc.regp({:roger_job_worker_meta, state.partition_id, job.id}, job)
parent = self()
{pid, _ref} =
spawn_monitor(fn ->
execute_job(job, state, parent)
send(parent, :job_finished)
end)
if job.max_execution_time != :infinity do
Process.send_after(self(), :handle_job_timeout, job.max_execution_time * 1000)
end
pid
end
defp job_waiting?(job, state) do
job.execution_key != nil && Global.executing?(state.partition_id, job.execution_key, :add)
end
defp job_waiting(job, state) do
# put job in the waiting queue,
:ok = put_execution_waiting(job, state)
# then ack it.
AMQP.Basic.ack(state.channel, state.meta.delivery_tag)
end
defp job_cancel(job, state) do
callback(:on_cancel, [state.partition_id, job])
job_done(job, :ack, state)
end
# Ran at the end of the job, either ack'ing or nack'ing the message.
defp job_done(job, ack_or_nack, state) do
if job != nil do
if job.queue_key != nil do
:ok = Global.remove_queued(state.partition_id, job.queue_key)
end
if job.execution_key != nil do
# mark as "free"
:ok = Global.remove_executed(state.partition_id, job.execution_key)
# check if there are any messages in the waiting queue
check_execution_waiting(job, state)
end
end
meta = state.meta
if meta != nil do
if Process.alive?(state.channel.pid) do
Kernel.apply(AMQP.Basic, ack_or_nack, [state.channel, meta.delivery_tag])
end
end
end
# Run the given worker callback, if a callback module has been defined.
defp callback(callback, args) when is_atom(callback) do
mod = Application.get_env(:roger, :callbacks)
# Make sure module is loaded so function_exported? works correctly
Code.ensure_loaded(mod)
if mod != nil do
try do
# We never want the callback to crash the worker process.
if function_exported?(mod, callback, length(args)) do
Kernel.apply(mod, callback, args)
else
nil
end
catch
:exit = t, e ->
Logger.error("Worker error in callback function #{mod}.#{callback}: #{t}:#{e}")
end
end
end
# Put in the waiting queue
defp put_execution_waiting(job, state) do
Job.enqueue(job, state.partition_id, execution_waiting_queue(job, state, :unprefixed))
end
# Get the next message from the job's execution waiting queue, and
# enqueues it back on the Job's main queue, if there is any
defp check_execution_waiting(job, state) do
name = execution_waiting_queue(job, state)
case AMQP.Basic.get(state.channel, name) do
{:ok, payload, meta} ->
# enqueue the job again
{:ok, job} = Job.decode(payload)
:ok = Job.enqueue(job, state.partition_id)
# ack it to have it removed from waiting queue
:ok = AMQP.Basic.ack(state.channel, meta.delivery_tag)
{:empty, _} ->
# FIXME delete waiting queue when empty - this can error
:ok
end
end
# Return the name of the execution waiting queue. The queue gets
# declared on the AMQP side as well. Returns the queue either
# prefixed with the partition or unprefixed.
defp execution_waiting_queue(job, state, return \\ :prefixed) do
bare_name = "execution-waiting-#{job.execution_key}"
name = Queue.make_name(state.partition_id, bare_name)
{:ok, _} = AMQP.Queue.declare(state.channel, name, arguments: [{"x-expires", @execution_waiting_expiry}])
case return do
:prefixed -> name
:unprefixed -> bare_name
end
end
end