defmodule StepFlow.Step.Launch do
@moduledoc """
The Step launcher context.
"""
require Logger
alias StepFlow.Jobs
alias StepFlow.Step.Helpers
alias StepFlow.Step.LaunchJobs
alias StepFlow.Step.LaunchParams
alias StepFlow.Step.LaunchWorkflows
alias StepFlow.Step.Live
alias StepFlow.Workflows
def launch_step(workflow, step) do
dates = Helpers.get_dates()
# refresh workflow to get recent stored parameters on it
workflow = Workflows.get_workflow!(workflow.id)
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
step_mode = StepFlow.Map.get_by_key_or_atom(step, :mode, "one_for_one")
source_paths = get_source_paths(workflow, step, dates)
is_live = workflow.is_live
case {source_paths, step_mode, is_live} do
{source_paths, _, true} when is_list(source_paths) ->
Logger.debug("Live Step")
first_file =
source_paths
|> Enum.sort()
|> List.first()
direct_messaging_parameters = %{
id: "direct_messaging_queue_name",
type: "string",
value: Ecto.UUID.generate()
}
step =
Map.put(
step,
:parameters,
StepFlow.Map.get_by_key_or_atom(step, :parameters) ++
[direct_messaging_parameters]
)
launch_params = LaunchParams.new(workflow, step, dates, first_file)
Live.create_job_live(source_paths, launch_params)
{[], _, false} ->
Logger.debug("Job skipped")
Jobs.create_skipped_job(workflow, step_id, step_name)
{source_paths, step_mode, false} when is_list(source_paths) ->
launch_file_step(workflow, step, dates, source_paths, step_mode)
{_, _, false} ->
Logger.debug("Job skipped")
Jobs.create_skipped_job(workflow, step_id, step_name)
end
end
defp launch_file_step(workflow, step, dates, source_paths, step_mode) do
case step_mode do
"one_for_one" ->
Logger.debug("Job one for one path")
first_file =
source_paths
|> Enum.sort()
|> List.first()
launch_params = LaunchParams.new(workflow, step, dates, first_file)
case StepFlow.Map.get_by_key_or_atom(step, :multiple_jobs) do
nil ->
LaunchJobs.start_job_one_for_one(
source_paths,
launch_params
)
multiple_jobs_parameter ->
LaunchJobs.start_multiple_jobs_one_for_one(
source_paths,
multiple_jobs_parameter,
launch_params
)
end
"one_for_many" ->
Logger.debug("Job one for many paths")
launch_params = LaunchParams.new(workflow, step, dates)
LaunchJobs.start_job_one_for_many(source_paths, launch_params)
"workflow_one_for_one" ->
Logger.debug("Workflow one for one path")
LaunchWorkflows.start_child_workflows_one_for_one(
workflow,
step,
dates,
source_paths
)
"workflow_one_for_many" ->
Logger.debug("Workflow one for many paths")
LaunchWorkflows.start_child_workflow_one_for_many(workflow, step, dates, source_paths)
end
end
def get_source_paths(workflow, step, dates) do
input_filter = Helpers.get_value_in_parameters(step, "input_filter")
case StepFlow.Map.get_by_key_or_atom(step, :parent_ids, []) do
[] ->
Helpers.get_value_in_parameters(step, "source_paths")
|> List.flatten()
|> Helpers.filter_empty()
|> Helpers.templates_process(workflow, step, dates)
|> Helpers.filter_untemplated()
|> Helpers.filter_path_list(input_filter)
parent_ids ->
workflow.jobs
|> Enum.filter(fn job -> job.step_id in parent_ids end)
|> Helpers.get_jobs_destination_paths()
|> Helpers.filter_empty()
|> Helpers.filter_path_list(input_filter)
end
end
def build_requirements_and_destination_path(
destination_path_templates,
_,
workflow,
step,
dates,
base_directory,
source_path,
first_file
)
when destination_path_templates != [] and destination_path_templates != "" do
destination_path_template =
case destination_path_templates do
[path] -> path
_ -> destination_path_templates
end
destination_path =
Helpers.template_process(destination_path_template, workflow, step, dates, source_path)
required_paths = get_required_paths(source_path, first_file, base_directory)
{required_paths, destination_path}
end
def build_requirements_and_destination_path(
_,
destination_filename_templates,
workflow,
step,
dates,
base_directory,
source_path,
first_file
)
when destination_filename_templates != [] and destination_filename_templates != "" do
destination_filename_template =
case destination_filename_templates do
[path] -> path
_ -> destination_filename_templates
end
filename =
Helpers.template_process(destination_filename_template, workflow, step, dates, source_path)
|> Path.basename()
required_paths = get_required_paths(source_path, first_file, base_directory)
{required_paths, base_directory <> filename}
end
def build_requirements_and_destination_path(
_,
_,
_workflow,
_step,
_dates,
base_directory,
source_path,
first_file
) do
required_paths = get_required_paths(source_path, first_file, base_directory)
{required_paths, base_directory <> Path.basename(source_path)}
end
def get_required_paths(source_path, first_file, base_directory) do
if source_path != first_file do
if String.starts_with?(first_file, base_directory) do
first_file
else
base_directory <> Path.basename(first_file)
end
else
[]
end
end
def filter_and_pre_compile_parameters(params, workflow, step, dates, source_paths) do
params
|> Enum.map(fn param ->
case StepFlow.Map.get_by_key_or_atom(param, :type) do
"template" ->
value =
StepFlow.Map.get_by_key_or_atom(
param,
:value,
StepFlow.Map.get_by_key_or_atom(param, :default)
)
|> Helpers.template_process(
workflow,
step,
dates,
source_paths
)
{_, filtered_map} =
StepFlow.Map.replace_by_atom(param, :type, "string")
|> StepFlow.Map.replace_by_atom(:value, value)
|> Map.pop("default")
filtered_map
"array_of_templates" ->
filter_and_pre_compile_array_of_templates_parameter(
param,
workflow,
step,
dates
)
_ ->
param
end
end)
|> Enum.filter(fn param ->
filter_parameters_by_type(param) &&
(StepFlow.Map.get_by_key_or_atom(param, :id) != "source_paths" ||
StepFlow.Map.get_by_key_or_atom(param, :type) != "array_of_strings" ||
StepFlow.Map.get_by_key_or_atom(step, :keep_source_paths, true))
end)
end
defp filter_parameters_by_type(param) do
StepFlow.Map.get_by_key_or_atom(param, :type) != "filter" &&
StepFlow.Map.get_by_key_or_atom(param, :type) != "template" &&
StepFlow.Map.get_by_key_or_atom(param, :type) != "select_input" &&
StepFlow.Map.get_by_key_or_atom(param, :type) != "array_of_templates"
end
defp filter_and_pre_compile_array_of_templates_parameter(param, workflow, step, dates) do
case StepFlow.Map.get_by_key_or_atom(param, :id) do
"source_paths" ->
param
_ ->
value =
StepFlow.Map.get_by_key_or_atom(
param,
:value,
StepFlow.Map.get_by_key_or_atom(param, :default)
)
|> Helpers.templates_process(workflow, step, dates)
{_, filtered_map} =
StepFlow.Map.replace_by_atom(param, :type, infer_type(List.first(value)))
|> StepFlow.Map.replace_by_atom(:value, value)
|> Map.pop("default")
filtered_map
end
end
defp infer_type(value) when is_integer(value), do: "array_of_integers"
defp infer_type(value) when is_binary(value), do: "array_of_strings"
end