defmodule StepFlow.Amqp.ErrorConsumer do
  @moduledoc """
  Consumer of messages for jobs that ended with an error status.

  Messages are read from the `job_error` queue bound to the `job_response` exchange.
  """
  require Logger
  alias StepFlow.Amqp.ErrorConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Statistics.JobsDurations
  alias StepFlow.Step
  alias StepFlow.Step.Live
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_error",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ErrorConsumer.consume/4
  }

  # NOTE(review): `Basic` and `CommonEmitter` are not aliased in this module;
  # presumably they are brought into scope by `use StepFlow.Amqp.CommonConsumer`.

  @doc """
  Consume message with error topic, update Job and send a notification.

  Two payload shapes are accepted:

    * `%{"job_id" => id, "error" => description}`
    * `%{"job_id" => id, "status" => "error",
         "parameters" => [%{"id" => "message", "type" => "string", "value" => description}]}`

  Any other payload is logged and rejected without being requeued.
  """
  def consume(
        channel,
        tag,
        _redelivered,
        %{"job_id" => job_id, "error" => description} = payload
      ) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id,
          "parameters" => [%{"id" => "message", "type" => "string", "value" => description}],
          "status" => "error"
        } = payload
      ) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job error #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  @doc """
  Handles an error message for the job identified by `job_id`.

  * If the database is not reachable, the message is rejected and requeued.
  * If the job does not exist, the message is nacked without requeue.
  * Otherwise the job status is set to `:error` with `description` as the message.
    The optional `"datetime"` payload field is forwarded to `Status.set_job_status/4`.
    - On success, job durations are recorded and one job notification is sent.
    - The failure is then either tolerated (`allow_failure`) or propagated.
    - Live jobs also get their workflow restarted.
    - Finally the message is acked.
  * If setting the status fails, the message is nacked without requeue.
  """
  def consume_error_message(channel, tag, payload, job_id, description) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          Logger.error(
            "#{__MODULE__}: Job #{inspect(job_id)} not found, dropping error message #{inspect(payload)}"
          )

          Basic.nack(channel, tag, requeue: false)

        job ->
          Logger.error("#{__MODULE__}: Job error #{inspect(payload)}")

          case Status.set_job_status(
                 job_id,
                 :error,
                 %{message: description},
                 Map.get(payload, "datetime")
               ) do
            {:ok, job_status} ->
              JobsDurations.set_job_durations(job_id)

              # Sent exactly once, whichever failure policy applies below.
              NotificationHookManager.notification_from_job(job_id, description)

              apply_failure_policy(job, job_id, job_status, description)

              if job.is_live do
                restart_workflow(job)
              end

              Basic.ack(channel, tag)

            {:error, message} ->
              Logger.error("Cannot set job status: #{inspect(message)}")
              Basic.nack(channel, tag, requeue: false)
          end
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  @doc """
  Forwards a job error to the parent of the job's workflow, if there is one.

  When the workflow owning `job` has a non-nil `parent_id`, a JSON message is
  published via `CommonEmitter.publish_json/4` on `"job_error"`. The message has
  `job_id` set to that `parent_id`, `status: "error"` and the description
  prefixed with `"[Child workflow] "`. When there is no parent, it returns `nil`.

  NOTE(review): the `0` and `"job_response"` arguments presumably are the
  priority and exchange name; confirm against `CommonEmitter.publish_json/4`.
  """
  def publish_parent_job_error(job, description) do
    workflow =
      job
      |> Map.get(:workflow_id)
      |> Workflows.get_workflow!()

    if workflow.parent_id != nil do
      CommonEmitter.publish_json(
        "job_error",
        0,
        %{
          job_id: workflow.parent_id,
          status: "error",
          error: "[Child workflow] #{description}"
        },
        "job_response"
      )
    end
  end

  # Tolerates the failure when the job allows it, by re-checking the step status.
  # Otherwise it propagates the error to the workflow status, the workflow
  # notification hooks and, if any, the parent job.
  defp apply_failure_policy(job, job_id, job_status, description) do
    if job.allow_failure do
      Logger.warn("Allowing failure for job #{job_id}")
      StepManager.check_step_status(%{job_id: job_id})
    else
      Workflows.Status.define_workflow_status(job.workflow_id, :job_error, job_status)
      NotificationHookManager.manage_notification_status(job.workflow_id, job, "error")
      publish_parent_job_error(job, description)
    end
  end

  # Restarts the workflow of a failed live job:
  # 1. Stops every job of the current workflow.
  # 2. Asks for the worker linked to the failed job to be deleted (a publish
  #    failure is only logged).
  # 3. Creates a new workflow from the original's attributes and starts its
  #    first steps.
  # Crashes (MatchError) if the new workflow cannot be created.
  defp restart_workflow(job) do
    workflow =
      job
      |> Map.get(:workflow_id)
      |> Workflows.get_workflow!()

    workflow_jobs = Repo.preload(workflow, [:jobs]).jobs
    Live.stop_jobs(workflow_jobs)

    case Live.delete_worker_from_job(job) do
      {:error, "unable to publish message"} ->
        Logger.error("Cannot delete worker linked to job #{job.id} in error.")

      _ ->
        Logger.info("Deleted worker linked to job #{job.id} in error.")
    end

    {:ok, restarted_workflow} =
      workflow
      |> StepFlow.Controllers.Workflows.get_attr()
      |> Workflows.create_workflow()

    Workflows.Status.define_workflow_status(restarted_workflow.id, :created_workflow)
    Step.start_next(restarted_workflow)
    StepFlow.Notification.send("new_workflow", %{workflow_id: restarted_workflow.id})
    Logger.info("Workflow #{workflow.id} successfully restarted as #{restarted_workflow.id}.")
  end
end