# lib/step_flow/models/workers/worker_statuses.ex

defmodule StepFlow.Workers.WorkerStatuses do
  @moduledoc """
  The WorkerStatuses context.
  """

  import Ecto.Query, warn: false
  alias StepFlow.Repo

  alias StepFlow.Workers.WorkerStatus

  @doc """
  Returns a page of WorkerStatuses, most recently inserted first.

  `params` is a map with string keys; every key is optional:

    * `"page"` - zero-based page index (defaults to `0`)
    * `"size"` - maximum number of entries per page (defaults to `10`)
    * `"instance_id"` - only keep the statuses of this worker instance
    * `"job_id"` - only keep the statuses whose `current_job` holds this job
      ID; accepts an integer or a string starting with an integer

  `"page"` and `"size"` are passed through `StepFlow.Integer.force/1`.

  Returns a map with the `:data` of the requested page, the `:total` number of
  entries matching the filters, and the `:page` and `:size` that were applied.

  Raises `MatchError` when a string `"job_id"` does not start with an integer.

  ## Examples

      iex> StepFlow.Workers.WorkerStatuses.list_worker_statuses()
      %{data: [], page: 0, size: 10, total: 0}

  """
  def list_worker_statuses(params \\ %{}) do
    page = params |> Map.get("page", 0) |> StepFlow.Integer.force()
    size = params |> Map.get("size", 10) |> StepFlow.Integer.force()
    offset = page * size

    query =
      from(worker_status in WorkerStatus)
      |> filter_by_instance_id(Map.get(params, "instance_id"))
      |> filter_by_job_id(Map.get(params, "job_id"))

    # Count before paginating so that `total` covers every matching entry.
    total = Repo.one(from(item in query, select: count(item.id)))

    page_query =
      from(
        worker_status in query,
        order_by: [desc: :inserted_at],
        offset: ^offset,
        limit: ^size
      )

    %{
      data: Repo.all(page_query),
      total: total,
      page: page,
      size: size
    }
  end

  # Restricts `query` to one worker instance; leaves it untouched without an ID.
  defp filter_by_instance_id(query, nil), do: query

  defp filter_by_instance_id(query, instance_id) do
    from(
      worker_status in query,
      where: worker_status.instance_id == ^instance_id
    )
  end

  # Restricts `query` to the statuses whose current job is `job_id`; leaves it
  # untouched without an ID.
  defp filter_by_job_id(query, nil), do: query

  defp filter_by_job_id(query, job_id) when is_integer(job_id) do
    from(
      worker_status in query,
      where:
        not is_nil(worker_status.current_job) and
          fragment("? ->'job_id'", worker_status.current_job) == ^job_id
    )
  end

  # Request parameters arrive as strings: parse them before filtering.
  defp filter_by_job_id(query, job_id) when is_binary(job_id) do
    {parsed_job_id, _rest} = Integer.parse(job_id)
    filter_by_job_id(query, parsed_job_id)
  end

  @doc """
  Fetches the most recently inserted WorkerStatus of the worker instance
  identified by `instance_id`.

  Returns `nil` when no WorkerStatus exists for that instance.
  """
  def get_worker_status(instance_id) do
    WorkerStatus
    |> where([status], status.instance_id == ^instance_id)
    |> order_by([status], desc: status.inserted_at)
    |> limit(1)
    |> Repo.one()
  end

  @doc """
  Fetches the most recently inserted WorkerStatus of the worker instance
  identified by `instance_id`.

  Raises `Ecto.NoResultsError` when no WorkerStatus exists for that instance.
  """
  def get_worker_status!(instance_id) do
    WorkerStatus
    |> where([status], status.instance_id == ^instance_id)
    |> order_by([status], desc: status.inserted_at)
    |> limit(1)
    |> Repo.one!()
  end

  @doc """
  Fetches the most recently inserted WorkerStatus whose current job has the ID
  `job_id`.

  Returns `nil` when no WorkerStatus references that job.
  """
  def get_worker_status_for_job(job_id) do
    WorkerStatus
    |> where(
      [status],
      not is_nil(status.current_job) and
        fragment("? ->'job_id'", status.current_job) == ^job_id
    )
    |> order_by([status], desc: status.inserted_at)
    |> limit(1)
    |> Repo.one()
  end

  @doc """
  Creates a WorkerStatus from a worker status message.

  `message` is converted into WorkerStatus attributes by
  `StepFlow.Controllers.WorkerStatus.process_worker_status_message/1` before
  being inserted. Any map is accepted, structs included (for instance a
  `StepFlow.Progressions.Progression`).

  ## Examples

      iex> result = StepFlow.Workers.WorkerStatuses.create_worker_status!(%{
      ...>    job: nil,
      ...>    worker: %{
      ...>      activity: "Idle",
      ...>      description: "This worker is just an example.",
      ...>      direct_messaging_queue_name: "direct_messaging_e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
      ...>      instance_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
      ...>      label: "UnitTestWorker",
      ...>      queue_name: "job_test_worker",
      ...>      sdk_version: "2.3.4",
      ...>      short_description: "A test worker",
      ...>      system_info: %{
      ...>        docker_container_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
      ...>        number_of_processors: 12,
      ...>        total_memory: 16_574_754,
      ...>        total_swap: 2_046_816,
      ...>        used_memory: 8_865_633,
      ...>        used_swap: 0
      ...>      },
      ...>      version: "1.2.3"
      ...>    }
      ...> })
      ...> match?(%StepFlow.Workers.WorkerStatus{}, result)
      true

  Returns the inserted WorkerStatus; raises if the insertion fails.
  """
  # A `%{}` pattern matches every map, structs included, so this single clause
  # also handles `StepFlow.Progressions.Progression` messages.
  def create_worker_status!(%{} = message) do
    attrs = StepFlow.Controllers.WorkerStatus.process_worker_status_message(message)

    %WorkerStatus{}
    |> WorkerStatus.changeset(attrs)
    |> Repo.insert!()
  end

  @doc """
  Creates a WorkerStatus from a worker status message, without raising on
  invalid data.

  `message` is converted into WorkerStatus attributes by
  `StepFlow.Controllers.WorkerStatus.process_worker_status_message/1` before
  being inserted.

  ## Examples

      iex> result = StepFlow.Workers.WorkerStatuses.create_worker_status(%{
      ...>    job: nil,
      ...>    worker: %{
      ...>      activity: "Idle",
      ...>      description: "This worker is just an example.",
      ...>      direct_messaging_queue_name: "direct_messaging_e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
      ...>      instance_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
      ...>      label: "UnitTestWorker",
      ...>      queue_name: "job_test_worker",
      ...>      sdk_version: "2.3.4",
      ...>      short_description: "A test worker",
      ...>      system_info: %{
      ...>        docker_container_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
      ...>        number_of_processors: 12,
      ...>        total_memory: 16_574_754,
      ...>        total_swap: 2_046_816,
      ...>        used_memory: 8_865_633,
      ...>        used_swap: 0
      ...>      },
      ...>      version: "1.2.3"
      ...>    }
      ...> })
      ...> match?({:ok, %StepFlow.Workers.WorkerStatus{}}, result)
      true

  Returns `{:ok, WorkerStatus}` on success, `{:error, changeset}` otherwise.
  """
  def create_worker_status(%{} = message) do
    attrs = StepFlow.Controllers.WorkerStatus.process_worker_status_message(message)

    %WorkerStatus{}
    |> WorkerStatus.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Updates an existing WorkerStatus from a worker status message.

  `worker_status` is the record to update. `message` is converted into
  WorkerStatus attributes by
  `StepFlow.Controllers.WorkerStatus.process_worker_status_message/1` before
  being applied.

  ## Examples

      iex> worker = %{
      ...>   activity: "Idle",
      ...>   description: "This worker is just an example.",
      ...>   direct_messaging_queue_name: "direct_messaging_e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
      ...>   instance_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
      ...>   label: "UnitTestWorker",
      ...>   queue_name: "job_test_worker",
      ...>   sdk_version: "2.3.4",
      ...>   short_description: "A test worker",
      ...>   system_info: %{
      ...>     docker_container_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
      ...>     number_of_processors: 12,
      ...>     total_memory: 16_574_754,
      ...>     total_swap: 2_046_816,
      ...>     used_memory: 8_865_633,
      ...>     used_swap: 0
      ...>   },
      ...>   version: "1.2.3"
      ...> }
      iex> worker_status =
      ...>   StepFlow.Workers.WorkerStatuses.create_worker_status!(%{job: nil, worker: worker})
      iex> job = %{
      ...>   destination_paths: [],
      ...>   execution_duration: 0.0,
      ...>   job_id: 1234,
      ...>   parameters: [],
      ...>   status: "processing"
      ...> }
      iex> result =
      ...>   StepFlow.Workers.WorkerStatuses.update_worker_status!(
      ...>     worker_status,
      ...>     %{job: job, worker: worker}
      ...>   )
      iex> match?(%StepFlow.Workers.WorkerStatus{}, result)
      true

  Returns the updated WorkerStatus; raises if the update fails.
  """
  def update_worker_status!(%WorkerStatus{} = worker_status, %{} = message) do
    attrs = StepFlow.Controllers.WorkerStatus.process_worker_status_message(message)

    worker_status
    |> WorkerStatus.changeset(attrs)
    |> Repo.update!()
  end
end