defmodule StepFlow.WorkflowDefinitions do
@moduledoc """
The WorkflowDefinitions context.
"""
import Ecto.Query, warn: false
alias StepFlow.Repo
alias StepFlow.Roles
alias StepFlow.WorkflowDefinitions.ExternalLoader
alias StepFlow.WorkflowDefinitions.WorkflowDefinition
require Logger
@doc """
Returns a page of Workflow Definitions the caller is allowed to view.

`params` is a map from which the following entries are read:

  * `"page"` - page index, defaults to `0`
  * `"size"` - page size, defaults to `10` (`-1` disables pagination)
  * `"mode"` - `"simple"` selects a reduced set of fields, any other value
    keeps the definition as is; defaults to `"full"`
  * `"search"` - optional term matched against label and identifier
  * `"versions"` - optional list of versions, either `["latest"]` or
    `"major.minor.micro"` strings
  * `roles` - looked up through `StepFlow.Map.get_by_key_or_atom/2` and used
    to resolve the `"workflow"` / `"view"` rights

Returns a map with the `:data`, `:total`, `:page`, `:size` and `:mode` keys,
where `:total` is the number of matching definitions before pagination.
"""
def list_workflow_definitions(params \\ %{}) do
  allowed_workflows = check_rights(params)

  page = StepFlow.Integer.force(Map.get(params, "page", 0))
  size = StepFlow.Integer.force(Map.get(params, "size", 10))
  mode = Map.get(params, "mode", "full")
  search = Map.get(params, "search")
  versions = Map.get(params, "versions")

  filtered =
    from(definition in WorkflowDefinition)
    |> filter_by_rights(allowed_workflows)
    |> filter_by_label_or_identifier(search)
    |> filter_by_versions(versions)
    |> select_by_mode(mode)

  # Count the matching rows before any pagination is applied.
  count_query = from(row in subquery(filtered), select: count(row.id))
  total = count_query |> Repo.all() |> List.first()

  ordered =
    from(
      definition in subquery(filtered),
      order_by: [
        desc: definition.version_major,
        desc: definition.version_minor,
        desc: definition.version_micro
      ]
    )

  data =
    ordered
    |> paginate(page * size, size)
    |> Repo.all()

  %{data: data, total: total, page: page, size: size, mode: mode}
end
# Resolves the workflow identifiers the caller may view.
#
# Reads the roles from `params` (`:roles`, by key or atom), asks `Roles` for
# the "workflow" / "view" rights and keeps the last `::`-separated segment of
# each right's `entity`. Returns `[]` when no roles are resolved.
defp check_rights(params) do
  roles = Roles.get_roles(StepFlow.Map.get_by_key_or_atom(params, :roles))

  view_rights =
    if is_nil(roles) do
      []
    else
      Roles.get_rights_for_entity_type_and_action(roles, "workflow", "view")
    end

  # Entries that do not carry an `:entity` key are skipped by the generator.
  for %{entity: entity} <- view_rights do
    entity
    |> String.split("::")
    |> List.last()
  end
end
# Applies OFFSET/LIMIT to `query`; a `size` of `-1` disables pagination and
# returns `query` untouched.
#
# The bounds are set on `query` itself instead of on a wrapping subquery, so
# they live in the same SELECT as any ORDER BY already present on `query`.
# SQL does not guarantee that an outer SELECT preserves the ordering of an
# ordered subquery, which could make pages unstable.
defp paginate(query, _offset, -1), do: query

defp paginate(query, offset, size) do
  from(
    workflow_definition in query,
    offset: ^offset,
    limit: ^size
  )
end
@doc """
Restricts `query` to the requested versions.

  * `["latest"]` - keeps one row per `identifier`, rows being ordered by
    descending major, minor and micro version
  * a non-empty list - keeps the rows whose `"major.minor.micro"` version
    string is one of the list entries
  * anything else (including `nil` and `[]`) - returns `query` unchanged
"""
def filter_by_versions(query, ["latest"]) do
  from(
    workflow_definition in subquery(query),
    order_by: [
      desc: workflow_definition.version_major,
      desc: workflow_definition.version_minor,
      desc: workflow_definition.version_micro
    ],
    distinct: :identifier
  )
end

# Matching on `[_ | _]` checks for a non-empty list without walking it, which
# a `length/1` guard would do.
def filter_by_versions(query, [_ | _] = versions) do
  from(
    workflow_definition in subquery(query),
    where:
      fragment(
        "concat(?, '.', ?, '.', ?) = ANY(?)",
        workflow_definition.version_major,
        workflow_definition.version_minor,
        workflow_definition.version_micro,
        ^versions
      )
  )
end

