defmodule StepFlow.Step.Launch do
@moduledoc """
The Step launcher context.
"""
require Logger
alias StepFlow.Amqp.CommonEmitter
alias StepFlow.Jobs
alias StepFlow.Notifications.Notification
alias StepFlow.Step.Helpers
alias StepFlow.Step.LaunchParams
alias StepFlow.Step.Live
alias StepFlow.Workflows
def launch_step(workflow, step) do
dates = Helpers.get_dates()
# refresh workflow to get recent stored parameters on it
workflow = Workflows.get_workflow!(workflow.id)
step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
step_mode = StepFlow.Map.get_by_key_or_atom(step, :mode, "one_for_one")
source_paths = get_source_paths(workflow, step, dates)
is_live = workflow.is_live
case {source_paths, step_mode, is_live} do
{source_paths, _, true} when is_list(source_paths) ->
Logger.debug("Live Step")
first_file =
source_paths
|> Enum.sort()
|> List.first()
direct_messaging_parameters = %{
id: "direct_messaging_queue_name",
type: "string",
value: Ecto.UUID.generate()
}
step =
Map.put(
step,
:parameters,
StepFlow.Map.get_by_key_or_atom(step, :parameters) ++
[direct_messaging_parameters]
)
launch_params = LaunchParams.new(workflow, step, dates, first_file)
Live.create_job_live(source_paths, launch_params)
{_, "notification", false} ->
Logger.debug("Notification step")
Notification.process(
workflow,
dates,
step_name,
step,
step_id,
source_paths
)
{[], _, false} ->
Logger.debug("job one for one path")
Jobs.create_skipped_job(workflow, step_id, step_name)
{source_paths, "one_for_one", false} when is_list(source_paths) ->
first_file =
source_paths
|> Enum.sort()
|> List.first()
launch_params = LaunchParams.new(workflow, step, dates, first_file)
case StepFlow.Map.get_by_key_or_atom(step, :multiple_jobs) do
nil ->
start_job_one_for_one(
source_paths,
launch_params
)
multiple_jobs_parameter ->
start_multiple_jobs_one_for_one(source_paths, multiple_jobs_parameter, launch_params)
end
{source_paths, "one_for_many", false} when is_list(source_paths) ->
Logger.debug("job one for many paths")
launch_params = LaunchParams.new(workflow, step, dates)
start_job_one_for_many(source_paths, launch_params)
{_, _, false} ->
Jobs.create_skipped_job(workflow, step_id, step_name)
end
end
defp start_job_one_for_one([], _launch_params),
do: {:ok, "started"}
defp start_job_one_for_one(
[source_path | source_paths],
launch_params
) do
message =
generate_message_one_for_one(
source_path,
launch_params
)
case CommonEmitter.publish_json(
LaunchParams.get_step_name(launch_params),
LaunchParams.get_step_id(launch_params),
message
) do
:ok ->
start_job_one_for_one(source_paths, launch_params)
_ ->
{:error, "unable to publish message"}
end
end
defp start_multiple_jobs_one_for_one(source_paths, multiple_jobs_parameter, launch_params) do
segments =
Helpers.get_value_in_parameters_with_type(
launch_params.workflow,
multiple_jobs_parameter,
"array_of_media_segments"
)
|> List.first()
case segments do
nil ->
start_job_one_for_one(
source_paths,
launch_params
)
segments ->
start_jobs_one_for_one_for_segments(
segments,
source_paths,
launch_params
)
end
end
defp start_jobs_one_for_one_for_segments(
[],
_source_paths,
_launch_params
),
do: {:ok, "started"}
defp start_jobs_one_for_one_for_segments(
[segment | segments],
source_paths,
launch_params
) do
launch_params = %{launch_params | segment: segment}
_result =
start_job_one_for_one_with_segment(
source_paths,
launch_params
)
start_jobs_one_for_one_for_segments(
segments,
source_paths,
launch_params
)
end
defp start_job_one_for_one_with_segment(
[],
_launch_params
),
do: {:ok, "started"}
defp start_job_one_for_one_with_segment(
[source_path | source_paths],
launch_params
) do
new_parameters =
StepFlow.Map.get_by_key_or_atom(launch_params.step, :parameters, [])
|> Enum.concat([
%{
"id" => "sdk_start_index",
"type" => "integer",
"value" => StepFlow.Map.get_by_key_or_atom(launch_params.segment, :start)
},
%{
"id" => "sdk_stop_index",
"type" => "integer",
"value" => StepFlow.Map.get_by_key_or_atom(launch_params.segment, :end)
}
])
updated_step = StepFlow.Map.replace_by_atom(launch_params.step, :parameters, new_parameters)
launch_params = %{launch_params | step: updated_step}
parameters =
generate_job_parameters_one_for_one(
source_path,
launch_params
)
step_name = LaunchParams.get_step_name(launch_params)
step_id = LaunchParams.get_step_id(launch_params)
allow_failure = LaunchParams.get_step_allow_failure(launch_params)
job_params = %{
name: step_name,
step_id: step_id,
workflow_id: launch_params.workflow.id,
allow_failure: allow_failure,
parameters: parameters
}
{:ok, job} = Jobs.create_job(job_params)
message = Jobs.get_message(job)
case CommonEmitter.publish_json(step_name, step_id, message) do
:ok ->
start_job_one_for_one_with_segment(
source_paths,
launch_params
)
_ ->
{:error, "unable to publish message"}
end
end
def get_source_paths(workflow, step, dates) do
input_filter = Helpers.get_value_in_parameters(step, "input_filter")
case StepFlow.Map.get_by_key_or_atom(step, :parent_ids, []) do
[] ->
Helpers.get_value_in_parameters(step, "source_paths")
|> List.flatten()
|> Helpers.filter_empty()
|> Helpers.templates_process(workflow, step, dates)
|> Helpers.filter_untemplated()
|> Helpers.filter_path_list(input_filter)
parent_ids ->
workflow.jobs
|> Enum.filter(fn job -> job.step_id in parent_ids end)
|> Helpers.get_jobs_destination_paths()
|> Helpers.filter_empty()
|> Helpers.filter_path_list(input_filter)
end
end
def start_job_one_for_many(source_paths, launch_params) do
message = generate_message_one_for_many(source_paths, launch_params)
case CommonEmitter.publish_json(
LaunchParams.get_step_name(launch_params),
LaunchParams.get_step_id(launch_params),
message
) do
:ok -> {:ok, "started"}
_ -> {:error, "unable to publish message"}
end
end
def generate_message_one_for_one(
source_path,
launch_params
) do
parameters =
generate_job_parameters_one_for_one(
source_path,
launch_params
)
job_params = %{
name: LaunchParams.get_step_name(launch_params),
step_id: LaunchParams.get_step_id(launch_params),
workflow_id: launch_params.workflow.id,
allow_failure: LaunchParams.get_step_allow_failure(launch_params),
parameters: parameters
}
{:ok, job} = Jobs.create_job(job_params)
Jobs.get_message(job)
end
def generate_job_parameters_one_for_one(
source_path,
launch_params
) do
destination_path_templates =
Helpers.get_value_in_parameters_with_type(
launch_params.step,
"destination_path",
"template"
)
destination_filename_templates =
Helpers.get_value_in_parameters_with_type(
launch_params.step,
"destination_filename",
"template"
)
base_directory = Helpers.get_base_directory(launch_params.workflow, launch_params.step)
{required_paths, destination_path} =
build_requirements_and_destination_path(
destination_path_templates,
destination_filename_templates,
launch_params.workflow,
launch_params.step,
launch_params.dates,
base_directory,
source_path,
launch_params.required_file
)
requirements =
Helpers.get_step_requirements(launch_params.workflow.jobs, launch_params.step)
|> Helpers.add_required_paths(required_paths)
destination_path_parameter =
if StepFlow.Map.get_by_key_or_atom(launch_params.step, :skip_destination_path, false) do
[]
else
[
%{
"id" => "destination_path",
"type" => "string",
"value" => destination_path
}
]
end
filter_and_pre_compile_parameters(
launch_params,
source_path
)
|> Enum.concat(destination_path_parameter)
|> Enum.concat([
%{
"id" => "source_path",
"type" => "string",
"value" => source_path
},
%{
"id" => "requirements",
"type" => "requirements",
"value" => requirements
}
])
end
def generate_message_one_for_many(source_paths, launch_params) do
select_input =
StepFlow.Map.get_by_key_or_atom(launch_params.step, :parameters, [])
|> Enum.filter(fn param ->
StepFlow.Map.get_by_key_or_atom(param, :type) == "select_input"
end)
|> Enum.map(fn param ->
id = StepFlow.Map.get_by_key_or_atom(param, :id)
value = StepFlow.Map.get_by_key_or_atom(param, :value)
Logger.warn("source paths: #{inspect(source_paths)} // value: #{inspect(value)}")
path =
Helpers.filter_path_list(source_paths, [value])
|> List.first()
%{
id: id,
type: "string",
value: path
}
end)
destination_filename_templates =
Helpers.get_value_in_parameters_with_type(
launch_params.step,
"destination_filename",
"template"
)
select_input =
case destination_filename_templates do
[destination_filename_template] ->
if StepFlow.Map.get_by_key_or_atom(launch_params.step, :skip_destination_path, false) do
select_input
else
filename =
destination_filename_template
|> Helpers.template_process(
launch_params.workflow,
launch_params.step,
launch_params.dates,
source_paths
)
|> Path.basename()
destination_path =
Helpers.get_base_directory(launch_params.workflow, launch_params.step) <> filename
Enum.concat(select_input, [
%{
id: "destination_path",
type: "string",
value: destination_path
}
])
end
_ ->
select_input
end
source_paths =
get_source_paths(
launch_params.workflow,
launch_params.dates,
launch_params.step,
source_paths
)
|> Helpers.filter_untemplated()
requirements =
Helpers.get_step_requirements(launch_params.workflow.jobs, launch_params.step)
|> Helpers.add_required_paths(source_paths)
parameters =
filter_and_pre_compile_parameters(launch_params, source_paths)
|> Enum.concat(select_input)
|> Enum.concat([
%{
"id" => "source_paths",
"type" => "array_of_strings",
"value" => source_paths
},
%{
"id" => "requirements",
"type" => "requirements",
"value" => requirements
}
])
job_params = %{
name: LaunchParams.get_step_name(launch_params),
step_id: LaunchParams.get_step_id(launch_params),
workflow_id: launch_params.workflow.id,
allow_failure: LaunchParams.get_step_allow_failure(launch_params),
parameters: parameters
}
{:ok, job} = Jobs.create_job(job_params)
Jobs.get_message(job)
end
def build_requirements_and_destination_path(
[destination_path_template],
_,
workflow,
step,
dates,
source_path,
_first_file
) do
destination_path =
Helpers.template_process(destination_path_template, workflow, step, dates, source_path)
{[], destination_path}
end
def build_requirements_and_destination_path(
_,
[destination_filename_template],
workflow,
step,
dates,
base_directory,
source_path,
first_file
) do
filename =
Helpers.template_process(destination_filename_template, workflow, step, dates, source_path)
|> Path.basename()
required_paths =
if source_path != first_file do
base_directory <> Path.basename(first_file)
else
[]
end
{required_paths, base_directory <> filename}
end
def build_requirements_and_destination_path(
_,
_,
_workflow,
_step,
_dates,
base_directory,
source_path,
first_file
) do
required_paths =
if source_path != first_file do
base_directory <> Path.basename(first_file)
else
[]
end
{required_paths, base_directory <> Path.basename(source_path)}
end
defp get_source_paths(workflow, dates, step, source_paths) do
source_paths_templates =
Helpers.get_value_in_parameters_with_type(step, "source_paths", "array_of_templates")
|> List.flatten()
|> Helpers.filter_empty()
case source_paths_templates do
nil ->
source_paths
[] ->
source_paths
templates ->
Enum.map(
templates,
fn template ->
Helpers.template_process(template, workflow, step, dates, nil)
end
)
end
end
defp filter_and_pre_compile_parameters(launch_params, source_paths) do
StepFlow.Map.get_by_key_or_atom(launch_params.step, :parameters, [])
|> Enum.map(fn param ->
case StepFlow.Map.get_by_key_or_atom(param, :type) do
"template" ->
value =
StepFlow.Map.get_by_key_or_atom(
param,
:value,
StepFlow.Map.get_by_key_or_atom(param, :default)
)
|> Helpers.template_process(
launch_params.workflow,
launch_params.step,
launch_params.dates,
source_paths
)
{_, filtered_map} =
StepFlow.Map.replace_by_atom(param, :type, "string")
|> StepFlow.Map.replace_by_atom(:value, value)
|> Map.pop("default")
filtered_map
"array_of_templates" ->
filter_and_pre_compile_array_of_templates_parameter(
param,
launch_params.workflow,
launch_params.step,
launch_params.dates
)
_ ->
param
end
end)
|> Enum.filter(fn param ->
filter_parameters_by_type(param) &&
(StepFlow.Map.get_by_key_or_atom(param, :id) != "source_paths" ||
StepFlow.Map.get_by_key_or_atom(param, :type) != "array_of_strings" ||
StepFlow.Map.get_by_key_or_atom(launch_params.step, :keep_source_paths, true))
end)
end
defp filter_parameters_by_type(param) do
StepFlow.Map.get_by_key_or_atom(param, :type) != "filter" &&
StepFlow.Map.get_by_key_or_atom(param, :type) != "template" &&
StepFlow.Map.get_by_key_or_atom(param, :type) != "select_input" &&
StepFlow.Map.get_by_key_or_atom(param, :type) != "array_of_templates"
end
defp filter_and_pre_compile_array_of_templates_parameter(param, workflow, step, dates) do
case StepFlow.Map.get_by_key_or_atom(param, :id) do
"source_paths" ->
param
_ ->
value =
StepFlow.Map.get_by_key_or_atom(
param,
:value,
StepFlow.Map.get_by_key_or_atom(param, :default)
)
|> Helpers.templates_process(workflow, step, dates)
{_, filtered_map} =
StepFlow.Map.replace_by_atom(param, :type, infer_type(List.first(value)))
|> StepFlow.Map.replace_by_atom(:value, value)
|> Map.pop("default")
filtered_map
end
end
defp infer_type(value) when is_integer(value), do: "array_of_integers"
defp infer_type(value) when is_binary(value), do: "array_of_strings"
end