lib/step_flow/controllers/step/step.ex

defmodule StepFlow.Step do
  @moduledoc """
  The Step context.
  """

  require Logger

  alias StepFlow.Amqp.CommonEmitter
  alias StepFlow.Artifacts
  alias StepFlow.Controllers.BlackList
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Repo
  alias StepFlow.Step.Helpers
  alias StepFlow.Step.Launch
  alias StepFlow.Step.LaunchJobs
  alias StepFlow.Workflows
  alias StepFlow.Workflows.Workflow

  @doc """
  Starts the next step(s) of `workflow` unless its last recorded status forbids it.

  The workflow's jobs are force-reloaded first. Depending on the state of the
  last workflow status:

    * `:stopped` - nothing is started and `{:ok, "stopped"}` is returned;
    * `:pausing` or `:paused` - nothing is started and `{:ok, "paused"}` is returned;
    * anything else, or no status at all - the next steps are started.
  """
  def start_next(%Workflow{id: _workflow_id} = workflow) do
    workflow = Repo.preload(workflow, :jobs, force: true)
    is_live = workflow.is_live

    last_state =
      case Workflows.Status.get_last_workflow_status(workflow.id) do
        nil -> nil
        last_status -> last_status.state
      end

    case last_state do
      :stopped -> {:ok, "stopped"}
      state when state in [:pausing, :paused] -> {:ok, "paused"}
      _ -> do_start_next(workflow, is_live)
    end
  end

  # Computes which steps of the workflow can be started, starts them, and turns
  # the resulting step statuses into the final `{:ok, status}` answer.
  #
  # When no step is selected:
  #   * and the workflow has no job yet, the very first step is started
  #     (or the workflow is reported completed when it has no step at all);
  #   * and the workflow already has jobs, the workflow is reported completed.
  defp do_start_next(%Workflow{id: workflow_id} = workflow, is_live) do
    jobs = Repo.preload(workflow.jobs, [:status, :progressions])

    steps =
      workflow
      |> StepFlow.Map.get_by_key_or_atom(:steps)
      |> StepFlow.Controllers.Workflows.get_steps_with_status(jobs)

    {is_completed_workflow, candidates} = get_steps_to_start(steps, is_live)

    launch_plan =
      cond do
        candidates != [] ->
          {:ok, candidates}

        jobs != [] ->
          {:completed_workflow, []}

        true ->
          # Nothing selected and nothing ever launched: bootstrap with the first step.
          case List.first(steps) do
            nil ->
              Logger.warn("#{__MODULE__}: empty workflow #{workflow_id} is completed")
              {:completed_workflow, []}

            first_step ->
              {:ok, [first_step]}
          end
      end

    # Deduplicated and sorted so that `get_final_status/3` can match on a canonical list.
    statuses =
      launch_plan
      |> start_steps(workflow)
      |> Enum.uniq()
      |> Enum.sort()

    get_final_status(workflow, is_completed_workflow, statuses)
  end

  @doc """
  Pauses `step` and returns it unchanged.

    * When the step has no job (`[]`), a paused job is created for it from the
      step's `id`, `name` and `workflow_id`, carrying `post_action`.
    * Otherwise every job of `step_jobs` whose last status is `:queued` is
      black-listed (with notification), then moved to `:processing` and finally
      to `:paused` with `post_action`.

  Raises a `MatchError` if a job or status creation does not return `{:ok, _}`.
  """
  def pause_step(step, [], post_action) do
    step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
    step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
    workflow_id = StepFlow.Map.get_by_key_or_atom(step, :workflow_id)

    {:ok, _status} = Jobs.create_paused_job(workflow_id, step_id, step_name, post_action)

    step
  end

  def pause_step(step, step_jobs, post_action) do
    step_jobs
    # Reload the statuses of all jobs with a single call instead of one per job.
    |> Repo.preload(:status, force: true)
    |> Enum.filter(fn job ->
      StepFlow.Controllers.Jobs.get_last_status(job.status).state == :queued
    end)
    |> Enum.each(fn job ->
      job_id = job.id

      BlackList.add_and_notify!(job_id)

      # NOTE(review): the intermediate :processing status is presumably required
      # before a job can be marked :paused - confirm against the status rules.
      {:ok, _status} = Status.set_job_status(job_id, :processing)
      {:ok, _status} = Status.set_job_status(job_id, :paused, post_action)
    end)

    step
  end

  @doc """
  Resumes the paused jobs of a step.

  The statuses of `step_jobs` are force-reloaded; each job whose last status is
  `:paused` is copied and the copy is started. Always returns `{:ok, "queued"}`,
  even when no job was paused.

  Raises a `MatchError` if copying or starting a job does not return `{:ok, _}`.
  """
  def resume_step(step_jobs) do
    step_jobs
    # Reload the statuses of all jobs with a single call instead of one per job.
    |> Repo.preload(:status, force: true)
    |> Enum.filter(fn job ->
      StepFlow.Controllers.Jobs.get_last_status(job.status).state == :paused
    end)
    |> Enum.each(fn paused_job ->
      {:ok, new_job} = Jobs.copy_job(paused_job)
      {:ok, _} = LaunchJobs.start_job(new_job)
    end)

    {:ok, "queued"}
  end

  @doc """
  Records `step` as skipped by creating a skipped job on `workflow`.

  The workflow's jobs are force-reloaded before the job is created; the result
  of `Jobs.create_skipped_job/3` is returned.
  """
  def skip_step(workflow, step) do
    id = StepFlow.Map.get_by_key_or_atom(step, :id)
    name = StepFlow.Map.get_by_key_or_atom(step, :name)

    reloaded_workflow = Repo.preload(workflow, :jobs, force: true)

    Jobs.create_skipped_job(reloaded_workflow, id, name)
  end

  @doc """
  Aborts the jobs of `step` in `workflow`.

  The workflow's jobs are force-reloaded first; the result of
  `Jobs.abort_jobs/3` is returned.
  """
  def abort_step_jobs(workflow, step) do
    id = StepFlow.Map.get_by_key_or_atom(step, :id)
    name = StepFlow.Map.get_by_key_or_atom(step, :name)

    reloaded_workflow = Repo.preload(workflow, :jobs, force: true)

    Jobs.abort_jobs(reloaded_workflow, id, name)
  end

  @doc """
  Skips the jobs of `step` in `workflow`.

  The workflow's jobs are force-reloaded first; the result of
  `Jobs.skip_jobs/3` is returned.
  """
  def skip_step_jobs(workflow, step) do
    id = StepFlow.Map.get_by_key_or_atom(step, :id)
    name = StepFlow.Map.get_by_key_or_atom(step, :name)

    reloaded_workflow = Repo.preload(workflow, :jobs, force: true)

    Jobs.skip_jobs(reloaded_workflow, id, name)
  end

  # Creates an artifact attached to the workflow, with an empty resources map.
  defp set_artifacts(workflow) do
    Artifacts.create_artifact(%{resources: %{}, workflow_id: workflow.id})
  end

  # Returns `{completed, steps_to_start}` for the given steps; the selection
  # rules are described on `iter_get_steps_to_start/5`.
  defp get_steps_to_start(steps, is_live), do: iter_get_steps_to_start(steps, steps, is_live)

  # Walks `steps` and collects, in their original order, the steps to start.
  #
  #   * Live workflows (`is_live == true`): every step is selected and the
  #     `completed` flag is left untouched.
  #   * Other workflows: a step is selected when it is `:pending` and either has
  #     no `required_to_start` or all of its required steps are satisfied.
  #     `completed` becomes `false` as soon as one step is neither
  #     `:completed`, `:skipped`, `:stopped`, nor `:error` with a truthy
  #     `allow_failure`.
  #
  # `all_steps` is the complete step list used to resolve `required_to_start`.
  # `result` is accumulated in reverse and put back in order at the end.
  defp iter_get_steps_to_start(steps, all_steps, is_live, completed \\ true, result \\ [])

  defp iter_get_steps_to_start([], _all_steps, _is_live, completed, result),
    do: {completed, Enum.reverse(result)}

  defp iter_get_steps_to_start([step | steps], all_steps, true, completed, result),
    do: iter_get_steps_to_start(steps, all_steps, true, completed, [step | result])

  defp iter_get_steps_to_start([step | steps], all_steps, false, completed, result) do
    step_done =
      step.status in [:completed, :skipped, :stopped] or
        (step.status in [:error] and StepFlow.Map.get_by_key_or_atom(step, :allow_failure))

    completed = if step_done, do: completed, else: false

    result = if startable?(step, all_steps), do: [step | result], else: result

    iter_get_steps_to_start(steps, all_steps, false, completed, result)
  end

  # A step can start when it is pending and its requirements (if any) are met.
  defp startable?(%{status: :pending} = step, all_steps) do
    case StepFlow.Map.get_by_key_or_atom(step, :required_to_start) do
      nil -> true
      required_to_start -> requirements_met?(required_to_start, all_steps)
    end
  end

  defp startable?(_step, _all_steps), do: false

  # True when every step whose id is listed in `required_to_start` is completed,
  # skipped, or in error while explicitly allowing failure (`allow_failure == true`).
  defp requirements_met?(required_to_start, all_steps) do
    all_steps
    |> Enum.filter(fn s -> StepFlow.Map.get_by_key_or_atom(s, :id) in required_to_start end)
    |> Enum.all?(fn required_step ->
      case StepFlow.Map.get_by_key_or_atom(required_step, :status) do
        status when status in [:completed, :skipped] ->
          true

        :error ->
          # Looked up with an atom, like every other call to this helper in the module.
          StepFlow.Map.get_by_key_or_atom(required_step, :allow_failure) == true

        _ ->
          false
      end
    end)
  end

  # Starts the given steps and returns the list of their resulting statuses.
  #
  # A `{:completed_workflow, _}` plan starts nothing and yields
  # `[:completed_workflow]`. For an `{:ok, steps}` plan, each step is launched
  # directly when it has no condition (`nil` or `0`); otherwise the condition is
  # rendered as a template and the step is launched on "true", skipped on
  # "false", and reported as "bad step condition" on anything else. A workflow
  # update notification is sent after each step.
  defp start_steps({:completed_workflow, _}, _workflow), do: [:completed_workflow]

  defp start_steps({:ok, steps}, workflow) do
    dates = Helpers.get_dates()

    Enum.map(steps, fn step ->
      step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
      step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
      source_paths = Launch.get_source_paths(workflow, step, dates)

      Logger.warn(
        "#{__MODULE__}: start to process step #{step_name} (index #{step_id}) for workflow #{workflow.id}"
      )

      {_result, status} =
        outcome =
        case StepFlow.Map.get_by_key_or_atom(step, :condition) do
          unconditional when unconditional in [0, nil] ->
            Launch.launch_step(workflow, step)

          condition ->
            evaluation =
              Helpers.template_process(
                "<%= " <> condition <> "%>",
                workflow,
                step,
                dates,
                source_paths
              )

            case evaluation do
              "true" ->
                Launch.launch_step(workflow, step)

              "false" ->
                skip_step(workflow, step)
                {:ok, "skipped"}

              _ ->
                Logger.error(
                  "#{__MODULE__}: cannot estimate condition for step #{step_name} (index #{step_id}) for workflow #{workflow.id}"
                )

                {:error, "bad step condition"}
            end
        end

      Logger.info("#{step_name}: #{inspect(outcome)}")

      StepFlow.Notification.send(
        "update_workflow_" <> Integer.to_string(workflow.id),
        %{workflow_id: workflow.id}
      )

      status
    end)
  end

  # Maps the sorted, deduplicated list of step statuses to the workflow answer.
  #
  #   * only "started" and/or "created"  -> {:ok, "started"}
  #   * only "skipped" or only "completed" -> move on to the next steps
  #   * [:completed_workflow] while every step is done -> the workflow is
  #     marked completed, an artifact is created and, for a child workflow,
  #     the parent job is notified -> {:ok, "completed"}
  #   * anything else -> {:ok, "still_processing"}
  defp get_final_status(_workflow, _is_completed_workflow, states)
       when states in [["started"], ["created"], ["created", "started"]],
       do: {:ok, "started"}

  defp get_final_status(workflow, _is_completed_workflow, states)
       when states in [["skipped"], ["completed"]],
       do: start_next(workflow)

  defp get_final_status(workflow, true, [:completed_workflow]) do
    Workflows.Status.define_workflow_status(workflow.id, :completed_workflow)
    set_artifacts(workflow)
    Logger.warn("#{__MODULE__}: workflow #{workflow.id} is completed")

    # Nested workflows: report completion to the job identified by `parent_id`.
    case workflow.parent_id do
      nil ->
        nil

      parent_id ->
        message = %{
          job_id: parent_id,
          status: "completed",
          description: %{
            "id" => "message",
            "type" => "string",
            "value" => "Child workflow #{workflow.id} completed"
          }
        }

        CommonEmitter.publish_json("job_completed", 0, message, "job_response")
    end

    {:ok, "completed"}
  end

  defp get_final_status(_workflow, _is_completed_workflow, _states), do: {:ok, "still_processing"}
end