defmodule Runbox.StateStore.Entity do
@moduledoc group: :internal
@moduledoc """
Module defines struct which acts as state store envelope for any stateful runtime component.
"""
alias __MODULE__, as: Entity
alias Runbox.StateStore.ScheduleUtils
defstruct run_id: nil,
id: nil,
state: nil,
timestamp: nil,
schedule: nil,
next_savepoint: nil,
last_savepoint: nil
@type id :: String.t() | atom
@type state :: term
@type t(state) :: %Entity{
run_id: String.t(),
id: id,
state: state,
timestamp: ScheduleUtils.epoch_ms(),
schedule: ScheduleUtils.schedule(),
next_savepoint: ScheduleUtils.epoch_ms(),
last_savepoint: ScheduleUtils.epoch_ms()
}
@type t() :: t(state())
@doc "Creates new state store entity"
@spec new(String.t(), id, ScheduleUtils.schedule(), ScheduleUtils.epoch_ms(), state) :: t
def new(run_id, id, schedule, timestamp, state) do
next_savepoint =
if schedule == :none, do: :none, else: ScheduleUtils.next_savepoint(schedule, timestamp)
%Entity{
run_id: run_id,
id: id,
schedule: schedule,
state: state,
timestamp: timestamp,
last_savepoint: timestamp,
next_savepoint: next_savepoint
}
end
@doc "Returns entity id"
@spec id(t) :: id
def id(%Entity{id: id}) do
id
end
@doc "Returns entity state"
@spec state(t) :: state
def state(%Entity{state: state}) do
state
end
@doc "Returns entity timestamp"
@spec timestamp(t) :: ScheduleUtils.epoch_ms()
def timestamp(%Entity{timestamp: timestamp}) do
timestamp
end
@doc "Returns entity schedule"
@spec schedule(t) :: ScheduleUtils.schedule()
def schedule(%Entity{schedule: schedule}) do
schedule
end
@doc """
Set entity `state` at the specified point in time.
"""
@spec update_state(t, ScheduleUtils.epoch_ms(), state) :: t
def update_state(%Entity{timestamp: current_ts} = entity, timestamp, state) do
# Prevent moving back to history due to erroneous timestamp
new_ts = max(current_ts, timestamp)
%Entity{entity | timestamp: new_ts, state: state}
end
@doc """
Confirm that all messages with lower timestamps were processed.
When the timestamp is *higher* than next planned savepoint, the state is persisted into
`StateStore` with `StateStore.save/4` and next savepoint target is determined and stored in the
entity. The state is persisted only once per every savepoint.
Acking a `timestamp` means that the current time is `timestamp`, i.e., all messages with timestamp
lower than `timestamp` were processed, and there may possibly be some messages with timestamp
equal to `timestamp` that were not processed yet.
It is important to call this function *before* processing the message with timestamp `timestamp`,
so that the state of the passed entity belongs to the timestamp before that message was processed.
This is critical for ensuring consistency of savepoints. If this function was called after
processing a message, the timestamp attached to a savepoint would be older than its state.
Optionally you can specify an entity transformation that is applied before save. This
transformation is only execute before save, so it can be expensive. The transformation returns
either `{:ok, entity}` or `{:error, any()}` if there's any error during the transformation
preventing the entity to be saved.
"""
@spec ack_processed_time(
t,
ScheduleUtils.epoch_ms(),
{module, atom, list},
before_save_transform :: (t() -> {:ok, t()} | {:error, reason :: any()}) | nil
) ::
{:ok, t} | {:error, term}
def ack_processed_time(
%Entity{schedule: schedule} = entity,
timestamp,
save_mfa,
before_save_transform \\ nil
) do
if should_persist?(entity, timestamp) do
# subtracting 1 from the timestamp, because it says everything up until timestamp is
# processed, so the last fully processed time is `timestamp - 1` and ScheduleUtils works with
# processed time
last_savepoint = ScheduleUtils.previous_savepoint(schedule, timestamp - 1)
next_savepoint = ScheduleUtils.next_savepoint(schedule, timestamp - 1)
entity_transformation_result =
if is_function(before_save_transform, 1) do
before_save_transform.(entity)
else
{:ok, entity}
end
case entity_transformation_result do
{:ok, transformed_entity} ->
{mod, fun, args} = save_mfa
:ok = apply(mod, fun, args ++ [transformed_entity, last_savepoint])
{:ok, %Entity{entity | last_savepoint: last_savepoint, next_savepoint: next_savepoint}}
error ->
{:error, {:cannot_transform_entity, error}}
end
else
{:ok, entity}
end
end
# public only for tests
@doc false
@spec latest_persisted_savepoint(t) :: ScheduleUtils.epoch_ms()
def latest_persisted_savepoint(%Entity{} = entity) do
entity.last_savepoint
end
@spec should_persist?(t, ScheduleUtils.epoch_ms()) :: boolean
defp should_persist?(%Entity{} = entity, now) do
# we are beyond the next_savepoint, thus all messages that belong to the savepoint were
# processed
now > entity.next_savepoint
end
end