defmodule StepFlow.Workflows do
@moduledoc """
The Workflows context.
"""
import Ecto.Query, warn: false
alias StepFlow.Artifacts.Artifact
alias StepFlow.Jobs
alias StepFlow.Jobs.Status
alias StepFlow.Progressions.Progression
alias StepFlow.QueryFilter
alias StepFlow.Repo
alias StepFlow.Roles
alias StepFlow.Workflows
alias StepFlow.Workflows.Workflow
require Logger
@doc """
Returns one page of workflows together with pagination metadata.

`params` may contain `"page"` (defaults to `0`) and `"size"` (defaults to
`10`); both go through `StepFlow.Integer.force/1`. Every filter handled by
`apply_default_query_filters/2` is applied as well. The page is ordered by
descending `inserted_at`.

The result is a map with the keys:

  * `:data` - the workflows of the requested page, with jobs, artifacts and
    status preloaded and each step annotated by `preload_workflows/2`
  * `:total` - the number of workflows matching the filters, before paging
  * `:page` and `:size` - the paging values that were actually used

## Examples

    iex> list_workflows()
    %{data: [%Workflow{}, ...], total: 1, page: 0, size: 10}

"""
def list_workflows(params \\ %{}) do
  page = StepFlow.Integer.force(Map.get(params, "page", 0))
  size = StepFlow.Integer.force(Map.get(params, "size", 10))

  filtered = apply_default_query_filters(from(workflow in Workflow), params)

  # Count all matching rows before the page window is applied.
  total =
    from(item in subquery(filtered), select: count(item.id))
    |> Repo.all()
    |> List.first()

  page_query =
    from(
      workflow in subquery(filtered),
      order_by: [desc: :inserted_at],
      offset: ^(page * size),
      limit: ^size
    )

  workflows =
    page_query
    |> Repo.all()
    |> Repo.preload([:jobs, :artifacts, :status])
    |> preload_workflows()

  %{data: workflows, total: total, page: page, size: size}
end
@doc """
Applies the standard workflow filters found in `params` to `query`.

The query is wrapped in a subquery, then narrowed by: the `:search` text on
the reference, the exact-match fields (`:reference`, `:identifier`, the three
version fields, `:is_live`, `:deleted`, `:user_uuid`), the `"mode"` filter, the
`:before_date` / `:after_date` bounds and the `:states` status filter.

Visibility is then enforced using `check_rights/1`: unless the allowed list
contains `"*"`, only workflows whose identifier is allowed are kept. When
`:workflow_ids` is given, it is intersected with the allowed identifiers (or
used as-is when `"*"` is allowed). Finally `:ids`, when present, restricts the
result to those workflow ids.
"""
def apply_default_query_filters(query, params \\ %{}) do
  filtered =
    from(workflow in subquery(query))
    |> search_for_reference(params, :search)
    |> QueryFilter.filter_query(params, :reference)
    |> QueryFilter.filter_query(params, :identifier)
    |> QueryFilter.filter_query(params, :version_major)
    |> QueryFilter.filter_query(params, :version_minor)
    |> QueryFilter.filter_query(params, :version_micro)
    |> QueryFilter.filter_query(params, :is_live)
    |> QueryFilter.filter_query(params, :deleted)
    |> QueryFilter.filter_query(params, :user_uuid)
    |> filter_mode(params)
    |> QueryFilter.apply_before_date_filter(params, :before_date)
    |> QueryFilter.apply_after_date_filter(params, :after_date)
    |> filter_status(params, :states)

  allowed_workflows = check_rights(params)
  requested = StepFlow.Map.get_by_key_or_atom(params, :workflow_ids)
  unrestricted? = Enum.member?(allowed_workflows, "*")

  visible =
    cond do
      is_nil(requested) and unrestricted? ->
        filtered

      is_nil(requested) ->
        where(filtered, [workflow], workflow.identifier in ^allowed_workflows)

      unrestricted? ->
        where(filtered, [workflow], workflow.identifier in ^requested)

      true ->
        # Only keep the requested identifiers the caller is allowed to see.
        permitted = Enum.filter(requested, &(&1 in allowed_workflows))
        where(filtered, [workflow], workflow.identifier in ^permitted)
    end

  case StepFlow.Map.get_by_key_or_atom(params, :ids) do
    nil -> visible
    identifiers -> where(visible, [workflow], workflow.id in ^identifiers)
  end
