defmodule StepFlow.Workflows do
@moduledoc """
The Workflows context.
"""
import Ecto.Query, warn: false
alias StepFlow.Artifacts.Artifact
alias StepFlow.Jobs
alias StepFlow.Jobs.Status
alias StepFlow.QueryFilter
alias StepFlow.Repo
alias StepFlow.Roles
alias StepFlow.Workflows
alias StepFlow.Workflows.Workflow
require Logger
@doc """
Returns one page of workflows matching the filters found in `params`.

`params` is a map of request parameters. The string keys `"page"` (default `0`)
and `"size"` (default `10`) select the page, both passed through
`StepFlow.Integer.force/1`. The whole map is also handed to
`apply_default_query_filters/2`.

The result is a map with the keys `:data` (the workflows of the requested page,
most recently inserted first, with artifacts, status, jobs and step statuses
loaded), `:total` (number of workflows matching the filters), `:page` and `:size`.

## Examples

    iex> list_workflows()
    %{data: [%Workflow{}, ...], total: 42, page: 0, size: 10}

"""
def list_workflows(params \\ %{}) do
  page = StepFlow.Integer.force(Map.get(params, "page", 0))
  size = StepFlow.Integer.force(Map.get(params, "size", 10))
  first_row = page * size

  filtered = apply_default_query_filters(from(workflow in Workflow), params)

  # Count over the filtered set, before pagination is applied.
  count_query = from(item in subquery(filtered), select: count(item.id))
  total = List.first(Repo.all(count_query))

  page_query =
    from(
      workflow in subquery(filtered),
      order_by: [desc: :inserted_at],
      offset: ^first_row,
      limit: ^size
    )

  page_query
  |> Repo.all()
  |> Repo.preload([:artifacts, :status, jobs: :child_workflow])
  |> preload_workflows()
  |> then(fn workflows ->
    %{data: workflows, total: total, page: page, size: size}
  end)
end
@doc """
Applies the standard workflow filters found in `params` to `query`.

The query is first narrowed by the search term, reference, identifier, version,
live flag, deletion flag, user, mode, date range and status filters. It is then
restricted to the workflow identifiers returned by `check_rights/1`:

  * when the rights contain `"*"`, only an explicit `:workflow_ids` parameter
    restricts the identifiers;
  * otherwise only identifiers present in the rights are kept, intersected with
    `:workflow_ids` when that parameter is given.

Finally, an `:ids` parameter restricts the result to those workflow ids.
"""
def apply_default_query_filters(query, params \\ %{}) do
  filtered =
    from(workflow in subquery(query))
    |> search_for_reference(params, :search)
    |> QueryFilter.filter_query(params, :reference)
    |> QueryFilter.filter_query(params, :identifier)
    |> filter_version(params)
    |> QueryFilter.filter_query(params, :is_live)
    |> filter_deleted(params)
    |> QueryFilter.filter_query(params, :user_uuid)
    |> filter_mode(params)
    |> QueryFilter.apply_before_date_filter(params, :before_date)
    |> QueryFilter.apply_after_date_filter(params, :after_date)
    |> filter_status(params, :states)

  allowed_workflows = check_rights(params)
  requested = StepFlow.Map.get_by_key_or_atom(params, :workflow_ids)
  unrestricted? = Enum.member?(allowed_workflows, "*")

  scoped =
    cond do
      is_nil(requested) and unrestricted? ->
        filtered

      is_nil(requested) ->
        from(item in filtered, where: item.identifier in ^allowed_workflows)

      unrestricted? ->
        from(item in filtered, where: item.identifier in ^requested)

      true ->
        # Only keep the requested identifiers the caller is allowed to see.
        permitted = Enum.filter(requested, &(&1 in allowed_workflows))
        from(item in filtered, where: item.identifier in ^permitted)
    end

  case StepFlow.Map.get_by_key_or_atom(params, :ids) do
    nil -> scoped
    ids -> from(item in scoped, where: item.id in ^ids)
  end
end
# Narrows `query` to workflows whose `reference` contains the search term stored
# under `field` in `params`, using a case-insensitive `ILIKE`. Without a term the
# query is returned untouched. The term is placed between `%` wildcards as-is, so
# `%` and `_` inside it keep their pattern meaning.
defp search_for_reference(query, params, field) do
  term = StepFlow.Map.get_by_key_or_atom(params, field)

  if is_nil(term) do
    query
  else
    pattern = "%#{term}%"

    from(
      item in subquery(query),
      where: ilike(item.reference, ^pattern)
    )
  end
