# lib/step_flow/amqp/error_consumer.ex

defmodule StepFlow.Amqp.ErrorConsumer do
  @moduledoc """
  Consumer of all jobs with an error status.

  The consumer is configured with the `job_error` queue and the `job_response`
  exchange. For every well-formed message the job status is set to `:error`,
  notification hooks are triggered and, depending on the job flags
  (`allow_failure`, `is_live`), the workflow status is updated, the parent is
  informed, or the live workflow jobs are stopped and optionally restarted.
  """

  require Logger

  alias StepFlow.Amqp.ErrorConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Statistics.JobsDurations
  alias StepFlow.Step
  alias StepFlow.Step.Live
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  # NOTE(review): `Basic` and `CommonEmitter` are not aliased in this file;
  # presumably they are brought into scope by this `use` - confirm.
  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_error",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ErrorConsumer.consume/4
  }

  @doc """
  Consume message with error topic, update Job and send a notification.

  Three payload shapes are handled, in this order:

    * a map with `"job_id"` and `"error"` keys: the `"error"` value is used as
      the error description;
    * a map with `"job_id"`, `"status" => "error"` and a `"parameters"` list
      holding exactly one string parameter whose id is `"message"`: the
      parameter value is used as the error description;
    * anything else: the payload is logged and the message is rejected without
      being requeued.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id, "error" => description} = payload) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id,
          "parameters" => [%{"id" => "message", "type" => "string", "value" => description}],
          "status" => "error"
        } = payload
      ) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job error #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  @doc """
  Store the error `description` on the job identified by `job_id` and
  acknowledge the AMQP message.

  The message identified by `tag` is:

    * rejected and requeued when the database is not available;
    * nack'ed without requeue when the job does not exist or when its status
      cannot be stored;
    * acknowledged once the error status is stored and the follow-up actions
      (durations, notifications, workflow status, live handling) have run.

  The optional `"datetime"` entry of `payload` is forwarded to the job status.
  """
  def consume_error_message(channel, tag, payload, job_id, description) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          # Without a log line the message would disappear silently.
          Logger.error("#{__MODULE__}: Cannot find job #{inspect(job_id)}, message dropped")
          Basic.nack(channel, tag, requeue: false)

        job ->
          Logger.error("#{__MODULE__}: Job error #{inspect(payload)}")

          case Status.set_job_status(
                 job_id,
                 :error,
                 %{message: description},
                 Map.get(payload, "datetime")
               ) do
            {:ok, job_status} ->
              handle_job_failure(job, job_id, description, job_status)
              Basic.ack(channel, tag)

            {:error, message} ->
              Logger.error("Cannot set job status: #{inspect(message)}")
              Basic.nack(channel, tag, requeue: false)
          end
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  @doc """
  Publish a `job_error` message for the parent of the workflow owning `job`.

  The workflow is loaded from `job.workflow_id`; nothing is published (and
  `nil` is returned) when the workflow has no `parent_id`. The published
  description is prefixed with `[Child workflow]`.
  """
  def publish_parent_job_error(job, description) do
    workflow =
      job
      |> Map.get(:workflow_id)
      |> Workflows.get_workflow!()

    if workflow.parent_id != nil do
      CommonEmitter.publish_json(
        "job_error",
        0,
        %{
          job_id: workflow.parent_id,
          status: "error",
          error: "[Child workflow] #{description}"
        },
        "job_response"
      )
    end
  end

  # Runs every follow-up action once the error status of the job is stored:
  # durations, notification hooks, failure policy and live workflow handling.
  defp handle_job_failure(job, job_id, description, job_status) do
    JobsDurations.set_job_durations(job_id)
    NotificationHookManager.manage_notification_status(job.workflow_id, job, "error")
    NotificationHookManager.notification_from_job(job_id, description)

    if job.allow_failure do
      Logger.warn("Allowing failure for job #{job_id}")
      StepManager.check_step_status(%{job_id: job_id})
    else
      Workflows.Status.define_workflow_status(job.workflow_id, :job_error, job_status)
      publish_parent_job_error(job, description)
    end

    if job.is_live, do: stop_live_workflow(job)
  end

  # Stops every job of the live workflow owning `job`, then restarts the
  # workflow when automatic restart is enabled in the configuration.
  defp stop_live_workflow(job) do
    workflow = Workflows.get_workflow!(job.workflow_id)
    workflow_jobs = Repo.preload(workflow, [:jobs]).jobs

    Live.stop_jobs(workflow_jobs)

    if StepFlow.Configuration.get_automatic_restart_live() do
      restart_live_workflow(workflow)
    end
  end

  # Creates a new workflow from the attributes of `workflow` and starts it.
  # A creation failure is only logged: raising here would abort the consumer
  # before the AMQP message is acknowledged, although the job error has
  # already been stored and notified.
  defp restart_live_workflow(workflow) do
    creation =
      workflow
      |> StepFlow.Controllers.Workflows.get_attr()
      |> Workflows.create_workflow()

    case creation do
      {:ok, restarted_workflow} ->
        Workflows.Status.define_workflow_status(restarted_workflow.id, :created_workflow)
        Step.start_next(restarted_workflow)

        StepFlow.Notification.send("new_workflow", %{workflow_id: restarted_workflow.id})

        Logger.info("Workflow #{workflow.id} successfully restarted as #{restarted_workflow.id}.")

      {:error, reason} ->
        Logger.error(
          "#{__MODULE__}: Cannot restart live workflow #{workflow.id}: #{inspect(reason)}"
        )
    end
  end
end