end
# Keeps only the workflows whose `reference` contains the text stored under
# `field` in `params` (case-insensitive `ilike`). Returns `query` untouched
# when the parameter is absent.
# NOTE(review): `%` and `_` inside the search text are not escaped, so they act
# as SQL wildcards - confirm this is intended.
defp search_for_reference(query, params, field) do
  search = StepFlow.Map.get_by_key_or_atom(params, field)

  if is_nil(search) do
    query
  else
    pattern = "%" <> to_string(search) <> "%"

    query
    |> subquery()
    |> where([workflow], ilike(workflow.reference, ^pattern))
  end
end
@doc """
Lists the workflow identifiers that the roles named in `params` may view.

The `:roles` entry of `params` (default `[]`) is resolved through
`Roles.get_roles/1`; a `nil` result yields an empty list. For every "view"
right on the "workflow" entity type, the last `::`-separated segment of the
right's `entity` is returned. Rights without an `:entity` key are ignored.
"""
def check_rights(params) do
  roles = Roles.get_roles(StepFlow.Map.get_by_key_or_atom(params, :roles, []))

  view_rights =
    if is_nil(roles),
      do: [],
      else: Roles.get_rights_for_entity_type_and_action(roles, "workflow", "view")

  Enum.flat_map(view_rights, fn
    %{entity: entity} -> [entity |> String.split("::") |> List.last()]
    _other -> []
  end)
end
# Filters workflows on their `is_live` flag according to the `"mode"` entry of
# `params`.
#
#   * only "live" requested -> live workflows
#   * only "file" requested -> non-live workflows
#   * both, none, or no "mode" at all -> no restriction
#
# The modes are checked by membership, so their order does not matter and an
# empty list, a bare string or unknown extra values no longer raise a
# `CaseClauseError` on user-supplied input.
defp filter_mode(query, params) do
  modes = params |> Map.get("mode") |> List.wrap()

  case {"live" in modes, "file" in modes} do
    {true, false} ->
      from(
        workflow in query,
        where: workflow.is_live == true
      )

    {false, true} ->
      from(
        workflow in query,
        where: workflow.is_live == false
      )

    _both_or_neither ->
      from(workflow in query)
  end
end
@doc """
Restricts `query` to workflows whose status state is one of the states stored
under `key` in `params`.

Returns `query` unchanged when the parameter is absent. Otherwise the query is
joined with a subquery over `Workflows.Status` that is distinct on
`workflow_id` and ordered by descending `id` - presumably selecting the most
recent status row of each workflow (confirm against the generated SQL).
"""
def filter_status(query, params, key) do
  case StepFlow.Map.get_by_key_or_atom(params, key) do
    nil ->
      query

    states ->
      status_per_workflow =
        from(
          status in Workflows.Status,
          order_by: [desc: status.id, desc: status.workflow_id],
          distinct: [desc: status.workflow_id]
        )

      from(
        workflow in query,
        join: workflow_status in subquery(status_per_workflow),
        on: workflow.id == workflow_status.workflow_id,
        where: workflow_status.state in ^states
      )
  end
end
@doc """
Gets a single workflow, with its jobs and artifacts preloaded and every step
annotated with its status and job counters.

Raises `Ecto.NoResultsError` if the Workflow does not exist.

## Examples

    iex> get_workflow!(123)
    %Workflow{}

    iex> get_workflow!(456)
    ** (Ecto.NoResultsError)

"""
def get_workflow!(id) do
  workflow = Repo.get!(Workflow, id)
  preload_workflow(Repo.preload(workflow, [:jobs, :artifacts]))
