defmodule StepFlow.Jobs.Status do
use Ecto.Schema
import Ecto.Changeset
import Ecto.Query, warn: false
import EctoEnum
alias StepFlow.Jobs
alias StepFlow.Jobs.Job
alias StepFlow.Jobs.Status
alias StepFlow.Metrics.JobInstrumenter
alias StepFlow.Repo
alias StepFlow.Workflows
require Logger
@moduledoc false
defenum(StateEnum, [
# State can start from: queued, skipped, error
# Unknown, Retrying -->
# --> Processing, Ready_to_init, Paused, Error, Dropped
"queued",
# Processing, Queued -->
# --> Processing, Dropped
"paused",
# Unknown -->
"skipped",
# Queued -->
# --> Completed, Error, Paused, Stopped
"processing",
# Error -->
# --> Queued
"retrying",
# Queued, Processing -->
# --> Retrying, Deleting
"error",
# Processing -->
"completed",
# Queued -->
# --> Initializing
"ready_to_init",
# Initialized, Initializing -->
# --> Starting
"ready_to_start",
# Processing -->
# --> Updating
"update",
# Processing -->
# --> Processing, Deleting
"stopped",
# Ready_to_init -->
# --> Initialized, Ready_to_start
"initializing",
# Ready_to_start -->
# --> Processing
"starting",
# Updating -->
# --> Processing
"updating",
# Stopped -->
# --> Completed
"deleting",
# Queued, Paused -->
"dropped",
# DEPRECATED
"running",
# DEPRECATED
"initialized",
# --> Queued, Skipped
"unknown"
])
defp state_map_lookup(value, key \\ false) do
state_map = %{
0 => :queued,
1 => :skipped,
2 => :processing,
3 => :retrying,
4 => :error,
5 => :completed,
6 => :ready_to_init,
7 => :ready_to_start,
8 => :update,
9 => :stopped,
10 => :initializing,
11 => :starting,
12 => :updating,
13 => :unknown,
14 => :paused,
# DEPRECATED
15 => :running,
# DEPRECATED
16 => :initialized,
17 => :deleting,
18 => :dropped
}
if key do
if is_number(value) do
value
else
state_map
|> Enum.find(fn {_key, val} -> val == value end)
|> elem(0)
end
else
if is_number(value) do
state_map[value]
else
case Map.values(state_map) |> Enum.member?(value) do
true -> value
_ -> nil
end
end
end
end
def state_enum_label(value) do
to_atom_state(value)
|> Atom.to_string()
end
defp to_atom_state(value) do
case state_map_lookup(value) do
nil -> :unknown
value -> value
end
end
defp state_enum_position(value) do
state_map_lookup(value, true)
end
defp transition_map_lookup(value) do
state_map = %{
0 => [:processing, :ready_to_init, :paused, :error, :dropped],
1 => [],
2 => [:completed, :error, :paused, :stopped],
3 => [:queued],
4 => [:deleting, :retrying],
5 => [],
6 => [:initializing],
7 => [:starting],
8 => [:updating],
9 => [:deleting, :processing],
10 => [:ready_to_start, :error],
11 => [:processing],
12 => [:processing],
13 => [:queued, :skipped],
14 => [:dropped, :processing],
# DEPRECATED
15 => [],
# DEPRECATED
16 => [],
17 => [:completed, :error],
18 => []
}
if is_number(value) do
state_map[value]
else
case Map.values(state_map) |> Enum.member?(value) do
true -> value
_ -> nil
end
end
end
defp to_atom_transition(value) do
case transition_map_lookup(value) do
nil -> []
value -> value
end
end
schema "step_flow_status" do
field(:state, StepFlow.Jobs.Status.StateEnum)
field(:datetime, :utc_datetime_usec)
field(:description, :map, default: %{})
belongs_to(:job, Job, foreign_key: :job_id)
has_many(:workflow_status, Workflows.Status, on_delete: :delete_all)
timestamps()
end
@doc false
def changeset(%Status{} = job, attrs) do
job
|> cast(attrs, [:datetime, :state, :job_id, :description])
|> foreign_key_constraint(:job_id)
|> validate_required([:state, :job_id])
end
def convert_to_string(%Status{} = status), do: convert_to_string(status.state)
def convert_to_string(status) when is_binary(status), do: status
def convert_to_string(status) do
state_enum_label(status)
end
def get_datetime_from_payload(payload) do
case Map.get(payload, "datetime") do
nil -> get_datetime_from_description(payload)
value -> value
end
end
def get_datetime_from_description(description) do
case Map.get(description, :description, %{}) |> Map.get("datetime") do
nil -> NaiveDateTime.to_string(DateTime.utc_now())
value -> value
end
end
def set_job_status(job_id, status, description \\ %{}, timestamp \\ nil) do
# Ensure status is string
string_status = convert_to_string(status)
job = Jobs.get_job!(job_id) |> Repo.preload([:status])
last_status = StepFlow.Controllers.Jobs.get_last_status(job.status)
if convert_to_string(last_status) == string_status do
Logger.warn(
"Transition to the same state #{string_status}, ignoring and fall back to last status."
)
{:ok, last_status}
else
if check_transition(last_status, string_status) do
datetime =
if timestamp do
timestamp
else
get_datetime_from_description(description)
end
JobInstrumenter.inc(:step_flow_jobs_status_total, job.name, string_status)
%Status{}
|> Status.changeset(%{
job_id: job_id,
state: status,
datetime: datetime,
description: description
})
|> Repo.insert()
else
{:error,
"Status transition from #{convert_to_string(last_status)} to #{string_status} not allowed for job #{job_id}."}
end
end
end
defp check_transition(last_status, new_status) do
case {last_status, new_status} do
paired when paired in [{nil, "queued"}, {nil, "error"}, {nil, "skipped"}] ->
true
{nil, _} ->
false
{_, _} ->
enum_position = state_enum_position(last_status.state)
allowed_transitions = to_atom_transition(enum_position)
String.to_atom(new_status) in allowed_transitions
end
end
def extract_datetime(status) when is_nil(status.datetime), do: status.inserted_at
def extract_datetime(status), do: status.datetime
end