def filter_by_versions(query, _versions), do: query
# Narrows `query` to the rows whose label or identifier matches `search`
# (case-insensitively, through `ilike`); a `nil` search leaves `query` as is.
#
# The label is matched as a substring (`%search%`) while the identifier is
# matched against the term exactly as given.
# NOTE(review): the identifier comparison receives the raw term, without the
# surrounding wildcards used for the label - confirm this asymmetry is
# intended. Wildcard characters in `search` are not escaped either.
defp filter_by_label_or_identifier(query, nil), do: query

defp filter_by_label_or_identifier(query, search) do
  label_pattern = "%" <> to_string(search) <> "%"

  from(
    definition in subquery(query),
    where:
      ilike(definition.label, ^label_pattern) or
        ilike(definition.identifier, ^search)
  )
end
# Restricts `query` to the identifiers listed in `allowed`, unless `allowed`
# contains the "*" wildcard, in which case `query` is returned unchanged.
defp filter_by_rights(query, allowed) do
  case "*" in allowed do
    true ->
      query

    false ->
      from(
        definition in subquery(query),
        where: definition.identifier in ^allowed
      )
  end
end
# Shapes the selected fields according to `mode`.
#
# "simple" projects each row to a map holding only the id, identifier,
# is_live flag, label and version numbers; every other mode (including
# "full") leaves `query` unchanged.
defp select_by_mode(query, "simple") do
  from(
    definition in subquery(query),
    select: %{
      id: definition.id,
      identifier: definition.identifier,
      is_live: definition.is_live,
      label: definition.label,
      version_major: definition.version_major,
      version_minor: definition.version_minor,
      version_micro: definition.version_micro
    }
  )
end

defp select_by_mode(query, _mode), do: query
@doc """
Gets the latest version of the WorkflowDefinition matching `identifier`.

Candidates are ordered by descending major, minor and micro version and the
first one is returned. Returns `nil` when no WorkflowDefinition matches.
"""
def get_workflow_definition(identifier) do
  WorkflowDefinition
  |> where([definition], definition.identifier == ^identifier)
  |> order_by([definition],
    desc: definition.version_major,
    desc: definition.version_minor,
    desc: definition.version_micro
  )
  |> limit(1)
  |> Repo.one()
end
@doc """
Gets the WorkflowDefinition matching `identifier` and the exact
`version_major`, `version_minor` and `version_micro` numbers.

Returns `nil` when no WorkflowDefinition matches.
"""
def get_workflow_definition(identifier, version_major, version_minor, version_micro) do
  WorkflowDefinition
  |> where([definition], definition.identifier == ^identifier)
  |> where([definition], definition.version_major == ^version_major)
  |> where([definition], definition.version_minor == ^version_minor)
  |> where([definition], definition.version_micro == ^version_micro)
  |> limit(1)
  |> Repo.one()
end
@doc """
Loads the workflow definitions found in the configured directories.

Every directory returned by `get_workflow_definition_directories/0` is
scanned and the definitions of all directories are returned as one list.
"""
def load_workflows do
  # `Enum.flat_map/2` concatenates the per-directory lists in a single pass
  # and, unlike `List.flatten/1`, never flattens the decoded definitions
  # themselves.
  Enum.flat_map(
    get_workflow_definition_directories(),
    &list_workflow_definitions_for_a_directory/1
  )
end
@doc """
Loads the workflow definitions found in the configured directories and
inserts in database those that are not stored yet.

A definition is looked up by identifier and major/minor/micro version: when
a matching row exists a warning is logged and nothing is inserted. Insert
failures are logged rather than silently discarded.

Returns `:ok`.
"""
def load_workflows_in_database do
  Enum.each(load_workflows(), &insert_unless_present/1)
end

# Inserts `workflow_definition` unless a row with the same identifier and
# version is already stored.
defp insert_unless_present(workflow_definition) do
  identifier = StepFlow.Map.get_by_key_or_atom(workflow_definition, :identifier)
  version_major = StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_major)
  version_minor = StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_minor)
  version_micro = StepFlow.Map.get_by_key_or_atom(workflow_definition, :version_micro)

  case get_workflow_definition(identifier, version_major, version_minor, version_micro) do
    nil ->
      %WorkflowDefinition{}
      |> WorkflowDefinition.changeset(workflow_definition)
      |> Repo.insert()
      |> case do
        {:ok, _inserted} ->
          :ok

        {:error, changeset} ->
          # Surface the failure: the result used to be dropped, hiding
          # definitions that never reached the database.
          Logger.error(
            "Unable to insert workflow definition #{inspect(identifier)}: #{inspect(changeset.errors)}"
          )
      end

    _ ->
      Logger.warn("Workflow already present in database")
  end
