defmodule Kvasir.EventSource do
@type stream_opts :: Keyword.t()
defmacro __using__(opts \\ []) do
{event_storage, event_storage_opts} =
case Macro.expand(
opts[:source] || raise("Need to set `:source`."),
__CALLER__
) do
{m, o} -> {m, o}
m -> {m, []}
end
cold_storage = Enum.map(opts[:storage] || [], &Macro.expand(&1, __CALLER__))
cold_storages =
cold_storage
|> Enum.map(&if(is_tuple(&1), do: elem(&1, 0), else: &1))
|> Enum.with_index()
|> Enum.map(fn {k, v} -> {k, Module.concat(__CALLER__.module, :"Source#{v + 1}")} end)
cold_storage_setup =
cold_storage
|> Enum.with_index()
|> Enum.map(fn {cold, index} ->
{m, o} =
case cold do
{m, o} -> {m, o}
m -> {m, []}
end
quote do
unquote(m).child_spec(
unquote(Module.concat(__CALLER__.module, :"Source#{index + 1}")),
config(
unquote(opts[:label] || :"storage#{index + 1}"),
Keyword.merge(unquote(o), opts)
)
)
end
end)
encryption = Macro.expand(opts[:encryption], __CALLER__) || false
compression = Macro.expand(opts[:compression], __CALLER__) || false
encryption_opts =
opt_escape(opts[:encryption_opts], __CALLER__) || {Kvasir.Encryption.AES, []}
compression_opts =
opt_escape(opts[:compression_opts], __CALLER__) || {Kvasir.Compression.ZLib, []}
Module.put_attribute(__CALLER__.module, :encryption, encryption)
Module.put_attribute(__CALLER__.module, :encryption_opts, encryption_opts)
Module.put_attribute(__CALLER__.module, :compression, compression)
Module.put_attribute(__CALLER__.module, :compression_opts, compression_opts)
Module.put_attribute(__CALLER__.module, :cold_storages, cold_storages)
quote do
@after_compile unquote(__MODULE__)
@before_compile unquote(__MODULE__)
import unquote(__MODULE__), only: [topic: 2, topic: 3]
Module.register_attribute(__MODULE__, :topics, accumulate: true)
@encryption unquote(encryption)
@encryption_opts unquote(encryption_opts)
@compression unquote(compression)
@compression_opts unquote(compression_opts)
@doc false
@spec child_spec(Keyword.t()) :: map
def child_spec(opts \\ []) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]}
}
end
@doc false
@spec start_link(Keyword.t()) ::
{:ok, pid} | {:error, {:already_started, pid} | {:shutdown, term} | term}
def start_link(opts \\ []) do
topics =
case opts[:topics] do
nil -> __topics__()
filter -> Map.take(__topics__(), filter)
end
topics
|> Enum.map(fn {_, %{module: m}} -> m end)
|> Enum.each(&apply(&1, :regenerate, []))
opts =
Keyword.put(
opts,
:initialize,
Map.new(topics, fn {k, v} -> {k, v.partitions} end)
)
source_child_spec =
__source__().child_spec(
Module.concat(__MODULE__, Source),
config(:source, Keyword.merge(unquote(event_storage_opts), opts))
)
children =
if System.get_env("KVASIR_DISABLE_COLD_STORAGE", "false") in ["true", "1"] do
[source_child_spec]
else
[source_child_spec | unquote(cold_storage_setup)]
end
### Testings
# Code.compile_quoted(
# quote do
# defmodule unquote(__MODULE__.Metrics.Resolver) do
# def host, do: {{127, 0, 0, 1}, 9125}
# end
# end
# )
# children = [Kvasir.Metrics.Dispatcher.child_spec(source: __MODULE__) | children]
### Testing
__topics__()
|> Map.values()
|> Enum.each(
&Kvasir.Event.Encoding.Topic.create(
&1,
events: :all,
overwrite: true,
extra:
quote do
@doc ~S"""
Generate a topic module made for encoding/decoding
a subset of events.
## Examples
```elixir
iex> MySource.MyTopic.filter([MyEvent])
MySource.MyTopic.F3227A3894E15B922A187CE92BE2DA902
```
"""
@spec filter([Kvasir.Event.t()]) :: module
def filter(events) do
Kvasir.Event.Encoding.Topic.create(
unquote(Macro.escape(&1)),
overwrite: false,
only: events
)
end
end
)
)
Supervisor.start_link(
Enum.reject(children, &(&1 == false)),
strategy: :one_for_one,
name: __MODULE__
)
end
@doc ~S"""
Publish an event to a given topic.
## Examples
```elixir
iex> publish("users", UserEvent.create("bob"))
:ok
```
"""
@spec publish(String.t(), Kvasir.Event.t(), Keyword.t()) ::
{:ok, Kvasir.Event.t()} | {:error, atom}
def publish(topic, event, opts \\ []) do
with t = %{key: topic_key, partitions: partitions} <-
__topics__()[topic] || {:error, :unknown_topic} do
if k = opts[:key] do
with {:ok, key} <- topic_key.parse(k, opts),
{:ok, partition} <- topic_key.partition(key, partitions) do
e =
event
|> Kvasir.Event.set_key(key)
|> Kvasir.Event.set_partition(partition)
|> Kvasir.Event.set_topic(topic)
commit(t, e)
end
else
commit(t, event)
end
end
end
defp commit(a, b) do
__source__().commit(unquote(Module.concat(__CALLER__.module, Source)), a, b)
end
# defp commit(a, b) do
# with {:ok, e} <-
# __source__().commit(unquote(Module.concat(__CALLER__.module, Source)), a, b) do
# # metric = [
# # "event|",
# # e.__struct__.__event__(:type),
# # "|",
# # e.__meta__.topic,
# # ":",
# # to_string(e.__meta__.partition),
# # ":",
# # to_string(e.__meta__.offset),
# # "|",
# # "key:#{Kvasir.Event.key(e)}"
# # ]
# # :poolboy.transaction(
# # __MODULE__.Metrics,
# # &GenServer.cast(&1, {:metric, metric}),
# # 5000
# # )
# # |> IO.inspect()
# # IO.puts(["========\n", metric, "\n========"])
# {:ok, e}
# end
# end
@doc ~S"""
Subscribe to a given topic with a module.
The callback_module needs to implement both `init/3` and `event/2`.
The callbacks are:
- `init(topic, partition, opts)` (returns `{:ok, state}`)
- `event(event, state)` (return `:ok` or `{:ok, state}`)
## Examples
```elixir
iex> subscribe("users", MyUsersSubscriber)
:ok
```
"""
@spec subscribe(topic :: String.t(), callback_module :: module, opts :: Keyword.t()) ::
{:ok, pid} | {:error, atom}
def subscribe(topic, callback_module, opts \\ []) do
if t = __topics__()[topic] do
unquote(__MODULE__).subscribe(__MODULE__, t, callback_module, opts)
else
{:error, :unknown_topic}
end
end
@doc ~S"""
Start listening for new events for a given topic.
On each incoming event the given `callback` is called
with as only input the event.
The callback must return `:ok` to continue to the next event.
All other results will stop the listener.
## Examples
```elixir
iex> listen("users", fn event -> IO.inspect(event); :ok end)
{:ok, <pid>}
"""
@spec listen(topic :: String.t(), callback :: fun, opts :: Keyword.t()) ::
{:ok, pid} | {:error, atom}
def listen(topic, callback, opts \\ []) do
if t = __topics__()[topic] do
unquote(__MODULE__).listen(__MODULE__, t, callback, opts)
else
{:error, :unknown_topic}
end
end
@doc ~S"""
Stream events from a given topic.
## Examples
```elixir
iex> stream("users")
#EventStream<"users">
```
"""
@spec stream(String.t(), Keyword.t()) :: {:ok, EventStream.t()} | {:error, atom}
def stream(topic, opts \\ []) do
if t = __topics__()[topic] do
unquote(__MODULE__).stream(__MODULE__, t, opts)
else
{:error, :unknown_topic}
end
end
@doc ~S"""
Create a test stream of given events from a given topic.
## Examples
```elixir
iex> test_stream("users", [])
[]
```
"""
@spec test_stream(String.t(), [Kvasir.Event.t()], Keyword.t()) ::
{:ok, Enumerable.t()} | {:error, atom}
def test_stream(topic, events, opts \\ []) do
if t = __topics__()[topic] do
unquote(__MODULE__).test_stream(__MODULE__, t, events, opts)
else
{:error, :unknown_topic}
end
end
@doc ~S"""
Generate a dedicated publisher module for a given topic.
This publisher needs to be started with `start_link`,
but can also added a child to a Supervisor.
## Examples
```elixir
# iex> generate_dedicated_publisher(MyPublisher, "users")
# iex> MyPublisher.publish(<event>)
```
"""
@spec generate_dedicated_publisher(name :: module, Kvasir.topic(), opts :: Keyword.t()) ::
:ok | {:error, atom}
def generate_dedicated_publisher(name, topic, opts \\ []) do
with t = %{key: topic_key, partitions: partitions} <-
__topics__()[topic] || {:error, :unknown_topic} do
__source__().generate_dedicated_publisher(
unquote(Module.concat(__CALLER__.module, Source)),
name,
t,
opts
)
end
end
@doc false
@spec __source__ :: term
def __source__, do: unquote(event_storage)
@doc false
@spec __storages__ :: term
def __storages__, do: unquote(cold_storages)
@doc false
@spec config(atom, Keyword.t()) :: Keyword.t()
def config(_name, opts), do: opts
defoverridable config: 2
end
end
defmacro __before_compile__(_) do
quote do
@doc false
@spec __topics__ :: %{required(String.t()) => Kvasir.Topic.t()}
def __topics__, do: Map.new(@topics)
end
end
def __after_compile__(env, _bytecode) do
freezers =
env.module.__topics__
|> Map.values()
|> Enum.filter(& &1.freeze)
|> Enum.map(fn %{module: m} -> Module.concat(m, "Freezer") end)
Code.compile_quoted(
quote do
defmodule unquote(Module.concat(env.module, "Freezer")) do
@moduledoc ~S"""
Copy events to cold storage.
"""
@doc false
@spec child_spec(Keyword.t()) :: map
def child_spec(_opts \\ []) do
%{id: __MODULE__, start: {__MODULE__, :start_link, []}}
end
@doc false
@spec start_link :: {:ok, pid}
def start_link, do: Supervisor.start_link(unquote(freezers), strategy: :one_for_one)
end
end
)
end
@build_ins %{
string: Kvasir.Key.String
}
defp opt_escape(nil, _env), do: nil
defp opt_escape(opt, env) do
case Macro.expand(opt, env) do
{a, b} -> {Macro.expand(a, env), Macro.expand(b, env)}
a -> {Macro.expand(a, env), []}
end
end
defmacro topic(topic, key_format, opts \\ []) do
setup = %Kvasir.Topic{
module:
topic
|> String.split(".")
|> Enum.map(&Macro.camelize/1)
|> Enum.join(".")
|> (&Module.concat(__CALLER__.module, &1)).(),
freeze: Keyword.get(opts, :freeze, true),
topic: topic,
key: Macro.expand(@build_ins[key_format] || key_format, __CALLER__),
partitions: opts[:partitions] || 4,
events: opts |> Keyword.get(:events, []) |> Enum.map(&Macro.expand(&1, __CALLER__)),
encryption:
Macro.expand(opts[:encryption], __CALLER__) ||
Module.get_attribute(__CALLER__.module, :encryption),
encryption_opts:
opt_escape(opts[:encryption_opts], __CALLER__) ||
Module.get_attribute(__CALLER__.module, :encryption_opts),
compression:
Macro.expand(opts[:compression], __CALLER__) ||
Module.get_attribute(__CALLER__.module, :compression),
compression_opts:
opt_escape(opts[:compression_opts], __CALLER__) ||
Module.get_attribute(__CALLER__.module, :compression_opts)
}
cold_topics =
__CALLER__.module
|> Module.get_attribute(:cold_storages)
|> Enum.map(fn {_, cold} ->
Module.concat([
setup.module,
"Freezer",
String.trim_leading(inspect(cold), inspect(__CALLER__.module) <> ".")
])
end)
freezers =
if setup.freeze do
__CALLER__.module
|> Module.get_attribute(:cold_storages)
|> Enum.reduce(
quote do
defmodule unquote(Module.concat(setup.module, "Freezer")) do
@moduledoc ~S"""
Copy events to cold storage.
"""
@doc false
@spec child_spec(Keyword.t()) :: map
def child_spec(_opts \\ []) do
%{id: __MODULE__, start: {__MODULE__, :start_link, []}}
end
@doc false
@spec start_link :: {:ok, pid}
def start_link,
do: Supervisor.start_link(unquote(cold_topics), strategy: :one_for_one)
end
end,
fn s = {_, cold}, acc ->
mod =
Module.concat([
setup.module,
"Freezer",
String.trim_leading(inspect(cold), inspect(__CALLER__.module) <> ".")
])
quote do
unquote(acc)
defmodule unquote(mod) do
@moduledoc false
use Kvasir.Storage.Freezer,
source: unquote(__CALLER__.module),
topic: unquote(topic),
storage: unquote(s)
end
end
end
)
end
lookup =
Enum.reduce(
setup.events,
{:__block__, [],
[
{:@, [context: Elixir, import: Kernel], [{:doc, [context: Elixir], [false]}]},
{:@, [context: Elixir, import: Kernel],
[
{:spec, [context: Elixir],
[
{:"::", [],
[
{:"#{topic}_event_lookup", [],
[
{{:., [], [{:__aliases__, [alias: false], [:String]}, :t]}, [], []}
]},
{:|, [], [{:module, [], Elixir}, nil]}
]}
]}
]}
]},
fn event, acc ->
t = event.__event__(:type)
e = event.__event__(:replaced_by) || event
quote do
unquote(acc)
def unquote(:"#{topic}_event_lookup")(unquote(t)), do: unquote(e)
end
end
)
filter =
quote do
@doc ~S"""
Generate a topic module made for encoding/decoding
a subset of events.
## Examples
```elixir
iex> MySource.MyTopic.filter([MyEvent])
MySource.MyTopic.F3227A3894E15B922A187CE92BE2DA902
```
"""
@spec filter([Kvasir.Event.t()]) :: module
def filter(events) do
Kvasir.Event.Encoding.Topic.create(
unquote(Macro.escape(setup)),
true,
overwrite: false,
only: events
)
end
end
regenerate_filter =
quote do
unquote(filter)
@doc false
def regenerate
def regenerate, do: :ok
end
quote do
Module.put_attribute(
__MODULE__,
:topics,
{unquote(topic),
unquote(Macro.escape(setup))
|> Map.put(
:event_lookup,
unquote(
{:&, [],
[
{:/, [context: Elixir, import: Kernel],
[
{{:., [],
[
{:__aliases__, [alias: false], [__CALLER__.module]},
:"#{topic}_event_lookup"
]}, [], []},
1
]}
]}
)
)
|> Map.put_new_lazy(:doc, fn ->
case Module.delete_attribute(__MODULE__, :doc) do
{_, doc} -> doc
_ -> ""
end
end)}
)
unquote(
Enum.reduce(
setup.events,
nil,
"e do
unquote(&2)
require unquote(&1)
end
)
)
@doc false
unquote(lookup)
def unquote(:"#{topic}_event_lookup")(_), do: nil
unquote(
Kvasir.Event.Encoding.Topic.generate(
setup,
false,
quote do
unquote(filter)
@doc false
def regenerate do
Kvasir.Event.Encoding.Topic.create(
unquote(Macro.escape(setup)),
true,
extra: unquote(Macro.escape(regenerate_filter)),
overwrite: true,
events: :all
)
end
end
)
)
unquote(freezers)
end
end
def subscribe(source, topic, callback_module, opts) do
source.__source__().subscribe(Module.concat(source, Source), topic, callback_module, opts)
end
def listen(source, topic, callback, opts) do
source.__source__().listen(Module.concat(source, Source), topic, callback, opts)
end
def stream(source, topic, opts) do
# raise "Check ColdStorage and EventStorage for criteria."
if opts[:key] && opts[:partition] do
raise "Can not set both key and partition, since id determines partition."
end
id =
if k = opts[:key] do
case topic.key.parse(k, opts) do
{:ok, kv} -> kv
{:error, reason} -> raise "Invalid topic key: #{inspect(reason)}"
end
end
partition =
cond do
p = opts[:partition] -> p
id -> id |> topic.key.partition(topic.partitions) |> elem(1)
:all -> nil
end
events = events(opts[:events])
missing = Enum.filter(events || [], &(&1 not in topic.events))
unless missing == [] do
raise "The following events do not belong to the topic:\n#{missing |> Enum.map(&" #{inspect(&1)}") |> Enum.join("\n")}"
end
from =
if f = opts[:from] do
cond do
Kvasir.Offset.empty?(f) ->
Kvasir.Offset.create(partition, 0)
is_nil(partition) ->
f
o = f.partitions[partition] ->
%{f | partitions: %{partition => o}}
:not_set ->
raise "Partition or Key set, but given `:from` offset does not contain this partition."
end
else
if is_nil(partition) do
Kvasir.Offset.create(Map.new(0..(topic.partitions - 1), &{&1, 0}))
else
Kvasir.Offset.create(partition, 0)
end
end
%EventStream{
source: source,
topic: topic,
id: id,
partition: partition,
from: from,
events: events,
endless: opts[:endless] || false
}
end
def test_stream(source, topic, events, opts) do
import Kvasir.Event,
only: [
key: 1,
partition: 1,
set_key: 2,
set_key_type: 2,
set_offset: 2,
set_partition: 2,
set_source: 2,
set_timestamp: 2,
set_topic: 2,
type: 1
]
key = if k = opts[:key], do: topic.key.parse!(k)
partition = opts[:partition]
p = fn
nil -> Enum.random(0..(topic.partitions - 1))
k -> topic.key.partition!(k, topic.partitions)
end
event_option = events(opts[:events])
missing = Enum.filter(event_option || [], &(&1 not in topic.events))
unless missing == [] do
raise "The following events do not belong to the topic:\n#{missing |> Enum.map(&" #{inspect(&1)}") |> Enum.join("\n")}"
end
event_filter = if event_option, do: Enum.map(event_option, &type/1)
{:ok,
events
|> Enum.reduce({%{}, []}, fn event, {off, acc} ->
{e, k} =
case event do
{e, k} -> {e, if(k, do: topic.key.parse!(k))}
e -> {e, key}
end
pp = p.(k)
off = Map.update(off, pp, 0, &(&1 + 1))
o = off[pp]
e_set =
e
|> set_key(k)
|> set_key_type(topic.key)
|> set_offset(o)
|> set_partition(pp)
|> set_source(source)
|> set_timestamp(UTCDateTime.utc_now())
|> set_topic(topic.topic)
{off, [e_set | acc]}
end)
|> elem(1)
|> :lists.reverse()
|> Enum.filter(&(is_nil(event_filter) or type(&1) in event_filter))
|> Enum.filter(&(is_nil(key) or key(&1) == key))
|> Enum.filter(&(is_nil(partition) or partition(&1) == partition))}
end
defp events(nil), do: nil
defp events(events) when is_list(events), do: events
defp events(event) when is_atom(event), do: [event]
end