# MegasPinakas

An Elixir client library for Google Cloud Bigtable, providing a high-level interface for data operations, table administration, and instance management via gRPC.

## Installation

Add `megas_pinakas` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:megas_pinakas, "~> 0.5.0"}
  ]
end
```

## Configuration

### Development with Emulator

For local development, use the Bigtable emulator:

```bash
# Start the emulator
docker-compose up bigtable-emulator
```

Configure your application:

```elixir
# config/dev.exs
config :megas_pinakas, :emulator,
  host: "localhost",
  port: 8086,
  project_id: "dev-project"
```

Or set the environment variable:

```bash
export BIGTABLE_EMULATOR_HOST=localhost:8086
```
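
Once the emulator is up and the config above is in place, the regular API calls documented below are served locally. As a quick smoke test, here is a sketch reusing the Admin and data calls from later sections; `"dev-instance"` and `"smoke_test"` are arbitrary placeholder names (the emulator accepts any instance and table name):

```elixir
alias MegasPinakas.Admin

# Create a throwaway table on the emulator and round-trip one cell.
{:ok, _table} =
  Admin.create_table("dev-project", "dev-instance", "smoke_test",
    column_families: %{"cf" => %{gc_rule: Admin.max_versions_gc_rule(1)}})

{:ok, _} =
  MegasPinakas.mutate_row("dev-project", "dev-instance", "smoke_test", "row1",
    [MegasPinakas.set_cell("cf", "greeting", "hello")])

{:ok, row} = MegasPinakas.read_row("dev-project", "dev-instance", "smoke_test", "row1")
```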

### Production with Goth Authentication

For production, use [Goth](https://github.com/peburrows/goth) for Google Cloud authentication:

```elixir
# Add to dependencies in mix.exs
def deps do
  [
    {:megas_pinakas, "~> 0.5.0"},
    {:goth, "~> 1.4"}
  ]
end
```

Tell MegasPinakas which Goth process to use for fetching tokens (the credentials themselves are loaded where Goth is started, shown next):

```elixir
# config/runtime.exs
if config_env() == :prod do
  config :megas_pinakas, :goth, MegasPinakas.Goth
end
```

Add Goth to your application's supervision tree:

```elixir
# lib/my_app/application.ex
def start(_type, _args) do
  credentials =
    "GOOGLE_APPLICATION_CREDENTIALS_JSON"
    |> System.fetch_env!()
    |> Jason.decode!()

  children = [
    {Goth, name: MegasPinakas.Goth, source: {:service_account, credentials}},
    # ... other children
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
```

Alternative: Use a credentials file:

```elixir
credentials = "path/to/service-account.json" |> File.read!() |> Jason.decode!()

children = [
  {Goth, name: MegasPinakas.Goth, source: {:service_account, credentials}}
]
```
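
To verify that token fetching works at runtime, you can ask the Goth process for a token directly (plain Goth API, independent of MegasPinakas):

```elixir
# Fetch an OAuth token from the running Goth process as a sanity check.
{:ok, %Goth.Token{token: _token}} = Goth.fetch(MegasPinakas.Goth)
```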

Optionally, set the default pool size:

```elixir
# config/prod.exs
config :megas_pinakas, :default_pool_size, 10
```

## Usage

### Data Operations

```elixir
# Write a row
mutations = [
  MegasPinakas.set_cell("cf", "name", "John Doe"),
  MegasPinakas.set_cell("cf", "email", "john@example.com")
]
{:ok, _} = MegasPinakas.mutate_row("project", "instance", "users", "user#123", mutations)

# Read a row
{:ok, row} = MegasPinakas.read_row("project", "instance", "users", "user#123")

# Read multiple rows with filter
filter = MegasPinakas.family_filter("cf")
{:ok, stream} = MegasPinakas.read_rows("project", "instance", "users",
  rows: MegasPinakas.row_set(["user#1", "user#2", "user#3"]),
  filter: filter)

# Batch mutations
entries = [
  %{row_key: "row1", mutations: [MegasPinakas.set_cell("cf", "col", "val1")]},
  %{row_key: "row2", mutations: [MegasPinakas.set_cell("cf", "col", "val2")]}
]
{:ok, stream} = MegasPinakas.mutate_rows("project", "instance", "table", entries)

# Atomic increment
rules = [MegasPinakas.increment_rule("cf", "counter", 1)]
{:ok, _} = MegasPinakas.read_modify_write_row("project", "instance", "table", "row", rules)
```

### Table Administration

```elixir
alias MegasPinakas.Admin

# Create a table with column families
{:ok, table} = Admin.create_table("project", "instance", "my-table",
  column_families: %{
    "cf" => %{gc_rule: Admin.max_versions_gc_rule(1)},
    "metadata" => %{gc_rule: Admin.max_age_gc_rule(86400)}
  })

# List tables
{:ok, response} = Admin.list_tables("project", "instance")

# Modify column families
modifications = [
  Admin.create_column_family("new_cf", Admin.max_versions_gc_rule(3)),
  Admin.drop_column_family("old_cf")
]
{:ok, _} = Admin.modify_column_families("project", "instance", "table", modifications)

# Delete a table
{:ok, _} = Admin.delete_table("project", "instance", "my-table")
```

### Instance Administration

```elixir
alias MegasPinakas.InstanceAdmin

# Create an instance
clusters = %{
  "my-cluster" => %{
    location: "us-central1-b",
    serve_nodes: 3,
    storage_type: :SSD
  }
}
{:ok, operation} = InstanceAdmin.create_instance("project", "my-instance", clusters,
  display_name: "My Instance",
  type: :PRODUCTION)

# List instances
{:ok, response} = InstanceAdmin.list_instances("project")

# Create an app profile
{:ok, profile} = InstanceAdmin.create_app_profile("project", "instance", "profile-id",
  description: "My app profile",
  multi_cluster_routing: true)
```

## Filters

```elixir
# Filter by column family
filter = MegasPinakas.family_filter("cf")

# Filter by specific column
filter = MegasPinakas.column_filter("cf", "col")

# Limit cells per column
filter = MegasPinakas.cells_per_column_limit_filter(1)

# Chain filters (AND)
filter = MegasPinakas.chain_filters([
  MegasPinakas.family_filter("cf"),
  MegasPinakas.cells_per_column_limit_filter(1)
])

# Interleave filters (OR)
filter = MegasPinakas.interleave_filters([
  MegasPinakas.family_filter("cf1"),
  MegasPinakas.family_filter("cf2")
])
```

## Row Ranges

```elixir
# Specific row keys
row_set = MegasPinakas.row_set(["row1", "row2", "row3"])

# Row range (default: start inclusive, end exclusive)
range = MegasPinakas.row_range("user#100", "user#200")
row_set = MegasPinakas.row_set_from_ranges([range])

# Various range types
range = MegasPinakas.row_range_open("a", "z")       # Both exclusive: (a, z)
range = MegasPinakas.row_range_closed("a", "z")    # Both inclusive: [a, z]
range = MegasPinakas.row_range_open_closed("a", "z") # Start exclusive, end inclusive: (a, z]
range = MegasPinakas.row_range_from("user#500")    # From key to end: [user#500, ∞)
range = MegasPinakas.row_range_until("user#500")   # From start to key: [∅, user#500)
range = MegasPinakas.row_range_unbounded()         # All rows: [∅, ∞)
```

### Prefix Scans

Prefix scans efficiently retrieve all rows starting with a given prefix:

```elixir
# All users (rows starting with "user#")
range = MegasPinakas.row_range_prefix("user#")
# Internally creates the range ["user#", "user$"); "$" is the byte after "#" (see sketch below)

# All posts for a specific user
range = MegasPinakas.row_range_prefix("user#123#posts#")
# Matches: user#123#posts#001, user#123#posts#002, etc.

# Combine with read_rows
{:ok, rows} = MegasPinakas.read_rows(project, instance, "table",
  rows: MegasPinakas.row_set_from_ranges([MegasPinakas.row_range_prefix("user#")]),
  rows_limit: 100
)
```
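
The upper bound of a prefix range is conventionally the prefix with its last byte incremented, which is how `"user#"` becomes `"user$"` above. A minimal sketch of that calculation (not necessarily the library's exact implementation, which would also need to handle a trailing `0xFF` byte):

```elixir
prefix = "user#"
head_len = byte_size(prefix) - 1
<<head::binary-size(head_len), last_byte>> = prefix

# Increment the final byte to get the exclusive upper bound.
end_key = head <> <<last_byte + 1>>
# => "user$", so the scan covers ["user#", "user$")
```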

### Start and End Keys with Inclusive/Exclusive Bounds

Understanding inclusive vs exclusive bounds is crucial for pagination and range queries:

```elixir
# INCLUSIVE start, EXCLUSIVE end (default): [start, end)
# Use when: Standard range queries, initial page loads
range = MegasPinakas.row_range("user#100", "user#200")
# Includes: user#100, user#101, ..., user#199
# Excludes: user#200

# EXCLUSIVE start, EXCLUSIVE end: (start, end)
# Use when: Paginating after a known key
range = MegasPinakas.row_range_open("user#100", "user#200")
# Excludes: user#100 and user#200
# Includes: user#101, ..., user#199

# INCLUSIVE start, INCLUSIVE end: [start, end]
# Use when: You want both boundary keys included
range = MegasPinakas.row_range_closed("user#100", "user#200")
# Includes: user#100, user#101, ..., user#199, user#200

# EXCLUSIVE start, INCLUSIVE end: (start, end]
# Use when: Paginating backwards or specific boundary needs
range = MegasPinakas.row_range_open_closed("user#100", "user#200")
# Excludes: user#100
# Includes: user#101, ..., user#200
```

### Pagination Example

```elixir
# First page - start from beginning
first_page_range = MegasPinakas.row_range_prefix("user#")
{:ok, rows} = MegasPinakas.read_rows(project, instance, "users",
  rows: MegasPinakas.row_set_from_ranges([first_page_range]),
  rows_limit: 100
)

# Get last key from results
last_key = rows |> List.last() |> MegasPinakas.row_key()
# => "user#099"

# Next page - use EXCLUSIVE start to skip the last seen key
next_page_range = MegasPinakas.row_range_open(last_key, "user#" <> <<255>>)
# Or construct the underlying RowRange struct directly:
next_page_range = %Google.Bigtable.V2.RowRange{
  start_key: {:start_key_open, last_key},
  end_key: {:end_key_open, "user#" <> <<255>>}
}

{:ok, next_rows} = MegasPinakas.read_rows(project, instance, "users",
  rows: MegasPinakas.row_set_from_ranges([next_page_range]),
  rows_limit: 100
)
```
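
These two steps generalize into a paging loop. Here is a sketch built only from the calls above; it assumes, as in the example, that `read_rows` returns each page as a list, and it stops when a page comes back empty:

```elixir
user_pages =
  Stream.unfold(MegasPinakas.row_range_prefix("user#"), fn range ->
    {:ok, page} =
      MegasPinakas.read_rows(project, instance, "users",
        rows: MegasPinakas.row_set_from_ranges([range]),
        rows_limit: 100
      )

    case page do
      [] ->
        # No more rows: stop the stream.
        nil

      page ->
        # Next page starts just after the last key we saw (exclusive start).
        last_key = page |> List.last() |> MegasPinakas.row_key()
        {page, MegasPinakas.row_range_open(last_key, "user#" <> <<255>>)}
    end
  end)

all_users = Enum.flat_map(user_pages, & &1)
```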

### Combining Prefix with Bounds

```elixir
# All user posts from a specific date range (using composite keys)
# Key format: user#<user_id>#posts#<timestamp>

# Posts for user 123 from 2024-01 only
start_key = "user#123#posts#2024-01-01"
end_key = "user#123#posts#2024-02-01"
range = MegasPinakas.row_range(start_key, end_key)  # [start, end)

# Posts for user 123 AFTER a specific post (for pagination)
last_seen_post = "user#123#posts#2024-01-15T10:30:00"
range = MegasPinakas.row_range_open(last_seen_post, "user#123#posts#" <> <<255>>)
# Excludes the last_seen_post, includes everything after until end of prefix
```

## Type-Aware Operations

The `MegasPinakas.Types` module provides type-safe encoding/decoding:

```elixir
alias MegasPinakas.Types

# Write typed values
Types.write_json(project, instance, "table", "row", "cf", "data", %{name: "John", age: 30})
Types.write_integer(project, instance, "table", "row", "cf", "count", 42)
Types.write_datetime(project, instance, "table", "row", "cf", "created", DateTime.utc_now())

# Read typed values
{:ok, data} = Types.read_json(project, instance, "table", "row", "cf", "data")
{:ok, count} = Types.read_integer(project, instance, "table", "row", "cf", "count")

# Write multiple typed cells
Types.write_cells(project, instance, "table", "row", [
  {:string, "cf", "name", "John Doe"},
  {:integer, "cf", "age", 30},
  {:json, "cf", "profile", %{city: "NYC"}},
  {:datetime, "cf", "created", DateTime.utc_now()}
])
```

## Row Builder

Fluent API for building multi-cell rows:

```elixir
alias MegasPinakas.Row

Row.new("user#123")
|> Row.put_string("cf", "name", "John Doe")
|> Row.put_integer("cf", "age", 30)
|> Row.put_json("cf", "profile", %{city: "NYC"})
|> Row.put_boolean("cf", "active", true)
|> Row.put_datetime("cf", "created", DateTime.utc_now())
|> Row.write(project, instance, "users")

# Type inference with put/5
Row.new("user#123")
|> Row.put("cf", "name", "John")      # Infers string
|> Row.put("cf", "age", 30)           # Infers integer
|> Row.put("cf", "score", 98.5)       # Infers float
|> Row.put("cf", "data", %{a: 1})     # Infers JSON
|> Row.write(project, instance, "users")
```

## Batch Builder

Build and execute batch mutations:

```elixir
alias MegasPinakas.Batch
alias MegasPinakas.Row

Batch.new()
|> Batch.add(Row.new("user#1") |> Row.put_string("cf", "name", "Alice"))
|> Batch.add(Row.new("user#2") |> Row.put_string("cf", "name", "Bob"))
|> Batch.add(Row.new("user#3") |> Row.put_string("cf", "name", "Charlie"))
|> Batch.write(project, instance, "users")
```

## Advanced Filters

The `MegasPinakas.Filter` module provides comprehensive filter support:

```elixir
alias MegasPinakas.Filter

# Row-level filters
Filter.row_key_regex_filter("^user#")
Filter.row_sample_filter(0.1)  # 10% sample

# Cell-level filters
Filter.cells_per_row_limit_filter(100)
Filter.cells_per_row_offset_filter(10)
Filter.column_qualifier_regex_filter("^meta_")

# Range filters
Filter.timestamp_range_filter(start_micros, end_micros)
Filter.value_range_filter(start_value_closed: "A", end_value_closed: "Z")
Filter.column_range_filter("cf", start_qualifier_closed: "a", end_qualifier_open: "m")

# Convenience filters
Filter.latest_only_filter()                    # Only latest version
Filter.time_window_filter(:hour, 24)           # Last 24 hours
Filter.column_latest_filter("cf", "name")      # Specific column, latest

# Composing filters
Filter.chain_filters([f1, f2, f3])              # AND
Filter.interleave_filters([f1, f2])             # OR
Filter.condition_filter(predicate, true_f, false_f)  # IF-THEN-ELSE
```
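
These filters compose with `read_rows` the same way as the basic filters under Data Operations; a sketch reusing only calls shown in this README:

```elixir
# Latest cell of every column, limited to rows whose key starts with "user#".
filter =
  Filter.chain_filters([
    Filter.row_key_regex_filter("^user#"),
    Filter.latest_only_filter()
  ])

{:ok, rows} =
  MegasPinakas.read_rows(project, instance, "users",
    rows: MegasPinakas.row_set_from_ranges([MegasPinakas.row_range_unbounded()]),
    filter: filter
  )
```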

## Counters

Atomic counter operations:

```elixir
alias MegasPinakas.Counter

# Basic increment/decrement
{:ok, new_value} = Counter.increment(project, instance, "counters", "page#home", "stats", "views")
{:ok, new_value} = Counter.decrement(project, instance, "counters", "item#123", "stock", "count", 5)

# Get current value
{:ok, value} = Counter.get(project, instance, "counters", "page#home", "stats", "views")

# Set/reset
Counter.set(project, instance, "counters", "page#home", "stats", "views", 100)
Counter.reset(project, instance, "counters", "page#home", "stats", "views")

# Atomic multi-counter increment
{:ok, results} = Counter.increment_many(project, instance, "analytics", "user#123", [
  {"stats", "page_views", 1},
  {"stats", "clicks", 3}
])
```

## Time-Windowed Counters (Rate Limiting)

```elixir
alias MegasPinakas.CounterTTL

# Increment with time bucket
{:ok, count} = CounterTTL.increment(project, instance, "rate_limits", "api:user#123",
  "limits", "requests", bucket: :minute)

# Check rate limit
case CounterTTL.check_rate_limit(project, instance, "rate_limits", "api:user#123", 100, bucket: :minute) do
  {:ok, current_count} -> IO.puts("Under limit: #{current_count}")
  {:error, :rate_limited, reset_at} -> IO.puts("Rate limited until #{reset_at}")
end

# Get window sum
{:ok, total} = CounterTTL.get_window(project, instance, "rate_limits", "api:user#123",
  "limits", "requests", bucket: :minute, window_size: 5)
```

## Time Series

Time-series data with reverse timestamp ordering for efficient recent-first queries.

### Reverse Timestamp Ordering

Bigtable sorts row keys lexicographically. To get recent data first (without scanning the entire table),
we use **reverse timestamps**: `max_timestamp - actual_timestamp`.

```
Normal timestamp:    2024-01-01 → "1704067200"  (sorts first, oldest)
                     2024-12-31 → "1735689599"  (sorts last, newest)

Reverse timestamp:   2024-01-01 → "8295932799"  (sorts last)
                     2024-12-31 → "8264310400"  (sorts first, newest!)
```

Row key format: `<metric_id>#<reverse_timestamp>`

This means a prefix scan like `row_range_prefix("cpu:server1#")` returns the most recent data first.
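
For reference, the arithmetic behind those keys, assuming the 10-digit maximum `9_999_999_999` implied by the table above (the `TimeSeries` helpers shown next do this for you, and their exact key layout may differ):

```elixir
max_ts = 9_999_999_999
dt = ~U[2024-01-15 10:00:00Z]

# Larger (newer) timestamps produce smaller reverse values, so they sort first.
reverse_ts = max_ts - DateTime.to_unix(dt)
# => 8_294_687_199

row_key = "cpu:server1#" <> String.pad_leading(Integer.to_string(reverse_ts), 10, "0")
# => "cpu:server1#8294687199"
```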

```elixir
alias MegasPinakas.TimeSeries

# Build reverse timestamp row keys manually
row_key = TimeSeries.time_series_row_key("cpu:server1", ~U[2024-01-15 10:00:00Z])
# => "cpu:server1#8296..."

# Convert timestamps
reverse_ts = TimeSeries.reverse_timestamp(~U[2024-01-15 10:00:00Z])
{:ok, original_dt} = TimeSeries.from_reverse_timestamp(reverse_ts)

# Parse row keys
{:ok, %{metric_id: id, timestamp: ts}} = TimeSeries.parse_row_key(row_key)
```

### Writing Data Points

```elixir
# Write a data point
TimeSeries.write_point(project, instance, "metrics", "cpu:server1",
  %{value: 0.85, tags: %{host: "srv1", region: "us-east"}})

# Write multiple points
TimeSeries.write_points(project, instance, "metrics", [
  %{metric_id: "cpu:server1", value: 0.85, timestamp: ~U[2024-01-15 10:00:00Z]},
  %{metric_id: "cpu:server2", value: 0.92, timestamp: ~U[2024-01-15 10:00:00Z]}
])
```

### Querying Data

```elixir
# Query recent points (most recent first due to reverse timestamps)
{:ok, points} = TimeSeries.query_recent(project, instance, "metrics", "cpu:server1", limit: 100)

# Query time range
{:ok, points} = TimeSeries.query_range(project, instance, "metrics", "cpu:server1",
  ~U[2024-01-01 00:00:00Z], ~U[2024-01-02 00:00:00Z])
```

## Streaming

Memory-efficient streaming with `Stream.resource`:

```elixir
alias MegasPinakas.Streaming

# Stream all rows with a prefix
Streaming.stream_prefix(project, instance, "users", "user#active:")
|> Stream.map(&MegasPinakas.row_to_map/1)
|> Stream.filter(fn data -> data["cf"]["status"] == "active" end)
|> Enum.take(100)

# Stream a range
Streaming.stream_range(project, instance, "table", "a", "z")
|> Stream.each(&process_row/1)
|> Stream.run()

# Stream with keys
Streaming.stream_rows_with_keys(project, instance, "users",
  rows: MegasPinakas.row_set_from_ranges([MegasPinakas.row_range_prefix("user#")])
)
|> Enum.into(%{})  # Map of row_key => data

# Chunked processing
Streaming.stream_in_chunks(project, instance, "logs",
  [rows: row_set],
  chunk_size: 100,
  process_fn: fn chunk -> process_batch(chunk) end
)
|> Enum.sum()

# Utilities
count = Streaming.count_rows(project, instance, "users", rows: row_set)
exists? = Streaming.rows_exist?(project, instance, "users", rows: row_set)
{:ok, first} = Streaming.first_row(project, instance, "users", rows: row_set)
```

## Cache

Simple key-value cache:

```elixir
alias MegasPinakas.Cache

# Basic operations
{:ok, _} = Cache.put(project, instance, "cache", "user:123", %{name: "John", age: 30})
{:ok, data} = Cache.get(project, instance, "cache", "user:123")
{:ok, _} = Cache.delete(project, instance, "cache", "user:123")

# Get or compute
{:ok, value} = Cache.get_or_put(project, instance, "cache", "expensive:key", fn ->
  expensive_computation()
end)

# Multi-key operations
{:ok, results} = Cache.get_many(project, instance, "cache", ["key1", "key2", "key3"])
{:ok, _} = Cache.put_many(project, instance, "cache", [{"key1", val1}, {"key2", val2}])
{:ok, _} = Cache.delete_many(project, instance, "cache", ["key1", "key2"])

# Existence check
exists? = Cache.exists?(project, instance, "cache", "user:123")

# Atomic operations
{:ok, new_val} = Cache.increment(project, instance, "cache", "counter", 1)
{:ok, new_val} = Cache.append(project, instance, "cache", "log", "new entry\n")
```

## License

Apache 2.0