docs/buffer.md

# Buffer API

The Buffer API provides a zero-copy input buffer for streaming data from Erlang to Python. Buffers use shared memory with GIL-released blocking reads for efficient data transfer.

## Overview

Buffers are designed for scenarios where Erlang writes data chunks and Python needs to consume them:

- Zero-copy access via Python's buffer protocol (`memoryview`)
- File-like interface (`read`, `readline`, `readlines`)
- Blocking reads that release the GIL while waiting
- Fast substring search using `memchr`/`memmem`

Use buffers when you need:
- Streaming data from Erlang to Python
- Zero-copy access to binary data
- File-like interface for Python code expecting `read()` methods

## Quick Start

### Erlang Side

```erlang
%% Create a buffer (chunked - unknown size)
{ok, Buf} = py_buffer:new(),

%% Or with known content length (pre-allocates memory)
{ok, Buf} = py_buffer:new(4096),

%% Write data chunks
ok = py_buffer:write(Buf, <<"chunk1">>),
ok = py_buffer:write(Buf, <<"chunk2">>),

%% Signal end of data
ok = py_buffer:close(Buf),

%% Pass to Python handler
py:call(Ctx, myapp, handle_request, [#{<<"input">> => Buf}]).
```

### Python Side

```python
def handle_request(environ):
    input_buf = environ['input']

    # Read all data
    body = input_buf.read()

    # Or read line by line
    for line in input_buf:
        process(line)

    # Or read specific amount
    chunk = input_buf.read(1024)
```

## Erlang API

### `py_buffer:new/0`

Create a buffer for chunked/streaming data (unknown content length).

```erlang
{ok, Buf} = py_buffer:new().
```

The buffer starts with a default capacity (64KB) and grows as needed.

### `py_buffer:new/1`

Create a buffer with known content length.

```erlang
{ok, Buf} = py_buffer:new(ContentLength).
```

**Arguments:**
- `ContentLength` - Expected total size in bytes, or `undefined` for chunked

Pre-allocating avoids buffer growth overhead when content length is known.

### `py_buffer:write/2`

Write binary data to the buffer.

```erlang
ok = py_buffer:write(Buf, Data).
```

**Arguments:**
- `Buf` - Buffer reference from `new/0,1`
- `Data` - Binary data to append

**Returns:**
- `ok` - Data written successfully
- `{error, closed}` - Buffer was closed

Writing signals any waiting Python readers via `pthread_cond_broadcast`.

### `py_buffer:close/1`

Signal end of data (EOF).

```erlang
ok = py_buffer:close(Buf).
```

After closing:
- No more data can be written
- Python's `read()` returns remaining data then empty bytes
- Waiting Python threads are woken up

## Python API

### `PyBuffer` class

The buffer appears in Python as `erlang.PyBuffer` when passed from Erlang.

```python
from erlang import PyBuffer
```

#### `read(size=-1)`

Read up to `size` bytes, blocking if needed.

```python
data = buf.read()      # Read all (blocks until EOF)
chunk = buf.read(1024) # Read up to 1024 bytes
```

**Behavior:**
- If `size=-1`, reads all data (waits for EOF if content length known)
- If data available, returns immediately
- If empty, blocks until data arrives (GIL released during wait)
- Returns empty bytes at EOF

#### `read_nonblock(size=-1)`

Read available bytes without blocking. For async I/O.

```python
chunk = buf.read_nonblock(1024)  # Read up to 1024 available bytes
data = buf.read_nonblock()       # Read all available bytes
```

**Behavior:**
- Returns immediately with whatever data is available
- Never blocks, even if no data available
- Returns empty bytes if nothing available (check `readable_amount()` first)
- Use with `readable_amount()` and `at_eof()` for async I/O loops

#### `readable_amount()`

Return number of bytes available without blocking.

```python
available = buf.readable_amount()
if available > 0:
    data = buf.read_nonblock(available)
```

**Returns:** Number of bytes that can be read immediately.

#### `at_eof()`

Check if buffer is at EOF with no more data.

```python
while not buf.at_eof():
    if buf.readable_amount() > 0:
        chunk = buf.read_nonblock(4096)
        process(chunk)
    else:
        await asyncio.sleep(0.001)  # Yield to event loop
```

**Returns:** `True` if EOF signaled AND all data has been read.

#### `readline(size=-1)`

Read one line, blocking if needed.

```python
line = buf.readline()  # Read until newline or EOF
```

**Returns:** Bytes including the trailing newline, or empty at EOF.

Uses `memchr` for fast newline scanning.

#### `readlines(hint=-1)`

Read all lines as a list.

```python
lines = buf.readlines()  # ['line1\n', 'line2\n', ...]
```

**Arguments:**
- `hint` - Optional size hint; stops after approximately this many bytes

#### `seek(offset, whence=0)`

Seek to position within already-written data.

```python
buf.seek(0)      # Seek to beginning (SEEK_SET)
buf.seek(10, 1)  # Seek forward 10 bytes (SEEK_CUR)
buf.seek(-5, 2)  # Seek 5 bytes before end (SEEK_END, requires EOF)
```

**Limitations:**
- Cannot seek past written data
- `SEEK_END` requires EOF flag set

#### `tell()`

Return current read position.

```python
pos = buf.tell()  # Current byte offset
```

#### `find(sub, start=0, end=None)`

Fast substring search using `memchr`/`memmem`.

```python
idx = buf.find(b'\n')       # Find first newline
idx = buf.find(b'boundary') # Find multipart boundary
```

**Returns:** Lowest index where substring found, or -1 if not found.

Single-byte search uses `memchr` (very fast). Multi-byte uses `memmem`.

