defmodule Kvasir.Event do
require Logger
# Any struct is accepted as an event.
@type t :: struct()

# Struct of `Kvasir.Event` itself: a `:value` plus its `:__meta__`.
defstruct [:value, :__meta__]
@doc ~S"""
Sets up the calling module as a Kvasir event.

## Options

  * `:on_error` - stored in `@on_error`; defaults to `:halt`.
  * `:compress` - stored in `@compress`; defaults to `false`.
  * `:deprecated` - `true` for a generic deprecation message, or a custom message.
  * `:describe` - stored in the caller's `:describe` attribute.
  * `:key_type` - deprecated; only triggers a warning.
"""
defmacro __using__(opts \\ []) do
  failure_mode = opts[:on_error] || :halt
  deprecation_flag = opts[:deprecated]

  # Normalise `:deprecated` to either a message or `false`.
  deprecation_notice =
    cond do
      deprecation_flag == true -> "Event has been deprecated."
      is_binary(deprecation_flag) -> deprecation_flag
      true -> false
    end

  if opts[:key_type] do
    Logger.warn(fn ->
      """
      Kvasir: #{inspect(__CALLER__.module)} set `:key_type`. This has been deprecated.
      `:key_type` is now set per topic in the `Kvasir.EventSource`.
      """
    end)
  end

  Module.put_attribute(__CALLER__.module, :describe, opts[:describe])
  compress? = opts[:compress] || false

  quote do
    import Kvasir.Event, only: [event: 1, event: 2, upgrade: 2, version: 1, version: 2]
    @before_compile Kvasir.Event
    @on_error unquote(failure_mode)
    @compress unquote(compress?)
    @event_deprecated unquote(deprecation_notice)

    # Version Tracking
    Module.register_attribute(__MODULE__, :version, persist: true, accumulate: true)
    @version {Version.parse!("1.0.0"), nil, "Create event."}
  end
end
# Compile hook: generates `upgrade/2` with one guarded clause per `upgrade`
# block registered in the module's `:upgrades` attribute, followed by a
# pass-through clause returning `{:ok, event}`.
defmacro __before_compile__(env) do
  major = Macro.var(:major, nil)
  minor = Macro.var(:minor, nil)
  patch = Macro.var(:patch, nil)

  registered = Module.get_attribute(env.module, :upgrades, [])

  # Each step places its clause in front of the clauses built so far.
  upgrade_clauses =
    Enum.reduce(registered, nil, fn {requirement, handler}, later_clauses ->
      version_guard = guard(requirement)

      quote do
        def upgrade(
              %Version{
                major: unquote(major),
                minor: unquote(minor),
                patch: unquote(patch)
              },
              event
            )
            when unquote(version_guard) do
          # Presumably references each component so that ones the guard does
          # not use raise no unused-variable warning.
          _ = unquote(major)
          _ = unquote(minor)
          _ = unquote(patch)
          unquote(handler)(event)
        end

        unquote(later_clauses)
      end
    end)

  quote do
    @doc ~S"""
    Upgrade event payload from older to current version.
    ## Examples
    ```elixir
    iex> upgrade(#Version<1.0.0>, %Event{...})
    ```
    """
    @spec upgrade(Version.t(), map) :: {:ok, map} | {:error, atom}
    def upgrade(version, event)
    unquote(upgrade_clauses)
    def upgrade(_, event), do: {:ok, event}
  end
end
# Declares an event field; `type` defaults to `:string`. The work is done by
# `property/4`, which receives the caller's environment.
defmacro field(name, type \\ :string, opts \\ []) do
  property(__CALLER__, name, type, opts)
end
# Registers a field on the calling module.
#
# At expansion time the resolved type is stored in the caller's `:field_<name>`
# attribute. The returned quoted code adds `{name, type, opts}` to `:fields`,
# taking the field's `:doc` option from a pending `@doc` (which is consumed)
# unless one was given explicitly.
def property(caller, name, type, opts) do
  field_opts = Keyword.put_new(opts, :sensitive, false)
  resolved_type = Kvasir.Type.lookup(type)
  Module.put_attribute(caller.module, :"field_#{name}", resolved_type)

  quote do
    Module.put_attribute(
      __MODULE__,
      :fields,
      {unquote(name), unquote(resolved_type),
       Keyword.put_new_lazy(unquote(field_opts), :doc, fn ->
         with {_, doc} <- Module.delete_attribute(__MODULE__, :doc) do
           doc
         else
           _ -> nil
         end
       end)}
    )
  end
