defmodule StepFlow.Workflows.Status do
use Ecto.Schema
import Ecto.Changeset
import Ecto.Query, warn: false
import EctoEnum
alias StepFlow.Jobs
alias StepFlow.Metrics.WorkflowInstrumenter
alias StepFlow.NotificationHooks.NotificationHookManager
alias StepFlow.Progressions.Progression
alias StepFlow.Repo
alias StepFlow.Roles
alias StepFlow.Workflows
alias StepFlow.Workflows.Workflow
require Logger
@moduledoc false
# Enumeration of the states a workflow status row can hold.
#
# The comments above each value sketch the expected transitions:
#   `X, Y -->`  presumably lists the states the workflow can come from,
#   `--> X, Y`  presumably lists the states the workflow can move to.
# NOTE(review): this notation is inferred from the comments themselves; nothing
# in this declaration enforces the transitions.
defenum(StateEnum, [
  # Processing, Retrying -->
  # --> Processing, Paused
  "pending",
  # Error -->
  "skipped",
  # Pending, Paused, Stopped, Processing -->
  # --> Error, Completed, Stopped, Paused, Pending, Processing
  "processing",
  # Error -->
  # --> Pending
  "retrying",
  # Paused, Processing -->
  # --> Completed (LIVE), Processing
  "stopped",
  # Processing -->
  # --> Retrying, Skipped
  "error",
  # Processing, Stopped -->
  "completed",
  # Processing, Pending -->
  # --> Paused, Error
  "pausing",
  # Pausing -->
  # --> Processing, Stopped
  "paused"
])
# One row per status recorded for a workflow (rows are inserted by
# `set_workflow_status/4`).
schema "step_flow_workflow_status" do
  # State of the workflow, one of the `StateEnum` values.
  field(:state, StepFlow.Workflows.Status.StateEnum)
  # Map attached to the status; defaults to an empty map.
  field(:description, :map, default: %{})
  # Optional job status associated with this workflow status.
  belongs_to(:status, Jobs.Status, foreign_key: :status_id, defaults: nil)
  # Workflow this status belongs to (required by `changeset/2`).
  belongs_to(:workflow, Workflow, foreign_key: :workflow_id)
  timestamps()
end
@doc false
def changeset(%Workflows.Status{} = status, attrs) do
  # Fields callers are allowed to set, and the subset that must be present.
  permitted_fields = [:workflow_id, :state, :status_id, :description]
  required_fields = [:state, :workflow_id]

  casted = cast(status, attrs, permitted_fields)
  constrained = foreign_key_constraint(casted, :workflow_id)
  validate_required(constrained, required_fields)
end
@doc """
Defines the workflow status given an event. It also tracks completed, retrying
and error job statuses of a workflow.

Handled events are `:created_workflow`, `:job_progression` (only for a
`Progression` whose `progression` is `0`), `:job_completed`, `:job_retrying`,
`:completed_workflow`, `:job_stopped`, `:job_error` and `:queue_not_found`.
Events tied to a job expect the matching `Jobs.Status` as payload.

Returns the result of inserting the new workflow status (`{:ok, workflow_status}`
on success) when the event is handled, `nil` otherwise. For `:job_progression`,
when the workflow is not pending, nothing is inserted and the last status is
returned as `{:ok, last_status}`.

## Examples

    iex> define_workflow_status(1, :completed_workflow)
    {:ok, %Workflows.Status{state: :completed, workflow_id: 1, status_id: nil, id: 1}}

    iex> define_workflow_status(1, :incorrect_event)
    nil

"""
def define_workflow_status(workflow_id, event, payload \\ %{})
# Workflow creation: count the creation in the metrics, then record the
# initial :pending status.
def define_workflow_status(workflow_id, :created_workflow, _payload) do
  case Workflows.get_workflow!(workflow_id) do
    %{identifier: identifier} ->
      WorkflowInstrumenter.inc(:step_flow_workflows_status_total, identifier, :created_workflow)

    _without_identifier ->
      :ok
  end

  set_workflow_status(workflow_id, :pending)
end
# First progression of a job (progression 0): a pending workflow becomes
# processing; in any other state the last status is returned untouched.
def define_workflow_status(workflow_id, :job_progression, %Progression{progression: 0}) do
  current = get_last_workflow_status(workflow_id)

  case current.state do
    :pending ->
      set_workflow_status(workflow_id, :processing)

    _other_state ->
      Logger.warn(
        "Can't set workflow #{workflow_id} to :processing because current state is #{current.state}."
      )

      {:ok, current}
  end
