defmodule StepFlow.Amqp.ErrorConsumer do
  @moduledoc """
  Consumer of all job messages reporting an error status.

  Messages are consumed from the `job_error` queue (exchange `job_response`).
  For each valid message the referenced job is set to the `:error` status,
  metrics and notifications are emitted, and - unless the job allows failure -
  the error is propagated to the job's workflow and to its parent, if any.
  Live jobs additionally trigger a restart of their workflow.
  """
  require Logger
  alias StepFlow.Amqp.ErrorConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Metrics.{JobInstrumenter, WorkflowInstrumenter}
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Statistics.JobsDurations
  alias StepFlow.Step
  alias StepFlow.Step.Live
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_error",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ErrorConsumer.consume/4
  }

  @doc """
  Consume a message from the error queue, update the job and send notifications.

  Two payload shapes are accepted:

    * `%{"job_id" => job_id, "error" => description}`
    * `%{"job_id" => job_id, "status" => "error", "parameters" => [%{"id" => "message",
      "type" => "string", "value" => description}]}` (exactly one parameter)

  Any other payload is logged and rejected without being requeued.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id, "error" => description} = payload) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id,
          "parameters" => [%{"id" => "message", "type" => "string", "value" => description}],
          "status" => "error"
        } = payload
      ) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job error #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  @doc """
  Handle an error message for `job_id`, whose error text is `description`.

  Outcomes for the AMQP message identified by `tag`:

    * database not running -> rejected and requeued;
    * unknown job -> nacked without requeue;
    * job status cannot be set to `:error` -> logged, nacked without requeue;
    * otherwise -> error side effects are applied (see `handle_job_error/4`),
      then the message is acked.

  Note: `restart_workflow/1` (live jobs only) asserts `{:ok, _}` on workflow
  creation, so a failure there raises before the message is acked.
  """
  def consume_error_message(channel, tag, payload, job_id, description) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          Basic.nack(channel, tag, requeue: false)

        job ->
          Logger.error("#{__MODULE__}: Job error #{inspect(payload)}")

          case Status.set_job_status(job_id, :error, %{message: description}) do
            {:ok, job_status} ->
              handle_job_error(job_id, job, job_status, description)
              Basic.ack(channel, tag)

            {:error, message} ->
              Logger.error("Cannot set job status: #{inspect(message)}")
              Basic.nack(channel, tag, requeue: false)
          end
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  # Applies every side effect of a job whose status was successfully set to
  # `:error`. Metrics and the job notification are emitted exactly once here
  # (they used to be emitted twice for the same message).
  defp handle_job_error(job_id, job, job_status, description) do
    JobInstrumenter.inc(:step_flow_jobs_error, job.name)
    WorkflowInstrumenter.inc(:step_flow_workflows_error, job.workflow_id)
    JobsDurations.set_job_durations(job_id)

    NotificationHookManager.notification_from_job(
      job_id,
      description
    )

    if job.allow_failure do
      # The failure is tolerated: only let the step manager re-evaluate the
      # step instead of putting the whole workflow in error.
      Logger.warn("Allowing failure for job #{job_id}")
      StepManager.check_step_status(%{job_id: job_id})
    else
      Workflows.Status.define_workflow_status(job.workflow_id, :job_error, job_status)

      NotificationHookManager.manage_notification_status(
        job.workflow_id,
        job,
        "error"
      )

      publish_parent_job_error(job, description)
    end

    if job.is_live do
      restart_workflow(job)
    end
  end

  @doc """
  Propagate a job error to the parent of the job's workflow, if it has one.

  When the workflow's `parent_id` is not `nil`, an error message is published
  on `job_error` (exchange `job_response`) with `parent_id` as the `job_id` and
  the description prefixed by `"[Child workflow] "`. Returns `nil` when the
  workflow has no parent.

  NOTE(review): `parent_id` is sent as a `job_id`, presumably the id of the
  parent job that spawned this child workflow - confirm against the schema.
  """
  def publish_parent_job_error(job, description) do
    workflow = Workflows.get_workflow!(job.workflow_id)

    if workflow.parent_id != nil do
      CommonEmitter.publish_json(
        "job_error",
        0,
        %{
          job_id: workflow.parent_id,
          status: "error",
          error: "[Child workflow] #{description}"
        },
        "job_response"
      )
    end
  end

  # Restarts the workflow of a live job in error: stops all jobs of the
  # workflow, asks for deletion of the worker linked to `job`, then creates
  # and starts a new workflow built from the old one's attributes.
  # Raises (MatchError) if the new workflow cannot be created.
  defp restart_workflow(job) do
    workflow = Workflows.get_workflow!(job.workflow_id)

    workflow_jobs = Repo.preload(workflow, [:jobs]).jobs
    Live.stop_jobs(workflow_jobs)

    case Live.delete_worker_from_job(job) do
      {:error, "unable to publish message"} ->
        Logger.error("Cannot delete worker linked to job #{job.id} in error.")

      _ ->
        Logger.info("Deleted worker linked to job #{job.id} in error.")
    end

    workflow_attrs = StepFlow.Controllers.Workflows.get_attr(workflow)
    {:ok, restarted_workflow} = Workflows.create_workflow(workflow_attrs)

    WorkflowInstrumenter.inc(:step_flow_workflows_created, restarted_workflow.identifier)
    Workflows.Status.define_workflow_status(restarted_workflow.id, :created_workflow)
    Step.start_next(restarted_workflow)
    StepFlow.Notification.send("new_workflow", %{workflow_id: restarted_workflow.id})
    Logger.info("Workflow #{workflow.id} successfully restarted as #{restarted_workflow.id}.")
  end
end