defmodule Offbroadway.EventRelay.Producer do
@moduledoc """
A GenStage producer that continuously receives messages from an EventRelay Destination/Topic.
For a quick start on using Broadway with EventRelay, please see
the [EventRelay Guide](https://hexdocs.pm/).
## Example
defmodule BE.Broadway do
use Broadway
alias Broadway.Message
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module:
{Offbroadway.EventRelay.Producer,
destination_id: "...",
host: "localhost",
port: "50051",
token: "...",
certfile: "/path/to/clientcertfile.pem",
keyfile: "/path/to/clientkeyfile.pem",
cacertfile: "/path/to/cacertfile.pem"},
],
processors: [
default: [concurrency: 10]
]
)
end
def handle_message(_processor_name, message, _context) do
# Do something with the message/event
message
end
end
The above configuration will set up a producer that continuously pulls
events from the configured destination and sends them downstream.
## Options
#{NimbleOptions.docs(Offbroadway.EventRelay.Options.definition())}
### Authentication
For specifics on how to set up authentication in EventRelay, use the
[Authentication Guide](https://github.com/eventrelay/eventrelay/wiki/Authentication-and-Authorization) in the EventRelay Wiki.
See the [Options](#options) section for authentication configuration options.
## Acknowledgements
This producer uses the `PullQueuedEventsRequest` in EventRelay which locks
an event for a specific destination when it is pulled from the system.
That means any client that pulls events for that destination in the
future will not receive that event.
Handling failures is up to the user of this producer: implement your own
failure handling by providing your own acknowledger.
See the example below.
`transformer: {__MODULE__, :transform, []}` in combination with
`acknowledger: {__MODULE__, :ack_id, :ack_data}` is what allows you to
set up your own acknowledger.
In the future EventRelay will support unlocking an event via the [GRPC API](https://github.com/eventrelay/eventrelay/wiki/GRPC)
which would allow you to re-enqueue an event for processing again on failure.
### Example
defmodule BE.Broadway do
use Broadway
alias Broadway.Message
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module:
{Offbroadway.EventRelay.Producer,
destination_id: "...",
host: "localhost",
port: "50051",
token: "...",
certfile: "/path/to/clientcertfile.pem",
keyfile: "/path/to/clientkeyfile.pem",
cacertfile: "/path/to/cacertfile.pem"},
transformer: {__MODULE__, :transform, []}
],
processors: [
default: [concurrency: 10]
]
)
end
def transform(message, _opts) do
%{
message
| acknowledger: {__MODULE__, :ack_id, :ack_data}
}
end
def handle_message(_processor_name, message, _context) do
# Do something with the message/event
message
end
def ack(:ack_id, _successful, _failed) do
# Handle the failed messages/events here
end
end
"""
use GenStage
require Logger
alias Offbroadway.EventRelay.{Options, Client}
alias Broadway.Message
@doc """
Starts the producer as a `GenStage` process.

`config` is handed unchanged to `init/1`.
"""
def start_link(config), do: GenStage.start_link(__MODULE__, config)
# GenStage callback: builds the initial producer state.
#
# Reads the producer options from `opts[:broadway][:producer][:module]`,
# prepares the client channel via `Client.prepare_channel/1` (a failure there
# crashes the process on the `{:ok, _}` match) and arms an immediate
# `:pull_events` timer. Demand starts at zero and no pull task is in flight.
@impl true
def init(opts) do
  Logger.debug("Offbroadway.EventRelay.Producer.init(#{inspect(opts)})")

  {_module, module_opts} = get_in(opts, [:broadway, :producer, :module])
  {:ok, chan} = Client.prepare_channel(opts)

  initial_state = %{
    demand: 0,
    channel: chan,
    # Fires right away; the first real pull still waits for demand > 0.
    pull_timer: schedule_next_pull(0),
    pull_interval: opts[:pull_interval],
    pull_task: nil,
    producer_opts: module_opts
  }

  {:producer, initial_state}
end
@doc """
Validates the producer options before the Broadway topology starts.

The options given in `producer: [module: {producer_module, client_opts}]` are
validated against `Offbroadway.EventRelay.Options.definition/0` with
`NimbleOptions.validate!/2` (which raises on invalid options), and the
validated options are written back into `broadway_opts`.

Returns `{[], broadway_opts}`: no extra child specs, plus the updated options.
"""
def prepare_for_start(_module, broadway_opts) do
  {producer_module, client_opts} = broadway_opts[:producer][:module]
  opts = NimbleOptions.validate!(client_opts, Options.definition())

  broadway_opts =
    put_in(broadway_opts, [:producer, :module], {producer_module, opts})

  # Log message fixed: the closing ")" of the logged call was missing.
  Logger.debug("Offbroadway.EventRelay.Producer.prepare_for_start(#{inspect(broadway_opts)})")

  {[], broadway_opts}
