defmodule Ecspanse.Server do
@moduledoc """
The server is responsible for managing the internal state of the framework,
scheduling and running the Systems, and batching the Events.
The server is started by adding the module that invokes `use Ecspanse` to the supervision tree.
> #### Info {: .info}
> There is only one server instance running per otp app.
> Trying to create another setup module that `use Ecspanse` and
> adding it to the supervision tree will result in an error.
"""
require Ex2ms
require Logger
alias Ecspanse.Frame
alias Ecspanse.System
alias Ecspanse.Util
@typedoc "Debug server state"
@type debug_next_frame :: {:next_frame, Ecspanse.Server.State.t()}
@doc """
Utility function. Returns all the internal state of the framework: `t:Ecspanse.Server.State.t/0`.
This can be useful for debugging systems scheduling and events batching.
> #### This function is intended for use only in testing and development environments. {: .warning}
"""
@spec debug() :: Ecspanse.Server.State.t()
def debug do
GenServer.call(__MODULE__, :debug)
end
@doc """
Utility function. The server switches to test mode.
At the beginning of each frame, a `t:debug_next_frame/0` tupple message will be sent
to the process passed as function argument.
> #### This function is intended for use only in testing and development environments. {: .warning}
"""
@spec test_server(pid()) :: :ok
def test_server(test_pid) do
GenServer.cast(__MODULE__, {:test_server, test_pid})
end
#############################
# INTERNAL STATE #
#############################
defmodule State do
@moduledoc """
The internal state of the framework.
"""
@typedoc "The internal state of the framework."
@type t :: %__MODULE__{
status:
:startup_systems
| :frame_start_systems
| :batch_systems
| :frame_end_systems
| :all_systems_run
| :frame_ended,
frame_timer: :running | :finished,
ecspanse_module: module(),
system_run_conditions_map: map(),
startup_systems: list(Ecspanse.System.t()),
frame_start_systems: list(Ecspanse.System.t()),
batch_systems: list(list(Ecspanse.System.t())),
frame_end_systems: list(Ecspanse.System.t()),
shutdown_systems: list(Ecspanse.System.t()),
scheduled_systems: list(Ecspanse.System.t()),
await_systems: list(reference()),
system_modules: MapSet.t(module()),
last_frame_monotonic_time: integer(),
fps_limit: non_neg_integer(),
delta: non_neg_integer(),
frame_data: Frame.t(),
events_ets_table: atom(),
test: boolean(),
test_pid: pid() | nil
}
@enforce_keys [
:ecspanse_module,
:last_frame_monotonic_time,
:fps_limit,
:delta
]
defstruct status: :startup_systems,
frame_timer: :running,
ecspanse_module: nil,
system_run_conditions_map: %{},
startup_systems: [],
frame_start_systems: [],
batch_systems: [],
frame_end_systems: [],
shutdown_systems: [],
scheduled_systems: [],
await_systems: [],
system_modules: MapSet.new(),
last_frame_monotonic_time: nil,
fps_limit: :unlimited,
delta: 0,
frame_data: %Frame{},
events_ets_table: nil,
test: false,
test_pid: nil
end
### SERVER ###
use GenServer
@doc false
def start_link(payload) do
GenServer.start_link(__MODULE__, payload, name: __MODULE__)
end
@impl true
def init(payload) do
# The main reason for using ETS tables are:
# - keep under control the GenServer memory usage
# - elimitate GenServer bottlenecks. Various Systems or Queries can read directly from the ETS tables.
# This is the main ETS table that holds the components state
# as a list of `{{Ecspanse.Entity.id(), component_module :: module()}, tags :: list(atom()),component_state :: struct()}`
# All processes can read and write to this table. But writing should only be done through Commands.
# The race condition is handled by the System Component locking.
# Commands should validate that only Systems are writing to this table.
:ets.new(Util.components_state_ets_table(), [
:set,
:public,
:named_table,
read_concurrency: true,
write_concurrency: :auto
])
# This is the ETS table that holds the resources state
# as a list of `{resource_module :: module(), resource_state :: struct()}`
# All processes can read and write to this table.
# But writing should only be done through Commands.
# Commands should validate that only Systems are writing to this table.
:ets.new(Util.resources_state_ets_table(), [
:set,
:public,
:named_table,
read_concurrency: true,
write_concurrency: false
])
# These ETS tables stores Events as a list of event structs wraped
# in a tuple {{MyEventModule, key :: any()}, %MyEvent{}}.
# There are 2 event tables that alternate every frame.
# While the events in one are being processed, the other is being filled.
# Every frame, the objects in the processed table are deleted.
# Any process can read and write to this table.
# But the logic responsible to write to this table should check the stored values are actually event structs.
# Before being sent to the Systems, the events are sorted by their inserted_at timestamp, and group in batches.
# The batches are determined by the unicity of the event {EventModule, key} per batch.
Enum.each(Util.events_ets_tables(), fn table ->
:ets.new(table, [
:duplicate_bag,
:public,
:named_table,
read_concurrency: true,
write_concurrency: true
])
end)
# This ETS table holds the value of the currently used events table.
# See the above comment for more details.
# It is optimized for multiple reads
:ets.new(Util.dual_events_ets_table(), [
:set,
:public,
:named_table,
read_concurrency: true,
write_concurrency: false
])
events_ets_table = Util.events_ets_tables() |> List.first()
:ets.insert(
Util.dual_events_ets_table(),
{:current, events_ets_table}
)
Ecspanse.Projection.Supervisor.start_link()
state = %State{
ecspanse_module: payload.ecspanse_module,
last_frame_monotonic_time: Elixir.System.monotonic_time(:millisecond),
delta: 0,
fps_limit: payload.fps_limit,
events_ets_table: events_ets_table
}
# Special system that creates the default resources
create_default_resources_system =
%System{
module: Ecspanse.System.CreateDefaultResources,
queue: :startup_systems,
execution: :sync,
run_conditions: []
}
%Ecspanse.Data{operations: operations} = state.ecspanse_module.setup(%Ecspanse.Data{})
operations = operations ++ [{:add_system, create_default_resources_system}]
state = operations |> Enum.reverse() |> apply_operations(state)
send(self(), :run)
{:ok, state}
end
@impl true
def handle_call(:debug, _from, state) do
{:reply, state, state}
end
@impl true
# WARNING: to be used only for development and testing.
def handle_cast({:test_server, test_pid}, state) do
state = %{
state
| test: true,
test_pid: test_pid
}
{:noreply, state}
end
@impl true
def handle_info(:run, state) do
# there are no events during startup
event_batches = []
state = %{
state
| scheduled_systems: state.startup_systems,
frame_data: %Frame{event_batches: event_batches}
}
:ets.delete_all_objects(state.events_ets_table)
Util.swithch_events_ets_table()
send(self(), :run_next_system)
{:noreply, state}
end
def handle_info(:start_frame, state) do
# use monotonic time
# https://til.hashrocket.com/posts/k6kydebcau-precise-timings-with-monotonictime
frame_monotonic_time = Elixir.System.monotonic_time(:millisecond)
delta = frame_monotonic_time - state.last_frame_monotonic_time
event_batches =
state.events_ets_table
|> :ets.tab2list()
|> batch_events()
# Delete all events from the ETS table
:ets.delete_all_objects(state.events_ets_table)
# Frame limit
# in order to finish a frame, two conditions must be met:
# 1. the frame time must pass: eg 1000/60 milliseconds.
# . this sets the frame_timer: from :running to :finished
# 2. all the frame systems must have finished running, and all the projections must have finished updating
# . this sets the status: to :frame_ended,
# So, when state.frame_timer == :finished && state.status == :frame_ended, the frame is finished
one_sec = 1000
limit = if state.fps_limit == :unlimited, do: 0, else: one_sec / state.fps_limit
# the systems run conditions are refreshed every frame
# this is intentional behaviour for performance reasons
# but also to avoid inconsistencies in the components
state = refresh_system_run_conditions_map(state)
state = %{
state
| status: :frame_start_systems,
frame_timer: :running,
scheduled_systems: state.frame_start_systems,
last_frame_monotonic_time: frame_monotonic_time,
delta: delta,
frame_data: %Frame{
delta: delta,
event_batches: event_batches
}
}
Process.send_after(self(), :finish_frame_timer, round(limit))
send(self(), :run_next_system)
# if the test_server/1 function is called, send the state to the test process
if state.test do
send(state.test_pid, {:next_frame, state})
end
{:noreply, state}
end
# finished running strartup systems (sync) and starting the loop
def handle_info(
:run_next_system,
%State{scheduled_systems: [], status: :startup_systems} = state
) do
send(self(), :start_frame)
{:noreply, state}
end
# finished running systems at the beginning of the frame (sync) and scheduling the batch systems
def handle_info(
:run_next_system,
%State{scheduled_systems: [], status: :frame_start_systems} = state
) do
state = %{state | status: :batch_systems, scheduled_systems: state.batch_systems}
send(self(), :run_next_system)
{:noreply, state}
end
# finished running batch systems (async per batch) and scheduling the end of the frame systems
def handle_info(:run_next_system, %State{scheduled_systems: [], status: :batch_systems} = state) do
state = %{state | status: :frame_end_systems, scheduled_systems: state.frame_end_systems}
send(self(), :run_next_system)
{:noreply, state}
end
# finished running systems at the end of the frame (sync) and scheduling the end of frame
def handle_info(
:run_next_system,
%State{scheduled_systems: [], status: :frame_end_systems} = state
) do
send(self(), :finished_running_all_systems)
{:noreply, state}
end
# running batch (async) systems. This runs only for `batch_systems` status
def handle_info(
:run_next_system,
%State{scheduled_systems: [systems_batch | batches], status: :batch_systems} = state
) do
systems_batch = Enum.filter(systems_batch, &run_system?(&1, state.system_run_conditions_map))
case systems_batch do
[] ->
state = %{state | scheduled_systems: batches, await_systems: []}
send(self(), :run_next_system)
{:noreply, state}
systems_batch ->
# Choosing this approach instead of using `Task.async_stream` because
# we don't want to block the server while processing the batch
# Also it re-uses the same code as the sync systems
refs = Enum.map(systems_batch, &run_system(&1, state))
state = %{state | scheduled_systems: batches, await_systems: refs}
{:noreply, state}
end
end
# running sync systems
def handle_info(:run_next_system, %State{scheduled_systems: [system | systems]} = state) do
if run_system?(system, state.system_run_conditions_map) do
ref = run_system(system, state)
state = %{state | scheduled_systems: systems, await_systems: [ref]}
{:noreply, state}
else
state = %{state | scheduled_systems: systems, await_systems: []}
send(self(), :run_next_system)
{:noreply, state}
end
end
# systems finished running and triggering next. The message is sent by the Task
def handle_info({ref, :finished_system_execution}, %State{await_systems: [ref]} = state)
when is_reference(ref) do
Process.demonitor(ref, [:flush])
state = %State{state | await_systems: []}
send(self(), :run_next_system)
{:noreply, state}
end
def handle_info(
{ref, :finished_system_execution},
%State{await_systems: system_refs} = state
)
when is_reference(ref) do
unless ref in system_refs do
raise "Received System message from unexpected System: #{inspect(ref)}"
end
Process.demonitor(ref, [:flush])
state = %State{state | await_systems: List.delete(system_refs, ref)}
{:noreply, state}
end
# finishing the frame systems execution
# scheduing projections
def handle_info(:finished_running_all_systems, state) do
Task.async(fn ->
projection_pids =
DynamicSupervisor.which_children(Ecspanse.Projection.Supervisor)
|> Enum.map(fn {_, pid, _, _} -> pid end)
Task.async_stream(
projection_pids,
fn pid ->
# Ensure against race conditions when the projection was terminated
try do
GenServer.call(pid, :update)
catch
:exit, {:noproc, {GenServer, :call, _}} ->
:ok
end
end,
ordered: false,
max_concurrency: length(projection_pids) + 1
)
|> Stream.run()
:finished_projection_updates
end)
state = %State{
state
| status: :all_systems_run
}
{:noreply, state}
end
# finished the frame projection updates
def handle_info(
{ref, :finished_projection_updates},
state
)
when is_reference(ref) do
Process.demonitor(ref, [:flush])
state = %State{state | status: :frame_ended}
if state.frame_timer == :finished do
events_ets_table = Util.events_ets_table()
Util.swithch_events_ets_table()
state = %State{state | events_ets_table: events_ets_table}
send(self(), :start_frame)
{:noreply, state}
else
{:noreply, state}
end
end
# finished the frame timer
def handle_info(:finish_frame_timer, state) do
state = %State{state | frame_timer: :finished}
if state.status == :frame_ended do
events_ets_table = Util.events_ets_table()
Util.swithch_events_ets_table()
state = %State{state | events_ets_table: events_ets_table}
send(self(), :start_frame)
{:noreply, state}
else
{:noreply, state}
end
end
@impl true
def terminate(_reason, state) do
# Running shutdown_systems. Those cannot run in the standard way because the process is shutting down.
# They are executed sync, in the ordered they were added.
Enum.each(state.shutdown_systems, fn system ->
task =
Task.async(fn ->
prepare_system_process(system)
system.module.run(state.frame_data)
end)
Task.await(task)
end)
end
### HELPER ###
defp run_system(system, state) do
%Task{ref: ref} =
Task.async(fn ->
prepare_system_process(system)
system.module.schedule_run(state.frame_data)
:finished_system_execution
end)
ref
end
# This happens in the System process
defp prepare_system_process(system) do
Process.put(:ecs_process_type, :system)
Process.put(:system_execution, system.execution)
Process.put(:system_module, system.module)
Process.put(:locked_components, system.module.__locked_components__())
end
defp apply_operations([], state), do: state
defp apply_operations([operation | operations], state) do
%State{} = state = apply_operation(operation, state)
apply_operations(operations, state)
end
# batch async systems
defp apply_operation(
{:add_system,
%System{queue: :batch_systems, module: system_module, run_after: []} = system},
state
) do
state = validate_unique_system(system_module, state)
batch_systems = Map.get(state, :batch_systems)
# should return a list of lists
new_batch_systems = batch_system(system, batch_systems, [])
Map.put(state, :batch_systems, new_batch_systems)
|> Map.put(
:system_run_conditions_map,
add_to_system_run_conditions_map(state.system_run_conditions_map, system)
)
end
defp apply_operation(
{:add_system,
%System{queue: :batch_systems, module: system_module, run_after: after_systems} = system},
state
) do
state = validate_unique_system(system_module, state)
batch_systems = Map.get(state, :batch_systems)
system_modules = batch_systems |> List.flatten() |> Enum.map(& &1.module)
non_exising_systems = after_systems -- system_modules
if length(non_exising_systems) > 0 do
raise "Systems #{inspect(non_exising_systems)} does not exist. A system can run only after existing systems"
end
# should return a list of lists
new_batch_systems = batch_system_after(system, after_systems, batch_systems, [])
Map.put(state, :batch_systems, new_batch_systems)
|> Map.put(
:system_run_conditions_map,
add_to_system_run_conditions_map(state.system_run_conditions_map, system)
)
end
# add sequential systems to their queues
defp apply_operation(
{:add_system, %System{queue: queue, module: system_module} = system},
state
) do
state = validate_unique_system(system_module, state)
Map.put(state, queue, Map.get(state, queue) ++ [system])
|> Map.put(
:system_run_conditions_map,
add_to_system_run_conditions_map(state.system_run_conditions_map, system)
)
end
defp validate_unique_system(system_module, state) do
Ecspanse.Util.validate_ecs_type(
system_module,
:system,
ArgumentError,
"The module #{inspect(system_module)} must be a System"
)
if MapSet.member?(state.system_modules, system_module) do
raise "System #{inspect(system_module)} already exists. Server systems must be unique."
end
%State{state | system_modules: MapSet.put(state.system_modules, system_module)}
end
defp batch_system(system, [], []) do
[[system]]
end
defp batch_system(system, [], checked_batches) do
checked_batches ++ [[system]]
end
defp batch_system(system, [batch | batches], checked_batches) do
system_locked_components = system.module.__locked_components__()
batch_locked_components =
Enum.map(batch, & &1.module.__locked_components__()) |> List.flatten()
if batch_locked_components -- system_locked_components == batch_locked_components do
updated_batch = batch ++ [system]
checked_batches ++ [updated_batch] ++ batches
else
batch_system(system, batches, checked_batches ++ [batch])
end
end
defp batch_system_after(system, [] = _after_systems, remaining_batches, checked_batches) do
batch_system(system, remaining_batches, checked_batches)
end
defp batch_system_after(system, after_system_modules, [batch | batches], checked_batches) do
remaining_after_systems = after_system_modules -- Enum.map(batch, & &1.module)
batch_system_after(
system,
remaining_after_systems,
batches,
checked_batches ++ [batch]
)
end
# builds a map with all running conditions from all systems
# this allows to run the conditions only per frame
defp add_to_system_run_conditions_map(
existing_conditions,
%{run_conditions: run_conditions} = _system
) do
run_conditions
|> Enum.reduce(existing_conditions, fn condition, acc ->
# Adding false as initial value for the condition
# because this cannot run on startup systems
# this will be updated in the refresh_system_run_conditions_map
Map.put(acc, condition, false)
end)
end
# takes state and returns state
defp refresh_system_run_conditions_map(state) do
state.system_run_conditions_map
|> Enum.reduce(
state,
fn {{module, function, args} = condition, _value}, state ->
result = apply(module, function, args)
unless is_boolean(result) do
raise "System run condition functions must return a boolean. Got: #{inspect(result)}. For #{inspect({module, function, args})}."
end
%State{
state
| system_run_conditions_map: Map.put(state.system_run_conditions_map, condition, result)
}
end
)
end
defp run_system?(system, run_conditions_map) do
Enum.all?(system.run_conditions, fn condition ->
Map.get(run_conditions_map, condition) == true
end)
end
defp batch_events(events) do
# inserted_at is the System time in milliseconds when the event was created
events
|> Enum.sort_by(fn {_k, v} -> v.inserted_at end, &</2)
|> do_event_batches([])
end
defp do_event_batches([], batches), do: batches
defp do_event_batches(events, batches) do
current_events = Enum.uniq_by(events, fn {{_module, batch_key}, _v} -> batch_key end)
batch =
Enum.map(current_events, fn {_, v} -> v end)
remaining_events = events -- current_events
do_event_batches(remaining_events, batches ++ [batch])
end
end