defmodule StepFlow.Amqp.ProgressionConsumer do
  @moduledoc """
  Consumer of all progression jobs.

  The consumer is configured (through `StepFlow.Amqp.CommonConsumer`) with the
  `job_progression` queue and the `job_response` exchange, and registers
  `consume/4` as its message callback.
  """

  require Logger

  alias StepFlow.Amqp.ProgressionConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Progressions
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Statistics.JobsDurations
  alias StepFlow.Workers.WorkerStatuses
  alias StepFlow.Workflows

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_progression",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ProgressionConsumer.consume/4
  }

  @doc """
  Consume message with job progression and save it in database.

  For a payload carrying a `"job_id"` key:

    * when `Checker.repo_running?/0` is falsy, a warning is logged and the
      message is rejected with `requeue: true`;
    * when no job is found for `job_id`, the message is nacked with
      `requeue: false`;
    * otherwise the job's `:status` association is preloaded, its last status
      is looked up and the message is handed to `error_status_check/5`.

  Any payload without a `"job_id"` key is logged at error level and rejected
  with `requeue: false`.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id} = payload) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          Basic.nack(channel, tag, requeue: false)

        job ->
          # Preload on the job we already hold instead of fetching it a second
          # time: a second lookup costs an extra query and may return nil if the
          # row vanished in between, which would crash before any ack/nack.
          job = Repo.preload(job, [:status])
          last_status = StepFlow.Controllers.Jobs.get_last_status(job.status)

          # error_status_check/5 also covers the "no status yet" (nil) case.
          error_status_check(channel, tag, job, payload, last_status)
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job progression #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  @doc """
  Drop the progression when the job's last status is an error, otherwise
  forward it to `add_progression/4`.

  When `last_status` has `state: :error`, a warning is logged and the message
  is acknowledged without being stored. Any other `last_status` (including
  `nil`, i.e. no status found) is passed on to `add_progression/4`.
  """
  def error_status_check(channel, tag, job, _payload, %{state: :error}) do
    Logger.warn("Progression arrived after job #{job.id} was put in error, ignoring.")
    Basic.ack(channel, tag)
  end

  def error_status_check(channel, tag, job, payload, _last_status) do
    add_progression(channel, tag, job, payload)
  end

  @doc """
  Store the progression carried by `payload` for `job` and acknowledge the
  message.

  Behaviour depends on the job:

    * `job.is_live` truthy: the message is acknowledged and nothing is stored;
    * last status `:processing`: the progression is created, job durations are
      updated, the workflow status and notification hooks are triggered, and
      the message is acknowledged;
    * last status `:queued`: the job is first switched to `"processing"`; on
      success the progression and a worker status are created and the same
      updates as above are performed; on failure the error is logged and the
      message is nacked with `requeue: true`;
    * anything else: the message is nacked with `requeue: false`.
  """
  def add_progression(channel, tag, job, payload) do
    if job.is_live do
      # Worker Manager progression response
      Basic.ack(channel, tag)
    else
      # Reload the job together with its statuses before deciding what to do.
      job = Jobs.get_job_with_status!(job.id)

      # Writing status only if job is in processing status
      case StepFlow.Controllers.Jobs.get_last_status_id(job.status) do
        %{state: :processing} ->
          # NOTE(review): the result tag of create_progression/1 is ignored, so
          # whatever sits in the second element is forwarded as the progression
          # even on a non-success tuple — TODO confirm this is intended.
          {_, progression} = Progressions.create_progression(payload)
          JobsDurations.set_job_durations(job.id)
          propagate_progression(channel, tag, job, progression)

        %{state: :queued} ->
          start_processing_from_progression(channel, tag, job, payload)

        _ ->
          Basic.nack(channel, tag, requeue: false)
      end
    end
  end

  # Handles the first progression of a queued job: switches the job to
  # "processing", then records the progression and the worker status.
  defp start_processing_from_progression(channel, tag, job, payload) do
    datetime = Status.get_datetime_from_payload(payload)

    if !datetime do
      Logger.warn("Worker treating job #{job.id} did not provide a progression timestamp.")
    end

    case Status.set_job_status(job.id, "processing", %{}, datetime) do
      {:ok, _status} ->
        # NOTE(review): result tag ignored here as well — see add_progression/4.
        {_, progression} = Progressions.create_progression(payload)
        worker_status = WorkerStatuses.create_worker_status!(progression)
        JobsDurations.set_job_durations(job.id)

        Logger.debug(
          "[#{__MODULE__}] Added worker status from first progression: #{inspect(worker_status)}"
        )

        propagate_progression(channel, tag, job, progression)

      {:error, message} ->
        Logger.error("Cannot set job status: #{inspect(message)}")
        Basic.nack(channel, tag, requeue: true)
    end
  end

  # Pushes the progression to the workflow status, triggers the job's
  # notification hooks and acknowledges the message.
  defp propagate_progression(channel, tag, job, progression) do
    Workflows.Status.define_workflow_status(
      job.workflow_id,
      :job_progression,
      progression
    )

    NotificationHookManager.notification_from_job(job.id)
    Basic.ack(channel, tag)
  end
end