defmodule StepFlow.Jobs do
@moduledoc """
The Jobs context.
"""
require Logger
import Ecto.Query, warn: false
alias StepFlow.Repo
alias StepFlow.Amqp.CommonEmitter
alias StepFlow.Controllers.BlackList
alias StepFlow.Controllers.Jobs
alias StepFlow.Jobs.Job
alias StepFlow.Jobs.Status
alias StepFlow.Progressions.Progression
alias StepFlow.QueryFilter
alias StepFlow.Workers.WorkerStatuses
alias StepFlow.Workflows
@doc """
Returns one page of jobs, restricted by the rights derived from `params`.

The base job query is narrowed by `apply_rights/2`, then by
`apply_default_query_filters/2`, and finally paginated.

The result is a map with the keys `:data` (the jobs, with their associations
preloaded), `:total`, `:page` and `:size`.

## Examples

    iex> list_jobs()
    %{data: [%Job{}, ...], total: 42, page: 0, size: 10}

"""
def list_jobs(params \\ %{}) do
  base_query = from(job in Job)
  permitted_query = apply_rights(base_query, params)
  filtered_query = apply_default_query_filters(permitted_query, params)

  internal_query_jobs(filtered_query, params)
end
@doc """
Returns one page of jobs without applying any rights restriction.

Same as `list_jobs/1` except that `apply_rights/2` is not called; only the
default query filters and the pagination are applied.
"""
def internal_list_jobs(params \\ %{}) do
  base_query = from(job in Job)
  filtered_query = apply_default_query_filters(base_query, params)

  internal_query_jobs(filtered_query, params)
end
# Runs `query` with pagination and returns
# `%{data: jobs, total: total, page: page, size: size}`.
#
# `page` (default 0) and `size` (default 10) are read from `params` under the
# string keys "page"/"size" first, then under the atom keys `:page`/`:size`.
# The atom fallback matters because callers inside this module build their
# params with atom keys (e.g. `size: 1000`); reading string keys only made
# those values silently fall back to the defaults.
#
# `total` is the number of rows matching `query` before pagination. Jobs are
# returned most recently inserted first, with their status, progressions,
# updates and child workflow preloaded.
defp internal_query_jobs(query, params) do
  page =
    params
    |> Map.get("page", Map.get(params, :page, 0))
    |> StepFlow.Integer.force()

  size =
    params
    |> Map.get("size", Map.get(params, :size, 10))
    |> StepFlow.Integer.force()

  offset = page * size

  total_query = from(item in query, select: count(item.id))

  total =
    total_query
    |> Repo.all()
    |> List.first()

  paginated_query =
    from(
      job in query,
      order_by: [desc: :inserted_at],
      offset: ^offset,
      limit: ^size
    )

  jobs =
    paginated_query
    |> Repo.all()
    |> Repo.preload([
      :status,
      :progressions,
      :updates,
      child_workflow: [:jobs, :artifacts, :status]
    ])

  %{
    data: jobs,
    total: total,
    page: page,
    size: size
  }
end
@doc """
Restricts `query` to the jobs whose workflow is allowed for `params`.

The allowed workflows come from `Workflows.check_rights/1`. When that list
contains `"*"`, the query is returned untouched. Otherwise every job matched
by `query` is loaded and only the ids accepted by the rights check are kept
in the resulting query.
"""
def apply_rights(query, params) do
  allowed_workflows = Workflows.check_rights(params)

  if Enum.member?(allowed_workflows, "*") do
    query
  else
    # All candidate jobs are fetched here so that the rights check can be
    # performed per job.
    allowed_job_ids =
      query
      |> Repo.all()
      |> check_rights(allowed_workflows)

    from(job in query, where: job.id in ^allowed_job_ids)
  end
