defmodule StepFlow.Amqp.WorkerCreatedConsumer do
@moduledoc """
Consumer of all worker creations.
"""
import Ecto.Query, warn: false
require Logger
alias StepFlow.Amqp.WorkerCreatedConsumer
alias StepFlow.Jobs.Job
alias StepFlow.Jobs.Status
alias StepFlow.LiveWorkers
alias StepFlow.NotificationHooks.NotificationHookManager
alias StepFlow.Repo
alias StepFlow.Repo.Checker
alias StepFlow.Statistics.JobsDurations
alias StepFlow.Workers.WorkerStatuses
alias StepFlow.Workflows.StepManager
use StepFlow.Amqp.CommonConsumer, %{
queue: "worker_created",
exchange: "worker_response",
prefetch_count: 1,
consumer: &WorkerCreatedConsumer.consume/4
}
@doc """
Consume worker created message.
"""
def consume(
channel,
tag,
_redelivered,
%{
"direct_messaging_queue_name" => direct_messaging_queue_name
} = payload
) do
if Checker.repo_running?() do
handle_created_message(channel, tag, direct_messaging_queue_name, payload)
else
Logger.warn(
"#{__MODULE__}: The database is not available, reject and requeue consumed message..."
)
Basic.reject(channel, tag, requeue: true)
end
end
def consume(channel, tag, _redelivered, payload) do
Logger.error("Worker creation #{inspect(payload)}")
Basic.reject(channel, tag, requeue: false)
end
defp handle_created_message(channel, tag, direct_messaging_queue_name, payload) do
job =
StepFlow.Jobs.internal_list_jobs(%{
"direct_messaging_queue_name" => direct_messaging_queue_name
})
|> Map.get(:data)
|> List.first()
instance_id = Map.get(payload, "instance_id")
Logger.debug("Worker #{instance_id} creation job search result: #{inspect(job)}")
case {job, instance_id} do
{nil, nil} ->
Basic.nack(channel, tag, requeue: false)
{nil, _} ->
case declare_worker_status(payload, instance_id) do
:ok ->
Basic.ack(channel, tag)
:error ->
Basic.nack(channel, tag, requeue: false)
end
_ ->
if job.is_live do
job_id = job.id
case live_worker_update(job_id) do
:ok ->
Basic.ack(channel, tag)
:error ->
Basic.nack(channel, tag, requeue: false)
end
else
Basic.ack(channel, tag)
end
end
end
defp declare_worker_status(payload, instance_id) do
worker =
payload
|> Map.delete("parameters")
|> Map.put_new("system_info", %{"docker_container_id" => instance_id})
Logger.debug("Declare #{instance_id} worker status without job: #{inspect(worker)}")
creation_result = WorkerStatuses.create_worker_status(%{"job" => nil, "worker" => worker})
case creation_result do
{:ok, _} ->
Logger.info("Worker #{instance_id} created without job")
workers_statuses = WorkerStatuses.list_worker_statuses()
Logger.debug(
"[#{__MODULE__}] Notify that workers status have been updated: #{inspect(workers_statuses)}"
)
StepFlow.Notification.send("workers_status_updated", %{
content: StepFlow.WorkerStatusView.render("index.json", workers_statuses)
})
:ok
{:error, error} ->
Logger.error("Could not create WorkerStatus: #{inspect(error)}")
:error
end
end
defp live_worker_update(job_id) do
live_worker = LiveWorkers.get_by(%{"job_id" => job_id})
case live_worker do
nil ->
:error
_ ->
case Status.set_job_status(job_id, "ready_to_init") do
{:ok, _status} ->
LiveWorkers.update_live_worker(live_worker, %{
"creation_date" => NaiveDateTime.utc_now()
})
repo_checker()
JobsDurations.set_job_durations(job_id)
NotificationHookManager.notification_from_job(job_id)
StepManager.check_step_status(%{job_id: job_id})
:ok
{:error, message} ->
Logger.error("Cannot set job status: #{inspect(message)}")
:error
end
end
end
defp repo_checker do
query = from(job in Job, select: job.id)
stream = Repo.stream(query)
Repo.transaction(fn ->
Enum.to_list(stream)
end)
:timer.sleep(1000)
end
end