# Reactor Module
The `erlang.reactor` module provides low-level FD-based protocol handling for building custom servers. It enables Python to implement protocol logic while Erlang handles I/O scheduling via `enif_select`.
## Overview
The reactor pattern separates I/O multiplexing (handled by Erlang) from protocol logic (handled by Python). This provides:
- **Efficient I/O** - Erlang's `enif_select` for event notification
- **Protocol flexibility** - Python implements the protocol state machine
- **Zero-copy buffers** - ReactorBuffer provides zero-copy data access via buffer protocol
- **Works with any fd** - TCP, UDP, Unix sockets, pipes, etc.
### Architecture
```
┌──────────────────────────────────────────────────────────────────────┐
│ Reactor Architecture │
├──────────────────────────────────────────────────────────────────────┤
│ │
│ Erlang (BEAM) Python │
│ ───────────── ────── │
│ │
│ ┌─────────────────────┐ ┌─────────────────────────────┐ │
│ │ py_reactor_context │ │ erlang.reactor │ │
│ │ │ │ │ │
│ │ accept() ──────────┼──fd_handoff─▶│ init_connection(fd, info) │ │
│ │ │ │ │ │ │
│ │ enif_select(READ) │ │ ▼ │ │
│ │ │ │ │ Protocol.connection_made() │ │
│ │ ▼ │ │ │ │
│ │ {select, fd, READ} │ │ │ │
│ │ │ │ │ │ │
│ │ └─────────────┼─on_read_ready│ Protocol.data_received() │ │
│ │ │ │ │ │ │
│ │ action = "write_ │◀─────────────┼───────┘ │ │
│ │ pending" │ │ │ │
│ │ │ │ │ │ │
│ │ enif_select(WRITE) │ │ │ │
│ │ │ │ │ │ │
│ │ ▼ │ │ │ │
│ │ {select, fd, WRITE}│ │ │ │
│ │ │ │ │ │ │
│ │ └─────────────┼on_write_ready│ Protocol.write_ready() │ │
│ │ │ │ │ │
│ └─────────────────────┘ └─────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────┘
```
## Protocol Base Class
The `Protocol` class is the base for implementing custom protocols:
```python
import erlang.reactor as reactor
class Protocol(reactor.Protocol):
"""Base protocol attributes and methods."""
# Set by reactor on connection
fd: int # File descriptor
client_info: dict # Connection metadata from Erlang
write_buffer: bytearray # Buffer for outgoing data
closed: bool # Whether connection is closed
```
### Lifecycle Methods
#### `connection_made(fd, client_info)`
Called when a file descriptor is handed off from Erlang.
```python
def connection_made(self, fd: int, client_info: dict):
"""Called when fd is handed off from Erlang.
Args:
fd: File descriptor for the connection
client_info: Dict with connection metadata
- 'addr': Client IP address
- 'port': Client port
- 'type': Connection type (tcp, udp, unix, etc.)
"""
# Initialize per-connection state
self.request_buffer = bytearray()
```
#### `data_received(data) -> action`
Called when data has been read from the fd. The `data` argument is a `ReactorBuffer` - a bytes-like object that supports zero-copy access via the buffer protocol.
```python
def data_received(self, data: bytes) -> str:
"""Handle received data.
Args:
data: A bytes-like object (ReactorBuffer) supporting:
- Buffer protocol: memoryview(data) for zero-copy access
- Indexing/slicing: data[0], data[0:10]
- Bytes methods: data.startswith(), data.find(), data.decode()
- Comparison: data == b'...'
- Conversion: bytes(data) creates a copy
Returns:
Action string indicating what to do next
"""
self.request_buffer.extend(data)
if self.request_complete():
self.prepare_response()
return "write_pending"
return "continue" # Need more data
```
#### `write_ready() -> action`
Called when the fd is ready for writing.
```python
def write_ready(self) -> str:
"""Handle write readiness.
Returns:
Action string indicating what to do next
"""
if not self.write_buffer:
return "read_pending"
written = self.write(bytes(self.write_buffer))
del self.write_buffer[:written]
if self.write_buffer:
return "continue" # More to write
return "read_pending" # Done writing
```
#### `connection_lost()`
Called when the connection is closed.
```python
def connection_lost(self):
"""Called when connection closes.
Override to perform cleanup.
"""
# Clean up resources
self.cleanup()
```
### I/O Helper Methods
#### `read(size) -> bytes`
Read from the file descriptor:
```python
data = self.read(65536) # Read up to 64KB
if not data:
return "close" # EOF or error
```
#### `write(data) -> int`
Write to the file descriptor:
```python
written = self.write(response_bytes)
del self.write_buffer[:written] # Remove written bytes
```
## Zero-Copy ReactorBuffer
The `data` argument passed to `data_received()` is a `ReactorBuffer` - a special bytes-like type that provides zero-copy access to read data. The data is read by the NIF before acquiring the GIL, and wrapped in a ReactorBuffer that exposes the memory via Python's buffer protocol.
### Benefits
- **No data copying** - Data goes directly from kernel to Python without intermediate copies
- **Transparent compatibility** - ReactorBuffer acts like `bytes` in all common operations
- **Memory efficiency** - Large payloads don't require extra allocations
### Supported Operations
ReactorBuffer supports all common bytes operations:
```python
def data_received(self, data):
# Buffer protocol - zero-copy access
mv = memoryview(data)
first_byte = mv[0]
# Indexing and slicing
header = data[0:4]
last_byte = data[-1]
# Bytes methods
if data.startswith(b'GET'):
method = 'GET'
pos = data.find(b'\r\n')
count = data.count(b'/')
# String conversion
text = data.decode('utf-8')
# Comparison
if data == b'PING':
self.write_buffer.extend(b'PONG')
# Convert to bytes (creates a copy)
data_copy = bytes(data)
# 'in' operator
if b'HTTP' in data:
self.handle_http()
# Length
size = len(data)
# Extend bytearray (uses buffer protocol)
self.request_buffer.extend(data)
return "continue"
```
### Performance Considerations
The zero-copy benefit is in the NIF read path - data is read directly into a buffer that Python wraps without copying. This avoids the overhead of creating a Python bytes object for every read.
- **NIF read path**: Data goes directly from kernel to Python without intermediate copies
- **Parsing operations**: `startswith()`, `find()` etc. are optimized C implementations
- **Direct memoryview access**: Use `data.memoryview()` for maximum zero-copy performance
- **Creating bytes**: Call `bytes(data)` only when you need a persistent copy
```python
# For maximum performance, use memoryview slicing for comparisons
mv = data.memoryview()
if mv[:3] == b'GET':
# Process GET request
```
## Action Return Values
Protocol methods return action strings that tell Erlang what to do next:
| Action | Description | Erlang Behavior |
|--------|-------------|-----------------|
| `"continue"` | Keep current mode | Re-register same event |
| `"write_pending"` | Ready to write | Switch to write mode (`enif_select` WRITE) |
| `"read_pending"` | Ready to read | Switch to read mode (`enif_select` READ) |
| `"close"` | Close connection | Close fd and call `connection_lost()` |
## Factory Pattern
Register a protocol factory to create protocol instances for each connection:
```python
import erlang.reactor as reactor
class MyProtocol(reactor.Protocol):
# ... implementation
# Set the factory - called for each new connection
reactor.set_protocol_factory(MyProtocol)
# Get the protocol instance for an fd
proto = reactor.get_protocol(fd)
```
## Complete Example: Echo Protocol
Here's a complete echo server protocol:
```python
import erlang.reactor as reactor
class EchoProtocol(reactor.Protocol):
"""Simple echo protocol - sends back whatever it receives."""
def connection_made(self, fd, client_info):
super().connection_made(fd, client_info)
print(f"Connection from {client_info.get('addr')}:{client_info.get('port')}")
def data_received(self, data):
"""Echo received data back to client."""
if not data:
return "close"
# Buffer the data for writing
self.write_buffer.extend(data)
return "write_pending"
def write_ready(self):
"""Write buffered data."""
if not self.write_buffer:
return "read_pending"
written = self.write(bytes(self.write_buffer))
del self.write_buffer[:written]
if self.write_buffer:
return "continue" # More data to write
return "read_pending" # Done, wait for more input
def connection_lost(self):
print(f"Connection closed: fd={self.fd}")
# Register the factory
reactor.set_protocol_factory(EchoProtocol)
```
## Example: HTTP Protocol (Simplified)
```python
import erlang.reactor as reactor
class SimpleHTTPProtocol(reactor.Protocol):
"""Minimal HTTP/1.0 protocol."""
def __init__(self):
super().__init__()
self.request_buffer = bytearray()
def data_received(self, data):
self.request_buffer.extend(data)
# Check for end of headers
if b'\r\n\r\n' in self.request_buffer:
self.handle_request()
return "write_pending"
return "continue"
def handle_request(self):
"""Parse request and prepare response."""
request = self.request_buffer.decode('utf-8', errors='replace')
first_line = request.split('\r\n')[0]
method, path, _ = first_line.split(' ', 2)
# Simple response
body = f"Hello! You requested {path}"
response = (
f"HTTP/1.0 200 OK\r\n"
f"Content-Length: {len(body)}\r\n"
f"Content-Type: text/plain\r\n"
f"\r\n"
f"{body}"
)
self.write_buffer.extend(response.encode())
def write_ready(self):
if not self.write_buffer:
return "close" # HTTP/1.0: close after response
written = self.write(bytes(self.write_buffer))
del self.write_buffer[:written]
if self.write_buffer:
return "continue"
return "close"
reactor.set_protocol_factory(SimpleHTTPProtocol)
```
## Passing Sockets from Erlang to Python
### Method 1: Socket FD Handoff to Reactor
The most efficient way is to hand off the socket's file descriptor directly:
```erlang
%% Erlang: Accept and hand off to Python reactor
{ok, ClientSock} = gen_tcp:accept(ListenSock),
{ok, {Addr, Port}} = inet:peername(ClientSock),
%% Get the raw file descriptor
{ok, Fd} = inet:getfd(ClientSock),
%% Hand off to Python - Erlang no longer owns this socket
py_reactor_context:handoff(Fd, #{
addr => inet:ntoa(Addr),
port => Port,
type => tcp
}).
```
```python
# Python: Protocol handles the fd
import erlang.reactor as reactor
class MyProtocol(reactor.Protocol):
def data_received(self, data):
# self.fd is the socket fd from Erlang
self.write_buffer.extend(b"Got: " + data)
return "write_pending"
reactor.set_protocol_factory(MyProtocol)
```
### Method 2: Pass Socket FD to asyncio
For asyncio-based code, pass the fd and wrap it in Python:
```erlang
%% Erlang: Get fd and pass to Python
{ok, ClientSock} = gen_tcp:accept(ListenSock),
{ok, Fd} = inet:getfd(ClientSock),
%% Call Python with the fd
Ctx = py:context(1),
py:call(Ctx, my_handler, handle_connection, [Fd]).
```
```python
# Python: Wrap fd in asyncio
import asyncio
import socket
async def handle_connection(fd: int):
# Create socket from fd (Python takes ownership)
sock = socket.socket(fileno=fd)
sock.setblocking(False)
# Use asyncio streams
reader, writer = await asyncio.open_connection(sock=sock)
data = await reader.read(1024)
writer.write(b"Echo: " + data)
await writer.drain()
writer.close()
await writer.wait_closed()
def handle_connection_sync(fd: int):
"""Sync wrapper for Erlang call."""
asyncio.run(handle_connection(fd))
```
### Method 3: Pass Socket Object via Pickle (Not Recommended)
For simple cases, you can pickle socket info, but this is less efficient:
```erlang
%% Erlang: Pass connection info
{ok, {Addr, Port}} = inet:peername(ClientSock),
py:call(Ctx, my_handler, connect_to, [Addr, Port]).
```
```python
# Python: Create new connection (less efficient - new socket)
import socket
def connect_to(addr: str, port: int):
sock = socket.create_connection((addr, port))
# ... use socket
```
### Socket Ownership
When passing an fd from Erlang to Python, you must decide who owns it:
**Option 1: Transfer ownership to Python**
Erlang gives up the fd entirely. Don't close the Erlang socket.
```erlang
{ok, ClientSock} = gen_tcp:accept(ListenSock),
{ok, Fd} = inet:getfd(ClientSock),
py_reactor_context:handoff(Fd, #{type => tcp}).
%% Don't close ClientSock - Python owns the fd now
```
**Option 2: Duplicate the fd (recommended)**
Use `py:dup_fd/1` to create an independent copy. Both sides can close their own fd.
```erlang
{ok, ClientSock} = gen_tcp:accept(ListenSock),
{ok, Fd} = inet:getfd(ClientSock),
{ok, DupFd} = py:dup_fd(Fd),
py_reactor_context:handoff(DupFd, #{type => tcp}),
gen_tcp:close(ClientSock). %% Safe - Python has its own fd copy
```
This is safer because:
1. **Erlang controls its socket lifecycle** - GC won't affect Python
2. **Python has its own fd** - Independent of Erlang's socket
3. **No double-close issues** - Each side manages its own fd
## Integration with Erlang
### From Erlang: Starting a Reactor Server
```erlang
%% In your Erlang code
-module(my_server).
-export([start/1]).
start(Port) ->
%% Set up the Python protocol factory first
Ctx = py:context(1),
ok = py:exec(Ctx, <<"
import erlang.reactor as reactor
from my_protocols import MyProtocol
reactor.set_protocol_factory(MyProtocol)
">>),
%% Start accepting connections
{ok, ListenSock} = gen_tcp:listen(Port, [binary, {active, false}, {reuseaddr, true}]),
accept_loop(ListenSock).
accept_loop(ListenSock) ->
{ok, ClientSock} = gen_tcp:accept(ListenSock),
{ok, {Addr, Port}} = inet:peername(ClientSock),
%% Hand off to Python reactor
{ok, Fd} = inet:getfd(ClientSock),
py_reactor_context:handoff(Fd, #{
addr => inet:ntoa(Addr),
port => Port,
type => tcp
}),
accept_loop(ListenSock).
```
### How FDs Are Passed from Erlang to Python
1. Erlang accepts a connection and gets the socket fd
2. Erlang calls `py_reactor_context:handoff(Fd, ClientInfo)`
3. The NIF calls Python's `reactor.init_connection(fd, client_info)`
4. Protocol factory creates a new Protocol instance
5. `enif_select` is registered for read events on the fd
6. When events occur, Python callbacks handle the protocol logic
## Module API Reference
### `set_protocol_factory(factory)`
Set the factory function for creating protocols.
```python
reactor.set_protocol_factory(MyProtocol)
# or with a custom factory
reactor.set_protocol_factory(lambda: MyProtocol(custom_arg))
```
### `get_protocol(fd)`
Get the protocol instance for a file descriptor.
```python
proto = reactor.get_protocol(fd)
if proto:
print(f"Protocol state: {proto.client_info}")
```
### `init_connection(fd, client_info)`
Internal - called by NIF on fd handoff.
### `on_read_ready(fd, data)`
Internal - called by NIF when fd is readable. The `data` argument is a `ReactorBuffer` containing the bytes read from the fd.
### `on_write_ready(fd)`
Internal - called by NIF when fd is writable.
### `close_connection(fd)`
Internal - called by NIF to close connection.
## Subinterpreter Support
The reactor supports isolated subinterpreters via `py_reactor_context`. Each subinterpreter has its own reactor module cache, ensuring protocol factories are isolated between contexts.
```erlang
%% Create context with subinterpreter mode
{ok, Ctx1} = py_reactor_context:start_link(1, subinterp, #{
setup_code => <<"
import erlang.reactor as reactor
reactor.set_protocol_factory(EchoProtocol)
">>
}),
%% Create another context with different protocol
{ok, Ctx2} = py_reactor_context:start_link(2, subinterp, #{
setup_code => <<"
import erlang.reactor as reactor
reactor.set_protocol_factory(HttpProtocol)
">>
}).
```
Each context runs in its own subinterpreter with isolated protocol factory and connection state. This enables running multiple protocol handlers in the same BEAM VM without interference.
## See Also
- [Asyncio](asyncio.md) - Higher-level asyncio event loop for Python
- [Security](security.md) - Security sandbox documentation
- [Getting Started](getting-started.md) - Basic usage guide