Skip to main content

guides/memory_pressure.md

# Memory Pressure Monitoring

Memory pressure monitoring enables adaptive behavior when system memory becomes constrained. This guide covers the monitoring architecture, pressure levels, and integration patterns.

## Overview

The `reckon_db_memory` module provides:

| Function | Purpose |
|----------|---------|
| `start_link/0,1` | Start the memory monitor |
| `level/0` | Get current pressure level |
| `configure/1` | Update thresholds |
| `on_pressure_change/1` | Register callback |
| `remove_callback/1` | Remove callback |
| `get_stats/0` | Get memory statistics |
| `check_now/0` | Force immediate check |

## Architecture

![Memory Pressure Levels](assets/memory_levels.svg)

## Pressure Levels

| Level | Default Threshold | Description |
|-------|-------------------|-------------|
| `normal` | < 70% | Full caching, all features enabled |
| `elevated` | 70-85% | Reduce cache sizes, flush more often |
| `critical` | > 85% | Pause non-essential operations, aggressive cleanup |

## Starting the Monitor

### Default Configuration

```erlang
{ok, _Pid} = reckon_db_memory:start_link().
```

### Custom Configuration

```erlang
{ok, _Pid} = reckon_db_memory:start_link(#{
    elevated_threshold => 0.60,    %% Trigger elevated at 60%
    critical_threshold => 0.80,    %% Trigger critical at 80%
    check_interval => 5000         %% Check every 5 seconds
}).
```

## Checking Pressure Level

### Current Level

```erlang
Level = reckon_db_memory:level(),
%% normal | elevated | critical
```

### With Statistics

```erlang
Stats = reckon_db_memory:get_stats(),
%% #{
%%     level => normal,
%%     memory_used => 2147483648,      %% bytes
%%     memory_total => 4294967296,     %% bytes
%%     last_check => 1735689600000,    %% timestamp
%%     callback_count => 3
%% }
```

## Registering Callbacks

### On Pressure Change

```erlang
CallbackRef = reckon_db_memory:on_pressure_change(fun(Level) ->
    case Level of
        normal ->
            logger:info("Memory pressure normalized"),
            restore_full_functionality();
        elevated ->
            logger:warning("Memory pressure elevated"),
            reduce_cache_sizes();
        critical ->
            logger:error("Memory pressure critical!"),
            pause_non_essential_operations()
    end
end).
```

### Removing Callback

```erlang
ok = reckon_db_memory:remove_callback(CallbackRef).
```

## Memory Detection

### How Memory is Measured

The monitor uses `memsup` (SASL memory supervisor):

```erlang
get_memory_info() ->
    MemData = memsup:get_system_memory_data(),
    case MemData of
        [] ->
            %% Fallback to erlang:memory()
            ErlangMem = erlang:memory(),
            Used = proplists:get_value(total, ErlangMem, 0),
            {Used, Used * 2};
        _ ->
            Total = proplists:get_value(total_memory, MemData, 0),
            Free = proplists:get_value(free_memory, MemData, 0),
            Cached = proplists:get_value(cached_memory, MemData, 0),
            Buffered = proplists:get_value(buffered_memory, MemData, 0),
            Available = Free + Cached + Buffered,
            Used = Total - Available,
            {Used, Total}
    end.
```

### Available Memory Calculation

Available memory includes:
- Free memory
- Cached memory (can be reclaimed)
- Buffered memory (can be reclaimed)

This provides a more accurate picture than just free memory.

## Integration Patterns

### 1. Subscription Backpressure

```erlang
%% In reckon_db_emitter or subscription handler
init(State) ->
    CallbackRef = reckon_db_memory:on_pressure_change(fun(Level) ->
        gen_server:cast(self(), {memory_pressure, Level})
    end),
    {ok, State#{memory_callback => CallbackRef}}.

handle_cast({memory_pressure, critical}, State) ->
    %% Pause subscription delivery
    {noreply, State#{paused => true}};
handle_cast({memory_pressure, _}, State) ->
    %% Resume
    {noreply, State#{paused => false}}.
```

### 2. Cache Management

```erlang
handle_info(check_cache, State) ->
    case reckon_db_memory:level() of
        critical ->
            ets:delete_all_objects(my_cache),
            logger:warning("Cache cleared due to memory pressure");
        elevated ->
            evict_oldest_entries(my_cache, 0.5);  %% Evict 50%
        normal ->
            ok
    end,
    {noreply, State}.
```

### 3. Write Throttling