end
# Records a new event version in the accumulating `@version` attribute.
#
# `version` may omit trailing components ("2" or "2.1"); missing ones are
# padded with ".0" before parsing. A pending `@doc` is consumed and stored as
# the version's description (empty string when there is none).
defmacro version(version, updated \\ nil) do
  dots = Enum.count(String.graphemes(version), fn grapheme -> grapheme == "." end)
  padded = version <> String.duplicate(".0", 2 - dots)
  parsed = Version.parse!(padded)

  quote do
    @version {unquote(Macro.escape(parsed)), unquote(updated),
              elem(Module.delete_attribute(__MODULE__, :doc) || {0, ""}, 1)}
  end
end
# Registers an upgrade block for events matching the `version` requirement.
#
# The block becomes a private `__upgrade_<n>/1` function whose argument is
# bound to `event`, and `{version, function_name}` is prepended to the
# caller's `:upgrades` attribute.
defmacro upgrade(version, do: block) do
  target = __CALLER__.module
  existing = Module.get_attribute(target, :upgrades, [])
  handler = :"__upgrade_#{length(existing)}"
  Module.put_attribute(target, :upgrades, [{version, handler} | existing])

  event_var = Macro.var(:event, __CALLER__.context)

  quote do
    defp unquote(handler)(unquote(event_var)) do
      unquote(block)
    end
  end
end
# Defines an event of the given type without any fields.
defmacro event(type) do
  create_event(__CALLER__, type)
end
# Defines an event that has been replaced by `other`.
#
# The generated module is marked deprecated, delegates `create/1`, `create!/1`,
# `describe/2` and `__event__/2` to `other`, and answers `__event__/1` itself
# for the keys listed below, deferring any other key to `other`.
defmacro event(type, replaced_by: other) do
  {app, version, hex, hexdocs, source} = Kvasir.Util.documentation(__CALLER__)
  replacement_notice = "Replaced by `#{Macro.expand(other, __CALLER__)}`."
  type_name = Kvasir.Util.name(type)

  quote do
    @event_deprecated unquote(replacement_notice)

    @doc ~S"""
    Create an event based on given fields.
    ## Examples
    ```elixir
    iex> create(field: :value)
    ```
    """
    if @event_deprecated do
      @deprecated @event_deprecated
    end

    @spec create(Keyword.t()) :: {:ok, Kvasir.Event.t()} | {:error, reason :: atom}
    defdelegate create(fields \\ []), to: unquote(other)

    @doc ~S"""
    Create an event based on given fields.
    ## Examples
    ```elixir
    iex> create!(field: :value)
    ```
    """
    if @event_deprecated do
      @deprecated @event_deprecated
    end

    @spec create!(Keyword.t()) :: Kvasir.Event.t() | no_return
    defdelegate create!(fields \\ []), to: unquote(other)

    @doc ~S"""
    Describe the event applied to the given key.
    ## Examples
    ```elixir
    iex> describe("User<64523>", <event>)
    "User<64523> event-ed."
    ```
    """
    if @event_deprecated do
      @deprecated @event_deprecated
    end

    @spec describe(String.t(), Kvasir.Event.t()) :: String.t()
    defdelegate describe(key, event), to: unquote(other)

    @doc false
    @spec __event__(atom) :: term
    def __event__(:replaced_by), do: unquote(other)
    def __event__(:type), do: unquote(type_name)
    def __event__(:app), do: {unquote(app), unquote(version)}
    def __event__(:hex), do: unquote(hex)
    def __event__(:hexdocs), do: unquote(hexdocs)
    def __event__(:source), do: unquote(source)
    def __event__(:deprecated), do: @event_deprecated
    def __event__(:doc), do: @moduledoc
    def __event__(info), do: unquote(other).__event__(info)

    @doc false
    @spec __event__(atom, atom) :: term
    defdelegate __event__(type, field), to: unquote(other)
  end
