# SASL Authentication

KafkaEx supports SASL authentication for secure Kafka clusters. Multiple mechanisms are available with flexible configuration options.

## Supported Mechanisms

- **PLAIN** - Simple username/password (requires SSL/TLS)
- **SCRAM-SHA-256** - Secure challenge-response authentication (Kafka 0.10.2+)
- **SCRAM-SHA-512** - Secure challenge-response with stronger hash (Kafka 0.10.2+)
- **OAUTHBEARER** - Token-based authentication using OAuth 2.0 bearer tokens (Kafka 2.0+)
- **MSK_IAM** - AWS IAM authentication for Amazon MSK (MSK 2.7.1+)

## Configuration

### Via Application Config

```elixir
# config/config.exs
config :kafka_ex,
  brokers: [{"localhost", 9292}],
  use_ssl: true,
  ssl_options: [
    verify: :verify_peer,
    cacertfile: "/path/to/ca-cert"
  ],
  sasl: %{
    mechanism: :scram,
    username: System.get_env("KAFKA_USERNAME"),
    password: System.get_env("KAFKA_PASSWORD"),
    mechanism_opts: %{algo: :sha256}  # :sha256 or :sha512
  }
```

### Via Worker Options

```elixir
opts = [
  uris: [{"broker1", 9092}, {"broker2", 9092}],
  use_ssl: true,
  ssl_options: [verify: :verify_none],
  auth: KafkaEx.Auth.Config.new(%{
    mechanism: :plain,
    username: "alice",
    password: "secret123"
  })
]

{:ok, pid} = KafkaEx.create_worker(:my_worker, opts)
```

### Docker Compose Setup

The project includes Docker configurations for testing SASL authentication:

```bash
# Start Kafka with SASL enabled
docker-compose up -d

# All brokers use SSL/TLS. Authentication mechanisms by port:
# 9092-9094 - No authentication
# 9192-9194 - SASL/PLAIN
# 9292-9294 - SASL/SCRAM
# 9392-9394 - SASL/OAUTHBEARER
```

### Test Credentials

The Docker setup includes preconfigured test users:

**PLAIN and SCRAM:**
- `test` / `secret`
- `alice` / `alice-secret`
- `admin` / `admin-secret`

**OAUTHBEARER:**
- Uses unsecured validator (testing only)
- Accepts any token with a subject claim

**Example test configuration:**
```elixir
# Test with PLAIN
config :kafka_ex,
  brokers: [{"localhost", 9192}],
  use_ssl: true,
  ssl_options: [verify: :verify_none],
  sasl: %{mechanism: :plain, username: "test", password: "secret"}

# Test with SCRAM-SHA-256
config :kafka_ex,
  brokers: [{"localhost", 9292}],
  use_ssl: true,
  ssl_options: [verify: :verify_none],
  sasl: %{
    mechanism: :scram,
    username: "test",
    password: "secret",
    mechanism_opts: %{algo: :sha256}
  }
```

## Security Considerations

- Always use SSL/TLS with the PLAIN mechanism - the password is sent in cleartext, so it must be encrypted in transit
- Use environment variables for credentials - never hardcode passwords
- SCRAM is preferred over PLAIN when both are available

### Minimum Kafka Versions

- PLAIN: Kafka 0.9.0+
- SCRAM: Kafka 0.10.2+
- OAUTHBEARER: Kafka 2.0+

## OAUTHBEARER

```elixir
config :kafka_ex,
  brokers: [{"localhost", 9394}],
  use_ssl: true,
  sasl: %{
    mechanism: :oauthbearer,
    mechanism_opts: %{
      token_provider: &MyOAuth.get_token/0,
      extensions: %{"traceId" => "trace-123"}  # Optional KIP-342 extensions
    }
  }
```

### Token Provider Requirements

The `token_provider` must be a **0-arity function** that returns:
- `{:ok, token}` - where `token` is a non-empty binary string (typically a JWT)
- `{:error, reason}` - on failure
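
A minimal provider satisfying this contract might read a pre-issued token from the environment (the `KAFKA_OAUTH_TOKEN` variable name is illustrative, not something the library defines):

```elixir
sasl: %{
  mechanism: :oauthbearer,
  mechanism_opts: %{
    token_provider: fn ->
      # Illustrative sketch: env var name is an assumption
      case System.fetch_env("KAFKA_OAUTH_TOKEN") do
        {:ok, token} when token != "" -> {:ok, token}
        _ -> {:error, :missing_token}
      end
    end
  }
}
```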

**Important:** The token provider is called **once per connection**; the library does not cache tokens. Implement your own caching if needed:

```elixir
defmodule MyOAuth do
  use Agent

  def start_link(_) do
    Agent.start_link(fn -> %{token: nil, expires: 0} end, name: __MODULE__)
  end

  def get_token() do
    now = System.os_time(:second)

    case Agent.get(__MODULE__, & &1) do
      %{token: token, expires: exp} when exp > now and token != nil ->
        {:ok, token}

      _ ->
        # Fetch new token from OAuth provider
        with {:ok, token} <- fetch_from_oauth_server(),
             {:ok, expires_in} <- parse_expiry(token) do
          Agent.update(__MODULE__, fn _ ->
            %{token: token, expires: now + expires_in - 60}  # 60s buffer
          end)
          {:ok, token}
        end
    end
  end

  defp fetch_from_oauth_server() do
    # Your implementation: request a token from your OAuth provider
    {:ok, "token"}
  end

  defp parse_expiry(_token) do
    # Your implementation: extract the `exp` claim from the JWT
    {:ok, 3600}
  end
end
```

### Extensions (Optional)

Extensions are KIP-342 custom key-value pairs sent to the broker:
- Must be a map of `%{string_key => string_value}`
- Cannot use reserved name `"auth"`
- Example: `%{"traceId" => "abc", "tenant" => "prod"}`
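
On the wire, the initial client response follows RFC 7628: a GS2 header, the `auth` key-value pair, any extension key-value pairs, and a trailing `\x01` separator. A sketch of how extensions are serialized (illustrative, not the library's exact code):

```elixir
token = "eyJhbGciOi..."  # placeholder bearer token
extensions = %{"traceId" => "abc", "tenant" => "prod"}

# Each extension is "key=value" followed by a \x01 separator
ext_kvpairs =
  extensions
  |> Enum.map(fn {key, value} -> "#{key}=#{value}\x01" end)
  |> Enum.join()

# "n,," is the GS2 header (no channel binding, no authzid)
client_first = "n,,\x01auth=Bearer #{token}\x01" <> ext_kvpairs <> "\x01"
```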

## Configuration Summary

### Quick Reference

| Mechanism | Username/Password | mechanism_opts | TLS Required | Min Kafka |
|-----------|-------------------|----------------|--------------|-----------|
| PLAIN | ✅ Required | None | ✅ Yes | 0.9.0+ |
| SCRAM | ✅ Required | `algo: :sha256\|:sha512` | ⚠️ Recommended | 0.10.2+ |
| OAUTHBEARER | ❌ Not used | `token_provider` (required), `extensions` (optional) | ⚠️ Recommended | 2.0+ |
| MSK_IAM | ❌ Not used | `region` (required), credential options | ✅ Yes | MSK 2.7.1+ |

### Detailed Configuration

**PLAIN**
- Simplest mechanism using plain username/password
- **CRITICAL:** Must use SSL/TLS (validation enforces this)
- Format: Binary concatenation per RFC 4616
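
The RFC 4616 message referred to above is a NUL-delimited concatenation of authzid, authcid, and password, with the authzid left empty:

```elixir
username = "alice"
password = "secret123"

# authzid \0 authcid \0 password, authzid empty
sasl_plain = <<0>> <> username <> <<0>> <> password
# sasl_plain == <<0, "alice", 0, "secret123">>
```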

**SCRAM-SHA-256 / SCRAM-SHA-512**
- Secure challenge-response authentication (RFC 5802/7677)
- Default algorithm: `:sha256`
- Password never sent in cleartext (PBKDF2 key derivation)
- Usernames with `=` or `,` are automatically escaped
- Server signature validated to prevent MITM attacks
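
The username escaping mentioned above follows RFC 5802's `saslname` rules: `=` becomes `=3D` and `,` becomes `=2C`. A sketch (the order matters - `=` must be replaced first, or the `=` introduced by `=2C` would be double-escaped):

```elixir
escape = fn name ->
  name
  |> String.replace("=", "=3D")
  |> String.replace(",", "=2C")
end

escape.("alice=admin,prod")
# => "alice=3Dadmin=2Cprod"
```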

**OAUTHBEARER**
- Uses OAuth 2.0 bearer tokens (typically JWT)
- Token provider must be 0-arity function
- Library does NOT cache tokens - implement caching in provider
- Called once per connection
- Supports KIP-342 extensions for custom metadata

**MSK_IAM**
- AWS IAM authentication using SigV4 signatures
- Requires port 9098 (MSK IAM listener)
- Uses OAUTHBEARER wire protocol internally
- Credential auto-discovery via `aws_credentials` library
- Session tokens supported for temporary credentials

## AWS MSK IAM (msk_iam)

AWS MSK IAM authentication for Amazon MSK clusters using IAM credentials.

```elixir
config :kafka_ex,
  brokers: [{"b-1.mycluster.kafka.us-east-1.amazonaws.com", 9098}],
  use_ssl: true,
  sasl: %{
    mechanism: :msk_iam,
    mechanism_opts: %{
      region: "us-east-1"
    }
  }
```

### Credentials Resolution

Credentials are resolved in order:

1. `credential_provider` - custom 0-arity function returning `{:ok, access_key, secret_key}` or `{:ok, access_key, secret_key, session_token}`
2. `access_key_id` + `secret_access_key` - explicit credentials in config
3. `aws_credentials` - automatic discovery supporting:
   - Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`)
   - IRSA (IAM Roles for Service Accounts on EKS)
   - Instance metadata (EC2, ECS task roles)
   - Credential files (`~/.aws/credentials`)
   - Web Identity Token (`AWS_WEB_IDENTITY_TOKEN_FILE`)
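
The resolution order above can be sketched as follows (illustrative, not the library's exact code; it assumes `:aws_credentials.get_credentials/0` from the `aws_credentials` library as the discovery entry point):

```elixir
resolve = fn opts ->
  cond do
    # 1. Custom provider wins if present
    provider = opts[:credential_provider] ->
      provider.()

    # 2. Explicit static credentials
    opts[:access_key_id] && opts[:secret_access_key] ->
      {:ok, opts[:access_key_id], opts[:secret_access_key]}

    # 3. Fall back to aws_credentials auto-discovery
    true ->
      case :aws_credentials.get_credentials() do
        %{access_key_id: ak, secret_access_key: sk} -> {:ok, ak, sk}
        _ -> {:error, :no_credentials}
      end
  end
end
```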

### Explicit Credentials

```elixir
mechanism_opts: %{
  region: "us-east-1",
  access_key_id: System.get_env("AWS_ACCESS_KEY_ID"),
  secret_access_key: System.get_env("AWS_SECRET_ACCESS_KEY"),
  session_token: System.get_env("AWS_SESSION_TOKEN")
}
```

### Custom Credentials Provider

```elixir
mechanism_opts: %{
  region: "us-east-1",
  credential_provider: fn ->
    {:ok, "AKIA...", "secret...", "session..."}
  end
}
```

### Dependencies

Add to `mix.exs`:

```elixir
{:aws_signature, "~> 0.4"},   # SigV4 signing
{:aws_credentials, "~> 1.0"}  # credential discovery (IRSA, instance metadata, etc.)
```

`aws_credentials` is optional if you provide explicit credentials or a custom `credential_provider`.

### IAM Policy

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:Connect", "kafka-cluster:DescribeCluster"],
      "Resource": "arn:aws:kafka:us-east-1:123456789:cluster/my-cluster/*"
    },
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:*Topic*", "kafka-cluster:ReadData", "kafka-cluster:WriteData"],
      "Resource": "arn:aws:kafka:us-east-1:123456789:topic/my-cluster/*"
    },
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],
      "Resource": "arn:aws:kafka:us-east-1:123456789:group/my-cluster/*"
    }
  ]
}
```

### Notes

- Port **9098** (not 9092/9094)
- Requires MSK with Kafka 2.7.1+
- Host is injected automatically from connection context for SigV4 signing

## Integration with Existing Code

SASL authentication is transparent to the rest of your KafkaEx usage:

```elixir
# Once configured with SASL, use KafkaEx.API normally
{:ok, client} = KafkaEx.API.start_client()

{:ok, metadata} = KafkaEx.API.metadata(client)
{:ok, result} = KafkaEx.API.produce_one(client, "my-topic", 0, "message")
{:ok, messages} = KafkaEx.API.fetch(client, "my-topic", 0, 0)
```

## Troubleshooting

### Common Issues by Mechanism

#### PLAIN
- **`:plain_requires_tls`** - PLAIN authentication attempted without SSL/TLS enabled
  - **Solution:** Set `use_ssl: true` in configuration
- **`Connection refused`** - Wrong port or broker not listening
  - **Solution:** Verify port 9192 (test setup) or your configured PLAIN port
- **`Authentication failed`** - Invalid username/password
  - **Solution:** Check that credentials match the JAAS config on the broker's PLAIN listener

#### SCRAM
- **`Authentication failed`** - Invalid credentials or algorithm mismatch
  - **Solution:** Verify username/password and ensure `:sha256` or `:sha512` matches broker
- **`Invalid server nonce`** - Server nonce doesn't contain client nonce
  - **Solution:** Check broker SCRAM implementation (rare, likely broker bug)
- **`Invalid server signature`** - Server signature verification failed
  - **Solution:** Password mismatch or MITM attack detected
- **`Unsupported SCRAM algorithm`** - Broker doesn't support requested algorithm
  - **Solution:** Try `:sha256` (more widely supported than `:sha512`)

#### OAUTHBEARER
- **Token provider returns `{:error, ...}`** - Token fetch failed
  - **Solution:** Fix token provider implementation or OAuth server issues
- **Empty token string** - Token provider returned empty string
  - **Solution:** Ensure token provider returns non-empty binary JWT
- **Authentication failed** - Broker rejected bearer token
  - **Solution:** Check token validity, expiration, and broker OAuth configuration

#### MSK_IAM
- **`:no_credentials`** - No AWS credentials found
  - **Solution:** Set `AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY` or configure IAM role
- **`Connection timeout`** - Cannot reach MSK broker
  - **Solution:** Check security groups allow port 9098 from client
- **`Authentication failed`** - IAM credentials invalid or insufficient permissions
  - **Solution:** Verify IAM policy includes `kafka-cluster:Connect` action
- **`Invalid region`** - Region format incorrect
  - **Solution:** Use valid AWS region string (e.g., `"us-east-1"`)
- **SigV4 signing failure** - AWS signature generation failed
  - **Solution:** Check `aws_signature` dependency and credential format

### General Errors

- **`:unsupported_sasl_mechanism`** - Broker doesn't support requested mechanism
  - **Solution:** Check broker configuration and Kafka version compatibility
- **`:illegal_sasl_state`** - Protocol state error during authentication
  - **Solution:** Rare; usually indicates a protocol implementation issue
- **`:correlation_mismatch`** - Request/response pairing issue
  - **Solution:** Should not occur in normal operation; file a bug report
- **SSL handshake error** - TLS negotiation failed
  - **Solution:** Verify SSL certificates or use `verify: :verify_none` for testing (NOT production)

### Debugging Tips

1. **Enable Debug Logging**
   ```elixir
   config :logger, level: :debug
   ```
   This shows SASL protocol messages and authentication flow.

2. **Verify Broker Configuration**
   - Check the Kafka `server.properties` for listener configuration
   - Verify the JAAS config file has user credentials for PLAIN/SCRAM
   - Check the `security.inter.broker.protocol` setting

3. **Test with kafka-console-producer**
   ```bash
   kafka-console-producer --bootstrap-server localhost:9192 \
     --topic test \
     --producer.config client-plain.properties
   ```
   If this works, the issue is in your KafkaEx configuration.

4. **Verify User Exists in Kafka**
   ```bash
   # For SCRAM users
   kafka-configs --bootstrap-server localhost:9092 \
     --describe --entity-type users --entity-name alice
   ```

5. **Check MSK IAM Policy**
   - Ensure `kafka-cluster:Connect` is allowed
   - Verify cluster ARN matches your MSK cluster
   - Check IAM role/user has policy attached

### Implementation Details

**Authentication happens during socket creation:**
- Blocking operation during `KafkaEx.API.start_client/1`
- Occurs after SSL/TLS handshake (if `use_ssl: true`)
- Failures prevent client from starting
- Re-authentication happens automatically on connection loss

**Packet mode switching:**
- Initial auth uses raw socket mode
- After successful auth, switches to length-prefixed mode (`:packet, 4`)

**Correlation IDs:**
- Used internally to match requests with responses
- Mismatches indicate protocol errors (should not happen)

## Advanced: Custom Authentication

For OAuth or custom mechanisms, implement the `KafkaEx.Auth.Mechanism` behaviour:

```elixir
defmodule MyAuth do
  @behaviour KafkaEx.Auth.Mechanism

  @impl true
  def mechanism_name(_config), do: "OAUTHBEARER"

  @impl true
  def authenticate(_config, _send_fun) do
    # Custom authentication logic
    :ok
  end
end
```

## Implementation Notes

### Version Compatibility

The SASL implementation handles different Kafka versions appropriately:

- Kafka 0.9.x: Skips API versions call (not supported)
- Kafka 0.10.0-0.10.1: Queries API versions, supports PLAIN only
- Kafka 0.10.2+: Full support including SCRAM mechanisms

### Technical Details

- Authentication occurs immediately after socket creation
- The implementation handles packet mode switching between raw and length-prefixed formats
- Correlation IDs are used to match requests with responses
- Server signatures are validated in SCRAM authentication
- Passwords are never logged and are redacted in inspect output
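
The redaction noted above is typically implemented by customizing the `Inspect` protocol; a generic sketch of the idiom (not the library's exact code - the struct name here is hypothetical):

```elixir
defmodule SaslCredentials do
  # Deriving Inspect with :except omits the password from inspect output
  @derive {Inspect, except: [:password]}
  defstruct [:username, :password]
end

inspect(%SaslCredentials{username: "alice", password: "hunter2"})
# The password value does not appear in the resulting string
```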