lib/step_flow/controllers/step/launch_workflows.ex

defmodule StepFlow.Step.LaunchWorkflows do
  @moduledoc """
  The Step Workflows launching parameters.
  """
  require Logger

  alias StepFlow.Amqp.CommonEmitter
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Step
  alias StepFlow.Step.Helpers
  alias StepFlow.Step.Launch
  alias StepFlow.Workflows

  def generate_child_workflow_job(workflow, step) do
    job_params = %{
      name: StepFlow.Map.get_by_key_or_atom(step, :name),
      step_id: StepFlow.Map.get_by_key_or_atom(step, :id),
      workflow_id: workflow.id,
      allow_failure: StepFlow.Map.get_by_key_or_atom(step, :allow_failure),
      parameters: StepFlow.Map.get_by_key_or_atom(step, :parameters)
    }

    Jobs.create_job(job_params)
  end

  # CAREFUL: trick here is that source_paths can be a list or a string
  def generate_child_workflow_params(workflow, step, job, dates, source_paths) do
    wf_identifier = Helpers.get_value_in_parameters(step, "identifier") |> List.first()
    wf_major = Helpers.get_value_in_parameters(step, "version_major") |> List.first()
    wf_minor = Helpers.get_value_in_parameters(step, "version_minor") |> List.first()
    wf_micro = Helpers.get_value_in_parameters(step, "version_micro") |> List.first()

    wf_def =
      StepFlow.WorkflowDefinitions.get_workflow_definition(
        wf_identifier,
        wf_major,
        wf_minor,
        wf_micro
      )

    wf_params =
      Helpers.get_value_in_parameters(step, "parameters")
      |> List.first()
      |> Launch.filter_and_pre_compile_parameters(workflow, step, dates, source_paths)

    %{
      identifier: wf_identifier,
      parameters: wf_params,
      parent_id: job.id,
      reference: "[AUTOGENERATED] Child from workflow #{workflow.id}",
      schema_version: StepFlow.Map.get_by_key_or_atom(wf_def, :schema_version),
      steps: StepFlow.Map.get_by_key_or_atom(wf_def, :steps),
      version_major: wf_major,
      version_minor: wf_minor,
      version_micro: wf_micro,
      user_uuid: workflow.user_uuid,
      tags: StepFlow.Map.get_by_key_or_atom(wf_def, :tags)
    }
  end

  defp start_workflow(workflow_params, workflow, job) do
    case Workflows.create_workflow(workflow_params) do
      {:ok, child_workflow} ->
        Logger.info(
          "Created child workflow #{child_workflow.id} from parent workflow #{workflow.id}"
        )

        Workflows.Status.define_workflow_status(child_workflow.id, :created_workflow)
        {:ok, datetime} = DateTime.now("Etc/UTC")

        CommonEmitter.publish_json(
          "job_progression",
          0,
          %{
            job_id: job.id,
            datetime: datetime,
            docker_container_id: "workflow",
            progression: 0
          },
          "job_response"
        )

        Step.start_next(child_workflow)

        StepFlow.Notification.send("new_workflow", %{workflow_id: child_workflow.id})

      {:error, _} ->
        Logger.error("Cannot create child workflow from parent workflow #{workflow.id}")

        CommonEmitter.publish_json(
          "job_error",
          0,
          %{
            job_id: job.id,
            status: "error",
            parameters: [
              %{
                "id" => "message",
                "type" => "string",
                "value" => "Cannot create child workflow from parent workflow #{workflow.id}"
              }
            ]
          },
          "job_response"
        )
    end
  end

  def start_child_workflow_one_for_many(workflow, step, dates, source_paths) do
    datetime = NaiveDateTime.to_string(DateTime.utc_now())
    {:ok, job} = generate_child_workflow_job(workflow, step)
    Status.set_job_status(job.id, "queued", %{}, datetime)

    generate_child_workflow_params(workflow, step, job, dates, source_paths)
    |> start_workflow(workflow, job)

    {:ok, "started"}
  end

  defp start_child_workflow_one_for_one(workflow, step, dates, source_path) do
    datetime = NaiveDateTime.to_string(DateTime.utc_now())
    {:ok, job} = generate_child_workflow_job(workflow, step)
    Status.set_job_status(job.id, "queued", %{}, datetime)

    generate_child_workflow_params(workflow, step, job, dates, source_path)
    |> start_workflow(workflow, job)
  end

  def start_child_workflows_one_for_one(_, _, _, []),
    do: {:ok, "started"}

  def start_child_workflows_one_for_one(
        workflow,
        step,
        dates,
        [source_path | source_paths]
      ) do
    start_child_workflow_one_for_one(
      workflow,
      step,
      dates,
      source_path
    )

    start_child_workflows_one_for_one(workflow, step, dates, source_paths)
  end
end