end
# Defines an event of the given type whose fields are declared in `block`.
defmacro event(type, do: block) do
  create_event(__CALLER__, type, block)
end
# Builds the quoted module body for an event definition.
#
# `caller` is the macro caller's environment, `type` the event type as given to
# `event/1,2`, and `block` the optional block of `field` declarations.
# The generated code defines the struct, the `__event__/1` and `__event__/2`
# reflection functions, `create/1`, `create!/1`, `describe/2`, and the
# `Jason.Encoder` and `Inspect` implementations for the struct.
defp create_event(caller, type, block \\ nil) do
{app, version, hex, hexdocs, source} = Kvasir.Util.documentation(caller)
quote do
# `field` declarations accumulate into `@fields`.
Module.register_attribute(__MODULE__, :fields, accumulate: true)
# NOTE(review): the try/after appears to exist only to keep the `field`
# import scoped to the declaration block — confirm.
try do
import Kvasir.Event, only: [field: 1, field: 2, field: 3]
unquote(block)
after
:ok
end
# Names of the fields declared with a truthy `:sensitive` option.
@sensitive_fields @fields
|> Enum.filter(fn {_, _, o} -> o[:sensitive] end)
|> Enum.map(&elem(&1, 0))
# Accumulated attributes are read newest-first; reverse to get declaration order.
@event_fields Enum.reverse(@fields)
# One struct key per field (with its `:default` option) plus the metadata.
defstruct Enum.map(@event_fields, fn {f, _, opts} -> {f, opts[:default]} end) ++
[__meta__: %Kvasir.Event.Meta{}]
# Persist the event type as a module attribute.
Module.register_attribute(__MODULE__, :__event__, persist: true)
Module.put_attribute(__MODULE__, :__event__, unquote(Kvasir.Util.name(type)))
# Versions sorted ascending; the last entry is the current version.
@version_history Enum.sort(@version, &(Version.compare(elem(&1, 0), elem(&2, 0)) != :gt))
@current_version @version_history
|> List.last()
|> elem(0)
# Reflection: event-level information keyed by atom.
@doc false
@spec __event__(atom) :: term
def __event__(:type), do: unquote(Kvasir.Util.name(type))
def __event__(:fields), do: @event_fields
def __event__(:on_error), do: @on_error
def __event__(:deprecated), do: @event_deprecated
def __event__(:sensitive), do: @sensitive_fields
def __event__(:doc), do: @moduledoc
def __event__(:version), do: @current_version
def __event__(:history), do: @version_history
def __event__(:app), do: {unquote(app), unquote(version)}
def __event__(:hex), do: unquote(hex)
def __event__(:hexdocs), do: unquote(hexdocs)
def __event__(:source), do: unquote(source)
def __event__(:compress), do: @compress
def __event__(:replaced_by), do: nil
# Field name => field type lookup used by `__event__(:type, field)`.
@field_type Map.new(@fields, fn {k, v, _} -> {k, v} end)
@doc false
@spec __event__(atom, atom) :: term
def __event__(:type, field), do: @field_type[field]
@doc ~S"""
Create an event based on given fields.
## Examples
```elixir
iex> create(field: :value)
```
"""
if @event_deprecated do
@deprecated @event_deprecated
end
@spec create(Keyword.t()) :: {:ok, Kvasir.Event.t()} | {:error, reason :: atom}
def create(fields \\ []), do: Kvasir.Event.Encoding.create(__MODULE__, Map.new(fields))
@doc ~S"""
Create an event based on given fields.
## Examples
```elixir
iex> create!(field: :value)
```
"""
if @event_deprecated do
@deprecated @event_deprecated
end
@spec create!(Keyword.t()) :: Kvasir.Event.t() | no_return
def create!(fields \\ []) do
case create(fields) do
{:ok, event} -> event
{:error, reason} -> raise("#{inspect(__MODULE__)}: Create failed #{inspect(reason)}")
end
end
@doc ~S"""
Describe the event applied to the given key.
## Examples
```elixir
iex> describe("User<64523>", <event>)
"User<64523> event-ed."
```
"""
if @event_deprecated do
@deprecated @event_deprecated
end
@spec describe(String.t(), Kvasir.Event.t()) :: String.t()
def describe(key, event)
# The `describe/2` clauses are generated by `Kvasir.Describer.event/1`.
require Kvasir.Describer
Kvasir.Describer.event(unquote(type))
defoverridable create: 1, describe: 2
# JSON encoding goes through `Kvasir.Event.Encoding.encode/1`.
defimpl Jason.Encoder, for: __MODULE__ do
alias Jason.EncodeError
alias Jason.Encoder.Map
alias Kvasir.Event.Encoding
def encode(value, opts) do
case Encoding.encode(value) do
{:ok, data} -> Map.encode(data, opts)
# NOTE(review): this returns an `EncodeError` struct instead of raising or
# throwing it — confirm that is what Jason expects from `encode/2`.
{:error, error} -> %EncodeError{message: "Event Encoding Error: #{error}"}
end
end
end
# Custom inspect output: ⊰Module<topic:partition:offset>{fields}⊱.
defimpl Inspect, for: __MODULE__ do
import Inspect.Algebra
def inspect(data = %event{}, opts) do
fields = event.__event__(:fields)
# Field values without struct bookkeeping; non-nil sensitive values are
# wrapped in `Kvasir.Event.Sensitive` together with their options and type.
a =
data
|> Map.drop(~w(__meta__ __struct__)a)
|> Map.new(fn {k, v} ->
if v != nil and k in event.__event__(:sensitive) do
o = Enum.find_value(fields, [], fn {f, _, o} -> if(f == k, do: o) end)
{k, %Kvasir.Event.Sensitive{opts: o, value: v, type: event.__event__(:type, k)}}
else
{k, v}
end
end)
# Events without an offset in their metadata are shown as "UNPUBLISHED".
offset =
if o = data.__meta__.offset do
"#{data.__meta__.topic}:#{data.__meta__.partition}:#{o}"
else
"UNPUBLISHED"
end
# Events without fields omit the field map entirely.
if a == %{} do
concat([
{:doc_color, :doc_nil, [:reset]},
"⊰",
inspect(data.__struct__),
{:doc_color, :doc_nil, [:italic, :yellow]},
"<",
offset,
">",
{:doc_color, :doc_nil, :reset},
"⊱"
])
else
concat([
{:doc_color, :doc_nil, [:reset]},
"⊰",
inspect(data.__struct__),
{:doc_color, :doc_nil, [:italic, :yellow]},
"<",
offset,
">",
{:doc_color, :doc_nil, :reset},
remove(to_doc(a, opts)),
{:doc_color, :doc_nil, :reset},
"⊱"
])
end
end
# Descends through the second element of nested 3- and 4-tuples and rewrites
# the first "%{" found there to "{".
# NOTE(review): relies on the tuple shape of the document returned by
# `to_doc/2`; any other shape raises a FunctionClauseError — confirm.
defp remove({a, "%{", c}), do: {a, "{", c}
defp remove({a, b, c}), do: {a, remove(b), c}
defp remove({a, b, c, d}), do: {a, remove(b), c, d}
end
end
end
@doc ~S"""
Encode an event. Delegates to `Kvasir.Event.Encoding.encode/1`.
"""
defdelegate encode(event),
  to: Kvasir.Event.Encoding

