# Yggdrasil

> *Yggdrasil* is an immense mythical tree that connects the nine worlds in
> Norse cosmology.

[![Build Status](https://travis-ci.org/gmtprime/yggdrasil.svg?branch=master)](https://travis-ci.org/gmtprime/yggdrasil) [![Hex pm](http://img.shields.io/hexpm/v/yggdrasil.svg?style=flat)](https://hex.pm/packages/yggdrasil) [![hex.pm downloads](https://img.shields.io/hexpm/dt/yggdrasil.svg?style=flat)](https://hex.pm/packages/yggdrasil)

Yggdrasil manages subscriptions to channels and queues in a message broker. It
ships with simple interfaces for Redis, RabbitMQ and Postgres. By default,
messages are delivered to the process that called the subscribe function.

### Starting the feed

```
$ iex -S mix
Erlang/OTP 18 [erts-7.0] [source] [64-bit] [smp:8:8] [async-threads:10]
[kernel-poll:false]

Interactive Elixir (1.2.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Yggdrasil.Feed.subscribe Yggdrasil.Feed,
...(1)>                          Yggdrasil.Broker.Redis,
...(1)>                          "trees",
...(1)>                          &(IO.puts "#{inspect &1}")
{:ok,
 %{broker: Yggdrasil.Broker.Redis, channel: "trees",
    ref: #Reference<0.0.4.895>}}
iex(2)>
```

And from `redis-cli`:

```
127.0.0.1:6379> publish "trees" "Yggdrasil"
(integer) 1
127.0.0.1:6379> publish "trees" "Yggdrasil"
(integer) 1
127.0.0.1:6379> publish "trees" "Yggdrasil"
(integer) 1
```

You'll receive in `iex`:

```
iex(2)>
%Yggdrasil.Proxy.Data{broker: Yggdrasil.Broker.Redis, data: "Yggdrasil", channel: "trees"}
%Yggdrasil.Proxy.Data{broker: Yggdrasil.Broker.Redis, data: "Yggdrasil", channel: "trees"}
%Yggdrasil.Proxy.Data{broker: Yggdrasil.Broker.Redis, data: "Yggdrasil", channel: "trees"}
iex(2)>
```

### Using the `Yggdrasil.Subscriber.Base` behaviour

The `Yggdrasil.Subscriber.Base` behaviour defines the following callbacks:

```elixir
@doc """
Initializes the subscriber state.
"""
@callback init(args :: term) :: {:ok, state :: term} | {:error, reason :: term}

@doc """
Updates the subscriber state.
"""
@callback update_state(old_state :: term, new_state :: term) ::
  {:ok, state :: term} |
  {:stop, reason :: term, state :: term}

@doc """
Handles the messages received from the broker.
"""
@callback handle_message(message :: term, state :: term) ::
  {:ok, state :: term} |
  {:stop, reason :: term, state :: term}

@doc """
Handles the termination of the subscriber server.
"""
@callback terminate(reason :: term, state :: term) :: term
```

And by default it generates the following functions:

```elixir

@doc """
Starts a subscriber.

Args:
`feed` - Feed from where it'll receive the messages.
`args` - Internal state. State of the subscriber.
`options` - Options (GenServer options).

Returns:
GenServer output.
"""
@spec start_link(feed :: pid | atom | {atom, node}, args :: term,
                 options :: term) ::
    {:ok, pid} |
    :ignore |
    {:error, {:already_started, pid} | term}

@doc """
Updates the state of the subscriber.

Args:
`server` - Subscriber pid.
`state` - New state.
"""
@spec register(server :: pid | atom | {atom, node}, state :: term) ::
    :ok | :stopping

@doc """
Subscribes to a channel.

Args:
`server` - Subscriber pid.
`channel` - Channel name
"""
@spec subscribe(server :: pid | atom | {atom, node}, channel :: term) :: :ok

@doc """
Unsubscribes from a channel.

Args:
`server` - Subscriber pid.
`channel` - Channel name.
"""
@spec unsubscribe(server :: pid | atom | {atom, node}, channel :: term) :: :ok

@doc """
Stops the subscriber.

Args:
`server` - Subscriber pid.
`reason` - Reason to stop (default = `:normal`)
"""
@spec stop(server :: pid | atom | {atom, node}, reason :: term) :: term
```

Messages from a broker always arrive as a `Yggdrasil.Proxy.Data` struct:

```elixir
defmodule Yggdrasil.Proxy.Data do
  defstruct data: nil, channel: nil, broker: nil
  @type t :: %__MODULE__{data: term, channel: term, broker: atom}
end
```

To use this behaviour, you must pass a broker as an option. For example,
a subscriber that prints messages received from a Redis channel:

```elixir
defmodule Subscriber do
  use Yggdrasil.Subscriber.Base, broker: Yggdrasil.Broker.Redis
  alias Yggdrasil.Proxy.Data, as: Data

  def handle_message(%Data{channel: channel, data: message}, state) do
    spawn fn ->
      IO.puts "Received: #{inspect message} from #{inspect channel}"
    end
    {:ok, state}
  end
end
```

To execute it:

```
iex(1)> {:ok, conn} = Subscriber.start_link(Yggdrasil.Feed)
{:ok, #PID<0.350.0>}
iex(2)> Subscriber.subscribe(conn, "trees")
:ok
iex(3)>
```
      
If you execute the following command in Redis:

```
127.0.0.1:6379> PUBLISH "trees" "Yggdrasil!"
```

You'll get:

```
Received: "Yggdrasil!" from "trees"
:ok
iex(3)>
```        

And if you unsubscribe from channel `"trees"`, `Subscriber` will no longer
receive messages from that channel:

```
iex(3)> Subscriber.unsubscribe(conn, "trees")
:ok
```        
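
The `Subscriber` above passes its state through unchanged, but the `state` argument also lets a handler accumulate information across messages. Here is a minimal sketch that counts messages per channel and stops on a sentinel payload. It is written as a plain module so it runs on its own: `CountingSketch`, its `Data` struct and the `"stop"` payload are hypothetical, and a real subscriber would add the `use Yggdrasil.Subscriber.Base, broker: ...` line and match on `%Yggdrasil.Proxy.Data{}`.

```elixir
defmodule CountingSketch do
  # Hypothetical stand-in with the same fields as Yggdrasil.Proxy.Data.
  defmodule Data do
    defstruct data: nil, channel: nil, broker: nil
  end

  # Same return shape as the `init/1` callback: start with no counts.
  def init(_args), do: {:ok, %{}}

  # Same return shapes as the `handle_message/2` callback.
  # A "stop" payload (an assumption of this sketch) asks the subscriber to stop.
  def handle_message(%Data{data: "stop"}, state) do
    {:stop, :normal, state}
  end

  # Any other message increments the counter of its channel.
  def handle_message(%Data{channel: channel}, state) do
    {:ok, Map.update(state, channel, 1, &(&1 + 1))}
  end
end
```

Calling `handle_message/2` twice with a message from `"trees"` leaves the state as `%{"trees" => 2}`.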

## New brokers

You can write new brokers better suited to your needs.

There are two ways to accomplish this.

### Using the `Yggdrasil.Broker` behaviour

This behaviour is for implementing new brokers. You need to implement
three functions: `subscribe/2`, `unsubscribe/2` and `handle_message/3`.
For example, a broker built on a `GenServer`:

```elixir
defmodule Broker do
  use GenServer
  use Yggdrasil.Broker

  ###################
  # Client callbacks.
 
  def subscribe(channel, callback) do
    {:ok, conn} = __MODULE__.start_link
    conn |> GenServer.cast({:subscribe, channel, callback})
    {:ok, conn}
  end


  def unsubscribe(conn, _channel), do:
    __MODULE__.stop conn


  def handle_message(_conn, _channel, :subscribed), do:
    :subscribed
  def handle_message(_conn, _channel, {:message, message}), do:
    {:message, message}
  def handle_message(_conn, _channel, _ignored), do:
    :whatever

  ###################
  # Client functions.

  def start_link() do
    GenServer.start_link __MODULE__, nil, []
  end

  def publish(broker, channel, message), do:
    GenServer.cast broker, {:publish, channel, message}

  def stop(broker), do:
    GenServer.cast broker, :stop

  ###################
  # Server callbacks.

  def init(_) do
    {:ok, %{}}
  end

  def handle_cast(:stop, state), do:
    {:stop, :normal, state}
  def handle_cast({:subscribe, channel, callback}, state) do
    new_state = Map.put state, channel, callback
    callback.(:subscribed)
    {:noreply, new_state}
  end
  def handle_cast({:publish, channel, message}, state) do
    case Map.fetch state, channel do
      :error ->
        :ok
      {:ok, callback} ->
        callback.({:message, message})
    end
    {:noreply, state}
  end
  def handle_cast(_any, state), do:
    {:noreply, state}
end
```

You can now use the `Broker` with the `Subscriber`.
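
To see the callback flow of the `Broker` above in isolation, the following sketch reduces it to the `GenServer` part: a map from channel to callback that is consulted on every publish. `CallbackBroker` is a hypothetical name, and the module deliberately omits `use Yggdrasil.Broker`, so it shows only the mechanism and not the library integration.

```elixir
defmodule CallbackBroker do
  use GenServer

  # Client functions.
  def start_link, do: GenServer.start_link(__MODULE__, nil, [])

  def subscribe(broker, channel, callback),
    do: GenServer.cast(broker, {:subscribe, channel, callback})

  def publish(broker, channel, message),
    do: GenServer.cast(broker, {:publish, channel, message})

  # Server callbacks. The state maps each channel to its callback.
  def init(_), do: {:ok, %{}}

  def handle_cast({:subscribe, channel, callback}, state) do
    callback.(:subscribed)
    {:noreply, Map.put(state, channel, callback)}
  end

  def handle_cast({:publish, channel, message}, state) do
    # Messages for channels without a subscriber are dropped.
    case Map.fetch(state, channel) do
      {:ok, callback} -> callback.({:message, message})
      :error -> :ok
    end

    {:noreply, state}
  end
end

# Usage: forward everything the broker emits to the calling process.
parent = self()
{:ok, broker} = CallbackBroker.start_link()
CallbackBroker.subscribe(broker, "trees", fn msg -> send(parent, msg) end)
CallbackBroker.publish(broker, "trees", "Yggdrasil")

receive do
  {:message, message} -> IO.puts("Received: #{inspect(message)}")
after
  1_000 -> IO.puts("No message received")
end
```

Both `subscribe/3` and `publish/3` are casts, so they return `:ok` immediately and the callback runs later inside the broker process.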

### Using the `Yggdrasil.Broker.GenericBroker`

This behaviour is used to modify how messages from an existing broker are
decoded.

Suppose we want every message coming from our `Broker` to be a string, we only
want to receive messages once per second because we don't need every published
message, and we want an `:ets` table called `:test_cache` to store the last
value produced. We would write:

```elixir
defmodule GenericBroker do
  use Yggdrasil.Broker.GenericBroker,
      broker: Broker,
      interval: 1000,
      cache: :test_cache

  def decode(_channel, message), do:
    {:message, inspect(message)}
end
```

Now instead of using our `Broker`, we would use `GenericBroker` with
our `Subscriber`.
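
The idea of decoding each message and keeping only the last value in an `:ets` table can be sketched without the library. This illustrates the concept only and is not a description of how `Yggdrasil.Broker.GenericBroker` is implemented: `LastValueSketch` and all of its functions are hypothetical.

```elixir
defmodule LastValueSketch do
  # Creates the named, public ETS table used as the cache.
  def new(table), do: :ets.new(table, [:named_table, :public, :set])

  # Same shape as the `decode/2` above: any term becomes a string.
  def decode(_channel, message), do: {:message, inspect(message)}

  # Decodes a message and overwrites the cached value for its channel.
  def store(table, channel, message) do
    {:message, decoded} = decode(channel, message)
    true = :ets.insert(table, {channel, decoded})
    decoded
  end

  # Returns the last decoded value for a channel, if there is one.
  def last(table, channel) do
    case :ets.lookup(table, channel) do
      [{^channel, value}] -> {:ok, value}
      [] -> :error
    end
  end
end

LastValueSketch.new(:test_cache)
LastValueSketch.store(:test_cache, "trees", %{leaves: 3})
LastValueSketch.last(:test_cache, "trees")
# => {:ok, "%{leaves: 3}"}
```

Because the table is a `:set` keyed by channel, each new value replaces the previous one, which is what makes it a last-value cache.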

## Configuration

For the three brokers provided, you can use the following configuration file
(`config/config.exs`):

```elixir
use Mix.Config

# For Redis
config :exredis,
       host: "localhost",
       port: 6379,
       password: "MyPassword",
       reconnect: :no_reconnect,
       max_queue: :infinity

# For RabbitMQ
config :amqp,
       host: "localhost",
       port: 5672, 
       username: "guest",
       password: "guest",
       virtual_host: "/"

# For Postgres
# For more information check the options for Postgrex library.
config :postgrex, Yggdrasil.Repo,
  hostname: "localhost",
  username: "postgres",
  password: "postgres",
  database: "postgres"
```
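
These entries end up in the application environment, where any code can read them back with `Application.get_env/3`. The sketch below shows that lookup for the Redis and Postgres entries. `ConfigSketch` and its functions are hypothetical, the defaults simply repeat the values above, and nothing here is a claim about how Yggdrasil reads its configuration internally.

```elixir
defmodule ConfigSketch do
  # Reads the flat `config :exredis, ...` keys, falling back to defaults.
  def redis_address do
    host = Application.get_env(:exredis, :host, "localhost")
    port = Application.get_env(:exredis, :port, 6379)
    "#{host}:#{port}"
  end

  # `config :postgrex, Yggdrasil.Repo, ...` stores a keyword list under the
  # `Yggdrasil.Repo` key, so the options are read in two steps.
  def postgres_host do
    :postgrex
    |> Application.get_env(Yggdrasil.Repo, [])
    |> Keyword.get(:hostname, "localhost")
  end
end
```
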

## Installation

Ensure yggdrasil is started before your application:

```elixir
def application do
  [applications: [:yggdrasil]]
end
```

To install the latest version from *Hex*, add it to your dependencies:

```elixir
def deps do
  [{:yggdrasil, "~> 1.1.2"}]
end
```