lib/step_flow/amqp/stopped_consumer.ex

defmodule StepFlow.Amqp.StoppedConsumer do
  @moduledoc """
  Consumer of all jobs with stopped status.

  The consumer is configured with the `job_stopped` queue and the
  `job_response` exchange, with a prefetch count of 1, and delegates every
  delivered message to `consume/4`.
  """

  require Logger

  alias StepFlow.{
    Amqp.StoppedConsumer,
    Jobs,
    Jobs.Status,
    Metrics.JobInstrumenter,
    NotificationHooks.NotificationHookManager,
    Repo.Checker,
    Statistics.JobsDurations,
    Workflows,
    Workflows.StepManager
  }

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_stopped",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &StoppedConsumer.consume/4
  }

  @doc """
  Consume messages with stopped topic, update Job status and continue the workflow.

  ## Parameters

    * `channel` - the channel used to acknowledge or reject the message.
    * `tag` - the delivery tag of the consumed message.
    * `_redelivered` - ignored.
    * `payload` - the decoded message. A valid payload is a map holding the
      `"job_id"`, `"datetime"` and `"status"` string keys.

  ## Behaviour

    * Database not available: the message is rejected **and requeued**.
    * Job not found: an error is logged and the message is nacked without
      requeue.
    * Job status stored: the workflow status, job durations, notification
      hooks and step status are processed, then the message is acked.
    * Job status cannot be stored: the error is logged and the message is
      nacked without requeue.
    * Payload without the expected keys: the payload is logged and the
      message is rejected without requeue.
  """
  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id,
          "datetime" => datetime,
          "status" => status
        } = _payload
      ) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          # Log before discarding: the message is not requeued, so without
          # this trace a stopped message for an unknown job would vanish
          # silently.
          Logger.error(
            "#{__MODULE__}: Job #{inspect(job_id)} not found, discard stopped message"
          )

          Basic.nack(channel, tag, requeue: false)

        job ->
          # The metric is incremented as soon as the job is known, whether or
          # not the status update below succeeds.
          JobInstrumenter.inc(:step_flow_jobs_status_total, job.name, "stopped")

          case Status.set_job_status(job_id, status, %{}, datetime) do
            {:ok, job_status} ->
              Workflows.Status.define_workflow_status(job.workflow_id, :job_stopped, job_status)

              JobsDurations.set_job_durations(job_id)

              NotificationHookManager.notification_from_job(job_id)
              StepManager.check_step_status(%{job_id: job_id})

              # Acknowledge only once all the follow-up processing was called.
              Basic.ack(channel, tag)

            {:error, message} ->
              Logger.error("Cannot set job status: #{inspect(message)}")
              Basic.nack(channel, tag, requeue: false)
          end
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      # Requeue so that the message can be delivered again later.
      Basic.reject(channel, tag, requeue: true)
    end
  end

  # Fallback for payloads missing any of the expected keys: log and drop.
  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job stopped #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end
end