end
@doc """
Applies the standard set of job filters found in `params` to `query`.

In order:

  * restricts to a workflow when a `workflow_id` entry is present;
  * applies the job type, step id, status and worker (label, version,
    instance id) filters;
  * when the string key `"direct_messaging_queue_name"` is present, keeps
    the jobs whose `parameters` contain the matching
    `direct_messaging_queue_name` string parameter (every
    `"direct_messaging_"` occurrence is removed from the given value first);
  * applies the `end_date` / `before_date` and `start_date` / `after_date`
    filters.
"""
def apply_default_query_filters(query, params) do
  workflow_scoped =
    case StepFlow.Map.get_by_key_or_atom(params, "workflow_id") do
      nil ->
        query

      raw_workflow_id ->
        workflow_id = StepFlow.Integer.force(raw_workflow_id)
        from(job in query, where: job.workflow_id == ^workflow_id)
    end

  filtered =
    workflow_scoped
    |> QueryFilter.filter_query(params, :job_type, :name)
    |> QueryFilter.filter_query(params, :step_id)
    |> filter_status(params)
    |> filter_worker_label(params)
    |> filter_worker_version(params)
    |> filter_worker_instance_id(params)

  queue_scoped =
    case Map.get(params, "direct_messaging_queue_name") do
      nil ->
        filtered

      queue_name ->
        # JSON document of the job parameter that must be contained in the
        # job's `parameters` array.
        expected =
          Jason.encode!(%{
            id: "direct_messaging_queue_name",
            type: "string",
            value: String.replace(queue_name, "direct_messaging_", "")
          })

        from(
          job in filtered,
          where: fragment("? @> array[?::text]::jsonb[]", job.parameters, ^expected)
        )
    end

  queue_scoped
  |> QueryFilter.apply_end_date_filter(params, :end_date)
  |> QueryFilter.apply_end_date_filter(params, :before_date)
  |> QueryFilter.apply_start_date_filter(params, :start_date)
  |> QueryFilter.apply_start_date_filter(params, :after_date)
end
@doc """
Returns the number of pending jobs, grouped by job name.

A job is counted when it has neither a progression nor a status entry.
Each element of the result is a map of the form `%{name: name, value: count}`.
"""
def get_pending_jobs_by_type do
  progression_job_ids =
    from(progression in Progression,
      select: %{job_id: progression.job_id}
    )

  status_job_ids =
    from(entry in Status,
      select: %{job_id: entry.job_id}
    )

  from(job in Job,
    left_join: with_progress in subquery(progression_job_ids),
    on: job.id == with_progress.job_id,
    left_join: with_status in subquery(status_job_ids),
    on: job.id == with_status.job_id,
    # Unmatched left joins yield NULL job ids: keep only those jobs.
    where: is_nil(with_progress.job_id) and is_nil(with_status.job_id),
    group_by: job.name,
    select: %{name: job.name, value: count(job.id)}
  )
  |> Repo.all()
end
@doc """
Returns the number of jobs currently processing, grouped by job name.

A job is counted when its most recently inserted status entry has the
`:processing` state. Each element of the result is a map of the form
`%{name: name, value: count}`.
"""
def get_processing_jobs_by_type do
  # Latest status insertion date for each job.
  latest_status_dates =
    from(
      entry in Status,
      group_by: [entry.job_id],
      select: %{job_id: entry.job_id, inserted_at: max(entry.inserted_at)}
    )

  # Status rows that are the latest of their job and are in processing state.
  processing_statuses =
    from(
      entry in Status,
      join: latest in subquery(latest_status_dates),
      on: entry.job_id == latest.job_id and entry.inserted_at == latest.inserted_at,
      where: entry.state == :processing
    )

  from(job in Job,
    right_join: processing in subquery(processing_statuses),
    on: processing.job_id == job.id,
    group_by: job.name,
    select: %{name: job.name, value: count(job.id)}
  )
  |> Repo.all()
end
# Returns the ids of the `jobs` whose workflow identifier is listed in
# `allowed_workflows`.
#
# The workflow of each job is looked up with `Workflows.get_workflow_for_job!/1`.
# Jobs that are not allowed are dropped from the result; the previous
# implementation mapped them to `nil`, which leaked `nil` entries into the
# `job.id in ^ids` query built by the caller.
defp check_rights(jobs, allowed_workflows) do
  jobs
  |> Enum.filter(fn job ->
    identifier =
      job.id
      |> Workflows.get_workflow_for_job!()
      |> StepFlow.Map.get_by_key_or_atom(:identifier)

    Enum.member?(allowed_workflows, identifier)
  end)
  |> Enum.map(fn job -> job.id end)