```erlang
append_with_throttle(Store, Stream, Version, Events) ->
    case reckon_db_memory:level() of
        critical ->
            timer:sleep(100),  %% Slow down writes
            append_with_throttle(Store, Stream, Version, Events);
        elevated ->
            timer:sleep(10),   %% Minor throttle
            reckon_db_streams:append(Store, Stream, Version, Events);
        normal ->
            reckon_db_streams:append(Store, Stream, Version, Events)
    end.
```

## Configuration

### Dynamic Reconfiguration

```erlang
ok = reckon_db_memory:configure(#{
    elevated_threshold => 0.75,
    critical_threshold => 0.90,
    check_interval => 15000
}).
```

### Getting Current Config

```erlang
Config = reckon_db_memory:get_config(),
%% #{
%%     elevated_threshold => 0.70,
%%     critical_threshold => 0.85,
%%     check_interval => 10000
%% }
```

## Forcing Checks

```erlang
%% Force immediate memory check
CurrentLevel = reckon_db_memory:check_now().
```

Useful for:
- Testing pressure response
- After known memory-heavy operations
- Before starting large batch jobs

## Telemetry

Pressure changes emit telemetry:

```erlang
%% Event: [reckon_db, memory, pressure_changed]
%% Measurements:
%%   #{usage_ratio => float()}  %% 0.0 - 1.0
%% Metadata:
%%   #{old_level => atom(), new_level => atom()}
```

### Example Handler

```erlang
telemetry:attach(
    <<"memory-pressure-handler">>,
    [reckon_db, memory, pressure_changed],
    fun(_Event, #{usage_ratio := Ratio}, #{old_level := Old, new_level := New}, _Config) ->
        logger:notice("Memory pressure: ~p -> ~p (~.1f%)",
                      [Old, New, Ratio * 100])
    end,
    #{}
).
```

## Cluster Considerations

### Local Monitoring

Memory pressure is monitored locally on each node:

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│     Node 1      │    │     Node 2      │    │     Node 3      │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ Memory      │ │    │ │ Memory      │ │    │ │ Memory      │ │
│ │ Monitor     │ │    │ │ Monitor     │ │    │ │ Monitor     │ │
│ │ (normal)    │ │    │ │ (elevated)  │ │    │ │ (normal)    │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘
```

Each node manages its own memory pressure independently.

### Cross-Node Awareness

For cluster-wide memory awareness, consider:

```erlang
%% Broadcast pressure to other nodes
on_pressure_change(fun(Level) ->
    rpc:multicall(nodes(), my_module, handle_remote_pressure, [node(), Level])
end).
```

## Best Practices

### 1. Register Early

```erlang
%% In application start
start(_Type, _Args) ->
    {ok, _} = reckon_db_memory:start_link(),
    register_pressure_handlers(),
    ...
```

### 2. Graceful Degradation

```erlang
%% Implement graceful degradation, not hard failures
handle_pressure(critical) ->
    %% Don't crash, just slow down
    reduce_batch_sizes(),
    increase_flush_intervals(),
    pause_optional_projections();
handle_pressure(elevated) ->
    %% Warning mode
    reduce_cache_ttls();
handle_pressure(normal) ->
    restore_defaults().
```

### 3. Monitor Recovery

```erlang
%% Track recovery from pressure
handle_pressure_change(OldLevel, NewLevel) ->
    case {OldLevel, NewLevel} of
        {critical, elevated} ->
            logger:info("Memory recovering - still elevated");
        {critical, normal} ->
            logger:info("Memory fully recovered");
        {elevated, critical} ->
            logger:error("Memory degrading to critical");
        _ ->
            ok
    end.
```

### 4. Test Under Pressure

```erlang
%% In tests, simulate pressure
test_critical_behavior() ->
    %% Force critical level
    meck:new(reckon_db_memory, [passthrough]),
    meck:expect(reckon_db_memory, level, fun() -> critical end),

    %% Test behavior
    ?assertEqual(expected_behavior, my_module:do_something()),

    meck:unload(reckon_db_memory).
```

## SASL Configuration

For `memsup` to work, configure SASL:

```erlang
%% In sys.config
{sasl, [
    {sasl_error_logger, {file, "log/sasl.log"}},
    {errlog_type, error}
]},
{os_mon, [
    {start_memsup, true},
    {memsup_system_only, false},
    {memory_check_interval, 60}  %% OS check interval (seconds)
]}
```

## See Also

- [Subscriptions](subscriptions.md) - Event subscription with backpressure
- [Storage Internals](storage_internals.md) - Khepri memory usage
- [Scavenging](scavenging.md) - Free memory by removing old events