# lib/runbox/state_store/storage.ex

defmodule Runbox.StateStore.Storage do
  @moduledoc group: :internal
  @moduledoc """
  Behaviour for generic state store storage.

  The behaviour defines callbacks to handle savepoint persistence. The public functions of
  this module dispatch to the implementation registered with `init_config/1`, passing the
  registered implementation state as the first argument of every callback.
  """

  alias Runbox.StateStore.Entity
  alias Runbox.StateStore.ScheduleUtils

  @type run_id :: String.t()
  @type config :: {module, term}

  @doc """
  Predicate returning true when state store of given run has been initialized.

  ### Arguments
  * `storage_state` - behaviour implementation state
  * `run_id`
  """
  @callback handle_initialized?(storage_state :: term, run_id()) :: boolean

  @doc """
  Initializes state store for given run.

  ### Arguments
  * `storage_state` - behaviour implementation state
  * `run_id`
  """
  @callback handle_init_storage(storage_state :: term, run_id()) :: :ok

  @doc """
  Returns latest persisted savepoint of given run.

  ### Arguments
  * `storage_state` - behaviour implementation state
  * `run_id`
  """
  @callback handle_get_latest_savepoint(storage_state :: term, run_id()) ::
              {:ok, ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]} | {:error, term()}

  @doc """
  Returns latest persisted savepoint timestamp of given run.

  ### Arguments
  * `storage_state` - behaviour implementation state
  * `run_id`
  """
  @callback handle_get_latest_savepoint_timestamp(storage_state :: term, run_id()) ::
              {:ok, ScheduleUtils.epoch_ms()} | {:error, reason :: term}

  @doc """
  Persists run state (savepoint) of given run.

  ### Arguments
  * `storage_state` - behaviour implementation state
  * `run_id`
  * `timestamp` - timestamp of given savepoint
  * `entities` - savepoint state representation as `{entity_id, entity_state}`
  """
  @callback handle_save_savepoint(storage_state :: term, run_id(), ScheduleUtils.epoch_ms(), [
              {Entity.id(), Entity.state()}
            ]) :: :ok

  @doc """
  Deletes old savepoints of given run but keeps n latest savepoints.

  ### Arguments
  * `storage_state` - behaviour implementation state
  * `run_id`
  * `keep_n` - count of persisted savepoints which should remain undeleted
  """
  @callback handle_delete_old_savepoints(storage_state :: term, run_id(), keep_n :: integer) ::
              :ok

  @doc """
  Deletes storage of run identified by `run_id`.

  ### Arguments
  * `storage_state` - behaviour implementation state
  * `run_id`
  """
  @callback handle_delete_run(storage_state :: term, run_id :: run_id()) ::
              :ok | {:error, reason :: term}

  @doc """
  Initialize state store config.

  We store StateStore configuration in ETS table, because current API defines both per running
  run functions and global StateStore functions.

  The table is created on the first call and reused afterwards, so calling this function
  again replaces the stored configuration instead of raising. The table is created by (and
  therefore owned by) the process that makes the first call.

  ### Arguments
  * `config` - `{implementation_module, implementation_state}`
  """
  @spec init_config(config()) :: :ok
  def init_config(config) do
    ensure_config_table()
    # The table is a `:set`, so inserting under the same key overwrites any previous config.
    :ets.insert(__MODULE__, {:config, config})
    :ok
  end

  @doc """
  Returns the configuration stored by `init_config/1`.

  Raises `ArgumentError` when the configuration table does not exist and `MatchError` when
  the table holds no configuration entry.
  """
  @spec get_config :: config()
  def get_config do
    [{:config, config}] = :ets.lookup(__MODULE__, :config)
    config
  end

  @doc """
  Predicate returning true when state store of given run has been initialized.

  Raises `CaseClauseError` when the implementation returns a non-boolean value.

  ### Arguments
  * `run_id`
  """
  @spec initialized?(run_id()) :: boolean
  def initialized?(run_id) do
    {mod, config} = get_config()

    case mod.handle_initialized?(config, run_id) do
      true -> true
      false -> false
    end
  end

  @doc """
  Initializes state store for given run.

  Raises `MatchError` when the implementation returns anything other than `:ok`.

  ### Arguments
  * `run_id`
  """
  @spec init_storage(run_id()) :: :ok
  def init_storage(run_id) do
    {mod, config} = get_config()
    :ok = mod.handle_init_storage(config, run_id)
  end

  @doc """
  Returns latest persisted savepoint timestamp of given run.

  ### Arguments
  * `run_id`
  """
  @spec last_persisted_timestamp(run_id()) :: {:ok, ScheduleUtils.epoch_ms()} | {:error, term}
  def last_persisted_timestamp(run_id) do
    {mod, config} = get_config()

    # The explicit clauses assert the shape of the implementation's reply.
    case mod.handle_get_latest_savepoint_timestamp(config, run_id) do
      {:ok, last_persisted_timestamp} -> {:ok, last_persisted_timestamp}
      {:error, reason} -> {:error, reason}
    end
  end

  @doc """
  Returns latest persisted savepoint of given run.

  Any `{:error, reason}` returned by the implementation is passed through unchanged.

  ### Arguments
  * `run_id`
  """
  @spec get_latest_savepoint(run_id()) ::
          {:ok, ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]} | {:error, term}
  def get_latest_savepoint(run_id) do
    {mod, config} = get_config()

    # The explicit clauses assert the shape of the implementation's reply.
    case mod.handle_get_latest_savepoint(config, run_id) do
      {:ok, timestamp, entity_defs} -> {:ok, timestamp, entity_defs}
      {:error, reason} -> {:error, reason}
    end
  end

  @doc """
  Persists run state (savepoint) of given run.

  Raises `MatchError` when the implementation returns anything other than `:ok`.

  ### Arguments
  * `run_id`
  * `timestamp` - timestamp of given savepoint
  * `entities` - savepoint state representation as `{entity_id, entity_state}`
  """
  @spec save_run_state(run_id(), ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]) :: :ok
  def save_run_state(run_id, timestamp, entities) do
    {mod, config} = get_config()
    :ok = mod.handle_save_savepoint(config, run_id, timestamp, entities)
  end

  @doc """
  Deletes old savepoints of given run but keeps n latest savepoints.

  Raises `MatchError` when the implementation returns anything other than `:ok`.

  ### Arguments
  * `run_id`
  * `keep_n` - count of persisted savepoints which should remain undeleted, defaults to `1`
  """
  @spec delete_old_savepoints(run_id()) :: :ok
  @spec delete_old_savepoints(run_id(), keep_n :: integer) :: :ok
  def delete_old_savepoints(run_id, keep_n \\ 1) do
    {mod, config} = get_config()
    :ok = mod.handle_delete_old_savepoints(config, run_id, keep_n)
  end

  @doc """
  Deletes storage of run identified by `run_id`.

  ### Arguments
  * `run_id`
  """
  @spec delete_run(run_id()) :: :ok | {:error, reason :: term}
  def delete_run(run_id) do
    {mod, config} = get_config()

    # The explicit clauses assert the shape of the implementation's reply.
    case mod.handle_delete_run(config, run_id) do
      :ok -> :ok
      {:error, reason} -> {:error, reason}
    end
  end

  # Creates the named configuration table unless it already exists, so that `init_config/1`
  # can be called repeatedly. The check and the creation are not atomic: two processes racing
  # on the very first initialization may still see `:ets.new/2` raise in one of them.
  @spec ensure_config_table() :: :ok
  defp ensure_config_table do
    case :ets.whereis(__MODULE__) do
      :undefined ->
        :ets.new(__MODULE__, [:public, :set, :named_table])
        :ok

      _table ->
        :ok
    end
  end
end