# Yggdrasil
> *Yggdrasil* is an immense mythical tree that connects the nine worlds in
> Norse cosmology.
[![Build Status](https://travis-ci.org/gmtprime/yggdrasil.svg?branch=master)](https://travis-ci.org/gmtprime/yggdrasil) [![Hex pm](http://img.shields.io/hexpm/v/yggdrasil.svg?style=flat)](https://hex.pm/packages/yggdrasil) [![hex.pm downloads](https://img.shields.io/hexpm/dt/yggdrasil.svg?style=flat)](https://hex.pm/packages/yggdrasil)
Yggdrasil manages subscriptions to channels/queues in some broker. Simple
Redis, RabbitMQ and Postgres interfaces are implemented. By default the
messages are sent to the process calling the function to subscribe to the
channel.
### Starting the feed.
```
$ iex -S mix
Erlang/OTP 18 [erts-7.0] [source] [64-bit] [smp:8:8] [async-threads:10]
[kernel-poll:false]
Interactive Elixir (1.2.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Yggdrasil.Feed.subscribe Yggdrasil.Feed,
...(1)> Yggdrasil.Broker.Redis,
...(1)> "trees",
...(1)> &(IO.puts "#{inspect &1}")
{:ok,
%{broker: Yggdrasil.Broker.Redis, channel: "trees",
ref: #Reference<0.0.4.895>}}
iex(2)>
```
And from `redis-cli`:
```
127.0.0.1:6379> publish "trees" "Yggdrasil"
(integer) 1
127.0.0.1:6379> publish "trees" "Yggdrasil"
(integer) 1
127.0.0.1:6379> publish "trees" "Yggdrasil"
(integer) 1
```
You'll receive in `iex`:
```
iex(2)>
%Yggdrasil.Proxy.Data{broker: Yggdrasil.Broker.Redis, data: \"Yggdrasil\", channel: \"trees\"}
%Yggdrasil.Proxy.Data{broker: Yggdrasil.Broker.Redis, data: \"Yggdrasil\", channel: \"trees\"}
%Yggdrasil.Proxy.Data{broker: Yggdrasil.Broker.Redis, data: \"Yggdrasil\", channel: \"trees\"}
iex(2)>
```
### Using the `Yggdrasil.Subscriber.Base` behaviour
The `Yggdrasil.Subscriber.Base` behaviour, defines the following callbacks:
```elixir
@doc """
Initializes the subscriber state.
"""
@callback init(args :: term) :: {:ok, state :: term} | {:error, reason :: term}
@doc """
Updates the subscriber state.
"""
@callback update_state(old_state :: term, new_state :: term) ::
{:ok, state :: term} |
{:stop, reason :: term, state :: term}
@doc """
Handles the messages received from the broker.
"""
@callback handle_message(message :: term, state :: term) ::
{:ok, state :: term} |
{:stop, reason :: term, state :: term}
@doc """
Handles the termination of the subscriber server.
"""
@callback terminate(reason :: term, state :: term) :: term
```
And by default it generates the following functions:
```elixir
@doc """
Starts a subscriber.
Args:
`feed` - Feed from where it'll receive the messages.
`args` - Internal state. State of the subscriber.
`options` - Options (GenServer options).
Returns:
GenServer output.
"""
@spec start_link(feed :: pid | atom | {atom, node}, args :: term,
options :: term) ::
{:ok, pid} |
:ignore |
{:error, {:already_started, pid} | term}
@doc """
Updates the state of the subscriber.
Args:
`server` - Subscriber pid.
`state` - New state.
"""
@spec register(server :: pid | atom | {atom, node}, state :: term) ::
:ok | :stopping
@doc """
Subscribes to a channel.
Args:
`server` - Subscriber pid.
`channel` - Channel name
"""
@spec subscribe(server :: pid | atom | {atom, node}, channel :: term) :: :ok
@doc """
Unsubscribes from a channel.
Args:
`server` - Subscriber pid.
`channel` - Channel name.
"""
@spec unsubscribe(server :: pid | atom | {atom, node}, channel :: term) :: :ok
@doc """
Stops the subscriber.
Args:
`server` - Subscriber pid.
`reason` - Reason to stop (default = `:normal`)
"""
@spec stop(server :: pid | atom | {atom, node}, reason :: term) :: term
```
Messages from a broker come always as the `defstruct Yggdrasil.Proxy.Data`:
```elixir
defmodule Yggdrasil.Proxy.Data do
defstruct data: nil, channel: nil, broker: nil
@type t :: %__MODULE__{data: term, channel: term, broker: atom}
end
```
To use this behaviour it's necessary to give a broker as argument. For example,
a subscriber that prints messages received from a Redis channel:
```elixir
defmodule Subscriber do
use Yggdrasil.Subscriber.Base, broker: Yggdrasil.Broker.Redis
alias Yggdrasil.Proxy.Data, as: Data
def handle_message(%Data{channel: channel, data: message}, state) do
spawn fn ->
IO.puts "Received: #{inspect message} from #{inspect channel}"
end
{:ok, state}
end
end
```
To execute it:
```
iex(1)> {:ok, conn} = Subscriber.start_link(Yggdrasil.Feed)
{:ok, #PID<0.350.0>}
iex(2)> Subscriber.subscribe(conn, "trees")
:ok
iex(3)>
```
If you execute the following command in Redis:
```
127.0.0.1:6379> PUBLISH "trees" "Yggdrasil!"
```
You'll get:
```
Received: "Yggdrasil!" from "trees"
:ok
iex(3)>
```
And if you unsubscribe to channel `"trees"`, `Subscriber` no longer will
receive messages from that channel:
```
iex(3)> Subscriber.unsubscribe(conn, "trees")
:ok
```
## New brokers.
You can code new brokers better suited for your needs.
There are two ways of accomplish this.
### Using the `Yggdrasil.Broker`
This behaviour is for implementation of new brokers. You need to implement
three functions: `subscribe/2`, `unsubscribe/1` and `handle_message/2`.
For example, a broker made out of a `GenServer`:
```elixir
defmodule Broker do
use GenServer
use Yggdrasil.Broker
###################
# Client callbacks.
def subscribe(channel, callback) do
{:ok, conn} = __MODULE__.start_link
conn |> GenServer.cast({:subscribe, channel, callback})
{:ok, conn}
end
def unsubscribe(conn, _channel), do:
__MODULE__.stop conn
def handle_message(_conn, _channel, :subscribed), do:
:subscribed
def handle_message(_conn, _channel, {:message, message}), do:
{:message, message}
def handle_message(_conn, _channel, _ignored), do:
:whatever
###################
# Client functions.
def start_link() do
GenServer.start_link __MODULE__, nil, []
end
def publish(broker, channel, message), do:
GenServer.cast broker, {:publish, channel, message}
def stop(broker), do:
GenServer.cast broker, :stop
###################
# Server callbacks.
def init(_) do
{:ok, %{}}
end
def handle_cast(:stop, state), do:
{:stop, :normal, state}
def handle_cast({:subscribe, channel, callback}, state) do
new_state = Map.put state, channel, callback
callback.(:subscribed)
{:noreply, new_state}
end
def handle_cast({:publish, channel, message}, state) do
case Map.fetch state, channel do
:error ->
:ok
{:ok, callback} ->
callback.({:message, message})
end
{:noreply, state}
end
def handle_cast(_any, state), do:
{:noreply, state}
```
You can now use the `Broker` with the `Subscriber`.
### Using the `Yggdrasil.Broker.GenericBroker`
This behaviour is used to modify the decoding of messages in an existent
broker.
Let's say we now want to assure we will have string messages coming
from our `Broker` and only receive messages from the broker every second
because we don't need all the published messages and we want an `:ets` called
`:test_cache` to store the last value produced, we would do:
```elixir
defmodule GenericBroker do
use Yggdrasil.Broker.GenericBroker,
broker: Broker,
interval: 1000,
cache: :test_cache
def decode(channel, message) do:
{:message, inspect(message)}
end
```
Now instead of using our `Broker`, we would use `GenericBroker` with
our `Subscriber`.
## Configuration
For the two brokers provided, you can use the following configuration file
(`config/config.exs`):
```elixir
use Mix.Config
# For Redis
config :exredis,
host: "localhost",
port: 6379,
password: "MyPassword",
reconnect: :no_reconnect,
max_queue: :infinity
# For RabbitMQ
config :amqp,
host: "localhost",
port: 5672,
username: "guest",
password: "guest",
virtual_host: "/"
# For Postgres
# For more information check the options for Postgrex library.
config :postgrex, Yggdrasil.Repo,
hostname: "localhost",
username: "postgres",
password: "postgres",
database: "postgres"
```
## Installation
Ensure yggdrasil is started before your application:
```elixir
def application do
[applications: [:yggdrasil]]
end
```
And this to install the last version from *Hex*:
```elixir
def deps do
[{:yggdrasil, "~> 1.1.2"}]
end
```