# Huginn

A ClickHouse client for Elixir that speaks gRPC, with built-in connection pooling.

## Features

- gRPC protocol for efficient binary communication
- Connection pooling with health monitoring
- Support for all 4 ClickHouse gRPC methods:
  - `ExecuteQuery` - simple request/response
  - `ExecuteQueryWithStreamInput` - streaming inserts
  - `ExecuteQueryWithStreamOutput` - streaming large results
  - `ExecuteQueryWithStreamIO` - bidirectional streaming
- Both password and JWT authentication
- Query cancellation support
- Automatic result parsing

## Installation

Add `huginn` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:huginn, "~> 0.3.0"}
  ]
end
```
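
Then fetch it:

```bash
mix deps.get
```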

## Configuration

Configure the ClickHouse connection in your `config/config.exs`:

```elixir
config :huginn, :clickhouse,
  host: "localhost",
  port: 9100,
  database: "default",
  auth: {:password, "default", ""},
  pool_size: 5
```

### Configuration Options

| Option | Default | Description |
|--------|---------|-------------|
| `:host` | (required) | ClickHouse server hostname |
| `:port` | `9100` | gRPC port |
| `:database` | `"default"` | Default database |
| `:auth` | `nil` | `{:password, user, pass}` or `{:jwt, token}` |
| `:pool_size` | `5` | Number of connections |
| `:ssl` | `false` | Enable SSL/TLS |
| `:pool_name` | `:clickhouse_pool` | Pool name for multiple pools |
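
For JWT deployments, pass a token tuple instead of credentials. A minimal sketch (the hostname and env var name are placeholders):

```elixir
config :huginn, :clickhouse,
  host: "clickhouse.internal",
  auth: {:jwt, System.fetch_env!("CLICKHOUSE_JWT")}
```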

### Production Configuration

```elixir
# config/runtime.exs (evaluated at boot, so env vars are read at runtime,
# not baked in at compile time)
config :huginn, :clickhouse,
  host: System.fetch_env!("CLICKHOUSE_HOST"),
  port: String.to_integer(System.get_env("CLICKHOUSE_PORT", "9100")),
  database: System.get_env("CLICKHOUSE_DATABASE", "default"),
  auth: {:password, System.fetch_env!("CLICKHOUSE_USER"), System.fetch_env!("CLICKHOUSE_PASSWORD")},
  pool_size: 10,
  ssl: true
```

## Usage

### gRPC Methods Overview

| gRPC Method | Function | Use Case |
|-------------|----------|----------|
| `ExecuteQuery` | `query/2`, `insert/3` | Simple queries, small inserts |
| `ExecuteQueryWithStreamInput` | `insert_stream/3` | Large file imports |
| `ExecuteQueryWithStreamOutput` | `stream_query/2` | Large result sets |
| `ExecuteQueryWithStreamIO` | `stream_io/1` | Bidirectional streaming |

### Simple Queries (ExecuteQuery)

```elixir
# Execute a query
{:ok, result} = Huginn.query("SELECT * FROM system.tables LIMIT 10")

# Get results as maps
maps = Huginn.Clickhouse.Result.to_maps(result)

# Query with options
{:ok, result} = Huginn.query(
  "SELECT * FROM users WHERE status = 'active'",
  database: "mydb",
  format: "JSONEachRow",
  timeout: 30_000
)

# Raising version
result = Huginn.query!("SELECT 1")
```

### Inserts (ExecuteQuery)

```elixir
# Simple insert with TabSeparated data
data = "john\t25\njane\t30"
{:ok, _} = Huginn.insert(
  "INSERT INTO users (name, age) FORMAT TabSeparated",
  data
)

# JSONEachRow format
data = ~s({"name":"john","age":25}\n{"name":"jane","age":30})
{:ok, _} = Huginn.insert("INSERT INTO users FORMAT JSONEachRow", data)
```
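
To build JSONEachRow payloads from Elixir maps, a JSON encoder such as Jason (not a Huginn dependency; assumed here) keeps things tidy:

```elixir
# Encode each map as one JSON line, joined by newlines
rows = [%{name: "john", age: 25}, %{name: "jane", age: 30}]
data = Enum.map_join(rows, "\n", &Jason.encode!/1)
{:ok, _} = Huginn.insert("INSERT INTO users FORMAT JSONEachRow", data)
```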

### Streaming Inserts (ExecuteQueryWithStreamInput)

```elixir
# Stream from a file
File.stream!("large_data.csv", [], 65_536)
|> Huginn.insert_stream("INSERT INTO logs FORMAT CSV")

# Stream with progress tracking using Agent
{:ok, counter} = Agent.start_link(fn -> 0 end)

# `large_data` is any enumerable of row strings
large_data
|> Stream.chunk_every(1000)
|> Stream.map(fn chunk ->
  Agent.update(counter, &(&1 + length(chunk)))
  # Keep a trailing newline so consecutive chunks don't run together
  Enum.join(chunk, "\n") <> "\n"
end)
|> Huginn.insert_stream("INSERT INTO events FORMAT TabSeparated")

IO.puts("Inserted #{Agent.get(counter, & &1)} rows")
Agent.stop(counter)
```

### Streaming Results (ExecuteQueryWithStreamOutput)

```elixir
# Basic streaming
require Logger

Huginn.stream_query("SELECT * FROM large_table")
|> Enum.each(fn
  {:ok, result} -> process_chunk(result)
  {:error, error} -> Logger.error("Error: #{inspect(error)}")
end)

# Stream rows directly
Huginn.stream_rows("SELECT * FROM events")
|> Stream.take(1000)
|> Enum.to_list()

# Stream as maps
Huginn.stream_maps("SELECT name, age FROM users")
|> Stream.filter(fn %{"age" => age} -> String.to_integer(age) > 18 end)
|> Enum.to_list()
```
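
Streamed rows can also go straight to disk without accumulating in memory. A sketch, assuming `stream_rows/1` yields each row as a list of string fields:

```elixir
# Export a large result set to a TSV file, one row at a time
Huginn.stream_rows("SELECT name, age FROM users")
|> Stream.map(fn row -> Enum.join(row, "\t") <> "\n" end)
|> Stream.into(File.stream!("users.tsv"))
|> Stream.run()
```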

### Using Agent to Accumulate Results

```elixir
# Accumulate all rows with error counting
{:ok, agent} = Agent.start_link(fn -> %{chunks: [], errors: 0} end)

Huginn.stream_query("SELECT * FROM metrics")
|> Enum.each(fn
  {:ok, result} ->
    # Prepend chunks to avoid O(n²) list appends; restore order at the end
    Agent.update(agent, fn state ->
      %{state | chunks: [result.rows | state.chunks]}
    end)
  {:error, _} ->
    Agent.update(agent, fn state ->
      %{state | errors: state.errors + 1}
    end)
end)

%{chunks: chunks, errors: errors} = Agent.get(agent, & &1)
rows = chunks |> Enum.reverse() |> Enum.concat()
IO.puts("Got #{length(rows)} rows, #{errors} errors")
Agent.stop(agent)
```

### Custom Stream.resource Pattern

```elixir
defmodule MyApp.ClickHouseStream do
  @moduledoc "Custom streaming with backpressure control"

  def stream_with_backpressure(query, batch_size \\ 100) do
    Stream.resource(
      fn -> init_query(query) end,
      fn state -> next_batch(state, batch_size) end,
      &cleanup/1
    )
  end

  # Suspend the row stream so each call pulls rows on demand instead of
  # re-enumerating the stream from the start on every batch.
  defp init_query(query) do
    stream = Huginn.stream_rows(query)
    &Enumerable.reduce(stream, &1, fn row, _acc -> {:suspend, row} end)
  end

  defp next_batch(:done, _batch_size), do: {:halt, :done}
  defp next_batch(cont, batch_size), do: take(cont, batch_size, [])

  # Pull up to `n` rows from the suspended stream, emitting one batch.
  defp take(cont, 0, acc), do: {[Enum.reverse(acc)], cont}

  defp take(cont, n, acc) do
    case cont.({:cont, nil}) do
      {:suspended, row, next_cont} -> take(next_cont, n - 1, [row | acc])
      {:done, _} when acc == [] -> {:halt, :done}
      {:done, _} -> {[Enum.reverse(acc)], :done}
      {:halted, _} -> {:halt, :done}
    end
  end

  # Release the underlying stream if we stopped mid-enumeration
  defp cleanup(cont) when is_function(cont, 1), do: cont.({:halt, nil})
  defp cleanup(_), do: :ok
end

# Usage
MyApp.ClickHouseStream.stream_with_backpressure("SELECT * FROM events")
|> Enum.each(fn batch ->
  process_batch(batch)
  Process.sleep(100)  # Rate limiting
end)
```
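
If you only need fixed-size batches without custom halt logic, the standard library's `Stream.chunk_every/2` gives the same shape with less machinery:

```elixir
# Equivalent batching without a custom Stream.resource
Huginn.stream_rows("SELECT * FROM events")
|> Stream.chunk_every(100)
|> Enum.each(&process_batch/1)
```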

### Bidirectional Streaming (ExecuteQueryWithStreamIO)

```elixir
# Interactive query session
{output, send_query} = Huginn.stream_io()

# Consume results as they arrive
consumer = Task.async(fn ->
  Enum.each(output, fn
    {:ok, result} -> IO.inspect(result.rows)
    {:error, err} -> IO.puts("Error: #{inspect(err)}")
  end)
end)

# Send queries (named `send_query` to avoid shadowing Kernel.send/2)
send_query.(Huginn.Clickhouse.Query.build("SELECT 1"))
send_query.(Huginn.Clickhouse.Query.build("SELECT 2"))
```

### Query Cancellation

```elixir
# Start a long-running query with custom ID
query_id = "my-long-query-#{System.unique_integer()}"

task = Task.async(fn ->
  Huginn.query("SELECT sleep(300)", query_id: query_id)
end)

# Cancel after some time
Process.sleep(5_000)
:ok = Huginn.cancel(query_id)
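
# Await the task; it returns once the server aborts the query
_result = Task.await(task)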

# Cancel queries by condition
Huginn.cancel_where("elapsed > 60")
Huginn.cancel_where("user = 'admin'")

# List running queries
{:ok, result} = Huginn.running_queries()
Huginn.Clickhouse.Result.to_maps(result)
|> Enum.each(fn q -> IO.puts("#{q["query_id"]}: #{q["query"]}") end)
```

### Health Check

```elixir
case Huginn.ping() do
  :ok -> IO.puts("Connected!")
  {:error, reason} -> IO.puts("Failed: #{inspect(reason)}")
end
```
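
For ongoing monitoring, `ping/0` fits naturally into a small supervised process. A sketch (the module name and 30-second interval are illustrative):

```elixir
defmodule MyApp.ClickHouseMonitor do
  use GenServer
  require Logger

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(_opts) do
    # Kick off the first check immediately
    send(self(), :ping)
    {:ok, nil}
  end

  @impl true
  def handle_info(:ping, state) do
    case Huginn.ping() do
      :ok -> :ok
      {:error, reason} -> Logger.warning("ClickHouse unreachable: #{inspect(reason)}")
    end

    Process.send_after(self(), :ping, 30_000)
    {:noreply, state}
  end
end
```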

## Development

### Prerequisites

- Elixir 1.14+
- Docker and Docker Compose
- `protoc` compiler (`brew install protobuf` on macOS)

### Start ClickHouse

```bash
docker-compose up -d
```

This starts ClickHouse with gRPC enabled on port 9100.

### Run Tests

```bash
mix test
```

### Generate Documentation

```bash
mix docs
```

### Regenerate Proto Files

If you need to regenerate the proto files after updating the `.proto` file:

```bash
# Install protoc-gen-elixir
mix escript.install hex protobuf

# Generate Elixir code
protoc --elixir_out=plugins=grpc:./lib/huginn/proto \
  --proto_path=./priv/protos \
  clickhouse_grpc.proto
```

## Architecture

```
lib/huginn/
├── clickhouse/
│   ├── client.ex      # High-level API (all 4 gRPC methods)
│   ├── config.ex      # Configuration management
│   ├── query.ex       # QueryInfo message builders
│   ├── result.ex      # Result parsing utilities
│   └── stream.ex      # Streaming helpers
└── proto/
    └── clickhouse_grpc.pb.ex  # Generated protobuf code
```

## License

MIT