README.md

# ExEventBus

ExEventBus provides an event bus that uses the outbox pattern.  Behind the scenes, 
it relies on Oban and ConCache.

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `ex_event_bus` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:ex_event_bus, "~> 0.2.0"}
  ]
end
```

## Run tests

```bash
# run only once to setup the test DB
MIX_ENV=test mix test.setup

# actually run the tests
mix test
```

## Setup

1. Create a module that defines your event bus

  ```elixir
  defmodule MyApp.EventBus do
    use ExEventBus, otp_app: :my_app
  end
  ```

2. Add the required config for your EventBus, that is the Oban config

  ```elixir 
  config :my_app, MyApp.EventBus,
    oban: [
      engine: Oban.Engines.Basic,
      notifier: Oban.Notifiers.Postgres,
      repo: MyApp.Repo,
      plugins: [
        {Oban.Plugins.Lifeline, rescue_after: :timer.minutes(60)},
        {Oban.Plugins.Pruner, max_age: 60 * 60 * 24 * 7}
      ],
      queues: [
        event_bus: 2
      ]
    ]
  ```

3. Create your first events 

  ```elixir 
  defmodule MyApp.Events do
    use ExEventBus.Event

    defevent(MyEvent)
  end
  ```

4. Create your first event handler

  ```elixir
  defmodule MyApp.EventHandler do
    use ExEventBus.EventHandler,
      event_bus: MyApp.EventBus,
      events: [MyApp.Events.MyEvent]

    @impl ExEventBus.EventHandler
    def handle_event(%MyApp.Events.MyEvent{aggregate: %{"id" => aggregate_id}}) do
      # ... handle the event here
    end
  end
  ```

5. Add your event bus to your supervision tree

  ```elixir
  # add the event bus to your application children 

  def start(_type, _args) do 
    # ... 

    children = [
      # ...
      MyApp.EventBus,
      # ...
    ]

    # ...
  ```

6. Add your event handlers to your supervision tree

  ```elixir
  def start(_type, _args) do
    # ...

    children = [
      # ...
      {MyApp.EventHandler, [event_bus: MyApp.EventBus]},
      # ...
    ]

    # ...
  end
  ```

## Event Structure

Events published by ExEventBus contain detailed information about the operation that triggered them. Each event includes:

- **`aggregate`**: The complete struct of the affected entity
- **`changes`**: Map of fields that changed (with primary keys for associations)
- **`initial_data`**: Map of previous values for changed fields
- **`metadata`**: Optional metadata passed to the operation

### Changes and Initial Data

The `changes` and `initial_data` maps provide a complete picture of what changed:

```elixir
# For a simple update
%MyEvent{
  aggregate: %User{id: 1, name: "Jane Doe", email: "jane@example.com"},
  changes: %{"email" => "jane.doe@example.com"},
  initial_data: %{"email" => "jane@example.com"},
  metadata: nil
}
```

#### INSERT Operations

For insertions, the primary key is included in `changes` (it changed from nil to the actual value), and `initial_data` contains `nil` values for the fields being set:

```elixir
%UserCreated{
  aggregate: %User{id: 1, name: "John", email: "john@example.com"},
  changes: %{
    "id" => 1,  # ← Primary key included for inserts
    "name" => "John",
    "email" => "john@example.com"
  },
  initial_data: %{
    "name" => nil,
    "email" => nil
  }
}
```

#### UPDATE Operations

For updates, `initial_data` contains only the old values of fields that actually changed:

```elixir
%UserUpdated{
  aggregate: %User{id: 1, name: "John", email: "new@example.com", age: 30},
  changes: %{
    "email" => "new@example.com",
    "age" => 30
  },
  initial_data: %{
    "email" => "old@example.com",
    "age" => 25
  }
}
```

#### DELETE Operations

For deletions, both `changes` and `initial_data` are empty:

```elixir
%UserDeleted{
  aggregate: %User{id: 1, name: "John", email: "john@example.com"},
  changes: %{},
  initial_data: %{}
}
```

## Association Change Tracking

ExEventBus provides detailed tracking of association changes when using `Ecto.Changeset.cast_assoc/3`. This allows you to distinguish between creates, updates, and deletes within nested associations.

### Primary Keys in Associations

**All nested associations include their primary key** in both `changes` and `initial_data` to distinguish between creates, updates, and deletes:

#### Create (New Item)
```elixir
changes: %{
  "profile" => %{
    "id" => nil,  # ← nil indicates CREATE
    "bio" => "Engineer"
  }
}