end
@doc """
Returns the workflow identifiers that the roles in `params` may view.

The `:roles` entry of `params` (default `[]`) is resolved with `Roles.get_roles/1`.
The "view" rights on the "workflow" entity type are then looked up, and the last
`::`-separated segment of each right's `entity` is returned. When the roles
resolve to `nil` the result is an empty list.

Callers in this module treat a `"*"` entry as "every workflow".
"""
def check_rights(params) do
  roles =
    params
    |> StepFlow.Map.get_by_key_or_atom(:roles, [])
    |> Roles.get_roles()

  view_rights =
    if is_nil(roles) do
      []
    else
      StepFlow.Controllers.Roles.get_rights_for_entity_type_and_action(
        roles,
        "workflow",
        "view"
      )
    end

  for %{entity: entity} <- view_rights do
    entity
    |> String.split("::")
    |> List.last()
  end
end
# Filters on the `deleted` flag according to the "deleted" request parameter:
#   * "all"           - no filtering;
#   * "only"          - deleted workflows only;
#   * nil or "none"   - non-deleted workflows only (the default).
# Any other value raises a `CaseClauseError`.
defp filter_deleted(query, params) do
  selection = Map.get(params, "deleted")

  case selection do
    "all" ->
      query

    "only" ->
      from(item in query, where: item.deleted == true)

    unset when unset in [nil, "none"] ->
      from(item in query, where: item.deleted == false)
  end
end
# Filters on the `is_live` flag according to the "mode" request parameter, which
# is expected to be a list of mode names:
#   * nil, or both "live" and "file" - no filtering;
#   * ["live"]                        - live workflows only;
#   * ["file"]                        - non-live workflows only.
# Both orders of the two-mode list are accepted, since the order of repeated
# request parameters is chosen by the client. Any other value still raises a
# `CaseClauseError`.
defp filter_mode(query, params) do
  case Map.get(params, "mode") do
    nil ->
      from(workflow in query)

    modes when modes in [["live", "file"], ["file", "live"]] ->
      from(workflow in query)

    ["live"] ->
      from(
        workflow in query,
        where: workflow.is_live == true
      )

    ["file"] ->
      from(
        workflow in query,
        where: workflow.is_live == false
      )
  end
end
# Keeps the workflows whose "major.minor.micro" version string is one of the
# values of the "version" request parameter. Without that parameter nothing is
# filtered.
defp filter_version(query, params) do
  requested_versions = Map.get(params, "version")

  if is_nil(requested_versions) do
    from(item in query)
  else
    from(
      item in query,
      where:
        fragment("CONCAT(version_major,'.',version_minor,'.',version_micro)") in ^requested_versions
    )
  end
end
@doc """
Restricts `query` to workflows whose status state is one of the states stored
under `key` in `params`.

The workflows are joined with one status row per workflow, selected with a
`distinct` on `workflow_id` over the statuses ordered by descending id
(presumably the most recent status; confirm against the database in use).
When `params` holds no value for `key`, the query is returned unchanged.
"""
def filter_status(query, params, key) do
  case StepFlow.Map.get_by_key_or_atom(params, key) do
    nil ->
      query

    states ->
      status_per_workflow =
        from(
          status in Workflows.Status,
          order_by: [desc: status.id, desc: status.workflow_id],
          distinct: [desc: status.workflow_id]
        )

      from(
        item in query,
        join: current_status in subquery(status_per_workflow),
        on: item.id == current_status.workflow_id,
        where: current_status.state in ^states
      )
  end
end
@doc """
Gets a single workflow with its artifacts, jobs, child workflows and step
statuses loaded.

Raises `Ecto.NoResultsError` if the Workflow does not exist.

## Examples

    iex> get_workflow!(123)
    %Workflow{}

    iex> get_workflow!(456)
    ** (Ecto.NoResultsError)

"""
def get_workflow!(id) do
  found = Repo.get!(Workflow, id)
  with_associations = Repo.preload(found, [:artifacts, jobs: :child_workflow])
  preload_workflow(with_associations)
end
@doc """
Gets the workflow that owns the job with the given ID.

The job is fetched with `Jobs.get_job!/1` and its workflow with `get_workflow!/1`,
which raises `Ecto.NoResultsError` if the Workflow does not exist.

## Examples

    iex> get_workflow_for_job!(19)
    %Workflow{}

    iex> get_workflow_for_job!(456)
    ** (Ecto.NoResultsError)

"""
def get_workflow_for_job!(job_id) do
  job_id
  |> Jobs.get_job!()
  |> Map.fetch!(:workflow_id)
  |> get_workflow!()
