# lib/step_flow/amqp/error_consumer.ex

defmodule StepFlow.Amqp.ErrorConsumer do
  @moduledoc """
  Consumer of all jobs with error status.

  The consumer is configured with the `job_error` queue and the `job_response`
  exchange, a prefetch count of 1, and `consume/4` as message handler.
  """

  require Logger

  alias StepFlow.Amqp.ErrorConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Metrics.{JobInstrumenter, WorkflowInstrumenter}
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Step
  alias StepFlow.Step.Live
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  # NOTE(review): `Basic` (used below for ack/reject) is not aliased in this
  # file; presumably it is brought into scope by this `use` - confirm.
  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_error",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ErrorConsumer.consume/4
  }

  @doc """
  Consume message with error topic, update Job and send a notification.

  Two payload formats describe a job error:

    * `%{"job_id" => job_id, "error" => description}`
    * `%{"job_id" => job_id, "status" => "error", "parameters" => [message]}`
      where `message` is
      `%{"id" => "message", "type" => "string", "value" => description}`

  For both formats the message is:

    * rejected and requeued when `Checker.repo_running?/0` is falsy, or when
      the job status cannot be stored;
    * rejected without requeue when no job matches `job_id`;
    * acknowledged otherwise, after the job status has been set to `:error`,
      the error metrics incremented, the error propagated (step check when the
      job allows failure, workflow status update otherwise, plus a
      notification) and, for a live job, the workflow restarted.

  Any other payload is logged and rejected without requeue.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id, "error" => description} = payload) do
    # For this payload format the notification is sent before the step check
    # or the workflow status update.
    handle_job_error(channel, tag, payload, job_id, description, fn job, job_status ->
      Workflows.notification_from_job(job_id, description)

      if job.allow_failure do
        Logger.warn("Allowing failure for job #{job_id}")
        StepManager.check_step_status(%{job_id: job_id})
      else
        Workflows.Status.define_workflow_status(job.workflow_id, :job_error, job_status)
      end
    end)
  end

  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id,
          "parameters" => [%{"id" => "message", "type" => "string", "value" => description}],
          "status" => "error"
        } = payload
      ) do
    # For this payload format the notification is sent after the workflow
    # status update when the job does not allow failure.
    handle_job_error(channel, tag, payload, job_id, description, fn job, job_status ->
      if job.allow_failure do
        Logger.warn("Allowing failure for job #{job_id}")
        Workflows.notification_from_job(job_id, description)
        StepManager.check_step_status(%{job_id: job_id})
      else
        Workflows.Status.define_workflow_status(job.workflow_id, :job_error, job_status)
        Workflows.notification_from_job(job_id, description)
      end
    end)
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job error #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  # Shared handling of a job error message, whatever the payload format.
  #
  # `propagate` is a 2-arity function receiving the job and its freshly stored
  # status; it carries the format-specific notification / workflow update
  # sequence. It is only called once the `:error` status has been stored.
  defp handle_job_error(channel, tag, payload, job_id, description, propagate) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          # Unknown job: requeueing would only redeliver the same message.
          Basic.reject(channel, tag, requeue: false)

        job ->
          Logger.error("Job error #{inspect(payload)}")

          case Status.set_job_status(job_id, :error, %{message: description}) do
            {:ok, job_status} ->
              # Metrics are incremented only once the status is stored, so a
              # message requeued after a storage failure is not counted again
              # on each redelivery.
              JobInstrumenter.inc(:step_flow_jobs_error, job.name)
              WorkflowInstrumenter.inc(:step_flow_workflows_error, job.workflow_id)

              propagate.(job, job_status)

              if job.is_live do
                restart_workflow(job)
              end

              Basic.ack(channel, tag)

            {:error, message} ->
              Logger.error("Cannot set job status: #{inspect(message)}")
              Basic.reject(channel, tag, requeue: true)
          end
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  # Stops every job of the workflow owning `job`, then creates and starts a new
  # workflow from the attributes of the original one.
  #
  # Raises when the workflow cannot be fetched (`get_workflow!/1`) or when the
  # creation does not return `{:ok, workflow}`.
  defp restart_workflow(job) do
    workflow =
      job
      |> Map.get(:workflow_id)
      |> Workflows.get_workflow!()

    workflow
    |> Repo.preload([:jobs])
    |> Map.fetch!(:jobs)
    |> Live.stop_jobs()

    {:ok, restarted_workflow} =
      workflow
      |> Workflows.get_attr()
      |> Workflows.create_workflow()

    WorkflowInstrumenter.inc(:step_flow_workflows_created, restarted_workflow.identifier)
    Workflows.Status.define_workflow_status(restarted_workflow.id, :created_workflow)
    Step.start_next(restarted_workflow)

    StepFlow.Notification.send("new_workflow", %{workflow_id: restarted_workflow.id})

    Logger.info("Workflow #{workflow.id} successfully restarted as #{restarted_workflow.id}.")
  end
end