defmodule StepFlow.Statistics.Durations do
@moduledoc """
The duration statistics accessor.
"""
require Logger
import Ecto.Query, warn: false
alias StepFlow.Jobs
alias StepFlow.Jobs.Job
alias StepFlow.Jobs.Status
alias StepFlow.Progressions.Progression
alias StepFlow.Repo
alias StepFlow.Workflows.Workflow
@doc """
Lists the duration breakdown of jobs, ordered by ascending job identifier.

When `params` holds a `"job_id"` entry, that value is converted with
`StepFlow.Integer.force/1`, the job is fetched with
`Jobs.get_job_with_status!/1` and a one-entry page is built
(`total: 1`, `page: 0`, `size: 1`). Otherwise `params` is passed to
`Jobs.list_jobs/1` and the page it returns is used.

Returns a map with the keys `:data` (one duration map per job), `:total`,
`:page` and `:size`.
"""
def list_durations_for_jobs(params \\ %{}) do
  jobs_page =
    case Map.get(params, "job_id") do
      nil ->
        Jobs.list_jobs(params)

      str_job_id ->
        single_job =
          str_job_id
          |> StepFlow.Integer.force()
          |> Jobs.get_job_with_status!()

        %{data: [single_job], total: 1, page: 0, size: 1}
    end

  %{data: jobs, total: total, page: page, size: size} = jobs_page

  sorted_durations =
    jobs
    |> Enum.map(&get_job_duration!/1)
    |> Enum.sort_by(& &1.job_id)

  %{data: sorted_durations, total: total, page: page, size: size}
end
@doc """
Lists the duration breakdown of workflows, ordered by ascending workflow
identifier.

When `params` holds a `"workflow_id"` entry, that value is converted with
`StepFlow.Integer.force/1`, the workflow is fetched with
`StepFlow.Workflows.get_workflow!/1` and a one-entry page is built
(`total: 1`, `page: 0`, `size: 1`). Otherwise `params` is passed to
`StepFlow.Workflows.list_workflows/1` and the page it returns is used.

Returns a map with the keys `:data` (one duration map per workflow),
`:total`, `:page` and `:size`.
"""
def list_durations_for_workflows(params \\ %{}) do
  workflows_page =
    case Map.get(params, "workflow_id") do
      nil ->
        StepFlow.Workflows.list_workflows(params)

      str_workflow_id ->
        single_workflow =
          str_workflow_id
          |> StepFlow.Integer.force()
          |> StepFlow.Workflows.get_workflow!()

        %{data: [single_workflow], total: 1, page: 0, size: 1}
    end

  %{data: workflows, total: total, page: page, size: size} = workflows_page

  sorted_durations =
    workflows
    |> Enum.map(&get_workflow_duration!/1)
    |> Enum.sort_by(& &1.workflow_id)

  %{data: sorted_durations, total: total, page: page, size: size}
end
@doc """
Computes duration statistics over the workflows selected by `params`.

The workflow query is narrowed with
`StepFlow.Workflows.apply_default_query_filters/2`, the matching workflows
are loaded, each one is turned into a duration map, and the list is
summarised by `calculate_duration_statistics/1`.
"""
def get_workflows_duration_statistics(params \\ %{}) do
  filtered_query =
    StepFlow.Workflows.apply_default_query_filters(from(workflow in Workflow), params)

  filtered_query
  |> Repo.all()
  |> Repo.preload([:jobs])
  |> StepFlow.Workflows.preload_workflows()
  |> Enum.map(&get_workflow_duration!/1)
  |> calculate_duration_statistics()
end
@doc """
Computes duration statistics per job name for the jobs selected by `params`.

The job query is narrowed with `Jobs.apply_default_query_filters/2`, the
matching jobs are loaded with their progressions and status, and their
durations are grouped by `job.name`.

Returns a list of `%{name: name, durations: statistics}` maps, where
`statistics` is the result of `calculate_duration_statistics/1` for that name.
"""
def get_jobs_duration_statistics(params \\ %{}) do
  filtered_query = Jobs.apply_default_query_filters(from(job in Job), params)

  durations_by_name =
    filtered_query
    |> Repo.all()
    |> Repo.preload([:progressions, :status])
    |> Enum.reduce(%{}, fn job, grouped ->
      duration = get_job_duration!(job)
      # Newest entry goes first, so each group ends up in reverse query order.
      Map.update(grouped, job.name, [duration], &[duration | &1])
    end)

  for {name, durations} <- durations_by_name do
    %{name: name, durations: calculate_duration_statistics(durations)}
  end
