# Erlbus
> Message/Event Bus written in Erlang.
![CI](https://github.com/cabol/erlbus/workflows/CI/badge.svg)
The PubSub core is a clone of the original, remarkable, and proven
[Phoenix PubSub Layer][phx_pubsub],
but re-written in Erlang.
A new way to build soft real-time and high scalable messaging-based
applications, not centralized but distributed!
See the **[online documentation](https://hexdocs.pm/erlbus/)**.
[phx_pubsub]: https://hexdocs.pm/phoenix/Phoenix.PubSub.html
[phx_framework]: http://www.phoenixframework.org/
## Introduction
**Erlbus** is a simple and lightweight library/tool to build messaging-based
applications.
**Erlbus** PubSub implementation was taken from [Phoenix Framework][phx_framework],
which provides an amazing, scalable and proven PubSub solution. In addition to
this, **Erlbus** provides an usable and simpler interface on top of this
implementation.
You can read more about the PubSub implementation [HERE][phx_pubsub].
## Installation
### Erlang
In your `rebar.config`:
```erlang
{deps, [
{ebus, "0.3.0", {pkg, erlbus}}
]}.
```
### Elixir
In your `mix.exs`:
```elixir
def deps do
[
{:ebus, "~> 0.3", hex: :erlbus}
]
end
```
## Getting Started
Assuming you have a working Erlang installation (18 or later), building `erlbus`
should be as simple as:
```
$ git clone https://github.com/cabol/erlbus.git
$ cd erlbus
$ make
```
## Quick Start Example
Start an Erlang console with `erlbus` running:
```
make shell
```
Once into the erlang console:
```erlang
% subscribe the current shell process
ebus:sub(self(), "foo").
ok
% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>
% subscribe spawned PID
ebus:sub(Pid, "foo").
ok
% publish a message
ebus:pub("foo", {foo, "hi"}).
ok
% check received message for Pid
ebus_proc:messages(Pid).
[{foo,"hi"}]
% check received message for self
ebus_proc:messages(self()).
[{foo,"hi"}]
% unsubscribe self
ebus:unsub(self(), "foo").
ok
% publish other message
ebus:pub("foo", {foo, "hello"}).
ok
% check received message for Pid
ebus_proc:messages(Pid).
[{foo,"hi"},{foo,"hello"}]
% check received message for self (last message didn't arrive)
ebus_proc:messages(self()).
[{foo,"hi"}]
% check subscribers (only Pid should be in the returned list)
ebus:subscribers("foo").
[<0.57.0>]
% check topics
ebus:topics().
[<<"foo">>]
% subscribe self to other topic
ebus:sub(self(), "bar").
ok
% check topics
ebus:topics().
[<<"bar">>,<<"foo">>]
% publish other message
ebus:pub("bar", {bar, "hi bar"}).
ok
% check received message for Pid (last message didn't arrive)
ebus_proc:messages(Pid).
[{foo,"hi"},{foo,"hello"}]
% check received message for self
ebus_proc:messages(self()).
[{foo,"hi"},{bar,"hi bar"}]
```
> **Note:**
> You may have noticed that is not necessary additional steps/calls to
create/delete a topic, this is automatically handled by `ebus`, so you don't
worry about it!
Now, let's make it more fun, start two Erlang consoles, first one:
```
erl -name node1@127.0.0.1 -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config
```
The second one:
```
erl -name node2@127.0.0.1 -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config
```
Then what we need to do is put these Erlang nodes in cluster, so from any of
them send a ping to the other:
```erlang
% From node1 ping node2
net_adm:ping('node2@127.0.0.1').
pong
```
Excellent, we have both nodes in cluster, thanks to the beauty of
[Distributed Erlang](http://www.erlang.org/doc/reference_manual/distributed.html).
So, let's repeat the above exercise but now in two nodes.
In the `node1` create a handler and subscription to some topic:
```erlang
% create a callback fun to use ebus_proc utility
CB1 = fun(Msg) ->
io:format("CB1: ~p~n", [Msg])
end
#Fun<erl_eval.6.54118792>
% other callback but receiving additional arguments,
% which may be used when message arrives
CB2 = fun(Msg, Args) ->
io:format("CB2: Msg: ~p, Args: ~p~n", [Msg, Args])
end.
#Fun<erl_eval.12.54118792>
% use ebus_proc utility to spawn a handler
H1 = ebus_proc:spawn_handler(CB1).
<0.70.0>
H2 = ebus_proc:spawn_handler(CB2, ["any_ctx"]).
<0.72.0>
% subscribe handlers
ebus:sub(H1, "foo").
ok
ebus:sub(H2, "foo").
ok
```
Repeat the same thing above in `node2`.
Once you have handlers subscribed to the same channel in both nodes, publish
some messages from any node:
```erlang
% publish message
ebus:pub("foo", {foo, "again"}).
CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"
ok
```
And in the other node you will see those messages have arrived too:
```erlang
CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"
```
Let's check subscribers, so from any Erlang console:
```erlang
% returns local and remote subscribers
ebus:subscribers("foo").
[<7023.67.0>,<7023.69.0>,<0.70.0>,<0.72.0>]
```
You can also check the tests for more examples about using `ebus`.
So far, so good! Let's continue!
## Point-To-Point Example
The great thing here is that you don't need something special to implement a
point-to-point behavior. It is as simple as this:
```erlang
ebus:dispatch("topic1", #{payload => "M1"}).
```
Dispatch function gets the subscribers and then picks one of them to send the
message out. You can provide a dispatch function to pick up a subscriber,
otherwise, a default function is provided (picks a subscriber random).
Dispatch function comes in 3 different flavors:
* `ebus:dispatch/2`: receives the topic and the message.
* `ebus:dispatch/3`: receives the topic, message and a list of options.
* `ebus:dispatch/4`: same as previous but receives as 1st argument the name of
the server, which is placed by default in the other functions.
Dispatch options are:
* `{scope, local | global}`: allows you to choose if you want to pick a local
subscriber o any.
Default value: `local`.
* `{dispatch_fun, fun(([term()]) -> term())}`: function to pick up a
subscriber. If it isn't provided, a default random function is provided.
To see how this function is implemented go [HERE](./src/ebus.erl).
Let's see an example:
```erlang
% subscribe local process
ebus:sub(self(), "foo").
ok
% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>
% subscribe spawned PID
ebus:sub(Pid, "foo").
ok
% check that we have two subscribers
ebus:subscribers("foo").
[<0.57.0>,<0.38.0>]
% now dispatch a message (default dispatch fun and scope)
ebus:dispatch("foo", #{payload => foo}).
ok
% check that only one subscriber received the message
ebus_proc:messages(self()).
[#{payload => foo}]
ebus_proc:messages(Pid).
[]
% dispatch with options
Fun = fun([H | _]) -> H end.
#Fun<erl_eval.6.54118792>
ebus:dispatch("foo", <<"M1">>, [{scope, global}, {dispatch_fun, Fun}]).
ok
% check again
ebus_proc:messages(self()).
[#{payload => foo}]
ebus_proc:messages(Pid).
[<<"M1">>]
```
Extremely easy isn't?
## Distributed Erlbus
**Erlbus** is distributed by nature, it doesn't require any additional/magical
thing.
Once you have an Erlang cluster, messages are broadcasted using
[PG2](http://erlang.org/doc/man/pg2.html),
which is the default PubSub adapter. Remember, it's a [Phoenix PubSub][phx_pubsub]
clone, so the architecture and design it's the same.
[Phoenix Channels](http://www.phoenixframework.org/docs/channels) are supported
on PubSub layer, which is the core. Take a look at this
[blog post](http://www.phoenixframework.org/blog/the-road-to-2-million-websocket-connections).
## Important links
- [Examples](https://github.com/cabol/erlbus/tree/master/examples).
- [WEST](https://github.com/cabol/west).
## Running Tests
```
make test
```
## Building docs
```
make docs
```
> **Note:** Once you run previous command, a new folder `doc` is created,
and you'll have a pretty nice HTML documentation.
## Change Log
All notable changes to this project will be documented in the
[CHANGELOG.md](CHANGELOG.md).
## Copyright and License
Original work Copyright (c) 2014 Chris McCord
Modified work Copyright (c) 2016 Carlos Andres Bolaños
**Erlbus** source code is licensed under the [MIT License](LICENSE).
> **NOTE:** Pub/Sub implementation was taken from [Phoenix Framework](https://github.com/phoenixframework/phoenix).