# lib/step_flow/amqp/progression_consumer.ex

defmodule StepFlow.Amqp.ProgressionConsumer do
  @moduledoc """
  Consumer of all progression jobs.
  """

  require Logger
  alias StepFlow.Amqp.ProgressionConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Progressions
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Workers.WorkerStatuses
  alias StepFlow.Workflows

  # Consumer configuration handed to the shared CommonConsumer macro:
  #   * queue / exchange - AMQP names this consumer is declared with
  #   * prefetch_count   - set to 1 here
  #   * consumer         - callback invoked per message; `consume/4` below takes
  #                        (channel, tag, redelivered, payload)
  # NOTE(review): how CommonConsumer binds the queue to the exchange and applies
  # prefetch_count is defined in that module, not here - confirm there.
  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_progression",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ProgressionConsumer.consume/4
  }

  @doc """
  Consume message with job progression and save it in database.

  Expects `payload` to be a map containing a `"job_id"` key. The message is then
  handled as follows:

    * the database is not available: the message is rejected and requeued;
    * no job matches `job_id`: the message is rejected without requeue;
    * the job has no last status: the progression is handled by `add_progression/4`;
    * otherwise: `error_status_check/5` decides whether the progression is still
      relevant.

  Any payload without a `"job_id"` key is logged and rejected without requeue.
  """
  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id
        } = payload
      ) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          Basic.reject(channel, tag, requeue: false)

        job ->
          # Preload on the job we already hold: fetching it a second time costs an
          # extra query and could return nil if the row vanished in between.
          job = Repo.preload(job, [:status])

          case Status.get_last_status(job.status) do
            nil ->
              add_progression(channel, tag, job, payload)

            last_status ->
              error_status_check(channel, tag, job, payload, last_status)
          end
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  # Fallback for payloads that do not carry a "job_id": they cannot be attached to
  # any job, so they are logged and dropped rather than requeued.
  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job progression #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  @doc """
  Decides whether a progression is still worth recording given the job's last status.

  When `last_status.state` is `:error`, a warning is logged and the message is
  acknowledged without storing anything. For any other state the progression is
  handed over to `add_progression/4`.
  """
  def error_status_check(channel, tag, job, payload, last_status) do
    case last_status.state do
      :error ->
        Logger.warn("Progression arrived after job #{job.id} was put in error, ignoring.")
        Basic.ack(channel, tag)

      _other_state ->
        add_progression(channel, tag, job, payload)
    end
  end

  @doc """
  Stores the progression carried by `payload` for `job` and settles the AMQP message.

    * When `job.is_live` is truthy the message is simply acknowledged (Worker
      Manager progression response); nothing is stored.
    * Otherwise a progression is created from `payload`. If creation returns
      `{:error, reason}`, the error is logged and the message is rejected without
      requeue.
    * When the stored progression value equals `0`, the job status is set to
      `"processing"` and a worker status is created before the workflow status is
      updated. If the job status cannot be set, the message is rejected and
      requeued.
    * In every successful case the workflow status is updated, a notification is
      emitted for the job and the message is acknowledged.
  """
  def add_progression(channel, tag, job, payload) do
    if job.is_live do
      # Worker Manager progression response
      Basic.ack(channel, tag)
    else
      case Progressions.create_progression(payload) do
        {:ok, progression} ->
          handle_progression(channel, tag, job, progression)

        {:error, reason} ->
          # Previously the result tag was ignored, so an error value was used as if
          # it were a progression and crashed before the message was settled.
          # NOTE(review): assumes an error here means the payload is invalid, so a
          # retry would fail the same way - hence no requeue; confirm against
          # Progressions.create_progression/1.
          Logger.error("Cannot create progression for job #{job.id}: #{inspect(reason)}")
          Basic.reject(channel, tag, requeue: false)
      end
    end
  end

  # Applies a stored progression: a value equal to 0 additionally moves the job to
  # "processing" and records a worker status before the workflow is updated.
  defp handle_progression(channel, tag, job, progression) do
    if progression.progression == 0 do
      case Status.set_job_status(job.id, "processing") do
        {:ok, _status} ->
          worker_status = WorkerStatuses.create_worker_status!(progression)

          Logger.debug(
            "[#{__MODULE__}] Added worker status from first progression: #{inspect(worker_status)}"
          )

          update_workflow_and_ack(channel, tag, job, progression)

        {:error, message} ->
          Logger.error("Cannot set job status: #{inspect(message)}")
          Basic.reject(channel, tag, requeue: true)
      end
    else
      update_workflow_and_ack(channel, tag, job, progression)
    end
  end

  # Updates the workflow status from the progression, notifies for the job, then acks.
  defp update_workflow_and_ack(channel, tag, job, progression) do
    Workflows.Status.define_workflow_status(job.workflow_id, :job_progression, progression)
    Workflows.notification_from_job(job.id)
    Basic.ack(channel, tag)
  end
end