lib/step_flow/models/jobs/status.ex

defmodule StepFlow.Jobs.Status do
  use Ecto.Schema
  import Ecto.Changeset
  import Ecto.Query, warn: false
  import EctoEnum

  alias StepFlow.Jobs
  alias StepFlow.Jobs.Job
  alias StepFlow.Jobs.Status
  alias StepFlow.Metrics.JobInstrumenter
  alias StepFlow.Repo
  alias StepFlow.Workflows
  require Logger

  @moduledoc false

  # Every state a job status can take. Each entry is annotated with the states
  # it can be reached from ("X -->") and the states it can move to ("--> Y"),
  # following the table in transition_map_lookup/1 and the rules for a first
  # status in check_transition/2.
  defenum(StateEnum, [
    # A job without any previous status can only start as: queued, skipped, error
    # Unknown, Retrying -->
    # --> Processing, Ready_to_init, Paused, Error, Dropped
    "queued",
    # Processing, Queued -->
    # --> Processing, Dropped
    "paused",
    # Unknown -->
    # (no outgoing transition)
    "skipped",
    # Queued, Paused, Stopped, Starting, Updating -->
    # --> Completed, Error, Paused, Stopped
    "processing",
    # Error -->
    # --> Queued
    "retrying",
    # Queued, Processing, Initializing, Deleting -->
    # --> Retrying, Deleting
    "error",
    # Processing, Deleting -->
    # (no outgoing transition)
    "completed",
    # Queued -->
    # --> Initializing
    "ready_to_init",
    # Initializing -->
    # --> Starting
    "ready_to_start",
    # NOTE(review): no state in the transition table lists update as a target,
    # so it cannot currently be reached through set_job_status/4 - confirm.
    # --> Updating
    "update",
    # Processing -->
    # --> Processing, Deleting
    "stopped",
    # Ready_to_init -->
    # --> Ready_to_start, Error
    "initializing",
    # Ready_to_start -->
    # --> Processing
    "starting",
    # Update -->
    # --> Processing
    "updating",
    # Error, Stopped -->
    # --> Completed, Error
    "deleting",
    # Queued, Paused -->
    # (no outgoing transition)
    "dropped",
    # DEPRECATED (no outgoing transition)
    "running",
    # DEPRECATED (no outgoing transition)
    "initialized",
    # --> Queued, Skipped
    "unknown"
  ])

  # Translates between a state's integer position and its atom name.
  #
  # With `key` falsy (the default) the result is a state atom:
  #   * a number is treated as a position and resolved to its state
  #     (nil when no state has that position);
  #   * any other value is returned unchanged when it is a known state atom,
  #     nil otherwise.
  #
  # With `key` truthy the result is a position:
  #   * a number is returned unchanged;
  #   * any other value is resolved to the position of the matching state, or
  #     nil when no state matches. The lookup deliberately returns nil rather
  #     than raising so that an unexpected state is handled like an unknown one.
  defp state_map_lookup(value, key \\ false) do
    states = %{
      0 => :queued,
      1 => :skipped,
      2 => :processing,
      3 => :retrying,
      4 => :error,
      5 => :completed,
      6 => :ready_to_init,
      7 => :ready_to_start,
      8 => :update,
      9 => :stopped,
      10 => :initializing,
      11 => :starting,
      12 => :updating,
      13 => :unknown,
      14 => :paused,
      # DEPRECATED
      15 => :running,
      # DEPRECATED
      16 => :initialized,
      17 => :deleting,
      18 => :dropped
    }

    cond do
      key && is_number(value) ->
        value

      key ->
        # Position 0 is truthy in Elixir, so find_value/2 only yields nil when
        # no state matches.
        Enum.find_value(states, fn {position, state} ->
          if state == value, do: position
        end)

      is_number(value) ->
        states[value]

      value in Map.values(states) ->
        value

      true ->
        nil
    end
  end

  @doc """
  Returns the name of a state as a string.

  `value` is resolved through `to_atom_state/1`, so anything that is not
  recognised as a state yields `"unknown"`.
  """
  def state_enum_label(value) do
    value
    |> to_atom_state()
    |> Atom.to_string()
  end

  # Resolves `value` to a state atom, defaulting to :unknown when
  # state_map_lookup/1 does not recognise it.
  defp to_atom_state(value), do: state_map_lookup(value) || :unknown

  # Resolves `value` to the integer position of its state by asking
  # state_map_lookup/2 for the key instead of the atom.
  defp state_enum_position(value), do: state_map_lookup(value, true)

  # Returns the states that may follow the state at position `value`.
  #
  # A numeric `value` is looked up in the table below (nil when the position is
  # not listed). Any other `value` is returned unchanged when it equals one of
  # the transition lists, nil otherwise.
  #
  # NOTE(review): the non-numeric branch compares against the transition lists
  # themselves, so a state atom such as :queued always yields nil - confirm
  # whether resolving atoms to their transitions was intended.
  defp transition_map_lookup(value) do
    transitions = %{
      0 => [:processing, :ready_to_init, :paused, :error, :dropped],
      1 => [],
      2 => [:completed, :error, :paused, :stopped],
      3 => [:queued],
      4 => [:deleting, :retrying],
      5 => [],
      6 => [:initializing],
      7 => [:starting],
      8 => [:updating],
      9 => [:deleting, :processing],
      10 => [:ready_to_start, :error],
      11 => [:processing],
      12 => [:processing],
      13 => [:queued, :skipped],
      14 => [:dropped, :processing],
      # DEPRECATED
      15 => [],
      # DEPRECATED
      16 => [],
      17 => [:completed, :error],
      18 => []
    }

    cond do
      is_number(value) -> Map.get(transitions, value)
      Enum.member?(Map.values(transitions), value) -> value
      true -> nil
    end
  end

  # Returns the allowed next states for `value`, or an empty list when
  # transition_map_lookup/1 has no entry for it.
  defp to_atom_transition(value), do: transition_map_lookup(value) || []

  # A status entry of a job, stored in the "step_flow_status" table.
  schema "step_flow_status" do
    # State of the job, typed with the StateEnum defined in this module.
    field(:state, StepFlow.Jobs.Status.StateEnum)
    # Moment the status refers to; extract_datetime/1 falls back to inserted_at
    # when this is nil.
    field(:datetime, :utc_datetime_usec)
    # Details attached to the status; defaults to an empty map.
    field(:description, :map, default: %{})
    # Job this status belongs to, referenced through job_id.
    belongs_to(:job, Job, foreign_key: :job_id)
    # Workflow statuses attached to this job status, declared with
    # on_delete: :delete_all.
    has_many(:workflow_status, Workflows.Status, on_delete: :delete_all)

    # Adds the inserted_at / updated_at fields.
    timestamps()
  end

  @doc false
  # Builds a changeset for a status: casts the permitted attributes, declares
  # the foreign key constraint on job_id, and requires state and job_id.
  def changeset(%Status{} = job, attrs) do
    permitted_fields = [:datetime, :state, :job_id, :description]
    required_fields = [:state, :job_id]

    casted = cast(job, attrs, permitted_fields)
    constrained = foreign_key_constraint(casted, :job_id)

    validate_required(constrained, required_fields)
  end

  @doc """
  Normalises a status to its string name.

  A `%Status{}` is converted through its `state`, a binary is returned as is,
  and any other value is handed to `state_enum_label/1`.
  """
  def convert_to_string(%Status{state: state}), do: convert_to_string(state)
  def convert_to_string(status) when is_binary(status), do: status
  def convert_to_string(status), do: state_enum_label(status)

  @doc """
  Returns the datetime carried by `payload`.

  The value stored under the `"datetime"` string key is returned when it is
  not nil; otherwise the lookup is delegated to
  `get_datetime_from_description/1` with the same payload.
  """
  def get_datetime_from_payload(payload) do
    datetime = Map.get(payload, "datetime")

    if is_nil(datetime) do
      get_datetime_from_description(payload)
    else
      datetime
    end
  end

  @doc """
  Returns the datetime found under `description[:description]["datetime"]`.

  `description` must be a map (or struct). When its `:description` entry is
  missing, nil, not a map, or has no non-nil `"datetime"` value, the current
  UTC time is returned instead, formatted as a string by
  `NaiveDateTime.to_string/1`.
  """
  def get_datetime_from_description(description) do
    # Match on the nested map rather than chaining Map.get/2 calls: a
    # `:description` key that is present but nil must fall back to "now"
    # instead of raising BadMapError.
    case Map.get(description, :description) do
      %{"datetime" => datetime} when not is_nil(datetime) -> datetime
      _ -> NaiveDateTime.to_string(DateTime.utc_now())
    end
  end

  @doc """
  Records a new status for the job identified by `job_id`.

  The job is loaded with its statuses and the requested `status` is compared
  with the job's last status:

    * same state - a warning is logged and `{:ok, last_status}` is returned
      without inserting anything;
    * allowed transition - the job status metric is incremented and a new
      status is inserted; the result of `Repo.insert/1` is returned;
    * anything else - `{:error, message}` is returned.

  The stored datetime is `timestamp` when given, otherwise the value obtained
  from `get_datetime_from_description/1`.
  """
  def set_job_status(job_id, status, description \\ %{}, timestamp \\ nil) do
    # Transitions are compared on the string form of the states.
    string_status = convert_to_string(status)

    job =
      job_id
      |> Jobs.get_job!()
      |> Repo.preload([:status])

    last_status = StepFlow.Controllers.Jobs.get_last_status(job.status)
    last_string_status = convert_to_string(last_status)

    cond do
      last_string_status == string_status ->
        Logger.warn(
          "Transition to the same state #{string_status}, ignoring and fall back to last status."
        )

        {:ok, last_status}

      check_transition(last_status, string_status) ->
        datetime = timestamp || get_datetime_from_description(description)

        JobInstrumenter.inc(:step_flow_jobs_status_total, job.name, string_status)

        attrs = %{
          job_id: job_id,
          state: status,
          datetime: datetime,
          description: description
        }

        %Status{}
        |> Status.changeset(attrs)
        |> Repo.insert()

      true ->
        {:error,
         "Status transition from #{last_string_status} to #{string_status} not allowed for job #{job_id}."}
    end
  end

  # Tells whether a job may move from `last_status` to `new_status`.
  #
  # `last_status` is the job's latest status (a value exposing `:state`) or nil
  # when the job has no status yet; `new_status` is the target state name as a
  # string.
  #
  # Without a previous status only "queued", "error" and "skipped" are accepted.
  defp check_transition(nil, new_status), do: new_status in ["queued", "error", "skipped"]

  defp check_transition(last_status, new_status) do
    # Compare on string names: converting `new_status` with String.to_atom/1
    # would create a new atom for every unexpected status name, and atoms are
    # never garbage-collected.
    last_status.state
    |> state_enum_position()
    |> to_atom_transition()
    |> Enum.any?(&(Atom.to_string(&1) == new_status))
  end

  @doc """
  Returns the datetime of `status`, falling back to its `inserted_at` value
  when `datetime` is nil.
  """
  def extract_datetime(%{datetime: nil} = status), do: status.inserted_at
  def extract_datetime(status), do: status.datetime
end