# lib/step_flow/models/workflows/status.ex

defmodule StepFlow.Workflows.Status do
  use Ecto.Schema
  import Ecto.Changeset
  import Ecto.Query, warn: false
  import EctoEnum

  alias StepFlow.Jobs
  alias StepFlow.Metrics.WorkflowInstrumenter
  alias StepFlow.NotificationHooks.NotificationHookManager
  alias StepFlow.Progressions.Progression
  alias StepFlow.Repo
  alias StepFlow.Roles
  alias StepFlow.Workflows
  alias StepFlow.Workflows.Workflow
  require Logger

  @moduledoc false

  # Enumeration of the states a workflow status row can hold.
  #
  # The comments above each value sketch the intended state machine:
  #   `A, B -->`  states that may lead INTO the value listed below,
  #   `--> C, D`  states that may be reached FROM the value listed below.
  # NOTE(review): this reading is inferred from the notation; the transitions are
  # not enforced by the enum itself - confirm against the transition logic.
  defenum(StateEnum, [
    # Processing, Retrying -->
    # --> Processing, Paused
    "pending",
    # Error -->
    "skipped",
    # Pending, Paused, Stopped, Processing -->
    # --> Error, Completed, Stopped, Paused, Pending, Processing
    "processing",
    # Error -->
    # --> Pending
    "retrying",
    # Paused, Processing -->
    # --> Completed (LIVE), Processing
    "stopped",
    # Processing -->
    # --> Retrying, Skipped
    "error",
    # Processing, Stopped -->
    "completed",
    # Processing, Pending -->
    # --> Paused, Error
    "pausing",
    # Pausing -->
    # --> Processing, Stopped
    "paused"
  ])

  # One row per recorded workflow state; new rows are inserted by
  # set_workflow_status/4 rather than updating an existing one.
  schema "step_flow_workflow_status" do
    # State of the workflow, restricted to the StateEnum values.
    field(:state, StepFlow.Workflows.Status.StateEnum)
    # Extra details attached to the state; an empty map when nothing is given.
    field(:description, :map, default: %{})
    # Job status that triggered this workflow state (column `status_id`).
    # It is not part of the required fields in changeset/2, so it may be nil.
    belongs_to(:status, Jobs.Status, foreign_key: :status_id, defaults: nil)
    # Workflow this state belongs to (column `workflow_id`).
    belongs_to(:workflow, Workflow, foreign_key: :workflow_id)

    # Adds the `inserted_at` / `updated_at` columns used to order statuses.
    timestamps()
  end

  @doc false
  # Builds the changeset used to persist a workflow status: casts the permitted
  # attributes, requires a state and a workflow, and registers the foreign key
  # constraint on the workflow reference.
  def changeset(%Workflows.Status{} = status, attrs) do
    permitted_fields = [:workflow_id, :state, :status_id, :description]
    required_fields = [:state, :workflow_id]

    casted = cast(status, attrs, permitted_fields)
    constrained = foreign_key_constraint(casted, :workflow_id)

    validate_required(constrained, required_fields)
  end

  @doc """
  Records a new workflow status in reaction to an `event`.

  Handled events are `:created_workflow`, `:job_progression` (only for a
  `%Progression{}` whose progression is `0`), `:job_completed`, `:job_retrying`,
  `:completed_workflow`, `:job_stopped`, `:job_error` and `:queue_not_found`.
  Job related events expect the triggering `%Jobs.Status{}` as `payload`.

  Returns `{:ok, workflow_status}` when the event is handled and the status is
  stored (or the insertion error otherwise), and `nil` for any other event.

  ## Examples

      iex> define_workflow_status(1, :completed_workflow)
      {:ok, %Workflows.Status{state: :completed, workflow_id: 1, status_id: nil, id: 1}}

      iex> define_workflow_status(1, :incorrect_event)
      nil

  """
  def define_workflow_status(workflow_id, event, payload \\ %{})

  # A freshly created workflow is counted in the status metric under
  # :created_workflow and then starts in the :pending state.
  def define_workflow_status(workflow_id, :created_workflow, _payload) do
    case Workflows.get_workflow!(workflow_id) do
      %{identifier: identifier} ->
        WorkflowInstrumenter.inc(
          :step_flow_workflows_status_total,
          identifier,
          :created_workflow
        )

      _ ->
        nil
    end

    set_workflow_status(workflow_id, :pending)
  end

  # First progression report of a job (progression 0): a :pending workflow moves
  # to :processing; in any other state nothing is stored and the current status
  # is handed back.
  # NOTE(review): reading `.state` raises when the workflow has no status yet.
  def define_workflow_status(workflow_id, :job_progression, %Progression{progression: 0}) do
    last_status = get_last_workflow_status(workflow_id)

    case last_status.state do
      :pending ->
        set_workflow_status(workflow_id, :processing)

      _ ->
        Logger.warn(
          "Can't set workflow #{workflow_id} to :processing because current state is #{last_status.state}."
        )

        {:ok, last_status}
    end
  end

  # A job completed. If another job of the workflow is still in :error or
  # :retrying, the workflow keeps its current state (only the triggering job
  # status is attached). Otherwise a pausing/paused workflow becomes :paused
  # (keeping its description) and any other workflow goes back to :pending.
  def define_workflow_status(workflow_id, :job_completed, %Jobs.Status{
        id: status_id,
        job_id: job_id
      }) do
    others_unfinished =
      workflow_id
      |> get_last_jobs_status()
      |> Enum.any?(fn job_status ->
        job_status.state in [:error, :retrying] and job_status.job_id != job_id
      end)

    last_status = get_last_workflow_status(workflow_id)

    cond do
      others_unfinished ->
        set_workflow_status(workflow_id, last_status.state, status_id)

      is_nil(last_status) ->
        set_workflow_status(workflow_id, :pending, status_id)

      last_status.state in [:paused, :pausing] ->
        set_workflow_status(workflow_id, :paused, status_id, last_status.description)

      true ->
        set_workflow_status(workflow_id, :pending, status_id)
    end
  end

  # A job is being retried. The workflow resumes :processing unless another job
  # is still in :error, in which case an "error" notification is triggered and
  # the workflow is recorded as :error.
  def define_workflow_status(workflow_id, :job_retrying, %Jobs.Status{
        id: status_id,
        job_id: job_id
      }) do
    other_job_in_error =
      workflow_id
      |> get_last_jobs_status()
      |> Enum.any?(fn job_status ->
        job_status.state == :error and job_status.job_id != job_id
      end)

    if other_job_in_error do
      NotificationHookManager.manage_notification_status(workflow_id, nil, "error")

      set_workflow_status(workflow_id, :error, status_id)
    else
      set_workflow_status(workflow_id, :processing, status_id)
    end
  end

  # The workflow is complete. The "completed" notification is only triggered
  # when the previous status was not already :completed; a :completed status is
  # recorded in every case.
  def define_workflow_status(workflow_id, :completed_workflow, _payload) do
    previous = get_last_workflow_status(workflow_id)
    Logger.info("Complete workflow #{workflow_id}.")

    case previous do
      %{state: :completed} ->
        :ok

      _ ->
        NotificationHookManager.manage_notification_status(workflow_id, nil, "completed")
    end

    set_workflow_status(workflow_id, :completed)
  end

  # A stopped job marks the whole workflow as :stopped, linked to that job status.
  def define_workflow_status(workflow_id, :job_stopped, %Jobs.Status{} = job_status) do
    stopped_status_id = job_status.id

    set_workflow_status(workflow_id, :stopped, stopped_status_id)
  end

  # A job failed or its queue was not found. The "error" notification is only
  # triggered when the previous status was not already :error; an :error status
  # linked to the job status is recorded in every case.
  def define_workflow_status(workflow_id, event, %Jobs.Status{id: status_id})
      when event in [:job_error, :queue_not_found] do
    previous = get_last_workflow_status(workflow_id)

    Logger.info("Erroneous workflow #{workflow_id}.")

    case previous do
      %{state: :error} ->
        :ok

      _ ->
        NotificationHookManager.manage_notification_status(workflow_id, nil, "error")
    end

    set_workflow_status(workflow_id, :error, status_id)
  end

  # Catch-all: an event/payload combination matched by no clause above is
  # ignored and yields nil.
  def define_workflow_status(_workflow_id, _event, _payload) do
    nil
  end

  @doc """
  Inserts a new status row for the workflow and counts it in the
  `:step_flow_workflows_status_total` metric.

  `status_id` optionally links the job status that caused the change and
  `description` carries extra details (empty map by default).

  Returns the result of `Repo.insert/1`. Raises if the workflow cannot be
  fetched by `Workflows.get_workflow!/1`.
  """
  def set_workflow_status(workflow_id, status, status_id \\ nil, description \\ %{}) do
    case Workflows.get_workflow!(workflow_id) do
      %{identifier: identifier} ->
        WorkflowInstrumenter.inc(:step_flow_workflows_status_total, identifier, status)

      _ ->
        nil
    end

    attrs = %{
      workflow_id: workflow_id,
      state: status,
      status_id: status_id,
      description: description
    }

    changeset = Workflows.Status.changeset(%Workflows.Status{}, attrs)

    Repo.insert(changeset)
  end

  @doc """
  Returns, for each job referenced by the statuses of a workflow, the job
  status attached to the most recent workflow status.

  Only job statuses linked from a workflow status row of `workflow_id` (through
  `status_id`) are considered, and a single entry is kept per `job_id`.

  Only defined for a numeric `workflow_id`.
  """
  def get_last_jobs_status(workflow_id) when is_number(workflow_id) do
    # Join job statuses with the workflow statuses of this workflow, then keep one
    # row per job_id (distinct), preferring the most recently inserted workflow
    # status and, on ties, the highest job status id.
    query =
      from(
        job_status in Jobs.Status,
        inner_join:
          workflow_status in subquery(
            from(
              workflow_status in Workflows.Status,
              where: workflow_status.workflow_id == ^workflow_id
            )
          ),
        on: workflow_status.status_id == job_status.id,
        order_by: [
          desc: field(workflow_status, :inserted_at),
          desc: field(job_status, :id),
          asc: field(job_status, :job_id)
        ],
        distinct: [asc: field(job_status, :job_id)]
      )

    Repo.all(query)
  end

  @doc """
  Returns the most recent status of a workflow, or `nil` when there is none.

  Accepts either:

    * a numeric workflow id - the latest row is selected in the database,
      ordered by `updated_at` then `id`, both descending;
    * a `%Workflow{}` struct - its `:status` association is preloaded and the
      entry with the latest `inserted_at` is returned.

  Any other argument yields `nil`.
  """
  def get_last_workflow_status(workflow_id) when is_number(workflow_id) do
    query =
      from(
        workflow_status in Workflows.Status,
        where: workflow_status.workflow_id == ^workflow_id,
        order_by: [desc: :updated_at, desc: :id],
        limit: 1
      )

    Repo.one(query)
  end

  def get_last_workflow_status(%Workflow{} = workflow) do
    workflow = Repo.preload(workflow, [:status])

    # Timestamps are structs, and the default term ordering compares struct
    # fields one by one (day before month and year), which is not chronological.
    # Passing the module as sorter makes Enum use NaiveDateTime.compare/2.
    # The sort stays stable, so entries sharing a timestamp keep their preload
    # order and the last of them is returned.
    workflow.status
    |> Enum.sort_by(fn status -> status.inserted_at end, {:asc, NaiveDateTime})
    |> List.last()
  end

  def get_last_workflow_status(_workflow_id), do: nil

  # Looks up, among the preloaded statuses of `workflow`, the first one whose
  # state equals `state`. Returns nil when none matches or when `state` is not
  # an atom.
  def find_workflow_status(workflow, state) when is_atom(state) do
    statuses = Repo.preload(workflow, [:status]).status

    Enum.find(statuses, &(&1.state == state))
  end

  def find_workflow_status(_workflow, _state) do
    nil
  end

  @doc """
  Lists the current status of the workflows visible to the given roles.

  For every workflow, the latest status inserted between `start_date` and
  `end_date` (inclusive) is selected; it is returned only when its state is
  `:completed`, `:error`, `:processing` or `:pending`.

  ## Parameters

    * `start_date` / `end_date` - bounds applied to the status `inserted_at`.
    * `identifiers` - workflow identifiers to restrict the listing to; an empty
      list means no restriction on identifiers.
    * `roles` - role names used to resolve the workflows the caller may view.
      A `"*"` right grants access to every workflow; otherwise only the allowed
      identifiers are listed (intersected with `identifiers` when given).
  """
  def list_workflows_status(start_date, end_date, identifiers, roles) do
    allowed_workflows = check_rights(roles)
    all_allowed = Enum.member?(allowed_workflows, "*")

    workflows_query =
      case {Enum.empty?(identifiers), all_allowed} do
        # No identifier filter and unrestricted rights: every workflow.
        {true, true} ->
          from(workflow in Workflow)

        # Unrestricted rights: only the requested identifiers.
        {false, true} ->
          from(
            workflow in Workflow,
            where: workflow.identifier in ^identifiers
          )

        # No identifier filter: everything the roles are allowed to view.
        {true, false} ->
          from(
            workflow in Workflow,
            where: workflow.identifier in ^allowed_workflows
          )

        # Requested identifiers limited to the ones the roles may view.
        {false, false} ->
          intersect = Enum.filter(identifiers, fn element -> element in allowed_workflows end)

          from(
            workflow in Workflow,
            where: workflow.identifier in ^intersect
          )
      end

    query =
      from(
        workflow_status in subquery(
          # Latest status (highest id) per workflow inside the date range.
          from(
            workflow_status in Workflows.Status,
            order_by: [desc: workflow_status.id, desc: workflow_status.workflow_id],
            distinct: [desc: workflow_status.workflow_id],
            where:
              fragment("?::timestamp", workflow_status.inserted_at) >= ^start_date and
                fragment("?::timestamp", workflow_status.inserted_at) <= ^end_date
          )
        ),
        inner_join: workflow in subquery(workflows_query),
        on: workflow.id == workflow_status.workflow_id,
        where: workflow_status.state in [:completed, :error, :processing, :pending]
      )

    Repo.all(query)
  end

  @doc """
  Handler called when a live job is started.

  Loads the jobs of the workflow and reads the last status of each of them. When
  every job is `:processing`, the workflow status is set to `:processing` and the
  result of the insertion is returned; otherwise nothing is stored and `nil` is
  returned.
  """
  def define_workflow_live_status(workflow_id) do
    workflow =
      workflow_id
      |> Workflows.get_workflow!()
      |> Repo.preload([:jobs])

    # Resolve the state of every job before checking them, so each job is
    # inspected even when an earlier one is not processing.
    job_states =
      for job <- workflow.jobs do
        job_with_status = Repo.preload(job, [:status])
        StepFlow.Controllers.Jobs.get_last_status(job_with_status.status).state
      end

    if Enum.all?(job_states, fn state -> state == :processing end) do
      Logger.info(
        "All live jobs are processing, setting live workflow #{workflow_id} status to processing."
      )

      set_workflow_status(workflow_id, :processing)
    end
  end

  # Resolves the workflow identifiers the given roles are allowed to view.
  # Each "view" right on the "workflow" entity type contributes the last
  # "::"-separated segment of its entity; rights without an :entity key are
  # skipped. Returns an empty list when no roles are found.
  defp check_rights(role_names) do
    view_rights =
      case Roles.get_roles(role_names) do
        nil ->
          []

        roles ->
          StepFlow.Controllers.Roles.get_rights_for_entity_type_and_action(
            roles,
            "workflow",
            "view"
          )
      end

    view_rights
    |> Enum.filter(fn right -> match?(%{entity: _}, right) end)
    |> Enum.map(fn %{entity: entity} ->
      entity
      |> String.split("::")
      |> List.last()
    end)
  end
end