defmodule Polyn.PullConsumer do
@moduledoc """
Use `Polyn.PullConsumer` to connect and process messages from an existing [NATS Consumer](https://docs.nats.io/nats-concepts/jetstream/consumers)
that was set up with [Polyn CLI](https://github.com/SpiffInc/polyn-cli). This module is a
wrapper around `Jetstream.PullConsumer` that does schema validation with the received messages.
A key difference that Polyn adds is that the `:consumer_name` will be taken care of for you
by using the passed `type` and configured `:source_root`. You can pass `:source` to `start_link/3`
to get a more specific `:consumer_name`. This type of Consumer is meant for simple use cases that
don't involve concurrency or batching.

## Example

    defmodule MyApp.PullConsumer do
      use Polyn.PullConsumer

      def start_link(arg) do
        Polyn.PullConsumer.start_link(__MODULE__, arg,
          connection_name: :gnat,
          type: "user.created.v1")
      end

      @impl true
      def init(_arg) do
        {:ok, nil}
      end

      @impl true
      def handle_message(event, _message, state) do
        # Do some processing with the event.
        {:ack, state}
      end
    end
"""
use Jetstream.PullConsumer
alias Polyn.Serializers.JSON
@doc """
Invoked when the server is started. `start_link/3` or `start/3` will block until it returns.

`init_arg` is the argument term (second argument) passed to `start_link/3`.

Returning `{:ok, state}` stores `state` as the consumer's state, which is later passed as the
last argument to `c:handle_message/3`. Any other return value (`:ignore` or `{:stop, reason}`)
is handed back to `Jetstream.PullConsumer` unchanged.

See `c:Connection.init/1` for more details.
"""
@callback init(init_arg :: term) ::
            {:ok, state :: term()}
            | :ignore
            | {:stop, reason :: any}
@doc """
Invoked to synchronously process a message pulled by the consumer.
Depending on the value it returns, the acknowledgement is or is not sent.

Polyn will deserialize the message body into a `Polyn.Event` struct and use
that as the first argument, followed by the original message, followed by the state.

If the message body cannot be deserialized, this callback is not invoked: Polyn sends an
ACKTERM for the message and raises `Polyn.ValidationException`.

## ACK actions

See `c:Jetstream.PullConsumer.handle_message/2` for available options

## Example

    def handle_message(event, _message, state) do
      IO.inspect(event)
      {:ack, state}
    end

"""
@callback handle_message(
            event :: Polyn.Event.t(),
            message :: Jetstream.message(),
            state :: term()
          ) ::
            {ack_action, new_state}
          when ack_action: :ack | :nack | :term | :noreply, new_state: term()
@typedoc """
Options for starting a `Polyn.PullConsumer`

* `:type` - Required. The event type to consume. It is also used to look up the stream name
  and, together with `:source`, to build the consumer name. A `KeyError` is raised when missing.
* `:connection_name` - Required. The Gnat connection identifier. A `KeyError` is raised when
  missing.
* `:source` - Optional. More specific name for the consumer to add to the `:source_root`.
  Defaults to `nil`.
* All other options will be assumed to be GenServer options

The complete option list is forwarded as-is to `Jetstream.PullConsumer.start_link/3`
(or `Jetstream.PullConsumer.start/3`).
"""
# NOTE(review): `store_name/1` in this module also reads a `:store_name` key from the
# options, but that key is not part of this type — confirm whether that is intentional.
@type option ::
        {:type, binary()}
        | {:source, binary()}
        | {:connection_name, Gnat.t()}
        | GenServer.option()
@doc false
# Injected by `use Polyn.PullConsumer`: declares the `Polyn.PullConsumer` behaviour on the
# using module and defines an overridable `child_spec/1` for it.
defmacro __using__(opts) do
  # `bind_quoted` makes `opts` available as a variable inside the quoted code and turns off
  # unquoting for the quote itself, so the `unquote/1` below is left in place and resolved
  # when `child_spec/1` is defined in the using module.
  quote location: :keep, bind_quoted: [opts: opts] do
    @behaviour Polyn.PullConsumer

    # NOTE(review): the spec types `arg` as `GenServer.options()`, but `arg` is simply
    # forwarded as the single argument of the using module's `start_link/1` — confirm.
    @spec child_spec(arg :: GenServer.options()) :: Supervisor.child_spec()
    def child_spec(arg) do
      default = %{
        id: __MODULE__,
        start: {__MODULE__, :start_link, [arg]}
      }

      # The options given to `use` are applied as overrides on top of `default`.
      Supervisor.child_spec(default, unquote(Macro.escape(opts)))
    end

    # Let the using module replace the generated child spec with its own.
    defoverridable child_spec: 1
  end