end
# GenStage callback handling timer ticks, pull-task replies and stray messages.
#
# `:pull_events` with no timer recorded in state: nothing is scheduled, so the
# tick is ignored.
@impl true
def handle_info(:pull_events, %{pull_timer: nil} = state) do
  Logger.debug("Offbroadway.EventRelay.Producer.handle_info(:pull_events, #{inspect(state)})")
  {:noreply, [], state}
end

# `:pull_events` for the recorded timer: clear the timer and try to pull.
def handle_info(:pull_events, state) do
  Logger.debug("Offbroadway.EventRelay.Producer.handle_info(:pull_events, #{inspect(state)})")
  handle_pull_events(%{state | pull_timer: nil})
end

# Reply sent by the Task.async started in pull_events_from_event_relay/2.
# `events` is the list of Broadway messages built by handle_pull_queued_events/1
# (an empty list when the pull failed).
def handle_info(
      {ref, events},
      %{demand: demand, pull_task: %{ref: ref}} = state
    ) do
  Logger.debug(
    "Offbroadway.EventRelay.Producer.handle_info(#{inspect(events)}, #{inspect(state)})"
  )

  # The reply is consumed here instead of through Task.await/2, so the task
  # monitor must be dropped explicitly; otherwise the trailing :DOWN message
  # would reach the catch-all clause below after every pull.
  Process.demonitor(ref, [:flush])

  # Never let the pending demand go negative if more events come back than
  # were asked for; a negative value would silently swallow future demand.
  new_demand = max(demand - length(events), 0)

  pull_timer =
    case {events, new_demand} do
      # Nothing available: back off for the configured interval.
      {[], _} ->
        schedule_next_pull(state.pull_interval)

      # Demand fully satisfied: wait for handle_demand/2 to trigger the next pull.
      {_, 0} ->
        nil

      # Demand is still outstanding: pull again right away.
      _ ->
        schedule_next_pull(0)
    end

  {:noreply, events, %{state | demand: new_demand, pull_timer: pull_timer, pull_task: nil}}
end

# Any other message is logged and ignored.
# NOTE(review): the state holds `producer_opts`, which per the module docs
# includes the auth `token`, so this info-level log line can expose it.
def handle_info(msg, state) do
  Logger.info("Offbroadway.EventRelay.Producer.handle_info(#{inspect(msg)}, #{inspect(state)})")
  {:noreply, [], state}
end
# GenStage callback: adds the newly requested demand to the pending demand and
# lets handle_pull_events/1 decide whether a pull can start now.
@impl true
def handle_demand(incoming_demand, %{demand: pending} = state) do
  total = pending + incoming_demand

  state
  |> Map.put(:demand, total)
  |> handle_pull_events()
end
# Starts a pull task only when no timer is pending, no pull is in flight and
# there is outstanding demand; in every other case the state is returned
# untouched. No events are emitted from here either way.
defp handle_pull_events(state) do
  case state do
    %{pull_timer: nil, pull_task: nil, demand: pending} when pending > 0 ->
      {:noreply, [], %{state | pull_task: pull_events_from_event_relay(state, pending)}}

    _ ->
      {:noreply, [], state}
  end
end
# Spawns a Task that asks the client for up to `total_demand` queued events of
# the configured destination and converts the response with
# handle_pull_queued_events/1. Returns the Task struct; the task's result is
# delivered to this process as a `{ref, events}` message.
defp pull_events_from_event_relay(state, total_demand) do
  Logger.debug(
    "Offbroadway.EventRelay.Producer.pull_events_from_event_relay(#{inspect(state)}, #{inspect(total_demand)})"
  )

  %{channel: chan, producer_opts: module_opts} = state
  destination = module_opts[:destination_id]

  Task.async(fn ->
    pulled = Client.pull_queued_events(chan, destination, total_demand)
    handle_pull_queued_events(pulled)
  end)
end
@doc """
Arms a timer that sends `:pull_events` to the calling process after
`pull_interval` milliseconds and returns the timer reference.
"""
def schedule_next_pull(pull_interval) do
  Logger.debug("Offbroadway.EventRelay.Producer.schedule_next_pull(#{inspect(pull_interval)})")

  recipient = self()
  Process.send_after(recipient, :pull_events, pull_interval)
end
@doc """
Converts the result of a pull request into a list of `Broadway.Message` structs.

On `{:ok, result}` every entry of `result.events` becomes the `data` of a
message acknowledged by `Broadway.NoopAcknowledger`. Any other value is logged
as an error and an empty list is returned.
"""
def handle_pull_queued_events({:ok, result}) do
  for event <- result.events do
    %Message{
      data: event,
      acknowledger: Broadway.NoopAcknowledger.init()
    }
  end
end

def handle_pull_queued_events(error) do
  Logger.error(
    "Offbroadway.EventRelay.Producer.pull_events_from_event_relay error=#{inspect(error)}"
  )

  []
end
end