@doc ~S"""
Encode an event for a topic. Delegates to `Kvasir.Event.Encoding.encode/3`.
"""
defdelegate encode(topic, event, opts \\ []),
  to: Kvasir.Event.Encoding

@doc ~S"""
Decode an event for a topic. Delegates to `Kvasir.Event.Encoding.decode/3`.
"""
defdelegate decode(topic, event, opts \\ []),
  to: Kvasir.Event.Encoding
@doc ~S"""
Describe an event, using its key as the subject.

The subject comes from the key type's `describe/1`; it is an empty string when
the event's metadata has no key type.
"""
def describe(%event_module{__meta__: %{key: key, key_type: key_type}} = event) do
  subject =
    if key_type do
      key_type.describe(key)
    else
      ""
    end

  event_module.describe(subject, event)
end
@doc ~S"""
Check whether the given value is an event.

A struct is checked through its module. A module counts as an event when it
exports `__event__/1`. Anything else is not an event.

## Examples

```elixir
iex> event?("not an event")
false
```
"""
@spec event?(any) :: boolean
def event?(candidate) do
  case candidate do
    %module{} -> event?(module)
    module when is_atom(module) -> function_exported?(module, :__event__, 1)
    _ -> false
  end
end
@doc ~S"""
Timestamp stored in the event's metadata.

Returns `nil` when the given value has no `__meta__.timestamp`.
"""
# The spec includes `nil` because the fallback clause returns it.
@spec timestamp(t) :: UTCDateTime.t() | nil
def timestamp(%{__meta__: %{timestamp: ts}}), do: ts
def timestamp(_), do: nil
@doc ~S"""
Identifier of the event; the same value as `key/1`.
"""
@spec id(t) :: term
def id(event) do
  key(event)
