defmodule StepFlow.Step.LaunchJobs do
@moduledoc """
The Step Jobs launching parameters.
"""
require Logger
alias StepFlow.Amqp.CommonEmitter
alias StepFlow.Jobs
alias StepFlow.Jobs.Status
alias StepFlow.Step.Helpers
alias StepFlow.Step.Launch
alias StepFlow.Step.LaunchParams
def start_job_one_for_one([], _launch_params),
do: {:ok, "started"}
def start_job_one_for_one(
[source_path | source_paths],
launch_params
) do
{:ok, job} =
generate_job_one_for_one(
source_path,
launch_params
)
case start_job(job) do
{:ok, _} -> start_job_one_for_one(source_paths, launch_params)
{:error, error} -> {:error, error}
end
end
def start_multiple_jobs_one_for_one(source_paths, multiple_jobs_parameter, launch_params) do
segments =
Helpers.get_value_in_parameters_with_type(
launch_params.workflow,
multiple_jobs_parameter,
"array_of_media_segments"
)
|> List.first()
case segments do
nil ->
start_job_one_for_one(
source_paths,
launch_params
)
segments ->
start_jobs_one_for_one_for_segments(
segments,
source_paths,
launch_params
)
end
end
defp start_jobs_one_for_one_for_segments(
[],
_source_paths,
_launch_params
),
do: {:ok, "started"}
defp start_jobs_one_for_one_for_segments(
[segment | segments],
source_paths,
launch_params
) do
launch_params = %{launch_params | segment: segment}
_result =
start_job_one_for_one_with_segment(
source_paths,
launch_params
)
start_jobs_one_for_one_for_segments(
segments,
source_paths,
launch_params
)
end
defp start_job_one_for_one_with_segment(
[],
_launch_params
),
do: {:ok, "started"}
defp start_job_one_for_one_with_segment(
[source_path | source_paths],
launch_params
) do
new_parameters =
StepFlow.Map.get_by_key_or_atom(launch_params.step, :parameters, [])
|> Enum.concat([
%{
"id" => "sdk_start_index",
"type" => "integer",
"value" => StepFlow.Map.get_by_key_or_atom(launch_params.segment, :start)
},
%{
"id" => "sdk_stop_index",
"type" => "integer",
"value" => StepFlow.Map.get_by_key_or_atom(launch_params.segment, :end)
}
])
updated_step = StepFlow.Map.replace_by_atom(launch_params.step, :parameters, new_parameters)
launch_params = %{launch_params | step: updated_step}
parameters =
generate_job_parameters_one_for_one(
source_path,
launch_params
)
step_name = LaunchParams.get_step_name(launch_params)
step_id = LaunchParams.get_step_id(launch_params)
allow_failure = LaunchParams.get_step_allow_failure(launch_params)
job_params = %{
name: step_name,
step_id: step_id,
workflow_id: launch_params.workflow.id,
allow_failure: allow_failure,
parameters: parameters
}
{:ok, job} = Jobs.create_job(job_params)
case start_job(job) do
{:ok, _} -> start_job_one_for_one_with_segment(source_paths, launch_params)
{:error, error} -> {:error, error}
end
end
def start_job_one_for_many(source_paths, launch_params) do
{:ok, job} = generate_job_one_for_many(source_paths, launch_params)
start_job(job)
end
def start_job(job) do
message = StepFlow.Controllers.Jobs.get_message(job)
datetime = NaiveDateTime.to_string(DateTime.utc_now())
case CommonEmitter.publish_json(
job.name,
job.step_id,
message
) do
:ok ->
{:ok, _status} = Status.set_job_status(job.id, "queued", %{}, datetime)
{:ok, "started"}
_ ->
{:error, "unable to publish message"}
end
end
def generate_message_one_for_one(source_path, launch_params) do
{:ok, job} = generate_job_one_for_one(source_path, launch_params)
StepFlow.Controllers.Jobs.get_message(job)
end
def generate_job_one_for_one(
source_path,
launch_params
) do
parameters =
generate_job_parameters_one_for_one(
source_path,
launch_params
)
job_params = %{
name: LaunchParams.get_step_name(launch_params),
step_id: LaunchParams.get_step_id(launch_params),
workflow_id: launch_params.workflow.id,
allow_failure: LaunchParams.get_step_allow_failure(launch_params),
parameters: parameters
}
Jobs.create_job(job_params)
end
def generate_job_parameters_one_for_one(
source_path,
launch_params
) do
destination_path_templates =
Helpers.get_value_in_parameters_with_type(
launch_params.step,
"destination_path",
"template"
)
destination_filename_templates =
Helpers.get_value_in_parameters_with_type(
launch_params.step,
"destination_filename",
"template"
)
base_directory = Helpers.get_base_directory(launch_params.workflow, launch_params.step)
{required_paths, destination_path} =
Launch.build_requirements_and_destination_path(
destination_path_templates,
destination_filename_templates,
launch_params.workflow,
launch_params.step,
launch_params.dates,
base_directory,
source_path,
launch_params.required_file
)
requirements =
if launch_params.workflow.is_live do
# Add the appropriate srt stream requirements check
[]
else
Helpers.get_step_requirements(launch_params.workflow.jobs, launch_params.step)
|> Helpers.add_required_paths(required_paths)
end
destination_path_parameter =
if StepFlow.Map.get_by_key_or_atom(launch_params.step, :skip_destination_path, false) do
[]
else
[
%{
"id" => "destination_path",
"type" => "string",
"value" => destination_path
}
]
end
filter_and_pre_compile_parameters(
launch_params,
source_path
)
|> Enum.concat(destination_path_parameter)
|> Enum.concat([
%{
"id" => "source_path",
"type" => "string",
"value" => source_path
},
%{
"id" => "requirements",
"type" => "requirements",
"value" => requirements
}
])
end
def generate_message_one_for_many(source_paths, launch_params) do
{:ok, job} = generate_job_one_for_many(source_paths, launch_params)
StepFlow.Controllers.Jobs.get_message(job)
end
def generate_job_one_for_many(source_paths, launch_params) do
select_input =
StepFlow.Map.get_by_key_or_atom(launch_params.step, :parameters, [])
|> Enum.filter(fn param ->
StepFlow.Map.get_by_key_or_atom(param, :type) == "select_input"
end)
|> Enum.map(fn param ->
id = StepFlow.Map.get_by_key_or_atom(param, :id)
value = StepFlow.Map.get_by_key_or_atom(param, :value)
Logger.info("source paths: #{inspect(source_paths)} // value: #{inspect(value)}")
path =
Helpers.filter_path_list(source_paths, [value])
|> List.first()
%{
id: id,
type: "string",
value: path
}
end)
destination_filename_templates =
Helpers.get_value_in_parameters_with_type(
launch_params.step,
"destination_filename",
"template"
)
base_directory = Helpers.get_base_directory(launch_params.workflow, launch_params.step)
select_input =
case destination_filename_templates do
[destination_filename_template] ->
if StepFlow.Map.get_by_key_or_atom(launch_params.step, :skip_destination_path, false) do
select_input
else
filename =
destination_filename_template
|> Helpers.template_process(
launch_params.workflow,
launch_params.step,
launch_params.dates,
source_paths
)
|> Path.basename()
destination_path = base_directory <> filename
Enum.concat(select_input, [
%{
id: "destination_path",
type: "string",
value: destination_path
}
])
end
_ ->
select_input
end
source_paths =
resolve_source_paths_templates(
launch_params.workflow,
launch_params.dates,
launch_params.step,
source_paths
)
|> Helpers.filter_untemplated()
requirements =
if launch_params.workflow.is_live do
# Add the appropriate srt stream requirements check
[]
else
Helpers.get_step_requirements(launch_params.workflow.jobs, launch_params.step)
|> Helpers.add_required_paths(source_paths)
end
parameters =
filter_and_pre_compile_parameters(launch_params, source_paths)
|> Enum.concat(select_input)
|> Enum.concat([
%{
"id" => "source_paths",
"type" => "array_of_strings",
"value" => source_paths
},
%{
"id" => "requirements",
"type" => "requirements",
"value" => requirements
}
])
job_params = %{
name: LaunchParams.get_step_name(launch_params),
step_id: LaunchParams.get_step_id(launch_params),
workflow_id: launch_params.workflow.id,
allow_failure: LaunchParams.get_step_allow_failure(launch_params),
parameters: parameters
}
Jobs.create_job(job_params)
end
defp filter_and_pre_compile_parameters(launch_params, source_paths) do
Launch.filter_and_pre_compile_parameters(
StepFlow.Map.get_by_key_or_atom(launch_params.step, :parameters, []),
launch_params.workflow,
launch_params.step,
launch_params.dates,
source_paths
)
end
defp resolve_source_paths_templates(workflow, dates, step, source_paths) do
source_paths_templates =
Helpers.get_value_in_parameters_with_type(step, "source_paths", "array_of_templates")
|> List.flatten()
|> Helpers.filter_empty()
case source_paths_templates do
nil ->
source_paths
[] ->
source_paths
templates ->
Enum.map(
templates,
fn template ->
Helpers.template_process(template, workflow, step, dates, nil)
end
)
end
end
end