end
# A job completed. When no *other* job of the workflow is in error or retrying,
# the workflow goes back to :pending (or :paused when it was paused/pausing,
# keeping the previous description). Otherwise the current workflow state is
# recorded again, attached to the completed job status.
def define_workflow_status(workflow_id, :job_completed, %Jobs.Status{} = job_status) do
  %Jobs.Status{id: status_id, job_id: job_id} = job_status

  other_jobs_unfinished? =
    workflow_id
    |> get_last_jobs_status()
    |> Enum.any?(fn other -> other.state in [:error, :retrying] and other.job_id != job_id end)

  current = get_last_workflow_status(workflow_id)

  case {other_jobs_unfinished?, current} do
    {true, _current} ->
      set_workflow_status(workflow_id, current.state, status_id)

    {false, %{state: state, description: description}} when state in [:paused, :pausing] ->
      set_workflow_status(workflow_id, :paused, status_id, description)

    {false, _current} ->
      set_workflow_status(workflow_id, :pending, status_id)
  end
end
# A job is being retried. If another job of the workflow is still in error the
# workflow is flagged in error (and the notification hooks are triggered),
# otherwise it is processing again.
def define_workflow_status(workflow_id, :job_retrying, %Jobs.Status{} = job_status) do
  %Jobs.Status{id: status_id, job_id: job_id} = job_status

  another_job_in_error? =
    workflow_id
    |> get_last_jobs_status()
    |> Enum.any?(fn other -> other.state == :error and other.job_id != job_id end)

  if another_job_in_error? do
    NotificationHookManager.manage_notification_status(workflow_id, nil, "error")
    set_workflow_status(workflow_id, :error, status_id)
  else
    set_workflow_status(workflow_id, :processing, status_id)
  end
end
# The workflow completed. Notification hooks are only triggered when the
# previous status was not already :completed; a :completed status is recorded
# in every case.
def define_workflow_status(workflow_id, :completed_workflow, _payload) do
  previous = get_last_workflow_status(workflow_id)

  Logger.info("Complete workflow #{workflow_id}.")

  case previous do
    %{state: :completed} ->
      :ok

    _not_completed_yet ->
      NotificationHookManager.manage_notification_status(workflow_id, nil, "completed")
  end

  set_workflow_status(workflow_id, :completed)
end
# A job was stopped: record a :stopped workflow status attached to that job status.
def define_workflow_status(workflow_id, :job_stopped, %Jobs.Status{} = job_status) do
  set_workflow_status(workflow_id, :stopped, job_status.id)
end
# A job failed (or its queue was not found). Notification hooks are only
# triggered when the previous status was not already :error; an :error status
# is recorded in every case.
def define_workflow_status(workflow_id, event, %Jobs.Status{} = job_status)
    when event in [:job_error, :queue_not_found] do
  previous = get_last_workflow_status(workflow_id)

  Logger.info("Erroneous workflow #{workflow_id}.")

  case previous do
    %{state: :error} ->
      :ok

    _not_in_error_yet ->
      NotificationHookManager.manage_notification_status(workflow_id, nil, "error")
  end

  set_workflow_status(workflow_id, :error, job_status.id)
end
# Fallback: any other event/payload combination is ignored.
def define_workflow_status(_workflow_id, _event, _payload) do
  nil
end
@doc """
Records a new status for the workflow.

Increments the `:step_flow_workflows_status_total` metric for the workflow
identifier and the given `status`, then inserts a new status row.

`status_id` is the optional job status the workflow status is attached to and
`description` an optional map stored with the status.

Returns the result of `Repo.insert/1`.
"""
def set_workflow_status(workflow_id, status, status_id \\ nil, description \\ %{}) do
  case Workflows.get_workflow!(workflow_id) do
    %{identifier: identifier} ->
      WorkflowInstrumenter.inc(:step_flow_workflows_status_total, identifier, status)

    _without_identifier ->
      :ok
  end

  attrs = %{
    workflow_id: workflow_id,
    state: status,
    status_id: status_id,
    description: description
  }

  new_status = Workflows.Status.changeset(%Workflows.Status{}, attrs)
  Repo.insert(new_status)
end
@doc """
Returns the last updated status of a workflow per job_id.

Only job statuses referenced by a status of the given workflow are considered;
for each job, the one attached to the most recently inserted workflow status
is kept.
"""
def get_last_jobs_status(workflow_id) when is_number(workflow_id) do
  # Statuses of the requested workflow only.
  statuses_of_workflow =
    from(
      workflow_status in Workflows.Status,
      where: workflow_status.workflow_id == ^workflow_id
    )

  last_job_statuses =
    from(
      job_status in Jobs.Status,
      inner_join: workflow_status in subquery(statuses_of_workflow),
      on: workflow_status.status_id == job_status.id,
      order_by: [
        desc: workflow_status.inserted_at,
        desc: job_status.id,
        asc: job_status.job_id
      ],
      # One row per job.
      distinct: [asc: job_status.job_id]
    )

  Repo.all(last_job_statuses)
end
@doc """
Returns the last updated status of a workflow.

Accepts either a workflow id or a `Workflow` struct. Returns `nil` when the
workflow has no status or when the argument is neither of those.
"""
def get_last_workflow_status(workflow_id) when is_number(workflow_id) do
  Workflows.Status
  |> where([workflow_status], workflow_status.workflow_id == ^workflow_id)
  |> order_by(desc: :updated_at, desc: :id)
  |> limit(1)
  |> Repo.one()
