defmodule EventStore do
@moduledoc """
EventStore allows you to define one or more event store modules to append,
read, and subscribe to streams of events.
It uses PostgreSQL (v9.5 or later) as the underlying storage engine.
## Defining an event store
An event store module is defined in your own application as follows:
defmodule MyApp.EventStore do
use EventStore, otp_app: :my_app
# Optional `init/1` function to modify config at runtime.
def init(config) do
{:ok, config}
end
end
Where the configuration for the event store must be in your application
environment, usually defined in `config/config.exs`:
config :my_app, MyApp.EventStore,
serializer: EventStore.JsonSerializer,
username: "postgres",
password: "postgres",
database: "eventstore",
hostname: "localhost"
Or use a URL to connect instead:
config :my_app, MyApp.EventStore,
serializer: EventStore.JsonSerializer,
url: "postgres://postgres:postgres@localhost/eventstore"
**Note:** To use an EventStore with Commanded you should configure the event
store to use Commanded's JSON serializer which provides additional support for
JSON decoding:
config :my_app, MyApp.EventStore,
serializer: Commanded.Serialization.JsonSerializer
The event store module defines a `start_link/1` function that needs to be
invoked before using the event store. In general, this function is not
called directly, but included as part of your application supervision tree.
If your application was generated with a supervisor (by passing `--sup`
to `mix new`) you will have a `lib/my_app/application.ex` file
containing the application start callback that defines and starts your
supervisor. You just need to edit the `start/2` function to start the event
store in your application's supervisor:
def start(_type, _args) do
children = [
MyApp.EventStore
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
Each event store module (e.g. `MyApp.EventStore`) provides a public API to
read events from and write events to an event stream, and subscribe to event
notifications.
## Postgres schema
By default the `public` schema will be used for event store tables. An event
store can be configured to use an alternate Postgres schema:
defmodule MyApp.EventStore do
use EventStore, otp_app: :my_app, schema: "schema_name"
end
Or provide the schema as an option in the `init/1` callback function:
defmodule MyApp.EventStore do
use EventStore, otp_app: :my_app
def init(config) do
{:ok, Keyword.put(config, :schema, "schema_name")}
end
end
Or define it in environment config when configuring the database connection
settings:
# config/config.exs
config :my_app, MyApp.EventStore, schema: "schema_name"
This feature allows you to define and start multiple event stores sharing a
single Postgres database, but with their data isolated and segregated by
schema.
Note the `mix event_store.<task>` tasks to create, initialize, and drop an
event store database will also handle creating and/or dropping the schema.
## Dynamic named event store
An event store can be started multiple times by providing a name when
starting. The name must be provided as an option to all event store operations
to identify the correct instance.
### Example
Define an event store:
defmodule MyApp.EventStore do
use EventStore, otp_app: :my_app
end
Start multiple instances of the event store, each with a unique name:
{:ok, _pid} = EventStore.start_link(name: :eventstore1)
{:ok, _pid} = EventStore.start_link(name: :eventstore2)
{:ok, _pid} = EventStore.start_link(name: :eventstore3)
Use a dynamic event store by providing its name as an option to each function:
:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, name: :eventstore1)
{:ok, events} = EventStore.read_stream_forward(stream_uuid, 0, 1_000, name: :eventstore1)
## Dynamic schemas
This feature also allows you to start each event store instance using a
different schema:
{:ok, _pid} = EventStore.start_link(name: :tenant1, schema: "tenant1")
{:ok, _pid} = EventStore.start_link(name: :tenant2, schema: "tenant2")
Or start supervised:
children =
for tenant <- [:tenant1, :tenant2, :tenant3] do
{MyApp.EventStore, name: tenant, schema: "#\{tenant\}"}
end
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
The above can be used for multi-tenancy where the data for each tenant is
stored in a separate, isolated schema.
## Shared database connection pools
By default each event store will start its own `Postgrex` database connection
pool. The size of the pool is configured with the `pool_size` config option.
When you have multiple event stores running you will also end up with multiple
connection pools. If they are all connecting to the same physical Postgres
database then it can be useful to share a single pool amongst all event
stores. Use the `shared_connection_pool` config option to specify a name for
the shared connection pool. Then configure the event stores you'd like to
share the pool with the same name.
This can be done in config:
# config/config.exs
config :my_app, MyApp.EventStore, shared_connection_pool: :shared_pool
Or when starting the event stores, such as via a `Supervisor`:
Supervisor.start_link(
[
{MyApp.EventStore, name: :eventstore1, shared_connection_pool: :shared_pool},
{MyApp.EventStore, name: :eventstore2, shared_connection_pool: :shared_pool},
{MyApp.EventStore, name: :eventstore3, shared_connection_pool: :shared_pool}
], opts)
## Using an existing database connection or transaction
In some situations you might want to execute the event store operations using
an existing Postgres database connection or transaction. For instance, if you
want to persist changes to one or more other tables, such as a read-model
projection.
To do this you can provide a Postgrex connection process or transaction as a
`:conn` option to any of the supported `EventStore` functions.
{:ok, pid} = Postgrex.start_link(config)
Postgrex.transaction(pid, fn conn ->
:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, conn: conn)
end)
This can also be used with an Ecto `Repo` which is configured to use the
Postgres SQL adapter. The connection process may be looked up as follows:
Repo.transaction(fn ->
%{pid: pool} = Ecto.Adapter.lookup_meta(Repo)
conn = Process.get({Ecto.Adapters.SQL, pool})
:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, conn: conn)
end)
---
## Guides
Please refer to the following guides to learn more:
- [Getting started](getting-started.html)
- [Usage](usage.html)
- [Subscriptions](subscriptions.html)
- [Running on a cluster of nodes](cluster.html)
- [Event serialization](event-serialization.html)
- [Upgrading an existing EventStore database](upgrades.html)
---
"""
@type t :: module
@type option ::
{:name, atom}
| {:conn, Postgrex.conn() | DBConnection.t()}
| {:timeout, timeout()}
@type options :: [option]
@type pagination_option ::
option
| {:page_size, pos_integer()}
| {:page_number, pos_integer()}
| {:search, String.t()}
| {:sort_by,
:stream_uuid | :stream_id | :stream_version | :created_at | :deleted_at | :status}
| {:sort_dir, :asc | :desc}
@type pagination_options :: [pagination_option]
@type transient_subscribe_option ::
{:name, atom}
| {:selector, (EventStore.RecordedEvent.t() -> any())}
| {:mapper, (EventStore.RecordedEvent.t() -> any())}
@type transient_subscribe_options :: [transient_subscribe_option]
@type persistent_subscription_option ::
transient_subscribe_option
| {:buffer_size, pos_integer()}
| {:checkpoint_after, non_neg_integer()}
| {:checkpoint_threshold, pos_integer()}
| {:concurrency_limit, pos_integer()}
| {:max_size, pos_integer()}
| {:partition_by, (EventStore.RecordedEvent.t() -> any())}
| {:start_from, :origin | :current | non_neg_integer()}
| {:timeout, timeout()}
| {:transient, boolean()}
@type persistent_subscription_options :: [persistent_subscription_option]
@type expected_version :: :any_version | :no_stream | :stream_exists | non_neg_integer
@type start_from :: :origin | :current | non_neg_integer
@doc false
defmacro __using__(opts) do
quote bind_quoted: [opts: opts] do
@behaviour EventStore
alias EventStore.{Config, EventData, PubSub, Subscriptions}
alias EventStore.Snapshots.{SnapshotData, Snapshotter}
alias EventStore.Subscriptions.Subscription
alias EventStore.Streams.Stream
@otp_app Keyword.fetch!(opts, :otp_app)
@all_stream "$all"
@default_batch_size 1_000
@default_count 1_000
def config(opts \\ []) do
opts = Keyword.merge(unquote(opts), opts)
with {:ok, config} <- EventStore.Supervisor.runtime_config(__MODULE__, @otp_app, opts) do
config
end
end
def child_spec(opts) do
name = name(opts)
%{
id: name,
start: {__MODULE__, :start_link, [opts]},
type: :supervisor
}
end
def start_link(opts \\ []) do
opts = Keyword.merge(unquote(opts), opts)
name = name(opts)
EventStore.Supervisor.start_link(__MODULE__, @otp_app, name, opts)
end
def stop(supervisor, timeout \\ 5000) do
Supervisor.stop(supervisor, :normal, timeout)
end
def append_to_stream(stream_uuid, expected_version, events, opts \\ [])
def append_to_stream(@all_stream, _expected_version, _events, _opts),
do: {:error, :cannot_append_to_all_stream}
def append_to_stream(stream_uuid, expected_version, events, opts) do
{conn, opts} = parse_opts(opts)
Stream.append_to_stream(conn, stream_uuid, expected_version, events, opts)
end
def link_to_stream(
stream_uuid,
expected_version,
events_or_event_ids,
opts \\ []
)
def link_to_stream(@all_stream, _expected_version, _events_or_event_ids, _opts),
do: {:error, :cannot_append_to_all_stream}
def link_to_stream(stream_uuid, expected_version, events_or_event_ids, opts) do
{conn, opts} = parse_opts(opts)
Stream.link_to_stream(conn, stream_uuid, expected_version, events_or_event_ids, opts)
end
def read_stream_forward(
stream_uuid,
start_version \\ 0,
count \\ @default_count,
opts \\ []
)
def read_stream_forward(stream_uuid, start_version, count, opts) do
{conn, opts} = parse_opts(opts)
Stream.read_stream_forward(conn, stream_uuid, start_version, count, opts)
end
def read_all_streams_forward(
start_version \\ 0,
count \\ @default_count,
opts \\ []
)
def read_all_streams_forward(start_version, count, opts),
do: read_stream_forward(@all_stream, start_version, count, opts)
def read_stream_backward(
stream_uuid,
start_version \\ -1,
count \\ @default_count,
opts \\ []
)
def read_stream_backward(stream_uuid, start_version, count, opts) do
{conn, opts} = parse_opts(opts)
Stream.read_stream_backward(conn, stream_uuid, start_version, count, opts)
end
def read_all_streams_backward(
start_version \\ -1,
count \\ @default_count,
opts \\ []
)
def read_all_streams_backward(start_version, count, opts),
do: read_stream_backward(@all_stream, start_version, count, opts)
def stream_forward(stream_uuid, start_version \\ 0, opts \\ [])
def stream_forward(stream_uuid, start_version, read_batch_size)
when is_integer(start_version) and is_integer(read_batch_size) do
stream_forward(stream_uuid, start_version, read_batch_size: read_batch_size)
end
def stream_forward(stream_uuid, start_version, opts) do
{conn, opts} = parse_opts(opts)
opts = Keyword.put_new(opts, :read_batch_size, @default_batch_size)
Stream.stream_forward(conn, stream_uuid, start_version, opts)
end
def stream_all_forward(start_version \\ 0, opts \\ [])
def stream_all_forward(start_version, opts),
do: stream_forward(@all_stream, start_version, opts)
def stream_backward(stream_uuid, start_version \\ -1, opts \\ [])
def stream_backward(stream_uuid, start_version, read_batch_size)
when is_integer(start_version) and is_integer(read_batch_size) do
stream_backward(stream_uuid, start_version, read_batch_size: read_batch_size)
end
def stream_backward(stream_uuid, start_version, opts) do
{conn, opts} = parse_opts(opts)
opts = Keyword.put_new(opts, :read_batch_size, @default_batch_size)
Stream.stream_backward(conn, stream_uuid, start_version, opts)
end
def stream_all_backward(start_version \\ -1, opts \\ [])
def stream_all_backward(start_version, opts),
do: stream_backward(@all_stream, start_version, opts)
def delete_stream(stream_uuid, expected_version, type \\ :soft, opts \\ [])
def delete_stream(@all_stream, _expected_version, _type, _opts),
do: {:error, :cannot_delete_all_stream}
def delete_stream(stream_uuid, expected_version, type, opts) when type in [:soft, :hard] do
{conn, opts} = parse_opts(opts)
Stream.delete(conn, stream_uuid, expected_version, type, opts)
end
def paginate_streams(opts \\ []) do
pagination_opts =
Keyword.take(opts, [:page_size, :page_number, :search, :sort_by, :sort_dir])
{conn, opts} = parse_opts(opts)
opts = Keyword.merge(opts, pagination_opts)
Stream.paginate_streams(conn, opts)
end
def stream_info(stream_uuid, opts \\ [])
def stream_info(:all, opts), do: stream_info(@all_stream, opts)
def stream_info(stream_uuid, opts) do
{conn, opts} = parse_opts(opts)
Stream.stream_info(conn, stream_uuid, :stream_exists, opts)
end
def subscribe(stream_uuid, opts \\ []) do
name = name(opts)
PubSub.subscribe(name, stream_uuid, Keyword.delete(opts, :name))
end
def subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts \\ []) do
name = name(opts)
config = Config.lookup(name)
conn = Keyword.fetch!(config, :conn)
schema = Keyword.fetch!(config, :schema)
serializer = Keyword.fetch!(config, :serializer)
query_timeout = timeout(opts, config)
{start_from, opts} = Keyword.pop(opts, :start_from, :origin)
with {:ok, start_from} <-
Stream.start_from(conn, stream_uuid, start_from,
schema: schema,
timeout: query_timeout
) do
opts =
opts
|> Keyword.delete(:timeout)
|> Keyword.merge(
conn: conn,
event_store: name,
query_timeout: query_timeout,
schema: schema,
serializer: serializer,
stream_uuid: stream_uuid,
subscription_name: subscription_name,
start_from: start_from
)
|> Keyword.put_new_lazy(:hibernate_after, fn ->
Keyword.fetch!(config, :subscription_hibernate_after)
end)
|> Keyword.put_new_lazy(:retry_interval, fn ->
Keyword.fetch!(config, :subscription_retry_interval)
end)
Subscriptions.subscribe_to_stream(subscriber, opts)
end
end
def subscribe_to_all_streams(subscription_name, subscriber, opts \\ []),
do: subscribe_to_stream(@all_stream, subscription_name, subscriber, opts)
defdelegate ack(subscription, ack), to: Subscription
def unsubscribe_from_stream(stream_uuid, subscription_name, opts \\ []) do
name = name(opts)
Subscriptions.unsubscribe_from_stream(name, stream_uuid, subscription_name)
end
def unsubscribe_from_all_streams(subscription_name, opts \\ []),
do: unsubscribe_from_stream(@all_stream, subscription_name, opts)
def delete_subscription(stream_uuid, subscription_name, opts \\ []) do
name = name(opts)
with :ok <- Subscriptions.stop_subscription(name, stream_uuid, subscription_name) do
{conn, opts} = parse_opts(opts)
Subscriptions.delete_subscription(conn, stream_uuid, subscription_name, opts)
end
end
def delete_all_streams_subscription(subscription_name, opts \\ []),
do: delete_subscription(@all_stream, subscription_name, opts)
def read_snapshot(source_uuid, opts \\ []) do
{conn, opts} = parse_opts(opts)
Snapshotter.read_snapshot(conn, source_uuid, opts)
end
def record_snapshot(%SnapshotData{} = snapshot, opts \\ []) do
{conn, opts} = parse_opts(opts)
Snapshotter.record_snapshot(conn, snapshot, opts)
end
def delete_snapshot(source_uuid, opts \\ []) do
{conn, opts} = parse_opts(opts)
Snapshotter.delete_snapshot(conn, source_uuid, opts)
end
defp parse_opts(opts) do
name = name(opts)
config = Config.lookup(name)
conn = Keyword.get(opts, :conn) || Keyword.fetch!(config, :conn)
timeout = timeout(opts, config)
config =
case Keyword.fetch(opts, :read_batch_size) do
{:ok, read_batch_size} -> Keyword.put(config, :read_batch_size, read_batch_size)
:error -> config
end
{conn, Keyword.put(config, :timeout, timeout)}
end
defp name(opts) do
case Keyword.get(opts, :name) do
nil ->
__MODULE__
name when is_atom(name) ->
name
invalid ->
raise ArgumentError,
message: "expected `:name` to be an atom, got: " <> inspect(invalid)
end
end
defp timeout(opts, config) do
case Keyword.get(opts, :timeout) do
nil ->
# Use event store default timeout, or 15 seconds if not configured
Keyword.get(config, :timeout, 15_000)
timeout when is_integer(timeout) ->
timeout
:infinity ->
:infinity
invalid ->
raise ArgumentError,
message:
"expected `:timeout` to be an integer or `:infinity`, got: " <>
inspect(invalid)
end
end
end
end
alias EventStore.{Config, EventData, Page}
alias EventStore.Snapshots.SnapshotData
alias EventStore.Streams.StreamInfo
## User callbacks
@optional_callbacks init: 1
@doc """
A callback executed when the event store starts or when configuration is read.
It must return `{:ok, keyword}` with the updated list of configuration.
"""
@callback init(config :: Keyword.t()) :: {:ok, Keyword.t()}
@doc """
Returns the event store configuration stored in the `:otp_app` environment.
"""
@callback config() :: Keyword.t()
@doc """
Starts any connection pooling or supervision and return `{:ok, pid}`
or just `:ok` if nothing needs to be done.
Returns `{:error, {:already_started, pid}}` if the event store is already
started or `{:error, term}` in case anything else goes wrong.
"""
@callback start_link(opts :: Keyword.t()) ::
{:ok, pid}
| {:error, {:already_started, pid}}
| {:error, term}
@doc """
Shuts down the event store.
"""
@callback stop(Supervisor.supervisor(), timeout) :: :ok
@doc """
Append one or more events to a stream atomically.
- `stream_uuid` is used to uniquely identify a stream.
- `expected_version` is used for optimistic concurrency checks.
You can provide a non-negative integer to specify the expected stream
version. This is used to ensure you can only append to the stream if it is
at exactly that version.
You can also provide one of the following values to alter the concurrency
check behaviour:
- `:any_version` - No concurrency checking and allow any stream version
(including no stream).
- `:no_stream` - Ensure the stream does not exist.
- `:stream_exists` - Ensure the stream exists.
- `events` is a list of `%EventStore.EventData{}` structs.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database transaction, in
milliseconds. Defaults to 15,000ms.
Returns `:ok` on success, or an `{:error, reason}` tagged tuple. The returned
error may be due to one of the following reasons:
* `{:error, :wrong_expected_version}` when the actual stream version differs
from the provided expected version.
* `{:error, :stream_exists}` when the stream exists, but expected version
was `:no_stream`.
* `{:error, :stream_not_found}` when the stream does not exist, but
expected version was `:stream_exists`.
"""
@callback append_to_stream(
stream_uuid :: String.t(),
expected_version,
events :: list(EventData.t()),
opts :: options
) ::
:ok
| {:error, :cannot_append_to_all_stream}
| {:error, :stream_exists}
| {:error, :stream_not_found}
| {:error, :wrong_expected_version}
| {:error, :stream_deleted}
| {:error, reason :: term}
@doc """
Link one or more existing events to another stream.
Allows you to construct streams containing events already appended to any
other stream. This is more efficient than copying events between streams since
only a reference to the existing event is created.
- `stream_uuid` is used to uniquely identify the target stream.
- `expected_version` is used for optimistic concurrency checks.
You can provide a non-negative integer to specify the expected stream
version. This is used to ensure you can only append to the stream if it is
at exactly that version.
You can also provide one of the following values to affect the concurrency
check behaviour:
- `:any_version` - No concurrency checking; allow any stream version
(including no stream).
- `:no_stream` - Ensure the stream does not exist.
- `:stream_exists` - Ensure the stream exists.
- `events_or_event_ids` is a list of `%EventStore.EventData{}` structs or
event ids.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database transaction, in
milliseconds. Defaults to 15,000ms.
Returns `:ok` on success, or an `{:error, reason}` tagged tuple. The returned
error may be due to one of the following reasons:
* `{:error, :wrong_expected_version}` when the actual stream version differs
from the provided expected version.
* `{:error, :stream_exists}` when the stream exists, but expected version
was `:no_stream`.
* `{:error, :stream_not_found}` when the stream does not exist, but
expected version was `:stream_exists`.
"""
@callback link_to_stream(
stream_uuid :: String.t(),
expected_version,
events :: list(EventStore.RecordedEvent.t()) | list(non_neg_integer),
opts :: options
) ::
:ok
| {:error, :cannot_append_to_all_stream}
| {:error, :stream_exists}
| {:error, :stream_not_found}
| {:error, :wrong_expected_version}
| {:error, :stream_deleted}
| {:error, reason :: term}
@doc """
Reads the requested number of events from the given stream in the order in
which they were originally written.
- `stream_uuid` is used to uniquely identify a stream.
- `start_version` optionally, the stream version of the first event to read.
Defaults to the beginning of the stream if not set.
- `count` optionally, the maximum number of events to read.
Defaults to to returning 1,000 events from the stream.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
"""
@callback read_stream_forward(
stream_uuid :: String.t(),
start_version :: non_neg_integer,
count :: non_neg_integer,
opts :: options
) ::
{:ok, list(EventStore.RecordedEvent.t())}
| {:error, :stream_deleted}
| {:error, reason :: term}
@doc """
Reads the requested number of events from all streams in the order in which
they were originally written.
- `start_version` optionally, the stream version of the first event to read.
Defaults to the beginning of the stream if not set.
- `count` optionally, the maximum number of events to read.
Defaults to returning 1,000 events from all streams.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
"""
@callback read_all_streams_forward(
start_version :: non_neg_integer,
count :: non_neg_integer,
opts :: options
) :: {:ok, list(EventStore.RecordedEvent.t())} | {:error, reason :: term}
@doc """
Reads the requested number of events from the given stream in the reverse
order from which they were originally written.
- `stream_uuid` is used to uniquely identify a stream.
- `start_version` optionally, the stream version of the first event to read.
Use `-1` to indicate starting from the end of the stream. Defaults to the
end of the stream if not set.
- `count` optionally, the maximum number of events to read.
Defaults to to returning 1,000 events from the stream.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
"""
@callback read_stream_backward(
stream_uuid :: String.t(),
start_version :: non_neg_integer,
count :: non_neg_integer,
opts :: options
) ::
{:ok, list(EventStore.RecordedEvent.t())}
| {:error, :stream_deleted}
| {:error, reason :: term}
@doc """
Reads the requested number of events from all streams in the reverse order
from which they were originally written.
- `start_version` optionally, the stream version of the first event to read.
Use `-1` to indicate starting from the end of the stream. Defaults to the
end of the stream if not set.
- `count` optionally, the maximum number of events to read.
Defaults to returning 1,000 events from all streams.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
"""
@callback read_all_streams_backward(
start_version :: integer,
count :: non_neg_integer,
opts :: options
) :: {:ok, list(EventStore.RecordedEvent.t())} | {:error, reason :: term}
@doc """
Streams events from the given stream in the order in which they were
originally written.
- `start_version` optionally, the stream version of the first event to read.
Defaults to the beginning of the stream if not set.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
- `read_batch_size` optionally, the number of events to read at a time
from storage. Defaults to reading 1,000 events per batch.
"""
@callback stream_forward(
stream_uuid :: String.t(),
start_version :: integer,
opts :: [options | {:read_batch_size, non_neg_integer}]
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term}
@doc """
Streams events from all streams in the order in which they were originally
written.
- `start_version` optionally, the stream version of the first event to read.
Defaults to the beginning of the stream if not set.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
- `read_batch_size` optionally, the number of events to read at a time from
storage. Defaults to reading 1,000 events per batch.
"""
@callback stream_all_forward(
start_version :: non_neg_integer,
opts :: [options | {:read_batch_size, non_neg_integer}]
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term}
@doc """
Streams events from the given stream in the reverse order from which they
were originally written.
- `start_version` optionally, the stream version of the first event to read.
Use `-1` to indicate starting from the end of the stream. Defaults to the
end of the stream if not set.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
- `read_batch_size` optionally, the number of events to read at a time
from storage. Defaults to reading 1,000 events per batch.
"""
@callback stream_backward(
stream_uuid :: String.t(),
start_version :: integer,
opts :: [options | {:read_batch_size, non_neg_integer}]
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term}
@doc """
Streams events from all streams in the reverse order from which they were
originally written.
- `start_version` optionally, the stream version of the first event to read.
Use `-1` to indicate starting from the end of the stream. Defaults to the
end of the stream if not set.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
- `read_batch_size` optionally, the number of events to read at a time from
storage. Defaults to reading 1,000 events per batch.
"""
@callback stream_all_backward(
start_version :: non_neg_integer,
opts :: [options | {:read_batch_size, non_neg_integer}]
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term}
@doc """
Delete an existing stream.
- `stream_uuid` identity of the stream to be deleted.
- `expected_version` is used for optimistic concurrency checking.
You can provide a non-negative integer to specify the expected stream
version. This is used to ensure you can only delete a stream if it is
at exactly that version.
You can also provide one of the following values to alter the concurrency
checking behaviour:
- `:any_version` - No concurrency check, allow any stream version.
- `:stream_exists` - Ensure the stream exists, at any version.
- `type` - used to indicate how the stream is deleted:
- `:soft` - the stream is marked as deleted, but no events are removed.
- `:hard` - the stream and its events are permanently deleted from the
database.
Soft deletion is the default if the type is not provided.
Returns `:ok` on success or an error tagged tuple on failure.
### Soft delete
Will mark the stream as deleted, but will not delete its events. Events from
soft deleted streams will still appear in the globally ordered all events
(`$all`) stream and in any linked streams.
A soft deleted stream cannot be read nor appended to. Subscriptions to the
deleted stream will not receive any events but subscriptions containing linked
events from the deleted stream, such as the global all events stream, will
still receive events from the deleted stream.
### Hard delete
Will permanently delete the stream and its events. **This is irreversible and
will remove data**. Events will be removed from the globally ordered all
events stream and any linked streams.
After being hard deleted, a stream can later be appended to and read as if it
had never existed.
### Examples
#### Soft delete a stream
Delete a stream at any version:
:ok = MyApp.EventStore.delete_stream("stream1", :any_version, :soft)
Delete a stream at an expected version:
:ok = MyApp.EventStore.delete_stream("stream2", 3, :soft)
Delete stream will use soft delete by default so you can omit the type:
:ok = MyApp.EventStore.delete_stream("stream1", :any_version)
#### Hard delete a stream
Since hard deletes are destructive and irreversible they are disabled by
default. To use hard deletes you must first enable them for the event store:
defmodule MyApp.EventStore do
use EventStore, otp_app: :my_app, enable_hard_deletes: true
end
Or via config:
# config/config.exs
config :my_app, MyApp.EventStore, enable_hard_deletes: true
Hard delete a stream at any version:
:ok = MyApp.EventStore.delete_stream("stream1", :any_version, :hard)
Hard delete a stream that should exist:
:ok = MyApp.EventStore.delete_stream("stream2", :stream_exists, :hard)
"""
@callback delete_stream(
stream_uuid :: String.t(),
expected_version :: :any_version | :stream_exists | non_neg_integer(),
type :: :soft | :hard,
opts :: Keyword.t()
) ::
:ok
| {:error, :stream_not_found}
| {:error, :stream_deleted}
| {:error, term}
@doc """
Paginate all streams.
- `opts` an optional keyword list containing:
- `page_size` the total number of streams per page. Defaults to 50.
- `page_number` the current page number. Defaults to page 1.
- `search` search for a stream by its identity.
- `sort_by` sort the streams by the given field.
Defaults to sorting by the stream's internal id (`:stream_id` field)
- `sort_dir` direction to sort streams by, either `:asc` or `:desc`.
Defaults to `:asc`.
- `name` the name of the event store if provided to `start_link/1`.
Defaults to the event store module name (e.g. `MyApp.EventStore`).
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
Returns an `{:ok, page}` result containing a list of `StreamInfo` structs, or
an error tagged tuple on failure.
### Example
alias EventStore.Page
{:ok, %Page{entries: streams}} = MyApp.EventStore.paginate_streams()
"""
@callback paginate_streams(opts :: pagination_options()) ::
{:ok, Page.t(StreamInfo.t())} | {:error, any()}
@doc """
Get basic information about a stream, including its version, status, and
created date.
- `opts` an optional keyword list containing:
- `name` the name of the event store if provided to `start_link/1`.
- `timeout` an optional timeout for the database query, in milliseconds.
Defaults to 15,000ms.
Returns `{:ok, StreamInfo.t()}` on success, or an `{:error, reason}` tagged
tuple. The returned error may be due to one of the following reasons:
* `{:error, :stream_not_found}` when the stream does not exist.
* `{:error, :stream_deleted}` when the stream was soft deleted.
### Example
alias EventStore.Streams.StreamInfo
{:ok, %StreamInfo{stream_version: stream_version}} =
MyApp.EventStore.stream_info("stream-1234")
"""
@callback stream_info(stream_uuid :: String.t() | :all, opts :: options()) ::
{:ok, StreamInfo.t()}
| {:error, :stream_not_found}
| {:error, :stream_deleted}
| {:error, reason :: term}
@doc """
Create a transient subscription to a given stream.
- `stream_uuid` is the stream to subscribe to.
Use the `$all` identifier to subscribe to events from all streams.
- `opts` is an optional keyword list providing additional subscription
configuration:
- `name` the name of the event store if provided to `start_link/1`.
- `selector` to define a function to filter each event, i.e. returns
only those elements for which fun returns a truthy value
- `mapper` to define a function to map each recorded event before sending
to the subscriber.
The calling process will be notified whenever new events are appended to
the given `stream_uuid`.
As the subscription is transient you do not need to acknowledge receipt of
each event. The subscriber process will miss any events if it is restarted
and resubscribes. If you need a persistent subscription with guaranteed
at-least-once event delivery and back-pressure you should use
`c:EventStore.subscribe_to_stream/4`.
## Notification message
Events will be sent to the subscriber, in batches, as `{:events, events}`
where events is a collection of `EventStore.RecordedEvent` structs.
## Example
{:ok, subscription} = EventStore.subscribe(stream_uuid)
# receive first batch of events
receive do
{:events, events} ->
IO.puts "Received events: " <> inspect(events)
end
"""
@callback subscribe(stream_uuid :: String.t(), opts :: transient_subscribe_options) ::
:ok | {:error, term}
@doc """
Create a subscription to a single stream. By default the subscription is persistent.
The `subscriber` process will be notified of each batch of events appended to
the single stream identified by `stream_uuid`.
- `stream_uuid` is the stream to subscribe to.
Use the `$all` identifier to subscribe to events from all streams.
- `subscription_name` is used to uniquely identify the subscription.
- `subscriber` is a process that will be sent `{:events, events}`
notification messages.
- `opts` is an optional keyword list providing additional subscription
configuration:
- `name` the name of the event store if provided to `start_link/1`.
- `start_from` is a pointer to the first event to receive.
It must be one of:
- `:origin` for all events from the start of the stream (default).
- `:current` for any new events appended to the stream after the
subscription has been created.
- any positive integer for a stream version to receive events after.
- `selector` to define a function to filter each event, i.e. returns
only those elements for which the function returns a truthy value.
- `mapper` to define a function to map each recorded event before sending
to the subscriber.
- `concurrency_limit` defines the maximum number of concurrent subscribers
allowed to connect to the subscription. By default only one subscriber
may connect. If too many subscribers attempt to connect to the
subscription an `{:error, :too_many_subscribers}` is returned.
- `buffer_size` limits how many in-flight events will be sent to the
subscriber process before acknowledgement of successful processing. This
limits the number of messages sent to the subscriber and stops their
message queue from getting filled with events. Defaults to one in-flight
event.
- `checkpoint_threshold` determines how frequently a checkpoint is written
to the database for the subscription after events are acknowledged.
Increasing the threshold will reduce the number of database writes for
busy subscriptions, but means that events might be replayed when the
subscription resumes if the checkpoint cannot be written.
The default is to persist the checkpoint after each acknowledgement.
- `checkpoint_after` (milliseconds) used to ensure a checkpoint is written
after a period of inactivity even if the checkpoint threshold has not
been met. This ensures checkpoints are consistently written during
less busy periods. It is only applicable when a checkpoint threshold has
been set as the default subscription behaviour is to checkpoint after
each acknowledgement.
- `max_size` limits the number of events queued in memory by the
subscription process to prevent excessive memory usage. If the in-memory
queue exceeds the max size - because the subscriber cannot keep up -
then events will not be queued in memory, but instead will be read from
the database on demand once the subscriber process has processed the
queue. This limit also determines how many events are read from the
database at a time during catch-up. Defaults to 1,000 events.
- `partition_by` is an optional function used to partition events to
subscribers. It can be used to guarantee processing order when multiple
subscribers have subscribed to a single subscription. The function is
passed a single argument (an `EventStore.RecordedEvent` struct) and must
return the partition key. As an example to guarantee events for a single
stream are processed serially, but different streams are processed
concurrently, you could use the `stream_uuid` as the partition key.
```
by_stream = fn %EventStore.RecordedEvent{stream_uuid: stream_uuid} -> stream_uuid end
{:ok, _subscription} =
EventStore.subscribe_to_stream(stream_uuid, "example", self(),
concurrency_limit: 10,
partition_by: by_stream
)
```
- `timeout` an optional timeout for database queries, in milliseconds.
Defaults to 15,000ms.
- `transient` is an optional boolean flag to create a transient subscription.
By default this is set to `false`. If you want to create a transient
subscription set this flag to true. Your subscription will not be
persisted, so if the subscription is restarted, you will receive the events
again starting from `start_from`.
An example usage are short lived event handlers that keep their state in
memory but still want to have the guarantee to have received all events.
It's possible to create a persistent subscription with some name,
stop it and later create a transient subscription with the same name. The
transient subscription will now receive all events starting from `start_from`.
If you later stop this `transient` subscription and start a persistent
subscription again with the same name, you will receive the events again
as if the transient subscription never existed.
The subscription will resume from the last acknowledged event if it already
exists. It will ignore the `start_from` argument in this case.
Returns `{:ok, subscription}` when subscription succeeds.
## Notification messages
Subscribers will initially receive a `{:subscribed, subscription}` message
once the subscription has successfully subscribed.
After this message events will be sent to the subscriber, in batches, as
`{:events, events}` where events is a collection of `EventStore.RecordedEvent`
structs.
## Example
{:ok, subscription} = EventStore.subscribe_to_stream(stream_uuid, "example", self())
# wait for the subscription confirmation
receive do
{:subscribed, ^subscription} ->
IO.puts "Successfully subscribed to stream: " <> inspect(stream_uuid)
end
receive do
{:events, events} ->
IO.puts "Received events: " <> inspect(events)
# acknowledge receipt
EventStore.ack(subscription, events)
end
## Subscription tuning
Use the `checkpoint_threshold` and `checkpoint_after` options to configure how
frequently checkpoints are written to the database. By default a subscription
will persist a checkpoint after each acknowledgement. This can cause high
write load on the database for busy subscriptions which receive a large number
of events. This problem is known as write amplification where each event
written to a stream causes many additional writes as subscriptions acknowledge
processing of the event.
The `checkpoint_threshold` controls how frequently checkpoints are persisted.
Increasing the threshold reduces the number of database writes. For example
using a threshold of 100 means that a checkpoint is written at most once for
every 100 events processed. The `checkpoint_after` ensures that a checkpoint
will still be written after a period of inactivity even when the threshold has
not been met. This ensures bursts of event processing can be safely handled.
"""
@callback subscribe_to_stream(
stream_uuid :: String.t(),
subscription_name :: String.t(),
subscriber :: pid,
opts :: persistent_subscription_options
) ::
{:ok, subscription :: pid}
| {:error, :already_subscribed}
| {:error, :subscription_already_exists}
| {:error, :too_many_subscribers}
| {:error, reason :: term}
@doc """
Create a subscription to all streams. By default the subscription is persistent.
See `c:EventStore.subscribe_to_stream/4` for options.
## Example
{:ok, subscription} = EventStore.subscribe_to_all_streams("all_subscription", self())
# wait for the subscription confirmation
receive do
{:subscribed, ^subscription} ->
IO.puts "Successfully subscribed to all streams"
end
receive do
{:events, events} ->
IO.puts "Received events: " <> inspect(events)
# acknowledge receipt
EventStore.ack(subscription, events)
end
"""
@callback subscribe_to_all_streams(
subscription_name :: String.t(),
subscriber :: pid,
opts :: persistent_subscription_options
) ::
{:ok, subscription :: pid}
| {:error, :already_subscribed}
| {:error, :subscription_already_exists}
| {:error, :too_many_subscribers}
| {:error, reason :: term}
@doc """
Acknowledge receipt of the given events received from a subscription.
Accepts a single `EventStore.RecordedEvent` struct, a list of
`EventStore.RecordedEvent`s, or the event number of the recorded event to
acknowledge.
"""
@callback ack(
subscription :: pid,
EventStore.RecordedEvent.t()
| list(EventStore.RecordedEvent.t())
| non_neg_integer()
) :: :ok | {:error, reason :: term}
@doc """
Unsubscribe an existing subscriber from event notifications.
- `stream_uuid` is the stream to unsubscribe from.
- `subscription_name` is used to identify the existing subscription process
to stop.
Returns `:ok` on success.
"""
@callback unsubscribe_from_stream(
stream_uuid :: String.t(),
subscription_name :: String.t(),
opts :: options
) ::
:ok
@doc """
Unsubscribe an existing subscriber from all event notifications.
- `subscription_name` is used to identify the existing subscription process
to stop.
Returns `:ok` on success.
"""
@callback unsubscribe_from_all_streams(subscription_name :: String.t(), opts :: options) :: :ok
@doc """
Delete an existing persistent subscription.
- `stream_uuid` is the stream the subscription is subscribed to.
- `subscription_name` is used to identify the existing subscription to
remove.
Returns `:ok` on success.
"""
@callback delete_subscription(
stream_uuid :: String.t(),
subscription_name :: String.t(),
opts :: options
) ::
:ok | {:error, term}
@doc """
Delete an existing persistent subscription to all streams.
- `subscription_name` is used to identify the existing subscription to
remove.
Returns `:ok` on success.
"""
@callback delete_all_streams_subscription(subscription_name :: String.t(), opts :: options) ::
:ok | {:error, term}
@doc """
Read a snapshot, if available, for a given source.
Returns `{:ok, %EventStore.Snapshots.SnapshotData{}}` on success, or
`{:error, :snapshot_not_found}` when unavailable.
"""
@callback read_snapshot(source_uuid :: String.t(), opts :: options) ::
{:ok, SnapshotData.t()} | {:error, :snapshot_not_found}
@doc """
Record a snapshot of the data and metadata for a given source.
Returns `:ok` on success.
"""
@callback record_snapshot(snapshot :: SnapshotData.t(), opts :: options) ::
:ok | {:error, reason :: term}
@doc """
Delete a previously recorded snapshop for a given source.
Returns `:ok` on success, or when the snapshot does not exist.
"""
@callback delete_snapshot(source_uuid :: String.t(), opts :: options) ::
:ok | {:error, reason :: term}
@doc """
Returns all running EventStore instances.
Note that order is not guaranteed.
"""
@spec all_instances :: list({event_store :: module(), [{:name, atom()}]})
def all_instances do
for {event_store, name} <- Config.all(), Process.whereis(name) do
{event_store, [name: name]}
end
end
end