defmodule StepFlow.Step do
@moduledoc """
The Step context.
"""
require Logger
alias StepFlow.Artifacts
alias StepFlow.Jobs
alias StepFlow.Jobs.Status
alias StepFlow.Metrics.WorkflowInstrumenter
alias StepFlow.Repo
alias StepFlow.Step.Helpers
alias StepFlow.Step.Launch
alias StepFlow.Workflows
alias StepFlow.Workflows.Workflow
def start_next(%Workflow{id: workflow_id} = workflow) do
workflow = Repo.preload(workflow, :jobs, force: true)
is_live = workflow.is_live
jobs = Repo.preload(workflow.jobs, [:status, :progressions])
steps =
StepFlow.Map.get_by_key_or_atom(workflow, :steps)
|> Workflows.get_steps_with_status(jobs)
{is_completed_workflow, steps_to_start} = get_steps_to_start(steps, is_live)
steps_to_start =
case {steps_to_start, jobs} do
{[], []} ->
case List.first(steps) do
nil ->
Logger.warn("#{__MODULE__}: empty workflow #{workflow_id} is completed")
{:completed_workflow, []}
step ->
{:ok, [step]}
end
{[], _} ->
{:completed_workflow, []}
{list, _} ->
{:ok, list}
end
results = start_steps(steps_to_start, workflow)
get_final_status(workflow, is_completed_workflow, Enum.uniq(results) |> Enum.sort())
end
def pause_step(workflow, step, post_action) do
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
Repo.preload(workflow, :jobs, force: true)
|> Jobs.create_paused_job(step_id, step_name, post_action)
end
def resume_step(workflow, step) do
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
IO.puts(">> Resume #{step_name} #{step_id} step? Status=#{inspect(step.status)}")
workflow = Repo.preload(workflow, [:jobs, :status], force: true)
workflow.jobs
|> Enum.filter(fn job -> job.step_id == step_id end)
|> Enum.map(fn job -> job.id end)
|> Enum.each(fn job_id -> Status.set_job_status(job_id, :queued) end)
{:ok, "queued"}
end
def skip_step(workflow, step) do
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
Repo.preload(workflow, :jobs, force: true)
|> Jobs.create_skipped_job(step_id, step_name)
end
def abort_step_jobs(workflow, step) do
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
Repo.preload(workflow, :jobs, force: true)
|> Jobs.abort_jobs(step_id, step_name)
end
def skip_step_jobs(workflow, step) do
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
Repo.preload(workflow, :jobs, force: true)
|> Jobs.skip_jobs(step_id, step_name)
end
defp set_artifacts(workflow) do
resources = %{}
params = %{
resources: resources,
workflow_id: workflow.id
}
Artifacts.create_artifact(params)
end
defp get_steps_to_start(steps, is_live), do: iter_get_steps_to_start(steps, steps, is_live)
defp iter_get_steps_to_start(steps, all_steps, is_live, completed \\ true, result \\ [])
defp iter_get_steps_to_start([], _all_steps, _is_live, completed, result),
do: {completed, result}
defp iter_get_steps_to_start([step | steps], all_steps, true, completed, result) do
result = List.insert_at(result, -1, step)
iter_get_steps_to_start(steps, all_steps, true, completed, result)
end
defp iter_get_steps_to_start([step | steps], all_steps, false, completed, result) do
completed =
if step.status in [:completed, :skipped, :stopped] or
(step.status in [:error] and StepFlow.Map.get_by_key_or_atom(step, :allow_failure)) do
completed
else
false
end
result =
if step.status == :queued do
case StepFlow.Map.get_by_key_or_atom(step, :required_to_start) do
nil ->
List.insert_at(result, -1, step)
required_to_start ->
all_steps =
Enum.filter(all_steps, fn s ->
StepFlow.Map.get_by_key_or_atom(s, :id) in required_to_start
end)
count_not_completed =
all_steps
|> Enum.map(fn s -> StepFlow.Map.get_by_key_or_atom(s, :status) end)
|> Enum.filter(fn s -> s != :completed and s != :skipped end)
|> length
count_allow_failure =
all_steps
|> Enum.filter(fn s ->
StepFlow.Map.get_by_key_or_atom(s, "allow_failure") == true
end)
|> Enum.map(fn s -> StepFlow.Map.get_by_key_or_atom(s, :status) end)
|> Enum.filter(fn s -> s == :error end)
|> length
count_not_completed = count_not_completed - count_allow_failure
if count_not_completed == 0 do
List.insert_at(result, -1, step)
else
result
end
end
else
result
end
iter_get_steps_to_start(steps, all_steps, false, completed, result)
end
defp start_steps({:completed_workflow, _}, _workflow), do: [:completed_workflow]
defp start_steps({:ok, steps}, workflow) do
dates = Helpers.get_dates()
for step <- steps do
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
source_paths = Launch.get_source_paths(workflow, step, dates)
Logger.warn(
"#{__MODULE__}: start to process step #{step_name} (index #{step_id}) for workflow #{workflow.id}"
)
{result, status} =
StepFlow.Map.get_by_key_or_atom(step, :condition)
|> case do
condition when condition in [0, nil] ->
Launch.launch_step(workflow, step)
condition ->
Helpers.template_process(
"<%= " <> condition <> "%>",
workflow,
step,
dates,
source_paths
)
|> case do
"true" ->
Launch.launch_step(workflow, step)
"false" ->
skip_step(workflow, step)
{:ok, "skipped"}
_ ->
Logger.error(
"#{__MODULE__}: cannot estimate condition for step #{step_name} (index #{step_id}) for workflow #{workflow.id}"
)
{:error, "bad step condition"}
end
end
Logger.info("#{step_name}: #{inspect({result, status})}")
topic = "update_workflow_" <> Integer.to_string(workflow.id)
StepFlow.Notification.send(topic, %{workflow_id: workflow.id})
status
end
end
defp get_final_status(_workflow, _is_completed_workflow, ["started"]), do: {:ok, "started"}
defp get_final_status(_workflow, _is_completed_workflow, ["created"]), do: {:ok, "started"}
defp get_final_status(_workflow, _is_completed_workflow, ["created", "started"]),
do: {:ok, "started"}
defp get_final_status(workflow, _is_completed_workflow, ["skipped"]), do: start_next(workflow)
defp get_final_status(workflow, _is_completed_workflow, ["completed"]), do: start_next(workflow)
defp get_final_status(workflow, true, [:completed_workflow]) do
Workflows.Status.define_workflow_status(workflow.id, :completed_workflow)
WorkflowInstrumenter.inc(:step_flow_workflows_completed, workflow.identifier)
set_artifacts(workflow)
Logger.warn("#{__MODULE__}: workflow #{workflow.id} is completed")
{:ok, "completed"}
end
defp get_final_status(_workflow, _is_completed_workflow, _states), do: {:ok, "still_processing"}
end