defmodule Gnat.Jetstream.PullConsumer do
@moduledoc """
A behaviour which provides the NATS JetStream Pull Consumer functionalities.
When a Consumer is pull-based, it means that the messages will be delivered when the server
is asked for them.
## Example
Declare a module which uses `Gnat.Jetstream.PullConsumer` and implements `c:init/1` and
`c:handle_message/2` callbacks.
defmodule MyApp.PullConsumer do
use Gnat.Jetstream.PullConsumer
def start_link(arg) do
Jetstream.PullConsumer.start_link(__MODULE__, arg)
end
@impl true
def init(_arg) do
{:ok, nil,
connection_name: :gnat,
stream_name: "TEST_STREAM",
consumer_name: "TEST_CONSUMER"}
end
@impl true
def handle_message(message, state) do
# Do some processing with the message.
{:ack, state}
end
end
You can then place your Pull Consumer in a supervision tree. Remember that you need to have the
`Gnat.ConnectionSupervisor` set up.
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# Create NATS connection
{Gnat.ConnectionSupervisor, ...},
# Start NATS Jetstream Pull Consumer
MyApp.PullConsumer,
]
opts = [strategy: :one_for_one]
Supervisor.start_link(children, opts)
end
end
## Connection Options
In order to establish consumer connection with NATS, you need to pass several connection options
via keyword list in third element of a tuple returned from `c:init/1` callback.
Following options **must** be provided. Omitting this options will cause the process to raise
errors upon initialization:
* `:connection_name` - Gnat connection or `Gnat.ConnectionSupervisor` name/PID.
* `:stream_name` - name of an existing string the consumer will consume messages from.
* `:consumer_name` - name of an existing consumer pointing at the stream.
You can also pass the optional ones:
* `:connection_retry_timeout` - a duration in milliseconds after which the PullConsumer which
failed to establish NATS connection retries, defaults to `1000`
* `:connection_retries` - a number of attempts the PullConsumer will make to establish the NATS
connection. When this value is exceeded, the pull consumer stops with the `:timeout` reason,
defaults to `10`
* `:inbox_prefix` - allows the default `_INBOX.` prefix to be customized. Should end with a dot.
* `:domain` - use a JetStream domain, this is mostly used on leaf nodes.
## Dynamic Connection Options
It is possible that you have to determine some of the options dynamically depending on pull
consumer's init argument. To do so, it is recommended to derive these options values from some
init argument:
defmodule MyApp.PullConsumer do
use Gnat.Jetstream.PullConsumer
def start_link() do
Gnat.Jetstream.PullConsumer.start_link(__MODULE__, %{counter: counter})
end
@impl true
def init(%{counter: counter}) do
{:ok, nil,
connection_name: :gnat,
stream_name: "TEST_STREAM_#\{counter}",
consumer_name: "TEST_CONSUMER_#\{counter}"}
end
...
end
## How to supervise
A `PullConsumer` is most commonly started under a supervision tree. When we invoke
`use Gnat.Jetstream.PullConsumer`, it automatically defines a `child_spec/1` function that allows us
to start the pull consumer directly under a supervisor. To start a pull consumer under
a supervisor with an initial argument of :example, one may do:
children = [
{MyPullConsumer, :example}
]
Supervisor.start_link(children, strategy: :one_for_all)
While one could also simply pass the `MyPullConsumer` as a child to the supervisor, such as:
children = [
MyPullConsumer # Same as {MyPullConsumer, []}
]
Supervisor.start_link(children, strategy: :one_for_all)
A common approach is to use a keyword list, which allows setting init argument and server options,
for example:
def start_link(opts) do
{initial_state, opts} = Keyword.pop(opts, :initial_state, nil)
Gnat.Jetstream.PullConsumer.start_link(__MODULE__, initial_state, opts)
end
and then you can use `MyPullConsumer`, `{MyPullConsumer, name: :my_consumer}` or even
`{MyPullConsumer, initial_state: :example, name: :my_consumer}` as a child specification.
`use Gnat.Jetstream.PullConsumer` also accepts a list of options which configures the child
specification and therefore how it runs under a supervisor. The generated `child_spec/1` can be
customized with the following options:
* `:id` - the child specification identifier, defaults to the current module
* `:restart` - when the child should be restarted, defaults to `:permanent`
* `:shutdown` - how to shut down the child, either immediately or by giving it time to shut down
For example:
use Gnat.Jetstream.PullConsumer, restart: :transient, shutdown: 10_000
See the "Child specification" section in the `Supervisor` module for more detailed information.
The `@doc` annotation immediately preceding `use Jetstream.PullConsumer` will be attached to
the generated `child_spec/1` function.
## Name registration
A pull consumer is bound to the same name registration rules as GenServers.
Read more about it in the `GenServer` documentation.
"""
@doc """
Invoked when the server is started. `start_link/3` or `start/3` will block until it returns.
`init_arg` is the argument term (second argument) passed to `start_link/3`.
See `c:Connection.init/1` for more details.
"""
@callback init(init_arg :: term) ::
{:ok, state :: term(), connection_options()}
| :ignore
| {:stop, reason :: any}
@doc """
Invoked to synchronously process a message pulled by the consumer.
Depending on the value it returns, the acknowledgement is or is not sent.
## ACK actions
Possible ACK actions values explained:
* `:ack` - acknowledges the message was handled and requests delivery of the next message to
the reply subject.
* `:nack` - signals that the message will not be processed now and processing can move onto
the next message, NAK'd message will be retried.
* `:term` - instructs the server to stop redelivery of a message without acknowledging it as
successfully processed.
* `:noreply` - nothing is sent. You may send later asynchronously an ACK or NACK message using
the `Jetstream.ack/1` or `Jetstream.nack/1` and similar functions from `Jetstream` module.
## Example
def handle_message(message, state) do
IO.inspect(message)
{:ack, state}
end
"""
@callback handle_message(message :: Gnat.message(), state :: term()) ::
{ack_action, new_state}
when ack_action: :ack | :nack | :term | :noreply, new_state: term()
@typedoc """
The pull consumer reference.
"""
@type consumer :: GenServer.server()
@typedoc """
Connection option values used to connect the consumer to NATS server.
"""
@type connection_option ::
{:connection_name, GenServer.server()}
| {:stream_name, String.t()}
| {:consumer_name, String.t()}
| {:connection_retry_timeout, non_neg_integer()}
| {:connection_retries, non_neg_integer()}
| {:domain, String.t()}
@typedoc """
Connection options used to connect the consumer to NATS server.
"""
@type connection_options :: [connection_option()]
defmacro __using__(opts) do
quote location: :keep, bind_quoted: [opts: opts] do
@behaviour Gnat.Jetstream.PullConsumer
unless Module.has_attribute?(__MODULE__, :doc) do
@doc """
Returns a specification to start this module under a supervisor.
See the "Child specification" section in the `Supervisor` module for more detailed
information.
"""
end
@spec child_spec(arg :: GenServer.options()) :: Supervisor.child_spec()
def child_spec(arg) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, [arg]}
}
Supervisor.child_spec(default, unquote(Macro.escape(opts)))
end
defoverridable child_spec: 1
end
end
@doc """
Starts a pull consumer linked to the current process with the given function.
This is often used to start the pull consumer as part of a supervision tree.
Once the server is started, the `c:init/1` function of the given `module` is called with
`init_arg` as its argument to initialize the server. To ensure a synchronized start-up procedure,
this function does not return until `c:init/1` has returned.
See `GenServer.start_link/3` for more details.
"""
@spec start_link(module(), init_arg :: term(), options :: GenServer.options()) ::
GenServer.on_start()
def start_link(module, init_arg, options \\ []) when is_atom(module) and is_list(options) do
Connection.start_link(
Gnat.Jetstream.PullConsumer.Server,
%{module: module, init_arg: init_arg},
options
)
end
@doc """
Starts a `Jetstream.PullConsumer` process without links (outside of a supervision tree).
See `start_link/3` for more information.
"""
@spec start(module(), init_arg :: term(), options :: GenServer.options()) ::
GenServer.on_start()
def start(module, init_arg, options \\ []) when is_atom(module) and is_list(options) do
Connection.start(
Gnat.Jetstream.PullConsumer.Server,
%{module: module, init_arg: init_arg},
options
)
end
@doc """
Closes the pull consumer and stops underlying process.
## Example
{:ok, consumer} =
PullConsumer.start_link(ExamplePullConsumer,
connection_name: :gnat,
stream_name: "TEST_STREAM",
consumer_name: "TEST_CONSUMER"
)
:ok = PullConsumer.close(consumer)
"""
@spec close(consumer :: consumer()) :: :ok
def close(consumer) do
Connection.call(consumer, :close)
end
end