# lib/step_flow/controllers/step/launch.ex

defmodule StepFlow.Step.Launch do
  @moduledoc """
  The Step launcher context.
  """
  require Logger

  alias StepFlow.Jobs
  alias StepFlow.Step.Helpers
  alias StepFlow.Step.LaunchJobs
  alias StepFlow.Step.LaunchParams
  alias StepFlow.Step.LaunchWorkflows
  alias StepFlow.Step.Live
  alias StepFlow.Workflows

  @doc """
  Launches `step` for `workflow`.

  The workflow is reloaded with `Workflows.get_workflow!/1`, then the source paths of the
  step are resolved with `get_source_paths/3`. What happens next depends on
  `workflow.is_live` and on the resolved paths:

    * live workflow and a list of paths: a `direct_messaging_queue_name` string parameter
      holding a generated UUID is appended to the step parameters and
      `Live.create_job_live/2` is called;
    * non-live workflow and a non-empty list of paths: jobs or child workflows are started
      according to the step `mode` (`"one_for_one"` when the step does not define one);
    * any other non-live case: a skipped job is created with `Jobs.create_skipped_job/3`.

  Any other combination matches no clause and raises a `CaseClauseError`.
  """
  def launch_step(workflow, step) do
    dates = Helpers.get_dates()
    # Reload the workflow so the most recently stored parameters are used.
    workflow = Workflows.get_workflow!(workflow.id)

    step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
    step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
    step_mode = StepFlow.Map.get_by_key_or_atom(step, :mode, "one_for_one")
    source_paths = get_source_paths(workflow, step, dates)

    case {source_paths, step_mode, workflow.is_live} do
      {paths, _mode, true} when is_list(paths) ->
        launch_live_step(workflow, step, dates, paths)

      {[_ | _] = paths, mode, false} ->
        launch_file_step(workflow, step, dates, paths, mode)

      # Empty list or non-list source paths on a non-live workflow.
      {_, _, false} ->
        Logger.debug("Job skipped")
        Jobs.create_skipped_job(workflow, step_id, step_name)
    end
  end

  # Creates the live job of a step: the step receives an extra
  # "direct_messaging_queue_name" parameter whose value is a freshly generated UUID.
  defp launch_live_step(workflow, step, dates, source_paths) do
    Logger.debug("Live Step")

    first_file = List.first(Enum.sort(source_paths))

    queue_name_parameter = %{
      id: "direct_messaging_queue_name",
      type: "string",
      value: Ecto.UUID.generate()
    }

    parameters = StepFlow.Map.get_by_key_or_atom(step, :parameters) ++ [queue_name_parameter]
    live_step = Map.put(step, :parameters, parameters)

    launch_params = LaunchParams.new(workflow, live_step, dates, first_file)
    Live.create_job_live(source_paths, launch_params)
  end

  # Starts the jobs or child workflows of a non-live step according to `step_mode`:
  #
  #   * "one_for_one"           - jobs started from the source paths; the first path in
  #                               sorted order is given to `LaunchParams.new/4`
  #   * "one_for_many"          - started through `LaunchJobs.start_job_one_for_many/2`
  #   * "workflow_one_for_one"  - child workflows, one-for-one variant
  #   * "workflow_one_for_many" - child workflow, one-for-many variant
  #
  # Any other mode matches no clause and raises a `CaseClauseError`.
  defp launch_file_step(workflow, step, dates, source_paths, step_mode) do
    case step_mode do
      "one_for_one" ->
        Logger.debug("Job one for one path")

        first_file = List.first(Enum.sort(source_paths))
        launch_params = LaunchParams.new(workflow, step, dates, first_file)
        multiple_jobs = StepFlow.Map.get_by_key_or_atom(step, :multiple_jobs)

        # The step's `multiple_jobs` entry, when present, selects the multiple-jobs launcher.
        if is_nil(multiple_jobs) do
          LaunchJobs.start_job_one_for_one(source_paths, launch_params)
        else
          LaunchJobs.start_multiple_jobs_one_for_one(source_paths, multiple_jobs, launch_params)
        end

      "one_for_many" ->
        Logger.debug("Job one for many paths")

        workflow
        |> LaunchParams.new(step, dates)
        |> then(&LaunchJobs.start_job_one_for_many(source_paths, &1))

      "workflow_one_for_one" ->
        Logger.debug("Workflow one for one path")
        LaunchWorkflows.start_child_workflows_one_for_one(workflow, step, dates, source_paths)

      "workflow_one_for_many" ->
        Logger.debug("Workflow one for many paths")
        LaunchWorkflows.start_child_workflow_one_for_many(workflow, step, dates, source_paths)
    end
  end

  @doc """
  Resolves the source paths of `step`.

  When the step has no `parent_ids` (missing or empty list), the paths are read from its
  `source_paths` parameter, flattened, and passed through `Helpers.filter_empty/1`,
  `Helpers.templates_process/4` and `Helpers.filter_untemplated/1`.

  Otherwise the paths are the destination paths (`Helpers.get_jobs_destination_paths/1`) of
  the workflow jobs whose `step_id` is one of the parent ids, passed through
  `Helpers.filter_empty/1`.

  In both cases the result is finally filtered by `Helpers.filter_path_list/2` using the
  step's `input_filter` parameter.
  """
  def get_source_paths(workflow, step, dates) do
    input_filter = Helpers.get_value_in_parameters(step, "input_filter")
    parent_ids = StepFlow.Map.get_by_key_or_atom(step, :parent_ids, [])

    candidate_paths =
      if parent_ids == [] do
        source_paths_from_parameters(workflow, step, dates)
      else
        source_paths_from_parent_jobs(workflow, parent_ids)
      end

    Helpers.filter_path_list(candidate_paths, input_filter)
  end

  # Paths declared in the "source_paths" parameter of the step, after template processing.
  defp source_paths_from_parameters(workflow, step, dates) do
    declared_paths = Helpers.get_value_in_parameters(step, "source_paths")

    declared_paths
    |> List.flatten()
    |> Helpers.filter_empty()
    |> Helpers.templates_process(workflow, step, dates)
    |> Helpers.filter_untemplated()
  end

  # Destination paths of the jobs that belong to one of the parent steps.
  defp source_paths_from_parent_jobs(workflow, parent_ids) do
    parent_jobs = Enum.filter(workflow.jobs, &(&1.step_id in parent_ids))

    parent_jobs
    |> Helpers.get_jobs_destination_paths()
    |> Helpers.filter_empty()
  end

  @doc """
  Builds the `{required_paths, destination_path}` tuple for one source path.

  `required_paths` is always the result of `get_required_paths/3`. The destination path is
  chosen by the first rule that applies:

    1. `destination_path_templates` is neither `[]` nor `""`: the template (unwrapped when
       it is a one-element list) is processed with `Helpers.template_process/5` and used
       as the destination path.
    2. `destination_filename_templates` is neither `[]` nor `""`: the template is processed
       the same way and the basename of the result is appended to `base_directory`.
    3. otherwise: the basename of `source_path` is appended to `base_directory`.

  `base_directory` is concatenated as-is, without adding any path separator.
  """
  def build_requirements_and_destination_path(
        destination_path_templates,
        _destination_filename_templates,
        workflow,
        step,
        dates,
        base_directory,
        source_path,
        first_file
      )
      when destination_path_templates not in [[], ""] do
    destination_path =
      destination_path_templates
      |> unwrap_single_template()
      |> Helpers.template_process(workflow, step, dates, source_path)

    {get_required_paths(source_path, first_file, base_directory), destination_path}
  end

  def build_requirements_and_destination_path(
        _destination_path_templates,
        destination_filename_templates,
        workflow,
        step,
        dates,
        base_directory,
        source_path,
        first_file
      )
      when destination_filename_templates not in [[], ""] do
    filename =
      destination_filename_templates
      |> unwrap_single_template()
      |> Helpers.template_process(workflow, step, dates, source_path)
      |> Path.basename()

    required_paths = get_required_paths(source_path, first_file, base_directory)

    {required_paths, base_directory <> filename}
  end

  def build_requirements_and_destination_path(
        _destination_path_templates,
        _destination_filename_templates,
        _workflow,
        _step,
        _dates,
        base_directory,
        source_path,
        first_file
      ) do
    {
      get_required_paths(source_path, first_file, base_directory),
      base_directory <> Path.basename(source_path)
    }
  end

  # A one-element list is reduced to its element; anything else is passed through untouched.
  defp unwrap_single_template([template]), do: template
  defp unwrap_single_template(templates), do: templates

  @doc """
  Returns the path that must be available before `source_path` is handled.

    * `[]` when `source_path` equals `first_file`;
    * `first_file` itself when it already starts with `base_directory`;
    * otherwise `base_directory` concatenated with the basename of `first_file`.

  Note that the result is either an empty list or a single string, not a list of paths.
  """
  def get_required_paths(source_path, first_file, base_directory) do
    cond do
      source_path == first_file -> []
      String.starts_with?(first_file, base_directory) -> first_file
      true -> base_directory <> Path.basename(first_file)
    end
  end

  @doc """
  Pre-compiles the templated entries of `params` and drops the entries that are filtered out.

  For each parameter:

    * type `"template"`: its `value` (or `default` when there is no value) is processed by
      `Helpers.template_process/5` with `source_paths`; the type becomes `"string"`, the
      value becomes the processed result and the `"default"` string key is removed;
    * type `"array_of_templates"`: handled by a dedicated private helper;
    * any other type: left unchanged.

  Parameters rejected by `filter_parameters_by_type/1` are then removed, as is the
  `"source_paths"` parameter of type `"array_of_strings"` unless the step's
  `keep_source_paths` entry (default `true`) is truthy.
  """
  def filter_and_pre_compile_parameters(params, workflow, step, dates, source_paths) do
    params
    |> Enum.map(&pre_compile_parameter(&1, workflow, step, dates, source_paths))
    |> Enum.filter(&forward_parameter?(&1, step))
  end

  # Compiles a single parameter according to its type.
  defp pre_compile_parameter(param, workflow, step, dates, source_paths) do
    case StepFlow.Map.get_by_key_or_atom(param, :type) do
      "template" ->
        default = StepFlow.Map.get_by_key_or_atom(param, :default)
        template = StepFlow.Map.get_by_key_or_atom(param, :value, default)
        compiled = Helpers.template_process(template, workflow, step, dates, source_paths)

        # Only the string key "default" is removed here.
        param
        |> StepFlow.Map.replace_by_atom(:type, "string")
        |> StepFlow.Map.replace_by_atom(:value, compiled)
        |> Map.delete("default")

      "array_of_templates" ->
        filter_and_pre_compile_array_of_templates_parameter(param, workflow, step, dates)

      _other ->
        param
    end
  end

  # Tells whether a compiled parameter is kept in the final list.
  defp forward_parameter?(param, step) do
    cond do
      not filter_parameters_by_type(param) ->
        false

      StepFlow.Map.get_by_key_or_atom(param, :id) == "source_paths" and
          StepFlow.Map.get_by_key_or_atom(param, :type) == "array_of_strings" ->
        StepFlow.Map.get_by_key_or_atom(step, :keep_source_paths, true)

      true ->
        true
    end
  end

  # Returns false for parameters whose type is one of "filter", "template",
  # "select_input" or "array_of_templates", true for every other parameter.
  defp filter_parameters_by_type(param) do
    type = StepFlow.Map.get_by_key_or_atom(param, :type)

    type not in ["filter", "template", "select_input", "array_of_templates"]
  end

  # Pre-compiles an "array_of_templates" parameter.
  #
  # The "source_paths" parameter is returned untouched. For any other parameter the value
  # (or the default when there is no value) goes through `Helpers.templates_process/4`; the
  # type is then inferred from the first processed element, the value is replaced by the
  # processed list and the "default" string key is removed.
  defp filter_and_pre_compile_array_of_templates_parameter(param, workflow, step, dates) do
    if StepFlow.Map.get_by_key_or_atom(param, :id) == "source_paths" do
      param
    else
      default = StepFlow.Map.get_by_key_or_atom(param, :default)

      compiled_values =
        param
        |> StepFlow.Map.get_by_key_or_atom(:value, default)
        |> Helpers.templates_process(workflow, step, dates)

      array_type = infer_type(List.first(compiled_values))

      param
      |> StepFlow.Map.replace_by_atom(:type, array_type)
      |> StepFlow.Map.replace_by_atom(:value, compiled_values)
      |> Map.delete("default")
    end
  end

  # Infers the array parameter type from the first element of a processed template list.
  #
  #   * integer -> "array_of_integers"
  #   * binary  -> "array_of_strings"
  #   * nil     -> "array_of_strings"
  #
  # The caller passes `List.first/1` of the processed list, which is nil when that list is
  # empty. Without the nil clause an empty array of templates raised a FunctionClauseError;
  # an empty list is now typed as an array of strings.
  defp infer_type(value) when is_integer(value), do: "array_of_integers"
  defp infer_type(value) when is_binary(value), do: "array_of_strings"
  defp infer_type(nil), do: "array_of_strings"
end