lib/step_flow/amqp/completed_consumer.ex

defmodule StepFlow.Amqp.CompletedConsumer do
  @moduledoc """
  Consumer of all jobs with completed status.
  """

  require Logger

  alias StepFlow.{
    Amqp.CompletedConsumer,
    Jobs,
    Jobs.Status,
    LiveWorkers,
    NotificationHooks.NotificationHookManager,
    Repo,
    Repo.Checker,
    Statistics.JobsDurations,
    Workflows,
    Workflows.StepManager
  }

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "job_completed",
    exchange: "job_response",
    prefetch_count: 1,
    consumer: &CompletedConsumer.consume/4
  }

  @doc """
  Handles a message consumed from the completed-jobs queue.

  When the payload carries both `"job_id"` and `"status"`:

    * if the repository is not running, the message is rejected and requeued;
    * if no job matches `job_id`, the message is nacked without requeue;
    * for a live job, the live worker is updated, then the step status is checked and
      the message is acked (or nacked without requeue when the update fails);
    * for any other job, the generated destination paths and output parameters are
      stored, the job status is set, then the workflow status, the job durations and
      the notification hook manager are called before the step status is checked and
      the message is acked. If the job status cannot be set, the message is nacked
      without requeue.

  Any other payload is logged as an error and rejected without requeue.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id, "status" => _status} = payload) do
    # TODO (later versions): restrict this clause with `when status == "completed"`.
    if Checker.repo_running?() do
      job_id
      |> Jobs.get_job()
      |> dispatch_job(channel, tag, payload)
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Job completed #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  # Routes the fetched job to the matching handler; an unknown job (nil) is dropped.
  defp dispatch_job(nil, channel, tag, _payload) do
    Basic.nack(channel, tag, requeue: false)
  end

  defp dispatch_job(job, channel, tag, payload) do
    if job.is_live do
      finish_live_job(job, channel, tag, payload)
    else
      finish_standard_job(job, channel, tag, payload)
    end
  end

  # Live job: update the live worker, then move the workflow forward and ack, or nack
  # without requeue when the live worker update reports an error.
  defp finish_live_job(job, channel, tag, %{"job_id" => job_id} = payload) do
    case live_worker_update(job, payload) do
      :error ->
        Basic.nack(channel, tag, requeue: false)

      :ok ->
        StepManager.check_step_status(%{job_id: job_id})
        Basic.ack(channel, tag)
    end
  end

  # Regular job: store what the worker produced, record the new job status, then update
  # the workflow status, durations and notifications before moving the workflow forward.
  defp finish_standard_job(job, channel, tag, payload) do
    %{"job_id" => job_id, "status" => status} = payload
    workflow = Workflows.get_workflow!(job.workflow_id)

    set_generated_destination_paths(payload, job)
    set_output_parameters(payload, workflow)

    timestamp = Status.get_datetime_from_payload(payload)

    case Status.set_job_status(job_id, status, payload, timestamp) do
      {:error, message} ->
        Logger.error("Cannot set job status: #{inspect(message)}")
        Basic.nack(channel, tag, requeue: false)

      {:ok, job_status} ->
        Workflows.Status.define_workflow_status(job.workflow_id, :job_completed, job_status)
        JobsDurations.set_job_durations(job_id)
        NotificationHookManager.manage_notification_status(job.workflow_id, job, "completed")
        NotificationHookManager.notification_from_job(job_id)
        StepManager.check_step_status(%{job_id: job_id})
        Basic.ack(channel, tag)
    end
  end

  # Appends a "destination_paths" parameter to the job when the payload provides
  # destination paths. Returns nil when the payload has none, otherwise the result of
  # `Jobs.update_job/2`.
  defp set_generated_destination_paths(payload, job) do
    paths = StepFlow.Map.get_by_key_or_atom(payload, "destination_paths")

    if is_nil(paths) do
      nil
    else
      generated = %{id: "destination_paths", type: "array_of_strings", value: paths}

      Jobs.update_job(job, %{parameters: job.parameters ++ [generated]})
    end
  end

  # Merges the parameters returned in the payload into the workflow parameters.
  # Returns nil when the payload has no parameters, otherwise the result of
  # `Workflows.update_workflow/2`.
  defp set_output_parameters(payload, workflow) do
    payload
    |> StepFlow.Map.get_by_key_or_atom("parameters")
    |> merge_output_parameters(workflow)
  end

  # Nothing to merge when the payload carried no parameters.
  defp merge_output_parameters(nil, _workflow), do: nil

  # Workflow parameters whose id is also present in `outputs` are replaced: the
  # surviving workflow parameters come first, followed by all the output parameters.
  defp merge_output_parameters(outputs, workflow) do
    retained = deduplicate_id_values(workflow.parameters, outputs)
    Workflows.update_workflow(workflow, %{parameters: retained ++ outputs})
  end

  # Returns the entries of `former_parameters` whose id does not appear in
  # `new_parameters`, preserving their original order.
  defp deduplicate_id_values(former_parameters, new_parameters) do
    Enum.reject(former_parameters, fn former ->
      former_id = StepFlow.Map.get_by_key_or_atom(former, :id)

      Enum.any?(new_parameters, fn candidate ->
        StepFlow.Map.get_by_key_or_atom(candidate, :id) == former_id
      end)
    end)
  end

  # Synchronises the live worker attached to `job` with a completed message.
  #
  # When no live worker exists yet for the job, one is created from the payload.
  # Otherwise a termination date is set if the worker has none, and the live job is
  # completed. Returns the result of `live_worker_creation/2` or `complete_live_job/1`.
  defp live_worker_update(job, payload) do
    job_id = job.id

    case LiveWorkers.get_by(%{"job_id" => job_id}) do
      nil ->
        live_worker_creation(job_id, payload)

      worker ->
        if is_nil(worker.termination_date) do
          Logger.warn(
            "No termination date found for the worker linked to job id #{job_id}. Adding one."
          )

          LiveWorkers.update_live_worker(worker, %{
            "termination_date" => NaiveDateTime.utc_now()
          })
        else
          Logger.info("Termination date found for the worker linked to job id #{job_id}.")
        end

        complete_live_job(job)
    end
  end

  # Registers a live worker for `job_id` from the worker description carried in the
  # payload parameters (entries looked up by id: "instance_id", "host_ip", "ports" and
  # "direct_messaging_queue_name").
  #
  # A payload without parameters, or without one of these entries, yields a nil value
  # for the corresponding attribute instead of raising.
  #
  # Returns :ok, or :error when `LiveWorkers.create_live_worker/1` reports a failure.
  defp live_worker_creation(job_id, payload) do
    parameters = StepFlow.Map.get_by_key_or_atom(payload, :parameters) || []

    attributes = %{
      job_id: job_id,
      instance_id: parameter_value(parameters, "instance_id"),
      direct_messaging_queue_name: parameter_value(parameters, "direct_messaging_queue_name"),
      ips: [parameter_value(parameters, "host_ip")],
      ports: parameter_value(parameters, "ports")
    }

    # NOTE(review): assumes failures are reported as {:error, reason} — confirm against
    # LiveWorkers.create_live_worker/1. Any other return value is treated as a success,
    # which matches the previous behaviour.
    case LiveWorkers.create_live_worker(attributes) do
      {:error, reason} ->
        Logger.error("Cannot create live worker for job id #{job_id}: #{inspect(reason)}")
        :error

      _ ->
        :ok
    end
  end

  # Returns the value of the first parameter whose id equals `id`, or nil when no such
  # parameter exists.
  defp parameter_value(parameters, id) do
    case Enum.find(parameters, &(StepFlow.Map.get_by_key_or_atom(&1, :id) == id)) do
      nil -> nil
      parameter -> StepFlow.Map.get_by_key_or_atom(parameter, :value)
    end
  end

  # Moves a live job to the "completed" status unless its last status is "error".
  #
  # Returns :ok when the job was already in error or when the status was recorded, and
  # :error when the job status could not be set.
  defp complete_live_job(job) do
    reloaded = Repo.preload(Jobs.get_job!(job.id), [:status])

    current_status =
      reloaded.status
      |> StepFlow.Controllers.Jobs.get_last_status()
      |> Status.convert_to_string()

    case current_status do
      "error" ->
        Logger.warn("Job already in error, not transiting to completed.")
        :ok

      _ ->
        mark_live_job_completed(reloaded)
    end
  end

  # Records the "completed" job status, then updates the workflow status, the job
  # durations and the notifications, unless the workflow itself is already in error.
  defp mark_live_job_completed(job) do
    case Status.set_job_status(job.id, "completed") do
      {:error, message} ->
        Logger.error("Cannot set job status: #{inspect(message)}")
        :error

      {:ok, job_status} ->
        last_workflow_status = Workflows.Status.get_last_workflow_status(job.workflow_id)

        if last_workflow_status.state == :error do
          Logger.warn("Workflow already in error, not changing status.")
          :ok
        else
          Workflows.Status.define_workflow_status(
            job.workflow_id,
            :completed_workflow,
            job_status
          )

          JobsDurations.set_job_durations(job.id)
          NotificationHookManager.manage_notification_status(job.workflow_id, job, "completed")
          NotificationHookManager.notification_from_job(job.id)
          :ok
        end
    end
  end
end