defmodule Spear do
@moduledoc """
A sharp EventStoreDB 20+ client backed by mint
## Streams
Spear uses the term _stream_ across different contexts. There are four
possible contexts for the term _stream_ in Spear:
- HTTP2 streams of data
- gRPC stream requests, responses, and bidirectional communication
- EventStoreDB streams of events
- Elixir `Stream`s
Descriptions of each are given in the [Streams guide](guides/streams.md).
## Connections
Spear needs a connection to interact with an EventStoreDB. Spear provides
the `Spear.Connection` GenServer for this purpose. Connections are referred
to as "`conn`" in the documentation.
Like an `Ecto.Repo`, it can be handy to have a module which itself represents
a connection to an EventStoreDB. For this, Spear provides `Spear.Client`
which allows one to call any function in `Spear` without the `conn` argument
on the client module.
```elixir
defmodule MyApp.MyClient do
use Spear.Client,
otp_app: :my_app
end
iex> MyApp.MyClient.start_link(connection_string: "esdb://localhost:2113")
iex> MyApp.MyClient.stream!("my_stream") |> Enum.to_list()
[
%Spear.Event{},
%Spear.Event{},
..
]
```
See the `Spear.Client` module for more information.
## Record interfaces
The `Spear.Records.*` modules provide macro interfaces for matching and
creating messages sent and received from the EventStoreDB. These are mostly
intended for internal uses such as the mapping between a
`Spear.Records.Streams.read_resp/0` and a `t:Spear.Event.t/0`, but they can
also be used to extract values from any raw response records (e.g. those
returned from functions where the `raw?: true` option is passed).
iex> import Spear.Records.Streams, only: [read_resp: 0, read_resp: 1]
iex> event = Spear.stream!(conn, "my_stream", raw?: true) |> Enum.take(1) |> List.first()
{:"event_store.client.streams.ReadResp", {:checkpoint, ..}}
iex> match?(read_resp(), event)
true
iex> match?(read_resp(content: {:checkpoint, _}), event)
true
Macros in these modules are generated with `Record.defrecord/2` with the
contents extracted from the protobuf messages (indirectly via `:gpb`).
"""
import Spear.Records.Shared, only: [empty: 0]
require Spear.Records.Streams, as: Streams
require Spear.Records.Users, as: Users
require Spear.Records.Operations, as: Operations
require Spear.Records.Gossip, as: Gossip
require Spear.Records.Persistent, as: Persistent
require Spear.Records.Monitoring, as: Monitoring
require Spear.Records.ServerFeatures, as: ServerFeatures
require Spear.Records.Shared, as: Shared
@doc """
Collects an EventStoreDB stream into an enumerable
This function may raise in cases where the gRPC requests fail to read events
from the EventStoreDB (in cases of timeout or unavailability).
This function does not raise if a stream does not exist (is empty), instead
returning an empty enumerable `[]`.
`connection` may be any valid GenServer name (including PIDs) for a process
running the `Spear.Connection` GenServer.
`stream_name` can be any stream, existing or not, including projected
streams such as category streams or event-type streams. The `:all` atom
may be passed as `stream_name` to read all events in the EventStoreDB.
## Options
* `:from` - (default: `:start`) the EventStoreDB stream revision from which to
read. Valid values include `:start`, `:end`, any non-negative integer
representing the event number revision in the stream and events. Event
numbers are inclusive (e.g. reading from `0` will first return the
event with revision `0` in the stream, if one exists). `:start` and `:end`
are treated as inclusive (e.g. `:start` will return the first event in
the stream). Events (either `Spear.Event` or ReadResp records) can also
be supplied and will be treated as inclusive.
* `:direction` - (default: `:forwards`) the direction in which to read the
EventStoreDB stream. Valid values include `:forwards` and `:backwards`.
Reading the EventStoreDB stream forwards will return events in the order
in which they were written to the EventStoreDB; reading backwards will
return events in the opposite order.
* `:filter` - (default: `nil`) the server-side filter to apply. This option
is only valid if the `stream_name` is `:all`. See `Spear.Filter` for more
information. This feature requires EventStoreDB vTODO+.
* `:resolve_links?` - (default: `true`) whether or not to request that
link references be resolved. See the moduledocs for more information
about link resolution.
* `:chunk_size` - (default: `128`) the number of events to read from the
EventStoreDB at a time. Any positive integer is valid. See the enumeration
characteristics section below for more information about how `:chunk_size`
works and how to tune it.
* `:timeout` - (default: `5_000` - 5s) the time allowed for the read of a
single chunk of events in the EventStoreDB stream. This time is _not_
cumulative: an EventStoreDB stream 100 events long which takes 5s to read
each chunk may be read in chunks of 20 events culumaltively in 25s. A
timeout of `5_001`ms would not raise a timeout error in that scenario
(assuming the chunk read consistently takes `<= 5_000` ms).
* `:raw?:` - (default: `false`) controls whether or not the enumerable
`event_stream` is decoded to `Spear.Event` structs from their raw
`Spear.Records.Streams.read_resp/0` output. Setting `raw?: true` prevents
this transformation and leaves each event as a `ReadResp` record. See
`Spear.Event.from_read_response/2` for more information.
* `:credentials` - (default: `nil`) a two-tuple `{username, password}` to
use as credentials for the request. This option overrides any credentials
set in the connection configuration, if present. See the
[Security guide](guides/security.md) for more details.
## Enumeration Characteristics
The `event_stream` `t:Enumerable.t/0` returned by this function initially
contains a buffer of bytes from the first read of the stream `stream_name`.
This buffer potentially contains up to `:chunk_size` messages when run.
The enumerable is written as a formula which abstracts away the chunking
nature of the gRPC requests, however, so even though the EventStoreDB stream is
read in chunks (per the `:chunk_size` option), the entire EventStoreDB stream
can be read by running the enumeration (e.g. with `Enum.to_list/1`). Note
that the stream will make a gRPC request to read more events whenever the
buffer runs dry with up to `:chunk_size` messages filling the buffer on each
request.
`:chunk_size` is difficult to tune as it causes a tradeoff between (gRPC)
request duration and number of messages added to the buffer. A higher
`:chunk_size` may hydrate the buffer with more events and reduce the number
of gRPC requests needed to read an entire stream, but it also increases
the number of messages that will be sent over the network per request which
could decrease reliability. Generally speaking, a lower `:chunk_size` is
appropriate for streams in which the events are large and a higher
`:chunk_size` is appropriate for streams with many small events. Manual
tuning and trial-and-error can be used to find a performant `:chunk_size`
setting for any individual environment.
## Examples
iex> Spear.stream!(MyConnection, "es_supported_clients", chunk_size: 1) |> Enum.take(1)
[
%Spear.Event{
body: %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"},
id: "1fc908c1-af32-4d06-a9bd-3bf86a833fdf",
metadata: %{..},
type: "grpc-client"
}
]
# say we have 5 events in the "es_supported_clients" stream
iex> Spear.stream!(MyConnection, "es_supported_clients", chunk_size: 3) |> Enum.count()
5
"""
@doc since: "0.1.0"
@doc api: :streams
@spec stream!(
connection :: Spear.Connection.t(),
stream_name :: String.t() | :all,
opts :: Keyword.t()
) ::
event_stream :: Enumerable.t()
def stream!(connection, stream_name, opts \\ []) do
default_stream_opts = [
from: :start,
direction: :forwards,
chunk_size: 128,
filter: nil,
resolve_links?: true,
through: fn stream -> Stream.map(stream, &Spear.Event.from_read_response/1) end,
timeout: 5_000,
raw?: false,
credentials: nil
]
opts =
default_stream_opts
|> Keyword.merge(opts)
|> Keyword.put(:connection, connection)
|> Keyword.put(:stream, stream_name)
through =
if opts[:raw?] do
& &1
else
opts[:through]
end
Spear.Reading.Stream.new!(opts) |> through.()
end
@doc """
Reads a chunk of events from an EventStoreDB stream into an enumerable
Unlike `stream!/3`, this function will only read one chunk of events at a time
specified by the `:max_count` option. This function also does not raise in
cases of error, instead returning an ok- or error-tuple.
If the `stream_name` EventStoreDB stream does not exist (is empty) and the
gRPC request succeeds for this function, `{:ok, []}` will be returned.
## Options
* `:from` - (default: `:start`) the EventStoreDB stream revision from which to
read. Valid values include `:start`, `:end`, any non-negative integer
representing the event number revision in the stream and events. Event
numbers are inclusive (e.g. reading from `0` will first return the
event with revision `0` in the stream, if one exists). `:start` and `:end`
are treated as inclusive (e.g. `:start` will return the first event in
the stream). Events (either `Spear.Event` or ReadResp records) can also
be supplied and will be treated as inclusive.
* `:direction` - (default: `:forwards`) the direction in which to read the
EventStoreDB stream. Valid values include `:forwards` and `:backwards`.
Reading the EventStoreDB stream forwards will return events in the order
in which they were written to the EventStoreDB; reading backwards will
return events in the opposite order.
* `:filter` - (default: `nil`) the server-side filter to apply. This option
is only valid if the `stream_name` is `:all`. See `Spear.Filter` for more
information. This feature requires EventStoreDB vTODO+.
* `:resolve_links?` - (default: `true`) whether or not to request that
link references be resolved. See the moduledocs for more information
about link resolution.
* `:max_count` - (default: `42`) the maximum number of events to read from
the EventStoreDB stream. Any positive integer is valid. Even if the stream
is longer than this `:max_count` option, only `:max_count` events will
be returned from this function. `:infinity` is _not_ a valid value for
`:max_count`. Use `stream!/3` for an enumerable which reads an EventStoreDB
stream in its entirety in chunked network requests.
* `:timeout` - (default: `5_000` - 5s) the time allowed for the read of the
single chunk of events in the EventStoreDB stream. Note that the gRPC request
which reads events from the EventStoreDB is front-loaded in this function:
the `:timeout` covers the time it takes to read the events. The timeout
may be exceeded
* `:raw?:` - (default: `false`) controls whether or not the enumerable
`event_stream` is decoded to `Spear.Event` structs from their raw
`ReadResp` output. Setting `raw?: true` prevents this transformation and
leaves each event as a `Spear.Records.Streams.read_resp/0` record. See
`Spear.Event.from_read_response/2` for more information.
* `:credentials` - (default: `nil`) a two-tuple `{username, password}` to
use as credentials for the request. This option overrides any credentials
set in the connection configuration, if present. See the
[Security guide](guides/security.md) for more details.
## Timing and Timeouts
The gRPC request which reads events from the EventStoreDB is front-loaded
in this function: this function returns immediately after receiving all data
off the wire from the network request. This means that the `:timeout` option
covers the gRPC request and response time but not any time spend decoding
the response (see the Enumeration Characteristics section below for more
details on how the enumerable decodes messages).
The default timeout of 5s may not be enough time in cases of either reading
very large numbers of events or reading events with very large bodies.
Note that _up to_ the `:max_count` of events is returned from this call
depending on however many events are in the EventStoreDB stream being read.
When tuning the `:timeout` option, make sure to test against a stream which
is at least as long as `:max_count` events.
## Enumeration Characteristics
The `event_stream` `t:Enumerable.t/0` returned in the success case of this
function is a wrapper around the bytes received from the gRPC response. Note
that when the `{:ok, event_stream}` is returned, the gRPC request has already
concluded.
This offers only marginal performance improvement: an enumerable is returned
mostly for consistency in the Spear API.
## Examples
# say we have 5 events in the stream "es_supported_clients"
iex> {:ok, events} = Spear.read_stream(conn, "es_supported_clients", max_count: 2)
iex> events |> Enum.count()
2
iex> {:ok, events} = Spear.read_stream(conn, "es_supported_clients", max_count: 10)
iex> events |> Enum.count()
5
iex> events |> Enum.take(1)
[
%Spear.Event{
body: %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"},
id: "1fc908c1-af32-4d06-a9bd-3bf86a833fdf",
metadata: %{..},
type: "grpc-client"
}
]
"""
@doc since: "0.1.0"
@doc api: :streams
@spec read_stream(Spear.Connection.t(), String.t() | :all, Keyword.t()) ::
{:ok, event_stream :: Enumerable.t()} | {:error, any()}
def read_stream(connection, stream_name, opts \\ []) do
default_read_opts = [
from: :start,
direction: :forwards,
max_count: 42,
filter: nil,
resolve_links?: true,
through: fn stream ->
stream
|> Stream.filter(&Spear.Event.event?/1)
|> Stream.map(&Spear.Event.from_read_response/1)
end,
timeout: 5_000,
raw?: false,
credentials: nil
]
opts =
default_read_opts
|> Keyword.merge(opts)
|> Keyword.put(:connection, connection)
|> Keyword.put(:stream, stream_name)
through =
if opts[:raw?] do
& &1
else
opts[:through]
end
with {:ok, stream} <- Spear.Reading.Stream.read_chunk(opts) do
{:ok, through.(stream)}
end
end
@doc """
Appends an enumeration of events to an EventStoreDB stream
`event_stream` is an enumerable which may either be a
collection of `t:Spear.Event.t/0` structs or more low-level
`Spear.Records.Streams.append_resp/0` records. In cases where the enumerable
produces `t:Spear.Event.t/0` structs, they will be lazily mapped to
`Spear.Records.Streams.append_req/0` records before being encoded to wire
data.
See the [Writing Events](guides/writing_events.md) guide for more information
about writing events.
## Options
* `:expect` - (default: `:any`) the expectation to set on the
status of the stream. The write will fail if the expectation fails. See
`Spear.ExpectationViolation` for more information about expectations.
* `:timeout` - (default: `5_000` - 5s) the GenServer timeout for calling
the RPC.
* `:raw?` - (default: `false`) a boolean which controls whether the return
signature should be a simple `:ok | {:error, any()}` or
`{:ok, AppendResp.t()} | {:error, any()}`. This can be used to extract
metadata and information from the append response which is not available
through the simplified return API, such as the stream's revision number
after writing the events.
* `:credentials` - (default: `nil`) a two-tuple `{username, password}` to
use as credentials for the request. This option overrides any credentials
set in the connection configuration, if present. See the
[Security guide](guides/security.md) for more details.
## Examples
iex> [Spear.Event.new("es_supported_clients", %{})]
...> |> Spear.append(conn, expect: :exists)
:ok
iex> [Spear.Event.new("es_supported_clients", %{})]
...> |> Spear.append(conn, expect: :empty)
{:error, %Spear.ExpectationViolation{current: 1, expected: :empty}}
"""
@doc since: "0.1.0"
@doc api: :streams
@spec append(
event_stream :: Enumerable.t(),
connection :: Spear.Connection.t(),
stream_name :: String.t(),
opts :: Keyword.t()
) :: :ok | {:error, reason :: Spear.ExpectationViolation.t() | any()}
def append(event_stream, conn, stream_name, opts \\ []) when is_binary(stream_name) do
default_write_opts = [
expect: :any,
raw?: false,
stream: stream_name
]
opts = default_write_opts |> Keyword.merge(opts)
params = Enum.into(opts, %{})
messages =
[Spear.Writing.build_append_request(params)]
|> Stream.concat(event_stream)
|> Stream.map(&Spear.Writing.to_append_request/1)
case request(
conn,
Streams,
:Append,
messages,
Keyword.take(opts, [:credentials, :timeout])
) do
{:ok, Streams.append_resp(result: {:success, _})} ->
:ok
{:ok, Streams.append_resp(result: {:wrong_expected_version, expectation_violation})} ->
{:error, Spear.Writing.map_expectation_violation(expectation_violation)}
error ->
error
end
end
@doc """
Appends an enumeration of events to an EventStoreDB stream
BatchAppend is a feature added in EventStoreDB version 21.6.0 which aims
to optimize append throughput. It works like a persistent subscription
in reverse: the client sends chunks of events to write and once the
events have been committed, the client receives an acknowledgement
message.
BatchAppends have a life cycle: a new batch append request is created
by passing the `:new` atom as the `request_id` argument and a request
is concluded by calling `Spear.cancel_subscription/2` on the `request_id`.
See the [Writing Events](guides/writing_events.md) guide for more information
about batching and when to prefer the `append_batch/5` function over
`append/4`.
## Fragmentation
By default, each invocation of `append_batch/5` sends a single protobuf
message across the wire for all events in the `event_stream` argument.
If the `event_stream` argument is very large (> ~1MB), this may be undesirable
from a networking perspective.
The `:done?` flag can be set to `false` to mark a batch as a fragment,
which prevents the EventStoreDB from attempting to commit any events
sent for the batch until a fragment with the `:done?` flag set
to `true` is sent. Each batch of events after the first fragment must
pass the initial fragment's `batch_id` in the `:batch_id` option. A
set of fragments can be concluded by sending a final `append_batch/5`
with the `:done?` option set to `true`.
## Options
* `:done?` - (default: `true`) controls whether the current chunk of events
being written is complete. See the "Fragmentation" section above for
more details.
* `:batch_id` - (default: `Spear.Uuid.uuid_v4()`) the unique ID of the
batch of events being appended. This must be passed the `batch_id`
returned by the first request which sets `:done?` to `false` until
a final fragment is appended (`done?: true`). See the "Fragmentation"
section above for more details.
* `:expect` - (default: `:any`) the expectation to set on the
status of the stream. The write will fail if the expectation fails. See
`Spear.ExpectationViolation` for more information about expectations.
* `:send_ack_to` - (default: `self()`) a process or process name which
should receive acknowledgement messages detailing whether a batch has
succeeded or failed to be committed by the deadline.
* `:raw?` - (default: `false`) a boolean which controls whether messages
emitted to the `:send_ack_to` process are decoded from
`Spear.Records.Streams.batch_append_resp/0` records. Spear preserves
most of the information when decoding the record into the
`t:Spear.BatchAppendResult.t/0` struct, but it discards duplicated
information such as stream name and expected revision. Setting
the `:raw?` flag allows one to decode the record however they wish.
* `:credentials` - (default: `nil`) a two-tuple `{username, password}` to
use as credentials for the request. This option overrides any credentials
set in the connection configuration, if present. See the
[Security guide](guides/security.md) for more details.
* `:deadline` - (default: `nil`) the deadline for the batch to be appended.
This is like `:timeout` but is interpreted on the EventStoreDB server.
This may be a `DateTime` or a tuple of `{seconds, nanos}` since 1970
(smeared) but may be a tuple `{:duration, seconds, nanos}` when using
EventStoreDB version 21.10.5 or higher.
* `:timeout` - (default: `5_000`) the timeout for the initial call to
open the batch request.
## Examples
iex> {:ok, first_batch_id, request_id} =
...> Spear.append_batch(first_batch, conn, :new, first_stream_name)
{:ok, "496ba076-098f-4108-a2ca-c73f7b94c06f", #Reference<0.1691282361.2492989441.105913>}
iex> receive do
...> %Spear.BatchAppendResult{request_id: ^request_id, batch_id: ^first_batch_id, result: result} ->
...> result
...> end
:ok
iex> {:ok, second_batch_id, ^request_id} =
...> Spear.append_batch(second_batch, conn, batch_id, second_stream_name)
{:ok, "3a1ed972-716c-4679-9cc7-c23c3544e538", #Reference<0.1691282361.2492989441.105913>}
iex> receive(do: (%Spear.BatchAppendResult{request_id: ^request_id, batch_id: ^second_batch_id, result: result}) -> result))
:ok
iex> {:ok, third_batch_id, ^request_id} =
...> Spear.append_batch(third_batch, conn, batch_id, third_stream_name)
{:ok, "b4eb1330-8c59-48d0-8a0b-2df33672cc0b", #Reference<0.1691282361.2492989441.105913>}
iex> receive(do: (%Spear.BatchAppendResult{request_id: ^request_id, batch_id: ^third_batch_id, result: result}) -> result))
:ok
iex> Spear.cancel_subscription(conn, request_id)
:ok
"""
@doc since: "0.10.0"
@doc api: :streams
@spec append_batch(
event_stream :: Enumerable.t(),
connection :: Spear.Connection.t(),
request_id :: reference() | :new,
stream_name :: String.t(),
opts :: Keyword.t()
) ::
{:ok, batch_id :: String.t(), request_id :: reference()}
| {:error, term()}
def append_batch(event_stream, conn, request_id, stream_name, opts \\ [])
when is_binary(stream_name) and (is_reference(request_id) or request_id == :new) do
declared_batch_id = Keyword.get(opts, :batch_id)
default_write_opts = [
explicit_batch_id?: declared_batch_id != nil,
expect: :any,
deadline: nil,
done?: true,
raw?: false,
batch_id: declared_batch_id || Spear.Event.uuid_v4(),
timeout: 5_000
]
opts =
default_write_opts
|> Keyword.merge(opts)
|> Keyword.merge(events: event_stream, stream_name: stream_name)
message =
opts
|> Map.new()
|> Spear.Writing.build_batch_append_request()
call_or_cast_batch(conn, message, request_id, opts)
end
# the first batch is a GenServer.call/3 so we can get back the request_id
defp call_or_cast_batch(conn, message, :new, opts) do
subscriber = Keyword.get(opts, :send_ack_to, self())
through = &Spear.BatchAppendResult.from_record(&1, &2, opts[:raw?])
request =
%Spear.Request{
api: {Streams, :BatchAppend},
messages: [message],
credentials: opts[:credentials]
}
|> Spear.Request.expand()
call = {{:subscription, subscriber, through}, request}
case Connection.call(conn, call, opts[:timeout]) do
{:ok, subscription} when is_reference(subscription) ->
{:ok, opts[:batch_id], subscription}
# coveralls-ignore-start
error ->
error
# coveralls-ignore-stop
end
end
# subsequent batches are casts: totally async
defp call_or_cast_batch(conn, message, request_id, opts) do
:ok = Connection.cast(conn, {:push, request_id, message})
{:ok, opts[:batch_id], request_id}
end
@doc """
A convenience wrapper around `append_batch/5` for transforming a stream
into a batch append operation.
The `append_batch/5` function provides fine-grained control over the
batch append feature. This function transforms an input stream of batches
to apply the `append_batch/5` on all of them, making sure to clean up
the request once the batch is finished.
The expected `batch_stream` is an enumerable with each element follows the
format
```elixir
{stream_name :: String.t(), events :: [Spear.Event.t()]}
# or
{stream_name :: String.t(), events :: [Spear.Event.t()], opts :: Keyword.t()}
```
Where `opts` can be any option below. The options below are applied to
each call to `append_batch/5` when provided, except `:credentials`
which is only applied when specified on the first batch.
The resulting stream must be run (with an `Enum` function or `Stream.run/1`).
Each element is mapped to the acknowledgement `t:Spear.BatchAppendResult.t/0`
responses for each batch attempting to be appended.
## Options
* `:expect` - (default: `:any`) the expectation to set on the
status of the stream. The write will fail if the expectation fails. See
`Spear.ExpectationViolation` for more information about expectations.
* `:raw?` - (default: `false`) a boolean which controls whether messages
emitted to the `:send_ack_to` process are decoded from
`Spear.Records.Streams.batch_append_resp/0` records. Spear preserves
most of the information when decoding the record into the
`t:Spear.BatchAppendResult.t/0` struct, but it discards duplicated
information such as stream name and expected revision. Setting
the `:raw?` flag allows one to decode the record however they wish.
* `:credentials` - (default: `nil`) a two-tuple `{username, password}` to
use as credentials for the request. This option overrides any credentials
set in the connection configuration, if present. See the
[Security guide](guides/security.md) for more details.
* `:timeout` - (default: `5_000`) the timeout for the initial call to
open the batch request. After the first batch, this argument instead
controls how long each `append_batch/5` operation will await an
acknowledgement.
## Examples
batch_stream
|> Spear.append_batch_stream(conn)
|> Enum.reduce_while(:ok, fn ack, _acc ->
case ack.result do
:ok -> {:cont, :ok}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
#=> :ok
"""
@doc since: "0.10.0"
@doc api: :streams
@spec append_batch_stream(batch_stream :: Enumerable.t(), connection :: Spear.Connection.t()) ::
Enumerable.t()
def append_batch_stream(batch_stream, conn) do
Stream.transform(
batch_stream,
fn -> :new end,
fn element, request_id ->
{stream, events, opts} = map_append_batch_element(element)
timeout = Keyword.get(opts, :timeout, 5_000)
{:ok, batch_id, request_id} = Spear.append_batch(events, conn, request_id, stream, opts)
result =
receive do
%Spear.BatchAppendResult{request_id: ^request_id, batch_id: ^batch_id} = ack ->
ack
after
timeout ->
{:error, :timeout}
end
{[result], request_id}
end,
fn
:new -> :ok
request_id -> Spear.cancel_subscription(conn, request_id)
end
)
end
defp map_append_batch_element({stream, events}), do: {stream, events, []}
defp map_append_batch_element({stream, events, opts}) do
{stream, events, Keyword.take(opts, [:expect, :raw?, :timeout])}
end
@doc """
Subscribes a process to an EventStoreDB stream
Unlike `read_stream/3` or `stream!/3`, this function does not return an
enumerable. Instead the `subscriber` process is signed up to receive messages
for subscription events. Events are emitted in order as info messages with
the signature
```elixir
Spear.Event.t() | Spear.Filter.Checkpoint.t()
```
or if the `raw?: true` option is provided,
`Spear.Records.Streams.read_resp/0` records will be returned in the shape of
```elixir
{subscription :: reference(), Spear.Records.Streams.read_resp()}
```
This function will block the caller until the subscription has been
confirmed by the EventStoreDB.
When the subscription is terminated, the subscription process will receive a
message in the form of `{:eos, subscription, reason}`. `{:eos, subscription,
:closed}` is emitted when the connection between EventStoreDB and subscriber
is severed and `{:eos, subscription, :dropped}` is emitted when the
EventStoreDB explicitly drops a subscription. If this message is received,
the subscription is considered to be concluded and the subscription process
must re-subscribe from the last received event or checkpoint to resume
the subscription. `subscription` is the reference returned by this function.
Events can be correlated to their subscription via the `subscription`
reference returned by this function. The subscription reference is included
in `Spear.Event.metadata.subscription`,
`Spear.Filter.Checkpoint.subscription`, and in the
`{:eos, subscription, reason}` tuples as noted above.
Subscriptions can be gracefully shut down with `Spear.cancel_subscription/3`.
The subscription will be cancelled by the connection process if the
subscriber process exits.
## Options
* `:from` - (default: `:start`) the EventStoreDB stream revision from which to
read. Valid values include `:start`, `:end`, any non-negative integer
representing the event number revision in the stream and events. Event
numbers are exclusive (e.g. reading from `0` will first return the
event numbered `1` in the stream, if one exists). `:start` and `:end`
are treated as inclusive (e.g. `:start` will return the first event in
the stream). Events and checkpoints (`t:Spear.Event.t/0`, ReadResp
records, or `t:Spear.Filter.Checkpoint.t/0`) can also be supplied and will
be treated as exclusive.
* `:filter` - (default: `nil`) the server-side filter to apply. This option
is only valid if the `stream_name` is `:all`. See `Spear.Filter` for more
information.
* `:resolve_links?` - (default: `true`) whether or not to request that
link references be resolved. See the moduledocs for more information
about link resolution.
* `:timeout` - (default: `5_000`) the time to wait for the EventStoreDB
to confirm the subscription request.
* `:raw?` - (default: `false`) controls whether the events are sent as
raw `ReadResp` records or decoded into `t:Spear.Event.t/0`s
* `:credentials` - (default: `nil`) a two-tuple `{username, password}` to
use as credentials for the request. This option overrides any credentials
set in the connection configuration, if present. See the
[Security guide](guides/security.md) for more details.
## Examples
# say there are 3 events in the EventStoreDB stream "my_stream"
iex> {:ok, sub} = Spear.subscribe(conn, self(), "my_stream", from: 0)
{:ok, #Reference<0.1160763861.3015180291.51238>}
iex> flush
%Spear.Event{} # second event
%Spear.Event{} # third event
:ok
iex> Spear.cancel_subscription(conn, sub)
:ok
iex> {:ok, sub} = Spear.subscribe(conn, self(), :all, filter: Spear.Filter.exclude_system_events())
iex> flush
%Spear.Filter.Checkpoint{}
%Spear.Filter.Checkpoint{}
%Spear.Event{}
%Spear.Event{}
%Spear.Filter.Checkpoint{}
%Spear.Event{}
%Spear.Filter.Checkpoint{}
:ok
iex> GenServer.call(conn, :close)
{:ok, :closed}
iex> flush
{:eos, #Reference<0.1160763861.3015180291.51238>, :closed}
"""
@doc since: "0.1.0"
@doc api: :streams
@spec subscribe(
connection :: Spear.Connection.t(),
subscriber :: pid() | GenServer.name(),
stream_name :: String.t() | :all,
opts :: Keyword.t()
) :: {:ok, subscription_reference :: reference()} | {:error, any()}
def subscribe(conn, subscriber, stream_name, opts \\ [])
def subscribe(conn, subscriber, stream_name, opts)
when (is_binary(stream_name) or stream_name == :all) and is_list(opts) do
default_subscribe_opts = [
direction: :forwards,
from: :start,
filter: nil,
resolve_links?: true,
timeout: 5_000,
raw?: false,
through: &Spear.Reading.decode_read_response/2,
credentials: nil
]
opts =
default_subscribe_opts
|> Keyword.merge(opts)
|> Keyword.merge(stream: stream_name, subscriber: subscriber)
through =
if opts[:raw?] do
fn resp, subscription -> {subscription, resp} end
else
opts[:through]
end
request = opts |> Enum.into(%{}) |> Spear.Reading.build_subscribe_request()
case Connection.call(conn, {{:subscription, subscriber, through}, request}, opts[:timeout]) do
{:ok, subscription} when is_reference(subscription) ->
{:ok, subscription}
{:ok, %Spear.Connection.Response{} = response} ->
grpc_response =
Spear.Grpc.Response.from_connection_response(response, request.rpc, opts[:raw?])
{:error, grpc_response}
# coveralls-ignore-start
error ->
error
# coveralls-ignore-stop
end
end
@doc """
Cancels a subscription
This function will cancel a subscription if the provided
`subscription_reference` exists, but is idempotent: if the
`subscription_reference` is not an active subscription reference, `:ok` will
be returned.
Subscriptions are automatically cancelled when a subscribe process exits.
## Examples
iex> {:ok, subscription} = Spear.subscribe(conn, self(), "my_stream")
{:ok, #Reference<0.4293953740.2750676995.30541>}
iex> Spear.cancel_subscription(conn, subscription)
:ok
iex> Spear.cancel_subscription(conn, subscription)
:ok
"""
@doc since: "0.1.0"
@doc api: :utils
@spec cancel_subscription(
connection :: Spear.Connection.t(),
subscription_reference :: reference(),
timeout()
) :: :ok | {:error, any()}
def cancel_subscription(conn, subscription_reference, timeout \\ 5_000)
when is_reference(subscription_reference) do
Connection.call(conn, {:cancel, subscription_reference}, timeout)
end
@doc """
Deletes an EventStoreDB stream
EventStoreDB supports two kinds of stream deletions: soft-deletes and
tombstones. By default this function will perform a soft-delete. Pass the
`tombstone?: true` option to tombstone the stream.
Soft-deletes make the events in the specified stream no longer accessible
through reads. A scavenge operation will reclaim the disk space taken by
any soft-deleted events. New events may be written to a soft-deleted stream.
When reading soft-deleted streams, `:from` options of `:start` and `:end`
will behave as expected, but all events in the stream will have revision
numbers off-set by the number of deleted events.
Tombstoned streams may not be written to ever again. Attempting to write
to a tombstoned stream will fail with a gRPC `:failed_precondition` error
```elixir
iex> [Spear.Event.new("delete_test", %{})] |> Spear.append(conn, "delete_test_0")
:ok
iex> Spear.delete_stream(conn, "delete_test_0", tombstone?: true)
:ok
iex> [Spear.Event.new("delete_test", %{})] |> Spear.append(conn, "delete_test_0")
{:error,
%Spear.Grpc.Response{
data: "",
message: "Event stream 'delete_test_0' is deleted.",
status: :failed_precondition,
status_code: 9
}}
```
## Options
* `:tombstone?` - (default: `false`) controls whether the stream is
soft-deleted or tombstoned.
* `:timeout` - (default: `5_000` - 5s) the time allowed to block while
waiting for the EventStoreDB to delete the stream.
* `:expect` - (default: `:any`) the expected state of the stream when
performing the deletion. See `append/4` and `Spear.ExpectationViolation`
for more information.
* `:credentials` - (default: `nil`) a two-tuple `{username, password}` to
use as credentials for the request. This option overrides any credentials
set in the connection configuration, if present. See the
[Security guide](guides/security.md) for more details.
## Examples
iex> Spear.append(events, conn, "my_stream")
:ok
iex> Spear.delete_stream(conn, "my_stream")
:ok
iex> Spear.stream!(conn, "my_stream") |> Enum.to_list()
[]
"""
@doc since: "0.1.0"
@doc api: :streams
@spec delete_stream(
connection :: Spear.Connection.t(),
stream_name :: String.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def delete_stream(conn, stream_name, opts \\ []) when is_binary(stream_name) do
default_delete_opts = [
tombstone?: false,
timeout: 5_000,
expect: :any,
credentials: nil
]
opts =
default_delete_opts
|> Keyword.merge(opts)
|> Keyword.put(:stream, stream_name)
rpc = if opts[:tombstone?], do: :Tombstone, else: :Delete
messages = [Spear.Writing.build_delete_request(opts |> Enum.into(%{}))]
request_opts = Keyword.take(opts, [:credentials, :timeout])
with {:ok, _response} <- request(conn, Streams, rpc, messages, request_opts) do
:ok
end
end
@doc """
Pings a connection
This can be used to ensure that the connection process is alive, or to
roughly measure the latency between the connection process and EventStoreDB.
## Examples
iex> Spear.ping(conn)
:pong
"""
@doc since: "0.1.2"
@doc api: :utils
@spec ping(connection :: Spear.Connection.t(), timeout()) :: :pong | {:error, any()}
def ping(conn, timeout \\ 5_000), do: Connection.call(conn, :ping, timeout)
@doc """
Sets the global stream ACL
This function appends metadata to the `$streams` EventStoreDB stream
detailing how the EventStoreDB should allow access to user and system
streams (with the `user_acl` and `system_acl` arguments, respectively).
See the [security guide](guides/security.md) for more information.
## Options
* `:json_encode!` - (default: `Jason.encode!/1`) a 1-arity JSON encoding
function used to serialize the event. This event must be JSON encoded
in order for the EventStoreDB to consider it valid.
Remaining options are passed to `Spear.append/4`. The `:expect` option
will be applied to the `$streams` system stream, so one could attempt to
set the initial ACL by passing `expect: :empty`.
## Examples
This recreates the default ACL:
iex> Spear.set_global_acl(conn, Spear.Acl.allow_all(), Spear.Acl.admins_only())
:ok
"""
@doc since: "0.1.3"
@doc api: :utils
@spec set_global_acl(
connection :: Spear.Connection.t(),
user_acl :: Spear.Acl.t(),
system_acl :: Spear.Acl.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def set_global_acl(conn, user_acl, system_acl, opts \\ [])
def set_global_acl(conn, %Spear.Acl{} = user_acl, %Spear.Acl{} = system_acl, opts) do
{json_encode!, opts} = Keyword.pop(opts, :json_encode!)
json_encode! = json_encode! || (&Jason.encode!/1)
Spear.Writing.build_global_acl_event(user_acl, system_acl, json_encode!)
|> List.wrap()
|> Spear.append(conn, "$$$streams", opts)
end
@doc """
Determines the metadata stream for any given stream
Meta streams are used by the EventStoreDB to store some internal information
about a stream, and to configure features such setting time-to-lives for
events or streams.
## Examples
iex> Spear.meta_stream("es_supported_clients")
"$$es_supported_clients"
"""
@doc since: "0.1.3"
@doc api: :utils
@spec meta_stream(stream :: String.t()) :: String.t()
def meta_stream(stream) when is_binary(stream), do: "$$" <> stream
@doc """
Queries the metadata for a stream
Note that the `stream` argument is passed through `meta_stream/1` before
being read. It is not necessary to call that function on the stream name
before passing it as `stream`.
If no metadata has been set on a stream `{:error, :unset}` is returned.
## Options
Under the hood, `get_stream_metadata/3` uses `read_stream/3` and all options
are passed directly to that function. These options are overridden, however,
and cannot be changed:
* `:direction`
* `:from`
* `:max_count`
* `:raw?`
## Examples
iex> Spear.get_stream_metadata(conn, "my_stream")
{:error, :unset}
iex> Spear.get_stream_metadata(conn, "some_stream_with_max_count")
{:ok, %Spear.StreamMetadata{max_count: 50_000, ..}}
"""
@doc since: "0.1.3"
@doc api: :streams
@spec get_stream_metadata(
connection :: Spear.Connection.t(),
stream :: String.t(),
opts :: Keyword.t()
) :: {:ok, Spear.StreamMetadata.t()} | {:error, any()}
def get_stream_metadata(conn, stream, opts \\ []) do
stream = meta_stream(stream)
opts =
opts
|> Keyword.merge(
direction: :backwards,
from: :end,
max_count: 1,
raw?: false
)
with {:ok, event_stream} <- read_stream(conn, stream, opts),
[%Spear.Event{} = event] <- Enum.take(event_stream, 1) do
{:ok, Spear.StreamMetadata.from_spear_event(event)}
else
[] ->
{:error, :unset}
# coveralls-ignore-start
{:error, reason} ->
{:error, reason}
# coveralls-ignore-stop
end
end
@doc """
Sets a stream's metadata
Note that the `stream` argument is passed through `meta_stream/1` before
being read. It is not necessary to call that function on the stream name
before passing it as `stream`.
## Options
This function uses `append/4` under the hood. All options are passed to
the `opts` argument of `append/4`.
## Examples
# only allow admins to read, write, and delete the stream (or stream metadata)
iex> metadata = %Spear.StreamMetadata{acl: Spear.Acl.admins_only()}
iex> Spear.set_stream_metadata(conn, stream, metadata)
:ok
"""
@doc since: "0.1.3"
@doc api: :streams
@spec set_stream_metadata(
connection :: Spear.Connection.t(),
stream :: String.t(),
metadata :: Spear.StreamMetadata.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def set_stream_metadata(conn, stream, metadata, opts \\ [])
def set_stream_metadata(conn, stream, %Spear.StreamMetadata{} = metadata, opts)
when is_binary(stream) do
Spear.Event.new("$metadata", Spear.StreamMetadata.to_map(metadata))
|> List.wrap()
|> append(conn, meta_stream(stream), opts)
end
@doc """
Creates an EventStoreDB user
## Options
All options are passed to `Spear.request/5`.
## Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "open sesame", ["$ops"], credentials: {"admin", "changeit"})
:ok
"""
@doc since: "0.3.0"
@doc api: :users
@spec create_user(
connection :: Spear.Connection.t(),
full_name :: String.t(),
login_name :: String.t(),
password :: String.t(),
groups :: [String.t()],
opts :: Keyword.t()
) :: :ok | {:error, any()}
def create_user(conn, full_name, login_name, password, groups, opts \\ []) do
message =
Users.create_req(
options:
Users.create_req_options(
full_name: full_name,
login_name: login_name,
password: password,
groups: groups
)
)
with {:ok, Users.create_resp()} <- request(conn, Users, :Create, [message], opts) do
:ok
end
end
@doc """
Updates an existing EventStoreDB user
## Options
All options are passed to `Spear.request/5`.
## Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "open sesame", ["$ops"], credentials: {"admin", "changeit"})
:ok
iex> Spear.update_user(conn, "Aladdin", "aladdin", "open sesame", ["$admins"], credentials: {"admin", "changeit"})
:ok
"""
@doc since: "0.3.0"
@doc api: :users
@spec update_user(
connection :: Spear.Connection.t(),
full_name :: String.t(),
login_name :: String.t(),
password :: String.t(),
groups :: [String.t()],
opts :: Keyword.t()
) :: :ok | {:error, any()}
def update_user(conn, full_name, login_name, password, groups, opts \\ []) do
message =
Users.update_req(
options:
Users.update_req_options(
full_name: full_name,
login_name: login_name,
password: password,
groups: groups
)
)
with {:ok, Users.update_resp()} <- request(conn, Users, :Update, [message], opts) do
:ok
end
end
@doc """
Deletes a user from the EventStoreDB
EventStoreDB users are deleted by the `login_name` parameter as passed
to `Spear.create_user/6`.
## Options
All options are passed to `Spear.request/5`.
## Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "open sesame", ["$ops"], credentials: {"admin", "changeit"})
:ok
iex> Spear.delete_user(conn, "aladdin", credentials: {"admin", "changeit"})
:ok
"""
@doc since: "0.3.0"
@doc api: :users
@spec delete_user(
connection :: Spear.Connection.t(),
login_name :: String.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def delete_user(conn, login_name, opts \\ []) do
message = Users.delete_req(options: Users.delete_req_options(login_name: login_name))
with {:ok, Users.delete_resp()} <- request(conn, Users, :Delete, [message], opts) do
:ok
end
end
@doc """
Enables a user to make requests against the EventStoreDB
Disabling and enabling users are an alternative to repeatedly creating and
deleting users and is suitable for when a user needs to be temporarily
denied access.
## Options
All options are passed to `Spear.request/5`.
## Examples
iex> Spear.disable_user(conn, "aladdin")
:ok
iex> Spear.enable_user(conn, "aladdin")
:ok
"""
@doc since: "0.3.0"
@doc api: :users
@spec enable_user(
connection :: Spear.Connection.t(),
login_name :: String.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def enable_user(conn, login_name, opts \\ []) do
message = Users.enable_req(options: Users.enable_req_options(login_name: login_name))
with {:ok, Users.enable_resp()} <- request(conn, Users, :Enable, [message], opts) do
:ok
end
end
@doc """
Disables a user's ability to make requests against the EventStoreDB
This can be used in conjunction with `Spear.enable_user/3` to temporarily
deny access to a user as an alternative to deleting and creating the user.
Enabling and disabling users does not require the password of the user:
just that requestor to be in the `$admins` group.
## Options
All options are passed to `Spear.request/5`.
## Examples
iex> Spear.enable_user(conn, "aladdin")
:ok
iex> Spear.disable_user(conn, "aladdin")
:ok
"""
@doc since: "0.3.0"
@doc api: :users
@spec disable_user(
connection :: Spear.Connection.t(),
login_name :: String.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def disable_user(conn, login_name, opts \\ []) do
message = Users.disable_req(options: Users.disable_req_options(login_name: login_name))
with {:ok, Users.disable_resp()} <- request(conn, Users, :Disable, [message], opts) do
:ok
end
end
@doc """
Fetches details about an EventStoreDB user
## Options
All options are passed to `Spear.request/5`.
## Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "open sesame", ["$ops"])
:ok
iex> Spear.user_details(conn, "aladdin")
{:ok,
%Spear.User{
enabled?: true,
full_name: "Aladdin",
groups: ["$ops"],
last_updated: ~U[2021-04-18 16:48:38.583313Z],
login_name: "aladdin"
}}
"""
@doc since: "0.3.0"
@doc api: :users
@spec user_details(
connection :: Spear.Connection.t(),
login_name :: String.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def user_details(conn, login_name, opts \\ []) do
message = Users.details_req(options: Users.details_req_options(login_name: login_name))
with {:ok, detail_stream} <- request(conn, Users, :Details, [message], opts) do
details =
detail_stream
|> Enum.take(1)
|> List.first()
|> Spear.User.from_details_resp()
{:ok, details}
end
end
@doc """
Changes a user's password by providing the current password
This can be accomplished regardless of the current credentials since the
user's current password is provided.
## Options
All options are passed to `Spear.request/5`.
## Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "changeit", ["$ops"])
:ok
iex> Spear.change_user_password(conn, "aladdin", "changeit", "open sesame")
:ok
"""
@doc api: :users
@spec change_user_password(
connection :: Spear.Connection.t(),
login_name :: String.t(),
current_password :: String.t(),
new_password :: String.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def change_user_password(conn, login_name, current_password, new_password, opts \\ []) do
message =
Users.change_password_req(
options:
Users.change_password_req_options(
login_name: login_name,
current_password: current_password,
new_password: new_password
)
)
with {:ok, Users.change_password_resp()} <-
request(conn, Users, :ChangePassword, [message], opts) do
:ok
end
end
@doc """
Resets a user's password
This can be only requested by a user in the `$admins` group. The current
password is not passed in this request, so this function is suitable for
setting a new password when the current password is lost.
## Options
All options are passed to `Spear.request/5`.
## Examples
iex> Spear.create_user(conn, "Aladdin", "aladdin", "changeit", ["$ops"])
:ok
iex> Spear.reset_user_password(conn, "aladdin", "open sesame", credentials: {"admin", "changeit"})
:ok
"""
@doc since: "0.3.0"
@doc api: :users
@spec reset_user_password(
connection :: Spear.Connection.t(),
login_name :: String.t(),
new_password :: String.t(),
opts :: Keyword.t()
) ::
:ok | {:error, any()}
def reset_user_password(conn, login_name, new_password, opts \\ []) do
message =
Users.reset_password_req(
options:
Users.reset_password_req_options(
login_name: login_name,
new_password: new_password
)
)
with {:ok, Users.reset_password_resp()} <-
request(conn, Users, :ResetPassword, [message], opts) do
:ok
end
end
@doc """
Performs a generic request synchronously
This is appropriate for many operations across the Users, Streams, and
Operations APIs but not suitable for `Spear.subscribe/4` or the
Persistent Subscriptions API.
`message` must be an enumeration of records as created by the Record
Interfaces. Lazy stream enumerations are allowed and are not run until each
element is serialized over the wire.
This function is mostly used under-the-hood to implement functions in
`Spear` such as `Spear.create_user/5`, but may be used generically.
## Options
* `:timeout` - (default: `5_000`ms - 5s) the GenServer timeout: the maximum
time allowed to wait for this request to complete.
* `:credentials` - (default: `nil`) the username and password to use to make
the request. Overrides the connection-level credentials if provided.
Connection-level credentials are used as the default if not provided.
## Examples
iex> alias Spear.Records.Users
iex> require Users
iex> message = Users.enable_req(options: Users.enable_req_options(login_name: "my_user"))
iex> Spear.request(conn, Users, :Enable, [message], credentials: {"admin", "changeit"})
{:ok, Users.enable_resp()}
"""
@doc since: "0.3.0"
@doc api: :utils
@spec request(
connection :: Spear.Connection.t(),
api :: module(),
rpc :: atom(),
messages :: Enumerable.t(),
opts :: Keyword.t()
) ::
{:ok, tuple() | Enumerable.t()} | {:error, any()}
def request(conn, api, rpc, messages, opts \\ []) do
opts =
[
timeout: 5_000,
credentials: nil,
raw?: false
]
|> Keyword.merge(opts)
request =
%Spear.Request{
api: {api, rpc},
messages: messages,
credentials: opts[:credentials]
}
|> Spear.Request.expand()
with {:ok, %Spear.Connection.Response{} = response} <-
Connection.call(conn, {:request, request}, opts[:timeout]),
%Spear.Grpc.Response{status: :ok, data: data} <-
Spear.Grpc.Response.from_connection_response(response, request.rpc, opts[:raw?]) do
{:ok, data}
else
# coveralls-ignore-start
{:error, reason} -> {:error, reason}
# coveralls-ignore-stop
%Spear.Grpc.Response{} = response -> {:error, response}
end
end
@doc """
Parses an EventStoreDB timestamp into a `DateTime.t()` in UTC time.
## Examples
iex> Spear.parse_stamp(16187636458580612)
{:ok, ~U[2021-04-18 16:34:05.858061Z]}
"""
@doc since: "0.3.0"
@doc api: :utils
@spec parse_stamp(stamp :: pos_integer()) :: {:ok, DateTime.t()} | {:error, atom()}
def parse_stamp(ticks_since_epoch) when is_integer(ticks_since_epoch) do
ticks_since_epoch
|> div(10)
|> DateTime.from_unix(:microsecond)
end
@doc """
Requests that a scavenge be started
Scavenges are disk-space reclaiming operations run on the EventStoreDB
server.
## Options
* `:thread_count` - (default: `1`) the number of threads to use for the
scavenge process. Scavenging can be resource intensive. Setting this to
a low thread count can lower the impact on the server's resources.
* `:start_from_chunk` - (default: `0`) the chunk number to start the
scavenge from. Generally this is only useful if a prior scavenge has
failed on a certain chunk.
Remaining options are passed to `request/5`.
## Examples
iex> Spear.start_scavenge(conn)
{:ok,
%Spear.Scavenge{id: "d2897ba8-2f0c-4fc4-bb25-798ba75f3562", result: :Started}}
"""
@doc since: "0.4.0"
@doc api: :operations
@spec start_scavenge(connection :: Spear.Connection.t(), opts :: Keyword.t()) ::
{:ok, Spear.Scavenge.t()} | {:error, any()}
def start_scavenge(conn, opts \\ []) do
opts =
[
thread_count: 1,
start_from_chunk: 0
]
|> Keyword.merge(opts)
message =
Operations.start_scavenge_req(
options:
Operations.start_scavenge_req_options(
thread_count: opts[:thread_count],
start_from_chunk: opts[:start_from_chunk]
)
)
with {:ok, Operations.scavenge_resp() = resp} <-
request(
conn,
Operations,
:StartScavenge,
[message],
Keyword.take(opts, [:timeout, :credentials])
) do
{:ok, Spear.Scavenge.from_scavenge_resp(resp)}
end
end
@doc """
Produces the scavenge stream for a scavenge ID
`start_scavenge/2` begins an asynchronous scavenge operation since scavenges
may be time consuming. In order to check the progress of a running scavenge,
one may read the scavenge stream with `read_stream/3` or `stream!/3` or
subscribe to updates on the scavenge with `subscribe/4`.
## Examples
iex> {:ok, scavenge} = Spear.start_scavenge(conn)
{:ok,
%Spear.Scavenge{id: "d2897ba8-2f0c-4fc4-bb25-798ba75f3562", result: :Started}}
iex> Spear.scavenge_stream(scavenge)
"$scavenges-d2897ba8-2f0c-4fc4-bb25-798ba75f3562"
"""
@doc since: "0.4.0"
@doc api: :utils
@spec scavenge_stream(scavenge :: String.t() | Spear.Scavenge.t()) :: String.t()
def scavenge_stream(%Spear.Scavenge{id: scavenge_id}), do: scavenge_stream(scavenge_id)
def scavenge_stream(scavenge_id) when is_binary(scavenge_id), do: "$scavenges-" <> scavenge_id
@doc """
Stops a running scavenge
## Options
All options are passed to `request/5`.
## Examples
iex> {:ok, scavenge} = Spear.start_scavenge(conn)
iex> Spear.stop_scavenge(conn, scavenge.id)
{:ok,
%Spear.Scavenge{id: "d2897ba8-2f0c-4fc4-bb25-798ba75f3562", result: :Stopped}}
"""
@doc since: "0.4.0"
@doc api: :operations
@spec stop_scavenge(
connection :: Spear.Connection.t(),
scavenge_id :: String.t(),
opts :: Keyword.t()
) :: {:ok, Spear.Scavenge.t()} | {:error, any()}
def stop_scavenge(conn, scavenge_id, opts \\ [])
def stop_scavenge(conn, scavenge_id, opts) when is_binary(scavenge_id) do
message =
Operations.stop_scavenge_req(
options: Operations.stop_scavenge_req_options(scavenge_id: scavenge_id)
)
with {:ok, Operations.scavenge_resp() = resp} <-
request(conn, Operations, :StopScavenge, [message], opts) do
# coveralls-ignore-start
{:ok, Spear.Scavenge.from_scavenge_resp(resp)}
# coveralls-ignore-stop
end
end
@doc """
Shuts down the connected EventStoreDB
The user performing the shutdown (either the connection credentials or
credentials passed by the `:credentials` option) must at least be in the
`$ops` group. `$admins` permissions are a superset of `$ops`.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.shutdown(conn)
:ok
iex> Spear.ping(conn)
{:error, :closed}
iex> Spear.shutdown(conn, credentials: {"some_non_ops_user", "changeit"})
{:error,
%Spear.Grpc.Response{
data: "",
message: "Access Denied",
status: :permission_denied,
status_code: 7
}}
iex> Spear.ping(conn)
:pong
"""
@doc since: "0.4.0"
@doc api: :operations
@spec shutdown(connection :: Spear.Connection.t(), opts :: Keyword.t()) :: :ok | {:error, any()}
def shutdown(conn, opts \\ []) do
with {:ok, empty()} <- request(conn, Operations, :Shutdown, [empty()], opts) do
:ok
end
end
@doc """
Requests that the indices be merged
<!--
YARD I have no idea what this does
-->
See the EventStoreDB documentation for more information.
A user does not need to be in `$ops` or any group to initiate this request.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.merge_indexes(conn)
:ok
"""
@doc since: "0.4.0"
@doc api: :operations
@spec merge_indexes(connection :: Spear.Connection.t(), opts :: Keyword.t()) ::
:ok | {:error, any()}
# coveralls-ignore-start
def merge_indexes(conn, opts \\ []) do
with {:ok, empty()} <- request(conn, Operations, :MergeIndexes, [empty()], opts) do
:ok
end
end
@doc """
Requests that the currently connected node resign its leadership role
<!--
YARD I have no idea what this does
-->
See the EventStoreDB documentation for more information.
A user does not need to be in `$ops` or any group to initiate this request.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.resign_node(conn)
:ok
"""
@doc since: "0.4.0"
@doc api: :operations
@spec resign_node(connection :: Spear.Connection.t(), opts :: Keyword.t()) ::
:ok | {:error, any()}
def resign_node(conn, opts \\ []) do
with {:ok, empty()} <- request(conn, Operations, :ResignNode, [empty()], opts) do
:ok
end
end
@doc """
Sets the node priority number
<!--
YARD I have no idea what this does
-->
See the EventStoreDB documentation for more information.
A user does not need to be in `$ops` or any group to initiate this request.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.set_node_priority(conn, 1)
:ok
"""
@doc since: "0.4.0"
@doc api: :operations
@spec set_node_priority(
connection :: Spear.Connection.t(),
priority :: integer(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def set_node_priority(conn, priority, opts \\ [])
def set_node_priority(conn, priority, opts) when is_integer(priority) do
message = Operations.set_node_priority_req(priority: priority)
with {:ok, empty()} <- request(conn, Operations, :SetNodePriority, [message], opts) do
:ok
end
end
@doc """
Restarts all persistent subscriptions
See the EventStoreDB documentation for more information.
A user does not need to be in `$ops` or any group to initiate this request.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.restart_persistent_subscriptions(conn)
:ok
"""
@doc since: "0.4.0"
@doc api: :operations
@spec restart_persistent_subscriptions(connection :: Spear.Connection.t(), opts :: Keyword.t()) ::
:ok | {:error, any()}
def restart_persistent_subscriptions(conn, opts \\ []) do
with {:ok, empty()} <-
request(conn, Operations, :RestartPersistentSubscriptions, [empty()], opts) do
:ok
end
end
# coveralls-ignore-stop
@doc """
Reads the cluster information from the connected EventStoreDB
Returns a list of members which are clustered to the currently connected
EventStoreDB.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.cluster_info(conn)
{:ok,
[
%Spear.ClusterMember{
address: "127.0.0.1",
alive?: true,
instance_id: "eba4c27f-e443-4b21-8756-00845bc5cda1",
port: 2113,
state: :Leader,
timestamp: ~U[2021-04-19 17:25:17.875824Z]
}
]}
"""
@doc since: "0.5.0"
@doc api: :gossip
@spec cluster_info(connection :: Spear.Connection.t(), opts :: Keyword.t()) ::
{:ok, [Spear.ClusterMember.t()]} | {:error, any()}
def cluster_info(conn, opts \\ []) do
with {:ok, Gossip.cluster_info(members: members)} <-
request(conn, Gossip, :Read, [empty()], opts) do
{:ok, Enum.map(members, &Spear.ClusterMember.from_member_info/1)}
end
end
@doc """
Deletes a persistent subscription from the EventStoreDB
Persistent subscriptions are considered unique by their stream and group
names together: you may define separate persistent subscriptions for the same
stream with multiple groups or use the same group name for persistent
subscriptions to multiple streams. A combination of stream name and group
name together is considered unique though.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.delete_persistent_subscription(conn, "my_stream", "MyGroup")
:ok
"""
@doc since: "0.6.0"
@doc api: :persistent
@spec delete_persistent_subscription(
connection :: Spear.Connection.t(),
stream_name :: String.t(),
group_name :: String.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def delete_persistent_subscription(conn, stream_name, group_name, opts \\ [])
def delete_persistent_subscription(conn, stream_name, group_name, opts)
when (is_binary(stream_name) or stream_name == :all) and is_binary(group_name) do
message =
Persistent.delete_req(
options:
Persistent.delete_req_options(
stream_option: Spear.PersistentSubscription.map_short_stream_option(stream_name),
group_name: group_name
)
)
with {:ok, Persistent.delete_resp()} <- request(conn, Persistent, :Delete, [message], opts) do
:ok
end
end
@doc """
Creates a persistent subscription
See `t:Spear.PersistentSubscription.Settings.t/0` for more information.
Note that persistent subscriptions to the `:all` stream with server-side
filtering is a feature introduced in EventStoreDB v21.6.0. Attempting
to use the `:all` stream on older EventStoreDB versions will fail.
## Options
* `:from` - the position or revision in the stream where the persistent
subscription should start. This option may be `:start` or `:end`
describing the beginning or end of the stream. When the `stream_name`
is `:all`, this parameter describes the prepare and commit positions
in the `:all` stream which can be found on any event emitted from a
subscription to the `:all` stream. When the `stream_name` is not the
`:all` stream, this option may be an integer representing the event
number in the stream from which the subscription should start. This
may be found on any `t:Spear.Event.t/0`. This option may be passed
a `t:Spear.Event.t/0`, from which either the revision or position will
be determined based on the stream name. This option overwrites the
`:revision` field on the `t:Spear.PersistentSubscription.Settings.t/0`
type which is now deprecated.
* `:filter` - a filter to apply while reading from the `:all` stream.
This option only applies when reading the `:all` stream. The same
data structure works for regular and persistent subscriptions to
the `:all` stream. See the `t:Spear.Filter.t/0` documentation for
more information.
Remaining options are passed to `request/5`.
## Examples
iex> Spear.create_persistent_subscription(conn, "my_stream", "my_group", %Spear.PersistentSubscription.Settings{})
:ok
iex> import Spear.Filter
iex> filter = ~f/My.Aggregate.A- My.Aggregate.B-/ps
iex> Spear.create_persistent_subscription(conn, :all, "my_all_group", %Spear.PersistentSubscription.Settings{}, filter: filter)
:ok
"""
@doc since: "0.6.0"
@doc api: :persistent
@spec create_persistent_subscription(
connection :: Spear.Connection.t(),
stream_name :: String.t() | :all,
group_name :: String.t(),
settings :: Spear.PersistentSubscription.Settings.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def create_persistent_subscription(conn, stream_name, group_name, settings, opts \\ [])
def create_persistent_subscription(
conn,
stream_name,
group_name,
%Spear.PersistentSubscription.Settings{} = settings,
opts
)
when (is_binary(stream_name) or stream_name == :all) and is_binary(group_name) do
message =
Spear.PersistentSubscription.build_create_request(stream_name, group_name, settings, opts)
with {:ok, Persistent.create_resp()} <- request(conn, Persistent, :Create, [message], opts) do
:ok
end
end
@doc """
Gets information pertaining to a persistent subcription.
Requires server version 22.6.0 or above.
`opts` are passed to the underlying request.
## Examples
iex> Spear.get_persistent_subscription_info(conn, "accounts", "subscription-group")
{:ok,
%Spear.PersistentSubscription.Info{
event_source: "accounts",
group_name: "subscription-group",
status: "Live",
average_per_second: 0,
total_items: 78,
count_since_last_measurement: 0,
last_checkpointed_event_position: "31",
last_known_event_position: "36",
start_from: "0",
message_timeout_milliseconds: 30000,
max_retry_count: 10,
live_buffer_size: 500,
buffer_size: 500,
read_batch_size: 20,
check_point_after_milliseconds: 2000,
min_check_point_count: 10,
max_check_point_count: 1000,
read_buffer_count: 0,
live_buffer_count: 36,
retry_buffer_count: 0,
total_in_flight_messages: 0,
outstanding_messages_count: 0,
named_consumer_strategy: :RoundRobin,
max_subscriber_count: 0,
parked_message_count: 1,
connections: [],
extra_statistics?: false,
resolve_link_tos?: false
}}
"""
@doc since: "1.2.0"
@doc api: :persistent
@spec get_persistent_subscription_info(
connection :: Spear.Connection.t(),
stream_name :: String.t() | :all,
group_name :: String.t(),
opts :: Keyword.t()
) :: {:ok, Spear.PersistentSubcription.Info.t()} | {:error, any()}
def get_persistent_subscription_info(conn, stream_name, group_name, opts \\ [])
when (is_binary(stream_name) or stream_name == :all) and is_binary(group_name) do
get_info_message =
Spear.PersistentSubscription.Info.build_info_request(stream_name, group_name)
case Spear.request(conn, Spear.Records.Persistent, :GetInfo, [get_info_message], opts) do
{:ok, Spear.Records.Persistent.get_info_resp(subscription_info: info)} ->
{:ok, Spear.PersistentSubscription.Info.from_proto(info)}
error ->
error
end
end
@doc """
Updates an existing persistent subscription
See `t:Spear.PersistentSubscription.Settings.t/0` for more information.
Note that persistent subscriptions to the `:all` stream with server-side
filtering is a feature introduced in EventStoreDB v21.6.0. Attempting
to use the `:all` stream on older EventStoreDB versions will fail.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.update_persistent_subscription(conn, "my_stream", "my_group", %Spear.PersistentSubscription.Settings{})
:ok
"""
@doc since: "0.6.0"
@doc api: :persistent
@spec update_persistent_subscription(
connection :: Spear.Connection.t(),
stream_name :: String.t(),
group_name :: String.t(),
settings :: Spear.PersistentSubscription.Settings.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def update_persistent_subscription(conn, stream_name, group_name, settings, opts \\ [])
def update_persistent_subscription(
conn,
stream_name,
group_name,
%Spear.PersistentSubscription.Settings{} = settings,
opts
)
when (is_binary(stream_name) or stream_name == :all) and is_binary(group_name) do
message =
Persistent.update_req(
options:
Persistent.update_req_options(
stream_identifier: Shared.stream_identifier(stream_name: stream_name),
stream_option:
Spear.PersistentSubscription.map_update_stream_option(stream_name, opts),
group_name: group_name,
settings: Spear.PersistentSubscription.Settings.to_record(settings, :update)
)
)
with {:ok, Persistent.update_resp()} <- request(conn, Persistent, :Update, [message], opts) do
:ok
end
end
@doc """
Lists the currently existing persistent subscriptions
Results are returned in an `t:Enumerable.t/0` of
`t:Spear.PersistentSubscription.t/0`.
Note that the `:extra_statistics?` field of settings is not determined by
this function: `:extra_statistics?` will always be returned as `nil` in this
function.
This function works by reading the built-in `$persistentSubscriptionConfig`
stream. This stream can be read normally to obtain additional information
such as at timestamp for the last time the persistent subscription config was
updated.
## Options
Options are passed to `read_stream/3`. `:direction`, `:from`, and
`:max_count` are fixed and cannot be overridden.
## Examples
iex> Spear.create_persistent_subscription(conn, "my_stream", "my_group", %Spear.PersistentSubscription.Settings{})
:ok
iex> {:ok, subscriptions} = Spear.list_persistent_subscriptions(conn)
iex> subscriptions |> Enum.to_list()
[
%Spear.PersistentSubscription{
group_name: "my_group",
settings: %Spear.PersistentSubscription.Settings{
checkpoint_after: 3000,
extra_statistics?: nil,
history_buffer_size: 300,
live_buffer_size: 100,
max_checkpoint_count: 100,
max_retry_count: 10,
max_subscriber_count: 1,
message_timeout: 5000,
min_checkpoint_count: 1,
named_consumer_strategy: :RoundRobin,
read_batch_size: 100,
resolve_links?: true,
revision: 0
},
stream_name: "my_stream"
}
]
"""
@doc since: "0.6.0"
@doc api: :persistent
@spec list_persistent_subscriptions(connection :: Spear.Connection.t(), opts :: Keyword.t()) ::
{:ok, Enumerable.t()} | {:error, any()}
def list_persistent_subscriptions(conn, opts \\ []) do
# NOTE: this can be implemented without these hacks after v22.6.0
# with persistent's List RPC.
read_opts =
opts
|> Keyword.merge(
direction: :backwards,
from: :end,
max_count: 1
)
with {:ok, stream} <- read_stream(conn, "$persistentSubscriptionConfig", read_opts) do
subscriptions =
stream
|> Stream.flat_map(&get_in(&1, [Access.key(:body), "entries"]))
|> Stream.map(&Spear.PersistentSubscription.from_map/1)
{:ok, subscriptions}
end
end
@doc """
Subscribes a process to an existing persistent subscription
Persistent subscriptions can be gracefully closed with
`cancel_subscription/3` just like subscriptions started with `subscribe/4`.
The subscription will be cancelled by the connection process if the
subscriber process exits.
Persistent subscriptions are an alternative to standard subscriptions
(via `subscribe/4`) which use `ack/3` and `nack/4` to exert backpressure
and allow out-of-order and batch processing within a single consumer
and allow multiple connected consumers at once.
In standard subscriptions (via `subscribe/4`), if a client wishes to
handle events in order without reprocessing, the client must keep track
of its own position in a stream, either in memory or using some sort of
persistence such as PostgreSQL or mnesia for durability.
In contrast, persistent subscriptions are stateful on the server-side:
the EventStoreDB will keep track of which events have been positively and
negatively acknowledged and will only emit events which have not yet
been processed to any connected consumers.
This allows one to connect multiple subscriber processes to a persistent
subscription stream-group combination in a strategy called [Competing
Consumers](https://docs.microsoft.com/en-us/azure/architecture/patterns/competing-consumers).
Note that persistent subscription events are not guaranteed to be processed
in order like the standard subscriptions because of the ability to `nack/4`
and reprocess or park a message. While this requires special considerations
when authoring a consumer, it allows one to easily write a consumer which
does not head-of-line block in failure cases.
The subscriber will receive a message `{:eos, subscription, reason}` when the
subscription is closed by the server. `:closed` denotes that the EventStoreDB
connection has been severed and `:dropped` denotes that the EventStoreDB
has explicitly told the subscriber that the subscription is terminated.
This can occur for persistent subscriptions in the case where the
subscription is deleted (e.g. via `Spear.delete_persistent_subscription/4`).
`subscription` is the reference returned by this function.
```elixir
iex> Spear.create_persistent_subscription(conn, "asdf", "asdf", %Spear.PersistentSubscription.Settings{})
:ok
iex> Spear.connect_to_persistent_subscription(conn, self(), "asdf", "asdf")
{:ok, #Reference<0.515780924.2297430020.166204>}
iex> flush
:ok
iex> Spear.delete_persistent_subscription(conn, "asdf", "asdf")
:ok
iex> flush
{:eos, #Reference<0.515780924.2297430020.166204>, :dropped}
:ok
```
Like subscriptions from `subscribe/4`, events can be correlated to their
subscription by the `:subscription` key in each `Spear.Event.metadata`
map.
Note that persistent subscriptions to the `:all` stream with server-side
filtering is a feature introduced in EventStoreDB v21.6.0. Attempting
to use the `:all` stream on older EventStoreDB versions will fail.
## Backpressure
Persistent subscriptions allow the subscriber process to exert backpressure
on the EventStoreDB so that the message queue is not flooded. This is
implemented with a buffer of events which are considered by the EventStoreDB
to be _in-flight_ when they are sent to the client. Events remain in-flight
until they are `ack/3`-ed, `nack/4`-ed, or until the `:message_timeout`
duration is exceeded. If a client `ack/3`s a message, the EventStoreDB
will send a new message if any are available.
The in-flight buffer size is controllable per subscriber through
`:buffer_size`. Note that `:message_timeout` applies to each event: if
the `:buffer_size` is 5 and five events arrive simultaneously, the client
has the duration `:message_timeout` to acknowledge all five events before
they are considered stale and are automatically considered nack-ed by
the EventStoreDB.
The `:buffer_size` should align with the consumer's ability to batch
process events.
## Delivery guarantees
Persistent subscriptions provide at-least once delivery. Messages may be
re-delivered under a few circumstances:
* Negatively acknowledging a message with `nack/4` with the `:action` set
to `:retry` will queue the message for re-delivery.
* A message may be handled successfully but not within the configured
`:message_timeout`. If the server does not receive an `ack/3` within the
timeout, the message may be re-delivered.
* The acknowledgement from `ack/3` may be lost. This can happen in a few
cases:
* If the acknowledgement is sent and then a subscription is immediately
cancelled (either explicitly with `Spear.cancel_subscription/2` or if
the subscription process terminates immediately after sending an ack),
the EventStoreDB may discard the ack. This is because of a limitation
in the protocol which conflates the closing of a subscription with
the corruption of data being sent to the server. Subscription processes
may wish to sleep between the last acknowledgement and exiting to reduce
the chances of this happening.
* An unreliable network may drop the packet containing the
acknowledgement.
Re-delivered messages may arrive out-of-order. Cases of repeated delivery
or out-of-order delivery should be handled at the application level.
Also see the
[EventStoreDB Server docs](https://developers.eventstore.com/server/v21.10/persistent-subscriptions.html#persistent-subscription)
on Persistent Subscriptions.
## Options
* `:timeout` - (default: `5_000`ms - 5s) the time to await a subscription
confirmation from the EventStoreDB.
* `:raw?` - (default: `false`) controls whether events are translated from
low-level `Spear.Records.Persistent.read_resp/0` records to
`t:Spear.Event.t/0`s. By default `t:Spear.Event.t/0`s are sent to the
subscriber.
* `:credentials` - (default: `nil`) the credentials to use to connect to the
subscription. When not specified, the connection-level credentials are
used. Credentials must be a two-tuple `{username, password}`.
* `:buffer_size` - (default: `1`) the number of events allowed to be sent
to the client at a time. These events are considered in-flight. See the
backpressure section above for more information.
## Examples
iex> Spear.connect_to_persistent_subscription(conn, self(), "my_stream", "my_group")
iex> flush
%Spear.Event{}
:ok
"""
@doc since: "0.6.0"
@doc api: :persistent
@spec connect_to_persistent_subscription(
connection :: Spear.Connection.t(),
subscriber :: pid() | GenServer.name(),
stream_name :: String.t() | :all,
group_name :: String.t(),
opts :: Keyword.t()
) :: {:ok, subscription :: reference()} | {:error, any()}
def connect_to_persistent_subscription(conn, subscriber, stream_name, group_name, opts \\ [])
def connect_to_persistent_subscription(conn, subscriber, stream_name, group_name, opts)
when (is_binary(stream_name) or stream_name == :all) and is_binary(group_name) do
default_subscribe_opts = [
timeout: 5_000,
raw?: false,
through: &Spear.Reading.decode_read_response/2,
credentials: nil,
buffer_size: 1
]
opts =
default_subscribe_opts
|> Keyword.merge(opts)
|> Keyword.merge(stream: stream_name, subscriber: subscriber)
message =
Persistent.read_req(
content:
{:options,
Persistent.read_req_options(
stream_option: Spear.PersistentSubscription.map_short_stream_option(stream_name),
group_name: group_name,
buffer_size: opts[:buffer_size],
uuid_option: Persistent.read_req_options_uuid_option(content: {:string, empty()})
)}
)
through =
if opts[:raw?] do
# coveralls-ignore-start
fn resp, subscription -> {subscription, resp} end
# coveralls-ignore-stop
else
opts[:through]
end
request =
%Spear.Request{
api: {Persistent, :Read},
messages: [message],
credentials: opts[:credentials]
}
|> Spear.Request.expand()
call = {{:subscription, subscriber, through}, request}
case Connection.call(conn, call, opts[:timeout]) do
{:ok, subscription} when is_reference(subscription) ->
{:ok, subscription}
{:ok, %Spear.Connection.Response{} = response} ->
grpc_response =
Spear.Grpc.Response.from_connection_response(response, request.rpc, opts[:raw?])
{:error, grpc_response}
# coveralls-ignore-start
error ->
error
# coveralls-ignore-stop
end
end
@doc """
Acknowledges that an event received as part of a persistent subscription was
successfully handled
Although `ack/3` can accept a `t:Spear.Event.t/0` alone, the underlying
gRPC call acknowledges a batch of event IDs.
```elixir
Spear.ack(conn, subscription, events |> Enum.map(&Spear.Event.id/1))
```
should be preferred over
```elixir
Enum.each(events, &Spear.ack(conn, subscription, Spear.Event.id(&1)))
```
As the acknowledgements will be batched.
This function (and `nack/4`) are asynchronous casts to the connection
process.
## Examples
# some stream with 3 events
stream_name = "my_stream"
group_name = "spear_iex"
settings = %Spear.PersistentSubscription.Settings{}
get_event_and_ack = fn conn, sub ->
receive do
%Spear.Event{} = event ->
:ok = Spear.ack(conn, sub, event)
event
after
3_000 -> :no_events
end
end
iex> Spear.create_persistent_subscription(conn, stream_name, group_name, settings)
:ok
iex> {:ok, sub} = Spear.connect_to_persistent_subscription(conn, self(), stream_name, group_name)
iex> get_event_and_ack.(conn, sub)
%Spear.Event{..}
iex> get_event_and_ack.(conn, sub)
%Spear.Event{..}
iex> get_event_and_ack.(conn, sub)
%Spear.Event{..}
iex> get_event_and_ack.(conn, sub)
:no_events
iex> Spear.cancel_subscription(conn, sub)
:ok
"""
@doc since: "0.6.0"
@doc api: :persistent
@spec ack(
connection :: Spear.Connection.t(),
subscription :: reference(),
event_or_ids :: Spear.Event.t() | [String.t()]
) :: :ok
def ack(conn, subscription, event_or_ids)
def ack(conn, sub, %Spear.Event{} = event), do: ack(conn, sub, [Spear.Event.id(event)])
def ack(conn, sub, event_ids) when is_list(event_ids) do
id = ""
ids = Enum.map(event_ids, fn id -> Shared.uuid(value: {:string, id}) end)
message = Persistent.read_req(content: {:ack, Persistent.read_req_ack(id: id, ids: ids)})
Connection.cast(conn, {:push, sub, message})
end
@doc """
Negatively acknowldeges a persistent subscription event
Nacking is the opposite of `ack/3`ing: it tells the EventStoreDB that the
event should not be considered processed.
## Options
* `:action` - (default: `:retry`) controls the action the EventStoreDB
should take about the event. See
`t:Spear.PersistentSubscription.nack_action/0` for a full description.
* `:reason` - (default: `""`) a description of why the event is
being nacked
## Examples
# some stream with 3 events
stream_name = "my_stream"
group_name = "spear_iex"
settings = %Spear.PersistentSubscription.Settings{}
get_event_and_ack = fn conn, sub, action ->
receive do
%Spear.Event{} = event ->
:ok = Spear.nack(conn, sub, event, action: action)
event
after
3_000 -> :no_events
end
end
iex> Spear.create_persistent_subscription(conn, stream_name, group_name, settings)
:ok
iex> {:ok, sub} = Spear.connect_to_persistent_subscription(conn, self(), stream_name, group_name)
iex> get_event_and_nack.(conn, sub, :retry)
%Spear.Event{..} # event 0
iex> get_event_and_nack.(conn, sub, :retry)
%Spear.Event{..} # event 0
iex> get_event_and_nack.(conn, sub, :park) # park event 0 and move on
%Spear.Event{..} # event 0
iex> get_event_and_nack.(conn, sub, :skip) # skip event 1
%Spear.Event{..} # event 1
iex> get_event_and_nack.(conn, sub, :skip) # skip event 2
%Spear.Event{..} # event 2
iex> get_event_and_nack.(conn, sub, :skip)
:no_events
iex> Spear.cancel_subscription(conn, sub)
:ok
"""
@doc since: "0.6.0"
@doc api: :persistent
@spec nack(
connection :: Spear.Connection.t(),
subscription :: reference(),
event_or_ids :: Spear.Event.t() | [String.t()],
opts :: Keyword.t()
) :: :ok
# coveralls-ignore-start
def nack(conn, subscription, event_or_ids, opts \\ [])
# coveralls-ignore-stop
def nack(conn, sub, %Spear.Event{} = event, opts),
do: nack(conn, sub, [Spear.Event.id(event)], opts)
def nack(conn, sub, event_ids, opts) when is_list(event_ids) do
reason = Keyword.get(opts, :reason, "")
action = Keyword.get(opts, :action, :retry)
id = ""
ids = Enum.map(event_ids, fn id -> Shared.uuid(value: {:string, id}) end)
message =
Persistent.read_req(
content:
{:nack,
Persistent.read_req_nack(
id: id,
ids: ids,
action: Spear.PersistentSubscription.map_nack_action(action),
reason: reason
)}
)
Connection.cast(conn, {:push, sub, message})
end
@doc """
Requests that the server replays any messages parked for a persistent subscription
stream and group.
## Options
* `:stop_at` - the number of messages to request be replayed. If not specified, the
number of messages is not limited.
Remaining options are passed to `Spear.request/5`.
## Examples
iex> Spear.create_persistent_subscription(conn, "my_stream", "my_group", %Spear.PersistentSubscription.Settings{})
# ... nack some events with the `:park` action ...
iex> Spear.replay_parked(conn, "my_stream", "my_group")
:ok
# ... parked messages are re-delivered ...
"""
@doc since: "1.2.0"
@doc api: :persistent
@spec replay_parked_messages(
connection :: Spear.Connection.t(),
stream_name :: String.t() | :all,
group_name :: String.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
def replay_parked_messages(conn, stream_name, group_name, opts \\ [])
when (is_binary(stream_name) or stream_name == :all) and is_binary(group_name) do
stop_at_option =
case Keyword.fetch(opts, :stop_at) do
# coveralls-ignore-start
{:ok, stop_at} when is_integer(stop_at) ->
{:stop_at, stop_at}
# coveralls-ignore-stop
:error ->
{:no_limit, Shared.empty()}
end
message =
Persistent.replay_parked_req(
options:
Persistent.replay_parked_req_options(
group_name: group_name,
stream_option: Spear.PersistentSubscription.map_short_stream_option(stream_name),
stop_at_option: stop_at_option
)
)
with {:ok, Persistent.replay_parked_resp()} <-
request(conn, Persistent, :ReplayParked, [message], opts) do
:ok
end
end
@doc """
Restarts the persistent subscription subsystem on the EventStoreDB server.
"""
@doc since: "1.2.0"
@doc api: :persistent
@spec restart_persistent_subscription_subsystem(
conn :: Spear.Connection.t(),
opts :: Keyword.t()
) :: :ok | {:error, any()}
# coveralls-ignore-start
def restart_persistent_subscription_subsystem(conn, opts \\ []) do
with {:ok, Shared.empty()} <-
request(conn, Persistent, :RestartSubsystem, [Shared.empty()], opts) do
:ok
end
end
# coveralls-ignore-stop
@doc """
Returns the parked events stream for a persistent subscription stream and
group.
If an event is negatively acknowledged and parked, the persistent subscription
will add it to the park stream for the given stream+group combination. It
can be useful to read this stream to determine if there are any parked
messages.
## Examples
iex> Spear.park_stream("MyStream", "MyGroup")
"$persistentsubscription-MyStream::MyGroup-parked"
"""
@doc since: "0.9.1"
@doc api: :utils
@spec park_stream(stream_name :: String.t(), group_name :: String.t()) :: String.t()
def park_stream(stream_name, group_name)
when is_binary(stream_name) and is_binary(group_name) do
"$persistentsubscription-#{stream_name}::#{group_name}-parked"
end
@doc """
Subscribes a process to stats updates from the EventStoreDB
This function subscribes a process in the same way as `subscribe/4`: the
function will return a reference representing the subscription and the
stats messages will be sent to the subscriber process with `send/2`.
This subscription can be cancelled with `cancel_subscription/3`.
This functionality was added to EventStoreDB in release v21.6.0. Prior
EventStoreDB versions will throw a GRPC error when attempting to use
this function.
## Options
* `:interval` - (default: `5_000` - 5 seconds) the interval after which
a new stats message should be sent. By default, stats messages arrive
every five seconds.
* `:use_metadata?` - (default: `true`) undocumented option. See the
EventStoreDB implementation for more information.
* `:timeout` - (default: `5_000` - 5 seconds) the GenServer timeout to
use when requesting a subscription to stats
* `:raw?` - (default: `false`) whether to emit the stats messages as
'raw' `Spear.Records.Monitoring.stats_resp/0` records in a tuple of
`{subscription :: reference(), Spear.Records.Monitoring.stats_resp()}`.
By default, stats messages are returned as maps.
* `:credentials` - (default: `nil`) credentials to use to perform this
subscription request.
## Examples
iex> Spear.subscribe_to_stats(conn, self())
{:ok, #Reference<0.359109646.3547594759.216222>}
iex> flush()
%{
"es-queue-Projection Core #2-length" => "0",
"es-queue-Worker #1-lengthLifetimePeak" => "0",
"es-queue-Worker #3-lengthCurrentTryPeak" => "0",
"es-queue-StorageReaderQueue #9-avgProcessingTime" => "0",
"es-queue-StorageReaderQueue #6-idleTimePercent" => "100",
..
}
"""
@doc since: "0.10.0"
@doc api: :monitoring
@spec subscribe_to_stats(
connection :: Spear.Connection.t(),
subscriber :: pid() | GenServer.name(),
opts :: Keyword.t()
) ::
{:ok, reference()} | {:error, any()}
def subscribe_to_stats(conn, subscriber, opts \\ []) do
default_subscribe_opts = [
use_metadata?: false,
interval: 5_000,
timeout: 5_000,
raw?: false,
credentials: nil
]
opts =
default_subscribe_opts
|> Keyword.merge(opts)
stats_req =
Monitoring.stats_req(
use_metadata: opts[:use_metadata?],
refresh_time_period_in_ms: opts[:interval]
)
request =
%Spear.Request{
api: {Monitoring, :Stats},
messages: [stats_req],
credentials: nil
}
|> Spear.Request.expand()
through =
if opts[:raw?] do
fn message, request_ref -> {request_ref, message} end
else
fn Monitoring.stats_resp(stats: stats), _ -> stats end
end
Connection.call(
conn,
{{:subscription, subscriber, through}, request},
opts[:timeout]
)
end
@doc """
Requests the available server RPCs
This function is compatible with server version v21.10.0 and later.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.get_supported_rpcs(conn)
{:ok,
[
%Spear.SupportedRpc{
features: ["stream", "all"],
rpc: "create",
service: "event_store.client.persistent_subscriptions.persistentsubscriptions"
},
%Spear.SupportedRpc{
features: ["stream", "all"],
rpc: "update",
service: "event_store.client.persistent_subscriptions.persistentsubscriptions"
},
..
]}
"""
@doc since: "0.11.0"
@doc api: :server_features
@spec get_supported_rpcs(connection :: Spear.Connection.t(), opts :: Keyword.t()) ::
{:ok, [Spear.SupportedRpc.t()]} | {:error, any()}
def get_supported_rpcs(conn, opts \\ []) do
with {:ok, ServerFeatures.supported_methods(methods: methods)} <-
request(conn, ServerFeatures, :GetSupportedMethods, [empty()], opts) do
{:ok, Enum.map(methods, &Spear.SupportedRpc.from_proto/1)}
end
end
@doc """
Determines the current version of the connected server
This function is compatible with server version v21.10.0 and later.
## Options
Options are passed to `request/5`.
## Examples
iex> Spear.get_server_version(conn)
{:ok, "21.10.0"}
"""
@doc since: "0.11.0"
@doc api: :server_features
@spec get_server_version(connection :: Spear.Connection.t(), opts :: Keyword.t()) ::
{:ok, [String.t()]} | {:error, any()}
def get_server_version(conn, opts \\ []) do
with {:ok, ServerFeatures.supported_methods(event_store_server_version: version)} <-
request(conn, ServerFeatures, :GetSupportedMethods, [empty()], opts) do
{:ok, version}
end
end
end