defmodule StepFlow.Amqp.CompletedConsumer do
@moduledoc """
Consumer of all jobs with completed status.
"""
require Logger
alias StepFlow.{
Amqp.CompletedConsumer,
Jobs,
Jobs.Status,
LiveWorkers,
NotificationHooks.NotificationHookManager,
Repo,
Repo.Checker,
Statistics.JobsDurations,
Workflows,
Workflows.StepManager
}
use StepFlow.Amqp.CommonConsumer, %{
queue: "job_completed",
exchange: "job_response",
prefetch_count: 1,
consumer: &CompletedConsumer.consume/4
}
@doc """
Consume messages with completed topic, update Job status and continue the workflow.

A payload carrying both `"job_id"` and `"status"` is processed only while
`Checker.repo_running?/0` is truthy; otherwise the message is rejected and requeued.
When no job matches the identifier the message is nacked without requeue. Live jobs
are delegated to the live worker update, any other job gets its parameters, status,
durations and notifications updated before the message is acknowledged.

A payload missing one of these keys is logged as an error and rejected without requeue.
"""
def consume(channel, tag, _redelivered, %{"job_id" => job_id, "status" => _status} = payload) do
  # TO INTRODUCE IN LATER VERSIONS
  # when status == "completed" do
  if Checker.repo_running?() do
    case Jobs.get_job(job_id) do
      nil ->
        Basic.nack(channel, tag, requeue: false)

      job ->
        if job.is_live do
          consume_live_job(channel, tag, job, payload)
        else
          consume_standard_job(channel, tag, job, payload)
        end
    end
  else
    Logger.warn(
      "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
    )

    Basic.reject(channel, tag, requeue: true)
  end
end

def consume(channel, tag, _redelivered, payload) do
  Logger.error("Job completed #{inspect(payload)}")
  Basic.reject(channel, tag, requeue: false)
end

# Live job path: the step status is only re-evaluated, and the message only
# acknowledged, once the live worker update reported :ok.
defp consume_live_job(channel, tag, job, %{"job_id" => job_id} = payload) do
  case live_worker_update(job, payload) do
    :error ->
      Basic.nack(channel, tag, requeue: false)

    :ok ->
      StepManager.check_step_status(%{job_id: job_id})
      Basic.ack(channel, tag)
  end
end

# Standard job path: persists the parameters carried by the payload, stores the
# new job status, then propagates it (workflow status, durations, notification
# hooks, next step) before acknowledging the message.
defp consume_standard_job(
       channel,
       tag,
       job,
       %{"job_id" => job_id, "status" => status} = payload
     ) do
  workflow = Workflows.get_workflow!(Map.get(job, :workflow_id))

  set_generated_destination_paths(payload, job)
  set_output_parameters(payload, workflow)

  datetime = Status.get_datetime_from_payload(payload)

  case Status.set_job_status(job_id, status, payload, datetime) do
    {:error, message} ->
      Logger.error("Cannot set job status: #{inspect(message)}")
      Basic.nack(channel, tag, requeue: false)

    {:ok, job_status} ->
      Workflows.Status.define_workflow_status(job.workflow_id, :job_completed, job_status)
      JobsDurations.set_job_durations(job_id)
      NotificationHookManager.manage_notification_status(job.workflow_id, job, "completed")
      NotificationHookManager.notification_from_job(job_id)
      StepManager.check_step_status(%{job_id: job_id})
      Basic.ack(channel, tag)
  end
end
# Stores the "destination_paths" reported in the payload as a job parameter.
#
# Returns nil when the payload carries no "destination_paths" entry, otherwise
# the result of `Jobs.update_job/2`.
#
# Any parameter already stored under the "destination_paths" id is replaced
# rather than duplicated, so handling the same message twice does not pile up
# entries (same id-based de-duplication as the workflow output parameters).
defp set_generated_destination_paths(payload, job) do
  case StepFlow.Map.get_by_key_or_atom(payload, "destination_paths") do
    nil ->
      nil

    destination_paths ->
      generated_parameter = %{
        id: "destination_paths",
        type: "array_of_strings",
        value: destination_paths
      }

      # `|| []` keeps the update working for a job without any parameter list.
      kept_parameters = deduplicate_id_values(job.parameters || [], [generated_parameter])

      Jobs.update_job(job, %{parameters: kept_parameters ++ [generated_parameter]})
  end
end
# Merges the "parameters" carried by the payload into the workflow parameters.
#
# Returns nil when the payload has no "parameters" entry, otherwise the result
# of `Workflows.update_workflow/2`. Workflow parameters sharing an id with an
# incoming parameter are dropped, the incoming ones are appended at the end.
defp set_output_parameters(payload, workflow) do
  output_parameters = StepFlow.Map.get_by_key_or_atom(payload, "parameters")

  if is_nil(output_parameters) do
    nil
  else
    kept_parameters = deduplicate_id_values(workflow.parameters, output_parameters)
    merged_parameters = kept_parameters ++ output_parameters

    Workflows.update_workflow(workflow, %{parameters: merged_parameters})
  end
