# Nipper
**Lightweight, embeddable MQTT broker for BEAM-based edge applications**
[Hex.pm](https://hex.pm/packages/nipper) · [Docs](https://hexdocs.pm/nipper) · [License](LICENSE)
Nipper is a minimal MQTT v3.1.1 broker designed to run within your Elixir/Phoenix application's supervision tree. Perfect for edge computing, IoT gateways, and embedded systems where you need local MQTT messaging without the overhead of a separate broker service.
## Features
### Core MQTT Protocol
- **MQTT 3.1.1** full protocol implementation
- **QoS 0 & QoS 1** message delivery
- **Persistent & clean sessions**
- **Keep-alive mechanism** with configurable timeouts
- **Will messages** and **retained messages**
- **Topic subscriptions** with wildcard support
- **Binary payload** support
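MQTT topic filters support two wildcards: `+` matches exactly one level and `#` matches all remaining levels. A minimal sketch of that matching logic (illustrative only, not Nipper's internal implementation):

```elixir
defmodule TopicMatch do
  # Split filter and topic on "/" and compare level by level.
  def matches?(filter, topic) do
    do_match(String.split(filter, "/"), String.split(topic, "/"))
  end

  # "#" matches the entire remaining topic.
  defp do_match(["#"], _rest), do: true
  # "+" matches exactly one level, whatever its value.
  defp do_match(["+" | fs], [_t | ts]), do: do_match(fs, ts)
  # Literal levels must be equal.
  defp do_match([f | fs], [f | ts]), do: do_match(fs, ts)
  # Both exhausted: everything matched.
  defp do_match([], []), do: true
  defp do_match(_, _), do: false
end

TopicMatch.matches?("sensors/+/temp", "sensors/kitchen/temp") #=> true
TopicMatch.matches?("sensors/#", "sensors/kitchen/temp")      #=> true
TopicMatch.matches?("sensors/+/temp", "sensors/kitchen/hum")  #=> false
```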
### Security & DoS Protection
- **Rate limiting** with sliding window algorithm
  - Connection rate limiting (per IP)
  - Message rate limiting (per client)
  - Authentication failure protection
- **Packet size enforcement** with configurable limits
- **Resource monitoring** and automatic cleanup
- **Memory usage monitoring** with automatic GC
- **Pluggable authentication** system
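To make the sliding-window idea concrete, here is a small self-contained sketch of a windowed counter backed by ETS. This is illustrative only and not Nipper's actual `RateLimiter`; the table layout and window size are assumptions.

```elixir
defmodule SlidingWindow do
  @moduledoc "Illustrative sliding-window counter backed by ETS."
  @window_ms 60_000

  # A :bag table holds one {key, timestamp} entry per event.
  def new(name), do: :ets.new(name, [:bag, :public, :named_table])

  # Record an event and report whether `key` stayed under `limit`
  # within the last @window_ms milliseconds.
  def allow?(table, key, limit) do
    now = System.monotonic_time(:millisecond)
    cutoff = now - @window_ms

    # Drop timestamps that have fallen out of the window.
    :ets.select_delete(table, [{{key, :"$1"}, [{:<, :"$1", cutoff}], [true]}])

    if length(:ets.lookup(table, key)) < limit do
      :ets.insert(table, {key, now})
      true
    else
      false
    end
  end
end
```

A per-IP connection limiter would call `SlidingWindow.allow?(table, ip, 10)` on each CONNECT and reject the socket when it returns `false`.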
### Architecture & Performance
- **Embedded-first**: Runs within your supervision tree
- **BEAM-native**: Leverages OTP patterns and Phoenix.PubSub
- **GenStage pipeline** for event streaming
- **High-performance TCP** with ThousandIsland
- **Non-blocking I/O** and efficient message routing
- **Automatic resource cleanup** on process crashes
### Observability & Monitoring
- **Comprehensive telemetry** using `:telemetry`
- **Connection lifecycle tracking**
- **Message flow monitoring**
- **Rate limiting alerts**
- **Memory usage alerts**
- **Resource monitoring events**
### Development & Testing
- **207+ comprehensive tests** (unit + integration)
- **Property-based testing** with StreamData
- **Code coverage** tracking
- **Static analysis** with Credo and Dialyzer
- **Benchmarking** support
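A property-based test with StreamData might look like the following sketch. `Nipper.Topic.matches?/2` is a hypothetical function name used for illustration; substitute the real topic-matching API.

```elixir
# test/topic_property_test.exs
defmodule TopicPropertyTest do
  use ExUnit.Case
  use ExUnitProperties

  property "a concrete topic filter always matches itself" do
    check all levels <- list_of(string(:alphanumeric, min_length: 1), min_length: 1) do
      topic = Enum.join(levels, "/")
      # Hypothetical API: a filter without wildcards matches the identical topic.
      assert Nipper.Topic.matches?(topic, topic)
    end
  end
end
```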
## Quick Start
### Installation
Add `nipper` to your list of dependencies in `mix.exs`:
```elixir
def deps do
  [
    {:nipper, "~> 0.1.0"}
  ]
end
```
### Basic Setup
Add to your application's supervision tree:
```elixir
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Your existing children...
      Nipper.Supervisor
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```
### Configuration
Configure in `config/config.exs`:
```elixir
config :nipper,
  # Listener configuration
  listeners: [
    default: [
      port: 1883,
      transport: :tcp,
      acceptors: 100,
      max_connections: 10_000
    ]
  ],
  # Authentication
  auth: [
    module: Nipper.Auth.AllowAll # For development
  ],
  # Protocol limits
  protocol: [
    max_packet_size: 65_536,
    max_client_id_length: 128,
    default_keepalive: 60
  ],
  # Rate limiting (DoS protection)
  rate_limiter: [
    connection_limit: 10,  # connections per minute per IP
    message_limit: 100,    # messages per minute per client
    auth_limit: 5          # auth failures per 5 minutes per IP
  ]
```
### Testing Your Setup
Start your application and test with a simple MQTT client:
```bash
# Terminal 1: Start your application
iex -S mix
# Terminal 2: Test with mosquitto_pub/sub
mosquitto_sub -h localhost -p 1883 -t "test/topic" &
mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello, Nipper!"
```
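You can also exercise the broker from IEx with an Elixir MQTT client. The sketch below assumes the Tortoise client library (`{:tortoise, "~> 0.10"}` in deps); it is not part of Nipper itself.

```elixir
# Connect a client and subscribe to test/topic (Tortoise client, assumed dependency).
{:ok, _pid} =
  Tortoise.Supervisor.start_child(
    client_id: "nipper_test",
    handler: {Tortoise.Handler.Logger, []},
    server: {Tortoise.Transport.Tcp, host: ~c"localhost", port: 1883},
    subscriptions: [{"test/topic", 0}]
  )

# Publish through the embedded broker.
Tortoise.publish("nipper_test", "test/topic", "Hello, Nipper!", qos: 0)
```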
## Advanced Usage
### Custom Authentication
Create a custom authentication module:
```elixir
defmodule MyApp.MqttAuth do
  @behaviour Nipper.Auth

  @impl true
  def authenticate(conn_info) do
    %{client_id: _client_id, username: username, password: password} = conn_info

    case verify_credentials(username, password) do
      {:ok, user_info} ->
        {:ok, %{user_id: user_info.id, permissions: user_info.permissions}}

      :error ->
        {:error, :not_authorized}
    end
  end

  defp verify_credentials(_username, _password) do
    # Your authentication logic here, e.g. a database lookup
    # returning {:ok, user_info} or :error.
  end
end
```
Configure it:
```elixir
config :nipper,
  auth: [module: MyApp.MqttAuth]
```
### Telemetry Integration
Set up telemetry handlers for monitoring:
```elixir
# In your application startup
:telemetry.attach_many(
  "mqtt-metrics",
  [
    [:nipper, :client, :connected],
    [:nipper, :client, :disconnected],
    [:nipper, :message, :published],
    [:nipper, :rate_limit, :exceeded],
    [:nipper, :memory_monitor, :warning]
  ],
  &MyApp.TelemetryHandler.handle_event/4,
  %{}
)
```
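A matching handler module could look like this sketch. The event names come from the attach call above; the metadata shapes (e.g. `metadata[:client_id]`) are assumptions about what Nipper emits.

```elixir
defmodule MyApp.TelemetryHandler do
  require Logger

  # :telemetry invokes handlers as handle_event(event, measurements, metadata, config).
  def handle_event([:nipper, :rate_limit, :exceeded], _measurements, metadata, _config) do
    Logger.warning("MQTT rate limit exceeded: #{inspect(metadata)}")
  end

  def handle_event([:nipper, :client, event], _measurements, metadata, _config)
      when event in [:connected, :disconnected] do
    Logger.info("MQTT client #{event}: #{inspect(metadata[:client_id])}")
  end

  # Catch-all so unhandled events never crash the handler.
  def handle_event(event, measurements, _metadata, _config) do
    Logger.debug("MQTT event #{inspect(event)}: #{inspect(measurements)}")
  end
end
```

Keep handler bodies fast; `:telemetry` runs them synchronously in the emitting process.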
### Upstream Integration
Configure event streaming to external systems:
```elixir
config :nipper,
  upstream: [
    enabled: true,
    transport: MyApp.UpstreamTransport,
    producer: [batch_size: 100, batch_timeout: 1000],
    batcher: [max_batches: 10]
  ]
```
## Architecture
```
+-----------------------------------------------------+
|                  Nipper.Supervisor                  |
+-----------------------------------------------------+
|  +-------------+  +-------------+  +-------------+  |
|  |  Listener   |  | RateLimiter |  | ResourceMon |  |
|  |(ThousandIs.)|  |    (ETS)    |  | (GenServer) |  |
|  +-------------+  +-------------+  +-------------+  |
|                                                     |
|  +-------------+  +-------------+  +-------------+  |
|  |  MemoryMon  |  |   Router    |  |  Upstream   |  |
|  | (GenServer) |  |(Phoenix.PS) |  |  Producer   |  |
|  +-------------+  +-------------+  +-------------+  |
+-----------------------------------------------------+
```
### Key Components
- **Nipper.Listener**: ThousandIsland-based TCP server
- **Nipper.Connection**: Per-client connection handler
- **Nipper.RateLimiter**: DoS protection with sliding windows
- **Nipper.ResourceMonitor**: Process monitoring and cleanup
- **Nipper.MemoryMonitor**: Memory usage tracking and GC
- **Nipper.Router**: Message routing via Phoenix.PubSub
- **Nipper.Upstream.Producer**: Event streaming pipeline
## Development
### Running Tests
```bash
# Unit tests
mix test --exclude integration
# All tests
mix test
# With coverage
mix test --cover
```
### Code Quality
```bash
# Static analysis
mix credo --strict
mix dialyzer
# Documentation
mix docs
```
### Benchmarking
```bash
# Connection benchmarks
mix run benchmarks/connections.exs
# Message throughput
mix run benchmarks/messages.exs
```
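A benchmark script such as `benchmarks/messages.exs` could be built on Benchee (`{:benchee, "~> 1.0", only: :dev}`). `Nipper.Packet.encode/1` below is a hypothetical function name used for illustration; substitute the real packet module.

```elixir
# benchmarks/messages.exs
payload = :crypto.strong_rand_bytes(256)

Benchee.run(%{
  "encode 256-byte publish" => fn ->
    # Hypothetical encoder call; replace with Nipper's actual packet API.
    Nipper.Packet.encode({:publish, "bench/topic", payload, qos: 0})
  end
})
```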
## Performance
### Benchmarks (on modern hardware)
- **Connections**: 1000+ concurrent clients
- **Message throughput**: 10,000+ messages/second
- **Memory usage**: <100MB for 1000 clients
- **Latency**: <1ms message routing
### Resource Limits (configurable)
- **Max connections**: 10,000 (default)
- **Max packet size**: 65KB (default)
- **Rate limits**: 10 conn/min, 100 msg/min per client
- **Memory thresholds**: 100MB warning, 500MB critical
## Roadmap
### Version 0.2.0 (Planned)
- [ ] MQTT 5.0 protocol support
- [ ] SSL/TLS transport layer
- [ ] Persistent message storage
- [ ] WebSocket transport support
- [ ] Enhanced authentication providers
### Version 0.3.0 (Planned)
- [ ] Cluster support and horizontal scaling
- [ ] MQTT bridge functionality
- [ ] Message transformation plugins
- [ ] Docker containerization
- [ ] Kubernetes deployment manifests
## Known Limitations
This is version **0.1.0** - suitable for development and small-scale deployments:
- **Single-node only** (clustering planned for 0.3.0)
- **In-memory only** (persistence planned for 0.2.0)
- **MQTT 3.1.1 only** (5.0 support planned for 0.2.0)
- **TCP only** (WebSocket/SSL planned for 0.2.0)
## Contributing
1. Fork it
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Add tests and ensure they pass (`mix test`)
4. Run code quality checks (`mix credo`, `mix dialyzer`)
5. Commit your changes (`git commit -am 'Add amazing feature'`)
6. Push to the branch (`git push origin feature/amazing-feature`)
7. Create a Pull Request
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Acknowledgments
- [ThousandIsland](https://github.com/mtrudel/thousand_island) for high-performance TCP
- [Phoenix.PubSub](https://github.com/phoenixframework/phoenix_pubsub) for scalable messaging
- [GenStage](https://github.com/elixir-lang/gen_stage) for event streaming
- The Elixir and OTP communities for the excellent foundations
---
**Built with ❤️ for the BEAM ecosystem**