defmodule StepFlow.Amqp.ErrorConsumer do
  @moduledoc """
  Consumer of all jobs reported with an error status.

  The consumer is configured with the `job_error` queue and the `job_response`
  exchange, and registers `consume/4` as its message callback.
  """

  require Logger

  alias StepFlow.Amqp.ErrorConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Statistics.JobsDurations
  alias StepFlow.Step
  alias StepFlow.Step.Live
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  # NOTE(review): `Basic` and `CommonEmitter` are not aliased in this file;
  # presumably they are brought into scope by the `use` below -- confirm.
  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_error",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ErrorConsumer.consume/4
  }

  @doc """
  Consume message with error topic, update Job and send a notification.

  Two payload shapes are handled, both delegating to `consume_error_message/5`:

    * a map with the `"job_id"` and `"error"` keys, where `"error"` holds the
      description;
    * a map with `"job_id"`, `"status"` equal to `"error"` and a `"parameters"`
      list containing exactly one string parameter whose `"id"` is `"message"`;
      its `"value"` is used as the description.

  Any other payload is logged and rejected without being requeued.
  """
  def consume(
        channel,
        tag,
        _redelivered,
        %{"job_id" => job_id, "error" => description} = payload
      ) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id,
          "parameters" => [%{"id" => "message", "type" => "string", "value" => description}],
          "status" => "error"
        } = payload
      ) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job error #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  @doc """
  Records the error reported for `job_id` and settles the AMQP message.

  When `Checker.repo_running?/0` is falsy, the message is rejected with
  `requeue: true` and nothing else is done.

  Otherwise the job is looked up with `Jobs.get_job/1`:

    * if no job is found, the message is logged and nacked without requeue;
    * if the `:error` status cannot be stored, the failure is logged and the
      message is nacked without requeue;
    * if the status is stored, job durations are updated, notifications are
      sent, the error is propagated (or tolerated when `job.allow_failure` is
      set), live workflows are stopped (and optionally restarted), and the
      message is finally acked.

  The optional `"datetime"` entry of `payload` is forwarded to
  `Status.set_job_status/4`.
  """
  def consume_error_message(channel, tag, payload, job_id, description) do
    if Checker.repo_running?() do
      # The message is settled only once all processing is done, so the ack
      # still happens last, exactly as before the decomposition.
      case register_job_error(Jobs.get_job(job_id), job_id, description, payload) do
        :ok -> Basic.ack(channel, tag)
        :error -> Basic.nack(channel, tag, requeue: false)
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  @doc """
  Publishes an error for the parent of the workflow owning `job`, if any.

  The workflow is loaded from the job's `:workflow_id`. When its `parent_id` is
  not `nil`, a JSON message is published through `CommonEmitter.publish_json/4`
  on the `job_error` queue and `job_response` exchange, with `parent_id` as the
  `job_id` and the description prefixed by `"[Child workflow] "`. Returns `nil`
  when the workflow has no parent.
  """
  def publish_parent_job_error(job, description) do
    workflow =
      job
      |> Map.get(:workflow_id)
      |> Workflows.get_workflow!()

    if workflow.parent_id != nil do
      CommonEmitter.publish_json(
        "job_error",
        0,
        %{
          job_id: workflow.parent_id,
          status: "error",
          error: "[Child workflow] #{description}"
        },
        "job_response"
      )
    end
  end

  # Stores the error status of the job and runs every follow-up action.
  # Returns `:ok` when the message can be acked, `:error` when it must be nacked.
  defp register_job_error(nil, job_id, _description, payload) do
    # Without this log the message would be discarded without any trace.
    Logger.error(
      "#{__MODULE__}: Job #{inspect(job_id)} not found, " <>
        "discarding error message #{inspect(payload)}"
    )

    :error
  end

  defp register_job_error(job, job_id, description, payload) do
    Logger.error("#{__MODULE__}: Job error #{inspect(payload)}")

    datetime = Map.get(payload, "datetime")

    case Status.set_job_status(job_id, :error, %{message: description}, datetime) do
      {:ok, job_status} ->
        JobsDurations.set_job_durations(job_id)
        notify_job_error(job, job_id, description)
        propagate_job_error(job, job_id, job_status, description)
        handle_live_workflow(job)
        :ok

      {:error, message} ->
        Logger.error("Cannot set job status: #{inspect(message)}")
        :error
    end
  end

  # Sends the workflow-level and job-level error notifications.
  defp notify_job_error(job, job_id, description) do
    NotificationHookManager.manage_notification_status(job.workflow_id, job, "error")
    NotificationHookManager.notification_from_job(job_id, description)
  end

  # Either tolerates the failure and re-checks the step status, or flags the
  # workflow as failed and forwards the error to the parent job.
  defp propagate_job_error(job, job_id, job_status, description) do
    if job.allow_failure do
      Logger.warn("Allowing failure for job #{job_id}")
      StepManager.check_step_status(%{job_id: job_id})
    else
      Workflows.Status.define_workflow_status(job.workflow_id, :job_error, job_status)
      publish_parent_job_error(job, description)
    end
  end

  # Stops all jobs of a live workflow and restarts the workflow when the
  # automatic restart is enabled in the configuration. No-op for non-live jobs.
  defp handle_live_workflow(job) do
    if job.is_live do
      workflow = Workflows.get_workflow!(job.workflow_id)

      Live.stop_jobs(Repo.preload(workflow, [:jobs]).jobs)

      if StepFlow.Configuration.get_automatic_restart_live() do
        restart_live_workflow(workflow)
      end
    end
  end

  # Creates and starts a new workflow from the attributes of `workflow`.
  defp restart_live_workflow(workflow) do
    creation =
      workflow
      |> StepFlow.Controllers.Workflows.get_attr()
      |> Workflows.create_workflow()

    case creation do
      {:ok, restarted_workflow} ->
        Workflows.Status.define_workflow_status(restarted_workflow.id, :created_workflow)
        Step.start_next(restarted_workflow)
        StepFlow.Notification.send("new_workflow", %{workflow_id: restarted_workflow.id})
        Logger.info("Workflow #{workflow.id} successfully restarted as #{restarted_workflow.id}.")

      {:error, reason} ->
        # A failed restart must not crash the consumer: the job error has
        # already been recorded, so the message still has to be acked instead
        # of being left unacknowledged and processed a second time.
        Logger.error(
          "#{__MODULE__}: Cannot restart live workflow #{workflow.id}: #{inspect(reason)}"
        )
    end
  end
end