defmodule Runbox.StateStore.RunState do
@moduledoc group: :internal
@moduledoc """
Module defines struct which represents state of all stateful components of the run runtime.
Run state handles savepoint management; the first savepoint is created when the run state is
initialized. Savepoint timestamps are calculated via `Runbox.StateStore.ScheduleUtils`.
Savepoints timestamps are calculated as follows:
- run state is initialized in timestamp = `10_000` with schedule `60_000`
- 1st savepoint timestamp is `60_000`
- 2nd savepoint timestamp is `120_000`
- ... and so on ...
The main purpose of run state is to save entity state. Entity state is saved to every savepoint
whose timestamp is lower than or equal to the entity state timestamp and which has not yet
received the state of that entity.
Entity save works as follows:
- when entity state is saved in time range `<60_000;120_000)` and savepoint with
timestamp `60_000` is the only one incomplete -> entity is saved to 1st savepoint
- when entity state is saved in time range `<120_000;180_000)` and savepoints with
timestamp `60_000` and `120_000` are incomplete -> entity is saved to both 1st and 2nd savepoint
"""
require Logger
alias __MODULE__, as: RunState
alias Runbox.StateStore.Entity
alias Runbox.StateStore.Savepoint
alias Runbox.StateStore.ScheduleUtils
# Fields:
#   * `schedule` - savepoint schedule handed to `ScheduleUtils` to compute savepoint timestamps
#   * `savepoints` - savepoints currently held by the run state, keyed by savepoint timestamp
#   * `latest_collected_savepoint` - entity states whose timestamp is lower than or equal to
#     this value are ignored by `save/4`; it is set by `init/4` and moved forward each time
#     a complete savepoint is popped
#
# `schedule` and `latest_collected_savepoint` default to `nil`, so a bare `%RunState{}` is not
# usable until it is built via `init/4`; the type reflects those defaults.
defstruct schedule: nil,
          savepoints: %{},
          latest_collected_savepoint: nil

@type t :: %RunState{
        schedule: ScheduleUtils.schedule() | nil,
        savepoints: %{ScheduleUtils.epoch_ms() => Savepoint.t()},
        latest_collected_savepoint: ScheduleUtils.epoch_ms() | nil
      }
@doc """
Builds a fresh run state together with its entities.

One entity is constructed for every `{entity_id, entity_state}` pair in `entity_defs`. The
first savepoint is scheduled at the timestamp returned by `ScheduleUtils.next_savepoint/2` for
`timestamp` and is created from the ids of the constructed entities. `timestamp` itself becomes
the initial `latest_collected_savepoint`.

Returns `{:ok, entities, run_state}`.
"""
@spec init(
        pid(),
        ScheduleUtils.schedule(),
        ScheduleUtils.epoch_ms(),
        [{Entity.id(), Entity.state()}]
      ) :: {:ok, [Entity.t()], t}
def init(state_store_pid, schedule, timestamp, entity_defs) do
  entities = construct_entities(state_store_pid, schedule, timestamp, entity_defs)
  registered_ids = for entity <- entities, do: Entity.id(entity)

  first_savepoint_timestamp = ScheduleUtils.next_savepoint(schedule, timestamp)
  first_savepoint = Savepoint.new(registered_ids, first_savepoint_timestamp)

  {:ok, entities,
   %RunState{
     schedule: schedule,
     latest_collected_savepoint: timestamp,
     savepoints: %{first_savepoint_timestamp => first_savepoint}
   }}
end
@doc """
Returns the number of entities registered in the currently scheduled savepoint.

The currently scheduled savepoint is the one with the lowest timestamp among the savepoints
held by the run state. When the run state holds no savepoints, `0` is returned.
"""
@spec registered_entities_count(t) :: integer
def registered_entities_count(%RunState{savepoints: savepoints}) do
  case Enum.empty?(savepoints) do
    true ->
      0

    false ->
      savepoints
      |> Map.get(min_timestamp(savepoints))
      |> Savepoint.registered_entities_count()
  end
end
@doc """
Saves the state of an entity into all related savepoints.

The entity state is saved to every savepoint whose timestamp is lower than or equal to
`entity_timestamp`. Savepoints that are missing for `entity_timestamp` are created first.
An entity state whose timestamp is lower than or equal to the latest collected savepoint is
ignored and the run state is returned unchanged.

Return values:

  * `{:ok, run_state}` - the state was ignored or no savepoint became complete
  * `{:state_complete, savepoint, run_state}` - a savepoint is complete (all registered
    entities have saved their state); the complete savepoint is returned

Entity save works as follows:

  - when entity state is saved in time range `<60_000;120_000)` and savepoint with
    timestamp `60_000` is the only one incomplete -> entity is saved to 1st savepoint
  - when entity state is saved in time range `<120_000;180_000)` and savepoints with
    timestamp `60_000` and `120_000` are incomplete -> entity is saved to both 1st and 2nd
    savepoint
"""
@spec save(t, ScheduleUtils.epoch_ms(), Entity.id(), Entity.state()) ::
        {:ok, t} | {:state_complete, Savepoint.t(), t}