end
# Splits the lifetime of `job` into three phases and returns them in a map:
#   * :order_pending    - time between the job creation and its process start
#   * :processing       - value given by get_job_process_duration/2
#   * :response_pending - value given by get_response_pending_duration/2
#   * :total            - sum of the three phases
# The map also carries :job_id and :workflow_id. Every figure goes through
# limit_duration_to_milliseconds/1, so none of them is negative.
defp get_job_duration!(job) do
  created_at = get_job_creation(job)
  started_at = get_job_process_start(job, created_at)

  order_pending =
    limit_duration_to_milliseconds(NaiveDateTime.diff(started_at, created_at))

  processing =
    limit_duration_to_milliseconds(get_job_process_duration(job, started_at))

  ended_at = get_job_process_end(job, started_at)

  response_pending =
    limit_duration_to_milliseconds(get_response_pending_duration(job, ended_at))

  %{
    job_id: job.id,
    workflow_id: job.workflow_id,
    order_pending: order_pending,
    processing: processing,
    response_pending: response_pending,
    total: limit_duration_to_milliseconds(order_pending + processing + response_pending)
  }
end
# Collects the jobs of a workflow by walking the paginated listing returned by
# Jobs.list_jobs/1, starting at `page` and appending to `result`.
#
# Fetching stops when either:
#   * as many jobs as the reported `total` have been gathered, or
#   * a page comes back empty. Without this stop condition, a `total` that
#     over-reports the number of reachable jobs would make the function request
#     pages forever.
#
# Passing -1 as `page` returns `result` untouched (kept for compatibility with
# the previous sentinel-based termination).
defp get_jobs_from_workflow(workflow_id, page, result \\ [])

defp get_jobs_from_workflow(_workflow_id, -1, result), do: result

defp get_jobs_from_workflow(workflow_id, page, result) do
  # `page` is rebound to the page number reported by the listing itself.
  %{data: jobs, total: total, page: page} =
    Jobs.list_jobs(%{"workflow_id" => workflow_id, "page" => page})

  result = result ++ jobs

  if jobs != [] and length(result) < total do
    get_jobs_from_workflow(workflow_id, page + 1, result)
  else
    result
  end
end
# Aggregates the durations of every job of `workflow` (fetched page by page
# through get_jobs_from_workflow/2) into a single map holding :workflow_id,
# the per-phase sums (:order_pending, :processing, :response_pending) and
# their :total. Each figure goes through limit_duration_to_milliseconds/1.
defp get_workflow_duration!(workflow) do
  jobs_durations =
    workflow.id
    |> get_jobs_from_workflow(0)
    |> Enum.map(&get_job_duration!/1)

  # Sums one phase over all jobs of the workflow.
  sum_of = fn phase ->
    jobs_durations
    |> Enum.map(&Map.fetch!(&1, phase))
    |> Enum.sum()
    |> limit_duration_to_milliseconds()
  end

  order_pending = sum_of.(:order_pending)
  processing = sum_of.(:processing)
  response_pending = sum_of.(:response_pending)

  %{
    workflow_id: workflow.id,
    order_pending: order_pending,
    processing: processing,
    response_pending: response_pending,
    total: limit_duration_to_milliseconds(order_pending + processing + response_pending)
  }
end
# Normalises a duration: negative values are clamped to zero (0 for integers,
# 0.0 for floats), non-negative integers are returned unchanged and
# non-negative floats are rounded to three decimal places.
# Any other kind of value matches no clause.
defp limit_duration_to_milliseconds(duration) when is_integer(duration) do
  max(duration, 0)
end

defp limit_duration_to_milliseconds(duration) when is_float(duration) do
  if duration < 0, do: 0.0, else: Float.round(duration, 3)
end
# Returns the creation time of `job`, i.e. its :inserted_at field.
defp get_job_creation(job), do: Map.fetch!(job, :inserted_at)
# Determines when the processing of `job` started.
#
# The datetime of the first progression whose :progression equals 0 wins.
# Without such a progression:
#   * if the job has no last status, the job creation time is returned
#   * otherwise the last progression time is used, falling back to the
#     insertion time of the last status.
defp get_job_process_start(job, job_creation) do
  initial_progression = Enum.find(job.progressions, &(&1.progression == 0))

  if is_nil(initial_progression) do
    last_status = Status.get_last_status(job.status)

    if is_nil(last_status) do
      job_creation
    else
      get_last_progression_time(job) || last_status.inserted_at
    end
  else
    initial_progression.datetime
  end
end
# Returns how long the processing of `job` lasted.
#
#   * no last status: 0
#   * the last status description has an "execution_duration" entry: that
#     value is returned as is
#   * otherwise: difference between the end time (last progression time, or
#     the insertion time of the last status) and `job_start`.
defp get_job_process_duration(job, job_start) do
  last_status = Status.get_last_status(job.status)

  if is_nil(last_status) do
    0
  else
    reported_duration = Map.get(last_status.description, "execution_duration")

    if is_nil(reported_duration) do
      finished_at = get_last_progression_time(job) || last_status.inserted_at
      NaiveDateTime.diff(finished_at, job_start)
    else
      reported_duration
    end
  end
