defmodule StepFlow.Amqp.ProgressionConsumer do
  @moduledoc """
  Consumer of all progression jobs.

  Configured through `StepFlow.Amqp.CommonConsumer` with the `"job_progression"`
  queue and the `"job_response"` exchange; `consume/4` is passed as the consumer
  callback.
  """

  require Logger

  alias StepFlow.Amqp.ProgressionConsumer
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Progressions
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Workers.WorkerStatuses
  alias StepFlow.Workflows

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_progression",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &ProgressionConsumer.consume/4
  }

  @doc """
  Consume message with job progression and save it in database.

  Outcomes, in the order they are checked:

    * the payload has no `"job_id"` key: the payload is logged as an error and
      the message is rejected without requeue;
    * `Checker.repo_running?/0` is falsy: a warning is logged and the message
      is rejected with requeue;
    * `Jobs.get_job/1` returns `nil`: the message is rejected without requeue;
    * otherwise the job's last status is looked up and the decision is
      delegated to `error_status_check/5`.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id} = payload) do
    if Checker.repo_running?() do
      case Jobs.get_job(job_id) do
        nil ->
          Basic.reject(channel, tag, requeue: false)

        job ->
          # The job was just fetched: preload its statuses directly instead of
          # querying the same job a second time.
          job = Repo.preload(job, [:status])
          last_status = Status.get_last_status(job.status)

          error_status_check(channel, tag, job, payload, last_status)
      end
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job progression #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  @doc """
  Decide whether a progression must be recorded, based on the job's last status.

  When `last_status` has its `state` set to `:error`, the progression is ignored:
  a warning is logged and the message is acknowledged. For any other value,
  including `nil` (no status found), the progression is handed to
  `add_progression/4`.
  """
  def error_status_check(channel, tag, job, _payload, %{state: :error}) do
    Logger.warn("Progression arrived after job #{job.id} was put in error, ignoring.")
    Basic.ack(channel, tag)
  end

  def error_status_check(channel, tag, job, payload, _last_status) do
    add_progression(channel, tag, job, payload)
  end

  @doc """
  Store the progression described by `payload` for `job` and settle the message.

  For a live job (`job.is_live` is truthy) the message is only acknowledged.
  Otherwise the progression is created from `payload`; if the creation fails,
  the error is logged and the message is rejected without requeue.
  """
  def add_progression(channel, tag, job, payload) do
    if job.is_live do
      # Worker Manager progression response
      Basic.ack(channel, tag)
    else
      case Progressions.create_progression(payload) do
        {:ok, progression} ->
          record_progression(channel, tag, job, progression)

        {:error, reason} ->
          # Do not requeue: the same payload is rejected without requeue when it
          # is malformed, and retrying an invalid payload would presumably fail
          # again in the same way.
          Logger.error("Cannot create progression for job #{job.id}: #{inspect(reason)}")
          Basic.reject(channel, tag, requeue: false)
      end
    end
  end

  # Applies a freshly created progression. A progression value of 0 is treated as
  # the first one: the job is switched to "processing" and a worker status is
  # created before the workflow is updated.
  defp record_progression(channel, tag, job, progression) do
    if progression.progression == 0 do
      case Status.set_job_status(job.id, "processing") do
        {:ok, _status} ->
          worker_status = WorkerStatuses.create_worker_status!(progression)

          Logger.debug(
            "[#{__MODULE__}] Added worker status from first progression: #{inspect(worker_status)}"
          )

          update_workflow_and_ack(channel, tag, job, progression)

        {:error, message} ->
          Logger.error("Cannot set job status: #{inspect(message)}")
          Basic.reject(channel, tag, requeue: true)
      end
    else
      update_workflow_and_ack(channel, tag, job, progression)
    end
  end

  # Updates the workflow status with the progression, sends the job notification
  # and acknowledges the message.
  defp update_workflow_and_ack(channel, tag, job, progression) do
    Workflows.Status.define_workflow_status(job.workflow_id, :job_progression, progression)
    Workflows.notification_from_job(job.id)
    Basic.ack(channel, tag)
  end
end