lib/runbox/state_store/entity.ex

defmodule Runbox.StateStore.Entity do
  @moduledoc """
  Module defines struct which acts as state store envelope for any stateful runtime component.
  """

  alias __MODULE__, as: Entity
  alias Runbox.StateStore.ScheduleUtils

  defstruct run_id: nil,
            id: nil,
            state: nil,
            timestamp: nil,
            schedule: nil,
            next_savepoint: nil,
            last_savepoint: nil

  @type id :: String.t() | atom
  @type state :: term
  @type t :: %Entity{
          run_id: String.t(),
          id: id,
          state: state,
          timestamp: ScheduleUtils.epoch_ms(),
          schedule: ScheduleUtils.schedule(),
          next_savepoint: ScheduleUtils.epoch_ms(),
          last_savepoint: ScheduleUtils.epoch_ms()
        }

  @doc "Creates new state store entity"
  @spec new(String.t(), id, ScheduleUtils.schedule(), ScheduleUtils.epoch_ms(), state) :: t
  def new(run_id, id, schedule, timestamp, state) do
    next_savepoint =
      if schedule == :none, do: :none, else: ScheduleUtils.next_savepoint(schedule, timestamp)

    %Entity{
      run_id: run_id,
      id: id,
      schedule: schedule,
      state: state,
      timestamp: timestamp,
      last_savepoint: timestamp,
      next_savepoint: next_savepoint
    }
  end

  @doc "Returns entity id"
  @spec id(t) :: id
  def id(%Entity{id: id}) do
    id
  end

  @doc "Returns entity state"
  @spec state(t) :: state
  def state(%Entity{state: state}) do
    state
  end

  @doc "Returns entity timestamp"
  @spec timestamp(t) :: ScheduleUtils.epoch_ms()
  def timestamp(%Entity{timestamp: timestamp}) do
    timestamp
  end

  @doc "Returns entity schedule"
  @spec schedule(t) :: ScheduleUtils.schedule()
  def schedule(%Entity{schedule: schedule}) do
    schedule
  end

  @doc """
  Set entity `state` at the specified point in time.
  """
  @spec update_state(t, ScheduleUtils.epoch_ms(), state) :: t
  def update_state(%Entity{timestamp: current_ts} = entity, timestamp, state) do
    # Prevent moving back to history due to erroneous timestamp
    new_ts = max(current_ts, timestamp)
    %Entity{entity | timestamp: new_ts, state: state}
  end

  @doc """
  Confirm that all messages with lower timestamps were processed.

  When the timestamp is higher than next savepoint the state is persisted into `StateStore` with
  `StateStore.save/4` and next savepoint target is determined and stored in the entity.
  The state is persisted only once per every savepoint.
  """
  @spec ack_processed_time(t, ScheduleUtils.epoch_ms(), {module, atom, list}) ::
          {:ok, t} | {:error, term}
  def ack_processed_time(%Entity{schedule: schedule} = entity, timestamp, save_mfa) do
    if should_persist?(entity, timestamp) do
      # subtracting 1 from the timestamp, because it says everything up until timestamp is
      # processed, so the last fully processed time is `timestamp - 1` and ScheduleUtils works with
      # processed time
      last_savepoint = ScheduleUtils.previous_savepoint(schedule, timestamp - 1)
      next_savepoint = ScheduleUtils.next_savepoint(schedule, timestamp - 1)

      {mod, fun, args} = save_mfa
      :ok = apply(mod, fun, args ++ [entity, last_savepoint])
      {:ok, %Entity{entity | last_savepoint: last_savepoint, next_savepoint: next_savepoint}}
    else
      {:ok, entity}
    end
  end

  # public only for tests
  @doc false
  @spec latest_persisted_savepoint(t) :: ScheduleUtils.epoch_ms()
  def latest_persisted_savepoint(%Entity{} = entity) do
    entity.last_savepoint
  end

  @spec should_persist?(t, ScheduleUtils.epoch_ms()) :: boolean
  defp should_persist?(%Entity{} = entity, now) do
    # we are beyond the next_savepoint, thus all messages that belong to the savepoint were
    # processed
    now > entity.next_savepoint
  end
end