# derive

A small, simple, and yet flexible API for deriving state from an event source, that works out of the box with [Ecto](https://hexdocs.pm/ecto).

It ships with all the essential side-effects:
- insert
- update
- merge
- upsert
- delete

If you need more exotic / complex side-effects, you can easily create your own by implementing the `Derive.SideEffect` protocol for your custom side-effect structs.

## show me the code

Here's the most basic example I can think of:

```elixir
# priv/repo/migrations/20260127044710_create_derive_tables.exs
defmodule Dummy.Repo.Migrations.CreateDerivedTables do
  use Derive.Migration

  def up do
    Derive.Migration.up(1)
  end

  def down do
    Derive.Migration.down(1)
  end
end

# lib/dummy.ex
defmodule Dummy.Application do
  use Application

  def start(_, _) do
    children = [
      Dummy.Repo,
      Dummy.Consumer # {Dummy.Consumer, filters: [customer_id: 167810624]}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Dummy.Supervisor)
  end
end

# lib/dummy/consumer.ex
defmodule Dummy.Consumer do
  use Derive, otp_app: :dummy

  @impl Derive
  def fetch_events(repo, filters) do
    filters
    |> Dummy.Inbox.query()
    |> repo.all()
    |> Enum.map(&Dummy.Inbox.to_event/1)
  end

  @impl Derive
  def handle_event(%LoginFailed{timestamp: ts} = event) do
    [
      insert(%Dummy.SecurityIncident{
        user_id: event.user_id,
        type: :login_failed,
        occurrences: 1,
        last_occurred_at: ts
      })
      |> on_conflict(inc: [occurrences: 1], set: [last_occurred_at: ts])
    ]
  end

  def handle_event(_), do: :skip
end
```

And here's the same example with the "director's commentary" on:

```elixir
# priv/repo/migrations/20260127044710_create_derive_tables.exs
defmodule Dummy.Repo.Migrations.CreateDerivedTables do
  use Derive.Migration
  # ↑ - be sure to use derive's migration module instead of
  #     Ecto.Migration
  #   - it turns out it takes a lot of code to provide an easy-to-use
  #     migration API - I was lazy, so I just put together a couple of
  #     macros
  #   - that left two options, you either:
  #     a) add a `require Derive.Migration` to the migration file; or
  #     b) `use Derive.Migration`
  #     the latter felt less awkward, just personal preference
  #   - if you don't want to / can't do `use Derive.Migration` that's
  #     ok, just do `require Derive.Migration` instead so that you can
  #     call the `up/1` and `down/1` macros

  def up do
    Derive.Migration.up(1)
  end

  def down do
    Derive.Migration.down(1)
  end
end

# lib/dummy.ex
defmodule Dummy.Application do
  use Application

  def start(_, _) do
    children = [
      Dummy.Repo,
      # ↓ simplest setup
      Dummy.Consumer,
      #   - alternatively you can provide filters to produce different
      #     event streams for the same consumer module
      #   - this is particularly useful when you want to parallelize
      #     processing of events by a sharding key, like :customer_id
      #   - you could have a Supervisor dedicated to managing a bunch
      #     of consumers, or even a DynamicSupervisor if you need to
      # ↓   spawn consumers on demand
      {Dummy.Consumer, filters: [customer_id: 167810624]}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Dummy.Supervisor)
  end
end

# lib/dummy/consumer.ex
defmodule Dummy.Consumer do
  use Derive,
    # ↓ derive figures out the ecto repo from your configs
    otp_app: :dummy

  @impl Derive
  #   - fetches the next batch of events
  #   - store events however you want, wherever you want - it doesn't
  # ↓   have to be in an ecto repo, could be an API call for example
  def fetch_events(repo, filters) do
    filters
    # ↑ - filters is a keyword list containing the filters you might
    #     have provided in the Supervisor children

    #   - derive only makes two assumptions about your application;
    #     one of them is that your event's :id is an integer sequence
    #     in chronological order - that's a big assumption, I know -
    #     because derive uses :id as a cursor - I plan on supporting
    # ↓   sort-safe strings at some point
    |> Dummy.Inbox.query()
    # ↑ - besides the filters, there are two keys your `query/1`
    #     function must handle in this example - that's the second
    #     assumption:
    #     1. :after - the last processed event's :id; and
    #     2. :take - the maximum number of events to fetch
    #     you can override the default :batch_size either here or by
    #     providing :batch_size in the Supervisor children along with
    #     any filters you might have
    #   - that's the only time your code is exposed directly to the
    #     consumer's cursor, which was unavoidable - for the rest of
    #     the time derive manages it under the hood for you
    |> repo.all()
    #   - best place to transform / normalize / filter out your events
    # ↓   before they hit `handle_event/1`
    |> Enum.map(&Dummy.Inbox.to_event/1)
  end

  @impl Derive
  def handle_event(%LoginFailed{timestamp: ts} = event) do
    [
      #   derive ships with essential side-effects, such as:
      #   - insert (can tweak options :conflict_target and :on_conflict
      #     to get the upsert behaviour);
      #   - update;
      #   - merge;
      #   - upsert (alias for insert); and
      #   - delete
      #   you can easily create your own side-effects by implementing
      #   the `Derive.SideEffect` protocol for your custom side-effect
      # ↓ structs
      insert(%Dummy.SecurityIncident{
        user_id: event.user_id,
        type: :login_failed,
        occurrences: 1,
        last_occurred_at: ts
      })
      |> on_conflict(inc: [occurrences: 1], set: [last_occurred_at: ts])
    ]
  end

  def handle_event(_), do: []
  # ↑ - return an empty list to ignore the event

  #   - returning :skip works too, whatever feels more natural to you
  #     (shown commented out, a second catch-all clause would never match):
  #     def handle_event(_), do: :skip

  @impl Derive
  #   this is an optional callback, when implemented it allows you to
  # ↓ override the persistence logic
  def persist(repo, side_effects, multi) do
    multi = into_multi(side_effects, multi)
    # ↑ - by default, derive accumulates the side-effects of a batch
    #     of events into a multi and then persists them in a single
    #     transaction, but you can optionally implement your own
    #     persistence logic in this function
    #   - you can choose how much you want to override; if you only
    #     want to change, say, which ecto repo to persist to, you don't
    #     have to re-implement the whole thing - the logic for
    #     accumulating side-effects into a multi is available via the
    #     `into_multi/2` function
    #   - beware though that the multi received by this function isn't
    #     empty, it already includes cursor operations, meaning you
    #     need to transact it regardless of your strategy for state
    #     persistence

    case custom_logic(repo, multi) do
      {:ok, _} -> :ok
      # ↑ - by returning `:ok` derive assumes it can move on to the
      #     next batch of events
      {:error, _, reason, _} -> {:error, reason}
      # ↑ - errors will bubble up to a function that logs and handles
      #     them appropriately
      #   - you can find a copy of the error in the consumer's cursor
      #     record in the database, along with the :stuck_since
      #     timestamp
    end
  end
end
```
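
To make the `:after` / `:take` contract from the commentary concrete, here's a minimal sketch of a cursor-aware `query/1` over an in-memory list instead of an Ecto table. `Sketch.Inbox`, its rows, and the defaults of `0` and `100` are made up for illustration; they're not part of derive.

```elixir
defmodule Sketch.Inbox do
  # Hypothetical in-memory event source; a real one would typically be
  # an Ecto query or an API call.
  @rows [
    %{id: 1, customer_id: 10, type: "login_failed"},
    %{id: 2, customer_id: 20, type: "login_failed"},
    %{id: 3, customer_id: 10, type: "login_succeeded"},
    %{id: 4, customer_id: 10, type: "login_failed"}
  ]

  @doc "Returns up to `:take` rows with an id greater than `:after`, oldest first."
  def query(opts) do
    # Split the two cursor keys from the user-supplied filters.
    # The defaults (0 and 100) are assumptions made for this sketch.
    {after_id, opts} = Keyword.pop(opts, :after, 0)
    {take, filters} = Keyword.pop(opts, :take, 100)

    @rows
    |> Enum.filter(fn row ->
      # Keep rows past the cursor that match every remaining filter.
      row.id > after_id and
        Enum.all?(filters, fn {key, value} -> Map.get(row, key) == value end)
    end)
    # The cursor only works if events come back in ascending :id order.
    |> Enum.sort_by(& &1.id)
    |> Enum.take(take)
  end
end
```

With the rows above, `Sketch.Inbox.query(customer_id: 10, after: 1, take: 2)` returns the rows with ids 3 and 4. An Ecto-backed version would express the same three steps as `where`, `order_by` and `limit` clauses.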

As promised, the API's surface is small and designed to both:
* **just work** for the intended use case where we source events and persist state changes to an ecto repo;
* be **easily extensible / overridable** for more exotic / complex use-cases.
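
As a rough picture of the extension point, the sketch below shows the general protocol-plus-struct pattern in plain Elixir. It deliberately uses a stand-in protocol, `Sketch.SideEffect`, with a made-up `append/2` function, because this README doesn't list the functions `Derive.SideEffect` actually requires; `Sketch.Increment` is hypothetical too. Check the protocol's docs for the real callbacks.

```elixir
# Stand-in protocol, NOT derive's real one: the function name and
# arity are assumptions made for this sketch.
defprotocol Sketch.SideEffect do
  @doc "Appends the side-effect to an accumulator of pending operations."
  def append(side_effect, acc)
end

defmodule Sketch.Increment do
  # Hypothetical custom side-effect: bump a counter column on a row.
  defstruct [:table, :id, :field, by: 1]
end

defimpl Sketch.SideEffect, for: Sketch.Increment do
  # Translate the struct into a plain operation tuple. A real
  # implementation would more likely add a step to an Ecto.Multi.
  def append(%{table: table, id: id, field: field, by: by}, acc) do
    acc ++ [{:increment, table, id, field, by}]
  end
end

# Fold a batch of side-effects into a single list of operations,
# dispatching on each struct's protocol implementation.
effects = [
  %Sketch.Increment{table: "security_incidents", id: 42, field: :occurrences}
]

operations = Enum.reduce(effects, [], &Sketch.SideEffect.append/2)
```

The point of the pattern is that the code folding side-effects together never needs to know about `Sketch.Increment`; adding a new kind of side-effect means adding a struct and a `defimpl`, nothing else.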

## scope

* **In Scope** - all the code needed to reduce events into persisted state changes, including:
  * consuming an event source / forming event streams, easy to override with your own sourcing logic to support sources other than an Ecto table
  * essential side-effects, easy to create your own
  * persisting state changes to an Ecto repo, easy to override with your own persistence logic
  * parallelizing event processing
  * tracking processing progress via cursors
* **Out of Scope**, as in I have no plans at all - everything related to producing, persisting, broadcasting or synchronizing events.
* **Unclear Scope** - honestly, I'll only pay attention to these if I personally need them:
  * event versioning
  * event schemas / validation
  * event lifecycle hooks (e.g. processing, succeeded, failed, etc.)
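
For the parallelization item above, here's a minimal sketch of starting one consumer per sharding key under a single supervisor. `Sketch.Consumer` is a stand-in Agent so the snippet runs without derive or a database, and `Sketch.Shards` is a hypothetical helper. A supervisor needs unique child ids; whether `use Derive` already derives a distinct id from the filters isn't stated here, so the sketch sets one explicitly via `Supervisor.child_spec/2`.

```elixir
defmodule Sketch.Consumer do
  # Stand-in for a module that does `use Derive`; it only remembers
  # its filters so the supervision wiring can be exercised on its own.
  use Agent

  def start_link(opts) do
    Agent.start_link(fn -> Keyword.get(opts, :filters, []) end)
  end
end

defmodule Sketch.Shards do
  @doc "Builds one child spec per customer id, each with a unique :id."
  def child_specs(customer_ids) do
    for customer_id <- customer_ids do
      Supervisor.child_spec(
        {Sketch.Consumer, filters: [customer_id: customer_id]},
        # Override the default id (the module name) so that several
        # consumers of the same module can live under one supervisor.
        id: {Sketch.Consumer, customer_id}
      )
    end
  end
end

# Start three shards, one per (made-up) customer id.
{:ok, sup} =
  Supervisor.start_link(Sketch.Shards.child_specs([1, 2, 3]), strategy: :one_for_one)
```

With a real consumer you'd swap `Sketch.Consumer` for your `use Derive` module and keep the `filters:` option as in the examples above.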