end
@doc ~S"""
Key stored in the event's metadata, or `nil` when there is none.
"""
@spec key(t) :: term
def key(event) do
  case event do
    %{__meta__: %{key: stored_key}} -> stored_key
    _ -> nil
  end
end
@doc ~S"""
Sub key stored in the event's metadata, or `nil` when there is none.
"""
@spec sub_key(t) :: term
def sub_key(event) do
  case event do
    %{__meta__: %{sub_key: stored_sub_key}} -> stored_sub_key
    _ -> nil
  end
end
@doc ~S"""
Partition stored in the event's metadata.

Only values carrying `__meta__.partition` are accepted.
"""
@spec partition(t) :: non_neg_integer
def partition(%{__meta__: %{partition: number}}) do
  number
end
@doc ~S"""
Return the event with `key` stored in its metadata.
"""
@spec set_key(t, term) :: t
def set_key(%{__meta__: meta} = event, key) do
  updated_meta = %{meta | key: key}
  %{event | __meta__: updated_meta}
end
@doc ~S"""
Return the event with `type` stored as the key type in its metadata.
"""
@spec set_key_type(t, module) :: t
def set_key_type(%{__meta__: meta} = event, type) do
  updated_meta = %{meta | key_type: type}
  %{event | __meta__: updated_meta}
end
@doc ~S"""
Return the event with `offset` stored in its metadata.
"""
@spec set_offset(t, term) :: t
def set_offset(%{__meta__: meta} = event, offset) do
  updated_meta = %{meta | offset: offset}
  %{event | __meta__: updated_meta}
end
@doc ~S"""
Return the event with `topic` stored in its metadata.
"""
@spec set_topic(t, term) :: t
def set_topic(%{__meta__: meta} = event, topic) do
  updated_meta = %{meta | topic: topic}
  %{event | __meta__: updated_meta}
end
@doc ~S"""
Return the event with `partition` stored in its metadata.
"""
@spec set_partition(t, term) :: t
def set_partition(%{__meta__: meta} = event, partition) do
  updated_meta = %{meta | partition: partition}
  %{event | __meta__: updated_meta}
end
@doc ~S"""
Return the event with `timestamp` stored in its metadata.
"""
@spec set_timestamp(t, UTCDateTime.t()) :: t
def set_timestamp(%{__meta__: meta} = event, timestamp) do
  updated_meta = %{meta | timestamp: timestamp}
  %{event | __meta__: updated_meta}
end
@doc ~S"""
Return the event with `source` stored in its metadata.
"""
@spec set_source(t, module) :: t
def set_source(%{__meta__: meta} = event, source) do
  updated_meta = %{meta | source: source}
  %{event | __meta__: updated_meta}