end
@doc """
Applies the status filter to `query`.

Delegates to `QueryFilter.filter_query_with_related_entry/7` with the
`:states` parameter key, relating the job `:id` to the `:job_id` of `Status`
entries and targeting their `:state` field.
"""
def filter_status(query, params) do
  # NOTE(review): presumably keeps jobs having a status in one of the
  # requested states; confirm against QueryFilter.
  query
  |> QueryFilter.filter_query_with_related_entry(
    params,
    :states,
    :id,
    Status,
    :state,
    :job_id
  )
end
@doc """
Applies the worker version filter to `query`.

Delegates to `QueryFilter.filter_query_with_related_entry/7` with the
`:versions` parameter key, relating the job `:last_worker_instance_id` to the
`:instance_id` of `StepFlow.Workers.WorkerStatus` entries and targeting their
`:version` field.
"""
def filter_worker_version(query, params) do
  query
  |> QueryFilter.filter_query_with_related_entry(
    params,
    :versions,
    :last_worker_instance_id,
    StepFlow.Workers.WorkerStatus,
    :version,
    :instance_id
  )
end
@doc """
Applies the worker label filter to `query`.

Delegates to `QueryFilter.filter_query_with_related_entry/7` with the
`:labels` parameter key, relating the job `:last_worker_instance_id` to the
`:instance_id` of `StepFlow.Workers.WorkerStatus` entries and targeting their
`:label` field.
"""
def filter_worker_label(query, params) do
  query
  |> QueryFilter.filter_query_with_related_entry(
    params,
    :labels,
    :last_worker_instance_id,
    StepFlow.Workers.WorkerStatus,
    :label,
    :instance_id
  )
end
@doc """
Applies the worker instance id filter to `query`.

Delegates to `QueryFilter.filter_query_with_related_entry/7` with the
`:instance_ids` parameter key, relating the job `:last_worker_instance_id` to
the `:instance_id` of `StepFlow.Workers.WorkerStatus` entries and targeting
that same `:instance_id` field.
"""
def filter_worker_instance_id(query, params) do
  query
  |> QueryFilter.filter_query_with_related_entry(
    params,
    :instance_ids,
    :last_worker_instance_id,
    StepFlow.Workers.WorkerStatus,
    :instance_id,
    :instance_id
  )
end
@doc """
Fetches the job with the given `id`.

Raises `Ecto.NoResultsError` when no such job exists.

## Examples

    iex> get_job!(123)
    %Job{}

    iex> get_job!(456)
    ** (Ecto.NoResultsError)

"""
def get_job!(id) do
  Job |> Repo.get!(id)
end
@doc """
Fetches the job with the given `id`, or returns `nil` when it does not exist.

## Examples

    iex> get_job(123)
    %Job{}

    iex> get_job(456)
    nil

"""
def get_job(id) do
  Job |> Repo.get(id)
end
@doc """
Fetches the single job matching a workflow ID and a step ID.

The argument is a map with the string keys `"workflow_id"` and `"step_id"`.
Raises `Ecto.NoResultsError` when no job matches.

## Examples

    iex> get_by!(%{"workflow_id" => 123, "step_id" => 0})
    %Job{}

    iex> get_by!(%{"workflow_id" => 456, "step_id" => 0})
    ** (Ecto.NoResultsError)

"""
def get_by!(%{"workflow_id" => workflow_id, "step_id" => step_id}) do
  clauses = [workflow_id: workflow_id, step_id: step_id]
  Repo.get_by!(Job, clauses)
end
@doc """
Fetches the single job matching a workflow ID and a step ID.

The argument is a map with the string keys `"workflow_id"` and `"step_id"`.
Returns `nil` when no job matches.

## Examples

    iex> get_by(%{"workflow_id" => 123, "step_id" => 0})
    %Job{}

    iex> get_by(%{"workflow_id" => 456, "step_id" => 0})
    nil

"""
def get_by(%{"workflow_id" => workflow_id, "step_id" => step_id}) do
  clauses = [workflow_id: workflow_id, step_id: step_id]
  Repo.get_by(Job, clauses)
