# ecql - Cassandra driver for Erlang.

A driver for Cassandra's native protocol. This driver uses Cassandra's stream
feature to create independent worker processes and avoid deadlocking.
Rather than exposing all features of Cassandra, ease of use and performance are
the main goals of this driver.
Additionally, this driver includes a drop-in replacement for Mnesia to make
a transition from Mnesia to Cassandra easier.

## Usage

To play with the driver in interactive mode there is a shell script. This script starts Cassandra in a container and connects an Erlang shell to it.
`$ ./`

Then you can play around in the interactive Erlang shell:
Table = "CREATE TABLE IF NOT EXISTS key_value (key int PRIMARY KEY, value text);".
ecql:execute(Table).

% These two lines:
ecql:execute_async("INSERT INTO key_value (key, value) VALUES(?, ?)", [1, "a number"]).
ecql:execute_async("INSERT INTO key_value (key, value) VALUES(?, ?)", [42, "The answer to life, the universe and everything"]).

% Do the same as this:
ecql:execute_batch("INSERT INTO key_value (key, value) VALUES(?, ?)", [
  [1, "a number"],
  [42, "The answer to life, the universe and everything"]
]).

Let's insert some data and aggregate numbers:

Table = "CREATE TABLE IF NOT EXISTS key_value (key int PRIMARY KEY, value text);".

Sequence = [[Idx, io_lib:format("This is ~p", [Idx])] || Idx <- lists:seq(1, 50000)].
ecql:execute_batch("INSERT INTO key_value (key, value) VALUES(?, ?)", Sequence).

% Calculate the sum of all keys on the server side:
Sum = ecql:select_value("SELECT SUM(key) FROM key_value").

% Or the sum of products on the client side:
ProdSum = ecql:foldl(
  fun([Idx], Acc) ->
    Acc + (Idx*Idx)
  end, 0,
  "SELECT key FROM key_value"
).

## ecql:execute/1,2,3
* `ecql:execute(Cql, Arguments = [], Consistency = local_quorum)`

Executes a CQL statement and returns the results if any, otherwise ok or an error message. execute/1,2,3 always blocks until the execution is done.

* `ok` - for INSERT and CREATE statements
* `{[ColumnName], [Row]}` - for select statements
* `{error, Code, Reason}` - for any error. Code and Reason come from Cassandra error codes.
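
As a minimal sketch of how a caller might tell these three result shapes apart, the helper below pattern-matches on a value returned by `ecql:execute/1,2,3`. The module name `result_sketch` and the function `describe/1` are hypothetical and not part of the driver.

```erlang
-module(result_sketch).
-export([describe/1]).

%% Hypothetical helper (not part of ecql): classifies a value returned
%% by ecql:execute/1,2,3 according to the three shapes listed above.
describe(ok) ->
    %% INSERT, CREATE and similar statements return no rows.
    no_rows;
describe({error, Code, Reason}) ->
    %% Code and Reason are passed through unchanged from Cassandra.
    {failed, Code, Reason};
describe({Columns, Rows}) when is_list(Columns), is_list(Rows) ->
    %% SELECT results: column names plus a list of rows.
    {rows, length(Rows), Columns}.
```

It could be called as `result_sketch:describe(ecql:execute("SELECT * FROM key_value WHERE key = 0"))`.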

ok = ecql:execute("INSERT INTO key_value (key, value) VALUES (?, ?)", [0, "zero"]).

{[key, value], [[0, <<"zero">>]]} = ecql:execute("SELECT * FROM key_value WHERE key = 0").

{error,8704,"Undefined column name a"} = ecql:execute("INSERT INTO key_value (a, b) VALUES (?, ?)", [0, "zero"]).

`Cql` is any valid Cassandra Query Language (CQL) statement; see the CQL documentation for details.

### Prepared statements
All statements with more than one argument are **automatically prepared internally**, and the prepared statements are cached and reused for subsequent calls.

### Argument casting
`Arguments` are the values to be substituted for the `?` placeholders in the CQL string. They are cast into the right type as far as possible. Most notably, the Cassandra type **`text` accepts iolists and is always returned as an Erlang binary**.

ok = ecql:execute(
  "INSERT INTO key_value (key, value) VALUES(?, ?)",
  [0, ["deep iolists", [" can", $l, $o, $o, $k], " strange ", [2], " erlang newcomers"]]
).

BinaryString = ecql:select_value("SELECT value FROM key_value WHERE key = 0").
io:format("~s ~n", [BinaryString]).
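
To see what such a deep iolist looks like once flattened, the standard `iolist_to_binary/1` BIF can be used without any Cassandra connection. This only illustrates the iolist concept; that the driver stores a `text` argument as exactly these bytes is an assumption. The module name `iolist_sketch` is hypothetical.

```erlang
-module(iolist_sketch).
-export([flatten/0]).

%% Flattens the iolist from the example above with the standard BIF.
%% Assumption: the driver encodes a `text` argument to the same bytes.
flatten() ->
    IoList = ["deep iolists", [" can", $l, $o, $o, $k], " strange ", [2], " erlang newcomers"],
    %% Nested lists, strings and single characters all collapse into one
    %% flat binary; the [2] element becomes a single byte with value 2.
    iolist_to_binary(IoList).
```

Calling `iolist_sketch:flatten()` yields `<<"deep iolists canlook strange ", 2, " erlang newcomers">>`.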

`Consistency` is the Cassandra consistency level. The valid atom values are:

* `default` - Alias for local_quorum and default value when no consistency is provided.
* `one` - Consistency level one.
* `two` - Consistency level two.
* `three` - Consistency level three.
* `quorum` - Consistency level quorum.
* `all` - Consistency level all.
* `local_quorum` - Consistency level local quorum.
* `each_quorum` - Consistency level each quorum.
* `serial` - Consistency level serial.
* `local_serial` - Consistency level local serial.
* `local_one` - Consistency level local one.

## ecql:select/1,2,3
* `ecql:select(Cql, Arguments = [], Consistency = local_quorum)`

Alias for `execute/1,2,3` - there are no behavioural differences.

## ecql:execute_async/1,2,3
* `ecql:execute_async(Cql, Arguments = [], Consistency = local_quorum)`

Returns: `ok`

Same as `ecql:execute/1,2,3` but asynchronous. It does not wait for the return value. Internally the number of running asynchronous calls is counted and capped to 100 per calling process. If the number exceeds this limit the calling process is blocked until at least one asynchronous query finishes.

## ecql:sync/0
Waits for all `execute_async` queries of the current process to finish. This is useful after issuing a bulk of asynchronous queries.
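
A minimal sketch of that pattern, using only `ecql:execute_async/2` and `ecql:sync/0` as described above. The module name `async_sketch` and the function `bulk_insert/1` are hypothetical, and the `key_value` table from the Usage section is assumed to exist.

```erlang
-module(async_sketch).
-export([bulk_insert/1]).

%% Hypothetical helper: queues one asynchronous INSERT per [Key, Value]
%% row and then waits until all of them have finished.
bulk_insert(Rows) ->
    Cql = "INSERT INTO key_value (key, value) VALUES(?, ?)",
    %% execute_async returns ok immediately; the driver blocks the caller
    %% by itself once too many queries are in flight.
    lists:foreach(fun(Row) -> ok = ecql:execute_async(Cql, Row) end, Rows),
    %% The return value of ecql:sync/0 is not documented here, so it is ignored.
    _ = ecql:sync(),
    ok.
```

For example, `async_sketch:bulk_insert([[1, "one"], [2, "two"]])` returns only after both inserts have completed.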

## ecql:execute_batch/2,3,4
* `ecql:execute_batch(Cql, ArgumentListList, Type = unlogged, Consistency = local_quorum)`

`Type` can be one of the following atoms:
* `unlogged`
* `logged`

Executes a Cassandra batch statement. The batch is limited to a single CQL statement, but with multiple sets of arguments. This can be used to delete, update or insert many entries at once.

ArgumentListList = [[Id] || Id <- lists:seq(1, 50000)].
ecql:execute_batch("DELETE FROM key_value WHERE key = ?", ArgumentListList).

## ecql:select_value/1,2,3
* `select_value(Cql, Arguments = [], Consistency = local_quorum)`

Returns: `Value`

Sugar around `execute/1,2,3` for select statements. It returns only a single value from the query. Errors in the query or `ok` results from `execute/1,2,3` result in a raised exception.


62> ecql:select_value("SELECT value FROM key_value WHERE key = 0").
<<"deep iolists canlook strange "...>>

63> ecql:select_value("INSERT INTO key_value (key, value) VALUES(-1, 'la')").
** exception error: no match of right hand side value ok
     in function  ecql:select_column/4 (src/ecql.erl, line 467)
     in call from ecql:select_value/3 (src/ecql.erl, line 420)

## ecql:select_column/1,2,3,4
* `select_column(Cql, ColumnNumber = 1, Arguments = [], Consistency = local_quorum)`

Returns: `[Value]`

Sugar around `execute/1,2,3` for select statements. It returns only a list with a single value from every row in the query. This is useful when querying for just a single field in a table. Errors in the query or `ok` results from `execute/1,2,3` result in a raised exception.


68> ecql:select_column("SELECT key FROM key_value LIMIT 100").

# Streaming functions
The driver uses Cassandra result pagination internally and exposes it through the following functions. They allow stream processing of large amounts of data without loading the whole result into memory:

## ecql:foldl/3,4,5
* `ecql:foldl(Fun, Acc, Cql, Arguments = [], Consistency = local_quorum)`

`Fun = fun([Column1, Column2], Acc) -> NewAcc`

Returns: `NewAcc`

Extremely scalable method to process huge result sets. Like `lists:foldl/3`, the given function `Fun` is executed for each row of the query result. The benefit of using this function in the driver directly instead of loading the whole list first with `ecql:execute/1,2,3` is twofold:

1. The execution can start earlier and finish faster. Cassandra will send the first 1000 rows of a query as soon as they are ready and then continue sending incrementally.
1. Huge Cassandra tables can be processed this way without timeouts or running out of memory on the client side.

ProdSum = ecql:foldl(
  fun([Idx], Acc) ->
    Acc + (Idx*Idx)
  end, 0,
  "SELECT key FROM key_value"
).

## ecql:foldl_page/3,4,5
* `ecql:foldl_page(Fun, Acc, Cql, Arguments = [], Consistency = local_quorum)`

`Fun = fun([ColumnName], [Row], Acc) -> NewAcc`

Like `ecql:foldl/3,4,5` but passes in a whole page of ~1000 rows (size is not guaranteed) as well as the result header columns. This might be useful when forwarding larger blocks of data, e.g. to sockets or to NIFs for further processing.

ProdSum = ecql:foldl_page(
  fun([key], Rows, Acc) ->
    lists:foldl(fun([Idx], A) ->
      A + (Idx*Idx)
    end, Acc, Rows)
  end, 0,
  "SELECT key FROM key_value"
).

## ecql:foreach/2,3,4
* `ecql:foreach(Fun, Cql, Arguments = [], Consistency = local_quorum)`

Returns: `ok`

Extremely scalable way to iterate over all rows of a huge query result. Similar to `lists:foreach`, it executes the given `Fun` for each row of the result.

ecql:foreach(fun([Key, Value]) ->
  io:format("key: '~p' => '~p'~n", [Key, Value])
end, "SELECT * FROM key_value LIMIT 10").

## ecql:select_firstpage/1,2,3
* `ecql:select_firstpage(Cql, Arguments = [], Consistency = local_quorum)`

Returns: `{Page, Continuation}`

`Page = {[ColumnName], Rows}`
`Continuation = '$end_of_table' | tuple()`

Low-level function that is used to implement the other high-level streaming functions. The call returns a first page of results and a continuation. The continuation can be used to fetch further pages using `ecql:select_nextpage/1`.

Erlang shell example:
% Getting the first page
{Page1, Cont1} = ecql:select_firstpage("SELECT * FROM key_value").

case Cont1 of
  '$end_of_table' ->
    "one page!";
  _ ->
    {Page2, Cont2} = ecql:select_nextpage(Cont1),
    case Cont2 of
      '$end_of_table' ->
        "two pages!";
      _ ->
        {Page3, Cont3} = ecql:select_nextpage(Cont2),
        "three or more pages!"
        % and so on
    end
end.
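
Instead of nesting one `case` per page, the continuation can be followed recursively. The sketch below counts all rows of a query this way. The module name `page_sketch` and both functions are hypothetical. It assumes, as the "one page!" branch above suggests, that the page returned together with `'$end_of_table'` still carries rows.

```erlang
-module(page_sketch).
-export([count_rows/1, count_rows/3]).

%% Counts the rows of a query by walking all pages with the driver.
count_rows(Cql) ->
    count_rows(ecql:select_firstpage(Cql), fun ecql:select_nextpage/1, 0).

%% Generic walker: NextPage is a fun that takes a continuation and
%% returns the next {Page, Continuation} tuple.
count_rows({{_Columns, Rows}, '$end_of_table'}, _NextPage, Count) ->
    %% Last page: add its rows and stop.
    Count + length(Rows);
count_rows({{_Columns, Rows}, Continuation}, NextPage, Count) ->
    %% More pages to come: fetch the next one and keep counting.
    count_rows(NextPage(Continuation), NextPage, Count + length(Rows)).
```

Passing the page-fetching fun as an argument keeps the walker independent of the driver, so it can be tried out with fake pages in a plain Erlang shell.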


## ecql:select_nextpage/1
* `ecql:select_nextpage(Continuation)`

Returns: `{Page, Continuation}`

`Page = {[ColumnName], Rows}`
`Continuation = '$end_of_table' | tuple()`

This function can only be used in conjunction with `ecql:select_firstpage/1,2,3`. Please see its documentation and example.

# Additional Functions

## term_to_bin/1 & bin_to_term/1
* `Binary = ecql:term_to_bin(Term)`
* `Term = ecql:bin_to_term(Binary)`

These functions are useful for storing arbitrary terms in Cassandra. `term_to_bin` uses compression and is deterministic, so it can also be used for equality queries.

ecql:execute("CREATE TABLE IF NOT EXISTS term_kv (key blob PRIMARY KEY, value blob);").

ecql:execute_batch("INSERT INTO term_kv (key, value) VALUES(?, ?)", [
  [ecql:term_to_bin({die_hard, 1}), ecql:term_to_bin(#{good_guy => john, bad_guy => hans})],
  [ecql:term_to_bin({die_hard, 2}), ecql:term_to_bin(#{good_guy => john, bad_guy => ramon})],
  [ecql:term_to_bin({die_hard, 3}), ecql:term_to_bin(#{good_guy => john, bad_guy => simon})],
  [ecql:term_to_bin({die_hard, 4}), ecql:term_to_bin(#{good_guy => john, bad_guy => thomas})],
  [ecql:term_to_bin({die_hard, 5}), ecql:term_to_bin(#{good_guy => john, bad_guy => yuri})]
]).

% Using tuple {die_hard, 4} to search
#{bad_guy := Name} = ecql:bin_to_term(
  ecql:select_value("SELECT value FROM term_kv WHERE key = ?", [
    ecql:term_to_bin({die_hard, 4})
  ])
).

io:format("The bad guy is: ~p~n", [Name]).

## ecql:release/0
* `ecql:release()`

Returns: `ok`

Releases the currently attached stream from the process. Useful when it is known that this process is not going to run queries in the near future.

Background: Streams are currently a limited resource in the driver whose number can be configured and defaults to `100` per Cassandra host. Each stream is an independent Erlang process and can operate isolated and asynchronously from all other stream processes. The communication between a stream process and the client making queries is direct with no mediator after the initial handshake. The stream will be automatically released and returned to the pool once the client process terminates.

All of this usually happens in the background with the initial link between a client process and a stream process being established on the first query. In some cases it might be useful though to release the stream process even though the client process has not yet terminated.
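
A minimal sketch of such a case: a long-lived process that runs a query only occasionally and hands its stream back in between. The module name `release_sketch` and the function `count_then_release/0` are hypothetical, and the `key_value` table from the Usage section is assumed to exist.

```erlang
-module(release_sketch).
-export([count_then_release/0]).

%% Hypothetical helper for a process that queries only rarely:
%% run one query, then return the stream to the pool.
count_then_release() ->
    Count = ecql:select_value("SELECT COUNT(*) FROM key_value"),
    %% The calling process stays alive, so the stream would otherwise
    %% remain attached to it until the process terminates.
    ok = ecql:release(),
    Count.
```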

# Configuration Parameters

These can be configured from the application configuration file following OTP standards or can be set before starting the driver app with `application:set_env(ecql, user, "rompompel")`

| Setting                | Default            | Description                     |
| ---------------------- |:------------------:| -------------------------------:|
| cache_size             | `1000000`          | ecql_mnesia: Number of bytes used for the ecql_mnesia write-through cache |
| cluster_module         | `erlang`           | ecql_mnesia: Cluster module used for `nodes()` call to find other nodes |
| log                    | `disabled`         | what?                           |
| user                   | `"cassandra"`      | Cassandra Auth Username         |
| pass                   | `"cassandra"`      | Cassandra Auth Password         |
| hosts                  | `[{{127,0,0,1},9042}]` | List of hosts to connect to |
| keyspace               | `"ecql"`           | Keyspace to be used by all connections & streams |
| replication_strategy   | `"SimpleStrategy"` | Replication strategy to be used if the keyspace does not yet exist and needs to be created. |
| replication_factor     | `2`                | Replication factor to be used if the keyspace does not yet exist and needs to be created. |
| streams_per_connection | `100`              | Streams to open to each Server in the cluster |
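
As a sketch, the settings above could be placed in an OTP application configuration file such as `sys.config`. The file name and the selection of keys are illustrative; all values shown are the defaults from the table.

```erlang
%% sys.config sketch: every value below is the documented default.
[
  {ecql, [
    {hosts, [{{127,0,0,1}, 9042}]},
    {user, "cassandra"},
    {pass, "cassandra"},
    {keyspace, "ecql"},
    {replication_strategy, "SimpleStrategy"},
    {replication_factor, 2},
    {streams_per_connection, 100}
  ]}
].
```

Such a file is typically passed to the Erlang runtime with `erl -config sys`.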

# Todo List

* Make streams_per_connection dynamic!!
* Document performance numbers vs. other drivers
* Replace encode function with nif for performance.
* Read configuration from env
* Token aware routing
* Support multiple keyspaces & clusters