lib/step_flow/amqp/worker_created_consumer.ex

defmodule StepFlow.Amqp.WorkerCreatedConsumer do
  @moduledoc """
  Consumer of all worker creations.
  """

  import Ecto.Query, warn: false

  require Logger
  alias StepFlow.Amqp.WorkerCreatedConsumer
  alias StepFlow.Jobs.Job
  alias StepFlow.Jobs.Status
  alias StepFlow.LiveWorkers
  alias StepFlow.Repo
  alias StepFlow.Repo.Checker
  alias StepFlow.Workers.WorkerStatuses
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "worker_created",
    exchange: "worker_response",
    prefetch_count: 1,
    consumer: &WorkerCreatedConsumer.consume/4
  }

  @doc """
  Consume worker created message.
  """
  def consume(
        channel,
        tag,
        _redelivered,
        %{
          "direct_messaging_queue_name" => direct_messaging_queue_name
        } = payload
      ) do
    if Checker.repo_running?() do
      handle_created_message(channel, tag, direct_messaging_queue_name, payload)
    else
      Logger.warn(
        "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
      )

      Basic.reject(channel, tag, requeue: true)
    end
  end

  def consume(channel, tag, _redelivered, payload) do
    Logger.error("Worker creation #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  defp handle_created_message(channel, tag, direct_messaging_queue_name, payload) do
    job =
      StepFlow.Jobs.internal_list_jobs(%{
        "direct_messaging_queue_name" => direct_messaging_queue_name
      })
      |> Map.get(:data)
      |> List.first()

    instance_id = Map.get(payload, "instance_id")

    Logger.debug("Worker #{instance_id} creation job search result: #{inspect(job)}")

    case {job, instance_id} do
      {nil, nil} ->
        Basic.reject(channel, tag, requeue: false)

      {nil, _} ->
        case declare_worker_status(payload, instance_id) do
          :ok ->
            Basic.ack(channel, tag)

          :error ->
            Basic.reject(channel, tag, requeue: true)
        end

      _ ->
        if job.is_live do
          job_id = job.id

          case live_worker_update(job_id) do
            :ok ->
              Basic.ack(channel, tag)

            :error ->
              Basic.reject(channel, tag, requeue: true)
          end
        else
          Basic.ack(channel, tag)
        end
    end
  end

  defp declare_worker_status(payload, instance_id) do
    worker =
      payload
      |> Map.delete("parameters")
      |> Map.put_new("system_info", %{"docker_container_id" => instance_id})

    Logger.debug("Declare #{instance_id} worker status without job: #{inspect(worker)}")

    creation_result = WorkerStatuses.create_worker_status(%{"job" => nil, "worker" => worker})

    case creation_result do
      {:ok, _} ->
        Logger.info("Worker #{instance_id} created without job")

        workers_statuses = WorkerStatuses.list_worker_statuses()

        Logger.debug(
          "[#{__MODULE__}] Notify that workers status have been updated: #{inspect(workers_statuses)}"
        )

        StepFlow.Notification.send("workers_status_updated", %{
          content: StepFlow.WorkerStatusView.render("index.json", workers_statuses)
        })

        :ok

      {:error, error} ->
        Logger.error("Could not create WorkerStatus: #{inspect(error)}")

        :error
    end
  end

  defp live_worker_update(job_id) do
    live_worker = LiveWorkers.get_by(%{"job_id" => job_id})

    case live_worker do
      nil ->
        :error

      _ ->
        case Status.set_job_status(job_id, "ready_to_init") do
          {:ok, _status} ->
            LiveWorkers.update_live_worker(live_worker, %{
              "creation_date" => NaiveDateTime.utc_now()
            })

            repo_checker()

            Workflows.notification_from_job(job_id)
            StepManager.check_step_status(%{job_id: job_id})
            :ok

          {:error, message} ->
            Logger.error("Cannot set job status: #{inspect(message)}")
            :error
        end
    end
  end

  defp repo_checker do
    query = from(job in Job, select: job.id)
    stream = Repo.stream(query)

    Repo.transaction(fn ->
      Enum.to_list(stream)
    end)

    :timer.sleep(1000)
  end
end