defmodule StepFlow.Statistics.JobsDurations do
@moduledoc """
Computes and persists per-job duration statistics (`JobDurations` records).
"""
import Ecto.Query, warn: false
alias StepFlow.Controllers.Jobs, as: ControllersJobs
alias StepFlow.Controllers.Progressions
alias StepFlow.Jobs
alias StepFlow.Jobs.Status
alias StepFlow.Repo
alias StepFlow.Statistics.Helpers
alias StepFlow.Statistics.JobDurations
require Logger
@doc """
Computes the durations of the job identified by `job_id` and persists them.

The job is loaded with `Jobs.get_job_with_status!/1`. When no `JobDurations`
record exists yet for that job a new one is created; otherwise the existing
record is updated with the freshly computed values.

Returns whatever `create_job_durations/1` or `update_job_durations/2` returns.
"""
def set_job_durations(job_id) do
  job = Jobs.get_job_with_status!(job_id)
  computed = compute_job_duration!(job)
  existing = get_by(%{"job_id" => job.id})

  if is_nil(existing) do
    create_job_durations(computed)
  else
    update_job_durations(existing, computed)
  end
end
# Builds the attribute map persisted as a `JobDurations` record for `job`.
#
# Three durations are derived and summed into `total_duration`:
#   * order pending:    from job creation to processing start;
#   * processing:       as given by get_job_process_duration/2;
#   * response pending: as given by get_response_pending_duration/2.
#
# The first two are clamped so that they are never negative; the response
# pending duration is used as computed. Every value goes through
# `Helpers.limit_duration_to_milliseconds/1` (presumably limiting the
# precision to milliseconds -- confirm against the helper).
defp compute_job_duration!(job) do
  created_at = get_job_creation(job)
  started_at = get_job_process_start(job, created_at)

  # NaiveDateTime.diff/2 defaults to whole seconds.
  queued = NaiveDateTime.diff(started_at, created_at)
  order_pending = Helpers.limit_duration_to_milliseconds(max(0.0, queued))

  raw_processing = get_job_process_duration(job, started_at)
  processing = Helpers.limit_duration_to_milliseconds(max(0.0, raw_processing))

  ended_at = get_job_process_end(job, started_at)

  raw_response_pending = get_response_pending_duration(job, ended_at)
  response_pending = Helpers.limit_duration_to_milliseconds(raw_response_pending)

  total_duration =
    Helpers.limit_duration_to_milliseconds(order_pending + processing + response_pending)

  %{
    job_id: job.id,
    job_start: started_at,
    job_end: ended_at,
    order_pending_duration: order_pending,
    processing_duration: processing,
    response_pending_duration: response_pending,
    total_duration: total_duration,
    inserted_at: job.inserted_at,
    updated_at: job.updated_at
  }
end
# Returns the creation time of `job`, i.e. its `inserted_at` field.
defp get_job_creation(job), do: job.inserted_at
# Returns the time at which the processing of `job` started.
#
# Resolution order:
#   1. the `datetime` of the most recently inserted progression whose
#      `progression` value is 0 (ties on `inserted_at` go to the highest `id`);
#   2. otherwise, when the job has a last status, the time of the last
#      progression, falling back to that status' `inserted_at`;
#   3. otherwise `job_creation`.
defp get_job_process_start(job, job_creation) do
  latest_zero_progression =
    job.progressions
    |> Enum.filter(fn job_progression -> job_progression.progression == 0 end)
    # Date/time structs must not be ordered with the default term ordering:
    # it compares the struct fields one by one (day before month before
    # year), which is not chronological. Compare explicitly instead and keep
    # the `id` as tie-breaker.
    |> Enum.sort(fn left, right ->
      case NaiveDateTime.compare(left.inserted_at, right.inserted_at) do
        :lt -> true
        :gt -> false
        :eq -> left.id <= right.id
      end
    end)
    |> List.last()

  case latest_zero_progression do
    nil ->
      case ControllersJobs.get_last_status(job.status) do
        nil -> job_creation
        last_status -> get_last_progression_time(job) || last_status.inserted_at
      end

    zero_progression ->
      zero_progression.datetime
  end
end
# Returns the processing duration of `job`.
#
#   * 0 when the job has no last status;
#   * the "execution_duration" entry of the last status' description when
#     that entry is present;
#   * otherwise the difference (NaiveDateTime.diff/2, whole seconds) between
#     the last progression time -- or the last status' `inserted_at` when no
#     progression exists -- and `job_start`.
defp get_job_process_duration(job, job_start) do
  case ControllersJobs.get_last_status(job.status) do
    nil ->
      0

    status ->
      reported = Map.get(status.description, "execution_duration")

      if is_nil(reported) do
        finished_at = get_last_progression_time(job) || status.inserted_at
        NaiveDateTime.diff(finished_at, job_start)
      else
        reported
      end
  end
