defmodule Counterpoint.OnDemandProjection do
  @moduledoc """
  Read-side projection that folds events into state on every call.

  Unlike a persistent projection, no state is stored between calls — events are
  re-read and re-folded each time `run/3` is invoked. This is simple and
  always consistent, at the cost of re-reading events on every request.

  The `query/1` callback may use `Query.limit/2` and `Query.reverse/1`, making
  it suitable for "last N events" patterns (e.g. checking whether a book is
  currently borrowed by fetching only the most recent event).

  `use Counterpoint.OnDemandProjection` injects `@behaviour Counterpoint.OnDemandProjection`.

  ## Example

      defmodule MyApp.Views.OrderSummary do
        use Counterpoint.OnDemandProjection

        alias Counterpoint.Query
        alias MyApp.Events.{OrderPlaced, OrderCancelled}

        defstruct [:order_id, :status]

        @impl Counterpoint.OnDemandProjection
        def query(order_id) do
          Query.new()
          |> Query.add_item(types: [OrderPlaced, OrderCancelled], tags: ["order_id:\#{order_id}"])
        end

        @impl Counterpoint.OnDemandProjection
        def init, do: %__MODULE__{}

        @impl Counterpoint.OnDemandProjection
        def apply(state, %Counterpoint.Envelope{data: %OrderPlaced{order_id: id}}),
          do: %{state | order_id: id, status: :placed}

        def apply(state, %Counterpoint.Envelope{data: %OrderCancelled{}}),
          do: %{state | status: :cancelled}
      end

      Counterpoint.OnDemandProjection.run(MyApp.Views.OrderSummary, :my_store, "order-1")
  """

  @doc "Build the query used to fetch events for this projection."
  @callback query(args :: term()) :: Counterpoint.Query.t()

  @doc "Return the initial (empty) state before any events are applied."
  @callback init() :: term()

  @doc "Fold a single envelope into the current state."
  @callback apply(state :: term(), envelope :: Counterpoint.Envelope.t()) :: term()

  @doc """
  Execute the projection for `args` against the given store.

  Steps, in order:

    1. Builds the query with `module.query(args)`.
    2. Resolves `store_name` through `Counterpoint.Store.lookup!/1`.
    3. Reads events with `Dcb.Store.read/3`, forwarding the query's
       `after_position`, `limit` and `reverse` fields as the `:after`,
       `:limit` and `:reverse` options.
    4. Deserializes the result with `Counterpoint.Envelope.deserialize_many/1`.
    5. Folds the envelopes with `module.apply/2`, starting from `module.init()`.

  Envelopes are folded in exactly the order the store returns them; no
  re-sorting is performed here.

  ## Parameters

    * `module` - a module implementing the `Counterpoint.OnDemandProjection` callbacks.
    * `store_name` - identifier handed to `Counterpoint.Store.lookup!/1`.
    * `args` - opaque term passed to `module.query/1`; defaults to `nil`.

  ## Returns

  The final state produced by the fold (or `module.init()` when no events are read).

  ## Raises

  `MatchError` when the store read returns anything other than `{:ok, events}`.
  """
  @spec run(module(), term(), term()) :: term()
  def run(module, store_name, args \\ nil) do
    query = module.query(args)

    # Assertive match on purpose: a failed read is treated as a crash, not a value.
    {:ok, raw_events} =
      store_name
      |> Counterpoint.Store.lookup!()
      |> Dcb.Store.read(
        Counterpoint.Query.to_dcb(query),
        after: query.after_position,
        limit: query.limit,
        reverse: query.reverse
      )

    # NOTE(review): with a reversed query the store presumably yields newest-first,
    # so the fold would see events in that order — confirm against Dcb.Store.read.
    raw_events
    |> Counterpoint.Envelope.deserialize_many()
    |> Enum.reduce(module.init(), fn envelope, state ->
      module.apply(state, envelope)
    end)
  end

  @doc """
  Declares the calling module as an implementation of the
  `Counterpoint.OnDemandProjection` behaviour.

  The options passed to `use` are ignored.
  """
  defmacro __using__(_opts) do
    quote do
      @behaviour Counterpoint.OnDemandProjection
    end
  end
end