initial_data: %{
  "profile" => nil  # ← nil because association didn't exist
}
```

#### Update (Existing Item)
```elixir
changes: %{
  "profile" => %{
    "id" => 5,  # ← value indicates UPDATE
    "bio" => "Senior Engineer"
  }
}

initial_data: %{
  "profile" => %{
    "id" => 5,  # ← same ID
    "bio" => "Engineer"  # ← old value
  }
}
```

#### Delete (Removed Item)
```elixir
# For has_many: item present in initial_data but absent from changes
changes: %{
  "posts" => [
    %{"id" => 1, "title" => "Kept Post"}
  ]
}

initial_data: %{
  "posts" => [
    %{"id" => 1, "title" => "Kept Post"},
    %{"id" => 2, "title" => "Deleted Post"}  # ← Post 2 deleted
  ]
}
```

**Summary:**
- **New items**: `"id" => nil` in `changes`, association is `nil` or `[]` in `initial_data`
- **Updated items**: `"id" => <value>` in both `changes` and `initial_data`
- **Deleted items**: Present in `initial_data` but absent from `changes`

### Has One Associations

#### Creating a Profile

```elixir
# Ecto operation
user_changeset = User.changeset(%User{}, %{
  name: "Alice",
  email: "alice@example.com",
  profile: %{bio: "Software Engineer", avatar_url: "https://example.com/avatar.jpg"}
})

Repo.insert(user_changeset, success_event: UserCreated)

# Published event
%UserCreated{
  changes: %{
    "name" => "Alice",
    "email" => "alice@example.com",
    "profile" => %{
      "id" => nil,  # ← nil indicates this is a CREATE
      "bio" => "Software Engineer",
      "avatar_url" => "https://example.com/avatar.jpg"
    }
  },
  initial_data: %{
    "name" => nil,
    "email" => nil,
    "profile" => nil  # ← nil because association didn't exist
  }
}
```

#### Updating a Profile

```elixir
# Ecto operation
user = Repo.get(User, 1) |> Repo.preload(:profile)

user_changeset = User.changeset(user, %{
  profile: %{id: user.profile.id, bio: "Senior Software Engineer"}
})

Repo.update(user_changeset, success_event: UserUpdated)

# Published event
%UserUpdated{
  changes: %{
    "profile" => %{
      "id" => 5,  # ← ID present indicates this is an UPDATE
      "bio" => "Senior Software Engineer"
    }
  },
  initial_data: %{
    "profile" => %{
      "id" => 5,  # ← Same ID
      "bio" => "Software Engineer"  # ← Only changed field (not avatar_url)
    }
  }
}
```

### Has Many Associations

#### Creating Posts

```elixir
# Ecto operation
user_changeset = User.changeset(%User{}, %{
  name: "Bob",
  email: "bob@example.com",
  posts: [
    %{title: "First Post", body: "Hello World"},
    %{title: "Second Post", body: "Elixir is great"}
  ]
})

Repo.insert(user_changeset, success_event: UserCreated)

# Published event
%UserCreated{
  changes: %{
    "name" => "Bob",
    "email" => "bob@example.com",
    "posts" => [
      %{"id" => nil, "title" => "First Post", "body" => "Hello World"},
      %{"id" => nil, "title" => "Second Post", "body" => "Elixir is great"}
    ]
  },
  initial_data: %{
    "name" => nil,
    "email" => nil,
    "posts" => []  # ← Empty list because no posts existed
  }
}
```

#### Mixed Operations (Update + Create)

```elixir
# Ecto operation
user = Repo.get(User, 1) |> Repo.preload(:posts)
# Assume user has one existing post with id: 10