end
@doc """
Fetches a job together with its related entries.

The status, progressions, updates and child workflow (with its jobs,
artifacts and status) are preloaded.

Raises `Ecto.NoResultsError` when the job does not exist.

## Examples

    iex> get_job_with_status!(123)
    %Job{}

    iex> get_job_with_status!(456)
    ** (Ecto.NoResultsError)

"""
def get_job_with_status!(id) do
  associations = [
    :status,
    :progressions,
    :updates,
    child_workflow: [:jobs, :artifacts, :status]
  ]

  job = get_job!(id)
  Repo.preload(job, associations)
end
@doc """
Inserts a new job built from `attrs`.

## Examples

    iex> create_job(%{field: value})
    {:ok, %Job{}}

    iex> create_job(%{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def create_job(attrs \\ %{}) do
  changeset = Job.changeset(%Job{}, attrs)
  Repo.insert(changeset)
end
@doc """
Creates a new job from the attributes of an existing one.

The name, step id, parameters, workflow id and the `is_live`,
`is_updatable` and `allow_failure` flags are copied. Returns the result of
`create_job/1`.
"""
def copy_job(%Job{} = job) do
  copied_attrs = %{
    name: job.name,
    step_id: job.step_id,
    parameters: job.parameters,
    workflow_id: job.workflow_id,
    is_live: job.is_live,
    is_updatable: job.is_updatable,
    allow_failure: job.allow_failure
  }

  create_job(copied_attrs)
end
@doc """
Creates a job and walks it to the paused status.

The job is created without parameters, then receives the `:queued`,
`:processing` and `:paused` statuses in that order; `post_action` is passed
along with the `:paused` status.

Returns the result of the last `Status.set_job_status/3` call.

## Examples

    iex> create_paused_job(workflow_id, 1, "download_http", post_action)

"""
def create_paused_job(workflow_id, step_id, step_name, post_action) do
  {:ok, job} =
    create_job(%{
      name: step_name,
      step_id: step_id,
      workflow_id: workflow_id,
      parameters: []
    })

  Enum.each([:queued, :processing], fn state ->
    Status.set_job_status(job.id, state)
  end)

  Status.set_job_status(job.id, :paused, post_action)
end
@doc """
Creates a job for `workflow` and immediately marks it as skipped.

The job is created without parameters and only receives the `:skipped`
status.

## Examples

    iex> create_skipped_job(workflow, 1, "download_http")
    {:ok, "skipped"}

"""
def create_skipped_job(workflow, step_id, action) do
  {:ok, job} =
    create_job(%{
      name: action,
      step_id: step_id,
      workflow_id: workflow.id,
      parameters: []
    })

  Status.set_job_status(job.id, :skipped)

  {:ok, "skipped"}
end
@doc """
Creates a job for `workflow` and walks it to the error status.

The job is created without parameters, then receives the `:queued`,
`:processing` and `:error` statuses in that order; `description` is stored
as the `:message` of the error status.

## Examples

    iex> create_error_job(workflow, step_id, "download_http", "unsupported step")
    {:ok, "created"}

"""
def create_error_job(workflow, step_id, action, description) do
  {:ok, job} =
    create_job(%{
      name: action,
      step_id: step_id,
      workflow_id: workflow.id,
      parameters: []
    })

  Enum.each([:queued, :processing], fn state ->
    Status.set_job_status(job.id, state)
  end)

  Status.set_job_status(job.id, :error, %{message: description})

  {:ok, "created"}
end
@doc """
Creates a job for `workflow` and walks it to the completed status.

The job is created without parameters, then receives the `:queued`,
`:processing` and `:completed` statuses in that order.

## Examples

    iex> create_completed_job(workflow, step_id, "webhook_notification")
    {:ok, "completed"}

