# lib/step_flow/workflows/status.ex

defmodule StepFlow.Workflows.Status do
  use Ecto.Schema
  import Ecto.Changeset
  import Ecto.Query, warn: false
  import EctoEnum

  alias StepFlow.Jobs
  alias StepFlow.Progressions.Progression
  alias StepFlow.Repo
  alias StepFlow.Roles
  alias StepFlow.Workflows
  alias StepFlow.Workflows.Workflow
  require Logger

  @moduledoc false

  # Enumeration of the states a workflow status row can hold. It is declared
  # with EctoEnum's `defenum` and used as the type of the `:state` schema field.
  defenum(StateEnum, [
    "pending",
    "skipped",
    "processing",
    "retrying",
    "stopped",
    "error",
    "completed"
  ])

  # Ecto schema backed by the "step_flow_workflow_status" table. A new row is
  # inserted every time a workflow state is recorded (see set_workflow_status/3).
  schema "step_flow_workflow_status" do
    # State of the workflow, typed with the StateEnum declared in this module.
    field(:state, StepFlow.Workflows.Status.StateEnum)
    # Job status associated with this workflow status; optional, since
    # :status_id is not part of the required fields of the changeset.
    belongs_to(:status, Jobs.Status, foreign_key: :status_id, defaults: nil)
    # Workflow this status row belongs to (required by the changeset).
    belongs_to(:workflow, Workflow, foreign_key: :workflow_id)

    # The :inserted_at and :updated_at columns are used by the queries of this
    # module to order and filter statuses.
    timestamps()
  end

  # Builds the changeset of a workflow status: casts the permitted attributes,
  # declares the foreign key constraint on the workflow and requires both the
  # state and the workflow identifier.
  @doc false
  def changeset(%__MODULE__{} = status, attrs) do
    permitted_fields = [:workflow_id, :state, :status_id]
    required_fields = [:state, :workflow_id]

    casted = cast(status, attrs, permitted_fields)
    constrained = foreign_key_constraint(casted, :workflow_id)

    validate_required(constrained, required_fields)
  end

  @doc """
  Define the workflow status given events. It also tracks completed, retrying
  and error job status of a workflow.

  Handled events are `:created_workflow`, `:job_progression` (only with a
  `Progression` payload whose `progression` is 0), `:job_completed`,
  `:job_retrying`, `:completed_workflow`, `:job_stopped`, `:job_error` and
  `:queue_not_found`. Events related to a job expect a `Jobs.Status` payload.

  Returns the result of the workflow status insertion (`{:ok, workflow_status}`
  on success, otherwise whatever `Repo.insert/1` returns) when the event is
  handled, and `nil` for any other event or payload. For `:job_progression`,
  when the workflow is not in the `:pending` state nothing is inserted and
  `{:ok, last_status}` is returned.

  ## Examples

      iex> define_workflow_status(1, :completed_workflow)
      {:ok, %Workflows.Status{state: :completed, workflow_id: 1, status_id: nil, id: 1}}

      iex> define_workflow_status(1, :incorrect_event)
      nil

  """
  def define_workflow_status(workflow_id, event, payload \\ %{})

  # Workflow creation: records the :pending state. The payload is ignored.
  def define_workflow_status(workflow_id, :created_workflow, _payload),
    do: set_workflow_status(workflow_id, :pending)

  # A job reported a progression of 0: a :pending workflow moves to :processing.
  # In any other state nothing is inserted; a warning is logged and the current
  # status is returned as `{:ok, status}`.
  def define_workflow_status(workflow_id, :job_progression, %Progression{progression: 0}) do
    current_status = get_last_workflow_status(workflow_id)

    case current_status.state do
      :pending ->
        set_workflow_status(workflow_id, :processing)

      current_state ->
        Logger.warn(
          "Can't set workflow #{workflow_id} to :processing because current state is #{current_state}."
        )

        {:ok, current_status}
    end
  end

  # A job completed. If no other job of the workflow has :error or :retrying as
  # its last status, the workflow goes back to :pending; otherwise the current
  # workflow state is recorded again. In both cases the new row references the
  # completed job status through `status_id`.
  def define_workflow_status(workflow_id, :job_completed, %Jobs.Status{
        id: status_id,
        job_id: job_id
      }) do
    # Only the existence of such a job matters, so stop at the first match
    # instead of building and counting a filtered list.
    other_jobs_unfinished? =
      workflow_id
      |> get_last_jobs_status()
      |> Enum.any?(fn s -> s.state in [:error, :retrying] and s.job_id != job_id end)

    new_state =
      if other_jobs_unfinished? do
        get_last_workflow_status(workflow_id).state
      else
        :pending
      end

    set_workflow_status(workflow_id, new_state, status_id)
  end

  # A job is being retried. The workflow becomes :processing unless another job
  # of the workflow has :error as its last status, in which case the workflow is
  # recorded as :error. The new row references the retried job status.
  def define_workflow_status(workflow_id, :job_retrying, %Jobs.Status{
        id: status_id,
        job_id: job_id
      }) do
    # Only the existence of such a job matters, so stop at the first match
    # instead of building and counting a filtered list.
    other_jobs_in_error? =
      workflow_id
      |> get_last_jobs_status()
      |> Enum.any?(fn s -> s.state == :error and s.job_id != job_id end)

    new_state = if other_jobs_in_error?, do: :error, else: :processing

    set_workflow_status(workflow_id, new_state, status_id)
  end

  # The workflow is completed: records the :completed state. When a previous
  # status exists, the state the workflow is leaving is logged first. The payload
  # is ignored.
  def define_workflow_status(workflow_id, :completed_workflow, _payload) do
    last_status = get_last_workflow_status(workflow_id)

    # get_last_workflow_status/1 returns nil when no status was recorded yet.
    if last_status != nil do
      Logger.info("Complete workflow #{workflow_id} from state #{last_status.state}.")
    end

    set_workflow_status(workflow_id, :completed)
  end

  # A job was stopped: records the :stopped state, linked to the job status.
  def define_workflow_status(workflow_id, :job_stopped, %Jobs.Status{id: status_id}),
    do: set_workflow_status(workflow_id, :stopped, status_id)

  # A job failed or its queue was not found: records the :error state, linked
  # to the job status.
  def define_workflow_status(workflow_id, event, %Jobs.Status{id: status_id})
      when event == :job_error or event == :queue_not_found do
    set_workflow_status(workflow_id, :error, status_id)
  end

  # Fallback: any other event/payload combination is ignored.
  def define_workflow_status(_workflow_id, _event, _payload) do
    nil
  end

  @doc """
  Inserts a new workflow status row for `workflow_id` with the given state.

  `status` is the state to record and `status_id` the optional identifier of
  the related job status (defaults to `nil`).

  Returns the result of `Repo.insert/1` on the built changeset.
  """
  def set_workflow_status(workflow_id, status, status_id \\ nil) do
    attrs = %{
      workflow_id: workflow_id,
      state: status,
      status_id: status_id
    }

    %__MODULE__{}
    |> changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Returns the last job statuses of a workflow, one per `job_id`.

  Job statuses are reached through the workflow status rows of `workflow_id`
  that reference them. Rows are made distinct on the job identifier and ordered
  by the workflow status insertion date, then by the job status id, both
  descending.

  Only defined for a numeric `workflow_id`.
  """
  def get_last_jobs_status(workflow_id) when is_number(workflow_id) do
    # Workflow status rows recorded for this workflow only.
    statuses_of_workflow =
      from(
        workflow_status in Workflows.Status,
        where: workflow_status.workflow_id == ^workflow_id
      )

    last_jobs_status_query =
      from(
        job_status in Jobs.Status,
        inner_join: workflow_status in subquery(statuses_of_workflow),
        on: workflow_status.status_id == job_status.id,
        order_by: [
          desc: workflow_status.inserted_at,
          desc: job_status.id,
          asc: job_status.job_id
        ],
        distinct: [asc: job_status.job_id]
      )

    Repo.all(last_jobs_status_query)
  end

  @doc """
  Returns the most recent status of a workflow.

  Statuses of `workflow_id` are ordered by `updated_at` then `id`, both
  descending, and the first one is returned. Returns `nil` when the workflow has
  no status, or when `workflow_id` is not a number.
  """
  def get_last_workflow_status(workflow_id) when is_number(workflow_id) do
    Workflows.Status
    |> where([workflow_status], workflow_status.workflow_id == ^workflow_id)
    |> order_by(desc: :updated_at, desc: :id)
    |> limit(1)
    |> Repo.one()
  end

  def get_last_workflow_status(_workflow_id) do
    nil
  end

  @doc """
  Lists the workflow statuses inserted between `start_date` and `end_date`
  (both included) for the workflows the given roles are allowed to view.

  Only statuses in the `:completed`, `:error`, `:processing` or `:pending`
  state are returned.

  ## Parameters

    * `start_date`, `end_date` - bounds compared with the status `inserted_at`
      value cast to a timestamp
    * `identifiers` - workflow identifiers to restrict the result to; an empty
      list means no restriction beyond the rights of the roles
    * `roles` - role names whose "view" rights on workflows are resolved

  Returns the list of matching workflow statuses.
  """
  def list_workflows_status(start_date, end_date, identifiers, roles) do
    allowed_workflows = check_rights(roles)

    workflows_query =
      case {Enum.empty?(identifiers), allowed_workflows} do
        # No identifier requested and wildcard right: every workflow.
        {true, ["*"]} ->
          from(workflow in Workflow)

        # No identifier requested: every workflow the roles can view.
        {true, _} ->
          from(
            workflow in Workflow,
            where: workflow.identifier in ^allowed_workflows
          )

        # Identifiers requested and wildcard right: exactly the requested ones.
        {false, ["*"]} ->
          requested = Enum.to_list(identifiers)

          from(
            workflow in Workflow,
            where: workflow.identifier in ^requested
          )

        # Identifiers requested: keep only those the roles can view.
        {false, _} ->
          intersect = Enum.filter(identifiers, fn element -> element in allowed_workflows end)

          from(
            workflow in Workflow,
            where: workflow.identifier in ^intersect
          )
      end

    query =
      from(
        workflows_status in Workflows.Status,
        inner_join: workflow in subquery(workflows_query),
        on: workflows_status.workflow_id == workflow.id,
        where:
          fragment("?::timestamp", workflows_status.inserted_at) >= ^start_date and
            fragment("?::timestamp", workflows_status.inserted_at) <= ^end_date and
            workflows_status.state in [:completed, :error, :processing, :pending]
      )

    Repo.all(query)
  end

  # Resolves the "view" rights on workflows for the given role names and returns,
  # for each right, the last "::"-separated segment of its entity. Returns an
  # empty list when no roles are found.
  defp check_rights(role_names) do
    view_rights =
      case Roles.get_roles(role_names) do
        nil ->
          []

        roles ->
          Roles.get_rights_for_entity_type_and_action(roles, "workflow", "view")
      end

    for %{entity: entity} <- view_rights do
      entity
      |> String.split("::")
      |> List.last()
    end
  end
end