defmodule StepFlow.Workers.WorkerStatuses do
@moduledoc """
The WorkerStatuses context.
"""
import Ecto.Query, warn: false
alias StepFlow.Repo
alias StepFlow.Workers.WorkerStatus
@doc """
Returns one page of WorkerStatuses, most recently inserted first.

`params` is a map with string keys, all of them optional:

  * `"page"` - zero-based index of the page (defaults to `0`)
  * `"size"` - number of entries per page (defaults to `10`)
  * `"instance_id"` - only return the statuses having this instance ID
  * `"job_id"` - only return the statuses whose current job has this ID;
    either an integer or a string starting with an integer

`"page"` and `"size"` are converted with `StepFlow.Integer.force/1`.

Returns a map with the keys `:data` (the entries of the requested page),
`:total` (the number of entries matching the filters, whatever the page),
`:page` and `:size`.

## Examples

    iex> StepFlow.Workers.WorkerStatuses.list_worker_statuses()
    %{data: [], page: 0, size: 10, total: 0}

"""
def list_worker_statuses(params \\ %{}) do
  page = params |> Map.get("page", 0) |> StepFlow.Integer.force()
  size = params |> Map.get("size", 10) |> StepFlow.Integer.force()
  offset = page * size

  query = from(worker_status in WorkerStatus)

  query =
    case Map.get(params, "instance_id") do
      nil ->
        query

      instance_id ->
        from(
          worker_status in query,
          where: worker_status.instance_id == ^instance_id
        )
    end

  query =
    case Map.get(params, "job_id") do
      nil ->
        query

      job_id ->
        job_id = parse_job_id(job_id)

        from(
          worker_status in query,
          where:
            not is_nil(worker_status.current_job) and
              fragment("? ->'job_id'", worker_status.current_job) == ^job_id
        )
    end

  # The count is computed before ordering and pagination are applied, so it
  # covers every entry matching the filters. An aggregate without GROUP BY
  # yields exactly one row, hence `Repo.one/1`.
  total = Repo.one(from(item in query, select: count(item.id)))

  worker_statuses =
    Repo.all(
      from(
        worker_status in query,
        order_by: [desc: :inserted_at],
        offset: ^offset,
        limit: ^size
      )
    )

  %{
    data: worker_statuses,
    total: total,
    page: page,
    size: size
  }
end

# Normalizes the "job_id" filter value to an integer. Integers are passed
# through; strings are parsed with `Integer.parse/1`, which raises a
# `MatchError` here when the string does not start with an integer.
defp parse_job_id(job_id) when is_integer(job_id), do: job_id

defp parse_job_id(job_id) when is_binary(job_id) do
  {parsed, _rest} = Integer.parse(job_id)
  parsed
end
@doc """
Gets the most recently inserted WorkerStatus having the given `instance_id`.

Returns `nil` if no WorkerStatus matches.
"""
def get_worker_status(instance_id) do
  WorkerStatus
  |> where([status], status.instance_id == ^instance_id)
  |> order_by([status], desc: status.inserted_at)
  |> limit(1)
  |> Repo.one()
end
@doc """
Gets the most recently inserted WorkerStatus having the given `instance_id`.

Raises `Ecto.NoResultsError` if no WorkerStatus matches.
"""
def get_worker_status!(instance_id) do
  WorkerStatus
  |> where([status], status.instance_id == ^instance_id)
  |> order_by([status], desc: status.inserted_at)
  |> limit(1)
  |> Repo.one!()
end
@doc """
Gets the most recently inserted WorkerStatus whose current job has the ID
`job_id`.

Returns `nil` if no WorkerStatus matches.
"""
def get_worker_status_for_job(job_id) do
  WorkerStatus
  |> where(
    [status],
    not is_nil(status.current_job) and
      fragment("? ->'job_id'", status.current_job) == ^job_id
  )
  |> order_by([status], desc: status.inserted_at)
  |> limit(1)
  |> Repo.one()