"""
def create_completed_job(workflow, step_id, action) do
  {:ok, job} =
    create_job(%{
      name: action,
      step_id: step_id,
      workflow_id: workflow.id,
      parameters: []
    })

  Enum.each([:queued, :processing, :completed], fn state ->
    Status.set_job_status(job.id, state)
  end)

  {:ok, "completed"}
end
# Asks the worker currently associated with `job` to stop processing it.
#
# When no worker status is found for the job, a warning is logged and nothing
# is sent. Otherwise the job message, with its `:type` set to "stop_process",
# is published on the worker's direct messaging queue.
#
# Returns `{:ok, "stopped"}` when publishing succeeds,
# `{:error, "unable to publish message"}` when it does not, and the logger
# result when no worker was found.
defp abort_job(job) do
  case WorkerStatuses.get_worker_status_for_job(job.id) do
    nil ->
      Logger.warn("Cannot abort job #{job.id}, no associated running worker found.")

    worker ->
      stop_message = Map.put(Jobs.get_message(job), :type, "stop_process")

      Logger.info(
        "Send stop_process message for step #{job.id} to worker #{worker.instance_id}"
      )

      outcome =
        CommonEmitter.publish_json(
          worker.direct_messaging_queue_name,
          job.id,
          stop_message,
          "direct_messaging",
          headers: [{"instance_id", :longstr, worker.instance_id}]
        )

      if outcome == :ok do
        {:ok, "stopped"}
      else
        {:error, "unable to publish message"}
      end
  end
end
@doc """
Aborts the jobs of a workflow step.

The jobs are listed through `internal_list_jobs/1` (no rights check). Then:

  * jobs whose last status is `:queued` are black-listed through
    `BlackList.add_and_notify!/1`;
  * jobs having at least one progression receive a stop request on their
    worker.

Returns `:ok`.
"""
def abort_jobs(workflow, step_id, action) do
  jobs =
    %{
      # The page size is read from the params under the string key "size";
      # with an atom key it was ignored and only the default page of 10 jobs
      # was fetched.
      # Assuming that a workflow cannot have more than 1000 jobs.
      "size" => 1000,
      name: action,
      step_id: step_id,
      workflow_id: workflow.id
    }
    |> internal_list_jobs()
    |> Map.get(:data)

  # Black list queued jobs. Matching on the shape (instead of reading
  # `.state` directly) also covers the case where no last status is returned
  # for a job.
  jobs
  |> Enum.filter(fn job ->
    match?(%{state: :queued}, Jobs.get_last_status(job.status))
  end)
  |> Enum.each(fn job ->
    BlackList.add_and_notify!(job.id)
  end)

  # Abort running jobs, i.e. those that already reported a progression.
  jobs
  |> Enum.filter(fn job ->
    match?([_ | _], job.progressions)
  end)
  |> Enum.each(fn job ->
    abort_job(job)
  end)
end
@doc """
Sets the skipped status on the queued jobs of a workflow step.

The jobs are listed through `internal_list_jobs/1` (no rights check). A job
is skipped when its only status entry is in the `:queued` state.

Returns `:ok`.

## Examples

    iex> skip_jobs(workflow, step_id, "download_http")
    :ok

"""
def skip_jobs(workflow, step_id, action) do
  # NOTE(review): no page size is given, so only the first page of the job
  # listing is considered; confirm whether all jobs of the step are expected.
  %{
    name: action,
    step_id: step_id,
    workflow_id: workflow.id
  }
  |> internal_list_jobs()
  |> Map.get(:data)
  |> Enum.filter(fn job ->
    # States are atoms: the former `state != "queued"` comparison against a
    # string was always true and selected non-queued jobs as well.
    match?([%{state: :queued}], job.status)
  end)
  |> Enum.each(fn job ->
    Status.set_job_status(job.id, :skipped)
  end)
end
@doc """
Updates `job` with the given `attrs`.

## Examples

    iex> update_job(job, %{field: new_value})
    {:ok, %Job{}}

    iex> update_job(job, %{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def update_job(%Job{} = job, attrs) do
  changeset = Job.changeset(job, attrs)
  Repo.update(changeset)
end
@doc """
Marks a job as deleted.

The row is not removed: the job is updated with `deleted: true`.

## Examples

    iex> delete_job(job)
    {:ok, %Job{}}

    iex> delete_job(job)
    {:error, %Ecto.Changeset{}}

"""
def delete_job(%Job{} = job) do
  changeset = Job.changeset(job, %{deleted: true})
  Repo.update(changeset)
end
@doc """
Builds an `%Ecto.Changeset{}` for `job` without any change applied.

## Examples

    iex> change_job(job)
    %Ecto.Changeset{source: %Job{}}

"""
def change_job(%Job{} = job) do
  job |> Job.changeset(%{})
end
end