defmodule StepFlow.WorkflowDefinitions do
@moduledoc """
The WorkflowDefinitions context.
"""
import Ecto.Query, warn: false
alias StepFlow.Controllers.WorkflowDefinitions
alias StepFlow.Repo
alias StepFlow.Roles
alias StepFlow.WorkflowDefinitions.WorkflowDefinition
require Logger
@doc """
Returns a page of Workflow Definitions together with pagination metadata.

`params` is a map that may contain:

  * `"page"` - zero-based page index (default `0`)
  * `"size"` - page size (default `10`); `-1` disables pagination
  * `"mode"` - `"simple"` selects a reduced set of fields, any other value
    (default `"full"`) leaves the selection untouched
  * `"search"` - term matched against the label and the identifier
  * `"versions"` - `["latest"]` or a list of `"major.minor.micro"` strings
  * roles (read with `StepFlow.Map.get_by_key_or_atom/2` and `:roles`) - used
    to restrict the result to the workflows the caller has a "view" right on

Both `"page"` and `"size"` are passed through `StepFlow.Integer.force/1`.

Returns a map with the keys `:data`, `:total`, `:page`, `:size` and `:mode`,
where `:total` is the number of matching definitions before pagination.
"""
def list_workflow_definitions(params \\ %{}) do
  allowed_workflows = check_rights(params)

  page = StepFlow.Integer.force(Map.get(params, "page", 0))
  size = StepFlow.Integer.force(Map.get(params, "size", 10))
  mode = Map.get(params, "mode", "full")
  offset = page * size

  filtered_query =
    from(workflow_definition in WorkflowDefinition)
    |> filter_deleted()
    |> filter_by_rights(allowed_workflows)
    |> filter_by_label_or_identifier(Map.get(params, "search"))
    |> filter_by_versions(Map.get(params, "versions"))
    |> select_by_mode(mode)

  # An aggregate without group_by always yields exactly one row.
  total = Repo.one(from(item in subquery(filtered_query), select: count(item.id)))

  workflow_definitions =
    from(
      workflow_definition in subquery(filtered_query),
      order_by: [
        desc: workflow_definition.version_major,
        desc: workflow_definition.version_minor,
        desc: workflow_definition.version_micro,
        # Several definitions can share the same version: the id tie-breaker
        # makes the order total so that consecutive pages never overlap.
        asc: workflow_definition.id
      ]
    )
    |> paginate(offset, size)
    |> Repo.all()

  %{
    data: workflow_definitions,
    total: total,
    page: page,
    size: size,
    mode: mode
  }
end
# Restricts `query` to the rows whose `deleted` flag is false.
defp filter_deleted(query) do
  where(query, [definition], definition.deleted == false)
end
# Builds the list of workflow names the caller may view.
#
# The roles found in `params` (under `:roles`) are resolved with
# `Roles.get_roles/1`; when that yields nil no right is granted. Each "view"
# right on the "workflow" entity type contributes the last "::"-separated
# segment of its entity.
defp check_rights(params) do
  user_roles =
    params
    |> StepFlow.Map.get_by_key_or_atom(:roles)
    |> Roles.get_roles()

  rights =
    if is_nil(user_roles) do
      []
    else
      StepFlow.Controllers.Roles.get_rights_for_entity_type_and_action(
        user_roles,
        "workflow",
        "view"
      )
    end

  # Entries that do not expose an :entity key are skipped by the generator.
  for %{entity: entity} <- rights do
    entity
    |> String.split("::")
    |> List.last()
  end
end
# Applies offset/limit pagination to `query`.
#
# A `size` of -1 disables pagination and returns `query` unchanged.
# Offset and limit are set on `query` itself rather than on an outer query
# wrapping it as a subquery: an outer query without ORDER BY gives no
# guarantee that the ordering of its subquery is preserved, so the page
# boundaries must be computed on the ordered query.
defp paginate(query, _offset, -1), do: query

defp paginate(query, offset, size) do
  from(
    workflow_definition in query,
    offset: ^offset,
    limit: ^size
  )
