lib/step_flow/controllers/step/launch_jobs.ex

defmodule StepFlow.Step.LaunchJobs do
  @moduledoc """
  The Step Jobs launching parameters.
  """
  require Logger

  alias StepFlow.Amqp.CommonEmitter
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Step.Helpers
  alias StepFlow.Step.Launch
  alias StepFlow.Step.LaunchParams

  def start_job_one_for_one([], _launch_params),
    do: {:ok, "started"}

  def start_job_one_for_one(
        [source_path | source_paths],
        launch_params
      ) do
    {:ok, job} =
      generate_job_one_for_one(
        source_path,
        launch_params
      )

    case start_job(job) do
      {:ok, _} -> start_job_one_for_one(source_paths, launch_params)
      {:error, error} -> {:error, error}
    end
  end

  def start_multiple_jobs_one_for_one(source_paths, multiple_jobs_parameter, launch_params) do
    segments =
      Helpers.get_value_in_parameters_with_type(
        launch_params.workflow,
        multiple_jobs_parameter,
        "array_of_media_segments"
      )
      |> List.first()

    case segments do
      nil ->
        start_job_one_for_one(
          source_paths,
          launch_params
        )

      segments ->
        start_jobs_one_for_one_for_segments(
          segments,
          source_paths,
          launch_params
        )
    end
  end

  defp start_jobs_one_for_one_for_segments(
         [],
         _source_paths,
         _launch_params
       ),
       do: {:ok, "started"}

  defp start_jobs_one_for_one_for_segments(
         [segment | segments],
         source_paths,
         launch_params
       ) do
    launch_params = %{launch_params | segment: segment}

    _result =
      start_job_one_for_one_with_segment(
        source_paths,
        launch_params
      )

    start_jobs_one_for_one_for_segments(
      segments,
      source_paths,
      launch_params
    )
  end

  defp start_job_one_for_one_with_segment(
         [],
         _launch_params
       ),
       do: {:ok, "started"}

  defp start_job_one_for_one_with_segment(
         [source_path | source_paths],
         launch_params
       ) do
    new_parameters =
      StepFlow.Map.get_by_key_or_atom(launch_params.step, :parameters, [])
      |> Enum.concat([
        %{
          "id" => "sdk_start_index",
          "type" => "integer",
          "value" => StepFlow.Map.get_by_key_or_atom(launch_params.segment, :start)
        },
        %{
          "id" => "sdk_stop_index",
          "type" => "integer",
          "value" => StepFlow.Map.get_by_key_or_atom(launch_params.segment, :end)
        }
      ])

    updated_step = StepFlow.Map.replace_by_atom(launch_params.step, :parameters, new_parameters)
    launch_params = %{launch_params | step: updated_step}

    parameters =
      generate_job_parameters_one_for_one(
        source_path,
        launch_params
      )

    step_name = LaunchParams.get_step_name(launch_params)
    step_id = LaunchParams.get_step_id(launch_params)
    allow_failure = LaunchParams.get_step_allow_failure(launch_params)

    job_params = %{
      name: step_name,
      step_id: step_id,
      workflow_id: launch_params.workflow.id,
      allow_failure: allow_failure,
      parameters: parameters
    }

    {:ok, job} = Jobs.create_job(job_params)

    case start_job(job) do
      {:ok, _} -> start_job_one_for_one_with_segment(source_paths, launch_params)
      {:error, error} -> {:error, error}
    end
  end

  def start_job_one_for_many(source_paths, launch_params) do
    {:ok, job} = generate_job_one_for_many(source_paths, launch_params)

    start_job(job)
  end

  def start_job(job) do
    message = StepFlow.Controllers.Jobs.get_message(job)
    datetime = NaiveDateTime.to_string(DateTime.utc_now())

    case CommonEmitter.publish_json(
           job.name,
           job.step_id,
           message
         ) do
      :ok ->
        {:ok, _status} = Status.set_job_status(job.id, "queued", %{}, datetime)
        {:ok, "started"}

      _ ->
        {:error, "unable to publish message"}
    end
  end

  def generate_message_one_for_one(source_path, launch_params) do
    {:ok, job} = generate_job_one_for_one(source_path, launch_params)

    StepFlow.Controllers.Jobs.get_message(job)
  end

  def generate_job_one_for_one(
        source_path,
        launch_params
      ) do
    parameters =
      generate_job_parameters_one_for_one(
        source_path,
        launch_params
      )

    job_params = %{
      name: LaunchParams.get_step_name(launch_params),
      step_id: LaunchParams.get_step_id(launch_params),
      workflow_id: launch_params.workflow.id,
      allow_failure: LaunchParams.get_step_allow_failure(launch_params),
      parameters: parameters
    }

    Jobs.create_job(job_params)
  end

  def generate_job_parameters_one_for_one(
        source_path,
        launch_params
      ) do
    destination_path_templates =
      Helpers.get_value_in_parameters_with_type(
        launch_params.step,
        "destination_path",
        "template"
      )

    destination_filename_templates =
      Helpers.get_value_in_parameters_with_type(
        launch_params.step,
        "destination_filename",
        "template"
      )

    base_directory = Helpers.get_base_directory(launch_params.workflow, launch_params.step)

    {required_paths, destination_path} =
      Launch.build_requirements_and_destination_path(
        destination_path_templates,
        destination_filename_templates,
        launch_params.workflow,
        launch_params.step,
        launch_params.dates,
        base_directory,
        source_path,
        launch_params.required_file
      )

    requirements =
      if launch_params.workflow.is_live do
        # Add the appropriate srt stream requirements check
        []
      else
        Helpers.get_step_requirements(launch_params.workflow.jobs, launch_params.step)
        |> Helpers.add_required_paths(required_paths)
      end

    destination_path_parameter =
      if StepFlow.Map.get_by_key_or_atom(launch_params.step, :skip_destination_path, false) do
        []
      else
        [
          %{
            "id" => "destination_path",
            "type" => "string",
            "value" => destination_path
          }
        ]
      end

    filter_and_pre_compile_parameters(
      launch_params,
      source_path
    )
    |> Enum.concat(destination_path_parameter)
    |> Enum.concat([
      %{
        "id" => "source_path",
        "type" => "string",
        "value" => source_path
      },
      %{
        "id" => "requirements",
        "type" => "requirements",
        "value" => requirements
      }
    ])
  end

  def generate_message_one_for_many(source_paths, launch_params) do
    {:ok, job} = generate_job_one_for_many(source_paths, launch_params)

    StepFlow.Controllers.Jobs.get_message(job)
  end

  def generate_job_one_for_many(source_paths, launch_params) do
    select_input =
      StepFlow.Map.get_by_key_or_atom(launch_params.step, :parameters, [])
      |> Enum.filter(fn param ->
        StepFlow.Map.get_by_key_or_atom(param, :type) == "select_input"
      end)
      |> Enum.map(fn param ->
        id = StepFlow.Map.get_by_key_or_atom(param, :id)
        value = StepFlow.Map.get_by_key_or_atom(param, :value)
        Logger.info("source paths: #{inspect(source_paths)} // value: #{inspect(value)}")

        path =
          Helpers.filter_path_list(source_paths, [value])
          |> List.first()

        %{
          id: id,
          type: "string",
          value: path
        }
      end)

    destination_filename_templates =
      Helpers.get_value_in_parameters_with_type(
        launch_params.step,
        "destination_filename",
        "template"
      )

    base_directory = Helpers.get_base_directory(launch_params.workflow, launch_params.step)

    select_input =
      case destination_filename_templates do
        [destination_filename_template] ->
          if StepFlow.Map.get_by_key_or_atom(launch_params.step, :skip_destination_path, false) do
            select_input
          else
            filename =
              destination_filename_template
              |> Helpers.template_process(
                launch_params.workflow,
                launch_params.step,
                launch_params.dates,
                source_paths
              )
              |> Path.basename()

            destination_path = base_directory <> filename

            Enum.concat(select_input, [
              %{
                id: "destination_path",
                type: "string",
                value: destination_path
              }
            ])
          end

        _ ->
          select_input
      end

    source_paths =
      resolve_source_paths_templates(
        launch_params.workflow,
        launch_params.dates,
        launch_params.step,
        source_paths
      )
      |> Helpers.filter_untemplated()

    requirements =
      if launch_params.workflow.is_live do
        # Add the appropriate srt stream requirements check
        []
      else
        Helpers.get_step_requirements(launch_params.workflow.jobs, launch_params.step)
        |> Helpers.add_required_paths(source_paths)
      end

    parameters =
      filter_and_pre_compile_parameters(launch_params, source_paths)
      |> Enum.concat(select_input)
      |> Enum.concat([
        %{
          "id" => "source_paths",
          "type" => "array_of_strings",
          "value" => source_paths
        },
        %{
          "id" => "requirements",
          "type" => "requirements",
          "value" => requirements
        }
      ])

    job_params = %{
      name: LaunchParams.get_step_name(launch_params),
      step_id: LaunchParams.get_step_id(launch_params),
      workflow_id: launch_params.workflow.id,
      allow_failure: LaunchParams.get_step_allow_failure(launch_params),
      parameters: parameters
    }

    Jobs.create_job(job_params)
  end

  defp filter_and_pre_compile_parameters(launch_params, source_paths) do
    Launch.filter_and_pre_compile_parameters(
      StepFlow.Map.get_by_key_or_atom(launch_params.step, :parameters, []),
      launch_params.workflow,
      launch_params.step,
      launch_params.dates,
      source_paths
    )
  end

  defp resolve_source_paths_templates(workflow, dates, step, source_paths) do
    source_paths_templates =
      Helpers.get_value_in_parameters_with_type(step, "source_paths", "array_of_templates")
      |> List.flatten()
      |> Helpers.filter_empty()

    case source_paths_templates do
      nil ->
        source_paths

      [] ->
        source_paths

      templates ->
        Enum.map(
          templates,
          fn template ->
            Helpers.template_process(template, workflow, step, dates, nil)
          end
        )
    end
  end
end