end
# Loads the status and progressions of every job of `workflow`, then replaces the
# workflow's `:steps` with the result of `get_steps_with_status/2` for those jobs
# and its `:jobs` with the loaded jobs.
defp preload_workflow(workflow) do
  loaded_jobs = Repo.preload(workflow.jobs, [:status, :progressions])

  steps_with_status =
    StepFlow.Controllers.Workflows.get_steps_with_status(
      Map.get(workflow, :steps),
      loaded_jobs
    )

  Map.merge(workflow, %{steps: steps_with_status, jobs: loaded_jobs})
end
@doc """
Loads job statuses, progressions and step statuses for every workflow in
`workflows`.

The processed workflows are appended, in their original order, to `result`
(default `[]`), and the combined list is returned.
"""
def preload_workflows(workflows, result \\ [])

def preload_workflows([], result), do: result

def preload_workflows([_ | _] = workflows, result) do
  # Map once and append once, instead of appending to the tail of `result` for
  # every workflow, which walked the accumulator each time.
  result ++ Enum.map(workflows, &preload_workflow/1)
end
@doc """
Returns the step definition a job was created from, together with its workflow.

The job's workflow is preloaded (with its jobs and their child workflows), and
the first workflow step whose `"id"` equals the job's `step_id` is looked up.
The result is `%{step: step, workflow: workflow}`, where `step` is `nil` when no
step matches.
"""
def get_step_definition(job) do
  job = Repo.preload(job, workflow: [jobs: :child_workflow])
  workflow = job.workflow

  matching_step =
    Enum.find(workflow.steps, fn candidate ->
      Map.get(candidate, "id") == job.step_id
    end)

  %{step: matching_step, workflow: workflow}
end
@doc """
Creates a workflow from `attrs`.

The attributes are validated with `Workflow.changeset/2` before the insert.

## Examples

    iex> create_workflow(%{field: value})
    {:ok, %Workflow{}}

    iex> create_workflow(%{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def create_workflow(attrs \\ %{}) do
  changeset = Workflow.changeset(%Workflow{}, attrs)
  Repo.insert(changeset)
end
@doc """
Updates a workflow with `attrs`.

The attributes are validated with `Workflow.changeset/2` before the update.

