# lib/step_flow/amqp/worker_status_consumer.ex

defmodule StepFlow.Amqp.WorkerStatusConsumer do
  @moduledoc """
  Consumer of all worker statuses.

  Hands `consume/4` to `StepFlow.Amqp.CommonConsumer` as the `consumer`
  option, together with the `worker_status` queue name, the `worker_response`
  exchange name and a prefetch count of 1.
  """

  require Logger
  alias StepFlow.Amqp.WorkerStatusConsumer
  alias StepFlow.Repo.Checker
  alias StepFlow.Workers.WorkerStatusWatcher

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "worker_status",
    exchange: "worker_response",
    prefetch_count: 1,
    consumer: &WorkerStatusConsumer.consume/4
  }

  # NOTE(review): `Basic` is not aliased anywhere in this file; presumably the
  # `use` above brings it into scope (likely `AMQP.Basic`) — confirm in
  # `StepFlow.Amqp.CommonConsumer`.

  @doc """
  Handles one worker status message.

  When `payload` contains a
  `"worker" -> "system_info" -> "docker_container_id"` entry:

    * if `Checker.repo_running?()` is truthy, the payload is forwarded to
      `WorkerStatusWatcher.update_worker_status(container_id, payload)` and the
      delivery identified by `tag` is acknowledged on `channel`;
    * otherwise a warning is logged and the delivery is rejected with
      `requeue: true`.

  Any other payload is logged at error level and rejected with
  `requeue: false`.

  The third argument (redelivery flag) is ignored. The return value is
  whatever the final `Basic.ack`/`Basic.reject` call returns.
  """
  def consume(
        channel,
        tag,
        _redelivered,
        %{"worker" => %{"system_info" => %{"docker_container_id" => container_id}}} = payload
      ) do
    database_up = Checker.repo_running?()

    if database_up do
      Logger.debug("Got #{container_id} worker status: #{inspect(payload)}")

      # The watcher's return value is not inspected; the message is acked
      # as soon as the call returns.
      WorkerStatusWatcher.update_worker_status(container_id, payload)

      Basic.ack(channel, tag)
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      # Requeue so the status is not dropped while the repo check fails.
      Basic.reject(channel, tag, requeue: true)
    end
  end

  # Catch-all clause: the payload does not carry a docker container id at the
  # expected path, so it is logged and rejected without requeue.
  def consume(channel, tag, _redelivered, unexpected_payload) do
    Logger.error("Worker status #{inspect(unexpected_payload)}")
    Basic.reject(channel, tag, requeue: false)
  end
end