# lib/step_flow/amqp/worker_terminated_consumer.ex

defmodule StepFlow.Amqp.WorkerTerminatedConsumer do
  @moduledoc """
  Consumer of all worker terminations.

  The consumer is configured with the `worker_terminated` queue, the
  `worker_response` exchange and a prefetch count of `1`.

  Three payload shapes are distinguished by `consume/4`:

    * a map containing `"job_id"`: the live worker attached to the job gets a
      termination date, then the job notification and step status check run;
    * a map containing `"instance_id"`: the worker status is switched to
      `"Terminated"`;
    * anything else: the payload is logged as an error and rejected.

  A payload containing both keys is handled as a `"job_id"` payload, because
  that clause is matched first.
  """

  require Logger
  alias StepFlow.Amqp.WorkerTerminatedConsumer
  alias StepFlow.LiveWorkers
  alias StepFlow.Repo.Checker
  alias StepFlow.Workers.WorkerStatusWatcher
  alias StepFlow.Workflows
  alias StepFlow.Workflows.StepManager

  use StepFlow.Amqp.CommonConsumer, %{
    queue: "worker_terminated",
    exchange: "worker_response",
    prefetch_count: 1,
    consumer: &WorkerTerminatedConsumer.consume/4
  }

  @doc """
  Consume worker terminated message.

  ## Parameters

    * `channel` - channel the message was delivered on, used to ack/reject it
    * `tag` - delivery tag of the message
    * `_redelivered` - redelivery flag, ignored
    * `payload` - decoded message body

  ## Behaviour

    * `%{"job_id" => job_id}` - updates the matching live worker and
      acknowledges the message.
    * `%{"instance_id" => instance_id}` - marks the worker as `"Terminated"`
      and acknowledges the message.
    * For both shapes above, when `Checker.repo_running?/0` is falsy, a warning
      is logged and the message is rejected with `requeue: true` so that it can
      be processed again later.
    * Any other payload is logged as an error and rejected with
      `requeue: false`.
  """
  def consume(channel, tag, _redelivered, %{"job_id" => job_id}) do
    # This branch reads and writes live worker records, so it is guarded by the
    # same repository check as the "instance_id" branch.
    if Checker.repo_running?() do
      live_worker_update(job_id)
      Basic.ack(channel, tag)
    else
      reject_for_unavailable_database(channel, tag)
    end
  end

  def consume(channel, tag, _redelivered, %{"instance_id" => instance_id} = payload) do
    if Checker.repo_running?() do
      Logger.info("Worker #{instance_id} terminated #{inspect(payload)}")

      WorkerStatusWatcher.update_worker_status(instance_id, %{
        instance_id: instance_id,
        activity: "Terminated"
      })

      Basic.ack(channel, tag)
    else
      reject_for_unavailable_database(channel, tag)
    end
  end

  def consume(channel, tag, _redelivered, payload) do
    # Unknown payload shape: requeueing would only redeliver the same message.
    Logger.error("Worker terminated #{inspect(payload)}")
    Basic.reject(channel, tag, requeue: false)
  end

  # Logs that the database is unavailable and puts the message back in the queue.
  defp reject_for_unavailable_database(channel, tag) do
    Logger.warn(
      "#{__MODULE__}: The database is not available, reject and requeue consumed message..."
    )

    Basic.reject(channel, tag, requeue: true)
  end

  # Sets the termination date of the live worker attached to `job_id`, then
  # triggers the job notification and the step status check. Always returns `:ok`.
  defp live_worker_update(job_id) do
    case LiveWorkers.get_by(%{"job_id" => job_id}) do
      nil ->
        # NOTE(review): presumably `get_by/1` yields `nil` when no live worker is
        # attached to the job - confirm against `LiveWorkers.get_by/1`. In that
        # case there is nothing to update, so `nil` is not forwarded.
        Logger.warn(
          "#{__MODULE__}: No live worker found for job #{inspect(job_id)}, " <>
            "skip termination date update"
        )

      live_worker ->
        # The update result is not inspected, as in the previous implementation.
        LiveWorkers.update_live_worker(live_worker, %{
          "termination_date" => NaiveDateTime.utc_now()
        })
    end

    Workflows.notification_from_job(job_id)
    StepManager.check_step_status(%{job_id: job_id})
    :ok
  end
end