end
# Returns the entries of `former_parameters` whose id is not used by any entry
# of `new_parameters`, preserving their order.
#
# Ids are read with `StepFlow.Map.get_by_key_or_atom/2` and compared with `==`.
defp deduplicate_id_values(former_parameters, new_parameters) do
  Enum.reject(former_parameters, fn former_parameter ->
    # `Enum.any?/2` stops at the first matching id instead of building the
    # full list of matches only to test it for emptiness.
    Enum.any?(new_parameters, fn new_parameter ->
      StepFlow.Map.get_by_key_or_atom(new_parameter, :id) ==
        StepFlow.Map.get_by_key_or_atom(former_parameter, :id)
    end)
  end)
end
# Handles a completed message for a live job.
#
# When no live worker is registered for the job, one is created from the
# payload. Otherwise a termination date is set on the worker if it has none,
# and the job is then moved to completed through `complete_live_job/1`.
#
# Returns the value of whichever of these two paths ran.
defp live_worker_update(job, payload) do
  job_id = job.id

  case LiveWorkers.get_by(%{"job_id" => job_id}) do
    nil ->
      live_worker_creation(job_id, payload)

    live_worker ->
      if is_nil(live_worker.termination_date) do
        Logger.warn(
          "No termination date found for the worker linked to job id #{job_id}. Adding one."
        )

        LiveWorkers.update_live_worker(live_worker, %{
          "termination_date" => NaiveDateTime.utc_now()
        })
      else
        Logger.info("Termination date found for the worker linked to job id #{job_id}.")
      end

      complete_live_job(job)
  end
end
# Registers a live worker for `job_id` from the parameters found in the payload.
#
# The "instance_id", "host_ip", "ports" and "direct_messaging_queue_name"
# parameters are read from the payload "parameters" list. A missing list or a
# missing parameter yields nil (an empty `ips` list for "host_ip") instead of
# raising.
#
# Returns :ok, or :error when `LiveWorkers.create_live_worker/1` answers with
# an `{:error, reason}` tuple, so the caller can nack the message instead of
# carrying on as if the worker had been stored.
# NOTE(review): assumes `create_live_worker/1` reports failures as
# `{:error, reason}`; any other result is treated as a success, as before.
defp live_worker_creation(job_id, payload) do
  parameters = StepFlow.Map.get_by_key_or_atom(payload, :parameters) || []

  ips =
    case find_payload_parameter_value(parameters, "host_ip") do
      nil -> []
      host_ip -> [host_ip]
    end

  attributes = %{
    job_id: job_id,
    instance_id: find_payload_parameter_value(parameters, "instance_id"),
    direct_messaging_queue_name:
      find_payload_parameter_value(parameters, "direct_messaging_queue_name"),
    ips: ips,
    ports: find_payload_parameter_value(parameters, "ports")
  }

  case LiveWorkers.create_live_worker(attributes) do
    {:error, reason} ->
      Logger.error("Cannot create live worker for job id #{job_id}: #{inspect(reason)}")
      :error

    _ ->
      :ok
  end
end

# Returns the value of the first parameter whose id equals `id`, or nil when
# no such parameter exists.
defp find_payload_parameter_value(parameters, id) do
  parameters
  |> Enum.find(fn param -> StepFlow.Map.get_by_key_or_atom(param, :id) == id end)
  |> case do
    nil -> nil
    param -> StepFlow.Map.get_by_key_or_atom(param, :value)
  end
end
# Moves a live job to the "completed" status.
#
# The job is reloaded with its statuses first: a job whose last status is
# "error" is left untouched and :ok is returned. Returns :error when the new
# status cannot be stored.
defp complete_live_job(job) do
  reloaded_job =
    job.id
    |> Jobs.get_job!()
    |> Repo.preload([:status])

  current_status =
    reloaded_job.status
    |> StepFlow.Controllers.Jobs.get_last_status()
    |> Status.convert_to_string()

  if current_status != "error" do
    store_live_job_completion(reloaded_job)
  else
    Logger.warn("Job already in error, not transiting to completed.")
    :ok
  end
end

# Stores the "completed" status of the job and, unless the workflow is already
# in error, propagates it to the workflow status, the job durations and the
# notification hooks.
defp store_live_job_completion(job) do
  case Status.set_job_status(job.id, "completed") do
    {:error, message} ->
      Logger.error("Cannot set job status: #{inspect(message)}")
      :error

    {:ok, job_status} ->
      workflow_status = Workflows.Status.get_last_workflow_status(job.workflow_id)

      if workflow_status.state == :error do
        # The value of the logging call is what this branch returns.
        Logger.warn("Workflow already in error, not changing status.")
      else
        Workflows.Status.define_workflow_status(job.workflow_id, :completed_workflow, job_status)
        JobsDurations.set_job_durations(job.id)
        NotificationHookManager.manage_notification_status(job.workflow_id, job, "completed")
        NotificationHookManager.notification_from_job(job.id)
        :ok
      end
  end
end
end