user_changeset = User.changeset(user, %{
  posts: [
    %{id: 10, title: "Updated Title"},  # Update existing
    %{title: "New Post", body: "New Content"}  # Create new
  ]
})

Repo.update(user_changeset, success_event: UserUpdated)

# Published event
%UserUpdated{
  changes: %{
    "posts" => [
      %{"id" => 10, "title" => "Updated Title"},  # ← ID present = UPDATE
      %{"id" => nil, "title" => "New Post", "body" => "New Content"}  # ← id: nil = CREATE
    ]
  },
  initial_data: %{
    "posts" => [
      %{"id" => 10, "title" => "Original Title"}  # ← Only updated post, not the new one
    ]
  }
}
```

#### Deleting Association Items

```elixir
# Ecto operation
user = Repo.get(User, 1) |> Repo.preload(:posts)
# Assume user has two posts with id: 10 and id: 11

user_changeset = User.changeset(user, %{
  posts: [
    %{id: 10, title: "Updated Title"}  # Keep post 10, delete post 11
  ]
})

Repo.update(user_changeset, success_event: UserUpdated)

# Published event
%UserUpdated{
  changes: %{
    "posts" => [
      %{"id" => 10, "title" => "Updated Title"}
    ]
  },
  initial_data: %{
    "posts" => [
      %{"id" => 10, "title" => "Original Title"}
      # Post 11 was deleted - it's not in changes, indicating deletion
    ]
  }
}
```

### Distinguishing Operations in Event Handlers

Use primary keys to determine the operation type:

```elixir
def handle_event(%UserUpdated{changes: changes}) do
  case changes["posts"] do
    nil ->
      # No post changes
      :ok

    posts when is_list(posts) ->
      Enum.each(posts, fn post ->
        case post["id"] do
          nil ->
            # This is a new post being created
            notify_subscribers_about_new_post(post)

          post_id ->
            # This is an existing post being updated
            notify_subscribers_about_post_update(post_id, post)
        end
      end)
  end
end
```

### Primary Key Handling

**Root aggregates**:
- **INSERT operations**: Primary key IS included in `changes` (it changed from nil to the actual value)
- **UPDATE operations**: Primary key is NOT included in `changes` (it didn't change)
- **DELETE operations**: Primary key is NOT included in `changes`

**Nested associations**: Primary keys ARE always included to distinguish between creates, updates, and deletes within associations.

#### Standard Primary Key (`id`)

Most common case - integer `id` field:

```elixir
schema "users" do
  field(:name, :string)
  field(:email, :string)
  timestamps()
end

# INSERT: PK included in root changes (changed from nil → 1)
%UserCreated{
  changes: %{
    "id" => 1,
    "name" => "John",
    "email" => "john@example.com"
  },
  initial_data: %{
    "name" => nil,
    "email" => nil
  }
}

# UPDATE: PK not included in root changes (didn't change)
%UserUpdated{
  changes: %{
    "email" => "new@example.com"
  },
  initial_data: %{
    "email" => "old@example.com"
  }
}
```

#### Custom Primary Key

Custom field name (e.g., UUID, external ID):

```elixir
schema "api_tokens" do
  field(:token_id, :string, primary_key: true)
  field(:secret, :string)
  field(:expires_at, :naive_datetime)
  timestamps()
end

# INSERT: Custom PK included in root changes
%TokenCreated{
  changes: %{
    "token_id" => "tok_abc123",
    "secret" => "abc123...",
    "expires_at" => ~N[2025-12-31 23:59:59]
  },
  initial_data: %{
    "secret" => nil,
    "expires_at" => nil
  }
}

