defmodule StepFlow.Controllers.Statistics.Durations do
@moduledoc """
The duration statistics accessor.
"""
require Logger
import Ecto.Query, warn: false
alias StepFlow.Jobs
alias StepFlow.Jobs.Job
alias StepFlow.Repo
alias StepFlow.Statistics.Helpers
alias StepFlow.Statistics.JobDurations
alias StepFlow.Workflows.Workflow
@doc """
Lists job durations, one entry per job, sorted by ascending job id.

When `params` holds a non-nil `"job_id"`, only that job is looked up through
`Jobs.get_job_with_status!/1` and the pagination fields are fixed to
`total: 1, page: 0, size: 1`. Otherwise `params` is forwarded unchanged to
`Jobs.internal_list_jobs/1` and its pagination fields are returned as-is.

Returns a map with the keys `:data`, `:total`, `:page` and `:size`.
"""
def list_durations_for_jobs(params \\ %{}) do
  requested_id = Map.get(params, "job_id")

  %{data: jobs, total: total, page: page, size: size} =
    if is_nil(requested_id) do
      Jobs.internal_list_jobs(params)
    else
      job =
        requested_id
        |> StepFlow.Integer.force()
        |> Jobs.get_job_with_status!()

      %{data: [job], total: 1, page: 0, size: 1}
    end

  sorted_durations =
    jobs
    |> Enum.map(&get_job_duration!/1)
    |> Enum.sort_by(& &1.job_id)

  %{data: sorted_durations, total: total, page: page, size: size}
end
@doc """
Lists workflow durations, one entry per workflow, sorted by ascending workflow id.

When `params` holds a non-nil `"workflow_id"`, only that workflow is fetched
through `StepFlow.Workflows.get_workflow!/1` and the pagination fields are fixed
to `total: 1, page: 0, size: 1`. Otherwise `params` is forwarded unchanged to
`StepFlow.Workflows.list_workflows/1` and its pagination fields are returned
as-is.

Returns a map with the keys `:data`, `:total`, `:page` and `:size`.
"""
def list_durations_for_workflows(params \\ %{}) do
  requested_id = Map.get(params, "workflow_id")

  %{data: workflows, total: total, page: page, size: size} =
    if is_nil(requested_id) do
      StepFlow.Workflows.list_workflows(params)
    else
      workflow =
        requested_id
        |> StepFlow.Integer.force()
        |> StepFlow.Workflows.get_workflow!()

      %{data: [workflow], total: 1, page: 0, size: 1}
    end

  sorted_durations =
    workflows
    |> Enum.map(&get_workflow_duration!/1)
    |> Enum.sort_by(& &1.workflow_id)

  %{data: sorted_durations, total: total, page: page, size: size}
