lib/runbox/state_store.ex

defmodule Runbox.StateStore do
  @moduledoc group: :internal
  @moduledoc """
  State store is the interface for run state persistence.

  State store gathers the state of all stateful components of a run into scheduled sets called
  savepoints. These savepoints are managed by `Runbox.StateStore.RunState`.

  Workflow of the state store is as follows:
  1. StateStore initialization
    1. Initialize new run state via `Runbox.StateStore.init_run_state/3`
    2. Load already existing run state via `Runbox.StateStore.load_run_state/1`
  2. Save all registered stateful components via `Runbox.StateStore.Entity.update_state/3`
  3. Confirm all messages of certain time were processed with `Runbox.StateStore.Entity.ack_processed_time/4`
  """

  use GenServer

  alias Runbox.StateStore.Entity
  alias Runbox.StateStore.RunState
  alias Runbox.StateStore.Savepoint
  alias Runbox.StateStore.ScheduleUtils
  alias Runbox.StateStore.Storage

  @type run_id :: String.t()

  @typep state :: %{
           run_id: run_id(),
           schedule: ScheduleUtils.schedule(),
           run_state: nil | RunState.t()
         }

  @doc """
  Spawns a state store process, linked to the caller, for the run identified by `run_id`.

  ### Arguments
  * `run_id` - identifier of the run whose state is persisted
  * `schedule` - schedule kept in the process state and handed to `RunState.init/4` when
    the run state is initialized or loaded
  """
  @spec start_link(run_id(), ScheduleUtils.schedule()) :: GenServer.on_start()
  def start_link(run_id, schedule) do
    init_arg = {run_id, schedule}
    GenServer.start_link(__MODULE__, init_arg)
  end

  @doc """
  Tells whether the state store has already been initialized.

  The call waits without a timeout until the state store process replies.

  ### Arguments
  * `state_store_pid` - pid of StateStore
  """
  @spec initialized?(pid()) :: boolean
  def initialized?(state_store_pid),
    do: GenServer.call(state_store_pid, :initialized?, :infinity)

  @doc """
  Sets up the state store for a brand new run from a start timestamp and entity definitions.

  The call waits without a timeout until the state store process replies with
  `{:ok, entities}`.

  ### Arguments
  * `state_store_pid` - pid of StateStore
  * `timestamp` - run `start_from`; savepoint timestamps are calculated from this value
  * `entity_defs` - state entity definitions, a list of `{entity_id, entity_state}` tuples
  """
  @spec init_run_state(pid(), ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]) ::
          {:ok, [Entity.t()]}
  def init_run_state(state_store_pid, timestamp, entity_defs) do
    request = {:init_run_state, timestamp, entity_defs}
    GenServer.call(state_store_pid, request, :infinity)
  end

  @doc """
  Restores the run state of an existing run.

  This works for an already initialized run which has persisted at least one savepoint.
  The call waits without a timeout until the state store process replies.

  ### Arguments
  * `state_store_pid` - pid of StateStore
  * `update_fn` - function which receives the stored list
    `[{Entity.id(), Entity.state()}]` and returns the (possibly updated) list.
    Useful to upgrade state store contents between versions. Defaults to identity.
  """
  @spec load_run_state(pid(), (entity_defs -> entity_defs)) ::
          {:ok, ScheduleUtils.epoch_ms(), [Entity.t()]} | {:error, term} | {:error, :no_savepoint}
        when entity_defs: [{Entity.id(), Entity.state()}]
  def load_run_state(state_store_pid, update_fn \\ & &1) do
    request = {:load_run_state, update_fn}
    GenServer.call(state_store_pid, request, :infinity)
  end

  @doc """
  Stores the entity state into all savepoints responsible for the given timestamp.

  The request is cast to the state store process referenced by the entity, so this
  function returns `:ok` without waiting for the save to be handled.

  ### Arguments
  * `entity` - carries the state to be saved, its id and the pid of its state store
  * `timestamp` - timestamp associated with the entity state
  """
  @spec save(Entity.t(), ScheduleUtils.epoch_ms()) :: :ok
  def save(%Entity{} = entity, timestamp) do
    message = {:save, timestamp, entity.id, entity.state}
    GenServer.cast(entity.state_store_pid, message)
  end

  @impl true
  @spec init({run_id(), ScheduleUtils.schedule()}) :: {:ok, state}
  def init({run_id, schedule}) do
    # `run_state` stays `nil` until an init/load request builds it.
    initial_state = %{run_id: run_id, schedule: schedule, run_state: nil}
    {:ok, initial_state}
  end

  @impl true
  @spec handle_call(msg, GenServer.from(), state) :: {:reply, boolean, state}
        when msg: :initialized?
  def handle_call(:initialized?, _from, state) do
    # The answer comes from the storage for this run; process state is left untouched.
    initialized = Storage.initialized?(state.run_id)
    {:reply, initialized, state}
  end

  @impl true
  @spec handle_call(msg, GenServer.from(), state) :: {:reply, {:ok, [Entity.t()]}, state}
        when msg: {:init_run_state, ScheduleUtils.epoch_ms(), [{Entity.id(), Entity.state()}]}
  def handle_call({:init_run_state, timestamp, entity_defs}, _from, state) do
    # Storage must be ready before the run state is built; anything but `:ok` crashes here.
    :ok = Storage.init_storage(state.run_id)

    {:ok, entities, fresh_run_state} =
      RunState.init(self(), state.schedule, timestamp, entity_defs)

    new_state = %{state | run_state: fresh_run_state}
    {:reply, {:ok, entities}, new_state}
  end

  @impl true
  @spec handle_call(msg, GenServer.from(), state) ::
          {:reply, {:ok, ScheduleUtils.epoch_ms(), [Entity.t()]} | {:error, :no_savepoint}, state}
        when msg: {:load_run_state, (entity_defs -> entity_defs)},
             entity_defs: [{Entity.id(), Entity.state()}]
  def handle_call({:load_run_state, update_fn}, _from, state) do
    :ok = Storage.init_storage(state.run_id)
    latest_savepoint = Storage.get_latest_savepoint(state.run_id)

    case latest_savepoint do
      {:error, reason} ->
        # Nothing to restore from; hand the error to the caller and keep the state as is.
        {:reply, {:error, reason}, state}

      {:ok, timestamp, stored_defs} ->
        # Let the caller upgrade the stored definitions before the run state is rebuilt.
        upgraded_defs = update_fn.(stored_defs)

        {:ok, entities, loaded_run_state} =
          RunState.init(self(), state.schedule, timestamp, upgraded_defs)

        new_state = %{state | run_state: loaded_run_state}
        {:reply, {:ok, timestamp, entities}, new_state}
    end
  end

  @impl true
  @spec handle_cast(msg, state) :: {:noreply, state}
        when msg: {:save, ScheduleUtils.epoch_ms(), Entity.id(), Entity.state()}
  def handle_cast({:save, timestamp, entity_id, entity_state}, state) do
    save_result = RunState.save(state.run_state, timestamp, entity_id, entity_state)

    case save_result do
      {:state_complete, savepoint, updated_run_state} ->
        # A savepoint was reported complete: persist it, then remove the old savepoints.
        savepoint_timestamp = Savepoint.timestamp(savepoint)
        savepoint_entities = Savepoint.entity_states(savepoint)
        :ok = Storage.save_run_state(state.run_id, savepoint_timestamp, savepoint_entities)
        :ok = Storage.delete_old_savepoints(state.run_id)
        {:noreply, %{state | run_state: updated_run_state}}

      {:ok, updated_run_state} ->
        {:noreply, %{state | run_state: updated_run_state}}
    end
  end
end