lib/step_flow/amqp/worker_updated_consumer.ex

defmodule StepFlow.Amqp.WorkerUpdatedConsumer do
  @moduledoc """
  Consumer of all worker updates.
  """

  require Logger
  alias StepFlow.Amqp.WorkerUpdatedConsumer
  alias StepFlow.Jobs.Status
  alias StepFlow.Repo.Checker
  alias StepFlow.Workflows

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "worker_updated",
    exchange: "worker_response",
    prefetch_count: 1,
    consumer: &WorkerUpdatedConsumer.consume/4
  }

  @doc """
  Consume worker updated message.
  """
  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "job_id" => job_id
        } = _payload
      ) do
    if Checker.repo_running?() do
      Status.set_job_status(job_id, "processing")
      Workflows.notification_from_job(job_id)
      Basic.ack(channel, tag)
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Worker updated #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end
end