end
@doc """
Computes duration statistics of workflows, grouped by identifier and version.

The query is built in three layers:

  1. workflows filtered by `StepFlow.Workflows.apply_default_query_filters/2`;
  2. one row per workflow with the summed job durations and the elapsed time
     between the earliest `job_start` and the latest `job_end`;
  3. count / average / minimum / maximum of those per-workflow values for every
     `{identifier, version}` pair.

`params` may contain `"page"` (default `0`) and `"size"` (default `10`); both go
through `StepFlow.Integer.force/1`. Pagination is applied in memory on the full
list of aggregated rows. The rows are explicitly ordered by identifier, then
version, so that successive pages are stable across calls.

Returns a map with the keys `:data`, `:total` (number of aggregated rows),
`:page` and `:size`.
"""
def get_workflows_duration_statistics(params \\ %{}) do
  page =
    params
    |> Map.get("page", 0)
    |> StepFlow.Integer.force()

  size =
    params
    |> Map.get("size", 10)
    |> StepFlow.Integer.force()

  filtered_workflows =
    from(
      workflow in Workflow,
      order_by: [asc: :identifier]
    )
    |> StepFlow.Workflows.apply_default_query_filters(params)

  # One row per workflow: summed job durations and overall elapsed time.
  per_workflow =
    from(
      workflow in subquery(filtered_workflows),
      left_join: job in Job,
      on: job.workflow_id == workflow.id,
      left_join: job_durations in JobDurations,
      on: job_durations.job_id == job.id,
      select: %{
        id: workflow.id,
        identifier: workflow.identifier,
        version: fragment("CONCAT(version_major,'.',version_minor,'.',version_micro)"),
        workflow_order_pending_duration: fragment("SUM(order_pending_duration)::real"),
        workflow_processing_duration: fragment("SUM(processing_duration)::real"),
        workflow_response_pending_duration: fragment("SUM(response_pending_duration)::real"),
        workflow_duration: fragment("extract(epoch from (MAX(job_end) - MIN(job_start)))::real")
      },
      group_by: [
        workflow.id,
        workflow.identifier,
        workflow.version_major,
        workflow.version_minor,
        workflow.version_micro
      ]
    )

  query =
    from(
      workflow in subquery(per_workflow),
      select: %{
        identifier: workflow.identifier,
        version: workflow.version,
        count: count(workflow.id),
        avg_order_pending_duration: avg(workflow.workflow_order_pending_duration),
        min_order_pending_duration: min(workflow.workflow_order_pending_duration),
        max_order_pending_duration: max(workflow.workflow_order_pending_duration),
        avg_processing_duration: avg(workflow.workflow_processing_duration),
        min_processing_duration: min(workflow.workflow_processing_duration),
        max_processing_duration: max(workflow.workflow_processing_duration),
        avg_response_pending_duration: avg(workflow.workflow_response_pending_duration),
        min_response_pending_duration: min(workflow.workflow_response_pending_duration),
        max_response_pending_duration: max(workflow.workflow_response_pending_duration),
        avg_total_duration: avg(workflow.workflow_duration),
        min_total_duration: min(workflow.workflow_duration),
        max_total_duration: max(workflow.workflow_duration)
      },
      group_by: [
        workflow.identifier,
        workflow.version
      ],
      # Without an explicit ORDER BY the database may return the groups in any
      # order, which would make the in-memory pagination below unstable.
      order_by: [asc: workflow.identifier, asc: workflow.version]
    )

  statistics = Repo.all(query)
  to_ms = &Helpers.limit_duration_to_milliseconds/1

  data =
    statistics
    |> Enum.slice(page * size, size)
    |> Enum.map(fn result ->
      %{
        name: result.identifier,
        version: result.version,
        durations: %{
          count: result.count,
          average: %{
            order_pending: to_ms.(result.avg_order_pending_duration),
            processing: to_ms.(result.avg_processing_duration),
            response_pending: to_ms.(result.avg_response_pending_duration),
            total: to_ms.(result.avg_total_duration)
          },
          max: %{
            order_pending: to_ms.(result.max_order_pending_duration),
            processing: to_ms.(result.max_processing_duration),
            response_pending: to_ms.(result.max_response_pending_duration),
            total: to_ms.(result.max_total_duration)
          },
          min: %{
            order_pending: to_ms.(result.min_order_pending_duration),
            processing: to_ms.(result.min_processing_duration),
            response_pending: to_ms.(result.min_response_pending_duration),
            total: to_ms.(result.min_total_duration)
          }
        }
      }
    end)

  %{
    data: data,
    total: length(statistics),
    page: page,
    size: size
  }
end
@doc """
Computes duration statistics of jobs, grouped by job name.

Jobs are first restricted with `Jobs.apply_rights/2` and
`Jobs.apply_default_query_filters/2`, then left-joined with their recorded
durations and aggregated (count / average / minimum / maximum) per job name.

`params` may contain `"page"` (default `0`) and `"size"` (default `10`); both go
through `StepFlow.Integer.force/1`. Pagination is applied in memory on the full
list of aggregated rows. The rows are explicitly ordered by job name so that
successive pages are stable across calls.

Returns a map with the keys `:data`, `:total` (number of aggregated rows),
`:page` and `:size`.
"""
def get_jobs_duration_statistics(params \\ %{}) do
  page =
    params
    |> Map.get("page", 0)
    |> StepFlow.Integer.force()

  size =
    params
    |> Map.get("size", 10)
    |> StepFlow.Integer.force()

  visible_jobs =
    from(job in Job)
    |> Jobs.apply_rights(params)
    |> Jobs.apply_default_query_filters(params)

  query =
    from(
      job in subquery(visible_jobs),
      left_join: job_duration in JobDurations,
      on: job_duration.job_id == job.id,
      select: %{
        name: job.name,
        count: fragment("COUNT(name)::bigint"),
        avg_order_pending_duration: fragment("AVG(order_pending_duration)::real"),
        min_order_pending_duration: fragment("MIN(order_pending_duration)::real"),
        max_order_pending_duration: fragment("MAX(order_pending_duration)::real"),
        avg_processing_duration: fragment("AVG(processing_duration)::real"),
        min_processing_duration: fragment("MIN(processing_duration)::real"),
        max_processing_duration: fragment("MAX(processing_duration)::real"),
        avg_response_pending_duration: fragment("AVG(response_pending_duration)::real"),
        min_response_pending_duration: fragment("MIN(response_pending_duration)::real"),
        max_response_pending_duration: fragment("MAX(response_pending_duration)::real"),
        avg_total_duration: fragment("AVG(total_duration)::real"),
        min_total_duration: fragment("MIN(total_duration)::real"),
        max_total_duration: fragment("MAX(total_duration)::real")
      },
      group_by: job.name,
      # Without an explicit ORDER BY the database may return the groups in any
      # order, which would make the in-memory pagination below unstable.
      order_by: [asc: job.name]
    )

  statistics = Repo.all(query)
  to_ms = &Helpers.limit_duration_to_milliseconds/1

  data =
    statistics
    |> Enum.slice(page * size, size)
    |> Enum.map(fn result ->
      %{
        name: result.name,
        durations: %{
          count: result.count,
          average: %{
            order_pending: to_ms.(result.avg_order_pending_duration),
            processing: to_ms.(result.avg_processing_duration),
            response_pending: to_ms.(result.avg_response_pending_duration),
            total: to_ms.(result.avg_total_duration)
          },
          max: %{
            order_pending: to_ms.(result.max_order_pending_duration),
            processing: to_ms.(result.max_processing_duration),
            response_pending: to_ms.(result.max_response_pending_duration),
            total: to_ms.(result.max_total_duration)
          },
          min: %{
            order_pending: to_ms.(result.min_order_pending_duration),
            processing: to_ms.(result.min_processing_duration),
            response_pending: to_ms.(result.min_response_pending_duration),
            total: to_ms.(result.min_total_duration)
          }
        }
      }
    end)

  %{
    data: data,
    total: length(statistics),
    page: page,
    size: size
  }
