lib/step_flow/amqp/worker_initialized_consumer.ex

defmodule StepFlow.Amqp.WorkerInitializedConsumer do
  @moduledoc """
  Consumer of all worker inits.
  """

  import Ecto.Query, warn: false

  require Logger
  alias StepFlow.Amqp.WorkerInitializedConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Job
  alias StepFlow.Jobs.Status
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Statistics.JobsDurations
  alias StepFlow.Workflows.StepManager

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "worker_initialized",
    exchange: "worker_response",
    prefetch_count: 1,
    consumer: &WorkerInitializedConsumer.consume/4
  }

  @doc """
  Consume worker initialized message.
  """
  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id
        } = _payload
      ) do
    if Checker.repo_running?() do
      case Status.set_job_status(job_id, "ready_to_start") do
        {:ok, _} ->
          query = from(job in Job, select: job.id)
          stream = Repo.stream(query)

          Repo.transaction(fn ->
            Enum.to_list(stream)
          end)

          :timer.sleep(5000)

          JobsDurations.set_job_durations(job_id)

          NotificationHookManager.notification_from_job(job_id)

          case Jobs.get_job(job_id) do
            nil ->
              Basic.nack(channel, tag, requeue: false)

            job ->
              if job.is_live do
                StepManager.check_step_status(%{job_id: job_id})
              end

              Basic.ack(channel, tag)
          end

        {:error, message} ->
          Logger.error("Cannot set job status: #{inspect(message)}")
          Basic.nack(channel, tag, requeue: false)
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Worker initialized #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end
end