end
# Returns when the processing of `job` ended.
#
#   * no last status: `job_start` (the job is considered not processed)
#   * the last status description has an "execution_duration" entry:
#     `job_start` shifted by that duration, rounded to an integer
#   * otherwise: the last progression time, or the insertion time of the last
#     status when there is no progression.
defp get_job_process_end(job, job_start) do
  last_status = Status.get_last_status(job.status)

  if is_nil(last_status) do
    job_start
  else
    reported_duration = Map.get(last_status.description, "execution_duration")

    if is_nil(reported_duration) do
      get_last_progression_time(job) || last_status.inserted_at
    else
      NaiveDateTime.add(job_start, round(reported_duration))
    end
  end
end
# Returns the datetime of the last progression of `job`, or nil when
# Progression.get_last_progression/1 finds none.
defp get_last_progression_time(job) do
  last_progression = Progression.get_last_progression(job.progressions)

  if is_nil(last_progression), do: nil, else: last_progression.datetime
end
# Returns the time elapsed between `job_end` and the latest event recorded for
# `job`, the latest event being the more recent of the last status insertion
# and the last progression datetime. Returns 0 when the job has neither.
defp get_response_pending_duration(job, job_end) do
  last_status = Status.get_last_status(job.status)
  last_progression = Progression.get_last_progression(job.progressions)

  delay_since_job_end(last_status, last_progression, job_end)
end

# Difference between the latest of (status insertion, progression datetime)
# and `job_end`; on equal timestamps the status insertion time is used.
defp delay_since_job_end(nil, nil, _job_end), do: 0

defp delay_since_job_end(nil, progression, job_end) do
  NaiveDateTime.diff(progression.datetime, job_end)
end

defp delay_since_job_end(status, nil, job_end) do
  NaiveDateTime.diff(status.inserted_at, job_end)
end

defp delay_since_job_end(status, progression, job_end) do
  latest_event =
    if NaiveDateTime.compare(status.inserted_at, progression.datetime) == :lt do
      progression.datetime
    else
      status.inserted_at
    end

  NaiveDateTime.diff(latest_event, job_end)
end
@doc """
Summarises a collection of duration maps.

Each duration is expected to expose the `:order_pending`, `:processing`,
`:response_pending` and `:total` keys.

Returns a map with:

  * `:count` - number of durations
  * `:average` - per-key mean, passed through the duration normalisation
    (rounded to three decimal places)
  * `:min` / `:max` - per-key minimum / maximum

For an empty list, `:count` is `0` and the other entries are `nil`. When a
single duration is given, `:min` and `:max` are that duration itself, so any
additional keys it carries are kept.
"""
def calculate_duration_statistics([]) do
  %{count: 0, average: nil, max: nil, min: nil}
end

def calculate_duration_statistics(durations) do
  sample_count = Enum.count(durations)

  initial = %{
    average: %{order_pending: 0, processing: 0, response_pending: 0, total: 0},
    max: nil,
    min: nil
  }

  # During the fold, :average holds running sums, not means.
  accumulated =
    Enum.reduce(durations, initial, fn duration, acc ->
      %{
        min: calculate_minimum_duration(acc, duration),
        max: calculate_maximum_duration(acc, duration),
        average: calculate_average_duration(acc, duration)
      }
    end)

  sums = accumulated.average
  mean = fn sum -> limit_duration_to_milliseconds(sum / sample_count) end

  Map.merge(accumulated, %{
    count: sample_count,
    average: %{
      order_pending: mean.(sums.order_pending),
      processing: mean.(sums.processing),
      response_pending: mean.(sums.response_pending),
      total: mean.(sums.total)
    }
  })
end
# Folds `duration` into the running per-key minimum held in `statistics.min`.
# When no minimum exists yet, `duration` is returned as is (extra keys kept).
# Otherwise a map limited to the four duration keys is built.
defp calculate_minimum_duration(statistics, duration) do
  current = statistics.min

  if is_nil(current) do
    duration
  else
    Map.new([:order_pending, :processing, :response_pending, :total], fn key ->
      {key, min(Map.fetch!(current, key), Map.fetch!(duration, key))}
    end)
  end
end
# Folds `duration` into the running per-key maximum held in `statistics.max`.
# When no maximum exists yet, `duration` is returned as is (extra keys kept).
# Otherwise a map limited to the four duration keys is built.
defp calculate_maximum_duration(statistics, duration) do
  current = statistics.max

  if is_nil(current) do
    duration
  else
    Map.new([:order_pending, :processing, :response_pending, :total], fn key ->
      {key, max(Map.fetch!(current, key), Map.fetch!(duration, key))}
    end)
  end
end
# Adds `duration` to the running per-key sums held in `statistics.average`.
# When there is no running sum yet (nil), the four duration keys of `duration`
# are used as the starting sums. The division by the sample count is done by
# the caller.
defp calculate_average_duration(statistics, duration) do
  running = statistics.average

  Map.new([:order_pending, :processing, :response_pending, :total], fn key ->
    value = Map.fetch!(duration, key)

    if is_nil(running) do
      {key, value}
    else
      {key, Map.fetch!(running, key) + value}
    end
  end)
end
end