defmodule StepFlow.Step do
@moduledoc """
The Step context.
"""
require Logger
alias StepFlow.Amqp.CommonEmitter
alias StepFlow.Artifacts
alias StepFlow.Controllers.BlackList
alias StepFlow.Jobs
alias StepFlow.Jobs.Status
alias StepFlow.Repo
alias StepFlow.Step.Helpers
alias StepFlow.Step.Launch
alias StepFlow.Step.LaunchJobs
alias StepFlow.Workflows
alias StepFlow.Workflows.Workflow
def start_next(%Workflow{id: _workflow_id} = workflow) do
workflow = Repo.preload(workflow, :jobs, force: true)
is_live = workflow.is_live
last_workflow_status = Workflows.Status.get_last_workflow_status(workflow.id)
case last_workflow_status do
nil ->
do_start_next(workflow, is_live)
last_workflow_status ->
case last_workflow_status.state do
:stopped -> {:ok, "stopped"}
:pausing -> {:ok, "paused"}
:paused -> {:ok, "paused"}
_ -> do_start_next(workflow, is_live)
end
end
end
defp do_start_next(%Workflow{id: workflow_id} = workflow, is_live) do
jobs = Repo.preload(workflow.jobs, [:status, :progressions])
steps =
StepFlow.Map.get_by_key_or_atom(workflow, :steps)
|> StepFlow.Controllers.Workflows.get_steps_with_status(jobs)
{is_completed_workflow, steps_to_start} = get_steps_to_start(steps, is_live)
steps_to_start =
case {steps_to_start, jobs} do
{[], []} ->
case List.first(steps) do
nil ->
Logger.info("#{__MODULE__}: empty workflow #{workflow_id} is completed")
{:completed_workflow, []}
step ->
{:ok, [step]}
end
{[], _} ->
{:completed_workflow, []}
{list, _} ->
{:ok, list}
end
results = start_steps(steps_to_start, workflow)
get_final_status(workflow, is_completed_workflow, Enum.uniq(results) |> Enum.sort())
end
def pause_step(step, [], post_action) do
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
workflow_id = StepFlow.Map.get_by_key_or_atom(step, :workflow_id)
{:ok, _status} = Jobs.create_paused_job(workflow_id, step_id, step_name, post_action)
step
end
def pause_step(step, step_jobs, post_action) do
queued_jobs =
step_jobs
|> Enum.map(fn job -> Repo.preload(job, :status, force: true) end)
|> Enum.filter(fn job ->
last_status = StepFlow.Controllers.Jobs.get_last_status(job.status)
last_status.state == :queued
end)
queued_jobs
|> Enum.map(fn job -> job.id end)
|> Enum.each(fn job_id ->
BlackList.add_and_notify!(job_id)
{:ok, _status} = Status.set_job_status(job_id, :processing)
{:ok, _status} = Status.set_job_status(job_id, :paused, post_action)
end)
step
end
def resume_step(step_jobs) do
step_jobs =
step_jobs
|> Enum.map(fn job -> Repo.preload(job, :status, force: true) end)
paused_jobs =
step_jobs
|> Enum.filter(fn job ->
last_status = StepFlow.Controllers.Jobs.get_last_status(job.status)
last_status.state == :paused
end)
paused_jobs
|> Enum.each(fn job ->
{:ok, new_job} = Jobs.copy_job(job)
{:ok, _} = LaunchJobs.start_job(new_job)
end)
{:ok, "queued"}
end
def skip_step(workflow, step) do
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
Repo.preload(workflow, :jobs, force: true)
|> Jobs.create_skipped_job(step_id, step_name)
end
def abort_step_jobs(workflow, step) do
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
Repo.preload(workflow, :jobs, force: true)
|> Jobs.abort_jobs(step_id, step_name)
end
def skip_step_jobs(workflow, step) do
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
Repo.preload(workflow, :jobs, force: true)
|> Jobs.skip_jobs(step_id, step_name)
end
defp set_artifacts(workflow) do
resources = %{}
params = %{
resources: resources,
workflow_id: workflow.id
}
Artifacts.create_artifact(params)
end
defp get_steps_to_start(steps, is_live),
do: iter_get_steps_to_start(steps, steps, is_live)
defp iter_get_steps_to_start(steps, all_steps, is_live, completed \\ true, result \\ [])
defp iter_get_steps_to_start([], _all_steps, _is_live, completed, result),
do: {completed, result}
defp iter_get_steps_to_start([step | steps], all_steps, true, completed, result) do
result = List.insert_at(result, -1, step)
iter_get_steps_to_start(steps, all_steps, true, completed, result)
end
defp iter_get_steps_to_start([step | steps], all_steps, false, completed, result) do
completed =
if step.status in [:completed, :skipped, :stopped] or
(step.status in [:error] and allowed_to_fail(step)) do
completed
else
false
end
result =
if step.status == :pending and !StepFlow.Map.get_by_key_or_atom(step, :started, false) do
check_required_steps(all_steps, step, result)
else
result
end
iter_get_steps_to_start(steps, all_steps, false, completed, result)
end
defp start_steps({:completed_workflow, _}, _workflow), do: [:completed_workflow]
defp start_steps({:ok, steps}, workflow) do
dates = Helpers.get_dates()
for step <- steps do
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
source_paths = Launch.get_source_paths(workflow, step, dates)
step = Map.put(step, "started", true)
{:ok, workflow} =
StepFlow.Controllers.Workflows.update_step(workflow, step, "started", true)
Logger.info(
"#{__MODULE__}: start to process step #{step_name} (index #{step_id}) for workflow #{workflow.id}"
)
{result, status} =
StepFlow.Map.get_by_key_or_atom(step, :condition)
|> case do
condition when condition in [0, nil] ->
Launch.launch_step(workflow, step)
condition ->
Helpers.template_process(
"<%= " <> condition <> "%>",
workflow,
step,
dates,
source_paths
)
|> case do
"true" ->
Launch.launch_step(workflow, step)
"false" ->
skip_step(workflow, step)
{:ok, "skipped"}
_ ->
Logger.error(
"#{__MODULE__}: cannot estimate condition for step #{step_name} (index #{step_id}) for workflow #{workflow.id}"
)
{:error, "bad step condition"}
end
end
Logger.info("#{step_name}: #{inspect({result, status})}")
topic = "update_workflow_" <> Integer.to_string(workflow.id)
StepFlow.Notification.send(topic, %{workflow_id: workflow.id})
status
end
end
defp get_final_status(_workflow, _is_completed_workflow, ["started"]),
do: {:ok, "started"}
defp get_final_status(_workflow, _is_completed_workflow, ["created"]),
do: {:ok, "started"}
defp get_final_status(_workflow, _is_completed_workflow, ["created", "started"]),
do: {:ok, "started"}
defp get_final_status(workflow, _is_completed_workflow, ["skipped"]),
do: start_next(workflow)
defp get_final_status(workflow, _is_completed_workflow, ["completed"]),
do: start_next(workflow)
defp get_final_status(workflow, true, [:completed_workflow]) do
Workflows.Status.define_workflow_status(workflow.id, :completed_workflow)
set_artifacts(workflow)
Logger.info("#{__MODULE__}: workflow #{workflow.id} is completed")
# For nested workflows
if workflow.parent_id != nil do
CommonEmitter.publish_json(
"job_completed",
0,
%{
job_id: workflow.parent_id,
status: "completed",
description: %{
"id" => "message",
"type" => "string",
"value" => "Child workflow #{workflow.id} completed"
}
},
"job_response"
)
end
{:ok, "completed"}
end
defp get_final_status(_workflow, _is_completed_workflow, _states),
do: {:ok, "still_processing"}
defp check_required_steps(all_steps, step, result) do
case StepFlow.Map.get_by_key_or_atom(step, :required_to_start) do
nil ->
List.insert_at(result, -1, step)
required_to_start ->
all_steps =
Enum.filter(all_steps, fn s ->
StepFlow.Map.get_by_key_or_atom(s, :id) in required_to_start
end)
count_not_completed =
all_steps
|> Enum.map(fn s -> StepFlow.Map.get_by_key_or_atom(s, :status) end)
|> Enum.filter(fn s -> s != :completed and s != :skipped end)
|> length
count_allow_failure =
all_steps
|> Enum.filter(fn s ->
StepFlow.Map.get_by_key_or_atom(s, :status) == :error and
allowed_to_fail(s)
end)
|> length
count_not_completed = count_not_completed - count_allow_failure
if count_not_completed == 0 do
List.insert_at(result, -1, step)
else
result
end
end
end
defp allowed_to_fail(step) do
StepFlow.Map.get_by_key_or_atom(step, :allow_failure) == true and
is_step_not_processing(step)
end
defp is_step_not_processing(step) do
step.jobs.errors + step.jobs.skipped + step.jobs.completed == step.jobs.total
end
end