# Event
Provides modules that can be used to simplify event production and consumption. All
modules are compatible with GenStage and are designed to simplify use of GenStage.
## Installation
Add `event` to your list of dependencies in `mix.exs`:
def deps do
{:event, "~> 0.1.3"}
## Usage
### Event Source
Variant of the GenStage `:producer` stage type. Provides options for automatic buffering of events, restricting the maximum number of events
allowed in the queue and a simplified interface compared to GenStage.
#### Example
defmodule MySource do
use Event.Source, name: __MODULE__, max_items: 1_000
@doc """
Synchronously notify consumers that an event has occurred. Will only
return once the event has been consumed or the timeout is reached
def sync_notify(event) do
Event.Source.sync_notify(__MODULE__, event)
@doc """
Asynchronously notify consumers that an event has occurred
def async_notify(event) do
Event.Source.async_notify(__MODULE__, event)
### Event Processor
Variant of the GenStage `:consumer_producer` stage type.
#### Example
defmodule MyProcessor do
use Event.Processor, name: __MODULE__
@doc """
Synchronously notify consumers that an event has occurred. Will only
return once the event has been consumed or the timeout is reached
def sync_notify(event) do
Event.Processor.sync_notify(__MODULE__, event)
@doc """
Asynchronously notify consumers that an event has occurred
def async_notify(event) do
Event.Processor.async_notify(__MODULE__, event)
@doc """
Handle events from an upstream source or processor
def handle_events(events, _from, state) do
{:noreply, Enum.map(events, fn event -> {:processed, event} end), state}
### Event Sink
Variant of the GenStage `:consumer` stage type. Simplifies return signatures, removing the need to return any events, since it's
the final stage in a pipeline.
#### Example
defmodule MySink do
use Event.Sink, name: __MODULE__
@doc """
Handle events from an upstream source or processor
def handle_events(events, _from, state) do
require Logger
|> Enum.each(fn event ->
Logger.debug "Received Event: #{inspect event}"
{:noreply, state}
#### Available Options
* `max_events` - The maximum number of events that the source will buffer. Any events sent to the producer after this limit has been reached will be discarded. If synchronously sent, the producer will reply with `{:error, :queue_full}`
* `dispatcher` - Specifies the GenStage dispatcher type for the source, defaults to `GenStage.BroadcastDispatcher`
All other options are passed to `GenStage.start_link`, allowing you to specify GenStage behaviour if required.