lib/step_flow/statistics/durations.ex

defmodule StepFlow.Statistics.Durations do
  @moduledoc """
  The duration statistics accessor.
  """

  require Logger

  import Ecto.Query, warn: false
  alias StepFlow.Jobs
  alias StepFlow.Jobs.Job
  alias StepFlow.Jobs.Status
  alias StepFlow.Progressions.Progression
  alias StepFlow.Repo
  alias StepFlow.Workflows.Workflow

  @doc """
  Lists the durations of jobs, sorted by ascending job identifier.

  When `params` holds a non-nil `"job_id"` entry, only that job is loaded
  (through `Jobs.get_job_with_status!/1`) and the pagination fields are fixed
  to `total: 1`, `page: 0` and `size: 1`. Otherwise `params` is forwarded as is
  to `Jobs.list_jobs/1` and its pagination fields are returned untouched.

  Returns a map with the `:data`, `:total`, `:page` and `:size` keys, where
  `:data` holds one duration map per job.
  """
  def list_durations_for_jobs(params \\ %{}) do
    requested_job_id = Map.get(params, "job_id")

    listing =
      if is_nil(requested_job_id) do
        Jobs.list_jobs(params)
      else
        single_job =
          requested_job_id
          |> StepFlow.Integer.force()
          |> Jobs.get_job_with_status!()

        %{data: [single_job], total: 1, page: 0, size: 1}
      end

    %{data: jobs, total: total, page: page, size: size} = listing

    sorted_durations =
      jobs
      |> Enum.map(&get_job_duration!/1)
      |> Enum.sort_by(& &1.job_id)

    %{data: sorted_durations, total: total, page: page, size: size}
  end

  @doc """
  Lists the durations of workflows, sorted by ascending workflow identifier.

  When `params` holds a non-nil `"workflow_id"` entry, only that workflow is
  loaded (through `StepFlow.Workflows.get_workflow!/1`) and the pagination
  fields are fixed to `total: 1`, `page: 0` and `size: 1`. Otherwise `params`
  is forwarded as is to `StepFlow.Workflows.list_workflows/1` and its
  pagination fields are returned untouched.

  Returns a map with the `:data`, `:total`, `:page` and `:size` keys, where
  `:data` holds one aggregated duration map per workflow.
  """
  def list_durations_for_workflows(params \\ %{}) do
    requested_workflow_id = Map.get(params, "workflow_id")

    listing =
      if is_nil(requested_workflow_id) do
        StepFlow.Workflows.list_workflows(params)
      else
        single_workflow =
          requested_workflow_id
          |> StepFlow.Integer.force()
          |> StepFlow.Workflows.get_workflow!()

        %{data: [single_workflow], total: 1, page: 0, size: 1}
      end

    %{data: workflows, total: total, page: page, size: size} = listing

    sorted_durations =
      workflows
      |> Enum.map(&get_workflow_duration!/1)
      |> Enum.sort_by(& &1.workflow_id)

    %{data: sorted_durations, total: total, page: page, size: size}
  end

  @doc """
  Computes duration statistics over the workflows selected by `params`.

  The workflows are queried with the default workflow query filters, preloaded,
  turned into per-workflow duration maps and finally summarised by
  `calculate_duration_statistics/1`.
  """
  def get_workflows_duration_statistics(params \\ %{}) do
    base_query = from(workflow in Workflow)
    filtered_query = StepFlow.Workflows.apply_default_query_filters(base_query, params)

    filtered_query
    |> Repo.all()
    |> Repo.preload([:jobs])
    |> StepFlow.Workflows.preload_workflows()
    |> Enum.map(&get_workflow_duration!/1)
    |> calculate_duration_statistics()
  end

  @doc """
  Computes duration statistics of the jobs selected by `params`, per job name.

  The jobs are queried with the default job query filters and preloaded with
  their progressions and status. Their durations are grouped by `job.name`, and
  each group is summarised by `calculate_duration_statistics/1`.

  Returns a list of `%{name: name, durations: statistics}` maps.
  """
  def get_jobs_duration_statistics(params \\ %{}) do
    base_query = from(job in Job)
    filtered_query = Jobs.apply_default_query_filters(base_query, params)

    durations_by_name =
      filtered_query
      |> Repo.all()
      |> Repo.preload([:progressions, :status])
      |> Enum.reduce(%{}, fn job, grouped ->
        job_duration = get_job_duration!(job)

        # Prepend, so each group lists the most recently visited job first.
        Map.update(grouped, job.name, [job_duration], fn known -> [job_duration | known] end)
      end)

    for {name, durations} <- durations_by_name do
      %{name: name, durations: calculate_duration_statistics(durations)}
    end
  end

  # Builds the duration breakdown of a single job.
  #
  # The returned map holds the job and workflow identifiers plus four values,
  # each clamped/rounded by `limit_duration_to_milliseconds/1`:
  #   * `:order_pending`    - time between the job creation and its process start
  #   * `:processing`       - value given by `get_job_process_duration/2`
  #   * `:response_pending` - value given by `get_response_pending_duration/2`
  #   * `:total`            - sum of the three values above
  defp get_job_duration!(job) do
    created_at = get_job_creation(job)
    started_at = get_job_process_start(job, created_at)

    order_pending = limit_duration_to_milliseconds(NaiveDateTime.diff(started_at, created_at))

    processing = limit_duration_to_milliseconds(get_job_process_duration(job, started_at))

    ended_at = get_job_process_end(job, started_at)

    response_pending =
      limit_duration_to_milliseconds(get_response_pending_duration(job, ended_at))

    %{
      job_id: job.id,
      workflow_id: job.workflow_id,
      order_pending: order_pending,
      processing: processing,
      response_pending: response_pending,
      total: limit_duration_to_milliseconds(order_pending + processing + response_pending)
    }
  end

  # Collects every job of a workflow by walking the pages of `Jobs.list_jobs/1`.
  #
  #   * `workflow_id` - identifier passed as the "workflow_id" filter
  #   * `page`        - page to fetch first; `-1` is a sentinel meaning "stop and
  #                     return what has been accumulated"
  #   * `result`      - jobs accumulated so far (defaults to an empty list)
  #
  # Returns the accumulated list of jobs, in the order the pages returned them.
  defp get_jobs_from_workflow(workflow_id, page, result \\ [])
  defp get_jobs_from_workflow(_workflow_id, -1, result), do: result

  defp get_jobs_from_workflow(workflow_id, page, result) do
    %{
      data: jobs,
      total: total,
      page: current_page,
      size: _size
    } = Jobs.list_jobs(%{"workflow_id" => workflow_id, "page" => page})

    result = result ++ jobs

    cond do
      # An empty page means there is nothing left to read. Without this guard
      # the recursion would never end when `total` is larger than the number of
      # jobs the pages actually deliver.
      jobs == [] ->
        result

      length(result) >= total ->
        result

      true ->
        get_jobs_from_workflow(workflow_id, current_page + 1, result)
    end
  end

  # Builds the duration breakdown of a workflow by summing, field by field, the
  # durations of all its jobs (fetched from page 0 onwards).
  #
  # Each sum, and the grand total, goes through `limit_duration_to_milliseconds/1`.
  defp get_workflow_duration!(workflow) do
    job_durations =
      workflow.id
      |> get_jobs_from_workflow(0)
      |> Enum.map(&get_job_duration!/1)

    # Sums one field over every job duration of the workflow.
    sum_of = fn field ->
      job_durations
      |> Enum.map(&Map.fetch!(&1, field))
      |> Enum.sum()
      |> limit_duration_to_milliseconds()
    end

    order_pending = sum_of.(:order_pending)
    processing = sum_of.(:processing)
    response_pending = sum_of.(:response_pending)

    %{
      workflow_id: workflow.id,
      order_pending: order_pending,
      processing: processing,
      response_pending: response_pending,
      total: limit_duration_to_milliseconds(order_pending + processing + response_pending)
    }
  end

  # Normalises a duration value:
  #   * negative integers become `0`, other integers are returned unchanged;
  #   * negative floats become `0.0`, other floats are rounded to 3 decimals.
  # Any other kind of value matches no clause.
  defp limit_duration_to_milliseconds(duration) when is_integer(duration) do
    max(duration, 0)
  end

  defp limit_duration_to_milliseconds(duration) when is_float(duration) do
    if duration < 0, do: 0.0, else: Float.round(duration, 3)
  end

  # Returns the creation time of the job, i.e. its `inserted_at` field.
  defp get_job_creation(job), do: Map.fetch!(job, :inserted_at)

  # Determines when the processing of the job started.
  #
  # Resolution order:
  #   1. the datetime of the first progression whose value equals 0;
  #   2. otherwise, when the job has a last status, the last progression time
  #      or, failing that, the insertion time of that status;
  #   3. otherwise the job creation time given by the caller.
  defp get_job_process_start(job, job_creation) do
    initial_progression = Enum.find(job.progressions, &(&1.progression == 0))

    if is_nil(initial_progression) do
      latest_status = Status.get_last_status(job.status)

      if is_nil(latest_status) do
        job_creation
      else
        get_last_progression_time(job) || latest_status.inserted_at
      end
    else
      initial_progression.datetime
    end
  end

  # Returns the processing duration of the job.
  #
  # Without any status the duration is 0. Otherwise the "execution_duration"
  # entry of the last status description is returned as is when present; when
  # absent, the duration is the difference between the process end (last
  # progression time, or the status insertion time) and `job_start`.
  defp get_job_process_duration(job, job_start) do
    job.status
    |> Status.get_last_status()
    |> process_duration_from_status(job, job_start)
  end

  # No status recorded: nothing was processed.
  defp process_duration_from_status(nil, _job, _job_start), do: 0

  defp process_duration_from_status(latest_status, job, job_start) do
    reported_duration = Map.get(latest_status.description, "execution_duration")

    if is_nil(reported_duration) do
      finished_at = get_last_progression_time(job) || latest_status.inserted_at
      NaiveDateTime.diff(finished_at, job_start)
    else
      reported_duration
    end
  end

  # Returns when the processing of the job ended.
  #
  # Without any status the end equals `job_start`. Otherwise, when the last
  # status description carries an "execution_duration", the end is `job_start`
  # shifted by that value rounded to an integer; when it does not, the end is
  # the last progression time or, failing that, the status insertion time.
  defp get_job_process_end(job, job_start) do
    job.status
    |> Status.get_last_status()
    |> process_end_from_status(job, job_start)
  end

  # No status recorded: the process is considered to end where it started.
  defp process_end_from_status(nil, _job, job_start), do: job_start

  defp process_end_from_status(latest_status, job, job_start) do
    reported_duration = Map.get(latest_status.description, "execution_duration")

    if is_nil(reported_duration) do
      get_last_progression_time(job) || latest_status.inserted_at
    else
      NaiveDateTime.add(job_start, round(reported_duration))
    end
  end

  # Returns the datetime of the last progression of the job, or `nil` when
  # `Progression.get_last_progression/1` finds none.
  defp get_last_progression_time(job) do
    latest_progression = Progression.get_last_progression(job.progressions)

    if is_nil(latest_progression), do: nil, else: latest_progression.datetime
  end

  # Returns the time elapsed between `job_end` and the latest event known for
  # the job, where the candidate events are the last status insertion and the
  # last progression datetime.
  #
  # With neither a status nor a progression the duration is 0. When both exist,
  # the progression datetime is used only if it is strictly later than the
  # status insertion time; otherwise the status insertion time is used.
  defp get_response_pending_duration(job, job_end) do
    latest_status = Status.get_last_status(job.status)
    latest_progression = Progression.get_last_progression(job.progressions)

    cond do
      is_nil(latest_status) and is_nil(latest_progression) ->
        0

      is_nil(latest_status) ->
        NaiveDateTime.diff(latest_progression.datetime, job_end)

      is_nil(latest_progression) ->
        NaiveDateTime.diff(latest_status.inserted_at, job_end)

      true ->
        status_time = latest_status.inserted_at
        progression_time = latest_progression.datetime

        latest_event =
          if NaiveDateTime.compare(status_time, progression_time) == :lt do
            progression_time
          else
            status_time
          end

        NaiveDateTime.diff(latest_event, job_end)
    end
  end

  @doc """
  Summarises a collection of duration maps.

  Returns a map with:

    * `:count` - the number of durations;
    * `:min` and `:max` - the field-wise minimum and maximum, as accumulated by
      the private min/max helpers;
    * `:average` - for each of `:order_pending`, `:processing`,
      `:response_pending` and `:total`, the sum divided by the count, passed
      through the duration limiter (floats are rounded to 3 decimals).

  For an empty list, `:count` is `0` and the three other entries are `nil`.
  """
  def calculate_duration_statistics([]), do: %{count: 0, average: nil, max: nil, min: nil}

  def calculate_duration_statistics(durations) do
    count = Enum.count(durations)

    initial = %{
      average: %{order_pending: 0, processing: 0, response_pending: 0, total: 0},
      max: nil,
      min: nil
    }

    # At this stage `:average` holds running sums, not yet means.
    accumulated =
      Enum.reduce(durations, initial, fn duration, acc ->
        %{
          min: calculate_minimum_duration(acc, duration),
          max: calculate_maximum_duration(acc, duration),
          average: calculate_average_duration(acc, duration)
        }
      end)

    mean_of = fn field ->
      limit_duration_to_milliseconds(Map.fetch!(accumulated.average, field) / count)
    end

    %{
      count: count,
      min: accumulated.min,
      max: accumulated.max,
      average: %{
        order_pending: mean_of.(:order_pending),
        processing: mean_of.(:processing),
        response_pending: mean_of.(:response_pending),
        total: mean_of.(:total)
      }
    }
  end

  # Returns the field-wise minimum between the running minimum held in
  # `statistics.min` and `duration`.
  #
  # The result always holds exactly the `:order_pending`, `:processing`,
  # `:response_pending` and `:total` keys. When no minimum exists yet, the four
  # fields are copied from `duration` rather than returning `duration` itself,
  # so that identifiers such as `:job_id` or `:workflow_id` never leak into the
  # statistics when only one duration is summarised.
  defp calculate_minimum_duration(statistics, duration) do
    case statistics.min do
      nil ->
        %{
          order_pending: duration.order_pending,
          processing: duration.processing,
          response_pending: duration.response_pending,
          total: duration.total
        }

      current_min ->
        %{
          order_pending: min(current_min.order_pending, duration.order_pending),
          processing: min(current_min.processing, duration.processing),
          response_pending: min(current_min.response_pending, duration.response_pending),
          total: min(current_min.total, duration.total)
        }
    end
  end

  # Returns the field-wise maximum between the running maximum held in
  # `statistics.max` and `duration`.
  #
  # The result always holds exactly the `:order_pending`, `:processing`,
  # `:response_pending` and `:total` keys. When no maximum exists yet, the four
  # fields are copied from `duration` rather than returning `duration` itself,
  # so that identifiers such as `:job_id` or `:workflow_id` never leak into the
  # statistics when only one duration is summarised.
  defp calculate_maximum_duration(statistics, duration) do
    case statistics.max do
      nil ->
        %{
          order_pending: duration.order_pending,
          processing: duration.processing,
          response_pending: duration.response_pending,
          total: duration.total
        }

      current_max ->
        %{
          order_pending: max(current_max.order_pending, duration.order_pending),
          processing: max(current_max.processing, duration.processing),
          response_pending: max(current_max.response_pending, duration.response_pending),
          total: max(current_max.total, duration.total)
        }
    end
  end

  # Adds the four duration fields of `duration` to the running sums held in
  # `statistics.average`, or starts the sums from `duration` when there is no
  # running value yet. Despite its name, the result is a sum: the division by
  # the number of durations is done by the caller.
  defp calculate_average_duration(statistics, duration) do
    fields = [:order_pending, :processing, :response_pending, :total]
    running_sums = statistics.average

    if is_nil(running_sums) do
      Map.new(fields, fn field -> {field, Map.fetch!(duration, field)} end)
    else
      Map.new(fields, fn field ->
        {field, Map.fetch!(running_sums, field) + Map.fetch!(duration, field)}
      end)
    end
  end
end