# lib/step_flow/jobs/status.ex

defmodule StepFlow.Jobs.Status do
  use Ecto.Schema
  import Ecto.Changeset
  import Ecto.Query, warn: false
  import EctoEnum

  alias StepFlow.Jobs
  alias StepFlow.Jobs.Job
  alias StepFlow.Jobs.Status
  alias StepFlow.Repo
  alias StepFlow.Workflows
  require Logger

  @moduledoc false

  # Enum type listing every state a job status can take.
  #
  # Comment notation: a line ending with "-->" lists the states that may lead
  # to the state declared right below it; a line starting with "-->" lists the
  # states that may follow it. The transitions actually enforced are the ones
  # declared in transition_map_lookup/1 and applied by check_transition/2.
  defenum(StateEnum, [
    # The first status of a job can only be: queued, skipped, error
    # Unknown, Retrying -->
    # --> Processing, Ready_to_init, Paused, Error
    "queued",
    # Processing, Queued -->
    # --> Processing
    "paused",
    # Unknown -->
    # (no outgoing transition)
    "skipped",
    # Queued, Paused, Stopped, Starting, Updating -->
    # --> Completed, Error, Paused, Stopped
    "processing",
    # Error -->
    # --> Queued
    "retrying",
    # Queued, Processing -->
    # --> Retrying, Deleting
    "error",
    # Processing, Deleting -->
    # (no outgoing transition)
    "completed",
    # Queued -->
    # --> Initializing
    "ready_to_init",
    # Initializing -->
    # --> Starting
    "ready_to_start",
    # NOTE(review): no entry of transition_map_lookup/1 lists :update as a
    # target, so check_transition/2 never allows entering this state. The
    # original comment here read "Processing -->" - confirm what is intended.
    # --> Updating
    "update",
    # Processing -->
    # --> Processing, Deleting
    "stopped",
    # Ready_to_init -->
    # --> Ready_to_start
    "initializing",
    # Ready_to_start -->
    # --> Processing
    "starting",
    # Update -->
    # --> Processing
    "updating",
    # Stopped, Error -->
    # --> Completed
    "deleting",
    # DEPRECATED (no incoming or outgoing transition)
    "running",
    # DEPRECATED (no incoming or outgoing transition)
    "initialized",
    # --> Queued, Skipped
    "unknown"
  ])

  # Translates between a state atom and its numeric position in the internal
  # state table.
  #
  #   * `key` falsy (the default): returns the state atom for `value`. A number
  #     is looked up by position; any other term is returned unchanged when it
  #     is one of the known state atoms. Returns `nil` when nothing matches.
  #   * `key` truthy: returns the numeric position for `value`. A number is
  #     returned as is, without checking that it is a known position; any other
  #     term is searched among the state atoms. Returns `nil` when nothing
  #     matches.
  defp state_map_lookup(value, key \\ false) do
    state_map = %{
      0 => :queued,
      1 => :skipped,
      2 => :processing,
      3 => :retrying,
      4 => :error,
      5 => :completed,
      6 => :ready_to_init,
      7 => :ready_to_start,
      8 => :update,
      9 => :stopped,
      10 => :initializing,
      11 => :starting,
      12 => :updating,
      13 => :unknown,
      14 => :paused,
      # DEPRECATED
      15 => :running,
      # DEPRECATED
      16 => :initialized,
      17 => :deleting
    }

    cond do
      key && is_number(value) ->
        value

      key ->
        # Enum.find_value/2 yields nil for an unknown state, whereas piping a
        # failed Enum.find/2 into elem/2 raised an ArgumentError.
        Enum.find_value(state_map, fn {position, state} ->
          if state == value, do: position
        end)

      is_number(value) ->
        state_map[value]

      value in Map.values(state_map) ->
        value

      true ->
        nil
    end
  end

  # Returns the state name as a string for a position number or a state atom.
  # Anything that cannot be resolved is labelled "unknown" by to_atom_state/1.
  def state_enum_label(value) do
    value
    |> to_atom_state()
    |> Atom.to_string()
  end

  # Resolves `value` to a state atom, falling back to :unknown when
  # state_map_lookup/2 finds nothing.
  defp to_atom_state(value) do
    # A non-nil lookup result does not match the clause and is returned as is.
    with nil <- state_map_lookup(value), do: :unknown
  end

  # Returns the numeric position of `value` by running state_map_lookup/2 in
  # its "key" mode.
  defp state_enum_position(value), do: state_map_lookup(value, true)

  # Returns the list of states reachable from the state at numeric position
  # `value`, or nil when the position is not in the table.
  #
  # A non-numeric `value` is returned unchanged when it is equal to one of the
  # transition lists of the table, and nil otherwise.
  defp transition_map_lookup(value) do
    transitions_by_position = %{
      0 => [:processing, :ready_to_init, :paused, :error],
      1 => [],
      2 => [:completed, :error, :paused, :stopped],
      3 => [:queued],
      4 => [:deleting, :retrying],
      5 => [],
      6 => [:initializing],
      7 => [:starting],
      8 => [:updating],
      9 => [:deleting, :processing],
      10 => [:ready_to_start],
      11 => [:processing],
      12 => [:processing],
      13 => [:queued, :skipped],
      14 => [:processing],
      # DEPRECATED
      15 => [],
      # DEPRECATED
      16 => [],
      17 => [:completed]
    }

    cond do
      is_number(value) ->
        Map.get(transitions_by_position, value)

      value in Map.values(transitions_by_position) ->
        value

      true ->
        nil
    end
  end

  # Returns the allowed transitions for `value`, using an empty list when
  # transition_map_lookup/1 has no entry for it.
  defp to_atom_transition(value) do
    # The lookup only ever yields a list or nil, so `||` supplies the default.
    transition_map_lookup(value) || []
  end

  # Status record of a job, mapped to the "step_flow_status" table.
  schema "step_flow_status" do
    # State of the job, typed by the StateEnum declared in this module.
    field(:state, StepFlow.Jobs.Status.StateEnum)
    # Map attached to the status; defaults to an empty map.
    field(:description, :map, default: %{})
    # Job this status belongs to, referenced through the job_id column.
    belongs_to(:job, Job, foreign_key: :job_id)
    # Workflow status records associated with this status, declared with
    # on_delete: :delete_all.
    has_many(:workflow_status, Workflows.Status, on_delete: :delete_all)

    # Timestamp fields; inserted_at is what get_last_status/1 orders by.
    timestamps()
  end

  @doc false
  def changeset(%Status{} = job, attrs) do
    # Despite its name, `job` is the status struct being changed.
    permitted_fields = [:state, :job_id, :description]
    mandatory_fields = [:state, :job_id]

    casted = cast(job, attrs, permitted_fields)
    constrained = foreign_key_constraint(casted, :job_id)
    validate_required(constrained, mandatory_fields)
  end

  # Normalises a status to its string name:
  #   * a %Status{} struct is converted through its `state` field,
  #   * a binary is returned untouched (no validation is performed),
  #   * anything else goes through state_enum_label/1.
  def convert_to_string(%Status{state: state}), do: convert_to_string(state)
  def convert_to_string(status) when is_binary(status), do: status
  def convert_to_string(status), do: state_enum_label(status)

  @doc """
  Records a new status for the job identified by `job_id`.

  `status` may be anything `convert_to_string/1` accepts: a state name as a
  string, a state atom, a numeric position or a `%Status{}` struct.
  `description` is stored alongside the state and defaults to an empty map.

  Returns:

    * `{:ok, status}` with the inserted status when the transition is allowed,
    * `{:ok, last_status}` when the job is already in the requested state
      (a warning is logged and nothing is inserted),
    * `{:error, changeset}` when the insertion fails,
    * `{:error, message}` when the transition is not allowed.

  Raises if the job cannot be fetched by `Jobs.get_job!/1`.
  """
  def set_job_status(job_id, status, description \\ %{}) do
    # Normalise once so that the comparison, the transition check and the
    # persisted value all rely on the same representation.
    string_status = convert_to_string(status)

    job = Jobs.get_job!(job_id) |> Repo.preload([:status])
    last_status = get_last_status(job.status)
    last_string_status = convert_to_string(last_status)

    cond do
      # A job without any status converts to "unknown"; it must not be treated
      # as "already in that state", which used to return {:ok, nil}.
      last_status != nil and last_string_status == string_status ->
        Logger.warn(
          "Transition to the same state #{string_status}, ignoring and fall back to last status."
        )

        {:ok, last_status}

      check_transition(last_status, string_status) ->
        # Persist the normalised string: the raw `status` may be a number or a
        # %Status{} struct, which passes the check above but cannot be cast to
        # the enum type.
        %Status{}
        |> Status.changeset(%{job_id: job_id, state: string_status, description: description})
        |> Repo.insert()

      true ->
        {:error, "Status transition from #{last_string_status} to #{string_status} not allowed"}
    end
  end

  # Tells whether a job may move from `last_status` to `new_status`.
  #
  # `last_status` is the latest status of the job (nil when it has none) and
  # `new_status` is the requested state name as a string.
  #
  # A job without any status can only start as queued, error or skipped.
  defp check_transition(nil, new_status), do: new_status in ["queued", "error", "skipped"]

  # Otherwise the requested state must be listed among the transitions allowed
  # from the current state.
  defp check_transition(last_status, new_status) do
    last_status.state
    |> state_enum_position()
    |> to_atom_transition()
    # Compare as strings: `new_status` may be an arbitrary caller-supplied
    # string, and String.to_atom/1 would create a never-collected atom for
    # every distinct value received.
    |> Enum.any?(fn allowed -> Atom.to_string(allowed) == new_status end)
  end

  @doc """
  Returns the most recently inserted status.

  Given a list, statuses are ordered by `inserted_at`, ties being broken by
  `id`, and the last one is returned (`nil` for an empty list). A single
  `%Status{}` is returned unchanged and any other argument yields `nil`.
  """
  def get_last_status(status) when is_list(status) do
    inserted_before? = fn earlier, later ->
      order = NaiveDateTime.compare(earlier.inserted_at, later.inserted_at)
      order == :lt or (order == :eq and earlier.id < later.id)
    end

    chronological = Enum.sort(status, inserted_before?)
    List.last(chronological)
  end

  def get_last_status(%Status{} = status), do: status
  def get_last_status(_status), do: nil

  @doc """
  Returns the status holding the highest `id`.

  Despite the function name, the whole status is returned, not only its id.
  Given a list, the status with the greatest `id` is returned (`nil` for an
  empty list). A single `%Status{}` is returned unchanged and any other
  argument yields `nil`.
  """
  def get_last_status_id(status) when is_list(status) do
    by_ascending_id = Enum.sort(status, &(&1.id < &2.id))
    List.last(by_ascending_id)
  end

  def get_last_status_id(%Status{} = status), do: status
  def get_last_status_id(_status), do: nil

  @doc """
  Returns the name of the action associated with the state of `status`.

  States without an associated action yield `"none"`.
  """
  def get_action(status) do
    action_by_state = %{
      queued: "create",
      ready_to_init: "init_process",
      ready_to_start: "start_process",
      update: "update_process",
      stopped: "delete",
      error: "delete"
    }

    Map.get(action_by_state, status.state, "none")
  end

  @doc """
  Returns the action associated with `status`, wrapped in a one-element list
  holding a string parameter map whose id is `"action"`.
  """
  def get_action_parameter(status) do
    [%{"id" => "action", "type" => "string", "value" => get_action(status)}]
  end
end