end
@doc """
Gets the workflow that owns the job identified by `job_id`.

The job is loaded with `Jobs.get_job!/1` and its `workflow_id` is passed to
`get_workflow!/1`, which raises `Ecto.NoResultsError` if the Workflow does not
exist.

## Examples

    iex> get_workflow_for_job!(19)
    %Workflow{}

"""
def get_workflow_for_job!(job_id) do
  job_id
  |> Jobs.get_job!()
  |> Map.fetch!(:workflow_id)
  |> get_workflow!()
end
# Preloads status and progressions on the workflow's jobs, then replaces the
# workflow's `:steps` with the status-annotated steps and its `:jobs` with the
# preloaded jobs.
defp preload_workflow(workflow) do
  jobs = Repo.preload(workflow.jobs, [:status, :progressions])
  steps = get_steps_with_status(Map.get(workflow, :steps), jobs)

  Map.merge(workflow, %{steps: steps, jobs: jobs})
end
@doc """
Preloads every workflow of `workflows` (jobs' status and progressions, steps
annotated with their status) and appends them, in order, to `result`.

`result` defaults to `[]` and is returned untouched when `workflows` is empty.
"""
def preload_workflows(workflows, result \\ [])

def preload_workflows([], result), do: result

def preload_workflows(workflows, result) when is_list(workflows) do
  # Map once and concatenate once instead of appending inside a recursion,
  # which copied the accumulator for every workflow.
  result ++ Enum.map(workflows, &preload_workflow/1)
end
@doc """
Annotates each step with the state of its jobs and appends the annotated steps,
in order, to `result` (default `[]`).

A job belongs to a step when both its `name` and its `step_id` match the step's
name and id. Each returned step gains:

  * `:jobs` - a map of job counters (`total`, `completed`, `errors`, ...)
  * `:status` - the step status derived from those counters

`result` is returned untouched when `steps` is `nil` or empty.
"""
def get_steps_with_status(steps, workflow_jobs, result \\ [])

def get_steps_with_status([], _workflow_jobs, result), do: result

def get_steps_with_status(nil, _workflow_jobs, result), do: result

def get_steps_with_status(steps, workflow_jobs, result) when is_list(steps) do
  # Map once and concatenate once instead of appending inside a recursion,
  # which copied the accumulator for every step.
  result ++ Enum.map(steps, &put_step_status(&1, workflow_jobs))
end

# Adds the `:status` and `:jobs` entries to a single step.
defp put_step_status(step, workflow_jobs) do
  name = StepFlow.Map.get_by_key_or_atom(step, :name)
  step_id = StepFlow.Map.get_by_key_or_atom(step, :id)

  job_status =
    workflow_jobs
    |> Enum.filter(fn job -> job.name == name && job.step_id == step_id end)
    |> count_step_jobs_status()

  step
  |> Map.put(:status, get_step_status_from_job_status(job_status))
  |> Map.put(:jobs, job_status)
end
# Builds the job counters of a step: one entry per state (note that the
# `:errors` key counts jobs in the `:error` state) plus `:total`, the number of
# jobs.
defp count_step_jobs_status(jobs) do
  [
    completed: :completed,
    errors: :error,
    skipped: :skipped,
    stopped: :stopped,
    processing: :processing,
    paused: :paused,
    queued: :queued
  ]
  |> Map.new(fn {key, status} -> {key, count_status(jobs, status)} end)
  |> Map.put(:total, length(jobs))
end
# Derives a step status from its job counters. The counters are inspected in
# priority order and the first one greater than zero decides; queued jobs
# report the step as `:processing`. Falls back to `:pending` when every counter
# is zero.
defp get_step_status_from_job_status(job_status) do
  priorities = [
    errors: :error,
    processing: :processing,
    queued: :processing,
    stopped: :stopped,
    skipped: :skipped,
    completed: :completed,
    paused: :paused
  ]

  Enum.find_value(priorities, :pending, fn {counter, status} ->
    if Map.fetch!(job_status, counter) > 0, do: status
  end)