end
@doc """
Starts a pull consumer linked to the current process for the given callback `module`.

This is often used to start the pull consumer as part of a supervision tree.

Once the server is started, the `c:init/1` function of the given `module` is called with
`init_arg` as its argument to initialize the server. To ensure a synchronized start-up procedure,
this function does not return until `c:init/1` has returned.

`options` must contain `:type` and `:connection_name`; a `KeyError` is raised in the calling
process when either is missing. The stream does not need to be passed: it is looked up from
the `:type`.

See `GenServer.start_link/3` for more details.

## Example

    {:ok, consumer} =
      Polyn.PullConsumer.start_link(ExamplePullConsumer, %{initial_arg: "foo"},
        connection_name: :gnat,
        type: "user.updated.v1"
      )

"""
@spec start_link(module(), init_arg :: term(), options :: [option()]) ::
        GenServer.on_start()
def start_link(module, init_arg, options \\ []) when is_atom(module) and is_list(options) do
  # Build Polyn's internal state up front (this is where missing required options raise)
  # and bundle it with the caller's `init_arg` so `init/1` receives both.
  consumer_arg = {initial_state(module, options), init_arg}

  Jetstream.PullConsumer.start_link(__MODULE__, consumer_arg, options)
end
@doc """
Starts a pull consumer process without links (outside of a supervision tree).

Takes the same arguments and options as `start_link/3`; see that function for more
information.
"""
@spec start(module(), init_arg :: term(), options :: [option()]) ::
        GenServer.on_start()
def start(module, init_arg, options \\ []) when is_atom(module) and is_list(options) do
  # Same wrapping as `start_link/3`: Polyn's internal state travels alongside `init_arg`.
  consumer_arg = {initial_state(module, options), init_arg}

  Jetstream.PullConsumer.start(__MODULE__, consumer_arg, options)
end
@doc """
Closes the pull consumer and stops the underlying process.

This delegates to `Jetstream.PullConsumer.close/1`.

## Example

    {:ok, consumer} =
      Polyn.PullConsumer.start_link(ExamplePullConsumer, %{initial_arg: "foo"},
        connection_name: :gnat,
        type: "user.updated.v1"
      )

    :ok = Polyn.PullConsumer.close(consumer)

"""
@spec close(consumer :: Jetstream.PullConsumer.consumer()) :: :ok
def close(consumer), do: Jetstream.PullConsumer.close(consumer)
# Runs the callback module's `init/1` and, on success, stores its state inside Polyn's
# internal state and returns the connection options for Jetstream. Any result other than
# `{:ok, state}` falls through the `with` and is returned untouched.
@impl Jetstream.PullConsumer
def init({%{module: module} = internal_state, init_arg}) do
  with {:ok, user_state} <- module.init(init_arg) do
    # The callback `module` stays in the internal state so later callbacks know
    # which module to dispatch to.
    updated = %{internal_state | state: user_state}
    {:ok, updated, connection_options(updated)}
  end
end
# Deserializes the raw message body into an event and hands it to the callback module's
# `handle_message/3`, threading the callback's new state back into the internal state.
@impl Jetstream.PullConsumer
def handle_message(message, %{module: module, state: state} = internal_state) do
  connection = internal_state.connection_name
  decoded = JSON.deserialize(message.body, connection, store_name: internal_state.store_name)

  case decoded do
    {:error, reason} ->
      # An invalid message will never become valid: tell NATS to stop redelivering it
      # (ACKTERM) so the same error is not raised over and over, then fail loudly.
      Jetstream.ack_term(message)
      raise Polyn.ValidationException, reason

    {:ok, event} ->
      {ack_action, new_state} = module.handle_message(event, message, state)
      {ack_action, %{internal_state | state: new_state}}
  end
end
# Builds the internal state map kept by the consumer process. `:connection_name` and
# `:type` are mandatory (`Keyword.fetch!/2` raises when absent); `:source` is optional.
defp initial_state(module, opts) do
  schema_store = store_name(opts)
  connection = Keyword.fetch!(opts, :connection_name)
  event_type = Keyword.fetch!(opts, :type)

  %{
    module: module,
    state: nil,
    store_name: schema_store,
    connection_name: connection,
    type: event_type,
    source: opts[:source]
  }
end
# Derives the options returned to Jetstream from `init/1`: the consumer name is built
# from the event type and source, and the stream name is looked up from the event type.
defp connection_options(%{connection_name: conn, source: source_name, type: event_type}) do
  consumer = Polyn.Naming.consumer_name(event_type, source_name)

  [
    connection_name: conn,
    stream_name: Polyn.Naming.lookup_stream_name!(conn, event_type),
    consumer_name: consumer
  ]
end
# Returns the `:store_name` given in the options, or the schema store's own name
# when the option is absent.
defp store_name(opts) do
  default_store = Polyn.SchemaStore.store_name()
  Keyword.get(opts, :store_name, default_store)
end
end