lib/runbox/state_store/entity.ex

defmodule Runbox.StateStore.Entity do
  @moduledoc group: :internal
  @moduledoc """
  Defines the struct that acts as the state store envelope for any stateful runtime component.

  Besides the wrapped `state`, the envelope tracks the time of the last state update
  (`timestamp`), the savepoint `schedule`, and the previous and next savepoint targets.
  """

  alias __MODULE__, as: Entity
  alias Runbox.StateStore.ScheduleUtils

  defstruct run_id: nil,
            id: nil,
            state: nil,
            timestamp: nil,
            schedule: nil,
            next_savepoint: nil,
            last_savepoint: nil

  @type id :: String.t() | atom
  @type state :: term
  @type t(state) :: %Entity{
          run_id: String.t(),
          id: id,
          state: state,
          timestamp: ScheduleUtils.epoch_ms(),
          schedule: ScheduleUtils.schedule(),
          # `new/5` stores `:none` here when the entity is created with schedule `:none`
          next_savepoint: ScheduleUtils.epoch_ms() | :none,
          last_savepoint: ScheduleUtils.epoch_ms()
        }
  @type t() :: t(state())

  @doc """
  Creates a new state store entity.

  `timestamp` is used both as the entity timestamp and as its initial `last_savepoint`. When
  `schedule` is `:none`, `next_savepoint` is set to `:none` as well; otherwise it is computed
  with `ScheduleUtils.next_savepoint/2` from `schedule` and `timestamp`.
  """
  @spec new(String.t(), id, ScheduleUtils.schedule(), ScheduleUtils.epoch_ms(), state) :: t
  def new(run_id, id, schedule, timestamp, state) do
    next_savepoint =
      if schedule == :none, do: :none, else: ScheduleUtils.next_savepoint(schedule, timestamp)

    %Entity{
      run_id: run_id,
      id: id,
      schedule: schedule,
      state: state,
      timestamp: timestamp,
      last_savepoint: timestamp,
      next_savepoint: next_savepoint
    }
  end

  @doc "Returns the entity id."
  @spec id(t) :: id
  def id(%Entity{id: id}) do
    id
  end

  @doc "Returns the entity state."
  @spec state(t) :: state
  def state(%Entity{state: state}) do
    state
  end

  @doc "Returns the entity timestamp."
  @spec timestamp(t) :: ScheduleUtils.epoch_ms()
  def timestamp(%Entity{timestamp: timestamp}) do
    timestamp
  end

  @doc "Returns the entity schedule."
  @spec schedule(t) :: ScheduleUtils.schedule()
  def schedule(%Entity{schedule: schedule}) do
    schedule
  end

  @doc """
  Sets the entity `state` at the specified point in time.

  The entity timestamp never moves backwards: if `timestamp` is lower than the timestamp already
  stored in the entity, the stored one is kept. The `state` is replaced in either case.
  """
  @spec update_state(t, ScheduleUtils.epoch_ms(), state) :: t
  def update_state(%Entity{timestamp: current_ts} = entity, timestamp, state) do
    # Prevent moving back in history due to an erroneous timestamp
    new_ts = max(current_ts, timestamp)
    %Entity{entity | timestamp: new_ts, state: state}
  end

  @doc """
  Confirms that all messages with lower timestamps were processed.

  When the timestamp is *higher* than the next planned savepoint, the state is persisted into
  `StateStore` with `StateStore.save/4` and the next savepoint target is determined and stored in
  the entity. The state is persisted only once per every savepoint.

  Persisting is done by invoking `save_mfa`, a `{module, function, args}` tuple: the function is
  called with `args` followed by the entity to save and the savepoint it belongs to. It must
  return `:ok`; any other return value raises a `MatchError`.

  Acking a `timestamp` means that the current time is `timestamp`, i.e., all messages with
  timestamp lower than `timestamp` were processed, and there may possibly be some messages with
  timestamp equal to `timestamp` that were not processed yet.

  It is important to call this function *before* processing the message with timestamp
  `timestamp`, so that the state of the passed entity belongs to the timestamp before that message
  was processed. This is critical for ensuring consistency of savepoints. If this function was
  called after processing a message, the timestamp attached to a savepoint would be older than its
  state.

  Optionally you can specify an entity transformation that is applied before save. This
  transformation is only executed before save, so it can be expensive. The transformation returns
  either `{:ok, entity}` or `{:error, any()}` if there's any error during the transformation
  preventing the entity from being saved. Only the transformed entity is handed to `save_mfa`; the
  entity returned from this function is the original one with updated savepoints.

  ## Return values

    * `{:ok, entity}` - nothing had to be persisted (the entity is returned unchanged), or the
      entity was persisted and its `last_savepoint` and `next_savepoint` were updated.
    * `{:error, {:cannot_transform_entity, result}}` - the transformation returned `result`, which
      is anything other than `{:ok, entity}`. Nothing is saved in that case.
  """
  @spec ack_processed_time(
          t,
          ScheduleUtils.epoch_ms(),
          {module, atom, list},
          before_save_transform :: (t() -> {:ok, t()} | {:error, reason :: any()}) | nil
        ) ::
          {:ok, t} | {:error, term}
  def ack_processed_time(
        %Entity{schedule: schedule} = entity,
        timestamp,
        save_mfa,
        before_save_transform \\ nil
      ) do
    if should_persist?(entity, timestamp) do
      # subtracting 1 from the timestamp, because it says everything up until timestamp is
      # processed, so the last fully processed time is `timestamp - 1` and ScheduleUtils works with
      # processed time
      last_savepoint = ScheduleUtils.previous_savepoint(schedule, timestamp - 1)
      next_savepoint = ScheduleUtils.next_savepoint(schedule, timestamp - 1)

      entity_transformation_result =
        if is_function(before_save_transform, 1) do
          before_save_transform.(entity)
        else
          {:ok, entity}
        end

      case entity_transformation_result do
        {:ok, transformed_entity} ->
          {mod, fun, args} = save_mfa
          :ok = apply(mod, fun, args ++ [transformed_entity, last_savepoint])

          # the transformation affects only what gets saved, the caller keeps the original entity
          {:ok, %Entity{entity | last_savepoint: last_savepoint, next_savepoint: next_savepoint}}

        error ->
          {:error, {:cannot_transform_entity, error}}
      end
    else
      {:ok, entity}
    end
  end

  # public only for tests
  @doc false
  @spec latest_persisted_savepoint(t) :: ScheduleUtils.epoch_ms()
  def latest_persisted_savepoint(%Entity{} = entity) do
    entity.last_savepoint
  end

  # Decides whether acking time `now` has to trigger a save of the entity.
  @spec should_persist?(t, ScheduleUtils.epoch_ms()) :: boolean
  # Entities created with schedule `:none` have no savepoint to reach. The comparison below would
  # also yield `false` for them (numbers sort before atoms in the term ordering), this clause
  # only makes that explicit.
  defp should_persist?(%Entity{next_savepoint: :none}, _now), do: false

  defp should_persist?(%Entity{next_savepoint: next_savepoint}, now) do
    # we are beyond the next_savepoint, thus all messages that belong to the savepoint were
    # processed
    now > next_savepoint
  end
end