defmodule Commanded.Projections.Ecto do
@moduledoc """
Read model projections for Commanded using Ecto.
## Example usage
defmodule Projector do
use Commanded.Projections.Ecto,
application: MyApp.Application,
name: "my-projection",
repo: MyApp.Repo,
schema_prefix: "my-prefix",
timeout: :infinity
project %Event{}, _metadata, fn multi ->
Ecto.Multi.insert(multi, :my_projection, %MyProjection{...})
end
project %AnotherEvent{}, fn multi ->
Ecto.Multi.insert(multi, :my_projection, %MyProjection{...})
end
end
## Guides
- [Getting started](getting-started.html)
- [Usage](usage.html)
"""
defmacro __using__(opts) do
opts = opts || []
schema_prefix =
opts[:schema_prefix] || Application.get_env(:commanded_ecto_projections, :schema_prefix)
quote location: :keep do
@behaviour Commanded.Projections.Ecto
@opts unquote(opts)
@repo @opts[:repo] || Application.compile_env(:commanded_ecto_projections, :repo) ||
raise("Commanded Ecto projections expects :repo to be configured in environment")
@timeout @opts[:timeout] || :infinity
# Pass through any other configuration to the event handler
@handler_opts Keyword.drop(@opts, [:repo, :schema_prefix, :timeout])
unquote(__include_schema_prefix__(schema_prefix))
unquote(__include_projection_version_schema__())
use Ecto.Schema
use Commanded.Event.Handler, @handler_opts
import Ecto.Changeset
import Ecto.Query
import unquote(__MODULE__)
def update_projection(event, metadata, multi_fn) do
projection_name = Map.fetch!(metadata, :handler_name)
event_number = Map.fetch!(metadata, :event_number)
changeset =
%ProjectionVersion{projection_name: projection_name}
|> ProjectionVersion.changeset(%{last_seen_event_number: event_number})
prefix = schema_prefix(event, metadata)
multi =
Ecto.Multi.new()
|> Ecto.Multi.run(:verify_projection_version, fn repo, _changes ->
version =
case repo.get(ProjectionVersion, projection_name, prefix: prefix) do
nil ->
repo.insert!(
%ProjectionVersion{
projection_name: projection_name,
last_seen_event_number: 0
},
prefix: prefix
)
version ->
version
end
if version.last_seen_event_number < event_number do
{:ok, %{version: version}}
else
{:error, :already_seen_event}
end
end)
|> Ecto.Multi.update(:projection_version, changeset, prefix: prefix)
with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]),
{:ok, changes} <- transaction(multi) do
if function_exported?(__MODULE__, :after_update, 3) do
apply(__MODULE__, :after_update, [event, metadata, changes])
else
:ok
end
else
{:error, :verify_projection_version, :already_seen_event, _changes} -> :ok
{:error, _stage, error, _changes} -> {:error, error}
{:error, _error} = reply -> reply
end
end
defp transaction(%Ecto.Multi{} = multi) do
@repo.transaction(multi, timeout: @timeout, pool_timeout: @timeout)
end
defoverridable schema_prefix: 1, schema_prefix: 2
end
end
## User callbacks
@optional_callbacks [after_update: 3, schema_prefix: 1, schema_prefix: 2]
@doc """
The optional `after_update/3` callback function defined in a projector is
called after each projected event.
The function receives the event, its metadata, and all changes from the
`Ecto.Multi` struct that were executed within the database transaction.
You could use this function to notify subscribers that the read model has been
updated, such as by publishing changes via Phoenix PubSub channels.
## Example
defmodule MyApp.ExampleProjector do
use Commanded.Projections.Ecto,
application: MyApp.Application,
repo: MyApp.Projections.Repo,
name: "MyApp.ExampleProjector"
project %AnEvent{name: name}, fn multi ->
Ecto.Multi.insert(multi, :example_projection, %ExampleProjection{name: name})
end
@impl Commanded.Projections.Ecto
def after_update(event, metadata, changes) do
# Use the event, metadata, or `Ecto.Multi` changes and return `:ok`
:ok
end
end
"""
@callback after_update(event :: struct, metadata :: map, changes :: Ecto.Multi.changes()) ::
:ok | {:error, any}
@doc """
The optional `schema_prefix/1` callback function defined in a projector is
used to set the schema of the `projection_versions` table used by the
projector for idempotency checks.
It is passed the event and its metadata and must return the schema name, as a
string, or `nil`.
"""
@callback schema_prefix(event :: struct) :: String.t() | nil
@doc """
The optional `schema_prefix/2` callback function defined in a projector is
used to set the schema of the `projection_versions` table used by the
projector for idempotency checks.
It is passed the event and its metadata, and must return the schema name, as a
string, or `nil`
"""
@callback schema_prefix(event :: struct(), metadata :: map()) :: String.t() | nil
defp __include_schema_prefix__(schema_prefix) do
quote do
cond do
is_nil(unquote(schema_prefix)) ->
def schema_prefix(_event), do: nil
def schema_prefix(event, _metadata), do: schema_prefix(event)
is_binary(unquote(schema_prefix)) ->
def schema_prefix(_event), do: nil
def schema_prefix(_event, _metadata), do: unquote(schema_prefix)
is_function(unquote(schema_prefix), 1) ->
def schema_prefix(event), do: nil
def schema_prefix(event, _metadata), do: apply(unquote(schema_prefix), [event])
is_function(unquote(schema_prefix), 2) ->
def schema_prefix(event), do: nil
def schema_prefix(event, metadata), do: apply(unquote(schema_prefix), [event, metadata])
true ->
raise ArgumentError,
message:
"expected :schema_prefix option to be a string or a one-arity or two-arity function, but got: " <>
inspect(unquote(schema_prefix))
end
end
end
defp __include_projection_version_schema__ do
quote do
defmodule ProjectionVersion do
@moduledoc false
use Ecto.Schema
import Ecto.Changeset
@primary_key {:projection_name, :string, []}
schema "projection_versions" do
field(:last_seen_event_number, :integer)
timestamps(type: :naive_datetime_usec)
end
@required_fields ~w(last_seen_event_number)a
def changeset(model, params \\ :invalid) do
cast(model, params, @required_fields)
end
end
end
end
defmacro project(event, do: block) do
IO.warn(
"project macro with \"do end\" block is deprecated; use project/2 with function instead",
Macro.Env.stacktrace(__ENV__)
)
quote do
def handle(unquote(event) = event, metadata) do
update_projection(event, metadata, fn var!(multi) ->
unquote(block)
end)
end
end
end
@doc """
Project a domain event into a read model by appending one or more operations
to the `Ecto.Multi` struct passed to the projection function you define
The operations will be executed in a database transaction including an
idempotency check to guarantee an event cannot be projected more than once.
## Example
project %AnEvent{}, fn multi ->
Ecto.Multi.insert(multi, :my_projection, %MyProjection{...})
end
"""
defmacro project(event, lambda) do
quote do
def handle(unquote(event) = event, metadata) do
update_projection(event, metadata, unquote(lambda))
end
end
end
defmacro project(event, metadata, do: block) do
IO.warn(
"project macro with \"do end\" block is deprecated; use project/3 with function instead",
Macro.Env.stacktrace(__ENV__)
)
quote do
def handle(unquote(event) = event, unquote(metadata) = metadata) do
update_projection(event, metadata, fn var!(multi) ->
unquote(block)
end)
end
end
end
@doc """
Project a domain event and its metadata map into a read model by appending one
or more operations to the `Ecto.Multi` struct passed to the projection
function you define.
The operations will be executed in a database transaction including an
idempotency check to guarantee an event cannot be projected more than once.
## Example
project %AnEvent{}, metadata, fn multi ->
Ecto.Multi.insert(multi, :my_projection, %MyProjection{...})
end
"""
defmacro project(event, metadata, lambda) do
quote do
def handle(unquote(event) = event, unquote(metadata) = metadata) do
update_projection(event, metadata, unquote(lambda))
end
end
end
end