defmodule Runbox.StateStore do
@moduledoc group: :internal
@moduledoc """
State store is the interface for run state persistence.

State store gathers the state of all stateful components of a run into scheduled sets called
savepoints. These savepoints are managed by `Runbox.StateStore.RunState`.

The workflow of the state store is as follows:

1. StateStore initialization, done in one of two ways:
   1. Initialize a new run state via `Runbox.StateStore.init_run_state/3`
   2. Load an already existing run state via `Runbox.StateStore.load_run_state/1`
2. Save all registered stateful components via `Runbox.StateStore.Entity.update_state/3`
3. Confirm that all messages of a certain time were processed with
   `Runbox.StateStore.Entity.ack_processed_time/4`
"""
use GenServer
alias Runbox.StateStore.Entity
alias Runbox.StateStore.RunState
alias Runbox.StateStore.Savepoint
alias Runbox.StateStore.ScheduleUtils
alias Runbox.StateStore.Storage
# Identifier of the run a state store process was started for.
@type run_id :: String.t()
# Internal GenServer state, built in `init/1`:
#   * `run_id`    - id of the run, used for every `Storage` call
#   * `schedule`  - schedule given to `start_link/2`, forwarded to `RunState.init/4`
#   * `run_state` - `nil` after `init/1`; set once `init_run_state/3` or
#                   `load_run_state/2` has built a run state
@typep state :: %{
run_id: run_id(),
schedule: ScheduleUtils.schedule(),
run_state: nil | RunState.t()
}
@doc """
Starts a state store process, linked to the caller, for the run identified by `run_id`.

### Arguments
* `run_id` - id of the run whose state is handled by this process
* `schedule` - schedule kept in the process state
"""
@spec start_link(run_id(), ScheduleUtils.schedule()) :: GenServer.on_start()
def start_link(run_id, schedule) do
  init_arg = {run_id, schedule}
  GenServer.start_link(__MODULE__, init_arg)
end
@doc """
Returns `true` when the state store of the run was already initialized.

This is a synchronous call to the state store process that waits without a timeout.

### Arguments
* `state_store_pid` - pid of StateStore
"""
@spec initialized?(pid()) :: boolean
def initialized?(state_store_pid) do
  request = :initialized?
  GenServer.call(state_store_pid, request, :infinity)
end
@doc """
Initializes the state store for a new run from a timestamp and entity definitions.

This is a synchronous call to the state store process that waits without a timeout.

### Arguments
* `state_store_pid` - pid of StateStore
* `timestamp` - run `start_from`, this timestamp is used to calculate savepoint timestamps
* `entity_defs` - state entity definitions, given as a list of `{entity_id, entity_state}`

Returns `{:ok, entities}` with the entities created for the given definitions.
"""
@spec init_run_state(pid(), ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]) ::
        {:ok, [Entity.t()]}
def init_run_state(state_store_pid, timestamp, entity_defs) do
  request = {:init_run_state, timestamp, entity_defs}
  GenServer.call(state_store_pid, request, :infinity)
end
@doc """
Loads the run state of an existing run.

The run state can be loaded only for an already initialized run which has persisted at least
one savepoint; otherwise an error tuple is returned.

This is a synchronous call to the state store process that waits without a timeout.

### Arguments
* `state_store_pid` - pid of StateStore
* `update_fn` - function which receives the stored list of
  `[{Entity.id(), Entity.state()}]` and returns the list that is actually loaded.
  Useful to upgrade state store contents between versions. Defaults to the identity function.
"""
@spec load_run_state(pid(), (entity_defs -> entity_defs)) ::
        {:ok, ScheduleUtils.epoch_ms(), [Entity.t()]} | {:error, term} | {:error, :no_savepoint}
      when entity_defs: [{Entity.id(), Entity.state()}]
def load_run_state(state_store_pid, update_fn \\ & &1) do
  request = {:load_run_state, update_fn}
  GenServer.call(state_store_pid, request, :infinity)