end
@doc """
Creates a WorkerStatus from a worker status message.

`message` is converted into attributes by
`StepFlow.Controllers.WorkerStatus.process_worker_status_message/1`, which
are then inserted through `StepFlow.Workers.WorkerStatus.changeset/2`.

Any map is accepted. Since a `%{}` pattern also matches structs, this
includes `%StepFlow.Progressions.Progression{}` messages.

Raises an error if something went wrong during creation.

## Examples

    iex> result = StepFlow.Workers.WorkerStatuses.create_worker_status!(%{
    ...>   job: nil,
    ...>   worker: %{
    ...>     activity: "Idle",
    ...>     description: "This worker is just an example.",
    ...>     direct_messaging_queue_name: "direct_messaging_e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
    ...>     instance_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
    ...>     label: "UnitTestWorker",
    ...>     queue_name: "job_test_worker",
    ...>     sdk_version: "2.3.4",
    ...>     short_description: "A test worker",
    ...>     system_info: %{
    ...>       docker_container_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
    ...>       number_of_processors: 12,
    ...>       total_memory: 16_574_754,
    ...>       total_swap: 2_046_816,
    ...>       used_memory: 8_865_633,
    ...>       used_swap: 0
    ...>     },
    ...>     version: "1.2.3"
    ...>   }
    ...> })
    ...> match?(%StepFlow.Workers.WorkerStatus{}, result)
    true

"""
def create_worker_status!(%{} = message) do
  # A dedicated `%StepFlow.Progressions.Progression{}` clause used to follow
  # this one with the exact same body; it could never be reached because
  # `%{}` already matches every struct.
  attrs = StepFlow.Controllers.WorkerStatus.process_worker_status_message(message)

  %WorkerStatus{}
  |> WorkerStatus.changeset(attrs)
  |> Repo.insert!()
end
@doc """
Creates a WorkerStatus from a worker status message.

`message` is converted into attributes by
`StepFlow.Controllers.WorkerStatus.process_worker_status_message/1`, which
are then inserted through `StepFlow.Workers.WorkerStatus.changeset/2`.

Returns `{:ok, WorkerStatus}` on success, `{:error, changeset}` otherwise.

## Examples

    iex> result = StepFlow.Workers.WorkerStatuses.create_worker_status(%{
    ...>   job: nil,
    ...>   worker: %{
    ...>     activity: "Idle",
    ...>     description: "This worker is just an example.",
    ...>     direct_messaging_queue_name: "direct_messaging_e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
    ...>     instance_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
    ...>     label: "UnitTestWorker",
    ...>     queue_name: "job_test_worker",
    ...>     sdk_version: "2.3.4",
    ...>     short_description: "A test worker",
    ...>     system_info: %{
    ...>       docker_container_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
    ...>       number_of_processors: 12,
    ...>       total_memory: 16_574_754,
    ...>       total_swap: 2_046_816,
    ...>       used_memory: 8_865_633,
    ...>       used_swap: 0
    ...>     },
    ...>     version: "1.2.3"
    ...>   }
    ...> })
    ...> match?({:ok, %StepFlow.Workers.WorkerStatus{}}, result)
    true

"""
def create_worker_status(%{} = message) do
  attrs = StepFlow.Controllers.WorkerStatus.process_worker_status_message(message)

  %WorkerStatus{}
  |> WorkerStatus.changeset(attrs)
  |> Repo.insert()
end
@doc """
Updates an existing WorkerStatus from a worker status message.

`message` is converted into attributes by
`StepFlow.Controllers.WorkerStatus.process_worker_status_message/1`, which
are then applied to `worker_status` through
`StepFlow.Workers.WorkerStatus.changeset/2`.

Returns the updated WorkerStatus. Raises an error if something went wrong
during update.

## Examples

    iex> message = %{
    ...>   job: %{
    ...>     destination_paths: [],
    ...>     execution_duration: 0.0,
    ...>     job_id: 1234,
    ...>     parameters: [],
    ...>     status: "processing"
    ...>   },
    ...>   worker: %{
    ...>     activity: "Idle",
    ...>     description: "This worker is just an example.",
    ...>     direct_messaging_queue_name: "direct_messaging_e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
    ...>     instance_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
    ...>     label: "UnitTestWorker",
    ...>     queue_name: "job_test_worker",
    ...>     sdk_version: "2.3.4",
    ...>     short_description: "A test worker",
    ...>     system_info: %{
    ...>       docker_container_id: "e1297fe6-fe94-49cf-9ef8-1a751cba28f2",
    ...>       number_of_processors: 12,
    ...>       total_memory: 16_574_754,
    ...>       total_swap: 2_046_816,
    ...>       used_memory: 8_865_633,
    ...>       used_swap: 0
    ...>     },
    ...>     version: "1.2.3"
    ...>   }
    ...> }
    iex> worker_status =
    ...>   StepFlow.Workers.WorkerStatuses.create_worker_status!(%{message | job: nil})
    iex> result =
    ...>   StepFlow.Workers.WorkerStatuses.update_worker_status!(worker_status, message)
    iex> match?(%StepFlow.Workers.WorkerStatus{}, result)
    true

"""
def update_worker_status!(%WorkerStatus{} = worker_status, %{} = message) do
  attrs = StepFlow.Controllers.WorkerStatus.process_worker_status_message(message)

  worker_status
  |> WorkerStatus.changeset(attrs)
  |> Repo.update!()
end
end