lib/step_flow/amqp/error_consumer.ex

defmodule StepFlow.Amqp.ErrorConsumer do
  @moduledoc """
  Consumer of all jobs with error status.

  Configured with the `job_error` queue and the `job_response` exchange. For each
  error message it marks the referenced job as errored, records metrics and
  durations, sends notifications, updates the workflow status (unless the job
  allows failure), forwards the error to a parent workflow if any, and restarts
  the workflow when the failing job is a live job.
  """

  require Logger

  alias StepFlow.Amqp.ErrorConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Metrics.{JobInstrumenter, WorkflowInstrumenter}
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Statistics.JobsDurations
  alias StepFlow.Step
  alias StepFlow.Step.Live
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_error",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ErrorConsumer.consume/4
  }

  @doc """
  Consume message with error topic, update Job and send a notification.

  Two payload shapes are accepted:

    * `%{"job_id" => id, "error" => description}`
    * `%{"job_id" => id, "status" => "error", "parameters" => [param]}` where
      `param` is exactly `%{"id" => "message", "type" => "string", "value" => description}`

  Any other payload is logged and rejected without requeue.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id, "error" => description} = payload) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id,
          "parameters" => [%{"id" => "message", "type" => "string", "value" => description}],
          "status" => "error"
        } = payload
      ) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job error #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  @doc """
  Record the error `description` for `job_id` and settle the AMQP message.

    * Database unavailable: the message is rejected and requeued.
    * Unknown job, or status update failure: the message is nacked without requeue.
    * Otherwise: the job is set to `:error`, all follow-up side effects run, and
      the message is acked.
  """
  def consume_error_message(channel, tag, payload, job_id, description) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          # Without a job there is nothing to update; drop the message but leave a trace.
          Logger.error(
            "#{__MODULE__}: Job #{inspect(job_id)} not found, dropping #{inspect(payload)}"
          )

          Basic.nack(channel, tag, requeue: false)

        job ->
          Logger.error("#{__MODULE__}: Job error #{inspect(payload)}")

          case Status.set_job_status(job_id, :error, %{message: description}) do
            {:ok, job_status} ->
              handle_job_in_error(job, job_id, job_status, description)
              Basic.ack(channel, tag)

            {:error, message} ->
              Logger.error("Cannot set job status: #{inspect(message)}")
              Basic.nack(channel, tag, requeue: false)
          end
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  @doc """
  Forward a job error to the parent workflow, if the job's workflow has one.

  Publishes a `job_error` message on the `job_response` exchange targeting the
  parent id, with the description prefixed by `"[Child workflow] "`. Returns `nil`
  when the workflow has no parent.
  """
  def publish_parent_job_error(job, description) do
    workflow = Workflows.get_workflow!(job.workflow_id)

    if workflow.parent_id != nil do
      CommonEmitter.publish_json(
        "job_error",
        0,
        %{
          job_id: workflow.parent_id,
          status: "error",
          error: "[Child workflow] #{description}"
        },
        "job_response"
      )
    end
  end

  # Side effects run once the job status has been persisted as `:error`.
  defp handle_job_in_error(job, job_id, job_status, description) do
    # Counted exactly once per error, and only once the status is actually recorded.
    JobInstrumenter.inc(:step_flow_jobs_error, job.name)
    WorkflowInstrumenter.inc(:step_flow_workflows_error, job.workflow_id)

    JobsDurations.set_job_durations(job_id)

    # Single notification for both tolerated and non-tolerated failures.
    NotificationHookManager.notification_from_job(job_id, description)

    if job.allow_failure do
      # A tolerated failure does not fail the workflow; let the step logic continue.
      Logger.warn("Allowing failure for job #{job_id}")
      StepManager.check_step_status(%{job_id: job_id})
    else
      Workflows.Status.define_workflow_status(job.workflow_id, :job_error, job_status)
      NotificationHookManager.manage_notification_status(job.workflow_id, job, "error")
      publish_parent_job_error(job, description)
    end

    if job.is_live do
      restart_workflow(job)
    end
  end

  # Stops every job of the live job's workflow, deletes the worker attached to the
  # failed job, then creates and starts a fresh copy of the workflow.
  defp restart_workflow(job) do
    workflow = Workflows.get_workflow!(job.workflow_id)

    workflow
    |> Repo.preload([:jobs])
    |> Map.get(:jobs)
    |> Live.stop_jobs()

    case Live.delete_worker_from_job(job) do
      {:error, reason} ->
        Logger.error(
          "Cannot delete worker linked to job #{job.id} in error: #{inspect(reason)}"
        )

      _ ->
        Logger.info("Deleted worker linked to job #{job.id} in error.")
    end

    # NOTE(review): a failed creation raises MatchError before the message is acked.
    {:ok, restarted_workflow} =
      workflow
      |> StepFlow.Controllers.Workflows.get_attr()
      |> Workflows.create_workflow()

    WorkflowInstrumenter.inc(:step_flow_workflows_created, restarted_workflow.identifier)
    Workflows.Status.define_workflow_status(restarted_workflow.id, :created_workflow)
    Step.start_next(restarted_workflow)

    StepFlow.Notification.send("new_workflow", %{workflow_id: restarted_workflow.id})

    Logger.info("Workflow #{workflow.id} successfully restarted as #{restarted_workflow.id}.")
  end
end