end
# Returns the time at which the processing of `job` ended.
#
#   * `job_start` when the job has no last status;
#   * `job_start` plus the rounded "execution_duration" (added with
#     NaiveDateTime.add/2, i.e. as seconds) when the last status'
#     description carries that entry;
#   * otherwise the last progression time, falling back to the last status'
#     `inserted_at`.
defp get_job_process_end(job, job_start) do
  case ControllersJobs.get_last_status(job.status) do
    nil ->
      job_start

    status ->
      reported = Map.get(status.description, "execution_duration")

      if is_nil(reported) do
        get_last_progression_time(job) || status.inserted_at
      else
        NaiveDateTime.add(job_start, round(reported))
      end
  end
end
# Returns the `datetime` of the job's last progression, as selected by
# `Progressions.get_last_progression/1`, or `nil` when there is none.
defp get_last_progression_time(job) do
  latest = Progressions.get_last_progression(job.progressions)

  if is_nil(latest), do: nil, else: latest.datetime
end
# Returns the time elapsed between `job_end` and the latest event known for
# the job, using NaiveDateTime.diff/2 (whole seconds).
#
# The latest event is the last status' `inserted_at` or the last progression's
# datetime, whichever is later; when only one of them exists it is used
# directly. With neither a status nor a progression the result is 0.
defp get_response_pending_duration(job, job_end) do
  last_status = ControllersJobs.get_last_status(job.status)
  last_progression = Progressions.get_last_progression(job.progressions)

  case {last_status, last_progression} do
    {nil, nil} ->
      0

    {nil, progression} ->
      NaiveDateTime.diff(progression.datetime, job_end)

    {status, nil} ->
      NaiveDateTime.diff(status.inserted_at, job_end)

    {status, progression} ->
      progression_datetime = Status.extract_datetime(progression)

      # The progression wins only when it is strictly later than the status.
      latest_event =
        if NaiveDateTime.compare(status.inserted_at, progression_datetime) == :lt do
          progression_datetime
        else
          status.inserted_at
        end

      NaiveDateTime.diff(latest_event, job_end)
  end
end
@doc """
Returns the most recently updated `JobDurations` record of a job, or `nil`
when the job has none.

Only the `%{"job_id" => job_id}` filter is supported.
"""
def get_by(%{"job_id" => job_id}) do
  # Let the database return the single newest row rather than loading every
  # matching record only to keep the first one.
  Ecto.Query.from(j in JobDurations,
    where: j.job_id == ^job_id,
    order_by: [desc: j.updated_at],
    limit: 1
  )
  |> Repo.one()
end
@doc """
Creates the job durations record of a job.

The job ID is read from the `:job_id` atom key of `attrs`. When a record
already exists for that job ID nothing is inserted: the problem is logged and
`{:error, changeset}` is returned, the changeset carrying an error on
`:job_id`.

NOTE(review): when `attrs` has no `:job_id` key the lookup is performed with
`nil` -- confirm that this is the intended handling of incomplete attributes.

## Examples

    iex> create_job_durations(%{field: value})
    {:ok, %JobDurations{}}

    iex> create_job_durations(%{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def create_job_durations(attrs \\ %{}) do
  job_id = Map.get(attrs, :job_id)

  case get_by(%{"job_id" => job_id}) do
    nil ->
      %JobDurations{}
      |> JobDurations.changeset(attrs)
      |> Repo.insert()

    _existing ->
      Logger.error("#{__MODULE__}: There is already a job durations for this job ID!")

      # Return an error tuple rather than the bare `:ok` produced by
      # Logger.error/1, so callers see the failure in the documented shape.
      error_changeset =
        %JobDurations{}
        |> JobDurations.changeset(attrs)
        |> Ecto.Changeset.add_error(:job_id, "has already been taken")

      {:error, error_changeset}
  end
end
@doc """
Updates an existing job durations record with `attrs`.

## Examples

    iex> update_job_durations(job_durations, %{field: new_value})
    {:ok, %JobDurations{}}

    iex> update_job_durations(job_durations, %{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def update_job_durations(%JobDurations{} = job_durations, attrs) do
  changeset = JobDurations.changeset(job_durations, attrs)
  Repo.update(changeset)
end
end