end
@doc """
Finds the step definition a job was created from.

The job's workflow is preloaded (with its jobs) and the first workflow step
whose `"id"` equals the job's `step_id` is looked up.

Returns `%{step: step, workflow: workflow}`, where `step` is `nil` when no
step matches.
"""
def get_step_definition(job) do
  job = Repo.preload(job, workflow: [:jobs])
  workflow = job.workflow

  step = Enum.find(workflow.steps, fn step -> Map.get(step, "id") == job.step_id end)

  %{step: step, workflow: workflow}
end
# Counts, starting from `count`, the jobs of `jobs` that are in `status`.
#
# A job with at least one status.state at :completed is considered :completed:
# it only ever counts towards `:completed`. Every other job is classified by
# `count_uncompleted_status/3`.
defp count_status(jobs, status, count \\ 0)

defp count_status(jobs, status, count) when is_list(jobs) do
  Enum.reduce(jobs, count, fn job, acc ->
    completed? = Enum.any?(job.status, fn s -> s.state == :completed end)

    cond do
      not completed? -> count_uncompleted_status(status, job, acc)
      status == :completed -> acc + 1
      true -> acc
    end
  end)
end
# Dispatches a job that has no `:completed` status to the counter matching
# `status` and returns the (possibly incremented) `count`.
#
# Such a job never counts as `:completed`, so that clause returns `count`
# unchanged. Any other status is a programming error: it is logged and a
# `RuntimeError` is raised. (The log call used to sit after the `raise`, where
# it could never run.)
defp count_uncompleted_status(:processing, job, count), do: count_processing(job, count)
defp count_uncompleted_status(:error, job, count), do: count_error(job, count)
defp count_uncompleted_status(:paused, job, count), do: count_paused(job, count)
defp count_uncompleted_status(:skipped, job, count), do: count_skipped(job, count)
defp count_uncompleted_status(:stopped, job, count), do: count_stopped(job, count)
defp count_uncompleted_status(:queued, job, count), do: count_queued(job, count)
defp count_uncompleted_status(:completed, _job, count), do: count

defp count_uncompleted_status(status, _job, _count) do
  message = "unsupported job status: #{inspect(status)}"
  Logger.error("#{__MODULE__}: #{message}")
  raise RuntimeError, message
end
# Increments `count` when the job looks like it is being processed.
#
# A job without progressions never counts. Otherwise it counts when it has no
# status at all, when its last progression is more recent than its last status,
# or when both share the same timestamp and the last status is `:processing`.
defp count_processing(%{progressions: []}, count), do: count

defp count_processing(job, count) do
  last_progression = Progression.get_last_progression(job.progressions)
  last_status = Status.get_last_status(job.status)

  processing? =
    is_nil(last_status) or
      case NaiveDateTime.compare(last_progression.updated_at, last_status.updated_at) do
        :gt -> true
        :eq -> last_status.state == :processing
        _older -> false
      end

  if processing?, do: count + 1, else: count
end
# Increments `count` when the job's last status is `:error`.
defp count_error(job, count) do
  case Status.get_last_status(job.status) do
    %{state: :error} -> count + 1
    _none_or_other -> count
  end
end
# Increments `count` when the job's last status is `:paused`.
defp count_paused(job, count) do
  last_status = Status.get_last_status(job.status)

  if match?(%{state: :paused}, last_status), do: count + 1, else: count
end
# Increments `count` when the job's last status is `:skipped`.
defp count_skipped(job, count) do
  case Status.get_last_status(job.status) do
    %{state: :skipped} -> count + 1
    _none_or_other -> count
  end
end
# Increments `count` when the job's last status is `:stopped`.
defp count_stopped(job, count) do
  last_status = Status.get_last_status(job.status)

  if match?(%{state: :stopped}, last_status), do: count + 1, else: count
end
# Increments `count` when the job is still waiting to be processed:
#
#   * it has neither a status nor a progression;
#   * it has no progression and its last status is `:queued` or `:retrying`;
#   * its last status is `:retrying` and its last progression is not more
#     recent than that status.
defp count_queued(job, count) do
  last_status = Status.get_last_status(job.status)
  last_progression = Progression.get_last_progression(job.progressions)

  case {last_status, last_progression} do
    {nil, nil} ->
      count + 1

    {nil, _progression} ->
      count

    {%{state: state}, nil} when state in [:queued, :retrying] ->
      count + 1

    {%{state: :retrying}, _progression} ->
      case NaiveDateTime.compare(last_progression.updated_at, last_status.updated_at) do
        :gt -> count
        _not_newer -> count + 1
      end

    _other ->
      count
  end