end
# @spec key(t) :: :string | :integer
# def key_type(%event{}), do: event.__event__(:key_type)
# def key_type(event) when is_atom(event), do: event.__event__(:key_type)
# def key_type(_), do: :string
@doc ~S"""
Error strategy for the given event.

A plain `Kvasir.Event` struct always yields `:halt`; any other struct reports
the `:on_error` value of its own module.
"""
@spec on_error(t) :: :halt | :skip
def on_error(%__MODULE__{}) do
  :halt
end

def on_error(%event_module{}) do
  event_module.__event__(:on_error)
end
@doc ~S"""
Type of the given event struct or event module.

Returns `nil` for values that are neither a struct nor an atom.
"""
@spec type(t) :: String.t() | nil
def type(event) do
  case event do
    %module{} -> module.__event__(:type)
    module when is_atom(module) -> module.__event__(:type)
    _ -> nil
  end
end
### Guard ###
# Elixir <= 1.12 is treated as legacy so that guard compilation keeps working:
# there `Version.Parser.lexer/2` is called with an extra `[]` argument, while
# newer releases use `Version.Parser.lexer/1`.
# Only the minor component of `System.version/0` is inspected.
[_major, minor | _patch] = String.split(System.version(), ".")
legacy? = String.to_integer(minor) <= 12

# Turns a version requirement string into a quoted guard expression.
if legacy? do
  defp guard(version), do: to_guard(Version.Parser.lexer(version, []), nil)
else
  defp guard(version), do: to_guard(Version.Parser.lexer(version), nil)
end
# Variable ASTs for the version components bound in the generated `upgrade/2`
# clause head; the `condition/2` clauses compare against these.
@major Macro.var(:major, nil)
@minor Macro.var(:minor, nil)
@patch Macro.var(:patch, nil)

# Folds the lexer tokens into one quoted guard expression.
#
# A pair of tokens is turned into a comparison by `condition/2`. A logical
# operator combines the guard built so far with the guard built from the
# remaining tokens. An empty token list returns the accumulator as is.
defp to_guard([], acc), do: acc

if legacy? do
  defp to_guard([:|| | rest], acc), do: guard_or(acc, to_guard(rest, nil))
  defp to_guard([:&& | rest], acc), do: guard_and(acc, to_guard(rest, nil))
  defp to_guard([a, b | rest], _acc), do: to_guard(rest, condition(a, b))
else
  # `and` must combine with `guard_and/2` and `or` with `guard_or/2`, matching
  # the legacy `:&&` / `:||` clauses. Swapping them turns a range such as
  # ">= 1.0.0 and < 2.0.0" into a guard that matches every version.
  defp to_guard([:and | rest], acc), do: guard_and(acc, to_guard(rest, nil))
  defp to_guard([:or | rest], acc), do: guard_or(acc, to_guard(rest, nil))

  # Here the second token of a pair is passed to `condition/2` as the operator.
  # NOTE(review): presumably because `lexer/1` yields tokens in reverse order — confirm.
  defp to_guard([a, b | rest], _acc), do: to_guard(rest, condition(b, a))
end
# `==`: every component present in the requirement must equal the matching
# version component; the checks are joined with `and`.
defp condition(:==, version) do
  version
  |> parse_parts()
  |> Enum.map(fn {part, value} ->
    quote(do: unquote(Macro.var(part, nil)) == unquote(value))
  end)
  |> Enum.reduce(nil, fn check, acc -> guard_and(acc, check) end)
end
# `~>`: at least the given version, and below the next major (two components)
# or next minor (three components). A single component has no upper bound.
defp condition(:~>, version) do
  upper_bound =
    case parse_version(version) do
      [_] -> nil
      [a, _] -> condition(:<, "#{a + 1}.0")
      [a, b, _] -> condition(:<, "#{a}.#{b + 1}.0")
    end

  # `guard_and/2` returns the lower bound unchanged when there is no upper bound.
  guard_and(condition(:>=, version), upper_bound)
