defmodule StepFlow.Step.LaunchWorkflows do
@moduledoc """
The Step Workflows launching parameters.
"""
require Logger
alias StepFlow.Amqp.CommonEmitter
alias StepFlow.Jobs
alias StepFlow.Jobs.Status
alias StepFlow.Step
alias StepFlow.Step.Helpers
alias StepFlow.Step.Launch
alias StepFlow.Workflows
def generate_child_workflow_job(workflow, step) do
job_params = %{
name: StepFlow.Map.get_by_key_or_atom(step, :name),
step_id: StepFlow.Map.get_by_key_or_atom(step, :id),
workflow_id: workflow.id,
allow_failure: StepFlow.Map.get_by_key_or_atom(step, :allow_failure),
parameters: StepFlow.Map.get_by_key_or_atom(step, :parameters)
}
Jobs.create_job(job_params)
end
# CAREFUL: trick here is that source_paths can be a list or a string
def generate_child_workflow_params(workflow, step, job, dates, source_paths) do
wf_identifier = Helpers.get_value_in_parameters(step, "identifier") |> List.first()
wf_major = Helpers.get_value_in_parameters(step, "version_major") |> List.first()
wf_minor = Helpers.get_value_in_parameters(step, "version_minor") |> List.first()
wf_micro = Helpers.get_value_in_parameters(step, "version_micro") |> List.first()
wf_def =
StepFlow.WorkflowDefinitions.get_workflow_definition(
wf_identifier,
wf_major,
wf_minor,
wf_micro
)
wf_params =
Helpers.get_value_in_parameters(step, "parameters")
|> List.first()
|> Launch.filter_and_pre_compile_parameters(workflow, step, dates, source_paths)
%{
identifier: wf_identifier,
parameters: wf_params,
parent_id: job.id,
reference: "[AUTOGENERATED] Child from workflow #{workflow.id}",
schema_version: StepFlow.Map.get_by_key_or_atom(wf_def, :schema_version),
steps: StepFlow.Map.get_by_key_or_atom(wf_def, :steps),
version_major: wf_major,
version_minor: wf_minor,
version_micro: wf_micro,
user_uuid: workflow.user_uuid,
tags: StepFlow.Map.get_by_key_or_atom(wf_def, :tags)
}
end
defp start_workflow(workflow_params, workflow, job) do
case Workflows.create_workflow(workflow_params) do
{:ok, child_workflow} ->
Logger.info(
"Created child workflow #{child_workflow.id} from parent workflow #{workflow.id}"
)
Workflows.Status.define_workflow_status(child_workflow.id, :created_workflow)
{:ok, datetime} = DateTime.now("Etc/UTC")
CommonEmitter.publish_json(
"job_progression",
0,
%{
job_id: job.id,
datetime: datetime,
docker_container_id: "workflow",
progression: 0
},
"job_response"
)
Step.start_next(child_workflow)
StepFlow.Notification.send("new_workflow", %{workflow_id: child_workflow.id})
{:error, _} ->
Logger.error("Cannot create child workflow from parent workflow #{workflow.id}")
CommonEmitter.publish_json(
"job_error",
0,
%{
job_id: job.id,
status: "error",
parameters: [
%{
"id" => "message",
"type" => "string",
"value" => "Cannot create child workflow from parent workflow #{workflow.id}"
}
]
},
"job_response"
)
end
end
def start_child_workflow_one_for_many(workflow, step, dates, source_paths) do
datetime = NaiveDateTime.to_string(DateTime.utc_now())
{:ok, job} = generate_child_workflow_job(workflow, step)
Status.set_job_status(job.id, "queued", %{}, datetime)
generate_child_workflow_params(workflow, step, job, dates, source_paths)
|> start_workflow(workflow, job)
{:ok, "started"}
end
defp start_child_workflow_one_for_one(workflow, step, dates, source_path) do
datetime = NaiveDateTime.to_string(DateTime.utc_now())
{:ok, job} = generate_child_workflow_job(workflow, step)
Status.set_job_status(job.id, "queued", %{}, datetime)
generate_child_workflow_params(workflow, step, job, dates, source_path)
|> start_workflow(workflow, job)
end
def start_child_workflows_one_for_one(_, _, _, []),
do: {:ok, "started"}
def start_child_workflows_one_for_one(
workflow,
step,
dates,
[source_path | source_paths]
) do
start_child_workflow_one_for_one(
workflow,
step,
dates,
source_path
)
start_child_workflows_one_for_one(workflow, step, dates, source_paths)
end
end