defmodule StepFlow.Amqp.WorkerStartedConsumer do
@moduledoc """
Consumer of all worker starts.
"""
import Ecto.Query, warn: false
require Logger
alias StepFlow.Amqp.WorkerStartedConsumer
alias StepFlow.Jobs
alias StepFlow.Jobs.Job
alias StepFlow.Jobs.Status
alias StepFlow.NotificationHooks.NotificationHookManager
alias StepFlow.Repo
alias StepFlow.Repo.Checker
alias StepFlow.Statistics.JobsDurations
alias StepFlow.Workflows.Status, as: WorkflowStatus
alias StepFlow.Workflows.StepManager
use StepFlow.Amqp.CommonConsumer, %{
queue: "worker_started",
exchange: "worker_response",
prefetch_count: 1,
consumer: &WorkerStartedConsumer.consume/4
}
@doc """
Consume worker started message.
"""
def consume(
channel,
tag,
_redelivered,
%{
"job_id" => job_id
} = _payload
) do
if Checker.repo_running?() do
case Jobs.get_job(job_id) do
nil ->
Basic.nack(channel, tag, requeue: false)
job ->
case Status.set_job_status(job_id, "processing") do
{:ok, _} ->
query = from(j in Job, select: j.id)
stream = Repo.stream(query)
Repo.transaction(fn ->
Enum.to_list(stream)
end)
:timer.sleep(5000)
JobsDurations.set_job_durations(job_id)
NotificationHookManager.notification_from_job(job_id)
if job.is_live do
StepManager.check_step_status(%{job_id: job_id})
WorkflowStatus.define_workflow_live_status(job.workflow_id)
end
Basic.ack(channel, tag)
{:error, message} ->
Logger.error("Cannot set job status: #{inspect(message)}")
Basic.nack(channel, tag, requeue: false)
end
end
else
Logger.warn(
"#{__MODULE__}: The database is not available, reject and requeue consumed message..."
)
Basic.reject(channel, tag, requeue: true)
end
end
def consume(channel, tag, _redelivered, payload) do
Logger.error("Worker started #{inspect(payload)}")
Basic.reject(channel, tag, requeue: false)
end
end