defmodule StepFlow.Amqp.StoppedConsumer do
@moduledoc """
Consumer of all job with stopped status.
"""
require Logger
alias StepFlow.{
Amqp.StoppedConsumer,
Jobs,
Jobs.Status,
Metrics.JobInstrumenter,
NotificationHooks.NotificationHookManager,
Repo.Checker,
Statistics.JobsDurations,
Workflows,
Workflows.StepManager
}
use StepFlow.Amqp.CommonConsumer, %{
queue: "job_stopped",
exchange: "job_response",
prefetch_count: 1,
consumer: &StoppedConsumer.consume/4
}
@doc """
Consume messages with stopped topic, update Job status and continue the workflow.
"""
def consume(
channel,
tag,
_redelivered,
%{
"job_id" => job_id,
"datetime" => datetime,
"status" => status
} = _payload
) do
if Checker.repo_running?() do
case Jobs.get_job(job_id) do
nil ->
Basic.nack(channel, tag, requeue: false)
job ->
JobInstrumenter.inc(:step_flow_jobs_status_total, job.name, "stopped")
case Status.set_job_status(job_id, status, %{}, datetime) do
{:ok, job_status} ->
Workflows.Status.define_workflow_status(job.workflow_id, :job_stopped, job_status)
JobsDurations.set_job_durations(job_id)
NotificationHookManager.notification_from_job(job_id)
StepManager.check_step_status(%{job_id: job_id})
Basic.ack(channel, tag)
{:error, message} ->
Logger.error("Cannot set job status: #{inspect(message)}")
Basic.nack(channel, tag, requeue: false)
end
end
else
Logger.warn(
"#{__MODULE__}: The database is not available, reject and requeue consumed message..."
)
Basic.reject(channel, tag, requeue: true)
end
end
def consume(channel, tag, _redelivered, payload) do
Logger.error("Job stopped #{inspect(payload)}")
Basic.reject(channel, tag, requeue: false)
end
end