[![Build Status](](
# Pulserl 
#### An Apache Pulsar client for Erlang/Elixir
__Version:__ 0.1.0

Pulserl is an Erlang client for the Apache Pulsar Pub/Sub system with both producer and consumer
implementations. It requires version __2.0+__ of Apache Pulsar and __18.0+__ of Erlang. 
Pulserl uses the [binary protocol](
to interact with the Pulsar brokers and exposes a very simple API. 
## Quick Examples

The examples assume you have a running Pulsar broker at `localhost:6650`, a topic called `test-topic` (can be partitioned or not) and `rebar3` installed.

_Note: Pulserl uses `pulserl` and `Shared` as the default subscription name and type.
 So, if that subscription (not the consumer) under the topic `test-topic` does not exists, we make sure in this example to create it first by creating
 the consumer before producing any message to the topic._

Fetch, compile and start the erlang shell.
  git clone,
  cd pulserl
  rebar3 shell

In the Erlang shell
  rr(pulserl).  %% load the api records
  %% A demo function to log the value of consumed messages
  %% that will be produced blow.

  %% Asycnhrounous produce
  Promise = pulserl:produce("test-topic", "Asycn produce message").
  pulserl:await(Promise).  %% Wait broker ack
  #messageId{ledger_id = 172,entry_id = 7,
             topic = <<"persistent://public/default/test-topic">>,
             partition = -1,batch = undefined}

  %% Asycnhrounous produce. Response notification is via callback (fun/1)
  pulserl:produce("test-topic", "Hello", fun(Res) -> io:format("Response: ~p~n", [Res]) end).

  %% Synchronous produce
  pulserl:sync_produce("test-topic", "Sync produce message").
  #messageId{ledger_id = 176,entry_id = 11,
             topic = <<"persistent://public/default/test-topic">>,
             partition = -1,batch = undefined}


## Feature Matrix

 - [x] [Basic Producer](
 - [x] [Basic Consumer](
 - [x] [Partitioned topics](
 - [x] [Batching](
 - [ ] [Compression](
 - [x] [TLS](
 - [ ] [Authentication (token, tls)](
 - [ ] [Reader API](
 - [x] [Proxy Support (for Kubernetes)](
 - [x] [Effectively-Once](
 - [ ] [Schema](
 - [x] Consumer seek
 - [ ] [Multi-topics consumer](
 - [ ] [Topics regex consumer](
 - [ ] [Compacted topics](
 - [x] User defined properties producer/consumer
 - [ ] Reader hasMessageAvailable
 - [ ] [Hostname verification](
 - [ ] [Multi Hosts Service Url support](
 - [x] [Key_shared](
 - [ ] key based batcher (didn't find a documentation) ?
 - [x] [Negative Acknowledge](
 - [x] [Delayed Delivery Messages](
 - [x] [Dead Letter Policy](
 - [ ] [Interceptors](
 _Thank you [Sabudaye]( for this information_

## Installation
 [Pulserl is available in Hex]( for easy installation by added it to your project dependencies.

In your Erlang project's `rebar.config` 
{deps, [
   {pulserl, "<latest-version>"}

In your Elixir project's `mix.exs` 
def deps do
    {:pulserl, "~> <latest-version>"}

## API Usage

### Client Setup
  In pulserl, the client as of now (for API simplicity) is a singleton (local registered `gen_server`) 
  and can be created during startup by the `application controller` or on demand at a later time.
  The client has the responsibility of creating the TCP connections,
  maintaining the connection pool, and ensures these connections are 
  maximally used by the producers and consumers. The client is also responsible
  for querying metadata needed to initialize a producer or consumer; it does this
  by creating a metadata socket during initialization by using the provided configurations.

  #### Automatic client startup
   You can configure the client that will be auto-started by providing
   the following configuration for `pulserl` in your `sys.config` file.
  {pulserl, [
    {autostart, true} %% If false, the client will be created on startup. Default is true.
    %% The TCP connect timeout in milliseconds. Default is 30000.
    , {connect_timeout_ms, 30000}
    %% The maximum connections to each broker the client should create.
    %% Default is 1. Increasing this may improve I/O throughput
    , {max_connections_per_broker, 1}
    %% The underlying TCP socket options.
    , {socket_options, [{nodelay, true}]}
    %% The service url. Default is the non TLS url: "pulsar://${hostname}:6650"
    , {service_url, "pulsar+ssl://localhost:6651/"}
    %% The trust certificate file path. Required only if the TLS service url is used.
    %% See
    , {tls_trust_certs_file, "/path/to/cacert.pem"}
  #### On demand client startup
  The `pulserl:start_client/1,2` API can be used to start the pulserl client when needed. 
 ServiceUrl = "pulsar+ssl://localhost:6651/",
 Config = #clientConfig{
             connect_timeout_ms = 30000,
             max_connections_per_broker = 1,
             socket_options = [{nodelay, true}],
             tls_trust_certs_file = "/path/to/cacert.pem"
 ok = pulserl:start_client(ServiceUrl, ClientConfig).

### Producer 
Pulserl creates a `gen_server` process per topic. For a topic of `n` partition, it creates
a parent producer under the `pulserl_producer_sup` tree which in turn `start_link` and 
manage `n` child producers. The parent producer serves as a facade to the internal producers.
The parent monitor the child processes (internal partitioned producers) for resilience, 
route client calls to one of the child processes using different routing modes. 
A producer during initialization is assigned a connection by the client based on its topic metadata.
The producers uses a queueing mechanism on message sending. 
Each send is internally a `` to the producer process. The caller is added to 
a queue and replied immediately with `ok.` This initial early reply frees up the caller to do 
other tasks if the response is not need immediately. Internally if message send is trigger, i.e
when batching is not enable or batching enabled but a batch send is triggered, the producer
asynchronously send (`gen_sever.cast/2`) the message(s) to the `pulserl_conn` process. When the
connection process receives the response it will `!` send it to associated the producer which in 
turn dequeue the associated caller and reply to it.
The producer provides synchronous and asynchronous send API.

In synchronous mode, the call will wait for the broker to acknowledge the message. 
If the acknowledgment is not received and a `send_timeout` is specified, a `{error, send_timeout}`
is sent to client on timed out. 

The asynchronous mode provides two API. One uses a `promise` that will be used to probe 
for a response or error. The other uses callback `fun/1` that will be invoke internally 
by the producer process when there is a response or error.

#### Starting a producer

## Contribute 

For issues, comments, recommendation or feedback please [do it here](

Contributions are highly welcome.