end
@doc """
Returns the list of directories holding workflow definitions.

The `:workflow_definition` entry of the `StepFlow` configuration of the
`:step_flow` application is read and may be:

  * `{:system, name}` - the environment variable `name` is split on the
    platform path separator, empty segments being dropped
  * a list - returned as is
  * a string - returned as a single-element list

Any other value, a missing configuration or an unset environment variable
is logged and yields `[]`.
"""
def get_workflow_definition_directories do
  setting =
    :step_flow
    # Default to an empty keyword list so a missing configuration does not
    # crash in `Keyword.get/2`.
    |> Application.get_env(StepFlow, [])
    |> Keyword.get(:workflow_definition)

  case setting do
    {:system, variable} ->
      case System.get_env(variable) do
        nil ->
          Logger.info("environment variable #{inspect(variable)} is not set, no directory to list")
          []

        value ->
          # `trim: true` drops the empty entries produced by an empty value
          # or by leading/trailing/doubled separators.
          String.split(value, get_separator(), trim: true)
      end

    directories when is_list(directories) ->
      directories

    directory when is_binary(directory) ->
      [directory]

    other ->
      Logger.info("unable to use #{inspect(other)} to list directory")
      []
  end
end
@doc """
Checks `definition` against the workflow definition schema through
`JsonXema.valid?/2`.
"""
def valid?(definition), do: JsonXema.valid?(get_schema(), definition)
@doc """
Validates `definition` against the workflow definition schema and returns
the result of `JsonXema.validate/2`.
"""
def validate(definition) do
  schema = get_schema()
  JsonXema.validate(schema, definition)
end
# Builds the JsonXema schema used to validate workflow definitions.
#
# The schema location is read from the `:workflow_schema_url` setting
# (falling back to the media-cloud.ai 1.10 schema URL); its content is
# loaded, JSON-decoded and checked with `JsonXema.SchemaValidator` against
# the draft-07 schema before being compiled. A failed check raises a
# `MatchError`.
defp get_schema do
  source =
    Application.get_env(
      StepFlow.WorkflowDefinitions.WorkflowDefinition,
      :workflow_schema_url,
      "https://media-cloud.ai/standard/1.10/workflow-definition.schema.json"
    )

  schema = Jason.decode!(load_content(source))

  :ok = JsonXema.SchemaValidator.validate("http://json-schema.org/draft-07/schema#", schema)

  JsonXema.new(schema, loader: ExternalLoader)
end
# Returns the raw content found at the given location: `http://` and
# `https://` locations are fetched with HTTPoison, anything else is read as
# a local file. Raises when the request or the file read fails.
defp load_content("http://" <> _ = url), do: fetch_remote_body(url)
defp load_content("https://" <> _ = url), do: fetch_remote_body(url)
defp load_content(source_filename), do: File.read!(source_filename)

# Performs the GET request and returns the response body.
defp fetch_remote_body(url) do
  response = HTTPoison.get!(url)
  Map.get(response, :body)
end
# Returns the separator used between entries of a path list:
# ":" when the OS family is `:unix`, ";" otherwise.
defp get_separator do
  case :os.type() do
    {:unix, _name} -> ":"
    _other -> ";"
  end
end
# Reads every `.json` file of `directory`, decodes it and keeps the
# definitions that pass schema validation; invalid ones are logged and
# dropped.
#
# Raises when the directory or a file cannot be read, or when a file does
# not contain valid JSON.
defp list_workflow_definitions_for_a_directory(directory) do
  directory
  |> File.ls!()
  |> Enum.filter(&String.ends_with?(&1, ".json"))
  |> Enum.map(fn filename ->
    directory
    |> Path.join(filename)
    |> File.read!()
    |> Jason.decode!()
  end)
  |> Enum.filter(&schema_compliant?/1)
end

# Returns `true` when `workflow_definition` passes schema validation,
# otherwise logs where validation failed and returns `false`.
#
# `validate/1` is called once per definition: it reloads the schema on every
# call, so checking `valid?/1` first would load it twice for invalid input.
defp schema_compliant?(workflow_definition) do
  case validate(workflow_definition) do
    :ok ->
      true

    {:error, error} ->
      # Hand the unwrapped error to `travers_errors` so the error tree is
      # actually walked; the `{:error, _}` tuple itself is not an error node.
      errors =
        JsonXema.ValidationError.travers_errors(error, [], fn _error, path, acc ->
          ["at " <> inspect(path) | acc]
        end)

      Logger.error(
        "Workflow definition #{inspect(Map.get(workflow_definition, "identifier"))} not valid: #{inspect(errors)}"
      )

      false
  end
end
end