lib/step_flow/amqp/error_consumer.ex

defmodule StepFlow.Amqp.ErrorConsumer do
  @moduledoc """
  Consumer of all jobs with error status.

  Configured with the `job_error` queue and the `job_response` exchange. For
  each error message it marks the referenced job as failed, records its
  durations, sends notifications and updates the owning workflow.
  """

  require Logger

  alias StepFlow.Amqp.ErrorConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Statistics.JobsDurations
  alias StepFlow.Step
  alias StepFlow.Step.Live
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_error",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ErrorConsumer.consume/4
  }

  @doc """
  Consume message with error topic, update Job and send a notification.

  Two payload shapes are accepted:

    * `%{"job_id" => id, "error" => description}`
    * `%{"job_id" => id, "status" => "error", "parameters" =>
      [%{"id" => "message", "type" => "string", "value" => description}]}`

  Any other payload is logged and rejected without requeue.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id, "error" => description} = payload) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id,
          "parameters" => [%{"id" => "message", "type" => "string", "value" => description}],
          "status" => "error"
        } = payload
      ) do
    consume_error_message(channel, tag, payload, job_id, description)
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job error #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  @doc """
  Marks job `job_id` as failed with `description`, then acknowledges the message.

  Outcomes:

    * When the database is not available, the message is rejected and requeued.
    * When the job does not exist, or its status cannot be stored, the message
      is nacked without requeue.
    * Otherwise the job status and durations are stored and a job notification
      is sent. If the job is not allowed to fail, the workflow is also flagged
      as errored and the error is forwarded to the parent workflow, if any.
      Live jobs additionally trigger a restart of their workflow.
  """
  def consume_error_message(channel, tag, payload, job_id, description) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          # Previously dropped silently; log so lost error messages can be traced.
          Logger.error(
            "#{__MODULE__}: Job #{inspect(job_id)} not found, dropping error message #{inspect(payload)}"
          )

          Basic.nack(channel, tag, requeue: false)

        job ->
          Logger.error("#{__MODULE__}: Job error #{inspect(payload)}")

          case Status.set_job_status(
                 job_id,
                 :error,
                 %{message: description},
                 Map.get(payload, "datetime")
               ) do
            {:ok, job_status} ->
              JobsDurations.set_job_durations(job_id)

              # Sent exactly once, whether or not the failure is allowed.
              NotificationHookManager.notification_from_job(
                job_id,
                description
              )

              if job.allow_failure do
                # A tolerated failure must not fail the workflow; only
                # re-evaluate the step so the workflow can proceed.
                Logger.warn("Allowing failure for job #{job_id}")
                StepManager.check_step_status(%{job_id: job_id})
              else
                Workflows.Status.define_workflow_status(job.workflow_id, :job_error, job_status)

                NotificationHookManager.manage_notification_status(
                  job.workflow_id,
                  job,
                  "error"
                )

                publish_parent_job_error(job, description)
              end

              if job.is_live do
                restart_workflow(job)
              end

              Basic.ack(channel, tag)

            {:error, message} ->
              Logger.error("Cannot set job status: #{inspect(message)}")
              Basic.nack(channel, tag, requeue: false)
          end
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      # Requeue so the message is not lost and can be processed once the
      # database is back.
      Basic.reject(channel, tag, requeue: true)
    end
  end

  @doc """
  Forwards a job error to the parent workflow of the job's workflow, if any.

  When the job's workflow has a `parent_id`, a `job_error` message is published
  to the `job_response` exchange with `job_id` set to that `parent_id` and the
  description prefixed with `"[Child workflow] "`. Returns `nil` when the
  workflow has no parent.
  """
  def publish_parent_job_error(job, description) do
    workflow =
      job
      |> Map.get(:workflow_id)
      |> Workflows.get_workflow!()

    if workflow.parent_id != nil do
      CommonEmitter.publish_json(
        "job_error",
        0,
        %{
          job_id: workflow.parent_id,
          status: "error",
          error: "[Child workflow] #{description}"
        },
        "job_response"
      )
    end
  end

  # Restarts a live workflow after one of its jobs failed. It stops all jobs of
  # the current workflow and asks for removal of the worker linked to the failed
  # job. It then creates a new workflow from the old one's attributes, marks it
  # as created, starts its first steps and notifies "new_workflow".
  # Raises (MatchError) if the new workflow cannot be created.
  defp restart_workflow(job) do
    workflow =
      job
      |> Map.get(:workflow_id)
      |> Workflows.get_workflow!()

    workflow_jobs = Repo.preload(workflow, [:jobs]).jobs

    Live.stop_jobs(workflow_jobs)

    case Live.delete_worker_from_job(job) do
      {:error, "unable to publish message"} ->
        Logger.error("Cannot delete worker linked to job #{job.id} in error.")

      _ ->
        Logger.info("Deleted worker linked to job #{job.id} in error.")
    end

    {:ok, restarted_workflow} =
      workflow
      |> StepFlow.Controllers.Workflows.get_attr()
      |> Workflows.create_workflow()

    Workflows.Status.define_workflow_status(restarted_workflow.id, :created_workflow)
    Step.start_next(restarted_workflow)

    StepFlow.Notification.send("new_workflow", %{workflow_id: restarted_workflow.id})

    Logger.info("Workflow #{workflow.id} successfully restarted as #{restarted_workflow.id}.")
  end
end