defmodule Spear.Filter do
  @moduledoc """
  A server-side filter to apply when reading events from an EventStoreDB.

  ## Regular expressions

  Elixir's built-in `Regex` module and `Kernel.sigil_r/2` use PCRE-compatible
  regular expressions. However, judging from [the EventStoreDB
  codebase](https://github.com/EventStore/EventStore/commit/711a8622569cdee3b182a4bb0d2a32ab0c950a73#diff-f444f5f2feccef613b5a027880c6f810defd43d2f882b6b6a9c0612e50ca873aR3),
  filtering is done with C#'s built-in `System.Text.RegularExpressions`, at
  least at the time of writing. C# regular expressions diverge somewhat in
  syntax from PCRE-compatible expressions, so mileage may vary when passing
  Elixir regular expressions.

  As an escape hatch, you may build a `t:Spear.Filter.t/0` manually and use
  a `t:String.t/0` as the `:by` field. This value is left untouched except
  for last-minute encoding to go over the wire to the server.

  ```elixir
  %Spear.Filter{
    on: :stream_name,
    by: "<some-complicated-regex>",
    checkpoint_after: 1024
  }
  ```

  When possible, prefer a list of prefixes as the `:by` option. Prefixes are
  unambiguous, whereas regular expression syntax differs between Elixir and
  the server.

  ## Checkpoints

  The EventStoreDB will emit checkpoint events to a subscriber regularly.
  This prevents a possible (and perhaps probable) scenario where

  - the EventStoreDB contains many events
  - the client is searching for a small number of events relative to the size
    of `:all`
  - (and/or) the target events are sparsely spread throughout `:all`

  Under these conditions, the EventStoreDB may progress quite far through
  `:all` before finding an event. Suppose the connection between the client
  and server is severed while the EventStoreDB is part-way through a large
  drought of targeted events. When the client re-connects and passes an old
  `:from` option to `Spear.subscribe/4`, the server will need to re-seek
  through the whole drought.

  These checkpoints will arrive in the mailbox of the subscriber process as
  `t:Spear.Filter.Checkpoint.t/0` structs. They may be used as a restore point
  by passing them to the `:from` option of `Spear.subscribe/4`.

  For example, say we have a subscriber process which is a GenServer and some
  function `save_checkpoint/1` which saves checkpoint information durably
  (to disk or a database, for example). The subscriber handles subscription
  events in its `c:GenServer.handle_info/2` callback.

  ```elixir
  defmodule MySubscriber do
    use GenServer

    alias Spear.{Event, Filter.Checkpoint}

    # ..

    @impl GenServer
    def handle_info(%Event{} = event, state) do
      # .. handle the event

      event |> Event.to_checkpoint() |> save_checkpoint()

      {:noreply, state}
    end

    def handle_info(%Checkpoint{} = checkpoint, state) do
      save_checkpoint(checkpoint)

      {:noreply, state}
    end
  end
  ```

  ## Checkpoint Interval

  The EventStoreDB will send a checkpoint after filtering a configurable number
  of events. The `:checkpoint_after` field configures this behavior. The
  server only accepts an interval which is a multiple of `32`. When sending
  the filter, Spear rounds other values down to a multiple of `32`; values
  below `32` are raised to `32`.

  The default in Spear is `1024` (`32` * `32`). This is tunable per filter
  with `checkpoint_after/2`, or by manually adjusting the `:checkpoint_after`
  field in this struct.
  """

  require Spear.Records.Streams, as: Streams
  require Spear.Records.Persistent, as: Persistent
  require Spear.Records.Shared, as: Shared

  # Checkpoint intervals are sent to the server as a multiplier of this many
  # events (see `checkpoint_interval_multiplier/1`).
  @checkpoint_multiplier 32
  @default_checkpoint_after 32 * @checkpoint_multiplier

  @typedoc """
  A filter which can be applied to a subscription to trigger server-side
  result filtering.

  This filter type is intended to be passed as the `:filter` option to
  `Spear.subscribe/4`.

  ## Examples

      iex> import Spear.Filter
      iex> ~f/^[^\\$].*/rs # exclude system events which start with "$"
      %Spear.Filter{by: ~r/^[^\\$].*/, checkpoint_after: 1024, on: :stream_name}
  """
  @type t :: %__MODULE__{
          on: :event_type | :stream_name,
          by: Regex.t() | String.t() | [String.t()],
          checkpoint_after: pos_integer()
        }

  defstruct [:on, :by, checkpoint_after: @default_checkpoint_after]

  @doc """
  A sigil defining short-hand notation for writing filters.

  A filter works _on_ either the EventStoreDB stream name or the event type,
  and filters _by_ either a regular expression or a list of prefix strings.

  ## Modifiers

  This `f` sigil supports the following modifiers. (Modifiers are the suffix
  of a sigil. For example, the `i` in `~r/hello/i` is a modifier for the regex
  sigil that declares that the match is case-insensitive.)

  For the choice between stream-name and event-type filtering:

    * `s` - filter on the stream name (the default when neither `s` nor `t`
      is given)
    * `t` - filter on the event type

  For the choice between prefixes and regular expressions:

    * `p` - filter by a list of prefixes. If this option is passed, the sigil
      body is interpreted as a whitespace-separated list of prefixes, similar
      to `sigil_w/2` from the standard library
    * `r` - filter using a regular expression (the default when neither `p`
      nor `r` is given)

  Passing an unknown modifier, or combining `s` with `t` or `p` with `r`,
  raises an `ArgumentError`. An invalid regular expression raises a
  `Regex.CompileError`.

  ## Examples

      iex> import Spear.Filter
      iex> ~f/My.Aggregate.A- My.Aggregate.B-/ps
      %Spear.Filter{
        by: ["My.Aggregate.A-", "My.Aggregate.B-"],
        checkpoint_after: 1024,
        on: :stream_name
      }
      iex> ~f/^[^\\$].*/rs
      %Spear.Filter{by: ~r/^[^\\$].*/, checkpoint_after: 1024, on: :stream_name}
  """
  @doc since: "0.1.0"
  @spec sigil_f(binary(), charlist()) :: t()
  def sigil_f(source, mods), do: to_filter(source, mods, ?f)

  @doc """
  A sigil defining a filter, without escaping.

  Works the same as `sigil_f/2` but does not allow interpolation or escape
  sequences.

  ## Examples

      iex> import Spear.Filter
      iex> ~F/^[^\\$].*/rs
      %Spear.Filter{by: ~r/^[^\\$].*/, checkpoint_after: 1024, on: :stream_name}
  """
  @doc since: "0.1.0"
  @spec sigil_F(binary(), charlist()) :: t()
  def sigil_F(source, mods), do: to_filter(source, mods, ?F)

  # Shared implementation of `sigil_f/2` and `sigil_F/2`. `sigil_letter` is
  # only used to render the offending sigil in error messages.
  defp to_filter(source, mods, sigil_letter) when is_binary(source) and is_list(mods) do
    :ok = check_modifiers!(source, mods, sigil_letter)

    # `check_modifiers!/3` has already rejected the conflicting pairs, so a
    # membership test is enough. Stream-name and regex filtering are the
    # defaults.
    on = if ?t in mods, do: :event_type, else: :stream_name
    by = if ?p in mods, do: String.split(source), else: Regex.compile!(source)

    %__MODULE__{on: on, by: by}
  end

  # Raises `ArgumentError` if `mods` contains a modifier other than `s`, `t`,
  # `p` or `r`, or combines a mutually exclusive pair. Returns `:ok` otherwise.
  defp check_modifiers!(source, mods, sigil_letter) do
    sigil = "~#{[sigil_letter]}/#{source}/#{mods}"

    foreign_modifiers =
      mods
      |> Enum.reject(&(&1 in [?s, ?t, ?p, ?r]))
      |> Enum.uniq()

    cond do
      foreign_modifiers != [] ->
        raise ArgumentError, """
        Unknown modifier(s) #{inspect(foreign_modifiers)} in #{sigil}
        """

      Enum.member?(mods, ?s) and Enum.member?(mods, ?t) ->
        raise ArgumentError, """
        Modifiers `s` and `t` are mutually exclusive in sigil #{sigil}
        """

      Enum.member?(mods, ?p) and Enum.member?(mods, ?r) ->
        raise ArgumentError, """
        Modifiers `p` and `r` are mutually exclusive in sigil #{sigil}
        """

      true ->
        :ok
    end
  end

  @doc """
  Sets the checkpoint interval.

  `interval` is the number of filtered events after which the server should
  send a checkpoint. When the filter is sent to the server, values which are
  not a multiple of `32` are rounded down to one, and values below `32` are
  raised to `32`.

  ## Examples

      iex> import Spear.Filter
      iex> checkpoint_after(~f/^[^\\$].*/rs, 32 * 8)
      %Spear.Filter{by: ~r/^[^\\$].*/, checkpoint_after: 256, on: :stream_name}
  """
  @doc since: "0.1.0"
  @spec checkpoint_after(t(), pos_integer()) :: t()
  def checkpoint_after(%__MODULE__{} = filter, interval)
      when is_integer(interval) and interval > 0 do
    %__MODULE__{filter | checkpoint_after: interval}
  end

  @doc """
  Produces a filter which excludes system events.

  System events are those in streams whose names start with `$`. This is a
  potentially common filter for subscribers reading from `:all`. The
  `sigil_f/2` version is

  ```elixir
  ~f/^[^\\$].*/rs
  ```

  ## Examples

      iex> Spear.Filter.exclude_system_events()
      %Spear.Filter{by: ~r/^[^\\$].*/, checkpoint_after: 1024, on: :stream_name}
  """
  @doc since: "0.1.0"
  @spec exclude_system_events() :: t()
  def exclude_system_events, do: ~f/^[^\$].*/rs

  @doc false
  def _to_filter_options(%__MODULE__{} = filter) do
    Streams.read_req_options_filter_options(
      checkpointIntervalMultiplier: checkpoint_interval_multiplier(filter),
      filter: map_inner_filter(filter),
      # YARD exactly how does one use the `:max` option here?
      window: {:count, Shared.empty()}
    )
  end

  # Converts `:checkpoint_after` (a number of events) into the multiplier of
  # `@checkpoint_multiplier` that goes over the wire. `div/2` rounds down. The
  # result is clamped to at least 1 so that values below 32 don't produce a
  # multiplier of 0.
  defp checkpoint_interval_multiplier(%__MODULE__{checkpoint_after: checkpoint_after}) do
    max(div(checkpoint_after, @checkpoint_multiplier), 1)
  end

  # Builds the `{filter_type, expression}` tuple for the protobuf `filter`
  # field of the read request.
  defp map_inner_filter(%__MODULE__{} = filter) do
    {map_filter_type(filter.on), map_filter_expression(filter.by)}
  end

  defp map_filter_type(:event_type), do: :event_type
  defp map_filter_type(:stream_name), do: :stream_identifier

  # Regexes are sent by their source string; raw strings are passed through
  # as regexes; lists are sent as prefixes.
  defp map_filter_expression(%Regex{} = regex) do
    regex |> Regex.source() |> map_filter_expression()
  end

  defp map_filter_expression(regex) when is_binary(regex) do
    Streams.read_req_options_filter_options_expression(regex: regex)
  end

  defp map_filter_expression(prefixes) when is_list(prefixes) do
    Streams.read_req_options_filter_options_expression(prefix: prefixes)
  end

  @doc false
  # coveralls-ignore-start
  def _to_persistent_filter_options(%__MODULE__{} = filter) do
    Persistent.create_req_all_options_filter_options(
      checkpointIntervalMultiplier: checkpoint_interval_multiplier(filter),
      filter: map_persistent_inner_filter(filter),
      # YARD exactly how does one use the `:max` option here?
      window: {:count, Shared.empty()}
    )
  end

  # Persistent-subscription counterpart of `map_inner_filter/1`.
  defp map_persistent_inner_filter(%__MODULE__{} = filter) do
    {map_filter_type(filter.on), map_persistent_filter_expression(filter.by)}
  end

  defp map_persistent_filter_expression(%Regex{} = regex) do
    regex |> Regex.source() |> map_persistent_filter_expression()
  end

  defp map_persistent_filter_expression(regex) when is_binary(regex) do
    Persistent.create_req_all_options_filter_options_expression(regex: regex)
  end

  defp map_persistent_filter_expression(prefixes) when is_list(prefixes) do
    Persistent.create_req_all_options_filter_options_expression(prefix: prefixes)
  end

  # coveralls-ignore-stop
end
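# A custom `Inspect` implementation which renders filters as `~f` sigils. It is
# left disabled because IMHO it is unnecessary and really just makes filters
# harder to read.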
#
# defimpl Inspect, for: Spear.Filter do
# def inspect(filter, opts) do
# {escaped, _} =
# filter
# |> body()
# |> Code.Identifier.escape(?/)
#
# ["~f/", escaped, ?/, opts(filter)]
# |> IO.iodata_to_binary()
# |> Inspect.Algebra.color(:regex, opts)
# end
#
# defp body(%Spear.Filter{by: prefixes}) when is_list(prefixes), do: Enum.join(prefixes, " ")
# defp body(%Spear.Filter{by: %Regex{} = regex}), do: Regex.source(regex)
# defp body(%Spear.Filter{by: other}), do: to_string(other)
#
# defp opts(%Spear.Filter{by: by, on: on}) do
# [by_opt(by), on_opt(on)]
# end
#
# defp on_opt(:event_type), do: ?t
# defp on_opt(:stream_name), do: ?s
# defp on_opt(_), do: []
#
# defp by_opt(prefixes) when is_list(prefixes), do: ?p
# defp by_opt(%Regex{}), do: ?r
# defp by_opt(regex) when is_binary(regex), do: ?r
# defp by_opt(_), do: []
# end