# UPDATE: Custom PK not included in root changes (didn't change)
%TokenUpdated{
  changes: %{
    "expires_at" => ~N[2026-01-31 23:59:59]
  },
  initial_data: %{
    "expires_at" => ~N[2025-12-31 23:59:59]
  }
}
```

#### Composite Primary Keys

Multiple fields form the primary key:

```elixir
schema "user_permissions" do
  field(:user_id, :integer, primary_key: true)
  field(:resource_id, :integer, primary_key: true)
  field(:permission_level, :string)
  timestamps()
end

# INSERT: Composite PK included in root changes
%PermissionCreated{
  changes: %{
    "user_id" => 5,
    "resource_id" => 10,
    "permission_level" => "read"
  },
  initial_data: %{
    "permission_level" => nil
  }
}

# UPDATE: Composite PK not included in root changes (didn't change)
%PermissionUpdated{
  changes: %{
    "permission_level" => "write"
  },
  initial_data: %{
    "permission_level" => "read"
  }
}
```

#### Composite Keys in Associations

When associations have composite keys, all PK fields are included:

```elixir
# Parent with standard PK
schema "projects" do
  field(:name, :string)
  has_many(:tasks, Task)
end

# Child with composite PK
schema "tasks" do
  field(:project_id, :integer, primary_key: true)
  field(:task_number, :integer, primary_key: true)
  field(:title, :string)
  field(:status, :string)
end

# Creating project with tasks
%ProjectCreated{
  changes: %{
    "name" => "New Project",
    "tasks" => [
      %{
        "project_id" => nil,  # ← Composite PK part 1
        "task_number" => nil,  # ← Composite PK part 2
        "title" => "Setup",
        "status" => "pending"
      }
    ]
  },
  initial_data: %{
    "name" => nil,
    "tasks" => []
  }
}

# Updating project with mixed task operations
%ProjectUpdated{
  changes: %{
    "tasks" => [
      %{
        "project_id" => 5,  # ← Existing task (composite PK)
        "task_number" => 1,
        "status" => "completed"
      },
      %{
        "project_id" => nil,  # ← New task (composite PK nil)
        "task_number" => nil,
        "title" => "Review",
        "status" => "pending"
      }
    ]
  },
  initial_data: %{
    "tasks" => [
      %{
        "project_id" => 5,
        "task_number" => 1,
        "status" => "pending"
      }
    ]
  }
}
```

#### No Primary Key

Schemas without primary keys are handled gracefully:

```elixir
schema "audit_logs" do
  field(:action, :string)
  field(:timestamp, :naive_datetime)
end

# No PK fields in changes/initial_data
%AuditLogCreated{
  changes: %{
    "action" => "login",
    "timestamp" => ~N[2025-11-21 12:00:00]
  },
  initial_data: %{
    "action" => nil,
    "timestamp" => nil
  }
}
```

## Supported Field Types

ExEventBus fully supports all Ecto field types, including:

- **Primitive types**: `:string`, `:integer`, `:float`, `:boolean`, `:date`, `:time`, `:naive_datetime`, `:utc_datetime`, etc.
- **Primitive arrays**: `{:array, :string}`, `{:array, :integer}`, `{:array, Ecto.UUID}`, etc. with `default: []`
- **Custom types**: Any Ecto type including `Ecto.Enum`, embedded schemas, etc.
- **Associations**: `has_one`, `has_many`, `belongs_to` with full change tracking

Primitive array fields (like `field(:tags, {:array, :string}, default: [])`) are properly tracked as field changes, while association arrays are tracked with individual item primary keys.

## Usage with Ecto Operations

To publish events from Ecto operations, pass the event module using the `:success_event` option:

```elixir
# Insert
Repo.insert(changeset, success_event: MyApp.Events.UserCreated)

# Update
Repo.update(changeset, success_event: MyApp.Events.UserUpdated)

# Delete
Repo.delete(user, success_event: MyApp.Events.UserDeleted)
```

The event is only published if the operation succeeds.