end
@doc """
Saves entity state to all savepoints responsible for given timestamp.

The state is sent to the entity's state store process with an asynchronous cast, so `:ok` is
returned without waiting for the state to be processed.

### Arguments
* `entity` - contains the state to be saved and the pid of its state store
* `timestamp` - timestamp associated with the entity state
"""
@spec save(Entity.t(), ScheduleUtils.epoch_ms()) :: :ok
def save(%Entity{} = entity, timestamp) do
  message = {:save, timestamp, entity.id, entity.state}
  GenServer.cast(entity.state_store_pid, message)
end
@impl true
@spec init({run_id(), ScheduleUtils.schedule()}) :: {:ok, state}
def init({run_id, schedule}) do
  # No run state exists yet; it is filled in by the `:init_run_state` or
  # `:load_run_state` call handlers.
  initial_state = %{run_id: run_id, schedule: schedule, run_state: nil}
  {:ok, initial_state}
end
@impl true
@spec handle_call(msg, GenServer.from(), state) :: {:reply, boolean, state}
      when msg: :initialized?
def handle_call(:initialized?, _from, state) do
  # The answer comes from the storage layer; the process state is left unchanged.
  storage_ready? = Storage.initialized?(state.run_id)
  {:reply, storage_ready?, state}
end
@impl true
@spec handle_call(msg, GenServer.from(), state) :: {:reply, {:ok, [Entity.t()]}, state}
      when msg: {:init_run_state, ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]}
def handle_call({:init_run_state, timestamp, entity_defs}, _from, state) do
  # Storage is prepared first; any result other than `:ok` crashes the server.
  :ok = Storage.init_storage(state.run_id)

  {:ok, entities, fresh_run_state} =
    RunState.init(self(), state.schedule, timestamp, entity_defs)

  new_state = %{state | run_state: fresh_run_state}
  {:reply, {:ok, entities}, new_state}
end
@impl true
@spec handle_call(msg, GenServer.from(), state) ::
        {:reply, {:ok, ScheduleUtils.epoch_ms(), [Entity.t()]} | {:error, :no_savepoint}, state}
      when msg: {:load_run_state, (entity_defs -> entity_defs)},
           entity_defs: [{Entity.id(), Entity.state()}]
def handle_call({:load_run_state, update_fn}, _from, state) do
  # Storage is prepared first; any result other than `:ok` crashes the server.
  :ok = Storage.init_storage(state.run_id)

  case Storage.get_latest_savepoint(state.run_id) do
    {:error, reason} ->
      # Nothing was loaded, so the process state stays as it was.
      {:reply, {:error, reason}, state}

    {:ok, savepoint_ts, stored_defs} ->
      # Let the caller upgrade the stored definitions before the run state is rebuilt.
      upgraded_defs = update_fn.(stored_defs)

      {:ok, entities, loaded_run_state} =
        RunState.init(self(), state.schedule, savepoint_ts, upgraded_defs)

      {:reply, {:ok, savepoint_ts, entities}, %{state | run_state: loaded_run_state}}
  end
end
@impl true
@spec handle_cast(msg, state) :: {:noreply, state}
      when msg: {:save, ScheduleUtils.epoch_ms(), Entity.id(), Entity.state()}
def handle_cast({:save, timestamp, entity_id, entity_state}, state) do
  updated_run_state =
    case RunState.save(state.run_state, timestamp, entity_id, entity_state) do
      {:ok, run_state} ->
        run_state

      {:state_complete, savepoint, run_state} ->
        # A savepoint was reported complete, so it is written to storage
        # before the new run state is adopted.
        :ok = persist_savepoint(state.run_id, savepoint)
        run_state
    end

  {:noreply, %{state | run_state: updated_run_state}}
end

# Writes the completed savepoint to storage and then removes old savepoints.
# Any result other than `:ok` from either storage call crashes the server.
defp persist_savepoint(run_id, savepoint) do
  savepoint_ts = Savepoint.timestamp(savepoint)
  entity_states = Savepoint.entity_states(savepoint)
  :ok = Storage.save_run_state(run_id, savepoint_ts, entity_states)
  :ok = Storage.delete_old_savepoints(run_id)
end
end