end
# `>`: strictly greater than the given version, comparing as many components
# as the requirement provides (major, then minor, then patch).
defp condition(:>, version) do
  case parse_version(version) do
    [a] ->
      # The major variable must be unquoted here; an empty `unquote()` leaves a
      # malformed guard for single-component requirements.
      quote do: unquote(@major) > unquote(a)

    [a, b] ->
      quote do:
              unquote(@major) > unquote(a) or
                (unquote(@major) == unquote(a) and unquote(@minor) > unquote(b))

    [a, b, c] ->
      quote do:
              unquote(@major) > unquote(a) or
                (unquote(@major) == unquote(a) and unquote(@minor) > unquote(b)) or
                (unquote(@major) == unquote(a) and unquote(@minor) == unquote(b) and
                   unquote(@patch) > unquote(c))
  end
end
# `>=`: greater than or equal to the given version, comparing as many
# components as the requirement provides.
defp condition(:>=, version) do
  {major, minor, patch} = {@major, @minor, @patch}

  case parse_version(version) do
    [a] ->
      quote(do: unquote(major) >= unquote(a))

    [a, b] ->
      quote do
        unquote(major) > unquote(a) or
          (unquote(major) == unquote(a) and unquote(minor) >= unquote(b))
      end

    [a, b, c] ->
      quote do
        unquote(major) > unquote(a) or
          (unquote(major) == unquote(a) and unquote(minor) > unquote(b)) or
          (unquote(major) == unquote(a) and unquote(minor) == unquote(b) and
             unquote(patch) >= unquote(c))
      end
  end
end
# `<`: strictly less than the given version, comparing as many components as
# the requirement provides.
defp condition(:<, version) do
  {major, minor, patch} = {@major, @minor, @patch}

  case parse_version(version) do
    [a] ->
      quote(do: unquote(major) < unquote(a))

    [a, b] ->
      quote do
        unquote(major) < unquote(a) or
          (unquote(major) == unquote(a) and unquote(minor) < unquote(b))
      end

    [a, b, c] ->
      quote do
        unquote(major) < unquote(a) or
          (unquote(major) == unquote(a) and unquote(minor) < unquote(b)) or
          (unquote(major) == unquote(a) and unquote(minor) == unquote(b) and
             unquote(patch) < unquote(c))
      end
  end
end
# `<=`: less than or equal to the given version, comparing as many components
# as the requirement provides.
defp condition(:<=, version) do
  {major, minor, patch} = {@major, @minor, @patch}

  case parse_version(version) do
    [a] ->
      quote(do: unquote(major) <= unquote(a))

    [a, b] ->
      quote do
        unquote(major) < unquote(a) or
          (unquote(major) == unquote(a) and unquote(minor) <= unquote(b))
      end

    [a, b, c] ->
      quote do
        unquote(major) < unquote(a) or
          (unquote(major) == unquote(a) and unquote(minor) < unquote(b)) or
          (unquote(major) == unquote(a) and unquote(minor) == unquote(b) and
             unquote(patch) <= unquote(c))
      end
  end
end
# Joins two quoted guards with `and`. A `nil` side is dropped, so the other
# side is returned as is (and `nil` when both are `nil`).
defp guard_and(left, right) do
  cond do
    is_nil(right) -> left
    is_nil(left) -> right
    true -> quote(do: unquote(left) and unquote(right))
  end
end
# Joins two quoted guards with `or`. A `nil` side is dropped, so the other
# side is returned as is (and `nil` when both are `nil`).
defp guard_or(left, right) do
  cond do
    is_nil(right) -> left
    is_nil(left) -> right
    true -> quote(do: unquote(left) or unquote(right))
  end
end
# Normalises a version to a list of integer components.
#
# A 5-tuple yields its first three elements, with `nil`/`false` replaced by 0.
# A dot-separated string yields one integer per component, at most three.
defp parse_version({major, minor, patch, _, _}) do
  Enum.map([major, minor, patch], fn component -> component || 0 end)
end

defp parse_version(version) do
  case version |> String.split(".") |> Enum.take(3) do
    [] -> :error
    components -> Enum.map(components, &String.to_integer/1)
  end
end
# Labels the components returned by `parse_version/1` as a keyword list with
# `:major`, `:minor` and `:patch`, keeping only the components present.
defp parse_parts(version) do
  case parse_version(version) do
    [] -> :error
    components when is_list(components) -> Enum.zip([:major, :minor, :patch], components)
  end
end
end