lib/step_flow/models/workflows/workflows.ex

defmodule StepFlow.Workflows do
  @moduledoc """
  The Workflows context.
  """

  import Ecto.Query, warn: false

  alias StepFlow.Artifacts.Artifact
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Status
  alias StepFlow.QueryFilter
  alias StepFlow.Repo
  alias StepFlow.Roles
  alias StepFlow.Workflows
  alias StepFlow.Workflows.Workflow

  require Logger

  @doc """
  Returns one page of workflows matching the given filters.

  `params` may contain `"page"` (defaults to `0`) and `"size"` (defaults to
  `10`), plus any filter handled by `apply_default_query_filters/2`.

  The result is a map with the keys `:data` (the workflows of the requested
  page, ordered by descending `inserted_at`), `:total` (number of workflows
  matching the filters), `:page` and `:size`.

  ## Examples

      iex> list_workflows()
      %{data: [%Workflow{}, ...], total: 42, page: 0, size: 10}

  """
  def list_workflows(params \\ %{}) do
    page = StepFlow.Integer.force(Map.get(params, "page", 0))
    size = StepFlow.Integer.force(Map.get(params, "size", 10))

    filtered = apply_default_query_filters(from(workflow in Workflow), params)

    # Count over the filtered set, before pagination is applied.
    total =
      from(item in subquery(filtered), select: count(item.id))
      |> Repo.all()
      |> List.first()

    page_query =
      from(
        workflow in subquery(filtered),
        order_by: [desc: :inserted_at],
        offset: ^(page * size),
        limit: ^size
      )

    workflows =
      page_query
      |> Repo.all()
      |> Repo.preload([:artifacts, :status, jobs: :child_workflow])
      |> preload_workflows()

    %{data: workflows, total: total, page: page, size: size}
  end

  @doc """
  Applies the standard set of workflow filters found in `params` to `query`.

  On top of the field, date and status filters, the result is restricted to
  the workflow identifiers returned by `check_rights/1`, unless that list
  contains the `"*"` wildcard. When `params` carries `:workflow_ids`, only
  those identifiers (among the allowed ones) are kept; when it carries `:ids`,
  only workflows whose `id` is in that list are kept.
  """
  def apply_default_query_filters(query, params \\ %{}) do
    filtered =
      from(workflow in subquery(query))
      |> search_for_reference(params, :search)
      |> QueryFilter.filter_query(params, :reference)
      |> QueryFilter.filter_query(params, :identifier)
      |> filter_version(params)
      |> QueryFilter.filter_query(params, :is_live)
      |> filter_deleted(params)
      |> QueryFilter.filter_query(params, :user_uuid)
      |> filter_mode(params)
      |> QueryFilter.apply_before_date_filter(params, :before_date)
      |> QueryFilter.apply_after_date_filter(params, :after_date)
      |> filter_status(params, :states)

    permitted = check_rights(params)
    requested = StepFlow.Map.get_by_key_or_atom(params, :workflow_ids)
    unrestricted? = Enum.member?(permitted, "*")

    restricted =
      cond do
        is_nil(requested) and unrestricted? ->
          filtered

        is_nil(requested) ->
          from(workflow in filtered, where: workflow.identifier in ^permitted)

        unrestricted? ->
          from(workflow in filtered, where: workflow.identifier in ^requested)

        true ->
          # Only the requested identifiers the caller is allowed to view.
          viewable = Enum.filter(requested, fn identifier -> identifier in permitted end)
          from(workflow in filtered, where: workflow.identifier in ^viewable)
      end

    case StepFlow.Map.get_by_key_or_atom(params, :ids) do
      nil -> restricted
      ids -> from(workflow in restricted, where: workflow.id in ^ids)
    end
  end

  # Restricts `query` to workflows whose `reference` contains the text stored
  # under `field` in `params` (case-insensitive `ILIKE` match). Returns `query`
  # untouched when no such entry exists.
  defp search_for_reference(query, params, field) do
    search = StepFlow.Map.get_by_key_or_atom(params, field)

    if is_nil(search) do
      query
    else
      pattern = "%" <> to_string(search) <> "%"

      from(workflow in subquery(query), where: ilike(workflow.reference, ^pattern))
    end
  end

  @doc """
  Lists the workflow identifiers the roles found in `params` may view.

  The role names are read from `params` under `:roles` (default `[]`) and
  resolved through `Roles.get_roles/1`. For every "view" right on the
  "workflow" entity type, the last `"::"`-separated segment of the right's
  entity is returned. Rights without an `:entity` key are skipped.
  """
  def check_rights(params) do
    roles = Roles.get_roles(StepFlow.Map.get_by_key_or_atom(params, :roles, []))

    view_rights =
      if is_nil(roles) do
        []
      else
        StepFlow.Controllers.Roles.get_rights_for_entity_type_and_action(
          roles,
          "workflow",
          "view"
        )
      end

    for %{entity: entity} <- view_rights do
      entity
      |> String.split("::")
      |> List.last()
    end
  end

  # Filters on the `deleted` flag according to the "deleted" entry of `params`:
  #
  #   * "only"                         -> deleted workflows only
  #   * "all"                          -> no filtering on the flag
  #   * nil, "none" or any other value -> non-deleted workflows only
  #
  # Unrecognised values used to raise a CaseClauseError; they now fall back to
  # the default (hide deleted workflows) so that a malformed request parameter
  # cannot crash the listing nor expose deleted records.
  defp filter_deleted(query, params) do
    case Map.get(params, "deleted") do
      "only" ->
        from(workflow in query, where: workflow.deleted == true)

      "all" ->
        query

      _none_or_unknown ->
        from(workflow in query, where: workflow.deleted == false)
    end
  end

  # Filters on `is_live` according to the "mode" entry of `params`, which is
  # expected to be a list containing "live" and/or "file".
  #
  #   * only "live" present -> live workflows
  #   * only "file" present -> non-live workflows
  #   * both, or neither    -> no filtering
  #
  # Membership is tested instead of matching exact list literals, so that the
  # order of the entries (["file", "live"]), duplicates, an empty list or a
  # bare string no longer raise a CaseClauseError.
  defp filter_mode(query, params) do
    modes =
      params
      |> Map.get("mode")
      |> List.wrap()

    case {"live" in modes, "file" in modes} do
      {true, false} ->
        from(workflow in query, where: workflow.is_live == true)

      {false, true} ->
        from(workflow in query, where: workflow.is_live == false)

      _both_or_neither ->
        from(workflow in query)
    end
  end

  # Keeps the workflows whose "major.minor.micro" version string is one of the
  # values stored under "version" in `params`; no filtering when absent.
  defp filter_version(query, params) do
    requested_versions = Map.get(params, "version")

    if is_nil(requested_versions) do
      from(workflow in query)
    else
      from(
        workflow in query,
        where:
          fragment("CONCAT(version_major,'.',version_minor,'.',version_micro)") in ^requested_versions
      )
    end
  end

  @doc """
  Filters `query` on workflow status.

  When `params` holds a list of states under `key`, the query is joined with
  one status row per workflow (the row with the highest `id`, presumably the
  most recent one) and only the workflows whose row is in one of the given
  states are kept. Without such an entry, `query` is returned unchanged.
  """
  def filter_status(query, params, key) do
    states = StepFlow.Map.get_by_key_or_atom(params, key)

    if is_nil(states) do
      query
    else
      # One status row per workflow, selected through DISTINCT on workflow_id.
      status_per_workflow =
        from(
          workflow_status in Workflows.Status,
          order_by: [desc: workflow_status.id, desc: workflow_status.workflow_id],
          distinct: [desc: workflow_status.workflow_id]
        )

      from(
        workflow in query,
        join: workflow_status in subquery(status_per_workflow),
        on: workflow.id == workflow_status.workflow_id,
        where: workflow_status.state in ^states
      )
    end
  end

  @doc """
  Gets a single workflow, with its artifacts, jobs and step statuses loaded.

  Raises `Ecto.NoResultsError` if the Workflow does not exist.

  ## Examples

      iex> get_workflow!(123)
      %Workflow{}

      iex> get_workflow!(456)
      ** (Ecto.NoResultsError)

  """
  def get_workflow!(id) do
    workflow = Repo.get!(Workflow, id)
    with_associations = Repo.preload(workflow, [:artifacts, jobs: :child_workflow])

    preload_workflow(with_associations)
  end

  @doc """
  Gets the workflow that owns the job with the given ID.

  The job is fetched with `Jobs.get_job!/1` and its workflow with
  `get_workflow!/1`, which raises `Ecto.NoResultsError` if the Workflow does
  not exist.

  ## Examples

      iex> get_workflow_for_job!(19)
      %Workflow{}

  """
  def get_workflow_for_job!(job_id) do
    job_id
    |> Jobs.get_job!()
    |> Map.fetch!(:workflow_id)
    |> get_workflow!()
  end

  # Loads the status and progressions of every job of `workflow`, then
  # replaces the workflow's steps with the result of
  # `StepFlow.Controllers.Workflows.get_steps_with_status/2` and its jobs with
  # the freshly loaded ones.
  defp preload_workflow(workflow) do
    loaded_jobs = Repo.preload(workflow.jobs, [:status, :progressions])
    raw_steps = Map.get(workflow, :steps)

    steps_with_status =
      StepFlow.Controllers.Workflows.get_steps_with_status(raw_steps, loaded_jobs)

    Map.merge(workflow, %{steps: steps_with_status, jobs: loaded_jobs})
  end

  @doc """
  Runs the per-workflow preloading (job statuses, progressions and step
  statuses) on every workflow of the list, preserving order.

  The processed workflows are appended to `result` (default `[]`), which is
  returned untouched when `workflows` is empty.
  """
  def preload_workflows(workflows, result \\ [])
  def preload_workflows([], result), do: result

  def preload_workflows([_ | _] = workflows, result) do
    # A single map plus one append, instead of List.insert_at(result, -1, ...)
    # on every element, which walked the whole accumulator each time (O(n²)).
    result ++ Enum.map(workflows, &preload_workflow/1)
  end

  @doc """
  Returns the step definition a job belongs to, along with its workflow.

  The job's workflow is preloaded and the first step whose `"id"` equals the
  job's `step_id` is looked up. The result is `%{step: step, workflow: workflow}`,
  where `step` is `nil` when no step matches.
  """
  def get_step_definition(job) do
    job_with_workflow = Repo.preload(job, workflow: [jobs: :child_workflow])
    workflow = job_with_workflow.workflow

    step =
      Enum.find(workflow.steps, fn candidate ->
        Map.get(candidate, "id") == job_with_workflow.step_id
      end)

    %{step: step, workflow: workflow}
  end

  @doc """
  Creates a workflow from the given attributes.

  The attributes go through `Workflow.changeset/2` before insertion.

  ## Examples

      iex> create_workflow(%{field: value})
      {:ok, %Workflow{}}

      iex> create_workflow(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def create_workflow(attrs \\ %{}) do
    changeset = Workflow.changeset(%Workflow{}, attrs)

    Repo.insert(changeset)
  end

  @doc """
  Updates an existing workflow with the given attributes.

  The attributes go through `Workflow.changeset/2` before the update.

  ## Examples

      iex> update_workflow(workflow, %{field: new_value})
      {:ok, %Workflow{}}

      iex> update_workflow(workflow, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  def update_workflow(%Workflow{} = workflow, attrs) do
    changeset = Workflow.changeset(workflow, attrs)

    Repo.update(changeset)
  end

  @doc """
  Marks a Workflow as deleted.

  The record is not removed from the database: it is updated with
  `deleted: true`.

  ## Examples

      iex> delete_workflow(workflow)
      {:ok, %Workflow{}}

      iex> delete_workflow(workflow)
      {:error, %Ecto.Changeset{}}

  """
  def delete_workflow(%Workflow{} = workflow) do
    deletion = Workflow.changeset(workflow, %{deleted: true})

    Repo.update(deletion)
  end

  @doc """
  Builds an `%Ecto.Changeset{}` for the given workflow without any change
  applied, for tracking workflow changes.

  ## Examples

      iex> change_workflow(workflow)
      %Ecto.Changeset{source: %Workflow{}}

  """
  def change_workflow(%Workflow{} = workflow) do
    workflow
    |> Workflow.changeset(%{})
  end

  @doc """
  Computes per-identifier statistics over workflows that produced artifacts
  recently.

  Only artifacts inserted after `now + delta` (expressed in `scale` units, as
  understood by Ecto's `datetime_add`) are considered, so `delta` is
  presumably negative. For each workflow identifier, the returned maps hold:

    * `:identifier` - the workflow identifier
    * `:count` - the number of workflows with at least one such artifact
    * `:duration` - the average time, as a float number of seconds, between
      the insertion of a workflow and the insertion of its latest such artifact
  """
  def get_completed_statistics(scale, delta) do
    now = NaiveDateTime.utc_now()

    # Latest recent artifact insertion date, per workflow.
    latest_artifacts =
      from(
        artifacts in Artifact,
        where: artifacts.inserted_at > datetime_add(^now, ^delta, ^scale),
        group_by: artifacts.workflow_id,
        select: %{
          workflow_id: artifacts.workflow_id,
          inserted_at: max(artifacts.inserted_at)
        }
      )

    statistics =
      from(
        workflow in Workflow,
        inner_join: artifacts in subquery(latest_artifacts),
        on: workflow.id == artifacts.workflow_id,
        group_by: workflow.identifier,
        select: %{
          count: count(),
          duration:
            fragment(
              "CAST(EXTRACT(EPOCH FROM (SELECT avg(? - ?))) AS FLOAT)",
              artifacts.inserted_at,
              workflow.inserted_at
            ),
          identifier: workflow.identifier
        }
      )

    Repo.all(statistics)
  end

  @doc """
  Builds the `"major.minor.micro"` version string of a workflow from its
  `version_major`, `version_minor` and `version_micro` fields.
  """
  def get_workflow_version_as_string(workflow) do
    [workflow.version_major, workflow.version_minor, workflow.version_micro]
    |> Enum.map_join(".", &to_string/1)
  end

  @doc """
  Aborts a workflow.

  The jobs of the steps being processed are aborted (recursively for nested
  workflows), the queued, paused and processing steps are skipped, and the
  workflow status is set to `:stopped`. An `"update_workflow_<id>"`
  notification is then sent.

  Returns the result of the status update.
  """
  def abort(workflow) do
    abort_running_step_jobs(workflow.steps, workflow)
    skip_remaining_steps(workflow.steps, workflow)

    status_result = Workflows.Status.set_workflow_status(workflow.id, :stopped)

    StepFlow.Notification.send(
      "update_workflow_" <> Integer.to_string(workflow.id),
      %{workflow_id: workflow.id}
    )

    status_result
  end

  @doc """
  Pauses a workflow and records the action to run once the pause ends.

  `action` must be `"resume"` or `"abort"`; `trigger_date_time` is a Unix
  timestamp in milliseconds, stored with the action in the status description.

  The workflow status becomes `:pausing` when at least one step still has
  processing jobs, `:paused` otherwise. The queued and processing steps are
  then paused and an `"update_workflow_<id>"` notification is sent.

  Returns the result of the status update, or `{:error, message}` for an
  unknown action.
  """
  def pause(workflow, action, trigger_date_time) do
    if action in ["resume", "abort"] do
      trigger_at =
        trigger_date_time
        |> DateTime.from_unix!(:millisecond)
        |> DateTime.to_naive()

      description = %{action: action, trigger_at: trigger_at}

      workflow = Repo.preload(workflow, jobs: :child_workflow)

      processing_steps = Enum.filter(workflow.steps, fn step -> step.jobs.processing > 0 end)

      target_status =
        case processing_steps do
          [] -> :paused
          _ -> :pausing
        end

      status_result =
        Workflows.Status.set_workflow_status(workflow.id, target_status, nil, description)

      pause_remaining_steps(workflow.steps, workflow.jobs, %{})

      StepFlow.Notification.send(
        "update_workflow_" <> Integer.to_string(workflow.id),
        %{workflow_id: workflow.id}
      )

      status_result
    else
      {:error, "Unknown action: #{action}"}
    end
  end

  @doc """
  Resumes a paused workflow.

  The workflow status is first set to `:pending`, then
  `StepFlow.Step.start_next/1` is called. On success the paused steps are
  resumed, the status is set to `:processing` and an
  `"update_workflow_<id>"` notification is sent; the result of that last
  status update is returned. Any other result of `start_next/1` is returned
  as is.
  """
  def resume(workflow) do
    {:ok, _status} = Workflows.Status.set_workflow_status(workflow.id, :pending)

    with {:ok, _} <- StepFlow.Step.start_next(workflow) do
      workflow = Repo.preload(workflow, jobs: :child_workflow)

      resume_remaining_steps(workflow.steps, workflow.jobs)

      status_result = Workflows.Status.set_workflow_status(workflow.id, :processing)

      StepFlow.Notification.send(
        "update_workflow_" <> Integer.to_string(workflow.id),
        %{workflow_id: workflow.id}
      )

      status_result
    end
  end

  # Aborts the jobs of every step currently in the :processing status. Steps
  # running nested workflows abort their child workflows instead. Always
  # returns nil.
  defp abort_running_step_jobs(steps, workflow) when is_list(steps) do
    Enum.each(steps, fn step ->
      if step.status == :processing do
        nested? =
          StepFlow.Map.get_by_key_or_atom(step, :mode) in [
            "workflow_one_for_one",
            "workflow_one_for_many"
          ]

        if nested? do
          abort_nested_workflows(step, workflow)
        else
          StepFlow.Step.abort_step_jobs(workflow, step)
        end
      end
    end)

    nil
  end

  # Aborts the child workflow of every job of `workflow` attached to `step`.
  defp abort_nested_workflows(step, workflow) do
    step_id = StepFlow.Map.get_by_key_or_atom(step, :id)

    workflow.jobs
    |> Enum.filter(fn job -> job.step_id == step_id end)
    |> Enum.each(fn job ->
      child_workflow = get_workflow!(job.child_workflow.id)
      abort(child_workflow)
    end)
  end

  # Skips every step that has not finished yet: queued steps are skipped as a
  # whole, paused and processing steps have their jobs skipped. Steps in any
  # other status are left alone. Always returns nil.
  defp skip_remaining_steps(steps, workflow) when is_list(steps) do
    Enum.each(steps, fn step ->
      status = step.status

      cond do
        status == :queued -> StepFlow.Step.skip_step(workflow, step)
        status in [:paused, :processing] -> StepFlow.Step.skip_step_jobs(workflow, step)
        true -> nil
      end
    end)

    nil
  end

  # Pauses every step of the list that is still :queued or :processing and
  # returns the accumulated results of `StepFlow.Step.pause_step/3` (most
  # recently paused step first).
  #
  # Steps running nested workflows pause their child workflows instead, then
  # call `pause/3` on the workflow owning the step jobs; they add nothing to
  # the accumulator.
  defp pause_remaining_steps(_steps, _workflow_jobs, _post_action, _paused_steps \\ [])

  defp pause_remaining_steps([], _workflow_jobs, _post_action, paused_steps), do: paused_steps

  defp pause_remaining_steps([step | steps], workflow_jobs, post_action, paused_steps) do
    step_id = StepFlow.Map.get_by_key_or_atom(step, :id)

    step_jobs = Enum.filter(workflow_jobs, fn job -> job.step_id == step_id end)

    paused_steps =
      case step.status do
        status when status in [:queued, :processing] ->
          Logger.debug("Pause #{inspect(status)} step: #{inspect(step)}")

          case StepFlow.Map.get_by_key_or_atom(step, :mode) do
            mode when mode in ["workflow_one_for_one", "workflow_one_for_many"] ->
              # pause/3 decodes its trigger time as milliseconds; the default
              # unit of DateTime.to_unix/1 is seconds, which produced a
              # trigger date in January 1970.
              trigger_date_time = DateTime.to_unix(DateTime.utc_now(), :millisecond)

              pause_nested_workflows(step_jobs, post_action, trigger_date_time)

              # NOTE(review): raises when the step has no job yet
              # (List.first/1 returns nil) — confirm whether a queued nested
              # step can reach this point without jobs.
              workflow = get_workflow!(List.first(step_jobs).workflow_id)
              pause(workflow, "resume", trigger_date_time)

              # Keep the accumulator a list: the result of pause/3 used to
              # replace it, which made later prepends build an improper list.
              paused_steps

            _ ->
              [StepFlow.Step.pause_step(step, step_jobs, post_action) | paused_steps]
          end

        _ ->
          paused_steps
      end

    pause_remaining_steps(steps, workflow_jobs, post_action, paused_steps)
  end

  # For every job of a nested-workflow step: marks the job as :paused (storing
  # `post_action` with the status), then pauses its child workflow with a
  # "resume" action triggered at `trigger_date_time`.
  defp pause_nested_workflows(step_jobs, post_action, trigger_date_time) do
    Enum.each(step_jobs, fn job ->
      {:ok, _status} = Status.set_job_status(job.id, :paused, post_action)

      child_workflow = get_workflow!(job.child_workflow.id)
      pause(child_workflow, "resume", trigger_date_time)
    end)
  end

  # Resumes every step in the :paused status. Steps running nested workflows
  # resume the child workflow of each of their jobs; other steps are resumed
  # through `StepFlow.Step.resume_step/1`. Always returns nil.
  defp resume_remaining_steps(steps, workflow_jobs) when is_list(steps) do
    Enum.each(steps, fn step ->
      if step.status == :paused do
        Logger.debug("Resume paused step: #{inspect(step)}")

        step_id = StepFlow.Map.get_by_key_or_atom(step, :id)
        step_jobs = Enum.filter(workflow_jobs, fn job -> job.step_id == step_id end)

        nested? =
          StepFlow.Map.get_by_key_or_atom(step, :mode) in [
            "workflow_one_for_one",
            "workflow_one_for_many"
          ]

        if nested? do
          Enum.each(step_jobs, fn job ->
            child_workflow = get_workflow!(job.child_workflow.id)
            resume(child_workflow)
          end)
        else
          StepFlow.Step.resume_step(step_jobs)
        end
      end
    end)

    nil
  end
end