def save(
      %RunState{latest_collected_savepoint: latest_collected} = run_state,
      entity_timestamp,
      entity_id,
      entity_state
    )
    when entity_timestamp > latest_collected do
  run_state
  |> maybe_create_missing_savepoints(entity_timestamp)
  |> update_affected_savepoints(entity_timestamp, entity_id, entity_state)
  |> maybe_pop_complete_savepoint()
  |> case do
    {:no_complete_savepoint, updated_run_state} ->
      {:ok, updated_run_state}

    {complete_savepoint, updated_run_state} ->
      {:state_complete, complete_savepoint, updated_run_state}
  end
end

# entity state belongs to an already persisted savepoint = ignore
def save(%RunState{} = run_state, _entity_timestamp, _entity_id, _entity_state) do
  {:ok, run_state}
end
# public only for tests
# Lists the timestamps of all savepoints currently held by the run state.
@doc false
@spec incomplete_savepoints(t) :: [ScheduleUtils.epoch_ms()]
def incomplete_savepoints(%RunState{savepoints: savepoints}), do: Map.keys(savepoints)
# public only for tests
# Looks up the savepoint stored under `savepoint_timestamp`; `:error` when there is none.
@doc false
@spec fetch_savepoint(t, ScheduleUtils.epoch_ms()) :: {:ok, Savepoint.t()} | :error
def fetch_savepoint(%RunState{savepoints: savepoints}, savepoint_timestamp),
  do: Map.fetch(savepoints, savepoint_timestamp)
# Makes sure a savepoint responsible for `timestamp` exists, creating the missing ones if needed.
@spec maybe_create_missing_savepoints(t, ScheduleUtils.epoch_ms()) :: t
defp maybe_create_missing_savepoints(%RunState{} = run_state, timestamp) do
  if savepoint_to_save_entity_exists?(run_state, timestamp) do
    # there is no savepoint missing
    run_state
  else
    create_missing_savepoints(run_state, timestamp)
  end
end

# number of unreached savepoints from which the gap between savepoints is considered too large
@large_gap_threshold 5

# Creates all missing savepoints, starting after the largest incomplete savepoint and ending
# with the one responsible for `timestamp`.
@spec create_missing_savepoints(t, ScheduleUtils.epoch_ms()) :: t
defp create_missing_savepoints(
       %RunState{schedule: schedule, savepoints: existing} = run_state,
       timestamp
     ) do
  latest_timestamp = max_timestamp(existing)

  # all new savepoints are based on the registered entities of the latest savepoint
  registered_entities =
    existing
    |> Map.get(latest_timestamp)
    |> Savepoint.registered_entities()

  unreached_count =
    ScheduleUtils.unreached_savepoints_count(schedule, latest_timestamp, timestamp)

  savepoints =
    if unreached_count >= @large_gap_threshold do
      # a large gap between savepoints would mean waiting for and storing too much state in
      # incomplete savepoints, so every unreached savepoint is dropped and only the latest
      # incomplete one is kept
      kept_timestamp = ScheduleUtils.previous_savepoint(schedule, timestamp)
      %{kept_timestamp => Savepoint.new(registered_entities, kept_timestamp)}
    else
      # no large gap = create a savepoint for each unreached savepoint and add them to the
      # savepoints already held by the run state
      created =
        schedule
        |> ScheduleUtils.unreached_savepoints(latest_timestamp, timestamp)
        |> Map.new(fn savepoint_timestamp ->
          {savepoint_timestamp, Savepoint.new(registered_entities, savepoint_timestamp)}
        end)

      Map.merge(existing, created)
    end

  %RunState{run_state | savepoints: savepoints}
end
# Tells whether `entity_timestamp` falls before the end of the interval that starts at the
# largest savepoint timestamp and lasts for `schedule`.
@spec savepoint_to_save_entity_exists?(t, ScheduleUtils.epoch_ms()) :: boolean
defp savepoint_to_save_entity_exists?(
       %RunState{savepoints: savepoints, schedule: schedule},
       entity_timestamp
     ) do
  covered_until = max_timestamp(savepoints) + schedule
  covered_until > entity_timestamp
end
# Saves the entity state into every affected savepoint and leaves the other savepoints as they are.
@spec update_affected_savepoints(t, ScheduleUtils.epoch_ms(), Entity.id(), Entity.state()) :: t
defp update_affected_savepoints(
       %RunState{savepoints: current} = run_state,
       timestamp,
       entity_id,
       entity_state
     ) do
  updated =
    Map.new(current, fn
      # affected savepoint = savepoint whose timestamp is lower than or equal to given timestamp
      {savepoint_timestamp, savepoint} when savepoint_timestamp <= timestamp ->
        {savepoint_timestamp, Savepoint.save(savepoint, entity_id, entity_state)}

      # newer savepoints are kept untouched
      untouched ->
        untouched
    end)

  %RunState{run_state | savepoints: updated}
