defmodule StepFlow.Amqp.ErrorConsumer do
  @moduledoc """
  Consumer of all jobs with error status.

  Messages are read from the `job_error` queue bound to the `job_response`
  exchange. For every supported payload the job is marked as `:error`, error
  metrics are incremented, a notification is sent and the owning workflow is
  updated (or restarted when the job is live).
  """

  require Logger

  alias StepFlow.Amqp.ErrorConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Metrics.{JobInstrumenter, WorkflowInstrumenter}
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Step
  alias StepFlow.Step.Live
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_error",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ErrorConsumer.consume/4
  }

  @doc """
  Consume message with error topic, update Job and send a notification.

  Two payload shapes are handled as job errors:

    * `%{"job_id" => job_id, "error" => description}`
    * `%{"job_id" => job_id, "status" => "error", "parameters" => [param]}` where
      `param` is the single parameter
      `%{"id" => "message", "type" => "string", "value" => description}`

  Any other payload is logged and rejected without requeue.

  Outcome for a supported payload:

    * database unavailable - the message is rejected and requeued;
    * unknown `job_id` - the message is rejected without requeue;
    * job status cannot be stored - the message is rejected and requeued;
    * otherwise the error is recorded and the message is acknowledged.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id, "error" => description} = payload) do
    handle_job_error(channel, tag, job_id, description, payload)
  end

  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id,
          "parameters" => [%{"id" => "message", "type" => "string", "value" => description}],
          "status" => "error"
        } = payload
      ) do
    handle_job_error(channel, tag, job_id, description, payload)
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job error #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  # Shared entry point for both supported payload formats: checks that the
  # repository is reachable and that the job exists before recording the error.
  defp handle_job_error(channel, tag, job_id, description, payload) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          # Unknown job: requeueing would only redeliver the same message.
          Basic.reject(channel, tag, requeue: false)

        job ->
          Logger.error("Job error #{inspect(payload)}")
          record_job_error(channel, tag, job, job_id, description)
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  # Stores the `:error` status for the job and, only once that succeeded,
  # increments metrics, notifies and updates the workflow before acknowledging.
  defp record_job_error(channel, tag, job, job_id, description) do
    case Status.set_job_status(job_id, :error, %{message: description}) do
      {:ok, job_status} ->
        # Metrics are incremented after the status is stored so that a message
        # requeued because of a storage failure is not counted more than once.
        JobInstrumenter.inc(:step_flow_jobs_error, job.name)
        WorkflowInstrumenter.inc(:step_flow_workflows_error, job.workflow_id)
        Workflows.notification_from_job(job_id, description)

        if job.allow_failure do
          Logger.warn("Allowing failure for job #{job_id}")
          StepManager.check_step_status(%{job_id: job_id})
        else
          Workflows.Status.define_workflow_status(job.workflow_id, :job_error, job_status)
        end

        if job.is_live do
          restart_workflow(job)
        end

        Basic.ack(channel, tag)

      {:error, message} ->
        Logger.error("Cannot set job status: #{inspect(message)}")
        Basic.reject(channel, tag, requeue: true)
    end
  end

  # Stops the jobs of the workflow owning `job`, then creates and starts a new
  # workflow from the attributes of the previous one.
  # Raises (MatchError) if the new workflow cannot be created.
  defp restart_workflow(job) do
    workflow = Workflows.get_workflow!(job.workflow_id)

    workflow
    |> Repo.preload([:jobs])
    |> Map.fetch!(:jobs)
    |> Live.stop_jobs()

    {:ok, restarted_workflow} =
      workflow
      |> Workflows.get_attr()
      |> Workflows.create_workflow()

    WorkflowInstrumenter.inc(:step_flow_workflows_created, restarted_workflow.identifier)
    Workflows.Status.define_workflow_status(restarted_workflow.id, :created_workflow)
    Step.start_next(restarted_workflow)
    StepFlow.Notification.send("new_workflow", %{workflow_id: restarted_workflow.id})

    Logger.info("Workflow #{workflow.id} successfully restarted as #{restarted_workflow.id}.")
  end
end