end
# Struct variant: preloads the statuses of the workflow and returns the most
# recent one (nil when the workflow has no status).
#
# Timestamps are compared with NaiveDateTime.compare/2: the default term
# ordering compares structs field by field (day before month and year) and
# therefore does not sort dates chronologically. Statuses inserted at the same
# timestamp are ordered by id, like the `desc: :id` of the id-based clause.
def get_last_workflow_status(%Workflow{} = workflow) do
  preloaded = Repo.preload(workflow, [:status])

  preloaded.status
  |> Enum.sort(&status_older_or_equal?/2)
  |> List.last()
end

# Anything that is neither an id nor a workflow has no status.
def get_last_workflow_status(_workflow_id), do: nil

# Sort predicate: true when `left` was inserted before `right` (or at the same
# time with a lower or equal id).
defp status_older_or_equal?(left, right) do
  case NaiveDateTime.compare(left.inserted_at, right.inserted_at) do
    :lt -> true
    :gt -> false
    :eq -> left.id <= right.id
  end
end
# Returns the first status of the workflow whose state is `state`, or nil when
# there is none. The statuses are preloaded from the workflow.
def find_workflow_status(workflow, state) when is_atom(state) do
  statuses = Repo.preload(workflow, [:status]).status

  Enum.find(statuses, &(&1.state == state))
end

# A state that is not an atom never matches.
def find_workflow_status(_workflow, _state) do
  nil
end
@doc """
Lists the current status of workflows.

For each workflow, the latest status inserted between `start_date` and
`end_date` is considered, and only statuses whose state is `:completed`,
`:error`, `:processing` or `:pending` are returned.

`identifiers` optionally restricts the workflows by identifier; an empty list
means "no restriction". `roles` are the role names used to compute which
workflow identifiers may be viewed (`"*"` allows every workflow). When both an
identifier filter and restricted rights apply, only their intersection is
listed.
"""
def list_workflows_status(start_date, end_date, identifiers, roles) do
  allowed_workflows = check_rights(roles)

  workflows_query =
    case {Enum.empty?(identifiers), Enum.member?(allowed_workflows, "*")} do
      # No filter requested and every workflow is viewable.
      {true, true} ->
        from(workflow in Workflow)

      # Filter requested and every workflow is viewable.
      {false, true} ->
        from(
          workflow in Workflow,
          where: workflow.identifier in ^identifiers
        )

      # No filter requested: everything the roles give access to.
      {true, false} ->
        from(
          workflow in Workflow,
          where: workflow.identifier in ^allowed_workflows
        )

      # Filter requested with restricted rights: only what is both requested
      # and allowed.
      {false, false} ->
        intersect = Enum.filter(identifiers, fn element -> element in allowed_workflows end)

        from(
          workflow in Workflow,
          where: workflow.identifier in ^intersect
        )
    end

  # Latest status of each workflow within the requested time range.
  latest_status_query =
    from(
      workflow_status in Workflows.Status,
      order_by: [desc: workflow_status.id, desc: workflow_status.workflow_id],
      distinct: [desc: workflow_status.workflow_id],
      where:
        fragment("?::timestamp", workflow_status.inserted_at) >= ^start_date and
          fragment("?::timestamp", workflow_status.inserted_at) <= ^end_date
    )

  query =
    from(
      workflow_status in subquery(latest_status_query),
      inner_join: workflow in subquery(workflows_query),
      on: workflow.id == workflow_status.workflow_id,
      where: workflow_status.state in [:completed, :error, :processing, :pending]
    )

  Repo.all(query)
end
@doc """
Handler called when a live job is started. Checks if all jobs in the workflow are processing. If so, set workflow status to processing.

Returns the result of `set_workflow_status/4` when the status is updated,
`nil` otherwise.
"""
def define_workflow_live_status(workflow_id) do
  # Load the jobs and their statuses with a single nested preload instead of
  # issuing one preload per job.
  workflow =
    workflow_id
    |> Workflows.get_workflow!()
    |> Repo.preload(jobs: [:status])

  live_jobs_status =
    Enum.map(workflow.jobs, fn job ->
      StepFlow.Controllers.Jobs.get_last_status(job.status).state
    end)

  if Enum.all?(live_jobs_status, &(&1 == :processing)) do
    Logger.info(
      "All live jobs are processing, setting live workflow #{workflow_id} status to processing."
    )

    set_workflow_status(workflow_id, :processing)
  end
end
# Returns the workflow identifiers the given roles are allowed to view: the
# last "::"-separated segment of the entity of each "workflow"/"view" right.
# Rights without an `entity` key are skipped.
defp check_rights(role_names) do
  view_rights =
    case Roles.get_roles(role_names) do
      nil ->
        []

      roles ->
        StepFlow.Controllers.Roles.get_rights_for_entity_type_and_action(
          roles,
          "workflow",
          "view"
        )
    end

  Enum.flat_map(view_rights, fn
    %{entity: entity} -> [entity |> String.split("::") |> List.last()]
    _right_without_entity -> []
  end)
end
end