# Erlang-native asyncio Event Loop
This guide covers the Erlang-native asyncio event loop implementation that provides high-performance async I/O for Python applications running within erlang_python.
## Overview
The `ErlangEventLoop` is a custom asyncio event loop backed by Erlang's scheduler using `enif_select` for I/O multiplexing. This replaces Python's polling-based event loop with true event-driven callbacks integrated into the BEAM VM.
All asyncio functionality is available through the unified `erlang` module:
```python
import erlang
# Preferred way to run async code
erlang.run(main())
```
### Key Benefits
- **Sub-millisecond latency** - Events are delivered immediately via Erlang messages instead of polling every 10ms
- **Zero CPU usage when idle** - No busy-waiting or polling overhead
- **Full GIL release during waits** - Python's Global Interpreter Lock is released while waiting for events
- **Native Erlang scheduler integration** - I/O events are handled by BEAM's scheduler
### Architecture
```
┌──────────────────────────────────────────────────────────────────────────────┐
│ ErlangEventLoop Architecture │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ Python (asyncio) Erlang (BEAM) │
│ ──────────────── ───────────── │
│ │
│ ┌──────────────────┐ ┌────────────────────────────────────┐ │
│ │ ErlangEventLoop │ │ py_event_worker │ │
│ │ │ │ │ │
│ │ call_later() ──┼─{timer,ms,id}─▶│ erlang:send_after(ms, self, {}) │ │
│ │ call_at() │ │ │ │ │
│ │ │ │ ▼ │ │
│ │ add_reader() ──┼──{add_fd,fd}──▶│ enif_select(fd, READ) │ │
│ │ add_writer() │ │ │ │ │
│ │ │ │ ▼ │ │
│ │ │◀──{fd_ready}───│ handle_info({select, ...}) │ │
│ │ │◀──{timeout}────│ handle_info({timeout, ...}) │ │
│ │ │ │ │ │
│ │ _run_once() │ └────────────────────────────────────┘ │
│ │ │ │ │
│ │ ▼ │ ┌────────────────────────────────────┐ │
│ │ process pending │ │ │ │
│ │ callbacks │ │ │ │
│ └──────────────────┘ │ │ │
│ │ │ │
│ ┌──────────────────┐ └────────────────────────────────────┘ │
│ │ asyncio (via │ │
│ │ erlang.run()) │ ┌────────────────────────────────────┐ │
│ │ sleep() │ │ asyncio.sleep() uses call_later() │ │
│ │ gather() │─call_later()──▶│ which triggers erlang:send_after │ │
│ │ wait_for() │ │ │ │
│ │ create_task() │ └────────────────────────────────────┘ │
│ └──────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
```
**Components:**
| Component | Role |
|-----------|------|
| `ErlangEventLoop` | Python asyncio event loop using Erlang for I/O and timers |
| `py_event_worker` | Erlang gen_server handling FDs, timers, and task processing |
| `erlang.run()` | Entry point to run asyncio code with the Erlang event loop |
## Usage Patterns
### Pattern 1: `erlang.run()` (Recommended)
The preferred way to run async code, matching uvloop's API:
```python
import asyncio
import erlang
async def main():
await asyncio.sleep(1.0) # Uses erlang:send_after internally
print("Done!")
# Simple and clean
erlang.run(main())
```
### Pattern 2: With `asyncio.Runner` (Python 3.11+)
```python
import asyncio
import erlang
with asyncio.Runner(loop_factory=erlang.new_event_loop) as runner:
runner.run(main())
```
### Pattern 3: `erlang.install()` (Deprecated in Python 3.12+)
This pattern installs the ErlangEventLoopPolicy globally. It's deprecated in Python 3.12+ because `asyncio.run()` no longer respects global policies:
```python
import asyncio
import erlang
erlang.install() # Deprecated in 3.12+, use erlang.run() instead
asyncio.run(main())
```
### Pattern 4: Manual Loop Management
For cases where you need direct control:
```python
import asyncio
import erlang
loop = erlang.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()
```
## Execution Mode Detection
The `erlang` module can detect the Python execution mode:
```python
from erlang import detect_mode, ExecutionMode
mode = detect_mode()
if mode == ExecutionMode.FREE_THREADED:
print("Running in free-threaded mode (no GIL)")
elif mode == ExecutionMode.SUBINTERP:
print("Running in subinterpreter with per-interpreter GIL")
else:
print("Running with shared GIL")
```
**ExecutionMode values:**
- `FREE_THREADED` - Python 3.13+ with `Py_GIL_DISABLED` (no GIL)
- `SUBINTERP` - Python 3.12+ running in a subinterpreter
- `SHARED_GIL` - Traditional Python with shared GIL
## TCP Support
### Client Connections
Use `create_connection()` to establish TCP client connections:
```python
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
def data_received(self, data):
print(f'Received: {data.decode()}')
def connection_lost(self, exc):
self.on_con_lost.set_result(True)
async def main():
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol('Hello, World!', on_con_lost),
host='127.0.0.1',
port=8888
)
try:
await on_con_lost
finally:
transport.close()
```
### TCP Servers
Use `create_server()` to create TCP servers:
```python
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print(f'Connection from {peername}')
self.transport = transport
def data_received(self, data):
message = data.decode()
print(f'Received: {message}')
# Echo back
self.transport.write(data)
def connection_lost(self, exc):
print('Connection closed')
async def main():
loop = asyncio.get_running_loop()
server = await loop.create_server(
EchoServerProtocol,
host='127.0.0.1',
port=8888,
reuse_address=True
)
async with server:
await server.serve_forever()
```
### Transport Class
The `_ErlangSocketTransport` class implements the asyncio Transport interface with these features:
- Non-blocking I/O using Erlang's `enif_select`
- Write buffering with automatic drain
- Connection lifecycle management (`connection_made`, `connection_lost`, `eof_received`)
- Extra info access via `get_extra_info()` (socket, sockname, peername)
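Because the transport implements the standard asyncio `Transport` interface, its use can be sketched against any conforming loop. The snippet below (protocol and class names are illustrative, not part of this project) connects a client to a one-shot echo server and reads socket metadata via `get_extra_info()`; inside erlang_python the final `asyncio.run(main())` would typically be `erlang.run(main())`:

```python
import asyncio

class OneShotEcho(asyncio.Protocol):
    """Server side: echo a single payload, then close."""
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)   # buffered write, flushed before close
        self.transport.close()

class Client(asyncio.Protocol):
    def __init__(self, payload, done):
        self.payload = payload
        self.done = done

    def connection_made(self, transport):
        # Transports expose socket metadata via get_extra_info()
        self.peer = transport.get_extra_info('peername')
        transport.write(self.payload)

    def data_received(self, data):
        self.done.set_result(data)

async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(OneShotEcho, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    done = loop.create_future()
    transport, proto = await loop.create_connection(
        lambda: Client(b'ping', done), '127.0.0.1', port)
    data = await done
    transport.close()
    server.close()
    await server.wait_closed()
    return proto.peer, data

peer, data = asyncio.run(main())
```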
## UDP/Datagram Support
The event loop provides full UDP/datagram support through `create_datagram_endpoint()`.
### Creating UDP Endpoints
```python
import asyncio
class EchoUDPProtocol(asyncio.DatagramProtocol):
def connection_made(self, transport):
self.transport = transport
print(f'Listening on {transport.get_extra_info("sockname")}')
def datagram_received(self, data, addr):
message = data.decode()
print(f'Received {message!r} from {addr}')
# Echo back to sender
self.transport.sendto(data, addr)
def error_received(self, exc):
print(f'Error received: {exc}')
def connection_lost(self, exc):
print('Connection closed')
async def main():
loop = asyncio.get_running_loop()
# Create UDP server
transport, protocol = await loop.create_datagram_endpoint(
EchoUDPProtocol,
local_addr=('127.0.0.1', 9999)
)
try:
await asyncio.sleep(3600) # Run for an hour
finally:
transport.close()
```
### Parameters
The `create_datagram_endpoint()` method accepts these parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| `protocol_factory` | callable | Factory function returning a `DatagramProtocol` |
| `local_addr` | tuple | Local `(host, port)` to bind to |
| `remote_addr` | tuple | Remote `(host, port)` to connect to (optional) |
| `family` | int | Socket family (`AF_INET` or `AF_INET6`) |
| `proto` | int | Socket protocol number |
| `flags` | int | `getaddrinfo` flags |
| `reuse_address` | bool | Allow reuse of local address (`SO_REUSEADDR`) |
| `reuse_port` | bool | Allow reuse of local port (`SO_REUSEPORT`) |
| `allow_broadcast` | bool | Allow sending to broadcast addresses (`SO_BROADCAST`) |
| `sock` | socket | Pre-existing socket to use (overrides other options) |
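As a sketch of the `sock` parameter (which overrides `local_addr`, `family`, and the other socket options), the snippet below pre-binds a UDP socket and hands it to `create_datagram_endpoint()`. The protocol name is illustrative, and `asyncio.run()` stands in for `erlang.run()` outside the VM:

```python
import asyncio
import socket

class NullProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

async def main():
    loop = asyncio.get_running_loop()
    # Pre-bind a socket ourselves; passing sock= means the loop uses
    # it as-is instead of creating and binding one from local_addr.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('127.0.0.1', 0))      # 0 = ephemeral port
    sock.setblocking(False)
    bound = sock.getsockname()
    transport, protocol = await loop.create_datagram_endpoint(
        NullProtocol, sock=sock)
    # The transport reports the address the socket was bound to
    assert transport.get_extra_info('sockname') == bound
    transport.close()
    await asyncio.sleep(0)           # let the close complete
    return bound

addr = asyncio.run(main())
```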
### DatagramProtocol Callbacks
Implement these callbacks in your `DatagramProtocol`:
```python
class MyDatagramProtocol(asyncio.DatagramProtocol):
def connection_made(self, transport):
"""Called when the endpoint is ready."""
self.transport = transport
def datagram_received(self, data, addr):
"""Called when a datagram is received.
Args:
data: The received bytes
addr: The sender's (host, port) tuple
"""
pass
def error_received(self, exc):
"""Called when a send or receive operation fails.
Args:
exc: The exception that occurred
"""
pass
def connection_lost(self, exc):
"""Called when the transport is closed.
Args:
exc: Exception if abnormal close, None otherwise
"""
pass
```
### Connected vs Unconnected UDP
**Unconnected UDP** (default): Each datagram can be sent to any destination:
```python
# Server can send to any client
transport, protocol = await loop.create_datagram_endpoint(
MyProtocol,
local_addr=('0.0.0.0', 9999)
)
# Send to different destinations
transport.sendto(b'Hello', ('192.168.1.100', 5000))
transport.sendto(b'World', ('192.168.1.101', 5000))
```
**Connected UDP**: The endpoint is bound to a specific remote address:
```python
# Client connected to specific server
transport, protocol = await loop.create_datagram_endpoint(
MyProtocol,
remote_addr=('127.0.0.1', 9999)
)
# Can use sendto without address
transport.sendto(b'Hello') # Goes to connected address
```
### Example: UDP Echo Server and Client
**Server:**
```python
import asyncio
class EchoServerProtocol(asyncio.DatagramProtocol):
def connection_made(self, transport):
self.transport = transport
sockname = transport.get_extra_info('sockname')
print(f'UDP Echo Server listening on {sockname}')
def datagram_received(self, data, addr):
print(f'Received {len(data)} bytes from {addr}')
# Echo back
self.transport.sendto(data, addr)
async def run_server():
loop = asyncio.get_running_loop()
transport, _ = await loop.create_datagram_endpoint(
EchoServerProtocol,
local_addr=('127.0.0.1', 9999)
)
try:
await asyncio.sleep(3600)
finally:
transport.close()
asyncio.run(run_server())
```
**Client:**
```python
import asyncio
class EchoClientProtocol(asyncio.DatagramProtocol):
def __init__(self, message, on_response):
self.message = message
self.on_response = on_response
self.transport = None
def connection_made(self, transport):
self.transport = transport
print(f'Sending: {self.message}')
transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print(f'Received: {data.decode()} from {addr}')
self.on_response.set_result(data)
def error_received(self, exc):
print(f'Error: {exc}')
async def run_client():
loop = asyncio.get_running_loop()
on_response = loop.create_future()
transport, _ = await loop.create_datagram_endpoint(
lambda: EchoClientProtocol('Hello UDP!', on_response),
remote_addr=('127.0.0.1', 9999)
)
try:
response = await asyncio.wait_for(on_response, timeout=5.0)
print(f'Echo response: {response.decode()}')
finally:
transport.close()
asyncio.run(run_client())
```
### Broadcast UDP
Enable broadcast for sending to broadcast addresses:
```python
transport, protocol = await loop.create_datagram_endpoint(
MyProtocol,
local_addr=('0.0.0.0', 0),
allow_broadcast=True
)
# Send to broadcast address
transport.sendto(b'Broadcast message', ('255.255.255.255', 9999))
```
## Performance
The event loop includes several optimizations for high-throughput applications.
### Built-in Optimizations
| Optimization | Description | Impact |
|-------------|-------------|--------|
| **Cached function lookups** | `ast.literal_eval` cached at module init | Avoids import per callback |
| **O(1) timer cancellation** | Handle-to-callback reverse map | Was O(n) iteration |
| **O(1) duplicate detection** | Hash set for pending events | Was O(n) linear scan |
| **Lock-free event consumption** | Detach queue under lock, process outside | Reduced contention |
| **Object pooling** | Reuse event structures via freelist | Fewer allocations |
| **Deque method caching** | Pre-bound `append`/`popleft` methods | Avoids attribute lookup |
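The deque method caching row refers to a common CPython micro-optimization rather than anything Erlang-specific. A minimal stand-alone illustration of the pattern (not the actual loop internals) looks like this:

```python
from collections import deque
import timeit

q = deque()

def with_lookup(n=1000):
    # Attribute lookup on q happens on every call
    for i in range(n):
        q.append(i)
    for _ in range(n):
        q.popleft()

# Pre-bound methods skip the per-call attribute lookup
append, popleft = q.append, q.popleft

def pre_bound(n=1000):
    for i in range(n):
        append(i)
    for _ in range(n):
        popleft()

t1 = timeit.timeit(with_lookup, number=200)
t2 = timeit.timeit(pre_bound, number=200)
print(f"lookup: {t1:.4f}s  pre-bound: {t2:.4f}s")
```

The absolute difference is small per call, but it adds up on a hot path that drains thousands of callbacks per second.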
### Performance Build
For maximum performance, rebuild with the `PERF_BUILD` cmake option:
```bash
# Clean build with performance optimizations
rm -rf _build/cmake
mkdir -p _build/cmake && cd _build/cmake
cmake ../../c_src -DPERF_BUILD=ON
cmake --build .
```
This enables:
- `-O3` optimization level
- Link-Time Optimization (LTO)
- `-march=native` (CPU-specific optimizations)
- `-ffast-math` and `-funroll-loops`
**Note:** Performance builds are not portable across different CPU architectures due to `-march=native`.
### Benchmarking
Run the event loop benchmarks to measure performance:
```bash
python3 examples/benchmark_event_loop.py
```
Example output:
```
Timer throughput: 150,000 timers/sec
Callback dispatch: 200,000 callbacks/sec
I/O ready detection: <1ms latency
```
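Exact figures depend on the machine and the loop in use. As a rough sketch of how such a number can be measured (this is not the contents of `benchmark_event_loop.py`), a minimal dispatch-rate harness might look like:

```python
import asyncio
import time

async def dispatch_rate(n=10_000):
    """Measure raw call_soon() dispatch throughput on the current loop."""
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    remaining = n

    def cb():
        nonlocal remaining
        remaining -= 1
        if remaining == 0:
            done.set_result(None)

    start = time.perf_counter()
    for _ in range(n):
        loop.call_soon(cb)
    await done
    elapsed = time.perf_counter() - start
    return n / elapsed

rate = asyncio.run(dispatch_rate())
print(f"Callback dispatch: {rate:,.0f} callbacks/sec")
```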
## Low-level APIs
The event loop is backed by NIFs (Native Implemented Functions) that provide direct access to Erlang's event system. These are primarily for internal use and testing.
### Event Loop NIFs
From Erlang, you can access the low-level event loop operations:
```erlang
%% Create a new event loop instance
{ok, LoopRef} = py_nif:event_loop_new().
%% Add a reader callback for a file descriptor
{ok, FdRef} = py_nif:add_reader(LoopRef, Fd, CallbackId).
%% Remove a reader
ok = py_nif:remove_reader(LoopRef, FdRef).
%% Poll for events (returns number of events ready)
NumEvents = py_nif:poll_events(LoopRef, TimeoutMs).
%% Get pending callback events
Pending = py_nif:get_pending(LoopRef).
%% Returns: [{CallbackId, read|write}, ...]
%% Destroy the event loop
py_nif:event_loop_destroy(LoopRef).
```
### UDP Socket NIFs (for testing)
```erlang
%% Create a UDP socket bound to a port
{ok, {Fd, Port}} = py_nif:create_test_udp_socket(0). % 0 = ephemeral port
%% Send data via UDP
ok = py_nif:sendto_test_udp(Fd, <<"hello">>, <<"127.0.0.1">>, 9999).
%% Receive data
{ok, {Data, {FromHost, FromPort}}} = py_nif:recvfrom_test_udp(Fd, MaxBytes).
%% Set broadcast option
ok = py_nif:set_udp_broadcast(Fd, true).
%% Close the socket
py_nif:close_test_fd(Fd).
```
## Integration with Erlang
The event loop integrates with Erlang's message passing system through a worker process:
```erlang
%% Start the event worker
{ok, LoopRef} = py_nif:event_loop_new(),
{ok, WorkerPid} = py_event_worker:start_link(<<"worker">>, LoopRef),
ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid).
```
Events are delivered as Erlang messages, enabling the event loop to participate in BEAM's supervision trees and distributed computing capabilities.
## Event Loop Architecture
Each `ErlangEventLoop` instance has its own isolated capsule with a dedicated pending queue. This ensures that timers and FD events are properly routed to the correct loop instance.
### Multi-threaded Example
```python
from erlang import ErlangEventLoop
import threading
def run_tasks(loop_id):
"""Each thread gets its own event loop."""
loop = ErlangEventLoop()
results = []
def callback():
results.append(loop_id)
# Schedule callbacks - isolated to this loop
loop.call_soon(callback)
loop.call_later(0.01, callback)
# Process events
import time
deadline = time.time() + 0.5
while time.time() < deadline and len(results) < 2:
loop._run_once()
time.sleep(0.01)
loop.close()
return results
# Run in separate threads
t1 = threading.Thread(target=run_tasks, args=('loop_a',))
t2 = threading.Thread(target=run_tasks, args=('loop_b',))
t1.start()
t2.start()
t1.join()
t2.join()
# Each thread only sees its own callbacks
```
### Internal Architecture
Each event loop has an associated worker process that handles timer and FD events:
```
┌─────────────────────────────────────────────────────────────────┐
│ py_event_worker │
│ │
│ Receives: │
│ - Timer expirations from erlang:send_after │
│ - FD ready events from enif_select │
│ - task_ready messages for processing tasks │
│ │
│ Dispatches events to the loop's pending queue │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌───────────┐
│ Loop │
│ pending │
└───────────┘
```
Each loop has its own pending queue, ensuring callbacks are processed only by the loop that scheduled them. The worker dispatches timer, FD, and task events to the correct loop.
## Erlang Timer Integration
When using `erlang.run()` to execute asyncio code, standard asyncio functions like `asyncio.sleep()` are automatically backed by Erlang's native timer system for maximum performance.
### Overview
Unlike Python's standard polling-based event loop, the Erlang event loop uses `erlang:send_after/3` for timers and integrates directly with the BEAM scheduler. This eliminates Python event loop overhead (~0.5-1ms per operation) and provides more precise timing.
### Architecture
```
┌─────────────────────────────────────────────────────────────────────────┐
│ asyncio.sleep() via ErlangEventLoop │
│ │
│ Python Erlang │
│ ────── ────── │
│ │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ asyncio.sleep │ │ py_event_worker │ │
│ │ (0.1) │ │ │ │
│ └────────┬────────┘ │ │ │
│ │ │ │ │
│ ▼ │ │ │
│ ┌─────────────────┐ │ │ │
│ │ ErlangEventLoop │──{timer,100, │ erlang:send_after(100ms) │ │
│ │ call_later() │ Id}─────▶│ │ │ │
│ └────────┬────────┘ │ ▼ │ │
│ │ │ handle_info({timeout, ...}) │ │
│ ┌────────▼────────┐ │ │ │ │
│ │ Yield to event │ │ ▼ │ │
│ │ loop (dirty │ │ py_nif:dispatch_timer() │ │
│ │ scheduler │◀─────────────│ │ │ │
│ │ released) │ callback └─────────┼───────────────────────┘ │
│ └────────┬────────┘ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ Resume after │ │ Timer callback dispatched to │ │
│ │ timer fires │ │ Python pending queue │ │
│ └─────────────────┘ └─────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```
**Key features:**
- **Dirty scheduler released during sleep** - Python yields to event loop, freeing the dirty NIF thread
- **BEAM scheduler integration** - Uses Erlang's native timer system
- **Zero CPU usage** - No polling, event-driven callback
- **Sub-millisecond precision** - Timers managed by BEAM scheduler
### Basic Usage
```python
import erlang
import asyncio
async def my_handler():
# Sleep using Erlang's timer system
await asyncio.sleep(0.1) # 100ms - uses erlang:send_after internally
return "done"
# Run a coroutine with Erlang event loop
result = erlang.run(my_handler())
```
### API Reference
When using `erlang.run()` or the Erlang event loop, all standard asyncio functions work seamlessly with Erlang's backend.
#### erlang.sleep(seconds)
Sleep for the specified duration. Works in both async and sync contexts.
```python
import erlang
# Async context - yields to event loop
async def async_handler():
await erlang.sleep(0.1) # Uses asyncio.sleep() internally
return "done"
# Sync context - blocks Python, releases dirty scheduler
def sync_handler():
erlang.sleep(0.1) # Suspends Erlang process via receive/after
return "done"
```
**Behavior by Context:**
| Context | Mechanism | Effect |
|---------|-----------|--------|
| Async (`await erlang.sleep()`) | `asyncio.sleep()` via `call_later()` | Yields to event loop, dirty scheduler released |
| Sync (`erlang.sleep()`) | `erlang.call('_py_sleep')` with `receive/after` | Blocks Python, Erlang process suspends, dirty scheduler released |
Both modes allow other Erlang processes and Python contexts to run during the sleep.
#### asyncio.sleep(delay)
Sleep for the specified delay. Uses Erlang's `erlang:send_after/3` internally.
```python
import erlang
import asyncio
async def example():
# Simple sleep - uses Erlang timer system
await asyncio.sleep(0.05) # 50ms
erlang.run(example())
```
#### erlang.run(coro)
Run a coroutine to completion using an ErlangEventLoop.
```python
import erlang
import asyncio
async def main():
await asyncio.sleep(0.01)
return 42
result = erlang.run(main())
assert result == 42
```
#### asyncio.gather(*coros, return_exceptions=False)
Run coroutines concurrently and gather results.
```python
import erlang
import asyncio
async def task(n):
await asyncio.sleep(0.01)
return n * 2
async def main():
results = await asyncio.gather(task(1), task(2), task(3))
assert results == [2, 4, 6]
erlang.run(main())
```
#### asyncio.wait_for(coro, timeout)
Wait for a coroutine with a timeout.
```python
import erlang
import asyncio
async def fast_task():
await asyncio.sleep(0.01)
return 'done'
async def main():
try:
result = await asyncio.wait_for(fast_task(), timeout=1.0)
except asyncio.TimeoutError:
print("Task timed out")
erlang.run(main())
```
#### asyncio.create_task(coro, *, name=None)
Create a task to run a coroutine in the background.
```python
import erlang
import asyncio
async def background_work():
await asyncio.sleep(0.1)
return 'background_done'
async def main():
task = asyncio.create_task(background_work())
# Do other work while task runs
await asyncio.sleep(0.05)
# Wait for task to complete
result = await task
assert result == 'background_done'
erlang.run(main())
```
#### erlang.spawn_task(coro, *, name=None)
Spawn an async task from both sync and async contexts. This is useful for fire-and-forget background work from synchronous Python code called by Erlang.
```python
import asyncio
import erlang
# From sync code called by Erlang
def handle_request(data):
# This works even though there's no running event loop
erlang.spawn_task(process_async(data))
return 'ok'
# From async code
async def handler():
# Also works in async context
erlang.spawn_task(background_work())
await other_work()
async def process_async(data):
await asyncio.sleep(0.1)
# Do async processing...
async def background_work():
await asyncio.sleep(0.1)
# Do background work...
```
**Key features:**
- Works in sync context where `asyncio.get_running_loop()` would fail
- Returns `asyncio.Task` for optional await/cancel
- Automatically wakes up the event loop to ensure the task runs promptly
- Works with both ErlangEventLoop and standard asyncio loops
#### asyncio.wait(fs, *, timeout=None, return_when=ALL_COMPLETED)
Wait for multiple futures/tasks.
```python
import erlang
import asyncio
async def main():
tasks = [
asyncio.create_task(asyncio.sleep(0.01))
for i in range(3)
]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
assert len(done) == 3
assert len(pending) == 0
erlang.run(main())
```
#### Event Loop Functions
```python
import erlang
import asyncio
# Create a new Erlang-backed event loop
loop = erlang.new_event_loop()
# Set the current event loop
asyncio.set_event_loop(loop)
# Get the running loop (raises RuntimeError if none)
loop = asyncio.get_running_loop()
```
#### Context Manager for Timeouts (Python 3.11+)
```python
import erlang
import asyncio
async def main():
async with asyncio.timeout(1.0):
await slow_operation() # Raises TimeoutError if > 1s
erlang.run(main())
```
### Performance Comparison
| Operation | Standard asyncio | Erlang Event Loop | Improvement |
|-----------|------------------|-------------------|-------------|
| sleep(1ms) | ~1.5ms | ~1.1ms | ~27% faster |
| Event loop overhead | ~0.5-1ms | ~0 | Erlang scheduler |
| Timer precision | 10ms polling | Sub-ms | BEAM scheduler |
| Idle CPU | Polling | Zero | Event-driven |
### When to Use Erlang Event Loop
**Use `erlang.run()` when:**
- You need precise sub-millisecond timing
- Your app makes many small sleep calls
- You want to eliminate Python event loop overhead
- Your app is running inside erlang_python
**Use standard `asyncio.run()` when:**
- You're running outside the Erlang VM
- Testing Python code in isolation
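For code that must work in both environments, one hedged pattern is to select the runner at import time. This sketch assumes the `erlang` module fails to import cleanly outside erlang_python:

```python
import asyncio

async def main():
    await asyncio.sleep(0.01)
    return "done"

try:
    import erlang          # available only inside erlang_python
    run = erlang.run       # Erlang-backed event loop
except ImportError:
    run = asyncio.run      # standard CPython fallback

result = run(main())
```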
## Async Worker Backend (Internal)
The `py:async_call/3,4` and `py:await/1,2` APIs use an event-driven backend based on `py_event_loop`.
### Architecture
```
┌─────────────┐ ┌─────────────────┐ ┌──────────────────────┐
│ Erlang │ │ C NIF │ │ py_event_loop │
│ py:async_ │ │ (no thread) │ │ (Erlang process) │
│ call() │ │ │ │ │
└──────┬──────┘ └────────┬────────┘ └──────────┬───────────┘
│ │ │
│ 1. Message to │ │
│ event_loop │ │
│─────────────────────┼────────────────────────>│
│ │ │
│ 2. Return Ref │ │
│<────────────────────┼─────────────────────────│
│ │ │
│ │ enif_select (wait) │
│ │ ┌───────────────────┐ │
│ │ │ Run Python │ │
│ │ │ erlang.send(pid, │ │
│ │ │ result) │ │
│ │ └───────────────────┘ │
│ │ │
│ 3. {async_result} │ │
│<──────────────────────────────────────────────│
│ (direct erlang.send from Python) │
│ │ │
```
### Key Components
| Component | Role |
|-----------|------|
| `py_event_loop_pool` | Pool manager for event loop-based async execution |
| `run_async/2` (internal) | Submit coroutine to event loop |
| `_run_and_send` | Python wrapper that sends result via `erlang.send()` |
| `nif_event_loop_run_async` | NIF for direct coroutine submission |
### Performance Benefits
| Aspect | Previous (pthread) | Current (event_loop) |
|--------|-------------------|---------------------|
| Latency | ~10-20ms polling | <1ms (enif_select) |
| CPU idle | 100 wakeups/sec | Zero |
| Threads | N pthreads | 0 extra threads |
| GIL | Acquire/release in thread | Already held in callback |
| Shutdown | pthread_join (blocking) | Clean Erlang messages |
The event-driven model eliminates the polling overhead of the previous pthread+usleep implementation, resulting in significantly lower latency for async operations.
## Erlang Callbacks from Python
Python code can call registered Erlang functions using `erlang.call()`. This enables Python handlers to leverage Erlang's concurrency and I/O capabilities.
### erlang.call() - Blocking Callbacks
`erlang.call(name, *args)` calls a registered Erlang function and blocks until it returns.
```python
import erlang
def handler():
# Call Erlang function - blocks until complete
result = erlang.call('my_callback', arg1, arg2)
return process(result)
```
**Behavior:**
- Blocks the current Python execution until the Erlang callback completes
- Code executes exactly once (no replay)
- The callback can release the dirty scheduler by using Erlang's `receive` (e.g., `erlang.sleep()`, `channel.receive()`)
- Quick callbacks hold the dirty scheduler; callbacks that wait via `receive` release it
### Explicit Scheduling API
For long-running operations or when you need to release the dirty scheduler, use the explicit scheduling functions. These return `ScheduleMarker` objects that **must be returned from your handler** to take effect.
#### erlang.schedule(callback_name, *args)
Release the dirty scheduler and continue via an Erlang callback.
```python
import erlang
# Register callback in Erlang:
# py_callback:register(<<"compute">>, fun([X]) -> X * 2 end).
def handler(x):
# Returns ScheduleMarker - MUST be returned from handler
return erlang.schedule('compute', x)
# Nothing after this executes - Erlang callback continues
```
The result is transparent to the caller:
```erlang
%% Caller just gets the callback result
{ok, 10} = py:call('__main__', 'handler', [5]).
```
#### erlang.schedule_py(module, func, args=None, kwargs=None)
Release the dirty scheduler and continue by calling a Python function.
```python
import erlang
def compute(x, multiplier=2):
return x * multiplier
def handler(x):
# Schedule Python function - releases dirty scheduler
return erlang.schedule_py('__main__', 'compute', [x], {'multiplier': 3})
```
This is useful for:
- Breaking up long computations
- Allowing other Erlang processes to run
- Cooperative multitasking
#### erlang.schedule_inline(module, func, args=None, kwargs=None)
Release the dirty scheduler and continue by calling a Python function via `enif_schedule_nif()` - bypassing Erlang messaging entirely.
```python
import erlang
def process_chunk(data, offset=0, results=None):
"""Process data in chunks with inline continuations."""
if results is None:
results = []
chunk_end = min(offset + 100, len(data))
for i in range(offset, chunk_end):
results.append(transform(data[i]))
if chunk_end < len(data):
# Continue inline - no Erlang messaging overhead
return erlang.schedule_inline(
'__main__', 'process_chunk',
args=[data, chunk_end, results]
)
return results
```
**When to use `schedule_inline` vs `schedule_py`:**
| Aspect | `schedule_inline` | `schedule_py` |
|--------|-------------------|---------------|
| Flow | Python -> NIF -> enif_schedule_nif -> Python | Python -> NIF -> Erlang message -> Python |
| Speed | ~3x faster for tight loops | Slower due to messaging |
| Use case | Pure Python chains, no Erlang interaction | When you need Erlang messaging between steps |
| Overhead | Minimal (direct NIF continuation) | Higher (gen_server call) |
**Important:** `schedule_inline` captures the caller's globals/locals, ensuring correct namespace resolution even with subinterpreters.
#### erlang.consume_time_slice(percent)
Check if the NIF time slice is exhausted. Returns `True` if you should yield, `False` if more time remains.
```python
import erlang
def long_computation(items, start_idx=0):
results = []
for i in range(start_idx, len(items)):
results.append(process(items[i]))
# Check if we should yield (1% of time slice per iteration)
if erlang.consume_time_slice(1):
# Time slice exhausted - save progress and reschedule
return erlang.schedule_py(
'__main__', 'long_computation',
[items], {'start_idx': i + 1}
)
return results
```
**Parameters:**
- `percent` (1-100): How much of the time slice was consumed by recent work
**Returns:**
- `True`: Time slice exhausted, you should yield
- `False`: More time remains, continue processing
### When to Use Each Pattern
| Pattern | Use When | Dirty Scheduler |
|---------|----------|-----------------|
| `erlang.call()` | Quick operations or callbacks that use `receive` | Held (unless callback suspends via `receive`) |
| `erlang.schedule()` | Need to call Erlang callback and always release scheduler | Released |
| `erlang.schedule_py()` | Long Python computation, need Erlang interaction between steps | Released |
| `erlang.schedule_inline()` | Tight Python loops, no Erlang interaction needed (~3x faster) | Released |
| `consume_time_slice()` | Fine-grained control over yielding | N/A (checks time slice) |
### Example: Cooperative Long-Running Task
```python
import erlang
def process_batch(items, batch_size=100, offset=0):
"""Process items in batches, yielding between batches."""
end = min(offset + batch_size, len(items))
# Process this batch
for i in range(offset, end):
expensive_operation(items[i])
if end < len(items):
# More work to do - yield and continue
return erlang.schedule_py(
'__main__', 'process_batch',
[items], {'batch_size': batch_size, 'offset': end}
)
return 'done'
```
### Important Notes
1. **Must return the marker**: `schedule()` and `schedule_py()` return `ScheduleMarker` objects that must be returned from your handler function. Calling them without returning has no effect:
```python
def wrong():
erlang.schedule('callback', arg) # No effect!
return "oops" # This is returned instead
def correct():
return erlang.schedule('callback', arg) # Works
```
2. **Must propagate through nested calls**: The marker takes effect only if every function in the chain returns it. A nested function can create the marker, but each caller must return it in turn:
```python
def outer():
def inner():
return erlang.schedule('callback', arg)
return inner() # Works - marker propagates up
def broken():
def inner():
erlang.schedule('callback', arg) # Wrong - not returned
inner()
return "oops"
```
## Limitations
### Subprocess Operations Not Supported
The `ErlangEventLoop` does not support subprocess operations:
```python
# These will raise NotImplementedError:
loop.subprocess_shell(...)
loop.subprocess_exec(...)
# asyncio.create_subprocess_* will also fail
await asyncio.create_subprocess_shell(...)
await asyncio.create_subprocess_exec(...)
```
**Why?** Subprocess operations use `fork()` which would corrupt the Erlang VM. See [Security](security.md) for details.
**Alternative:** Use Erlang ports (`open_port/2`) for subprocess management. You can register an Erlang function that runs shell commands and call it from Python via `erlang.call()`.
### Signal Handling Not Supported
The `ErlangEventLoop` does not support signal handlers:
```python
# These will raise NotImplementedError:
loop.add_signal_handler(signal.SIGTERM, handler)
loop.remove_signal_handler(signal.SIGTERM)
```
**Why?** Signal handling should be done at the Erlang VM level. The BEAM has its own signal-handling infrastructure, integrated with supervisors and OTP design patterns.
**Alternative:** Handle signals in Erlang using the `kernel` application's signal handling or write a port program that forwards signals to Erlang processes.
## Protocol-Based I/O
For building custom servers with low-level protocol handling, see the [Reactor](reactor.md) module. The reactor provides FD-based protocol handling where Erlang manages I/O scheduling via `enif_select` and Python implements protocol logic.
## Async Task API (Erlang)
The `py_event_loop` module provides a high-level API for submitting async Python tasks from Erlang. This API is inspired by uvloop and uses a thread-safe task queue, allowing task submission from any dirty scheduler without blocking.
### Architecture
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Async Task Submission │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Erlang Process C NIF Layer py_event_worker │
│ ─────────────── ───────────── ───────────────── │
│ │
│ py_event_loop: nif_submit_task handle_info(task_ready) │
│ create_task(M,F,A) │ │ │
│ │ │ Thread-safe enqueue │ │
│ │──────────────────▶ (pthread_mutex) │ │
│ │ │ │ │
│ │ │ enif_send(task_ready)──▶ │
│ │ │ │ │
│ │ │ │ py_nif:process_ready │
│ │ │ │ │ │
│ │ │ │ ▼ │
│ │ │ │ Run Python coro │
│ │ │ │ │ │
│ │◀─────────────────────────────────────────────────┘ │
│ │ {async_result, Ref, {ok, Result}} │ │
│ │ │
└─────────────────────────────────────────────────────────────────────────────┘
```
**Key Features:**
- Thread-safe submission from any dirty scheduler via `enif_send`
- Non-blocking task creation
- Message-based result delivery
- Fire-and-forget support
### API Reference
#### py_event_loop:run/3,4
Blocking execution of an async Python function. Submits the task and waits for the result.
```erlang
%% Basic usage
{ok, Result} = py_event_loop:run(my_module, my_async_func, [arg1, arg2]).
%% With options (timeout, kwargs)
{ok, Result} = py_event_loop:run(aiohttp, get, [Url], #{
timeout => 10000,
kwargs => #{headers => #{}}
}).
```
**Parameters:**
- `Module` - Python module name (atom or binary)
- `Func` - Python function name (atom or binary)
- `Args` - List of positional arguments
- `Opts` - Options map (optional):
- `timeout` - Timeout in milliseconds (default: 5000)
- `kwargs` - Keyword arguments map (default: #{})
**Returns:**
- `{ok, Result}` - Task completed successfully
- `{error, Reason}` - Task failed or timed out
#### py_event_loop:create_task/3,4
Non-blocking task submission. Returns immediately with a reference for awaiting the result later.
```erlang
%% Submit task
Ref = py_event_loop:create_task(my_module, my_async_func, [arg1]).
%% Do other work while task runs...
do_other_work(),
%% Await result when needed
{ok, Result} = py_event_loop:await(Ref).
```
**Parameters:**
- `Module` - Python module name (atom or binary)
- `Func` - Python function name (atom or binary)
- `Args` - List of positional arguments
- `Kwargs` - Keyword arguments map (optional, default: #{})
**Returns:**
- `reference()` - Task reference for awaiting
#### py_event_loop:await/1,2
Wait for an async task result.
```erlang
%% Default timeout (5 seconds)
{ok, Result} = py_event_loop:await(Ref).
%% Custom timeout
{ok, Result} = py_event_loop:await(Ref, 10000).
%% Infinite timeout
{ok, Result} = py_event_loop:await(Ref, infinity).
```
**Parameters:**
- `Ref` - Task reference from `create_task`
- `Timeout` - Timeout in milliseconds or `infinity` (optional, default: 5000)
**Returns:**
- `{ok, Result}` - Task completed successfully
- `{error, Reason}` - Task failed with error
- `{error, timeout}` - Timeout waiting for result
#### py_event_loop:spawn_task/3,4
Fire-and-forget task execution. Submits the task but does not wait for or return the result.
```erlang
%% Background logging
ok = py_event_loop:spawn_task(logger, log_event, [EventData]).
%% With kwargs
ok = py_event_loop:spawn_task(metrics, record, [Name, Value], #{tags => Tags}).
```
**Parameters:**
- `Module` - Python module name (atom or binary)
- `Func` - Python function name (atom or binary)
- `Args` - List of positional arguments
- `Kwargs` - Keyword arguments map (optional, default: #{})
**Returns:**
- `ok` - Task submitted (result is discarded)
### Example: Concurrent HTTP Requests
```erlang
%% Submit multiple requests concurrently
Refs = [
py_event_loop:create_task(aiohttp, get, [<<"https://api.example.com/users">>]),
py_event_loop:create_task(aiohttp, get, [<<"https://api.example.com/posts">>]),
py_event_loop:create_task(aiohttp, get, [<<"https://api.example.com/comments">>])
],
%% Await all results
Results = [py_event_loop:await(Ref, 10000) || Ref <- Refs].
```
### Example: Background Processing
```erlang
%% Fire-and-forget analytics
handle_request(Request) ->
%% Process request...
Response = process(Request),
%% Log analytics in background (don't wait)
ok = py_event_loop:spawn_task(analytics, track_event, [
<<"page_view">>,
#{path => Request#request.path, user_id => Request#request.user_id}
]),
Response.
```
### Thread Safety
The async task API is fully thread-safe:
- `create_task` and `spawn_task` can be called from any Erlang process, including processes running on dirty schedulers
- Task submission uses `enif_send` which is safe to call from any thread
- The task queue uses mutex protection for thread-safe enqueueing
- Results are delivered via standard Erlang message passing
This means you can safely call `py_event_loop:create_task` from within a callback that's already running on a dirty NIF scheduler.
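The mechanism described above (mutex-protected enqueue from any thread, a single worker woken per submission) can be approximated in pure Python. This is a simplified conceptual sketch, not the real C implementation; the semaphore stands in for the `enif_send` wake-up:

```python
import threading
from collections import deque

class TaskQueue:
    """Toy model of the NIF task queue: mutex-protected enqueue from
    any thread, a single consumer woken once per submission."""
    def __init__(self):
        self._lock = threading.Lock()
        self._wakeup = threading.Semaphore(0)   # stand-in for enif_send
        self._tasks = deque()
        self.results = []

    def submit(self, fn, *args):
        """Safe to call from any thread (cf. create_task/spawn_task)."""
        with self._lock:
            self._tasks.append((fn, args))
        self._wakeup.release()                  # notify the worker

    def run_worker(self, n_tasks):
        """Single consumer drains tasks in submission order."""
        for _ in range(n_tasks):
            self._wakeup.acquire()
            with self._lock:
                fn, args = self._tasks.popleft()
            self.results.append(fn(*args))

q = TaskQueue()
threads = [threading.Thread(target=q.submit, args=(pow, 2, i)) for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()
q.run_worker(4)
print(sorted(q.results))  # [1, 2, 4, 8]
```

Submission never blocks on the worker: producers only hold the lock long enough to append, which is why callers on dirty schedulers are never stalled.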
## Event Loop Pool
The `py_event_loop_pool` module provides a pool of event loops for parallel Python coroutine execution. Inspired by libuv's "one loop per thread" model, each loop has its own worker and maintains event ordering.
### Process Affinity
All tasks from the same Erlang process are routed to the same event loop (via PID hash). This guarantees that timers and related async operations from a single process execute in order.
```
┌─► [loop_1] ──► [worker_1] ──► ordered execution
[process] ──► [hash(PID)] ─┼─► [loop_2] ──► [worker_2] ──► ordered execution
└─► [loop_N] ──► [worker_N] ──► ordered execution
```
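The routing rule above amounts to `hash(PID) rem PoolSize`: deterministic per caller, so one caller's tasks all reach the same ordered worker. A hypothetical Python illustration of the affinity property (Python's `hash` stands in for the pool's PID hash, which is an implementation detail):

```python
def route(pid, num_loops):
    """Deterministic caller -> loop mapping; same caller, same loop."""
    return hash(pid) % num_loops

NUM_LOOPS = 8
pid = "<0.145.0>"

# Affinity: the same caller always routes to the same loop...
assert route(pid, NUM_LOOPS) == route(pid, NUM_LOOPS)

# ...so that loop's worker runs the caller's tasks in submission order,
# while different callers spread across the pool for parallelism.
others = {route(f"<0.{n}.0>", NUM_LOOPS) for n in range(200)}
assert all(0 <= i < NUM_LOOPS for i in others)
```

Note the trade-off this encodes: ordering is guaranteed only *within* one process; tasks from different processes may interleave across loops.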
### API
The pool provides the same API as `py_event_loop`, but with automatic load distribution:
```erlang
%% Get event loop for current process (always the same loop for same PID)
{ok, LoopRef} = py_event_loop_pool:get_loop().
%% Submit task and await result
Ref = py_event_loop_pool:create_task(math, sqrt, [16.0]),
{ok, 4.0} = py_event_loop_pool:await(Ref).
%% Blocking call
{ok, 4.0} = py_event_loop_pool:run(math, sqrt, [16.0]).
%% Fire-and-forget
ok = py_event_loop_pool:spawn_task(logger, info, [<<"message">>]).
%% Pool statistics
#{num_loops := N, supported := true} = py_event_loop_pool:get_stats().
```
### Configuration
Configure the pool size via application environment:
```erlang
%% sys.config
[
{erlang_python, [
%% Number of event loops (default: erlang:system_info(schedulers))
{event_loop_pool_size, 8}
]}
].
```
### When to Use
| Use Case | Module |
|----------|--------|
| Single caller, ordered tasks | `py_event_loop` |
| Multiple callers, parallel execution | `py_event_loop_pool` |
| High throughput, many concurrent processes | `py_event_loop_pool` |
### Performance
Benchmarks on a 14-core system with Python 3.14:
| Pattern | Throughput |
|---------|------------|
| Sequential (single loop) | 83k tasks/sec |
| Sequential (pool) | 150k tasks/sec |
| Concurrent (50 processes) | 164k tasks/sec |
| Fire-and-collect (10k tasks) | 417k tasks/sec |
### Example: Parallel Processing
```erlang
%% Process items in parallel using multiple Erlang processes
%% Each process gets its own event loop for ordered execution
%% (partition/2 is a chunking helper, not shown)
process_batch(Items) ->
Parent = self(),
Pids = [spawn_link(fun() ->
Results = [begin
Ref = py_event_loop_pool:create_task(processor, handle, [Item]),
py_event_loop_pool:await(Ref)
end || Item <- Chunk],
Parent ! {done, self(), Results}
end) || Chunk <- partition(Items, 100)],
[receive {done, Pid, R} -> R end || Pid <- Pids].
```
## See Also
- [Reactor](reactor.md) - Low-level FD-based protocol handling
- [Security](security.md) - Sandbox and blocked operations
- [Threading](threading.md) - For `erlang.async_call()` in asyncio contexts
- [Streaming](streaming.md) - For working with Python generators
- [Getting Started](getting-started.md) - Basic usage guide