lib/step_flow/controllers/statistics/durations.ex

defmodule StepFlow.Controllers.Statistics.Durations do
  @moduledoc """
  The duration statistics accessor.

  Reads `StepFlow.Statistics.JobDurations` records to expose per-job and
  per-workflow durations, as well as duration statistics aggregated by job
  name or by workflow identifier and version.
  """

  require Logger

  import Ecto.Query, warn: false
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Job
  alias StepFlow.Repo
  alias StepFlow.Statistics.Helpers
  alias StepFlow.Statistics.JobDurations
  alias StepFlow.Workflows.Workflow

  @doc """
  Returns a page of per-job durations.

  When `params` contains `"job_id"`, only that job is fetched (through
  `StepFlow.Jobs.get_job_with_status!/1`) and returned as a single-item page.
  Otherwise jobs are listed with `StepFlow.Jobs.internal_list_jobs/1`.

  The result is a map with `:data` (duration maps sorted by `:job_id`),
  `:total`, `:page` and `:size`. Jobs without a duration record report
  `0.0` for every duration.
  """
  def list_durations_for_jobs(params \\ %{}) do
    %{data: jobs, total: total, page: page, size: size} =
      case Map.get(params, "job_id") do
        nil ->
          Jobs.internal_list_jobs(params)

        str_job_id ->
          str_job_id
          |> StepFlow.Integer.force()
          |> Jobs.get_job_with_status!()
          |> single_item_page()
      end

    durations =
      jobs
      |> Enum.map(&get_job_duration!/1)
      |> Enum.sort_by(& &1.job_id)

    %{data: durations, total: total, page: page, size: size}
  end

  @doc """
  Returns a page of per-workflow durations.

  When `params` contains `"workflow_id"`, only that workflow is fetched
  (through `StepFlow.Workflows.get_workflow!/1`) and returned as a
  single-item page. Otherwise workflows are listed with
  `StepFlow.Workflows.list_workflows/1`.

  Each workflow's order-pending, processing and response-pending durations
  are the sums of its jobs' durations. Its total is the time between the
  earliest job start and the latest job end. `:data` is sorted by
  `:workflow_id`.
  """
  def list_durations_for_workflows(params \\ %{}) do
    %{data: workflows, total: total, page: page, size: size} =
      case Map.get(params, "workflow_id") do
        nil ->
          StepFlow.Workflows.list_workflows(params)

        str_workflow_id ->
          str_workflow_id
          |> StepFlow.Integer.force()
          |> StepFlow.Workflows.get_workflow!()
          |> single_item_page()
      end

    durations =
      workflows
      |> Enum.map(&get_workflow_duration!/1)
      |> Enum.sort_by(& &1.workflow_id)

    %{data: durations, total: total, page: page, size: size}
  end

  @doc """
  Aggregates workflow durations, grouped by workflow identifier and version.

  Per workflow, the order-pending, processing and response-pending durations
  of its jobs are summed. The workflow's total is the span between the
  earliest `job_start` and the latest `job_end`. For each
  identifier/version group, the count, average, min and max of those values
  are then reported.

  Workflows are filtered with `StepFlow.Workflows.apply_default_query_filters/2`.
  Pagination (`"page"`, default `0`; `"size"`, default `10`) is applied in
  memory over all groups, and `:total` is the number of groups.
  """
  def get_workflows_duration_statistics(params \\ %{}) do
    {page, size} = pagination_params(params)

    workflows_query =
      from(
        workflow in Workflow,
        order_by: [asc: :identifier]
      )
      |> StepFlow.Workflows.apply_default_query_filters(params)

    # One row per workflow, with its job durations summed.
    per_workflow_query =
      from(
        workflow in subquery(workflows_query),
        left_join: job in Job,
        on: job.workflow_id == workflow.id,
        left_join: job_durations in JobDurations,
        on: job_durations.job_id == job.id,
        select: %{
          id: workflow.id,
          identifier: workflow.identifier,
          version: fragment("CONCAT(version_major,'.',version_minor,'.',version_micro)"),
          workflow_order_pending_duration: fragment("SUM(order_pending_duration)::real"),
          workflow_processing_duration: fragment("SUM(processing_duration)::real"),
          workflow_response_pending_duration: fragment("SUM(response_pending_duration)::real"),
          workflow_duration: fragment("extract(epoch from (MAX(job_end) - MIN(job_start)))::real")
        },
        group_by: [
          workflow.id,
          workflow.identifier,
          workflow.version_major,
          workflow.version_minor,
          workflow.version_micro
        ]
      )

    # One row per identifier/version, aggregating the per-workflow rows.
    query =
      from(
        workflow in subquery(per_workflow_query),
        select: %{
          identifier: workflow.identifier,
          version: workflow.version,
          count: count(workflow.id),
          avg_order_pending_duration: avg(workflow.workflow_order_pending_duration),
          min_order_pending_duration: min(workflow.workflow_order_pending_duration),
          max_order_pending_duration: max(workflow.workflow_order_pending_duration),
          avg_processing_duration: avg(workflow.workflow_processing_duration),
          min_processing_duration: min(workflow.workflow_processing_duration),
          max_processing_duration: max(workflow.workflow_processing_duration),
          avg_response_pending_duration: avg(workflow.workflow_response_pending_duration),
          min_response_pending_duration: min(workflow.workflow_response_pending_duration),
          max_response_pending_duration: max(workflow.workflow_response_pending_duration),
          avg_total_duration: avg(workflow.workflow_duration),
          min_total_duration: min(workflow.workflow_duration),
          max_total_duration: max(workflow.workflow_duration)
        },
        group_by: [
          workflow.identifier,
          workflow.version
        ]
      )

    query
    |> Repo.all()
    |> paginate_statistics(page, size, &format_workflow_statistics/1)
  end

  @doc """
  Aggregates job durations, grouped by job name.

  Jobs are restricted with `StepFlow.Jobs.apply_rights/2` and
  `StepFlow.Jobs.apply_default_query_filters/2`. For each name, the count and
  the average, min and max of the order-pending, processing,
  response-pending and total durations are reported.

  Pagination (`"page"`, default `0`; `"size"`, default `10`) is applied in
  memory over all groups, and `:total` is the number of groups.
  """
  def get_jobs_duration_statistics(params \\ %{}) do
    {page, size} = pagination_params(params)

    jobs_query =
      from(job in Job)
      |> Jobs.apply_rights(params)
      |> Jobs.apply_default_query_filters(params)

    query =
      from(
        job in subquery(jobs_query),
        left_join: job_duration in JobDurations,
        on: job_duration.job_id == job.id,
        select: %{
          name: job.name,
          count: fragment("COUNT(name)::bigint"),
          avg_order_pending_duration: fragment("AVG(order_pending_duration)::real"),
          min_order_pending_duration: fragment("MIN(order_pending_duration)::real"),
          max_order_pending_duration: fragment("MAX(order_pending_duration)::real"),
          avg_processing_duration: fragment("AVG(processing_duration)::real"),
          min_processing_duration: fragment("MIN(processing_duration)::real"),
          max_processing_duration: fragment("MAX(processing_duration)::real"),
          avg_response_pending_duration: fragment("AVG(response_pending_duration)::real"),
          min_response_pending_duration: fragment("MIN(response_pending_duration)::real"),
          max_response_pending_duration: fragment("MAX(response_pending_duration)::real"),
          avg_total_duration: fragment("AVG(total_duration)::real"),
          min_total_duration: fragment("MIN(total_duration)::real"),
          max_total_duration: fragment("MAX(total_duration)::real")
        },
        group_by: job.name
      )

    query
    |> Repo.all()
    |> paginate_statistics(page, size, &format_job_statistics/1)
  end

  # Reads the "page" (default 0) and "size" (default 10) request parameters as integers.
  defp pagination_params(params) do
    page = params |> Map.get("page", 0) |> StepFlow.Integer.force()
    size = params |> Map.get("size", 10) |> StepFlow.Integer.force()
    {page, size}
  end

  # Wraps a single record in the same paginated shape the list functions return.
  defp single_item_page(item), do: %{data: [item], total: 1, page: 0, size: 1}

  # Slices the already-fetched statistics rows for the requested page, formats them,
  # and builds the paginated response; `total` counts all rows, not just this page.
  defp paginate_statistics(statistics, page, size, format_row) do
    data =
      statistics
      |> Enum.slice(page * size, size)
      |> Enum.map(format_row)

    %{data: data, total: length(statistics), page: page, size: size}
  end

  # Formats one identifier/version row of the workflow statistics query.
  defp format_workflow_statistics(result) do
    %{
      name: result.identifier,
      version: result.version,
      durations: format_duration_statistics(result)
    }
  end

  # Formats one job-name row of the job statistics query.
  defp format_job_statistics(result) do
    %{
      name: result.name,
      durations: format_duration_statistics(result)
    }
  end

  # Builds the count/average/max/min durations map shared by both statistics queries.
  defp format_duration_statistics(result) do
    %{
      count: result.count,
      average:
        format_durations(
          result.avg_order_pending_duration,
          result.avg_processing_duration,
          result.avg_response_pending_duration,
          result.avg_total_duration
        ),
      max:
        format_durations(
          result.max_order_pending_duration,
          result.max_processing_duration,
          result.max_response_pending_duration,
          result.max_total_duration
        ),
      min:
        format_durations(
          result.min_order_pending_duration,
          result.min_processing_duration,
          result.min_response_pending_duration,
          result.min_total_duration
        )
    }
  end

  # Passes each duration through `Helpers.limit_duration_to_milliseconds/1`.
  defp format_durations(order_pending, processing, response_pending, total) do
    %{
      order_pending: Helpers.limit_duration_to_milliseconds(order_pending),
      processing: Helpers.limit_duration_to_milliseconds(processing),
      response_pending: Helpers.limit_duration_to_milliseconds(response_pending),
      total: Helpers.limit_duration_to_milliseconds(total)
    }
  end

  # Collects every job of a workflow by walking `Jobs.internal_list_jobs/1` page by page.
  # Pages are prepended and concatenated once at the end to avoid quadratic `++`.
  # Stops when `total` jobs have been fetched OR a page comes back empty; the latter
  # guards against endless recursion (one DB query per step) if `total` overstates
  # what can actually be fetched, e.g. when jobs are deleted mid-walk.
  defp get_jobs_from_workflow(workflow_id, page, pages \\ [], fetched_count \\ 0) do
    %{data: jobs, total: total, page: current_page} =
      Jobs.internal_list_jobs(%{"workflow_id" => workflow_id, "page" => page})

    pages = [jobs | pages]
    fetched_count = fetched_count + length(jobs)

    if jobs == [] or fetched_count >= total do
      pages
      |> Enum.reverse()
      |> Enum.concat()
    else
      get_jobs_from_workflow(workflow_id, current_page + 1, pages, fetched_count)
    end
  end

  # Returns the durations of one job, read from a single `JobDurations` row.
  # Falls back to 0.0 for every duration when the job has no such row.
  defp get_job_duration!(job) do
    query =
      from(
        job_duration in JobDurations,
        where: job_duration.job_id == ^job.id,
        distinct: true,
        select: %{
          order_pending_duration: job_duration.order_pending_duration,
          processing_duration: job_duration.processing_duration,
          response_pending_duration: job_duration.response_pending_duration,
          total_duration: job_duration.total_duration
        },
        limit: 1
      )

    ids = %{job_id: job.id, workflow_id: job.workflow_id}

    case Repo.one(query) do
      nil ->
        Map.merge(ids, %{order_pending: 0.0, processing: 0.0, response_pending: 0.0, total: 0.0})

      result ->
        Map.merge(
          ids,
          format_durations(
            result.order_pending_duration,
            result.processing_duration,
            result.response_pending_duration,
            result.total_duration
          )
        )
    end
  end

  # Shortcut for a workflow whose `jobs` field is an empty list (presumably a preloaded
  # association — confirm with callers): no queries are issued and every duration is 0.
  defp get_workflow_duration!(workflow) when workflow.jobs == [] do
    %{
      workflow_id: workflow.id,
      order_pending: 0,
      processing: 0,
      response_pending: 0,
      total: 0
    }
  end

  # Sums the per-job durations of a workflow. The total is the span between the earliest
  # job start and the latest job end.
  defp get_workflow_duration!(workflow) do
    jobs_durations =
      workflow.id
      |> get_jobs_from_workflow(0)
      |> Enum.map(&get_job_duration!/1)

    query =
      from(job in Job,
        left_join: job_duration in JobDurations,
        on: job.id == job_duration.job_id,
        where: job.workflow_id == ^workflow.id,
        select: %{
          first_time: fragment("MIN(job_start)"),
          last_time: fragment("MAX(job_end)")
        }
      )

    # NOTE(review): `NaiveDateTime.diff/2` yields whole seconds, whereas
    # `get_workflows_duration_statistics/1` uses a fractional epoch; kept as-is so
    # the value returned to callers does not change.
    workflow_duration =
      case Repo.one(query) do
        %{first_time: first_time, last_time: last_time}
        when not is_nil(first_time) and not is_nil(last_time) ->
          NaiveDateTime.diff(last_time, first_time)

        _ ->
          0.0
      end

    %{
      workflow_id: workflow.id,
      order_pending: sum_durations(jobs_durations, :order_pending),
      processing: sum_durations(jobs_durations, :processing),
      response_pending: sum_durations(jobs_durations, :response_pending),
      total: workflow_duration
    }
  end

  # Sums one duration field across job duration maps, then limits it like other durations.
  defp sum_durations(jobs_durations, key) do
    jobs_durations
    |> Enum.map(&Map.fetch!(&1, key))
    |> Enum.sum()
    |> Helpers.limit_duration_to_milliseconds()
  end
end