lib/step_flow/amqp/progression_consumer.ex

defmodule StepFlow.Amqp.ProgressionConsumer do
  @moduledoc """
  Consumer of all progression jobs.
  """

  require Logger
  alias StepFlow.Amqp.ProgressionConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Progressions
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Statistics.JobsDurations
  alias StepFlow.Workers.WorkerStatuses
  alias StepFlow.Workflows

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_progression",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ProgressionConsumer.consume/4
  }

  @doc """
  Consume message with job progression and save it in database.
  """
  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id
        } = payload
      ) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          Basic.nack(channel, tag, requeue: false)

        job ->
          job = Repo.preload(Jobs.get_job(job.id), [:status])
          last_status = StepFlow.Controllers.Jobs.get_last_status(job.status)

          case last_status do
            nil ->
              add_progression(channel, tag, job, payload)

            _ ->
              error_status_check(channel, tag, job, payload, last_status)
          end
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job progression #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  def error_status_check(channel, tag, job, payload, last_status) do
    if last_status.state == :error do
      Logger.warn("Progression arrived after job #{job.id} was put in error, ignoring.")
      Basic.ack(channel, tag)
    else
      add_progression(channel, tag, job, payload)
    end
  end

  def add_progression(channel, tag, job, payload) do
    if job.is_live do
      # Worker Manager progression response
      Basic.ack(channel, tag)
    else
      job = Jobs.get_job_with_status!(job.id)

      # Writing status only if job is in processing status
      case StepFlow.Controllers.Jobs.get_last_status_id(job.status) do
        status when status.state == :processing ->
          {_, progression} = Progressions.create_progression(payload)

          JobsDurations.set_job_durations(job.id)

          Workflows.Status.define_workflow_status(
            job.workflow_id,
            :job_progression,
            progression
          )

          NotificationHookManager.notification_from_job(job.id)
          Basic.ack(channel, tag)

        status when status.state == :queued ->
          datetime = Status.get_datetime_from_payload(payload)

          unless datetime do
            Logger.warn("Worker treating job #{job.id} did not provide a progression timestamp.")
          end

          case Status.set_job_status(job.id, "processing", %{}, datetime) do
            {:ok, _status} ->
              {_, progression} = Progressions.create_progression(payload)
              worker_status = WorkerStatuses.create_worker_status!(progression)

              JobsDurations.set_job_durations(job.id)

              Logger.debug(
                "[#{__MODULE__}] Added worker status from first progression: #{inspect(worker_status)}"
              )

              Workflows.Status.define_workflow_status(
                job.workflow_id,
                :job_progression,
                progression
              )

              NotificationHookManager.notification_from_job(job.id)
              Basic.ack(channel, tag)

            {:error, message} ->
              Logger.error("Cannot set job status: #{inspect(message)}")
              Basic.nack(channel, tag, requeue: true)
          end

        _ ->
          Basic.nack(channel, tag, requeue: false)
      end
    end
  end
end