end
# Pops the newest complete savepoint (if any) out of the run state.
#
# Returns `{:no_complete_savepoint, run_state}` when no savepoint has complete state. Otherwise
# returns `{complete_savepoint, run_state}` where the complete savepoint with the highest
# timestamp is taken, all savepoints with a lower or equal timestamp are removed, the next
# savepoint is scheduled when none is left, and `latest_collected_savepoint` is moved to the
# popped savepoint's timestamp.
@spec maybe_pop_complete_savepoint(t) :: {:no_complete_savepoint, t} | {Savepoint.t(), t}
defp maybe_pop_complete_savepoint(%RunState{} = run_state) do
  complete_savepoints =
    run_state.savepoints
    # filter savepoints which have complete state
    |> Enum.filter(fn {_savepoint_timestamp, savepoint} ->
      Savepoint.all_saved?(savepoint)
    end)
    # and sort them by savepoint timestamp (tuple element 0), highest timestamp is 1st in the
    # list; sorting by the savepoint struct itself would order by struct contents instead
    |> Enum.sort_by(&elem(&1, 0), &>=/2)

  case complete_savepoints do
    [] ->
      # there is no complete savepoint
      {:no_complete_savepoint, run_state}

    [{savepoint_timestamp, complete_savepoint} | _] ->
      # there is at least one complete savepoint, take the newest, discard the rest
      run_state =
        run_state
        |> maybe_remove_older_savepoints(savepoint_timestamp)
        |> maybe_schedule_savepoint_after(complete_savepoint)

      run_state = %RunState{run_state | latest_collected_savepoint: savepoint_timestamp}
      {complete_savepoint, run_state}
  end
end
# Keeps only the savepoints whose timestamp is strictly greater than `timestamp`.
@spec maybe_remove_older_savepoints(t, ScheduleUtils.epoch_ms()) :: t
defp maybe_remove_older_savepoints(%RunState{savepoints: current} = run_state, timestamp) do
  remaining =
    for {savepoint_timestamp, _savepoint} = entry <- current,
        savepoint_timestamp > timestamp,
        into: %{},
        do: entry

  %RunState{run_state | savepoints: remaining}
end
# Schedules the savepoint following `base_savepoint`, but only when the run state holds no
# savepoint at all. The new savepoint is built from the registered entities of `base_savepoint`.
@spec maybe_schedule_savepoint_after(t, Savepoint.t()) :: t
defp maybe_schedule_savepoint_after(
       %RunState{savepoints: pending} = run_state,
       %Savepoint{} = base_savepoint
     ) do
  case Enum.empty?(pending) do
    false ->
      # there still is an incomplete savepoint = nothing to schedule
      run_state

    true ->
      next_timestamp = ScheduleUtils.next_savepoint(run_state.schedule, base_savepoint.timestamp)

      next_savepoint =
        base_savepoint
        |> Savepoint.registered_entities()
        |> Savepoint.new(next_timestamp)

      %RunState{run_state | savepoints: Map.put(pending, next_timestamp, next_savepoint)}
  end
end
# Builds one entity via `Entity.new/5` for each `{entity_id, entity_state}` definition.
@spec construct_entities(
        pid(),
        ScheduleUtils.schedule(),
        ScheduleUtils.epoch_ms(),
        [{Entity.id(), Entity.state()}]
      ) :: [Entity.t()]
defp construct_entities(state_store_pid, schedule, timestamp, entity_defs) do
  build_entity = fn {id, initial_state} ->
    Entity.new(state_store_pid, id, schedule, timestamp, initial_state)
  end

  Enum.map(entity_defs, build_entity)
end
# Returns the highest savepoint timestamp (map key); raises `Enum.EmptyError` on an empty map.
@spec max_timestamp(%{ScheduleUtils.epoch_ms() => Savepoint.t()}) :: ScheduleUtils.epoch_ms()
defp max_timestamp(savepoints) do
  savepoints
  |> Map.keys()
  |> Enum.max()
end
# Returns the lowest savepoint timestamp (map key); raises `Enum.EmptyError` on an empty map.
# The map is keyed by savepoint timestamps (`epoch_ms`), same as in `max_timestamp/1`.
@spec min_timestamp(%{ScheduleUtils.epoch_ms() => Savepoint.t()}) :: ScheduleUtils.epoch_ms()
defp min_timestamp(savepoints) do
  {lowest_timestamp, _savepoint} = Enum.min_by(savepoints, fn {timestamp, _} -> timestamp end)
  lowest_timestamp
end
end