defmodule Runbox.StateStore.Entity do
@moduledoc """
Module defines struct which acts as state store envelope for any stateful runtime component.
"""
alias __MODULE__, as: Entity
alias Runbox.StateStore.ScheduleUtils
defstruct run_id: nil,
id: nil,
state: nil,
timestamp: nil,
schedule: nil,
next_savepoint: nil,
last_savepoint: nil
@type id :: String.t() | atom
@type state :: term
@type t :: %Entity{
run_id: String.t(),
id: id,
state: state,
timestamp: ScheduleUtils.epoch_ms(),
schedule: ScheduleUtils.schedule(),
next_savepoint: ScheduleUtils.epoch_ms(),
last_savepoint: ScheduleUtils.epoch_ms()
}
@doc "Creates new state store entity"
@spec new(String.t(), id, ScheduleUtils.schedule(), ScheduleUtils.epoch_ms(), state) :: t
def new(run_id, id, schedule, timestamp, state) do
next_savepoint =
if schedule == :none, do: :none, else: ScheduleUtils.next_savepoint(schedule, timestamp)
%Entity{
run_id: run_id,
id: id,
schedule: schedule,
state: state,
timestamp: timestamp,
last_savepoint: timestamp,
next_savepoint: next_savepoint
}
end
@doc "Returns entity id"
@spec id(t) :: id
def id(%Entity{id: id}) do
id
end
@doc "Returns entity state"
@spec state(t) :: state
def state(%Entity{state: state}) do
state
end
@doc "Returns entity timestamp"
@spec timestamp(t) :: ScheduleUtils.epoch_ms()
def timestamp(%Entity{timestamp: timestamp}) do
timestamp
end
@doc "Returns entity schedule"
@spec schedule(t) :: ScheduleUtils.schedule()
def schedule(%Entity{schedule: schedule}) do
schedule
end
@doc """
Set entity `state` at the specified point in time.
"""
@spec update_state(t, ScheduleUtils.epoch_ms(), state) :: t
def update_state(%Entity{timestamp: current_ts} = entity, timestamp, state) do
# Prevent moving back to history due to erroneous timestamp
new_ts = max(current_ts, timestamp)
%Entity{entity | timestamp: new_ts, state: state}
end
@doc """
Confirm that all messages with lower timestamps were processed.
When the timestamp is higher than next savepoint the state is persisted into `StateStore` with
`StateStore.save/4` and next savepoint target is determined and stored in the entity.
The state is persisted only once per every savepoint.
"""
@spec ack_processed_time(t, ScheduleUtils.epoch_ms(), {module, atom, list}) ::
{:ok, t} | {:error, term}
def ack_processed_time(%Entity{schedule: schedule} = entity, timestamp, save_mfa) do
if should_persist?(entity, timestamp) do
# subtracting 1 from the timestamp, because it says everything up until timestamp is
# processed, so the last fully processed time is `timestamp - 1` and ScheduleUtils works with
# processed time
last_savepoint = ScheduleUtils.previous_savepoint(schedule, timestamp - 1)
next_savepoint = ScheduleUtils.next_savepoint(schedule, timestamp - 1)
{mod, fun, args} = save_mfa
:ok = apply(mod, fun, args ++ [entity, last_savepoint])
{:ok, %Entity{entity | last_savepoint: last_savepoint, next_savepoint: next_savepoint}}
else
{:ok, entity}
end
end
# public only for tests
@doc false
@spec latest_persisted_savepoint(t) :: ScheduleUtils.epoch_ms()
def latest_persisted_savepoint(%Entity{} = entity) do
entity.last_savepoint
end
@spec should_persist?(t, ScheduleUtils.epoch_ms()) :: boolean
defp should_persist?(%Entity{} = entity, now) do
# we are beyond the next_savepoint, thus all messages that belong to the savepoint were
# processed
now > entity.next_savepoint
end
end