defmodule Stargate do
@moduledoc """
Stargate provides an Elixir client for the Apache Pulsar distributed message
log service, based on the Pulsar project's websocket API.
### Producer
Create a producer process under your application's supervision tree with the following:
options = [
name: :pulsar_app,
host: [{:"broker-url.com", 8080}],
producer: [
persistence: "non-persistent",
tenant: "marketing",
namespace: "public",
topic: "new-stuff"
]
]
Stargate.Supervisor.start_link(options)
Once the producer is running, pass messages to the client by pid or by the named
registry entry:
Stargate.produce(producer, [{"key", "value"}])
If you won't be producing frequently you can choose to run ad hoc produce commands against
the url of the Pulsar cluster/topic as follows:
url = "ws://broker-url.com:8080/ws/v2/producer/non-persistent/marketing/public/new-stuff"
Stargate.produce(url, [{"key, "value"}])
### Consumer and Reader
Both consumers and readers connected to Pulsar via Stargate process received messages the
same way. Stargate takes care of receiving the messages and sending acknowledgements back
to the cluster so all you need to do is start a process and define a module in your application
that invokes `use Stargate.Receiver.MessageHandler` and has a `handle_message/1` or `handle_message/2`
function as follows:
defmodule Publicize.MessageHandler do
use Stargate.Receiver.MessageHandler
def handle_message(%{context: context, payload: payload}) do
publish_to_channel(payload, context)
:ack
end
defp publish_to_channel(payload, context) do
...do stuff...
end
end
The `handle_message/1` must return either `:ack` or `:continue` in order to ack successful
processing of the message back to the cluster or continue processing without ack (in the event
you want to do a bulk/cumulative ack at a later time). If using the `handle_message/2` callback
for handlers that keep state across messages handled, it must return `{:ack, state}` or
`{:continue, state}`.
Then, create a consumer or reader process under your application's supevision tree with the following:
options = [
name: :pulsar_app,
host: [{:"broker-url.com", 8080}]
consumer: [ <====== replace with `:reader` for a reader client
tenant: "internal",
namespace: "research",
topic: "ready-to-release",
subscription: "rss-feed", <====== required for a `:consumer`
handler: Publicizer.MessageHandler
]
]
Stargate.Supervisor.start_link(options)
Readers and Consumers share the same configuration API with the two key differences that the
`:consumer` key in the options differentiates from the `:reader` key, as well as the requirement
to provide a `"subscription"` to a consumer for the cluster to manage messages.
"""
defdelegate produce(url_or_connection, message), to: Stargate.Producer
defdelegate produce(connection, message, mfa), to: Stargate.Producer
@type tenant :: String.t()
@type namespace :: String.t()
@type topic :: String.t()
@type persistence :: String.t()
@type component :: :producer | :producer_ack | :consumer | :consumer_ack | :reader | :reader_ack
@type key_opt ::
{:persistence, persistence()}
| {:name, atom()}
| {:registry, atom()}
| {:component, component()}
@doc """
Generate the via-tuple needed for addressing a process within the Stargate supervision tree. Expects
at minimum the tenant, namespace, and topic of the process being addressed and assumes by default the
desired process is the Producer of a persistent topic managed by the default supervisor/registry.
iex> Stargate.registry_key("foo", "bar", "baz")
{:via, Registry, {:sg_reg_default, {:producer, "persistent", "foo", "bar", "baz"}}}
iex> Stargate.registry_key("foo", "bar", "baz", registry: MyCustom.Registry, persistence: "non-persistent", component: :producer_ack)
{:via, Registry, {MyCustom.Registry, {:producer_ack, "non-persistent", "foo", "bar", "baz"}}}
"""
@spec registry_key(tenant(), namespace(), topic(), [key_opt]) ::
{:via, Registry, {atom(), {component(), persistence(), tenant(), namespace(), topic()}}}
def registry_key(tenant, namespace, topic, opts \\ []) do
name = Keyword.get(opts, :name, :default)
registry = Keyword.get(opts, :registry) || :"sg_reg_#{name}"
component = Keyword.get(opts, :component, :producer)
persistence = Keyword.get(opts, :persistence, "persistent")
{:via, Registry, {registry, {component, persistence, tenant, namespace, topic}}}
end
defmodule Message do
@moduledoc """
Defines the Elixir Struct that represents the structure of a Pulsar message.
The struct combines the "location" data of the received messages (persistent vs. non-persistent,
tenant, namespace, topic) with the payload, any key and/or properties provided with the message,
and the publication timestamp as an DateTime struct, and the messageId assigned by the cluster.
### Example
message = %Stargate.Message{
topic: "ready-for-release",
namespace: "research",
tenant: "internal",
persistence: "persistent",
message_id: "CAAQAw==",
payload: "Hello World",
key: "1234",
properties: nil,
publish_time: ~U[2020-01-10 18:13:34.443264Z]
}
"""
@type t :: %__MODULE__{
topic: String.t(),
namespace: String.t(),
tenant: String.t(),
persistence: String.t(),
message_id: String.t(),
payload: String.t(),
key: String.t(),
properties: map(),
publish_time: DateTime.t()
}
defstruct [
:topic,
:namespace,
:tenant,
:persistence,
:message_id,
:payload,
:key,
:properties,
:publish_time
]
@doc """
Create a %Stargate.Message{} struct from a list of arguments. Takes the map decoded from
the json message payload received from Pulsar and adds the tenant, namespace, topic, persistence
information to maintain "location awareness" of a message's source topic.
Creating a %Stargate.Message{} via the `new/5` function automatically converts the ISO8601-formatted
publish timestamp to a DateTime struct and decodes the message payload from the Base64 encoding
received from the cluster.
"""
@spec new(map(), String.t(), String.t(), String.t(), String.t()) :: Stargate.Message.t()
def new(message, persistence, tenant, namespace, topic) do
{:ok, timestamp, _} =
message
|> Map.get("publishTime")
|> DateTime.from_iso8601()
payload =
message
|> Map.get("payload")
|> Base.decode64!()
%Message{
topic: topic,
namespace: namespace,
tenant: tenant,
persistence: persistence,
message_id: message["messageId"],
payload: payload,
key: Map.get(message, "key", ""),
properties: Map.get(message, "properties", %{}),
publish_time: timestamp
}
end
end
end