end
@doc """
Restricts `query` according to the requested `versions`.

  * `["latest"]` - keeps one row per identifier (`distinct: :identifier`),
    rows being sorted by major, minor and micro version in descending order
  * a non-empty list - keeps the rows whose `"major.minor.micro"` string is
    one of the given values
  * anything else (including `nil` and `[]`) - returns `query` unchanged
"""
def filter_by_versions(query, ["latest"]) do
  from(
    workflow_definition in subquery(query),
    order_by: [
      desc: workflow_definition.version_major,
      desc: workflow_definition.version_minor,
      desc: workflow_definition.version_micro
    ],
    distinct: :identifier
  )
end

# Matching on the cons cell avoids walking the whole list just to check that
# it is not empty.
def filter_by_versions(query, [_ | _] = versions) do
  from(
    workflow_definition in subquery(query),
    where:
      fragment(
        "concat(?, '.', ?, '.', ?) = ANY(?)",
        workflow_definition.version_major,
        workflow_definition.version_minor,
        workflow_definition.version_micro,
        ^versions
      )
  )
end

def filter_by_versions(query, _versions), do: query
# Keeps the definitions whose label contains `search` or whose identifier
# matches it, both case-insensitively. A nil `search` leaves `query` as is.
defp filter_by_label_or_identifier(query, nil), do: query

defp filter_by_label_or_identifier(query, search) do
  label_pattern = "%#{search}%"

  # NOTE(review): the identifier is compared with the raw term (no surrounding
  # wildcards) while the label uses a "contains" pattern - confirm this
  # asymmetry is intended.
  from(
    definition in subquery(query),
    where:
      ilike(definition.label, ^label_pattern) or
        ilike(definition.identifier, ^search)
  )
end
# Limits `query` to the identifiers listed in `allowed`, unless `allowed`
# contains the "*" wildcard, in which case no restriction is applied.
defp filter_by_rights(query, allowed) do
  if "*" in allowed do
    query
  else
    from(
      definition in subquery(query),
      where: definition.identifier in ^allowed
    )
  end
end
# Chooses the selected fields from `mode`: "simple" projects each row onto a
# map holding the id, identifier, is_live, label and the three version
# numbers; every other mode (including "full") leaves `query` untouched.
defp select_by_mode(query, "simple") do
  from(
    definition in subquery(query),
    select: %{
      id: definition.id,
      identifier: definition.identifier,
      is_live: definition.is_live,
      label: definition.label,
      version_major: definition.version_major,
      version_minor: definition.version_minor,
      version_micro: definition.version_micro
    }
  )
end

defp select_by_mode(query, _mode), do: query
@doc """
Fetches the most recent, non-deleted WorkflowDefinition for `identifier`.

Candidates are sorted by major, minor then micro version in descending
order and only the first one is returned.

Returns nil when no matching WorkflowDefinition exists.
"""
def get_workflow_definition(identifier) do
  WorkflowDefinition
  |> where(
    [definition],
    definition.identifier == ^identifier and definition.deleted == false
  )
  |> order_by(
    [definition],
    desc: definition.version_major,
    desc: definition.version_minor,
    desc: definition.version_micro
  )
  |> limit(1)
  |> Repo.one()
end
@doc """
Fetches the non-deleted WorkflowDefinition matching `identifier` and the
exact `version_major`.`version_minor`.`version_micro` version.

Returns nil when no matching WorkflowDefinition exists.
"""
def get_workflow_definition(identifier, version_major, version_minor, version_micro) do
  WorkflowDefinition
  |> where(
    [definition],
    definition.identifier == ^identifier and
      definition.deleted == false and
      definition.version_major == ^version_major and
      definition.version_minor == ^version_minor and
      definition.version_micro == ^version_micro
  )
  |> limit(1)
  |> Repo.one()
end
@doc """
Collects the workflow definitions found in every workflow definition
directory and returns them as a single flat list.
"""
def load_workflows do
  directories = WorkflowDefinitions.get_workflow_definition_directories()

  directories
  |> Enum.map(&WorkflowDefinitions.list_workflow_definitions_for_a_directory/1)
  |> List.flatten()
