defmodule StepFlow.Amqp.WorkerStatusConsumer do
@moduledoc """
Consumer of all worker statuses.
"""
require Logger
alias StepFlow.Amqp.WorkerStatusConsumer
alias StepFlow.Repo.Checker
alias StepFlow.Workers.WorkerStatusWatcher
use StepFlow.Amqp.CommonConsumer, %{
queue: "worker_status",
exchange: "worker_response",
prefetch_count: 1,
consumer: &WorkerStatusConsumer.consume/4
}
@doc """
Consume worker status message.
"""
def consume(
channel,
tag,
_redelivered,
%{
"worker" => %{"system_info" => %{"docker_container_id" => docker_container_id}}
} = payload
) do
if Checker.repo_running?() do
Logger.debug("Got #{docker_container_id} worker status: #{inspect(payload)}")
WorkerStatusWatcher.update_worker_status(docker_container_id, payload)
Basic.ack(channel, tag)
else
Logger.warn(
"#{__MODULE__}: The database is not available, reject and requeue consumed message..."
)
Basic.reject(channel, tag, requeue: true)
end
end
def consume(channel, tag, _redelivered, payload) do
Logger.error("Worker status #{inspect(payload)}")
Basic.reject(channel, tag, requeue: false)
end
end