end
@doc """
Creates a workflow from `attrs`.

The attributes are cast through `Workflow.changeset/2` on an empty
`%Workflow{}` and the result is inserted.

## Examples

    iex> create_workflow(%{field: value})
    {:ok, %Workflow{}}

    iex> create_workflow(%{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def create_workflow(attrs \\ %{}) do
  changeset = Workflow.changeset(%Workflow{}, attrs)
  Repo.insert(changeset)
end
@doc """
Updates `workflow` with `attrs`.

## Examples

    iex> update_workflow(workflow, %{field: new_value})
    {:ok, %Workflow{}}

    iex> update_workflow(workflow, %{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def update_workflow(%Workflow{} = workflow, attrs) do
  changeset = Workflow.changeset(workflow, attrs)
  Repo.update(changeset)
end
@doc """
Sends a job error message to the process registered as `:step_flow_slack_bot`.

The message names the job, links to its workflow under `exposed_domain_name`
and quotes `description` in a code block; it is sent as
`{:message, text, channel}`.
"""
def notification_slack_from_job(job, channel, exposed_domain_name, description) do
  text =
    "Error for job #{job.name} ##{job.id} <#{exposed_domain_name}/workflows/#{job.workflow_id} |Open Workflow>\n```#{description}```"

  send(:step_flow_slack_bot, {:message, text, channel})
end
@doc """
Posts a "MessageCard" describing a job error to the Teams webhook at `url`.

The card links to the job's workflow under `exposed_domain_name`, lists the
workflow and job identification as facts and quotes `description`.

Returns:

  * `{:ok, body}` when the webhook answers with status 200;
  * the result of `Logger.error/1` (`:ok`) when it answers with any other
    status, after logging the response;
  * `{:error, reason}` when the request itself fails. The failure is logged
    instead of raising a `MatchError`, so an unreachable webhook does not
    crash the caller.
"""
def notification_teams_from_job(job, url, workflow, exposed_domain_name, description) do
  headers = [{"content-type", "application/json"}]

  body =
    Poison.encode!(%{
      "@context": "https://schema.org/extensions",
      "@type": "MessageCard",
      potentialAction: [
        %{
          "@type": "OpenUri",
          name: "View Workflow",
          targets: [
            %{
              os: "default",
              uri: "#{exposed_domain_name}/workflows/#{job.workflow_id}"
            }
          ]
        }
      ],
      sections: [
        %{
          facts: [
            %{name: "Posted By:", value: "#{exposed_domain_name}"},
            %{name: "Workflow Reference:", value: "#{workflow.reference}"},
            %{name: "Workflow ID:", value: "#{workflow.id}"},
            %{name: "Workflow Identifier:", value: "#{workflow.identifier}"},
            %{name: "Job ID:", value: "#{job.id}"},
            %{name: "Job:", value: "#{job.name}"}
          ],
          text: "```#{description}```"
        }
      ],
      summary: "MCAI Workflow Error Notification",
      themeColor: "FF5733",
      title: "MCAI Workflow Error Notification"
    })

  Logger.debug(
    "#{__MODULE__}: POST #{url}, headers: #{inspect(headers)}, body: #{inspect(body)}"
  )

  case HTTPoison.request("POST", url, body, headers) do
    {:ok, %{status_code: 200} = response} ->
      {:ok, response.body}

    {:ok, response} ->
      Logger.error("Unable to notify: #{inspect(response)}")

    {:error, reason} ->
      # Transport-level failure (DNS, connection refused, timeout, ...).
      Logger.error("Unable to notify: #{inspect(reason)}")
      {:error, reason}
  end
end
@doc """
Posts a workflow status "MessageCard" to the notification hook at
`workflow_notification_url`.

An error card is sent when `status_wf` is `"error"`, a success card otherwise.
When `workflow_endpoint_credentials` is not `nil`, it is sent as a
`Basic` `Authorization` header.

Returns the result of `HTTPoison.request/4` unchanged.

The debug log shows the request with the `Authorization` value redacted, so
hook credentials do not end up in the logs.
"""
def notification_hook_from_workflow(
      workflow_notification_url,
      workflow,
      exposed_domain_name,
      status_wf,
      workflow_endpoint_credentials
    ) do
  headers =
    if workflow_endpoint_credentials != nil do
      [
        {"content-type", "application/json"},
        {"Authorization", "Basic #{workflow_endpoint_credentials}"}
      ]
    else
      [{"content-type", "application/json"}]
    end

  body =
    workflow
    |> workflow_notification_card(exposed_domain_name, status_wf)
    |> Poison.encode!()

  Logger.debug(
    "#{__MODULE__}: POST #{workflow_notification_url}, headers: #{inspect(redact_authorization(headers))}, body: #{inspect(body)}"
  )

  HTTPoison.request("POST", workflow_notification_url, body, headers)
end

# Builds the MessageCard payload. The error and success cards share their
# structure; they differ only in the trailing colon of the fact names, the
# theme colour and the title/summary wording.
defp workflow_notification_card(workflow, exposed_domain_name, status_wf) do
  {label_suffix, outcome, theme_color} =
    if status_wf == "error" do
      {":", "Error", "FF5733"}
    else
      {"", "Success", "339933"}
    end

  title = "MCAI Workflow #{outcome} Notification"

  %{
    "@context": "https://schema.org/extensions",
    "@type": "MessageCard",
    potentialAction: [
      %{
        "@type": "OpenUri",
        name: "View Workflow",
        targets: [
          %{
            os: "default",
            uri: "#{exposed_domain_name}/workflows/#{workflow.id}"
          }
        ]
      }
    ],
    sections: [
      %{
        facts: [
          %{name: "Posted By" <> label_suffix, value: "#{exposed_domain_name}"},
          %{name: "Workflow Reference" <> label_suffix, value: "#{workflow.reference}"},
          %{name: "Workflow ID" <> label_suffix, value: "#{workflow.id}"},
          %{name: "Workflow Identifier" <> label_suffix, value: "#{workflow.identifier}"},
          %{name: "Workflow Status" <> label_suffix, value: "#{status_wf}"}
        ]
      }
    ],
    summary: title,
    themeColor: theme_color,
    title: title
  }
end

# Hides the value of the Authorization header for logging purposes.
defp redact_authorization(headers) do
  Enum.map(headers, fn
    {"Authorization", _value} -> {"Authorization", "[REDACTED]"}
    header -> header
  end)
end
@doc """
Calls every notification hook of a workflow with `status_wf` (default
`"error"`) and stores the outcome on the hooks.

For each hook, the request is sent through `notification_hook_from_workflow/5`,
an `update_workflow_<id>` notification is published, and the first element of
the request result is stored under the hook's `"status"` key. The workflow is
then updated with the annotated hooks.

Returns the second element of the `update_workflow/2` result.
NOTE(review): when the update fails, that element is the changeset rather than
a workflow - confirm callers cope with it.
"""
def notification_workflow_status(workflow_id, status_wf \\ "error") do
  topic = "update_workflow_" <> Integer.to_string(workflow_id)
  workflow = Workflows.get_workflow!(workflow_id)
  exposed_domain_name = StepFlow.Configuration.get_exposed_domain_name()

  hooks_with_status =
    Enum.map(workflow.notification_hooks, fn hook ->
      endpoint = StepFlow.Map.get_by_key_or_atom(hook, "endpoint")
      credentials = StepFlow.Map.get_by_key_or_atom(hook, "credentials")

      {status, _} =
        notification_hook_from_workflow(
          endpoint,
          workflow,
          exposed_domain_name,
          status_wf,
          credentials
        )

      StepFlow.Notification.send(topic, %{workflow_id: workflow_id})
      Map.put(hook, "status", status)
    end)

  {_, updated} = update_workflow(workflow, %{notification_hooks: hooks_with_status})
  updated
end
@doc """
Notifies the configured channels about a job and publishes a workflow update.

When `description` is given, a Slack message is sent if both a Slack token and
a Slack channel are configured, and a Teams card is sent if a Teams URL is
configured. An `update_workflow_<id>` notification is always published; its
result is returned.
"""
def notification_from_job(job_id, description \\ nil) do
  job = Jobs.get_job!(job_id)
  topic = "update_workflow_" <> Integer.to_string(job.workflow_id)
  channel = StepFlow.Configuration.get_slack_channel()
  workflow = Workflows.get_workflow!(job.workflow_id)
  exposed_domain_name = StepFlow.Configuration.get_exposed_domain_name()

  slack_ready? =
    not is_nil(StepFlow.Configuration.get_slack_token()) and not is_nil(description) and
      not is_nil(channel)

  if slack_ready? do
    notification_slack_from_job(job, channel, exposed_domain_name, description)
  end

  teams_url = StepFlow.Configuration.get_teams_url()

  if not (is_nil(teams_url) or is_nil(description)) do
    notification_teams_from_job(job, teams_url, workflow, exposed_domain_name, description)
  end

  StepFlow.Notification.send(topic, %{workflow_id: job.workflow_id})
end
@doc """
Marks a Workflow as deleted.

The record is kept: only its `deleted` flag is set to `true` through
`Workflow.changeset/2`.

## Examples

    iex> delete_workflow(workflow)
    {:ok, %Workflow{}}

    iex> delete_workflow(workflow)
    {:error, %Ecto.Changeset{}}

"""
def delete_workflow(%Workflow{} = workflow) do
  changeset = Workflow.changeset(workflow, %{deleted: true})
  Repo.update(changeset)
end
@doc """
Returns an `%Ecto.Changeset{}` for tracking workflow changes, built from
`workflow` with no attribute changes.

## Examples

    iex> change_workflow(workflow)
    %Ecto.Changeset{source: %Workflow{}}

"""
def change_workflow(%Workflow{} = workflow) do
  no_changes = %{}
  Workflow.changeset(workflow, no_changes)
end
@doc """
Returns, per workflow identifier, how many workflows produced an artifact in a
time window and how long they took on average.

The window keeps artifacts inserted after `datetime_add(now, delta, scale)`,
so `delta` is the interval count and `scale` its unit (presumably a negative
`delta` to look into the past - confirm against callers).

Each row is a map with:

  * `:identifier` - the workflow identifier
  * `:count` - the number of matching workflows
  * `:duration` - the average, in seconds, between the workflow's insertion
    and its most recent artifact in the window
"""
def get_completed_statistics(scale, delta) do
  now = NaiveDateTime.utc_now()

  # Most recent artifact of each workflow inside the window.
  recent_artifacts =
    from(
      artifacts in Artifact,
      where: artifacts.inserted_at > datetime_add(^now, ^delta, ^scale),
      group_by: artifacts.workflow_id,
      select: %{
        workflow_id: artifacts.workflow_id,
        inserted_at: max(artifacts.inserted_at)
      }
    )

  statistics =
    from(
      workflow in Workflow,
      inner_join: artifacts in subquery(recent_artifacts),
      on: workflow.id == artifacts.workflow_id,
      group_by: workflow.identifier,
      select: %{
        count: count(),
        duration:
          fragment(
            "EXTRACT(EPOCH FROM (SELECT avg(? - ?)))",
            artifacts.inserted_at,
            workflow.inserted_at
          ),
        identifier: workflow.identifier
      }
    )

  Repo.all(statistics)
end
@doc """
Returns an attribute map describing a workflow configuration.

The map copies the definition fields of `workflow` (schema version, identifier,
version numbers, tags, flags, notification hooks, reference, user, parameters)
and its steps, from which the `:jobs` and `:status` entries are removed.
"""
def get_attr(workflow) do
  steps = Enum.map(workflow.steps, &Map.drop(&1, [:jobs, :status]))

  [
    :schema_version,
    :identifier,
    :version_major,
    :version_minor,
    :version_micro,
    :tags,
    :is_live,
    :deleted,
    :notification_hooks,
    :reference,
    :user_uuid,
    :parameters
  ]
  |> Map.new(fn key -> {key, Map.fetch!(workflow, key)} end)
  |> Map.put(:steps, steps)
end
@doc """
Converts the workflow version fields to a `"major.minor.micro"` string.
"""
def get_workflow_version_as_string(workflow) do
  major = workflow.version_major
  minor = workflow.version_minor
  micro = workflow.version_micro

  "#{major}.#{minor}.#{micro}"
end
@doc """
Aborts a workflow.

The jobs of the steps currently processing are aborted, the remaining steps
are skipped, the workflow status is set to `:stopped` and an
`update_workflow_<id>` notification is published.

Returns the result of setting the `:stopped` status.
"""
def abort(workflow) do
  steps = workflow.steps
  abort_running_step_jobs(steps, workflow)
  skip_remaining_steps(workflow.steps, workflow)

  result = Workflows.Status.set_workflow_status(workflow.id, :stopped)

  StepFlow.Notification.send(
    "update_workflow_" <> Integer.to_string(workflow.id),
    %{workflow_id: workflow.id}
  )

  result
end
@doc """
Marks a workflow as `:pausing` and records what to do once the pause ends.

`action` must be `"resume"` or `"abort"`; `trigger_date_time` is a Unix
timestamp in milliseconds, stored as a naive datetime under `:trigger_at`.
An `update_workflow_<id>` notification is published afterwards.

Returns the result of setting the status, or
`{:error, "Unknown action: ..."}` for any other action.
"""
def pause(workflow, action, trigger_date_time) when action in ["resume", "abort"] do
  trigger_at =
    trigger_date_time
    |> DateTime.from_unix!(:millisecond)
    |> DateTime.to_naive()

  description = %{action: action, trigger_at: trigger_at}

  result = Workflows.Status.set_workflow_status(workflow.id, :pausing, nil, description)

  StepFlow.Notification.send(
    "update_workflow_" <> Integer.to_string(workflow.id),
    %{workflow_id: workflow.id}
  )

  result
end

def pause(_workflow, action, _trigger_date_time) do
  {:error, "Unknown action: #{action}"}
end
@doc """
Resumes a workflow.

The status is first set to `:pending` (this must succeed), then the next step
is started. On success the status becomes `:processing`, an
`update_workflow_<id>` notification is published and the result of that status
change is returned. Any other result of `StepFlow.Step.start_next/1` is
returned as-is.
"""
def resume(workflow) do
  {:ok, _status} = Workflows.Status.set_workflow_status(workflow.id, :pending)

  with {:ok, _} <- StepFlow.Step.start_next(workflow) do
    result = Workflows.Status.set_workflow_status(workflow.id, :processing)

    StepFlow.Notification.send(
      "update_workflow_" <> Integer.to_string(workflow.id),
      %{workflow_id: workflow.id}
    )

    result
  end
end
# Aborts the jobs of every step whose status is `:processing`; other steps are
# left alone. Always returns `nil`.
defp abort_running_step_jobs(steps, workflow) when is_list(steps) do
  Enum.each(steps, fn step ->
    if step.status == :processing do
      StepFlow.Step.abort_step_jobs(workflow, step)
    end
  end)

  nil
end
# Skips what is left of a workflow: `:queued` steps are skipped as a whole,
# `:paused` and `:processing` steps have their jobs skipped, and every other
# step is left alone. Always returns `nil`.
# NOTE(review): `get_step_status_from_job_status/1` in this module never yields
# `:queued` (steps without jobs are `:pending`) - confirm whether `:pending`
# steps should be skipped here too.
defp skip_remaining_steps(steps, workflow) when is_list(steps) do
  Enum.each(steps, fn step ->
    cond do
      step.status == :queued -> StepFlow.Step.skip_step(workflow, step)
      step.status in [:paused, :processing] -> StepFlow.Step.skip_step_jobs(workflow, step)
      true -> nil
    end
  end)

  nil
end
end