end
@doc """
Loads the workflow definitions returned by `load_workflows/0` in database.

A definition is inserted only when no non-deleted definition with the same
identifier and version already exists; otherwise a warning is logged.
Insertion failures are logged and do not stop the processing of the
remaining definitions.

Returns `:ok`.
"""
def load_workflows_in_database do
  Enum.each(load_workflows(), &insert_new_workflow_definition/1)
end

# Inserts one workflow definition unless the same identifier/version is
# already stored, logging the outcome when nothing could be inserted.
defp insert_new_workflow_definition(workflow_definition) do
  identifier = StepFlow.Map.get_by_key_or_atom(workflow_definition, :identifier)

  existing =
    get_workflow_definition(
      identifier,
      StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_major),
      StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_minor),
      StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_micro)
    )

  case existing do
    nil ->
      insertion =
        %WorkflowDefinition{}
        |> WorkflowDefinition.changeset(workflow_definition)
        |> Repo.insert()

      # The insertion result used to be discarded, hiding invalid definitions.
      case insertion do
        {:ok, _inserted} ->
          :ok

        {:error, changeset} ->
          Logger.error(
            "Unable to insert workflow definition #{inspect(identifier)} in database: #{inspect(changeset.errors)}"
          )
      end

    _ ->
      Logger.warn("Workflow already present in database")
  end
end
@doc """
Validates `workflow_definition` and stores it in database.

Returns:

  * `{:ok, message}` when the definition has been inserted
  * `{:error, message}` when a non-deleted definition with the same
    identifier and version already exists, when the insertion fails, or when
    the definition does not pass `WorkflowDefinitions.valid?/1`
"""
def load_workflow_in_database(workflow_definition) do
  if WorkflowDefinitions.valid?(workflow_definition) do
    identifier = StepFlow.Map.get_by_key_or_atom(workflow_definition, :identifier)

    case get_workflow_definition(
           identifier,
           StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_major),
           StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_minor),
           StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_micro)
         ) do
      nil ->
        insertion =
          %WorkflowDefinition{}
          |> WorkflowDefinition.changeset(workflow_definition)
          |> Repo.insert()

        # Success must only be reported when the row was really written.
        case insertion do
          {:ok, _inserted} ->
            {:ok, "Workflow #{identifier} properly loaded in database."}

          {:error, changeset} ->
            message =
              "Workflow #{identifier} could not be loaded in database: #{inspect(changeset.errors)}"

            Logger.error(message)
            {:error, message}
        end

      _ ->
        Logger.warn("Workflow already present in database")
        {:error, "Workflow already present in database"}
    end
  else
    # Only the location of each validation error is kept in the report.
    collect_path = fn _error, path, acc ->
      ["at " <> inspect(path) | acc]
    end

    errors =
      WorkflowDefinitions.validate(workflow_definition)
      |> JsonXema.ValidationError.travers_errors([], collect_path)

    message =
      "Workflow definition #{inspect(Map.get(workflow_definition, "identifier"))} not valid: #{inspect(errors)}"

    Logger.error(message)
    {:error, message}
  end
end
@doc """
Marks the WorkflowDefinition with the given `id` as deleted.

The record is not removed: its `deleted` field is set to true through
`WorkflowDefinition.changeset/2` and `Repo.update/1`.

Returns `{:ok, message}` on success, or `{:error, message}` when the record
cannot be found or cannot be updated.
"""
def delete_by_id(id) do
  workflow_definition = Repo.get(WorkflowDefinition, id)

  if is_nil(workflow_definition) do
    {:error, "Unable to find Workflow definition with ID #{id}"}
  else
    update_result =
      workflow_definition
      |> WorkflowDefinition.changeset(%{"deleted" => true})
      |> Repo.update()

    case update_result do
      {:ok, _updated} ->
        {:ok, "Workflow definition with ID #{id} marked as deleted"}

      {:error, _changeset} ->
        {:error, "Could not mark Workflow definition with ID #{id} as deleted"}
    end
  end
end
end