end
# Collects every job of `workflow_id` by walking the pages returned by
# `Jobs.internal_list_jobs/1`, starting at `page` and appending each page to
# `result`.
#
# The walk stops as soon as the number of collected jobs reaches the reported
# `total`, or when a page comes back empty. The empty-page stop matters: if
# `total` is larger than the number of rows the listing actually yields, the
# count would never be reached and the recursion would not terminate.
defp get_jobs_from_workflow(workflow_id, page, result \\ [])

defp get_jobs_from_workflow(workflow_id, page, result) do
  case Jobs.internal_list_jobs(%{"workflow_id" => workflow_id, "page" => page}) do
    %{data: []} ->
      result

    %{data: jobs, total: total, page: current_page} ->
      collected = result ++ jobs

      if length(collected) < total do
        # Continue from the page number reported by the listing itself.
        get_jobs_from_workflow(workflow_id, current_page + 1, collected)
      else
        collected
      end
  end
end
# Builds the duration entry of a single job.
#
# Reads at most one `JobDurations` row matching `job.id`. When none exists, all
# four durations are reported as `0.0`; otherwise each stored value is passed
# through `Helpers.limit_duration_to_milliseconds/1`. The entry always carries
# the job id and its workflow id.
defp get_job_duration!(job) do
  stored =
    JobDurations
    |> where([d], d.job_id == ^job.id)
    |> distinct(true)
    |> select([d], %{
      order_pending_duration: d.order_pending_duration,
      processing_duration: d.processing_duration,
      response_pending_duration: d.response_pending_duration,
      total_duration: d.total_duration
    })
    |> limit(1)
    |> Repo.one()

  identity = %{job_id: job.id, workflow_id: job.workflow_id}

  durations =
    case stored do
      nil ->
        %{order_pending: 0.0, processing: 0.0, response_pending: 0.0, total: 0.0}

      row ->
        %{
          order_pending: Helpers.limit_duration_to_milliseconds(row.order_pending_duration),
          processing: Helpers.limit_duration_to_milliseconds(row.processing_duration),
          response_pending:
            Helpers.limit_duration_to_milliseconds(row.response_pending_duration),
          total: Helpers.limit_duration_to_milliseconds(row.total_duration)
        }
    end

  Map.merge(identity, durations)
end
# Builds the duration entry of a single workflow.
#
# A workflow whose `jobs` field is an empty list gets an all-zero entry without
# touching the database.
defp get_workflow_duration!(%{jobs: []} = workflow) do
  %{workflow_id: workflow.id, order_pending: 0, processing: 0, response_pending: 0, total: 0}
end

# Otherwise the per-job durations are gathered for every job of the workflow
# and summed per category, each sum being passed through
# `Helpers.limit_duration_to_milliseconds/1`. `total` is the difference
# returned by `NaiveDateTime.diff/2` between the latest `job_end` and the
# earliest `job_start` recorded for the workflow's jobs, or `0.0` when either
# bound is missing.
defp get_workflow_duration!(workflow) do
  per_job_durations =
    workflow.id
    |> get_jobs_from_workflow(0)
    |> Enum.map(&get_job_duration!/1)

  time_bounds_query =
    from(job in Job,
      left_join: job_duration in JobDurations,
      on: job.id == job_duration.job_id,
      where: job.workflow_id == ^workflow.id,
      select: %{
        first_time: fragment("MIN(job_start)"),
        last_time: fragment("MAX(job_end)")
      }
    )

  elapsed =
    case Repo.one(time_bounds_query) do
      nil -> 0.0
      %{last_time: nil} -> 0.0
      %{first_time: nil} -> 0.0
      %{first_time: first_time, last_time: last_time} -> NaiveDateTime.diff(last_time, first_time)
    end

  # Sums one duration category over all jobs of the workflow.
  sum_of = fn category ->
    per_job_durations
    |> Enum.map(&Map.fetch!(&1, category))
    |> Enum.sum()
    |> Helpers.limit_duration_to_milliseconds()
  end

  %{
    workflow_id: workflow.id,
    order_pending: sum_of.(:order_pending),
    processing: sum_of.(:processing),
    response_pending: sum_of.(:response_pending),
    total: elapsed
  }
end
end