# lib/counterpoint/on_demand_projection.ex

defmodule Counterpoint.OnDemandProjection do
  @moduledoc """
  Read-side projection that folds events into state on every call.

  Unlike a persistent projection, no state is stored between calls — events are
  re-read and re-folded each time `run/3` is invoked.  This is simple and
  always consistent, at the cost of re-reading events on every request.

  The `query/1` callback may use `Query.limit/2` and `Query.reverse/1`, making
  it suitable for "last N events" patterns (e.g. checking whether a book is
  currently borrowed by fetching only the most recent event).

  `use Counterpoint.OnDemandProjection` injects `@behaviour Counterpoint.OnDemandProjection`.

  ## Example

      defmodule MyApp.Views.OrderSummary do
        use Counterpoint.OnDemandProjection
        alias Counterpoint.Query
        alias MyApp.Events.{OrderPlaced, OrderCancelled}

        defstruct [:order_id, :status]

        @impl Counterpoint.OnDemandProjection
        def query(order_id) do
          Query.new()
          |> Query.add_item(types: [OrderPlaced, OrderCancelled], tags: ["order_id:\#{order_id}"])
        end

        @impl Counterpoint.OnDemandProjection
        def init, do: %__MODULE__{}

        @impl Counterpoint.OnDemandProjection
        def apply(state, %Counterpoint.Envelope{data: %OrderPlaced{order_id: id}}),
          do: %{state | order_id: id, status: :placed}

        def apply(state, %Counterpoint.Envelope{data: %OrderCancelled{}}),
          do: %{state | status: :cancelled}
      end

      Counterpoint.OnDemandProjection.run(MyApp.Views.OrderSummary, :my_store, "order-1")
  """

  @doc "Build the query used to fetch events for this projection."
  @callback query(args :: term()) :: Counterpoint.Query.t()

  @doc "Return the initial (empty) state before any events are applied."
  @callback init() :: term()

  @doc "Fold a single envelope into the current state."
  @callback apply(state :: term(), envelope :: Counterpoint.Envelope.t()) :: term()

  @doc """
  Execute the projection for `args` against the given store.

  Fetches events using `module.query(args)`, then folds them with `module.apply/2`
  starting from `module.init()`.

  ## Parameters

    * `module` - a module implementing the `Counterpoint.OnDemandProjection`
      callbacks (`query/1`, `init/0`, `apply/2`).
    * `store_name` - the name handed to `Counterpoint.Store.lookup!/1` to
      resolve the store reference.
    * `args` - passed unchanged to `module.query/1`; defaults to `nil`.

  ## Returns

  The final state, i.e. whatever the last `module.apply/2` call returned, or
  `module.init()` when the read yields no events.

  Envelopes are folded in the order in which
  `Counterpoint.Envelope.deserialize_many/1` yields them; no re-sorting is done
  here, so a reversed query is folded in the order the store returned it.

  ## Raises

  `MatchError` when the store read returns anything other than
  `{:ok, events}`. Errors raised by `Counterpoint.Store.lookup!/1` or by the
  projection callbacks propagate unchanged.
  """
  @spec run(module(), term(), term()) :: term()
  def run(module, store_name, args \\ nil) do
    query = module.query(args)

    # `module.init()` is deliberately evaluated only after the events have been
    # read and deserialized, so a failing read never triggers `init/0`.
    query
    |> read_events!(store_name)
    |> Counterpoint.Envelope.deserialize_many()
    |> Enum.reduce(module.init(), fn envelope, state ->
      module.apply(state, envelope)
    end)
  end

  # Resolves the store reference and reads the raw events selected by `query`,
  # forwarding the query's position/limit/reverse settings as read options.
  # Crashes with a `MatchError` on any non-`{:ok, _}` reply.
  defp read_events!(query, store_name) do
    store_ref = Counterpoint.Store.lookup!(store_name)

    {:ok, raw_events} =
      Dcb.Store.read(store_ref, Counterpoint.Query.to_dcb(query),
        after: query.after_position,
        limit: query.limit,
        reverse: query.reverse
      )

    raw_events
  end

  @doc """
  Declares the calling module as implementing the
  `Counterpoint.OnDemandProjection` behaviour.

  Options are currently ignored.
  """
  defmacro __using__(_opts) do
    quote do
      @behaviour Counterpoint.OnDemandProjection
    end
  end
end