# lib/step_flow/controllers/step/step.ex

defmodule StepFlow.Step do
  @moduledoc """
  The Step context.
  """

  require Logger

  alias StepFlow.Amqp.CommonEmitter
  alias StepFlow.Artifacts
  alias StepFlow.Controllers.BlackList
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.Repo
  alias StepFlow.Step.Helpers
  alias StepFlow.Step.Launch
  alias StepFlow.Step.LaunchJobs
  alias StepFlow.Workflows
  alias StepFlow.Workflows.Workflow

  @doc """
  Starts the next step(s) of `workflow`.

  The workflow's jobs are force-reloaded, then the last recorded workflow
  status is looked up:

    * `:stopped` returns `{:ok, "stopped"}` without starting anything;
    * `:pausing` and `:paused` return `{:ok, "paused"}` without starting
      anything;
    * any other state, or no recorded status at all, computes and starts the
      next steps and returns the resulting `{:ok, state}` tuple.
  """
  def start_next(%Workflow{id: _workflow_id} = workflow) do
    reloaded = Repo.preload(workflow, :jobs, force: true)
    is_live = reloaded.is_live

    case Workflows.Status.get_last_workflow_status(reloaded.id) do
      nil -> do_start_next(reloaded, is_live)
      last_status -> start_next_from_state(last_status.state, reloaded, is_live)
    end
  end

  # Dispatches on the last workflow state: halted workflows are reported as
  # such, every other state proceeds to the next steps.
  defp start_next_from_state(:stopped, _workflow, _is_live), do: {:ok, "stopped"}

  defp start_next_from_state(state, _workflow, _is_live) when state in [:pausing, :paused],
    do: {:ok, "paused"}

  defp start_next_from_state(_state, workflow, is_live), do: do_start_next(workflow, is_live)

  # Computes which steps of `workflow` can be started, starts them and turns
  # the collected per-step outcomes into the final `{:ok, state}` reply.
  defp do_start_next(%Workflow{id: workflow_id} = workflow, is_live) do
    jobs = Repo.preload(workflow.jobs, [:status, :progressions])

    steps =
      workflow
      |> StepFlow.Map.get_by_key_or_atom(:steps)
      |> StepFlow.Controllers.Workflows.get_steps_with_status(jobs)

    {is_completed_workflow, startable} = get_steps_to_start(steps, is_live)

    selection =
      cond do
        # Something is ready: start exactly that.
        startable != [] ->
          {:ok, startable}

        # Nothing is ready but jobs already exist: nothing is left to start.
        jobs != [] ->
          {:completed_workflow, []}

        # Nothing is ready and no job exists yet: fall back to the first
        # step, or report completion when the workflow has no step.
        true ->
          case List.first(steps) do
            nil ->
              Logger.info("#{__MODULE__}: empty workflow #{workflow_id} is completed")
              {:completed_workflow, []}

            first_step ->
              {:ok, [first_step]}
          end
      end

    states =
      selection
      |> start_steps(workflow)
      |> Enum.uniq()
      |> Enum.sort()

    get_final_status(workflow, is_completed_workflow, states)
  end

  @doc """
  Pauses `step` and returns it unchanged.

    * When the step has no job (`[]`), `Jobs.create_paused_job/4` is called
      with the step's workflow id, id and name and the given `post_action`.
    * Otherwise the statuses of `step_jobs` are force-reloaded and every job
      whose last status is `:queued` is passed to
      `BlackList.add_and_notify!/1`, then set to `:processing` and to
      `:paused` (with `post_action`).

  Raises a `MatchError` if a job or status operation does not return
  `{:ok, _}`.
  """
  def pause_step(step, [], post_action) do
    step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
    step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
    workflow_id = StepFlow.Map.get_by_key_or_atom(step, :workflow_id)

    {:ok, _status} = Jobs.create_paused_job(workflow_id, step_id, step_name, post_action)

    step
  end

  def pause_step(step, step_jobs, post_action) do
    step_jobs
    # Preload the whole list at once instead of issuing one query per job.
    |> Repo.preload(:status, force: true)
    |> Enum.filter(fn job ->
      StepFlow.Controllers.Jobs.get_last_status(job.status).state == :queued
    end)
    |> Enum.each(fn job ->
      job_id = job.id

      BlackList.add_and_notify!(job_id)

      # NOTE(review): the job goes through :processing before :paused,
      # presumably because the status flow requires it - confirm.
      {:ok, _status} = Status.set_job_status(job_id, :processing)
      {:ok, _status} = Status.set_job_status(job_id, :paused, post_action)
    end)

    step
  end

  @doc """
  Resumes the paused jobs among `step_jobs`.

  The job statuses are force-reloaded; every job whose last status is
  `:paused` is copied with `Jobs.copy_job/1` and the copy is started with
  `LaunchJobs.start_job/1`.

  Always returns `{:ok, "queued"}`, including when no job was paused. Raises
  a `MatchError` if copying or starting a job does not return `{:ok, _}`.
  """
  def resume_step(step_jobs) do
    step_jobs
    # Preload the whole list at once instead of issuing one query per job.
    |> Repo.preload(:status, force: true)
    |> Enum.filter(fn job ->
      StepFlow.Controllers.Jobs.get_last_status(job.status).state == :paused
    end)
    |> Enum.each(fn job ->
      {:ok, new_job} = Jobs.copy_job(job)
      {:ok, _} = LaunchJobs.start_job(new_job)
    end)

    {:ok, "queued"}
  end

  @doc """
  Force-reloads the jobs of `workflow` and delegates to
  `Jobs.create_skipped_job/3` with the id and name of `step`.

  Returns whatever `Jobs.create_skipped_job/3` returns.
  """
  def skip_step(workflow, step) do
    id = StepFlow.Map.get_by_key_or_atom(step, :id)
    name = StepFlow.Map.get_by_key_or_atom(step, :name)

    workflow
    |> Repo.preload(:jobs, force: true)
    |> Jobs.create_skipped_job(id, name)
  end

  @doc """
  Force-reloads the jobs of `workflow` and delegates to `Jobs.abort_jobs/3`
  with the id and name of `step`.

  Returns whatever `Jobs.abort_jobs/3` returns.
  """
  def abort_step_jobs(workflow, step) do
    id = StepFlow.Map.get_by_key_or_atom(step, :id)
    name = StepFlow.Map.get_by_key_or_atom(step, :name)

    workflow
    |> Repo.preload(:jobs, force: true)
    |> Jobs.abort_jobs(id, name)
  end

  @doc """
  Force-reloads the jobs of `workflow` and delegates to `Jobs.skip_jobs/3`
  with the id and name of `step`.

  Returns whatever `Jobs.skip_jobs/3` returns.
  """
  def skip_step_jobs(workflow, step) do
    id = StepFlow.Map.get_by_key_or_atom(step, :id)
    name = StepFlow.Map.get_by_key_or_atom(step, :name)

    workflow
    |> Repo.preload(:jobs, force: true)
    |> Jobs.skip_jobs(id, name)
  end

  # Creates an artifact for `workflow` with an empty resources map and returns
  # the result of `Artifacts.create_artifact/1`.
  defp set_artifacts(workflow) do
    Artifacts.create_artifact(%{resources: %{}, workflow_id: workflow.id})
  end

  # Returns `{completed, steps_to_start}` for `steps`.
  #
  # `completed` stays `true` only while every visited step is finished;
  # `steps_to_start` keeps the order of `steps`.
  defp get_steps_to_start(steps, is_live),
    do: iter_get_steps_to_start(steps, steps, is_live)

  defp iter_get_steps_to_start(steps, all_steps, is_live, completed \\ true, result \\ [])

  defp iter_get_steps_to_start([], _all_steps, _is_live, completed, result),
    do: {completed, result}

  # Live workflows: every remaining step is selected, in order, and the
  # `completed` flag is left untouched. A single concatenation replaces the
  # former per-step append at the end of the accumulator.
  defp iter_get_steps_to_start([_ | _] = steps, _all_steps, true, completed, result),
    do: {completed, result ++ steps}

  # Regular workflows: a step clears the `completed` flag unless it is
  # completed, skipped, stopped, or in error while allowed to fail. A pending
  # step that is not flagged as started is selected when its required steps
  # allow it.
  defp iter_get_steps_to_start([step | steps], all_steps, false, completed, result) do
    finished =
      step.status in [:completed, :skipped, :stopped] or
        (step.status in [:error] and allowed_to_fail(step))

    # Once a step is unfinished the flag stays false for the rest of the walk.
    completed = finished and completed

    result =
      if step.status == :pending and !StepFlow.Map.get_by_key_or_atom(step, :started, false) do
        check_required_steps(all_steps, step, result)
      else
        result
      end

    iter_get_steps_to_start(steps, all_steps, false, completed, result)
  end

  # Starts the selected steps and returns one status per step.
  #
  # A `{:completed_workflow, _}` selection starts nothing and yields
  # `[:completed_workflow]`.
  defp start_steps({:completed_workflow, _}, _workflow), do: [:completed_workflow]

  defp start_steps({:ok, steps}, workflow) do
    dates = Helpers.get_dates()

    Enum.map(steps, fn step -> start_step(workflow, step, dates) end)
  end

  # Flags one step as started, launches or skips it according to its
  # condition, sends the workflow update notification and returns the status.
  #
  # NOTE(review): every step is handled with the workflow given to
  # `start_steps/2`; the workflow returned by `update_step/4` is only used
  # while handling that single step. Confirm `update_step/4` does not rely on
  # the result of the previous step's update.
  defp start_step(workflow, step, dates) do
    step_name = StepFlow.Map.get_by_key_or_atom(step, :name)
    step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
    source_paths = Launch.get_source_paths(workflow, step, dates)
    started_step = Map.put(step, "started", true)

    {:ok, updated} =
      StepFlow.Controllers.Workflows.update_step(workflow, started_step, "started", true)

    Logger.info(
      "#{__MODULE__}: start to process step #{step_name} (index #{step_id}) for workflow #{updated.id}"
    )

    {result, status} =
      case StepFlow.Map.get_by_key_or_atom(started_step, :condition) do
        # No condition (or 0): launch unconditionally.
        condition when condition in [0, nil] ->
          Launch.launch_step(updated, started_step)

        # Otherwise the condition is rendered as a template expression and
        # must evaluate to the string "true" or "false".
        condition ->
          rendered =
            Helpers.template_process(
              "<%= " <> condition <> "%>",
              updated,
              started_step,
              dates,
              source_paths
            )

          case rendered do
            "true" ->
              Launch.launch_step(updated, started_step)

            "false" ->
              skip_step(updated, started_step)
              {:ok, "skipped"}

            _ ->
              Logger.error(
                "#{__MODULE__}: cannot estimate condition for step #{step_name} (index #{step_id}) for workflow #{updated.id}"
              )

              {:error, "bad step condition"}
          end
      end

    Logger.info("#{step_name}: #{inspect({result, status})}")

    topic = "update_workflow_" <> Integer.to_string(updated.id)
    StepFlow.Notification.send(topic, %{workflow_id: updated.id})

    status
  end

  # Maps the sorted, de-duplicated step outcomes to the final reply.
  #
  #   * only "started" and/or "created" outcomes -> `{:ok, "started"}`
  #   * only "skipped" or only "completed"       -> try the next steps
  #   * `[:completed_workflow]` on a completed workflow -> mark the workflow
  #     completed, create its artifact, notify a parent workflow if any
  #   * anything else -> `{:ok, "still_processing"}`
  defp get_final_status(_workflow, _is_completed_workflow, states)
       when states in [["started"], ["created"], ["created", "started"]],
       do: {:ok, "started"}

  defp get_final_status(workflow, _is_completed_workflow, states)
       when states in [["skipped"], ["completed"]],
       do: start_next(workflow)

  defp get_final_status(workflow, true, [:completed_workflow]) do
    Workflows.Status.define_workflow_status(workflow.id, :completed_workflow)
    set_artifacts(workflow)
    Logger.info("#{__MODULE__}: workflow #{workflow.id} is completed")

    notify_parent_workflow(workflow)

    {:ok, "completed"}
  end

  defp get_final_status(_workflow, _is_completed_workflow, _states),
    do: {:ok, "still_processing"}

  # For nested workflows: publishes a "job_completed" message addressed to the
  # parent id when the workflow has one; does nothing otherwise.
  defp notify_parent_workflow(workflow) do
    case workflow.parent_id do
      nil ->
        nil

      parent_id ->
        message = %{
          job_id: parent_id,
          status: "completed",
          description: %{
            "id" => "message",
            "type" => "string",
            "value" => "Child workflow #{workflow.id} completed"
          }
        }

        CommonEmitter.publish_json("job_completed", 0, message, "job_response")
    end
  end

  # Appends `step` to `result` when it is allowed to start, otherwise returns
  # `result` unchanged.
  #
  # A step without `required_to_start` can always start. Otherwise every step
  # of `all_steps` whose id is listed in `required_to_start` must be
  # completed, skipped, or in error while allowed to fail. Listed ids that
  # match no step are ignored.
  defp check_required_steps(all_steps, step, result) do
    case StepFlow.Map.get_by_key_or_atom(step, :required_to_start) do
      nil ->
        result ++ [step]

      required_to_start ->
        ready =
          all_steps
          |> Enum.filter(fn s ->
            StepFlow.Map.get_by_key_or_atom(s, :id) in required_to_start
          end)
          |> Enum.all?(&required_step_done?/1)

        if ready, do: result ++ [step], else: result
    end
  end

  # True when a required step no longer blocks the steps depending on it.
  defp required_step_done?(step) do
    case StepFlow.Map.get_by_key_or_atom(step, :status) do
      status when status in [:completed, :skipped] -> true
      :error -> allowed_to_fail(step)
      _ -> false
    end
  end

  # True when `step` has `allow_failure` set to `true` and its errored, skipped
  # and completed jobs add up to its total number of jobs.
  defp allowed_to_fail(step) do
    case StepFlow.Map.get_by_key_or_atom(step, :allow_failure) do
      true -> is_step_not_processing(step)
      _ -> false
    end
  end

  # True when the errored, skipped and completed job counters of `step` add up
  # to its total job counter.
  defp is_step_not_processing(step) do
    counters = step.jobs
    accounted_for = counters.errors + counters.skipped + counters.completed

    accounted_for == counters.total
  end
end