## Examples

    iex> update_workflow(workflow, %{field: new_value})
    {:ok, %Workflow{}}

    iex> update_workflow(workflow, %{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def update_workflow(%Workflow{} = workflow, attrs) do
  changeset = Workflow.changeset(workflow, attrs)
  Repo.update(changeset)
end
@doc """
Marks a Workflow as deleted.

This is a soft delete: the `deleted` field is set to `true` through an update,
and the record itself is kept.

## Examples

    iex> delete_workflow(workflow)
    {:ok, %Workflow{}}

    iex> delete_workflow(workflow)
    {:error, %Ecto.Changeset{}}

"""
def delete_workflow(%Workflow{} = workflow) do
  deletion = Workflow.changeset(workflow, %{deleted: true})
  Repo.update(deletion)
end
@doc """
Returns an `%Ecto.Changeset{}` for tracking workflow changes.

The changeset is built from `workflow` with no attribute changes.

## Examples

    iex> change_workflow(workflow)
    %Ecto.Changeset{source: %Workflow{}}

"""
def change_workflow(%Workflow{} = workflow) do
  no_changes = %{}
  Workflow.changeset(workflow, no_changes)
end
@doc """
Returns per-identifier statistics for workflows that produced an artifact
within a time window.

`delta` and `scale` are the count and interval unit passed to Ecto's
`datetime_add/3` relative to the current UTC time; artifacts inserted after that
instant are considered (presumably `delta` is negative to look back; confirm
with callers).

Each returned map contains:

  * `:identifier` - the workflow identifier;
  * `:count` - the number of matching workflows with that identifier;
  * `:duration` - the average number of seconds, as a float, between a
    workflow's insertion and the insertion of its latest matching artifact.
"""
def get_completed_statistics(scale, delta) do
  now = NaiveDateTime.utc_now()

  # Latest artifact insertion time per workflow, limited to the time window.
  latest_artifacts =
    from(
      artifact in Artifact,
      where: artifact.inserted_at > datetime_add(^now, ^delta, ^scale),
      group_by: artifact.workflow_id,
      select: %{
        workflow_id: artifact.workflow_id,
        inserted_at: max(artifact.inserted_at)
      }
    )

  statistics =
    from(
      workflow in Workflow,
      inner_join: completion in subquery(latest_artifacts),
      on: workflow.id == completion.workflow_id,
      group_by: workflow.identifier,
      select: %{
        count: count(),
        duration:
          fragment(
            "CAST(EXTRACT(EPOCH FROM (SELECT avg(? - ?))) AS FLOAT)",
            completion.inserted_at,
            workflow.inserted_at
          ),
        identifier: workflow.identifier
      }
    )

  Repo.all(statistics)
end
@doc """
Converts the version fields of a workflow to a `"major.minor.micro"` string.

## Examples

    iex> get_workflow_version_as_string(%{version_major: 1, version_minor: 2, version_micro: 3})
    "1.2.3"

"""
def get_workflow_version_as_string(workflow) do
  "#{workflow.version_major}.#{workflow.version_minor}.#{workflow.version_micro}"
end
@doc """
Aborts a workflow.

The jobs of processing steps are aborted (nested workflows are aborted in turn),
the queued, paused and processing steps are skipped, and the workflow status is
set to `:stopped`. A notification is then sent on the `update_workflow_<id>`
topic.

Returns the result of setting the workflow status.
"""
def abort(workflow) do
  abort_running_step_jobs(workflow.steps, workflow)
  skip_remaining_steps(workflow.steps, workflow)

  status_result = Workflows.Status.set_workflow_status(workflow.id, :stopped)

  StepFlow.Notification.send(
    "update_workflow_" <> Integer.to_string(workflow.id),
    %{workflow_id: workflow.id}
  )

  status_result
end
@doc """
Pauses a workflow and records the action to trigger later.

`action` must be `"resume"` or `"abort"`; any other value returns
`{:error, "Unknown action: ..."}` without touching the workflow.
`trigger_date_time` is a Unix timestamp in milliseconds, stored as a naive
datetime in the status description together with the action.

The workflow status is set to `:pausing` while at least one step still has
processing jobs, and to `:paused` otherwise. The remaining steps are then paused
and a notification is sent on the `update_workflow_<id>` topic.

Returns the result of setting the workflow status.
"""
def pause(workflow, action, trigger_date_time) when action in ["resume", "abort"] do
  trigger_at =
    trigger_date_time
    |> DateTime.from_unix!(:millisecond)
    |> DateTime.to_naive()

  description = %{action: action, trigger_at: trigger_at}

  workflow = Repo.preload(workflow, jobs: :child_workflow)

  target_status =
    if Enum.any?(workflow.steps, fn step -> step.jobs.processing > 0 end) do
      :pausing
    else
      :paused
    end

  status_result =
    Workflows.Status.set_workflow_status(workflow.id, target_status, nil, description)

  _paused_steps = pause_remaining_steps(workflow.steps, workflow.jobs, %{})

  StepFlow.Notification.send(
    "update_workflow_" <> Integer.to_string(workflow.id),
    %{workflow_id: workflow.id}
  )

  status_result
end

def pause(_workflow, action, _trigger_date_time) do
  {:error, "Unknown action: #{action}"}
end
@doc """
Resumes a paused workflow.

The workflow status is first set to `:pending` (a failure here raises a
`MatchError`), then the next step is started. When that succeeds, the paused
steps are resumed, the status is set to `:processing` and a notification is sent
on the `update_workflow_<id>` topic; the result of that last status change is
returned. When starting the next step does not return `{:ok, _}`, its result is
returned as-is.
"""
def resume(workflow) do
  {:ok, _status} = Workflows.Status.set_workflow_status(workflow.id, :pending)

  with {:ok, _} <- StepFlow.Step.start_next(workflow) do
    workflow = Repo.preload(workflow, jobs: :child_workflow)
    _resumed_steps = resume_remaining_steps(workflow.steps, workflow.jobs)

    status_result = Workflows.Status.set_workflow_status(workflow.id, :processing)

    StepFlow.Notification.send(
      "update_workflow_" <> Integer.to_string(workflow.id),
      %{workflow_id: workflow.id}
    )

    status_result
  end
end
# Aborts the work of every step of `steps` that is currently `:processing`:
# nested-workflow steps have their child workflows aborted, other steps have
# their jobs aborted. Steps in any other status are left alone. Always returns
# `nil`.
defp abort_running_step_jobs(steps, workflow) when is_list(steps) do
  Enum.each(steps, fn step ->
    if step.status == :processing do
      case StepFlow.Map.get_by_key_or_atom(step, :mode) do
        # When nested workflows
        nested when nested in ["workflow_one_for_one", "workflow_one_for_many"] ->
          abort_nested_workflows(step, workflow)

        _ ->
          StepFlow.Step.abort_step_jobs(workflow, step)
      end
    end
  end)

  nil
end
# Aborts the child workflow of every job of `workflow` that belongs to `step`.
defp abort_nested_workflows(step, workflow) do
  step_id = StepFlow.Map.get_by_key_or_atom(step, :id)

  workflow.jobs
  |> Enum.filter(&(&1.step_id == step_id))
  |> Enum.each(fn job ->
    child = get_workflow!(job.child_workflow.id)
    abort(child)
  end)
end
# Skips every step of `steps` that has not finished: queued steps are skipped as
# a whole, paused and processing steps have their jobs skipped. Steps in any
# other status are left alone. Always returns `nil`.
defp skip_remaining_steps(steps, workflow) when is_list(steps) do
  Enum.each(steps, fn step ->
    cond do
      step.status == :queued -> StepFlow.Step.skip_step(workflow, step)
      step.status in [:paused, :processing] -> StepFlow.Step.skip_step_jobs(workflow, step)
      true -> nil
    end
  end)

  nil
end
# Pauses every step of the list that is still `:queued` or `:processing`, and
# returns the accumulated pause results (most recent first).
#
#   * steps           - the workflow steps, each carrying a `status`;
#   * workflow_jobs   - the jobs of the workflow, matched to steps by `step_id`;
#   * post_action     - passed on to the step or job pause calls;
#   * paused_steps    - accumulator, defaults to `[]`.
#
# Nested-workflow steps pause their child workflows and then pause the workflow
# that owns the step's jobs; other steps are paused with
# `StepFlow.Step.pause_step/3`. Steps in any other status are left untouched.
defp pause_remaining_steps(_steps, _workflow_jobs, _post_action, _paused_steps \\ [])

defp pause_remaining_steps([], _workflow_jobs, _post_action, paused_steps), do: paused_steps

defp pause_remaining_steps([step | steps], workflow_jobs, post_action, paused_steps) do
  # Get step jobs
  step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
  step_jobs = Enum.filter(workflow_jobs, fn job -> job.step_id == step_id end)

  paused_steps =
    case step.status do
      status when status in [:queued, :processing] ->
        Logger.debug("Pause #{inspect(status)} step: #{inspect(step)}")

        case StepFlow.Map.get_by_key_or_atom(step, :mode) do
          mode when mode in ["workflow_one_for_one", "workflow_one_for_many"] ->
            # `pause/3` decodes this value with
            # `DateTime.from_unix!(_, :millisecond)`, so it must be expressed in
            # milliseconds; the default unit of `DateTime.to_unix/1` is seconds.
            trigger_date_time = DateTime.to_unix(DateTime.utc_now(), :millisecond)

            pause_nested_workflows(step_jobs, post_action, trigger_date_time)

            # NOTE(review): this raises when the step has no job yet, because
            # `List.first/1` then returns nil; confirm that cannot happen for a
            # queued nested step.
            workflow = get_workflow!(List.first(step_jobs).workflow_id)

            # Keep the accumulator a proper list instead of replacing it with
            # the result of `pause/3`.
            [pause(workflow, "resume", trigger_date_time) | paused_steps]

          _ ->
            [StepFlow.Step.pause_step(step, step_jobs, post_action) | paused_steps]
        end

      _ ->
        paused_steps
    end

  pause_remaining_steps(steps, workflow_jobs, post_action, paused_steps)
end
# For every job in `step_jobs`: marks the job as `:paused` (a failure raises a
# `MatchError`), then pauses its child workflow with a "resume" action to trigger
# at `trigger_date_time`.
defp pause_nested_workflows(step_jobs, post_action, trigger_date_time) do
  Enum.each(step_jobs, fn job ->
    {:ok, _status} = Status.set_job_status(job.id, :paused, post_action)

    child = get_workflow!(job.child_workflow.id)
    pause(child, "resume", trigger_date_time)
  end)
end
# Resumes every `:paused` step of `steps`: nested-workflow steps resume the child
# workflow of each of their jobs, other steps resume their jobs through
# `StepFlow.Step.resume_step/1`. Steps in any other status are left alone. Always
# returns `nil`.
defp resume_remaining_steps(steps, workflow_jobs) when is_list(steps) do
  Enum.each(steps, fn step ->
    if step.status == :paused do
      Logger.debug("Resume paused step: #{inspect(step)}")

      step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
      step_jobs = Enum.filter(workflow_jobs, &(&1.step_id == step_id))

      case StepFlow.Map.get_by_key_or_atom(step, :mode) do
        # When nested workflows
        nested when nested in ["workflow_one_for_one", "workflow_one_for_many"] ->
          Enum.each(step_jobs, fn job ->
            child = get_workflow!(job.child_workflow.id)
            resume(child)
          end)

        _ ->
          StepFlow.Step.resume_step(step_jobs)
      end
    end
  end)

  nil
end
end