#### Buffer Protocol

Buffers support Python's buffer protocol for zero-copy access:

```python
# Create memoryview for zero-copy access
mv = memoryview(buf)

# Access without copying
first_byte = mv[0]
slice_data = bytes(mv[10:20])

# Release when done
mv.release()
```

**Properties:**
- `readonly=True` - Buffer is read-only from Python
- `ndim=1` - One-dimensional byte array

#### Iteration

Line-by-line iteration:

```python
for line in buf:
    process(line)
```

Equivalent to calling `readline()` until EOF.

#### Properties and Methods

```python
buf.readable()   # True - always readable
buf.writable()   # False - not writable from Python
buf.seekable()   # True - limited seeking supported
buf.closed       # True if buffer is closed
len(buf)         # Available bytes (write_pos - read_pos)
buf.close()      # Mark buffer as closed
```

## Architecture

```
Erlang                              Python
------                              ------

py_buffer:new() -----------------> Buffer created
                                   (pthread mutex+cond initialized)

py_buffer:write(Buf, Data)
       |
       v
  memcpy to buffer
  pthread_cond_broadcast() ------> read()/readline() wakes up
                                   (GIL was released during wait)
                                          |
                                          v
                                   Return data to Python

py_buffer:close() ---------------> EOF flag set
                                   Waiting readers return
```

**Memory Layout:**

```
py_buffer_resource_t
+------------------+
| data*            | --> [chunk1][chunk2][chunk3]...
| capacity         |     ^       ^
| write_pos        | ----+       |
| read_pos         | ------------+
| content_length   |
| mutex            |
| data_ready (cond)|
| eof              |
| closed           |
| view_count       |
+------------------+
```

## Performance Tips

1. **Use known content length** when available - avoids buffer reallocation:
   ```erlang
   ContentLength = byte_size(Body),
   {ok, Buf} = py_buffer:new(ContentLength).
   ```

2. **Write in reasonable chunks** - very small writes have overhead:
   ```erlang
   %% Good: write accumulated chunks
   ok = py_buffer:write(Buf, AccumulatedData).

   %% Less efficient: many tiny writes
   %% [py_buffer:write(Buf, <<B>>) || B <- binary_to_list(Data)].
   ```

3. **Use memoryview for zero-copy** when processing large bodies:
   ```python
   mv = memoryview(buf)
   # Process without copying
   boundary_pos = buf.find(b'--boundary')
   part = bytes(mv[:boundary_pos])
   ```

4. **Use find() for parsing** - `memchr`/`memmem` are faster than Python string methods.

## Examples

### HTTP Request Body Handling

```erlang
%% Buffer HTTP body
{ok, Buf} = py_buffer:new(byte_size(Body)),
ok = py_buffer:write(Buf, Body),
ok = py_buffer:close(Buf),

%% Build request environ
Environ = #{
    <<"method">> => <<"POST">>,
    <<"path">> => <<"/api/data">>,
    <<"content_type">> => <<"application/json">>,
    <<"content_length">> => byte_size(Body),
    <<"body">> => Buf
},

%% Call handler
{ok, Response} = py:call(myapp, handle, [Environ]).
```

### Chunked Transfer

```erlang
%% Create buffer for chunked encoding
{ok, Buf} = py_buffer:new(),

%% Spawn writer process
spawn(fun() ->
    %% Simulate receiving chunks
    lists:foreach(fun(Chunk) ->
        ok = py_buffer:write(Buf, Chunk),
        timer:sleep(10)  % Simulate network delay
    end, get_chunks()),
    ok = py_buffer:close(Buf)
end),

%% Python can start reading immediately
%% read() will block until data available
py:call(myapp, stream_handler, [Buf]).
```

### Multipart Form Parsing

```python
def parse_multipart(buf, boundary):
    """Parse multipart form data from buffer."""
    parts = []

    while True:
        # Find next boundary using fast memmem
        idx = buf.find(boundary.encode())
        if idx == -1:
            break

        # Read headers until blank line
        headers = {}
        while True:
            line = buf.readline()
            if line == b'\r\n':
                break
            name, value = line.split(b':', 1)
            headers[name.strip()] = value.strip()

        # Read content until next boundary
        # ... process part
        parts.append({'headers': headers, 'data': data})

    return parts
```

### Async I/O Integration

For asyncio applications, use the non-blocking methods to avoid blocking the event loop:

```python
import asyncio
from erlang import PyBuffer

async def read_buffer_async(buf):
    """Read from buffer without blocking the event loop."""
    chunks = []

    while not buf.at_eof():
        available = buf.readable_amount()
        if available > 0:
            # Read available data
            chunk = buf.read_nonblock(4096)
            chunks.append(chunk)
        else:
            # Yield to event loop, check again soon
            await asyncio.sleep(0.001)

    return b''.join(chunks)

async def process_body_async(environ):
    """Process body in async context."""
    buf = environ['body']

    # Read body without blocking
    body = await read_buffer_async(buf)
    return json.loads(body)
```

For production use, consider integrating with Erlang's event notification:

```python
async def read_with_notification(buf, notify_channel):
    """Read using Erlang channel for data-ready notifications."""
    chunks = []

    while not buf.at_eof():
        available = buf.readable_amount()
        if available > 0:
            chunk = buf.read_nonblock(available)
            chunks.append(chunk)
        else:
            # Wait for Erlang to signal data is ready
            await notify_channel.async_receive()

    return b''.join(chunks)
```

## See Also

- [Channel](channel.md) - Bidirectional message passing
- [Reactor](reactor.md) - FD-based protocol handling
- [Getting Started](getting-started.md) - Basic usage guide