defmodule Runbox.StateStore.Storage do
  @moduledoc group: :internal
  @moduledoc """
  Behaviour for generic state store storage.

  The behaviour defines callbacks that handle savepoint persistence. The public functions
  of this module read the configured `{module, storage_state}` pair (see `init_config/1`)
  and delegate to the matching `handle_*` callback of that module.
  """

  alias Runbox.StateStore.Entity
  alias Runbox.StateStore.ScheduleUtils

  @type run_id :: String.t()
  @type config :: {module, term}

  @doc """
  Predicate returning true when state store of given run has been initialized.

  ### Arguments

    * `storage_state` - behaviour implementation state
    * `run_id`
  """
  @callback handle_initialized?(storage_state :: term, run_id()) :: boolean

  @doc """
  Initializes state store for given run.

  ### Arguments

    * `storage_state` - behaviour implementation state
    * `run_id`
  """
  @callback handle_init_storage(storage_state :: term, run_id()) :: :ok

  @doc """
  Returns latest persisted savepoint of given run.

  ### Arguments

    * `storage_state` - behaviour implementation state
    * `run_id`
  """
  @callback handle_get_latest_savepoint(storage_state :: term, run_id()) ::
              {:ok, ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]} | {:error, term()}

  @doc """
  Returns latest persisted savepoint timestamp of given run.

  ### Arguments

    * `storage_state` - behaviour implementation state
    * `run_id`
  """
  @callback handle_get_latest_savepoint_timestamp(storage_state :: term, run_id()) ::
              {:ok, ScheduleUtils.epoch_ms()} | {:error, reason :: term}

  @doc """
  Persists run state (savepoint) of given run.

  ### Arguments

    * `storage_state` - behaviour implementation state
    * `run_id`
    * `timestamp` - timestamp of given savepoint
    * `entities` - savepoint state representation as `{entity_id, entity_state}`
  """
  @callback handle_save_savepoint(storage_state :: term, run_id(), ScheduleUtils.epoch_ms(), [
              {Entity.id(), Entity.state()}
            ]) :: :ok

  @doc """
  Deletes old savepoints of given run but keeps n latest savepoints.

  ### Arguments

    * `storage_state` - behaviour implementation state
    * `run_id`
    * `keep_n` - count of persisted savepoints which should remain undeleted
  """
  @callback handle_delete_old_savepoints(storage_state :: term, run_id(), keep_n :: integer) ::
              :ok

  @doc """
  Deletes storage of run identified by `run_id`.

  ### Arguments

    * `storage_state` - behaviour implementation state
    * `run_id`
  """
  @callback handle_delete_run(storage_state :: term, run_id :: run_id()) ::
              :ok | {:error, reason :: term}

  @doc """
  Initializes the state store configuration.

  The configuration is kept in a public, named ETS table because the current API defines
  both functions scoped to a running run and global StateStore functions, and all of them
  need access to it.

  Calling this function again replaces the previously stored configuration. The table is
  created only by the first call, so it stays owned by the process that made that call.

  ### Arguments

    * `config` - `{module, storage_state}` where `module` implements this behaviour
  """
  @spec init_config(config()) :: :ok
  def init_config(config) do
    # :ets.new/2 raises ArgumentError when a named table already exists, so the table is
    # created on first use only; later calls just overwrite the `:config` entry.
    if :ets.whereis(__MODULE__) == :undefined do
      :ets.new(__MODULE__, [:public, :set, :named_table])
    end

    :ets.insert(__MODULE__, {:config, config})
    :ok
  end

  @doc """
  Returns the configuration stored by `init_config/1`.

  Raises when the configuration table does not exist or holds no `:config` entry.
  """
  @spec get_config :: config()
  def get_config do
    # Assertive match: exactly one `:config` entry is expected in the table.
    [{:config, config}] = :ets.lookup(__MODULE__, :config)
    config
  end

  @doc """
  Predicate returning true when state store of given run has been initialized.

  ### Arguments

    * `run_id`
  """
  @spec initialized?(run_id()) :: boolean
  def initialized?(run_id) do
    {mod, config} = get_config()

    # Only strict booleans are accepted from the implementation; anything else raises.
    case mod.handle_initialized?(config, run_id) do
      true -> true
      false -> false
    end
  end

  @doc """
  Initializes state store for given run.

  ### Arguments

    * `run_id`
  """
  @spec init_storage(run_id()) :: :ok
  def init_storage(run_id) do
    {mod, config} = get_config()
    :ok = mod.handle_init_storage(config, run_id)
  end

  @doc """
  Returns latest persisted savepoint timestamp of given run.

  ### Arguments

    * `run_id`
  """
  @spec last_persisted_timestamp(run_id()) :: {:ok, ScheduleUtils.epoch_ms()} | {:error, term}
  def last_persisted_timestamp(run_id) do
    {mod, config} = get_config()

    case mod.handle_get_latest_savepoint_timestamp(config, run_id) do
      {:ok, last_persisted_timestamp} -> {:ok, last_persisted_timestamp}
      {:error, reason} -> {:error, reason}
    end
  end

  @doc """
  Returns latest persisted savepoint of given run.

  Any `{:error, reason}` returned by the storage implementation is passed through
  unchanged.

  ### Arguments

    * `run_id`
  """
  @spec get_latest_savepoint(run_id()) ::
          {:ok, ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]} | {:error, term}
  def get_latest_savepoint(run_id) do
    {mod, config} = get_config()

    case mod.handle_get_latest_savepoint(config, run_id) do
      {:ok, timestamp, entity_defs} -> {:ok, timestamp, entity_defs}
      {:error, reason} -> {:error, reason}
    end
  end

  @doc """
  Persists run state (savepoint) of given run.

  ### Arguments

    * `run_id`
    * `timestamp` - timestamp of given savepoint
    * `entities` - savepoint state representation as `{entity_id, entity_state}`
  """
  @spec save_run_state(run_id(), ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]) :: :ok
  def save_run_state(run_id, timestamp, entities) do
    {mod, config} = get_config()
    :ok = mod.handle_save_savepoint(config, run_id, timestamp, entities)
  end

  @doc """
  Deletes old savepoints of given run but keeps n latest savepoints.

  ### Arguments

    * `run_id`
    * `keep_n` - count of persisted savepoints which should remain undeleted,
      defaults to `1`
  """
  @spec delete_old_savepoints(run_id()) :: :ok
  @spec delete_old_savepoints(run_id(), keep_n :: integer) :: :ok
  def delete_old_savepoints(run_id, keep_n \\ 1) do
    {mod, config} = get_config()
    :ok = mod.handle_delete_old_savepoints(config, run_id, keep_n)
  end

  @doc """
  Deletes storage of run identified by `run_id`.

  ### Arguments

    * `run_id`
  """
  @spec delete_run(run_id()) :: :ok | {:error, reason :: term}
  def delete_run(run_id) do
    {mod, config} = get_config()

    case mod.handle_delete_run(config, run_id) do
      :ok -> :ok
      {:error, reason} -> {:error, reason}
    end
  end
end