# CLAUDE.md — CQRS & Event Sourcing en Go
> Contexte spécialisé pour Claude Code. Coller ce fichier à la racine du projet pour guider le travail sur une architecture CQRS/Event Sourcing en Go.
---
## Quand utiliser ce contexte
- ✅ Domaine métier complexe avec des règles qui évoluent (comptabilité, assurances, e-commerce avancé)
- ✅ Besoin d'audit complet : qui a fait quoi, quand, et avec quelles données
- ✅ Ratio lecture/écriture très déséquilibré nécessitant des read models dédiés
- ✅ Plusieurs équipes sur des bounded contexts distincts avec coordination via events
- ❌ Application CRUD simple (formulaire → BDD → liste) : CQRS ajoute une complexité injustifiée
- ❌ MVP ou prototype : la vélocité initiale prime, l'architecture peut évoluer après
- ❌ Équipe non familière avec les patterns DDD : la courbe d'apprentissage est élevée
---
## Section 1 : Concepts fondamentaux — modèle mental
### Vue d'ensemble
CQRS (Command Query Responsibility Segregation) et Event Sourcing sont deux patterns distincts qui se complètent naturellement mais peuvent être utilisés séparément.
**CQRS** : séparer les opérations d'écriture (Commands) des opérations de lecture (Queries). Le write model est optimisé pour les invariants métier ; le read model est optimisé pour l'affichage.
**Event Sourcing** : ne jamais stocker l'état courant d'un aggregate. Stocker la séquence d'événements qui ont conduit à cet état. L'état courant se reconstitue en rejouant les événements.
### Le flux complet
```
Client
│
▼
Command (intent métier)
│
▼
Command Handler
├── Charge l'Aggregate depuis l'Event Store
├── Appelle une méthode métier sur l'Aggregate
│ ├── Valide les invariants
│ └── Produit un ou plusieurs Domain Events
└── Sauvegarde les nouveaux événements dans l'Event Store
│
▼
Event Store (PostgreSQL)
│
├──────────────────────┐
▼ ▼
Projection Worker Outbox Relay
(read models) (integration events)
│ │
▼ ▼
Read Database Message Broker (Kafka/NATS)
│ │
▼ ▼
Query Handler Other Services
```
### Vocabulaire essentiel
| Terme | Définition |
|-------|-----------|
| **Aggregate** | Cluster d'entités formant une unité de cohérence transactionnelle |
| **Command** | Intention d'agir — peut échouer. Ex: `PlaceOrder` |
| **Domain Event** | Fait passé immuable — ne peut pas échouer. Ex: `OrderPlaced` |
| **Event Store** | Base de données append-only des événements |
| **Projection** | Construction d'un read model à partir d'événements |
| **Snapshot** | Photo de l'état d'un aggregate à un instant T |
| **Saga/Process Manager** | Coordination de plusieurs aggregates sur plusieurs transactions |
| **Outbox** | Table intermédiaire pour publication fiable des events |
| **Optimistic Concurrency** | Détection de conflit via numéro de version |
### Règle d'or
Un aggregate = une transaction. Ne jamais modifier deux aggregates dans la même transaction de base de données. Si c'est nécessaire, les boundaries sont mal définies ou il faut une saga.
---
## Section 2 : Aggregate design — structure et invariants
### Structure de base
L'aggregate encapsule son état dans des champs privés. Seul l'aggregate lui-même peut modifier son état via les méthodes `Apply`/`Transition`.
```go
package order
import (
"errors"
"time"
"github.com/google/uuid"
)
// Status représente l'état d'une commande
type Status string
const (
StatusPending Status = "pending"
StatusConfirmed Status = "confirmed"
StatusShipped Status = "shipped"
StatusCancelled Status = "cancelled"
)
// Order est l'aggregate racine pour les commandes
type Order struct {
// Champs de base de l'aggregate — TOUJOURS présents
id uuid.UUID
version int
// Événements non encore persistés
changes []Event
// État métier — PRIVÉ, protège les invariants
status Status
customerID uuid.UUID
items []OrderItem
totalCents int64
shippedAt *time.Time
cancelledAt *time.Time
}
// OrderItem représente une ligne de commande
type OrderItem struct {
ProductID uuid.UUID
Quantity int
PriceCents int64
}
```
### Accesseurs obligatoires
Ces quatre accesseurs sont la "colle" entre l'aggregate et l'infrastructure (Event Store, Repository).
```go
// ID retourne l'identifiant de l'aggregate
func (o *Order) ID() uuid.UUID {
return o.id
}
// Version retourne la version courante (pour l'optimistic concurrency)
func (o *Order) Version() int {
return o.version
}
// UncommittedEvents retourne les événements produits mais pas encore sauvegardés
func (o *Order) UncommittedEvents() []Event {
// Retourne une copie défensive
result := make([]Event, len(o.changes))
copy(result, o.changes)
return result
}
// ClearChanges vide la liste des événements non persistés
// Appelé par le Repository APRÈS sauvegarde réussie
func (o *Order) ClearChanges() {
o.changes = nil
}
```
### Interface Event
Tous les domain events implémentent cette interface. La méthode `isEvent()` non exportée empêche les types extérieurs de l'implémenter accidentellement.
```go
// Event est l'interface marqueur pour tous les domain events de ce package
type Event interface {
isEvent()
}
// AggregateType retourne le type de l'aggregate pour l'Event Store
func AggregateType() string {
return "order"
}
```
### Reconstruction depuis les événements
`NewFromEvents` rejoue tous les événements dans l'ordre pour reconstruire l'état courant. C'est le cœur de l'Event Sourcing.
```go
// NewFromEvents reconstruit un Order depuis sa séquence d'événements
// Utilisé par le Repository lors du chargement
func NewFromEvents(events []Event) (*Order, error) {
if len(events) == 0 {
return nil, errors.New("cannot reconstruct order from empty event list")
}
o := &Order{}
for _, e := range events {
o.transition(e)
o.version++
}
return o, nil
}
// NewFromSnapshot reconstruit depuis un snapshot + événements ultérieurs
func NewFromSnapshot(snap Snapshot, events []Event) (*Order, error) {
o := &Order{
id: snap.AggregateID,
version: snap.Version,
status: snap.Status,
customerID: snap.CustomerID,
items: snap.Items,
totalCents: snap.TotalCents,
}
for _, e := range events {
o.transition(e)
o.version++
}
return o, nil
}
```
---
## Section 3 : Aggregate — command methods et Apply/Transition
### Command methods : valider AVANT de lever l'événement
La règle absolue : valider **tous les invariants** avant d'appeler `raise()`. Si une validation échoue, retourner une erreur. Ne jamais lever un événement si l'opération ne peut pas aboutir.
```go
// Place crée une nouvelle commande
// C'est une "factory method" — elle crée l'aggregate
func Place(id uuid.UUID, customerID uuid.UUID, items []OrderItem) (*Order, error) {
// Validation des invariants AVANT tout
if id == uuid.Nil {
return nil, errors.New("order id is required")
}
if customerID == uuid.Nil {
return nil, errors.New("customer id is required")
}
if len(items) == 0 {
return nil, errors.New("order must have at least one item")
}
var totalCents int64
for _, item := range items {
if item.Quantity <= 0 {
return nil, fmt.Errorf("item %s: quantity must be positive", item.ProductID)
}
if item.PriceCents <= 0 {
return nil, fmt.Errorf("item %s: price must be positive", item.ProductID)
}
totalCents += int64(item.Quantity) * item.PriceCents
}
o := &Order{}
o.raise(OrderPlaced{
OrderID: id,
CustomerID: customerID,
Items: items,
TotalCents: totalCents,
PlacedAt: time.Now().UTC(),
})
return o, nil
}
// Confirm confirme la commande après paiement validé
func (o *Order) Confirm() error {
if o.status != StatusPending {
return fmt.Errorf("cannot confirm order in status %s, expected pending", o.status)
}
o.raise(OrderConfirmed{
OrderID: o.id,
ConfirmedAt: time.Now().UTC(),
})
return nil
}
// Ship marque la commande comme expédiée
func (o *Order) Ship(trackingNumber string) error {
if o.status != StatusConfirmed {
return fmt.Errorf("cannot ship order in status %s, expected confirmed", o.status)
}
if trackingNumber == "" {
return errors.New("tracking number is required")
}
o.raise(OrderShipped{
OrderID: o.id,
TrackingNumber: trackingNumber,
ShippedAt: time.Now().UTC(),
})
return nil
}
// Cancel annule la commande
func (o *Order) Cancel(reason string) error {
if o.status == StatusShipped {
return errors.New("cannot cancel a shipped order")
}
if o.status == StatusCancelled {
return errors.New("order is already cancelled")
}
if reason == "" {
return errors.New("cancellation reason is required")
}
o.raise(OrderCancelled{
OrderID: o.id,
Reason: reason,
CancelledAt: time.Now().UTC(),
})
return nil
}
```
### raise() : le mécanisme interne
`raise()` appelle `transition()` (mise à jour de l'état) puis ajoute l'événement à la liste des changements non persistés.
```go
// raise applique un événement à l'aggregate et l'ajoute aux changements
// JAMAIS appelé directement par du code extérieur
func (o *Order) raise(e Event) {
o.transition(e)
o.version++
o.changes = append(o.changes, e)
}
```
### transition() / Apply : mutation d'état pure
`transition()` est la méthode la plus importante de l'Event Sourcing. Elle doit être :
- **Pure** : aucun appel externe, aucun effet de bord
- **Sans erreur** : si l'event a été produit, il doit pouvoir être appliqué
- **Déterministe** : rejouer les mêmes événements donne toujours le même état
```go
// transition applique un événement pour modifier l'état interne
// RÈGLES ABSOLUES :
// 1. Aucun appel externe (DB, HTTP, etc.)
// 2. Aucun retour d'erreur
// 3. Déterministe — même input, même output
// 4. Appelée lors de la création ET lors du replay
func (o *Order) transition(e Event) {
switch ev := e.(type) {
case OrderPlaced:
o.id = ev.OrderID
o.customerID = ev.CustomerID
o.items = ev.Items
o.totalCents = ev.TotalCents
o.status = StatusPending
case OrderConfirmed:
o.status = StatusConfirmed
case OrderShipped:
o.status = StatusShipped
now := ev.ShippedAt
o.shippedAt = &now
case OrderCancelled:
o.status = StatusCancelled
now := ev.CancelledAt
o.cancelledAt = &now
}
// Événements inconnus ignorés silencieusement (forward compatibility)
}
```
### Domain Events : définition
```go
// OrderPlaced — commande passée par le client
type OrderPlaced struct {
OrderID uuid.UUID `json:"order_id"`
CustomerID uuid.UUID `json:"customer_id"`
Items []OrderItem `json:"items"`
TotalCents int64 `json:"total_cents"`
PlacedAt time.Time `json:"placed_at"`
}
func (OrderPlaced) isEvent() {}
// OrderConfirmed — commande confirmée après paiement
type OrderConfirmed struct {
OrderID uuid.UUID `json:"order_id"`
ConfirmedAt time.Time `json:"confirmed_at"`
}
func (OrderConfirmed) isEvent() {}
// OrderShipped — commande expédiée
type OrderShipped struct {
OrderID uuid.UUID `json:"order_id"`
TrackingNumber string `json:"tracking_number"`
ShippedAt time.Time `json:"shipped_at"`
}
func (OrderShipped) isEvent() {}
// OrderCancelled — commande annulée
type OrderCancelled struct {
OrderID uuid.UUID `json:"order_id"`
Reason string `json:"reason"`
CancelledAt time.Time `json:"cancelled_at"`
}
func (OrderCancelled) isEvent() {}
```
### Snapshot state
```go
// Snapshot représente l'état sérialisé d'un aggregate à un instant T
type Snapshot struct {
AggregateID uuid.UUID `json:"aggregate_id"`
Version int `json:"version"`
Status Status `json:"status"`
CustomerID uuid.UUID `json:"customer_id"`
Items []OrderItem `json:"items"`
TotalCents int64 `json:"total_cents"`
CreatedAt time.Time `json:"created_at"`
}
// TakeSnapshot crée un snapshot de l'état courant
func (o *Order) TakeSnapshot() Snapshot {
return Snapshot{
AggregateID: o.id,
Version: o.version,
Status: o.status,
CustomerID: o.customerID,
Items: o.items,
TotalCents: o.totalCents,
CreatedAt: time.Now().UTC(),
}
}
```
---
## Section 4 : Command handlers — orchestration
### Principes des Command Handlers
- Les commandes retournent **uniquement** une erreur, jamais de données
- Le client fournit l'UUID de l'entité (ne pas générer côté serveur dans le handler)
- Le domain language prime : `CancelOrder` pas `DeleteOrder`, `PlaceOrder` pas `CreateOrder`
- Un handler par commande
- Les cross-cutting concerns (logging, métriques) via des décorateurs middleware
### Définition des commandes
```go
package command
import "github.com/google/uuid"
// PlaceOrder — intent de passer une commande
type PlaceOrder struct {
OrderID uuid.UUID
CustomerID uuid.UUID
Items []OrderItem
}
// OrderItem dans le contexte d'une commande
type OrderItem struct {
ProductID uuid.UUID
Quantity int
PriceCents int64
}
// ConfirmOrder — intent de confirmer une commande après paiement
type ConfirmOrder struct {
OrderID uuid.UUID
}
// ShipOrder — intent d'expédier une commande
type ShipOrder struct {
OrderID uuid.UUID
TrackingNumber string
}
// CancelOrder — intent d'annuler une commande
// Note: "Cancel" pas "Delete" — langage du domaine
type CancelOrder struct {
OrderID uuid.UUID
Reason string
}
```
### Repository interface
```go
package order
import (
"context"
"github.com/google/uuid"
)
// Repository définit le contrat pour la persistance des aggregates Order
type Repository interface {
// Load charge un aggregate par son ID
// Retourne ErrNotFound si l'aggregate n'existe pas
Load(ctx context.Context, id uuid.UUID) (*Order, error)
// Save persiste les nouveaux événements de l'aggregate
// Retourne ErrConflict en cas de conflit de concurrence optimiste
Save(ctx context.Context, order *Order) error
// Update est un helper: Load + fn + Save dans une seule opération
Update(ctx context.Context, id uuid.UUID, fn func(*Order) error) error
}
var (
ErrNotFound = errors.New("order not found")
ErrConflict = errors.New("optimistic concurrency conflict")
)
```
### Implementation du pattern Update
```go
// Update dans le repository PostgreSQL
func (r *PostgresRepository) Update(
ctx context.Context,
id uuid.UUID,
fn func(*Order) error,
) error {
order, err := r.Load(ctx, id)
if err != nil {
return fmt.Errorf("load order %s: %w", id, err)
}
if err := fn(order); err != nil {
return err // erreur métier, pas de wrap supplémentaire
}
if err := r.Save(ctx, order); err != nil {
return fmt.Errorf("save order %s: %w", id, err)
}
return nil
}
```
### Command Handlers
```go
package commandhandler
import (
"context"
"fmt"
"myapp/internal/order"
"myapp/internal/command"
)
// OrderCommandHandler gère toutes les commandes liées aux orders
type OrderCommandHandler struct {
repo order.Repository
}
func NewOrderCommandHandler(repo order.Repository) *OrderCommandHandler {
return &OrderCommandHandler{repo: repo}
}
// HandlePlaceOrder traite la commande PlaceOrder
func (h *OrderCommandHandler) HandlePlaceOrder(ctx context.Context, cmd command.PlaceOrder) error {
// Mapper les items de la commande vers le domaine
items := make([]order.OrderItem, len(cmd.Items))
for i, item := range cmd.Items {
items[i] = order.OrderItem{
ProductID: item.ProductID,
Quantity: item.Quantity,
PriceCents: item.PriceCents,
}
}
// Créer l'aggregate (factory method)
o, err := order.Place(cmd.OrderID, cmd.CustomerID, items)
if err != nil {
return fmt.Errorf("place order: %w", err)
}
// Sauvegarder dans l'Event Store
if err := h.repo.Save(ctx, o); err != nil {
return fmt.Errorf("save order: %w", err)
}
return nil
}
// HandleConfirmOrder traite la commande ConfirmOrder
func (h *OrderCommandHandler) HandleConfirmOrder(ctx context.Context, cmd command.ConfirmOrder) error {
return h.repo.Update(ctx, cmd.OrderID, func(o *order.Order) error {
return o.Confirm()
})
}
// HandleShipOrder traite la commande ShipOrder
func (h *OrderCommandHandler) HandleShipOrder(ctx context.Context, cmd command.ShipOrder) error {
return h.repo.Update(ctx, cmd.OrderID, func(o *order.Order) error {
return o.Ship(cmd.TrackingNumber)
})
}
// HandleCancelOrder traite la commande CancelOrder
func (h *OrderCommandHandler) HandleCancelOrder(ctx context.Context, cmd command.CancelOrder) error {
return h.repo.Update(ctx, cmd.OrderID, func(o *order.Order) error {
return o.Cancel(cmd.Reason)
})
}
```
### Décorateurs middleware
```go
// LoggingMiddleware ajoute du logging à un command handler
type LoggingMiddleware struct {
next *OrderCommandHandler
logger *slog.Logger
}
func (m *LoggingMiddleware) HandlePlaceOrder(ctx context.Context, cmd command.PlaceOrder) error {
start := time.Now()
err := m.next.HandlePlaceOrder(ctx, cmd)
m.logger.Info("HandlePlaceOrder",
"order_id", cmd.OrderID,
"customer_id", cmd.CustomerID,
"item_count", len(cmd.Items),
"duration_ms", time.Since(start).Milliseconds(),
"error", err,
)
return err
}
// MetricsMiddleware ajoute des métriques Prometheus
type MetricsMiddleware struct {
next *OrderCommandHandler
commands *prometheus.CounterVec
duration *prometheus.HistogramVec
}
func (m *MetricsMiddleware) HandlePlaceOrder(ctx context.Context, cmd command.PlaceOrder) error {
timer := prometheus.NewTimer(m.duration.WithLabelValues("place_order"))
defer timer.ObserveDuration()
err := m.next.HandlePlaceOrder(ctx, cmd)
status := "success"
if err != nil {
status = "error"
}
m.commands.WithLabelValues("place_order", status).Inc()
return err
}
```
### HTTP Handler — REST convention
```go
// POST /orders → 204 No Content + Location header
// Convention REST pour CQRS : pas de body en réponse
func (h *HTTPHandler) PlaceOrder(w http.ResponseWriter, r *http.Request) {
var req PlaceOrderRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}
// Le CLIENT fournit l'UUID — permet l'idempotence
orderID, err := uuid.Parse(req.OrderID)
if err != nil {
// Générer si non fourni (moins idempotent)
orderID = uuid.New()
}
cmd := command.PlaceOrder{
OrderID: orderID,
CustomerID: req.CustomerID,
Items: mapItems(req.Items),
}
if err := h.handler.HandlePlaceOrder(r.Context(), cmd); err != nil {
if errors.Is(err, order.ErrConflict) {
http.Error(w, "conflict", http.StatusConflict)
return
}
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
// 204 + Content-Location pour que le client sache où lire l'état
w.Header().Set("Content-Location", fmt.Sprintf("/orders/%s", orderID))
w.WriteHeader(http.StatusNoContent)
}
```
---
## Section 5 : Event Store — schéma PostgreSQL
### Schéma complet
```sql
-- Aggregate registry : une ligne par aggregate, sert à l'optimistic concurrency
CREATE TABLE es_aggregate (
id UUID NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
version INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id)
);
-- Séquence d'événements : append-only
CREATE TABLE es_event (
id BIGSERIAL NOT NULL,
-- transaction_id permet de regrouper les events d'une même TX
transaction_id XID8 NOT NULL DEFAULT pg_current_xact_id(),
aggregate_id UUID NOT NULL,
-- version = numéro de séquence dans l'aggregate (1-based)
version INTEGER NOT NULL,
event_type VARCHAR(128) NOT NULL,
-- schema_version pour les upcasters de migration
schema_version SMALLINT NOT NULL DEFAULT 1,
-- payload contient les données de l'événement sérialisées en JSON
payload JSONB NOT NULL,
-- metadata : correlation_id, causation_id, user_id, etc.
metadata JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id),
-- Contrainte d'unicité : un seul event par version par aggregate
-- C'est le filet de sécurité de l'optimistic concurrency
UNIQUE (aggregate_id, version),
FOREIGN KEY (aggregate_id) REFERENCES es_aggregate(id)
);
-- Snapshots : état sérialisé d'un aggregate à un instant T
CREATE TABLE es_snapshot (
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
version INTEGER NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (aggregate_id, version)
);
-- Checkpoints : suivi de la progression des projections
CREATE TABLE projection_checkpoint (
projection_name VARCHAR(128) NOT NULL,
-- last_event_id = ID du dernier es_event traité
last_event_id BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (projection_name)
);
-- Outbox : publication fiable d'integration events
CREATE TABLE outbox (
id BIGSERIAL NOT NULL,
topic VARCHAR(128) NOT NULL,
payload JSONB NOT NULL,
state VARCHAR(16) NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
sent_at TIMESTAMPTZ,
PRIMARY KEY (id)
);
-- Déduplication des commandes pour l'idempotence
CREATE TABLE processed_commands (
idempotency_key VARCHAR(128) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (idempotency_key)
);
-- Index de performance
CREATE INDEX es_event_aggregate_id_version
ON es_event (aggregate_id, version ASC);
CREATE INDEX es_event_id_for_projection
ON es_event (id ASC);
CREATE INDEX es_event_type_for_rebuild
ON es_event (event_type, id ASC);
CREATE INDEX outbox_pending
ON outbox (state, id ASC)
WHERE state = 'pending';
-- Vue utile pour le monitoring du lag des projections
CREATE VIEW projection_lag AS
SELECT
pc.projection_name,
pc.last_event_id,
MAX(e.id) AS latest_event_id,
MAX(e.id) - pc.last_event_id AS lag_count,
MAX(e.created_at) - (
SELECT created_at FROM es_event WHERE id = pc.last_event_id
) AS lag_duration
FROM projection_checkpoint pc
CROSS JOIN es_event e
GROUP BY pc.projection_name, pc.last_event_id;
```
### Types Go pour l'Event Store
```go
package eventstore
import (
"time"
"github.com/google/uuid"
)
// StoredEvent représente une ligne de la table es_event
type StoredEvent struct {
ID int64 `db:"id"`
TransactionID uint64 `db:"transaction_id"`
AggregateID uuid.UUID `db:"aggregate_id"`
Version int `db:"version"`
EventType string `db:"event_type"`
SchemaVersion int `db:"schema_version"`
Payload json.RawMessage `db:"payload"`
Metadata Metadata `db:"metadata"`
CreatedAt time.Time `db:"created_at"`
}
// Metadata contient les informations de traçabilité
type Metadata struct {
CorrelationID string `json:"correlation_id,omitempty"`
CausationID string `json:"causation_id,omitempty"`
UserID string `json:"user_id,omitempty"`
IPAddress string `json:"ip_address,omitempty"`
}
```
---
## Section 6 : Event Store — save, load et optimistic concurrency
### Interface de l'Event Store
```go
package eventstore
import (
"context"
"github.com/google/uuid"
)
// EventStore est l'interface principale pour la persistance des événements
type EventStore interface {
// Save persiste les nouveaux événements d'un aggregate
// Retourne ErrConflict si la version attendue ne correspond pas
Save(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []SerializedEvent) error
// Load charge tous les événements d'un aggregate
Load(ctx context.Context, aggregateID uuid.UUID) ([]StoredEvent, error)
// LoadFrom charge les événements à partir d'une version donnée (pour les snapshots)
LoadFrom(ctx context.Context, aggregateID uuid.UUID, fromVersion int) ([]StoredEvent, error)
// LoadForProjection charge tous les événements depuis un ID donné (pour les projections)
LoadForProjection(ctx context.Context, fromEventID int64, limit int) ([]StoredEvent, error)
}
// SerializedEvent représente un événement prêt à être persisté
type SerializedEvent struct {
EventType string
SchemaVersion int
Payload []byte
Metadata Metadata
}
var ErrConflict = errors.New("optimistic concurrency conflict")
var ErrNotFound = errors.New("aggregate not found")
```
### Save avec optimistic concurrency
La clé de la concurrence optimiste : on tente de mettre à jour `es_aggregate` avec la version attendue. Si 0 lignes affectées, quelqu'un d'autre a modifié l'aggregate entre notre lecture et notre écriture.
```go
// Save persiste les événements avec contrôle de concurrence optimiste
func (s *PostgresEventStore) Save(
ctx context.Context,
aggregateID uuid.UUID,
aggregateType string,
expectedVersion int,
events []SerializedEvent,
) error {
if len(events) == 0 {
return nil
}
return s.db.BeginTxx(ctx, nil, func(tx *sqlx.Tx) error {
// Étape 1 : Insérer ou mettre à jour l'aggregate avec contrôle de version
newVersion := expectedVersion + len(events)
var rowsAffected int64
var err error
if expectedVersion == 0 {
// Premier save : INSERT
result, err := tx.ExecContext(ctx, `
INSERT INTO es_aggregate (id, aggregate_type, version)
VALUES ($1, $2, $3)
ON CONFLICT (id) DO NOTHING`,
aggregateID, aggregateType, newVersion,
)
if err != nil {
return fmt.Errorf("insert aggregate: %w", err)
}
rowsAffected, err = result.RowsAffected()
} else {
// Update existant : version doit correspondre
result, err := tx.ExecContext(ctx, `
UPDATE es_aggregate
SET version = $1
WHERE id = $2 AND version = $3`,
newVersion, aggregateID, expectedVersion,
)
if err != nil {
return fmt.Errorf("update aggregate version: %w", err)
}
rowsAffected, err = result.RowsAffected()
}
if err != nil {
return err
}
if rowsAffected == 0 {
// Conflit de concurrence : quelqu'un a modifié l'aggregate entre-temps
return ErrConflict
}
// Étape 2 : Insérer les événements
for i, event := range events {
version := expectedVersion + i + 1
_, err := tx.ExecContext(ctx, `
INSERT INTO es_event
(aggregate_id, version, event_type, schema_version, payload, metadata)
VALUES
($1, $2, $3, $4, $5, $6)`,
aggregateID,
version,
event.EventType,
event.SchemaVersion,
event.Payload,
event.Metadata,
)
if err != nil {
// La contrainte UNIQUE(aggregate_id, version) peut aussi déclencher un conflit
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == "23505" {
return ErrConflict
}
return fmt.Errorf("insert event version %d: %w", version, err)
}
}
return nil
})
}
```
### Helper pour les transactions sqlx
```go
// BeginTxx exécute fn dans une transaction, rollback automatique si erreur
func (db *DB) BeginTxx(ctx context.Context, opts *sql.TxOptions, fn func(*sqlx.Tx) error) error {
tx, err := db.sqlx.BeginTxx(ctx, opts)
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
defer func() {
if p := recover(); p != nil {
_ = tx.Rollback()
panic(p) // re-panic après rollback
}
}()
if err := fn(tx); err != nil {
_ = tx.Rollback()
return err
}
return tx.Commit()
}
```
### Load
```go
// Load charge tous les événements d'un aggregate, triés par version ASC
func (s *PostgresEventStore) Load(
ctx context.Context,
aggregateID uuid.UUID,
) ([]StoredEvent, error) {
return s.LoadFrom(ctx, aggregateID, 0)
}
// LoadFrom charge les événements à partir d'une version donnée (pour snapshots)
func (s *PostgresEventStore) LoadFrom(
ctx context.Context,
aggregateID uuid.UUID,
fromVersion int,
) ([]StoredEvent, error) {
var events []StoredEvent
err := s.db.SelectContext(ctx, &events, `
SELECT
id, transaction_id, aggregate_id, version,
event_type, schema_version, payload, metadata, created_at
FROM es_event
WHERE aggregate_id = $1
AND version > $2
ORDER BY version ASC`,
aggregateID, fromVersion,
)
if err != nil {
return nil, fmt.Errorf("load events for aggregate %s: %w", aggregateID, err)
}
return events, nil
}
```
### Repository avec sérialisation/désérialisation
```go
// PostgresRepository implémente order.Repository
type PostgresRepository struct {
store EventStore
serializer EventSerializer
}
// Save sérialise les événements et les persiste
func (r *PostgresRepository) Save(ctx context.Context, o *order.Order) error {
events := o.UncommittedEvents()
if len(events) == 0 {
return nil
}
serialized := make([]SerializedEvent, len(events))
for i, e := range events {
s, err := r.serializer.Serialize(e)
if err != nil {
return fmt.Errorf("serialize event %T: %w", e, err)
}
serialized[i] = s
}
// expectedVersion = version actuelle - nb d'événements non persistés
expectedVersion := o.Version() - len(events)
err := r.store.Save(ctx, o.ID(), order.AggregateType(), expectedVersion, serialized)
if err != nil {
return err // ErrConflict propagé tel quel
}
o.ClearChanges()
return nil
}
// Load charge et désérialise les événements, reconstruit l'aggregate
func (r *PostgresRepository) Load(ctx context.Context, id uuid.UUID) (*order.Order, error) {
stored, err := r.store.Load(ctx, id)
if err != nil {
return nil, fmt.Errorf("load events: %w", err)
}
if len(stored) == 0 {
return nil, order.ErrNotFound
}
events := make([]order.Event, len(stored))
for i, s := range stored {
e, err := r.serializer.Deserialize(s)
if err != nil {
return nil, fmt.Errorf("deserialize event %s v%d: %w", s.EventType, s.Version, err)
}
events[i] = e.(order.Event)
}
return order.NewFromEvents(events)
}
```
### Sérialiseur
```go
// EventSerializer gère la sérialisation/désérialisation des événements
type EventSerializer struct {
upcasters map[string][]Upcaster // pour le versioning
}
func (s *EventSerializer) Serialize(e order.Event) (SerializedEvent, error) {
payload, err := json.Marshal(e)
if err != nil {
return SerializedEvent{}, fmt.Errorf("marshal event: %w", err)
}
return SerializedEvent{
EventType: eventType(e),
SchemaVersion: 1,
Payload: payload,
}, nil
}
func (s *EventSerializer) Deserialize(stored StoredEvent) (interface{}, error) {
// Appliquer les upcasters si nécessaire
payload := s.upcast(stored.EventType, stored.SchemaVersion, stored.Payload)
switch stored.EventType {
case "OrderPlaced":
var e order.OrderPlaced
return e, json.Unmarshal(payload, &e)
case "OrderConfirmed":
var e order.OrderConfirmed
return e, json.Unmarshal(payload, &e)
case "OrderShipped":
var e order.OrderShipped
return e, json.Unmarshal(payload, &e)
case "OrderCancelled":
var e order.OrderCancelled
return e, json.Unmarshal(payload, &e)
default:
return nil, fmt.Errorf("unknown event type: %s", stored.EventType)
}
}
func eventType(e order.Event) string {
t := reflect.TypeOf(e)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t.Name()
}
```
---
## Section 7 : Snapshots — éviter le replay intégral
### Quand utiliser les snapshots
Les snapshots sont nécessaires quand un aggregate accumule de nombreux événements sur sa durée de vie. Un compte bancaire avec 10 ans d'historique pourrait avoir des milliers d'événements — rejouer tous ces événements à chaque chargement est trop coûteux.
**Règle pratique** : implémenter les snapshots quand un aggregate dépasse ~100 événements.
### Schéma et types
```go
// SnapshotStore gère la persistance des snapshots
type SnapshotStore interface {
// Save sauvegarde un snapshot (idempotent : ON CONFLICT DO NOTHING)
Save(ctx context.Context, snap StoredSnapshot) error
// Load charge le dernier snapshot d'un aggregate
// Retourne nil, nil si aucun snapshot n'existe
Load(ctx context.Context, aggregateID uuid.UUID) (*StoredSnapshot, error)
}
// StoredSnapshot représente une ligne de es_snapshot
type StoredSnapshot struct {
AggregateID uuid.UUID `db:"aggregate_id"`
AggregateType string `db:"aggregate_type"`
Version int `db:"version"`
Payload json.RawMessage `db:"payload"`
CreatedAt time.Time `db:"created_at"`
}
```
### Implémentation PostgreSQL
```go
// Save sauvegarde un snapshot de manière idempotente
func (s *PostgresSnapshotStore) Save(ctx context.Context, snap StoredSnapshot) error {
_, err := s.db.ExecContext(ctx, `
INSERT INTO es_snapshot (aggregate_id, aggregate_type, version, payload)
VALUES ($1, $2, $3, $4)
ON CONFLICT (aggregate_id, version) DO NOTHING`,
snap.AggregateID,
snap.AggregateType,
snap.Version,
snap.Payload,
)
if err != nil {
return fmt.Errorf("save snapshot for %s v%d: %w", snap.AggregateID, snap.Version, err)
}
return nil
}
// Load charge le snapshot le plus récent
func (s *PostgresSnapshotStore) Load(ctx context.Context, aggregateID uuid.UUID) (*StoredSnapshot, error) {
var snap StoredSnapshot
err := s.db.GetContext(ctx, &snap, `
SELECT aggregate_id, aggregate_type, version, payload, created_at
FROM es_snapshot
WHERE aggregate_id = $1
ORDER BY version DESC
LIMIT 1`,
aggregateID,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil // Aucun snapshot, pas une erreur
}
if err != nil {
return nil, fmt.Errorf("load snapshot for %s: %w", aggregateID, err)
}
return &snap, nil
}
```
### Repository avec support des snapshots
```go
const snapshotInterval = 100 // Prendre un snapshot tous les 100 événements
// SnapshotRepository wrap un Repository de base avec support des snapshots
type SnapshotRepository struct {
base *PostgresRepository
snapStore SnapshotStore
}
// Load charge depuis snapshot + événements ultérieurs
func (r *SnapshotRepository) Load(ctx context.Context, id uuid.UUID) (*order.Order, error) {
// Essayer de charger un snapshot
snap, err := r.snapStore.Load(ctx, id)
if err != nil {
return nil, fmt.Errorf("load snapshot: %w", err)
}
if snap == nil {
// Pas de snapshot : chargement complet depuis les événements
return r.base.Load(ctx, id)
}
// Charger seulement les événements APRÈS le snapshot
stored, err := r.base.store.LoadFrom(ctx, id, snap.Version)
if err != nil {
return nil, fmt.Errorf("load events after snapshot: %w", err)
}
// Désérialiser le snapshot
var orderSnap order.Snapshot
if err := json.Unmarshal(snap.Payload, &orderSnap); err != nil {
return nil, fmt.Errorf("deserialize snapshot: %w", err)
}
// Désérialiser les événements ultérieurs
events := make([]order.Event, len(stored))
for i, s := range stored {
e, err := r.base.serializer.Deserialize(s)
if err != nil {
return nil, fmt.Errorf("deserialize event: %w", err)
}
events[i] = e.(order.Event)
}
// Reconstruire depuis snapshot + événements
return order.NewFromSnapshot(orderSnap, events)
}
// Save persiste les événements et prend un snapshot si nécessaire
func (r *SnapshotRepository) Save(ctx context.Context, o *order.Order) error {
// Sauvegarder les événements normalement
if err := r.base.Save(ctx, o); err != nil {
return err
}
// Prendre un snapshot si on atteint l'intervalle
if o.Version()%snapshotInterval == 0 {
snap := o.TakeSnapshot()
payload, err := json.Marshal(snap)
if err != nil {
// Non-fatal : loguer mais ne pas échouer
slog.Warn("failed to marshal snapshot",
"order_id", o.ID(),
"version", o.Version(),
"error", err,
)
return nil
}
stored := StoredSnapshot{
AggregateID: o.ID(),
AggregateType: order.AggregateType(),
Version: o.Version(),
Payload: payload,
}
if err := r.snapStore.Save(ctx, stored); err != nil {
// Non-fatal : loguer mais ne pas échouer
slog.Warn("failed to save snapshot",
"order_id", o.ID(),
"version", o.Version(),
"error", err,
)
}
}
return nil
}
```
---
## Section 8 : Projections — read models idempotents
### Principes
- Les projections construisent des read models optimisés pour les queries
- Elles sont **entièrement reconstituables** depuis l'Event Store — elles sont dispensables
- Chaque projection doit être **idempotente** : rejouer le même événement ne doit pas altérer l'état
- Ne jamais utiliser une projection pour des décisions côté write
- Sauvegarder le checkpoint dans la **même transaction** que la mise à jour du read model
### Définition d'une projection
```go
package projection
import (
"context"
"database/sql"
"github.com/jmoiron/sqlx"
"myapp/internal/eventstore"
"myapp/internal/order"
)
// OrderSummaryProjection construit une vue résumée des commandes
type OrderSummaryProjection struct {
db *sqlx.DB
}
// Name retourne le nom unique de cette projection (clé dans projection_checkpoint)
func (p *OrderSummaryProjection) Name() string {
return "order_summary"
}
// Handle traite un événement stocké
func (p *OrderSummaryProjection) Handle(ctx context.Context, stored eventstore.StoredEvent) error {
switch stored.EventType {
case "OrderPlaced":
return p.handleOrderPlaced(ctx, stored)
case "OrderConfirmed":
return p.handleOrderConfirmed(ctx, stored)
case "OrderShipped":
return p.handleOrderShipped(ctx, stored)
case "OrderCancelled":
return p.handleOrderCancelled(ctx, stored)
}
// Événements inconnus ignorés silencieusement
return nil
}
// handleOrderPlaced traite un événement OrderPlaced de manière idempotente
func (p *OrderSummaryProjection) handleOrderPlaced(
ctx context.Context,
stored eventstore.StoredEvent,
) error {
var e order.OrderPlaced
if err := json.Unmarshal(stored.Payload, &e); err != nil {
return fmt.Errorf("unmarshal OrderPlaced: %w", err)
}
// ON CONFLICT DO NOTHING : idempotent — si déjà inséré, ne rien faire
_, err := p.db.ExecContext(ctx, `
INSERT INTO order_summary
(id, customer_id, status, total_cents, item_count, placed_at)
VALUES
($1, $2, 'pending', $3, $4, $5)
ON CONFLICT (id) DO NOTHING`,
e.OrderID,
e.CustomerID,
e.TotalCents,
len(e.Items),
e.PlacedAt,
)
if err != nil {
return fmt.Errorf("insert order_summary: %w", err)
}
return nil
}
// handleOrderShipped traite un événement OrderShipped
func (p *OrderSummaryProjection) handleOrderShipped(
ctx context.Context,
stored eventstore.StoredEvent,
) error {
var e order.OrderShipped
if err := json.Unmarshal(stored.Payload, &e); err != nil {
return fmt.Errorf("unmarshal OrderShipped: %w", err)
}
// UPDATE idempotent : WHERE garantit qu'on ne met à jour qu'au bon moment
_, err := p.db.ExecContext(ctx, `
UPDATE order_summary
SET
status = 'shipped',
tracking_number = $1,
shipped_at = $2
WHERE id = $3
AND status != 'shipped'`, -- idempotence via guard
e.TrackingNumber,
e.ShippedAt,
e.OrderID,
)
if err != nil {
return fmt.Errorf("update order_summary for ship: %w", err)
}
return nil
}
```
### Projection Worker
```go
// ProjectionWorker exécute une projection en lisant les événements en continu
type ProjectionWorker struct {
store EventStore
projection Projection
db *sqlx.DB
batchSize int
pollInterval time.Duration
}
// Projection est l'interface implémentée par toutes les projections
type Projection interface {
Name() string
Handle(ctx context.Context, stored StoredEvent) error
}
// Run lance le worker en boucle jusqu'à annulation du contexte
func (w *ProjectionWorker) Run(ctx context.Context) error {
slog.Info("projection worker started", "projection", w.projection.Name())
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
processed, err := w.processBatch(ctx)
if err != nil {
slog.Error("projection batch failed",
"projection", w.projection.Name(),
"error", err,
)
// Attendre avant de réessayer pour éviter le busy-loop
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
}
continue
}
if processed == 0 {
// Rien à traiter : attendre le prochain poll
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(w.pollInterval):
}
}
}
}
// processBatch traite un lot d'événements dans une transaction unique
func (w *ProjectionWorker) processBatch(ctx context.Context) (int, error) {
// Lire le checkpoint courant
var checkpoint int64
err := w.db.GetContext(ctx, &checkpoint, `
SELECT last_event_id
FROM projection_checkpoint
WHERE projection_name = $1`,
w.projection.Name(),
)
if errors.Is(err, sql.ErrNoRows) {
checkpoint = 0
} else if err != nil {
return 0, fmt.Errorf("read checkpoint: %w", err)
}
// Charger le prochain batch d'événements
events, err := w.store.LoadForProjection(ctx, checkpoint, w.batchSize)
if err != nil {
return 0, fmt.Errorf("load events: %w", err)
}
if len(events) == 0 {
return 0, nil
}
// Traiter tous les événements dans une seule transaction
// Le checkpoint et le read model sont mis à jour atomiquement
err = w.db.BeginTxx(ctx, nil, func(tx *sqlx.Tx) error {
for _, event := range events {
if err := w.projection.Handle(ctx, event); err != nil {
return fmt.Errorf("handle event %d (%s): %w", event.ID, event.EventType, err)
}
}
// Mettre à jour le checkpoint dans la MÊME transaction
lastID := events[len(events)-1].ID
_, err := tx.ExecContext(ctx, `
INSERT INTO projection_checkpoint (projection_name, last_event_id)
VALUES ($1, $2)
ON CONFLICT (projection_name)
DO UPDATE SET last_event_id = $2, updated_at = NOW()`,
w.projection.Name(),
lastID,
)
return err
})
if err != nil {
return 0, fmt.Errorf("process batch: %w", err)
}
slog.Info("projection batch processed",
"projection", w.projection.Name(),
"count", len(events),
"last_event_id", events[len(events)-1].ID,
)
return len(events), nil
}
```
### Rebuild d'une projection
```go
// Rebuild recrée entièrement une projection depuis le début
// Utile après un bug ou un changement de schéma
func (w *ProjectionWorker) Rebuild(ctx context.Context) error {
slog.Info("rebuilding projection", "projection", w.projection.Name())
// Réinitialiser le checkpoint
_, err := w.db.ExecContext(ctx, `
INSERT INTO projection_checkpoint (projection_name, last_event_id)
VALUES ($1, 0)
ON CONFLICT (projection_name)
DO UPDATE SET last_event_id = 0`,
w.projection.Name(),
)
if err != nil {
return fmt.Errorf("reset checkpoint: %w", err)
}
// Optionnel : vider le read model avant de reconstruire
// (selon si la projection est idempotente ou non)
// Relancer le worker normalement
return w.Run(ctx)
}
```
---
## Section 9 : Idempotence — déduplication des commandes
### Problème
Dans un système distribué, les commandes peuvent être reçues plusieurs fois (retry du client, at-least-once delivery, timeout réseau suivi d'une réémission). Chaque command handler doit être idempotent.
### Stratégie : clé d'idempotence
Le client génère une clé unique (UUID) par opération et la renvoie lors des retries. Le serveur mémorise les commandes déjà traitées.
```go
// IdempotencyKey est une clé unique fournie par le client
// Format recommandé : "resource-type:uuid" ou simplement un UUID v4
type IdempotencyKey string
// CommandWithIdempotency wrap une commande avec sa clé d'idempotence
type CommandWithIdempotency[T any] struct {
Key IdempotencyKey
Command T
}
```
### Déduplication en base de données
```go
// IdempotencyStore gère la table processed_commands
type IdempotencyStore struct {
db *sqlx.DB
}
// Claim tente de "réclamer" une clé d'idempotence
// Retourne true si la clé est nouvelle (commande à traiter)
// Retourne false si déjà traitée (réponse idempotente)
func (s *IdempotencyStore) Claim(ctx context.Context, key IdempotencyKey) (bool, error) {
result, err := s.db.ExecContext(ctx, `
INSERT INTO processed_commands (idempotency_key)
VALUES ($1)
ON CONFLICT (idempotency_key) DO NOTHING`,
string(key),
)
if err != nil {
return false, fmt.Errorf("claim idempotency key: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return false, err
}
return rows > 0, nil // true = nouvelle commande
}
```
### Middleware d'idempotence pour les handlers
```go
// IdempotentCommandHandler wrap un handler avec déduplication
type IdempotentCommandHandler[T any] struct {
inner func(context.Context, T) error
idempStore *IdempotencyStore
}
func (h *IdempotentCommandHandler[T]) Handle(
ctx context.Context,
cmd CommandWithIdempotency[T],
) error {
// Tenter de réclamer la clé
isNew, err := h.idempStore.Claim(ctx, cmd.Key)
if err != nil {
return fmt.Errorf("check idempotency: %w", err)
}
if !isNew {
// Commande déjà traitée : réponse idempotente
slog.Info("duplicate command ignored", "idempotency_key", cmd.Key)
return nil
}
// Nouvelle commande : traiter normalement
return h.inner(ctx, cmd.Command)
}
```
### Dans le HTTP Handler
```go
// POST /orders avec support de l'idempotence
func (h *HTTPHandler) PlaceOrder(w http.ResponseWriter, r *http.Request) {
// La clé d'idempotence est dans le header HTTP
// Convention: Idempotency-Key: <uuid>
idempKey := r.Header.Get("Idempotency-Key")
if idempKey == "" {
// Générer une clé basée sur les données si non fournie
// (moins robuste que la clé fournie par le client)
idempKey = uuid.New().String()
}
var req PlaceOrderRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid body", http.StatusBadRequest)
return
}
cmd := command.PlaceOrder{
OrderID: req.OrderID,
CustomerID: req.CustomerID,
Items: mapItems(req.Items),
}
wrapped := CommandWithIdempotency[command.PlaceOrder]{
Key: IdempotencyKey(idempKey),
Command: cmd,
}
if err := h.idempHandler.Handle(r.Context(), wrapped); err != nil {
handleCommandError(w, err)
return
}
w.Header().Set("Content-Location", "/orders/"+req.OrderID.String())
w.WriteHeader(http.StatusNoContent)
}
```
### Nettoyage des vieilles clés
```go
// CleanupOldKeys supprime les clés d'idempotence plus vieilles que duration
// À exécuter en tâche périodique (ex: toutes les heures)
func (s *IdempotencyStore) CleanupOldKeys(ctx context.Context, olderThan time.Duration) (int64, error) {
result, err := s.db.ExecContext(ctx, `
DELETE FROM processed_commands
WHERE processed_at < $1`,
time.Now().Add(-olderThan),
)
if err != nil {
return 0, fmt.Errorf("cleanup idempotency keys: %w", err)
}
count, _ := result.RowsAffected()
return count, nil
}
```
---
## Section 10 : Outbox pattern — publication fiable d'events
### Problème
Si on publie un event sur Kafka APRÈS avoir commité la transaction DB, il y a un risque de perte (crash entre les deux). L'Outbox pattern résout ce problème en écrivant l'event dans la même transaction que les données.
### Distinction événements domaine vs intégration
| Type | Scope | Audience | Format |
|------|-------|----------|--------|
| **Domain Event** | Aggregate interne | Même bounded context | Riche en données métier |
| **Integration Event** | Publié à l'extérieur | Autres services | Contrat stable, versioning strict |
Ne pas publier les domain events bruts à l'extérieur. Les transformer en integration events.
### Écriture dans l'Outbox (même TX que les événements)
```go
// OutboxEntry représente un message en attente de publication
type OutboxEntry struct {
Topic string
Payload []byte
}
// SaveWithOutbox sauvegarde les événements ET les messages outbox dans la même TX
func (s *PostgresEventStore) SaveWithOutbox(
ctx context.Context,
aggregateID uuid.UUID,
aggregateType string,
expectedVersion int,
events []SerializedEvent,
outboxEntries []OutboxEntry,
) error {
return s.db.BeginTxx(ctx, nil, func(tx *sqlx.Tx) error {
// 1. Sauvegarder les événements (avec optimistic concurrency)
if err := s.saveInTx(ctx, tx, aggregateID, aggregateType, expectedVersion, events); err != nil {
return err
}
// 2. Écrire les messages outbox dans LA MÊME TRANSACTION
for _, entry := range outboxEntries {
_, err := tx.ExecContext(ctx, `
INSERT INTO outbox (topic, payload)
VALUES ($1, $2)`,
entry.Topic,
entry.Payload,
)
if err != nil {
return fmt.Errorf("insert outbox entry for topic %s: %w", entry.Topic, err)
}
}
return nil
})
}
```
### Outbox Relay — publication vers le broker
```go
// OutboxRelay lit les messages pending et les publie sur le broker
type OutboxRelay struct {
db *sqlx.DB
publisher MessagePublisher
batchSize int
}
// MessagePublisher est l'interface vers le broker (Kafka, NATS, etc.)
type MessagePublisher interface {
Publish(ctx context.Context, topic string, payload []byte) error
}
// Run exécute le relay en boucle
func (r *OutboxRelay) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
published, err := r.publishBatch(ctx)
if err != nil {
slog.Error("outbox relay batch failed", "error", err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
}
continue
}
if published == 0 {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(500 * time.Millisecond):
}
}
}
}
// publishBatch publie un lot de messages outbox avec FOR UPDATE SKIP LOCKED
func (r *OutboxRelay) publishBatch(ctx context.Context) (int, error) {
// FOR UPDATE SKIP LOCKED : permet plusieurs relay workers en parallèle
// sans conflit — chaque worker prend des lignes différentes
rows, err := r.db.QueryxContext(ctx, `
SELECT id, topic, payload
FROM outbox
WHERE state = 'pending'
ORDER BY id ASC
LIMIT $1
FOR UPDATE SKIP LOCKED`,
r.batchSize,
)
if err != nil {
return 0, fmt.Errorf("query outbox: %w", err)
}
defer rows.Close()
type entry struct {
ID int64 `db:"id"`
Topic string `db:"topic"`
Payload []byte `db:"payload"`
}
var entries []entry
for rows.Next() {
var e entry
if err := rows.StructScan(&e); err != nil {
return 0, fmt.Errorf("scan outbox row: %w", err)
}
entries = append(entries, e)
}
published := 0
for _, e := range entries {
// Publier sur le broker
if err := r.publisher.Publish(ctx, e.Topic, e.Payload); err != nil {
slog.Error("failed to publish outbox message",
"id", e.ID,
"topic", e.Topic,
"error", err,
)
continue // Resilient execution : continuer avec les autres
}
// Marquer comme envoyé
_, err := r.db.ExecContext(ctx, `
UPDATE outbox
SET state = 'sent', sent_at = NOW()
WHERE id = $1`,
e.ID,
)
if err != nil {
slog.Error("failed to mark outbox message as sent",
"id", e.ID,
"error", err,
)
continue
}
published++
}
return published, nil
}
```
### Alternative : WAL-based outbox avec pglogrepl
Pour une latence minimale (< 100ms), utiliser PostgreSQL logical replication au lieu du polling :
```go
// WALOutboxListener écoute le WAL PostgreSQL pour une latence ultra-basse
// Nécessite : wal_level = logical dans postgresql.conf
// Utilise la librairie : github.com/jackc/pglogrepl
type WALOutboxListener struct {
conn *pgconn.PgConn
publisher MessagePublisher
slotName string
}
// Note : implémentation simplifiée — voir pglogrepl pour le code complet
func (l *WALOutboxListener) Start(ctx context.Context) error {
// Créer un slot de réplication logique si inexistant
// Puis écouter les changements sur la table outbox
// Beaucoup plus complexe à implémenter mais latence < 100ms vs polling
// Recommandé seulement pour les cas nécessitant une très faible latence
return nil
}
```
---
## Section 11 : Sagas — chorégraphie et orchestration
### Chorégraphie
Chaque service écoute les événements et réagit de manière autonome. Couplage faible mais difficile à tracer.
```
OrderService PaymentService InventoryService
│ │ │
│── OrderPlaced ──────▶│ │
│ │── PaymentProcessed ─▶│
│ │ │── InventoryReserved ──▶ (OrderService réagit)
│◀── InventoryReserved ─────────────────────│
```
```go
// PaymentService écoute OrderPlaced et traite le paiement
type PaymentEventHandler struct {
paymentService PaymentService
}
func (h *PaymentEventHandler) HandleOrderPlaced(ctx context.Context, e OrderPlaced) error {
// Traiter le paiement
if err := h.paymentService.ProcessPayment(ctx, e.OrderID, e.TotalCents); err != nil {
// Publier un PaymentFailed pour que OrderService compense
return h.publishPaymentFailed(ctx, e.OrderID, err.Error())
}
return h.publishPaymentProcessed(ctx, e.OrderID)
}
```
### Orchestration avec Process Manager
L'orchestrateur est un aggregate event-sourcé qui coordonne la séquence.
```go
// OrderFulfillmentSaga est le Process Manager pour l'expédition d'une commande
type OrderFulfillmentSaga struct {
id uuid.UUID
version int
changes []SagaEvent
// État de la saga
orderID uuid.UUID
status SagaStatus
retryCount int
compensation []CompensationStep
}
type SagaStatus string
const (
SagaStarted SagaStatus = "started"
SagaCompleted SagaStatus = "completed"
SagaFailed SagaStatus = "failed"
SagaCompensating SagaStatus = "compensating"
)
// Start démarre la saga pour une commande
func StartOrderFulfillmentSaga(sagaID, orderID uuid.UUID) (*OrderFulfillmentSaga, error) {
s := &OrderFulfillmentSaga{}
s.raise(SagaStarted{
SagaID: sagaID,
OrderID: orderID,
StartedAt: time.Now().UTC(),
})
return s, nil
}
// HandlePaymentProcessed réagit à un paiement réussi
func (s *OrderFulfillmentSaga) HandlePaymentProcessed(paymentID uuid.UUID) error {
if s.status != SagaStarted {
return fmt.Errorf("unexpected payment in status %s", s.status)
}
s.raise(PaymentConfirmedInSaga{
SagaID: s.id,
PaymentID: paymentID,
})
return nil
}
// HandlePaymentFailed déclenche la compensation
func (s *OrderFulfillmentSaga) HandlePaymentFailed(reason string) error {
s.raise(SagaCompensationStarted{
SagaID: s.id,
Reason: reason,
})
return nil
}
func (s *OrderFulfillmentSaga) transition(e SagaEvent) {
switch ev := e.(type) {
case SagaStarted:
s.id = ev.SagaID
s.orderID = ev.OrderID
s.status = SagaStarted
case PaymentConfirmedInSaga:
s.status = SagaCompleted
case SagaCompensationStarted:
s.status = SagaCompensating
// Enregistrer les étapes de compensation à exécuter
s.compensation = []CompensationStep{
{Type: "cancel_order", ResourceID: s.orderID},
}
}
}
```
### Compensation — transactions compensatoires
Toutes les transactions compensatoires doivent être **idempotentes**.
```go
// CompensationExecutor exécute les compensations
type CompensationExecutor struct {
orderRepo order.Repository
// autres repos...
}
// Execute exécute une étape de compensation de manière idempotente
func (e *CompensationExecutor) Execute(ctx context.Context, step CompensationStep) error {
switch step.Type {
case "cancel_order":
// Idempotent : si déjà annulée, l'erreur est ignorée
err := e.orderRepo.Update(ctx, step.ResourceID, func(o *order.Order) error {
err := o.Cancel("saga compensation")
if errors.Is(err, order.ErrAlreadyCancelled) {
return nil // Déjà compensé — OK
}
return err
})
return err
default:
return fmt.Errorf("unknown compensation type: %s", step.Type)
}
}
```
### Note sur Temporal
Pour les sagas complexes (multi-step, longs timeouts, retry sophistiqués), considérer [Temporal](https://temporal.io) :
```go
// Avec Temporal, une saga devient un simple workflow Go
func OrderFulfillmentWorkflow(ctx workflow.Context, orderID string) error {
// Temporal gère automatiquement :
// - La persistance de l'état
// - Les retries avec backoff
// - Les timeouts
// - La compensation en cas d'échec
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
var paymentResult PaymentResult
if err := workflow.ExecuteActivity(ctx, ProcessPayment, orderID).Get(ctx, &paymentResult); err != nil {
// Compensation automatique
return workflow.ExecuteActivity(ctx, CancelOrder, orderID).Get(ctx, nil)
}
return workflow.ExecuteActivity(ctx, ReserveInventory, orderID).Get(ctx, nil)
}
```
---
## Section 12 : Event versioning — schéma faible et upcasters
### Stratégie 1 : schéma faible (préférée)
Les changements additifs ne nécessitent pas de versioning si on utilise `omitempty`. Le schéma JSON est naturellement extensible.
```go
// Version 1 originale
type OrderPlaced struct {
OrderID uuid.UUID `json:"order_id"`
CustomerID uuid.UUID `json:"customer_id"`
Items []OrderItem `json:"items"`
TotalCents int64 `json:"total_cents"`
PlacedAt time.Time `json:"placed_at"`
}
// Version 2 : ajout d'un champ optionnel (backward compatible)
// Les anciens events qui ne contiennent pas PromoCode auront ""
// schema_version reste à 1 — aucun upcaster nécessaire
type OrderPlaced struct {
OrderID uuid.UUID `json:"order_id"`
CustomerID uuid.UUID `json:"customer_id"`
Items []OrderItem `json:"items"`
TotalCents int64 `json:"total_cents"`
PlacedAt time.Time `json:"placed_at"`
// Nouveau champ optionnel — backward compatible
PromoCode string `json:"promo_code,omitempty"`
// Autre ajout optionnel
Channel string `json:"channel,omitempty"` // "web", "mobile", "api"
}
```
### Stratégie 2 : upcasters pour changements cassants
Quand un changement est incompatible (renommage de champ, changement de type), utiliser un upcaster.
```go
// Upcaster transforme un event d'une version vers la suivante
type Upcaster interface {
EventType() string
FromVersion() int
ToVersion() int
Upcast(payload json.RawMessage) (json.RawMessage, error)
}
// OrderPlacedV1ToV2Upcaster : migration du champ "amount" → "total_cents"
// (changement cassant : renommage + conversion centimes)
type OrderPlacedV1ToV2Upcaster struct{}
func (u *OrderPlacedV1ToV2Upcaster) EventType() string { return "OrderPlaced" }
func (u *OrderPlacedV1ToV2Upcaster) FromVersion() int { return 1 }
func (u *OrderPlacedV1ToV2Upcaster) ToVersion() int { return 2 }
func (u *OrderPlacedV1ToV2Upcaster) Upcast(payload json.RawMessage) (json.RawMessage, error) {
// Lire l'ancienne structure
var v1 struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Amount float64 `json:"amount"` // ancien: montant en euros
PlacedAt time.Time `json:"placed_at"`
}
if err := json.Unmarshal(payload, &v1); err != nil {
return nil, fmt.Errorf("unmarshal v1: %w", err)
}
// Transformer vers la nouvelle structure
v2 := struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
TotalCents int64 `json:"total_cents"` // nouveau: montant en centimes
PlacedAt time.Time `json:"placed_at"`
}{
OrderID: v1.OrderID,
CustomerID: v1.CustomerID,
TotalCents: int64(v1.Amount * 100), // conversion
PlacedAt: v1.PlacedAt,
}
return json.Marshal(v2)
}
// UpcasterChain applique une chaîne d'upcasters
type UpcasterChain struct {
upcasters map[string][]Upcaster // eventType → []Upcaster triés par version
}
func (c *UpcasterChain) Upcast(eventType string, fromVersion int, payload json.RawMessage) (json.RawMessage, int, error) {
upcasters, ok := c.upcasters[eventType]
if !ok {
return payload, fromVersion, nil // Pas d'upcaster nécessaire
}
currentVersion := fromVersion
currentPayload := payload
for _, u := range upcasters {
if u.FromVersion() < currentVersion {
continue // Déjà appliqué
}
if u.FromVersion() != currentVersion {
break // Gap dans la chaîne
}
var err error
currentPayload, err = u.Upcast(currentPayload)
if err != nil {
return nil, 0, fmt.Errorf("upcast %s v%d→v%d: %w",
eventType, u.FromVersion(), u.ToVersion(), err)
}
currentVersion = u.ToVersion()
}
return currentPayload, currentVersion, nil
}
```
### Métadonnées essentielles
Toujours inclure ces champs dans les métadonnées de chaque événement :
```go
// EventMetadata — traçabilité et corrélation
type EventMetadata struct {
// CorrelationID : ID de la requête originale, propagé à travers tous les services
CorrelationID string `json:"correlation_id"`
// CausationID : ID de l'événement ou commande qui a causé cet événement
CausationID string `json:"causation_id"`
// SchemaVersion : version du schéma de l'événement pour les upcasters
SchemaVersion int `json:"schema_version"`
// UserID : qui a déclenché l'action
UserID string `json:"user_id,omitempty"`
// Timestamp : quand l'événement a été créé (précision nanoseconde)
Timestamp time.Time `json:"timestamp"`
}
// RÈGLES ABSOLUES pour le versioning :
// 1. Ne JAMAIS modifier un événement déjà persisté
// 2. Ne JAMAIS supprimer des champs (utiliser omitempty)
// 3. Ne JAMAIS changer le sens d'un champ existant
// 4. TOUJOURS bumper schema_version pour les changements cassants
// 5. TOUJOURS écrire un upcaster pour chaque changement cassant
```
---
## Section 13 : Testing — Given/When/Then et intégration
### Tests unitaires d'aggregate avec Given/When/Then
```go
package order_test
import (
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"myapp/internal/order"
)
// testCase structure le pattern Given/When/Then
type testCase struct {
name string
given []order.Event // état initial (events rejoués)
when func(*order.Order) error // opération à tester
then []order.Event // events attendus
wantErr bool
}
// customerID et orderID réutilisables dans les tests
var (
testOrderID = uuid.MustParse("00000000-0000-0000-0000-000000000001")
testCustomerID = uuid.MustParse("00000000-0000-0000-0000-000000000002")
testProductID = uuid.MustParse("00000000-0000-0000-0000-000000000003")
testItems = []order.OrderItem{{
ProductID: testProductID,
Quantity: 2,
PriceCents: 1500,
}}
)
// orderPlacedEvent crée un OrderPlaced pour les tests
func orderPlacedEvent() order.OrderPlaced {
return order.OrderPlaced{
OrderID: testOrderID,
CustomerID: testCustomerID,
Items: testItems,
TotalCents: 3000,
PlacedAt: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
}
}
func TestOrderConfirm(t *testing.T) {
tests := []testCase{
{
name: "confirm a pending order",
given: []order.Event{orderPlacedEvent()},
when: func(o *order.Order) error {
return o.Confirm()
},
then: []order.Event{
order.OrderConfirmed{OrderID: testOrderID},
},
},
{
name: "cannot confirm already confirmed order",
given: []order.Event{
orderPlacedEvent(),
order.OrderConfirmed{OrderID: testOrderID},
},
when: func(o *order.Order) error {
return o.Confirm()
},
wantErr: true,
},
{
name: "cannot confirm cancelled order",
given: []order.Event{
orderPlacedEvent(),
order.OrderCancelled{
OrderID: testOrderID,
Reason: "customer request",
},
},
when: func(o *order.Order) error {
return o.Confirm()
},
wantErr: true,
},
}
runTestCases(t, tests)
}
func TestOrderCancel(t *testing.T) {
tests := []testCase{
{
name: "cancel a pending order",
given: []order.Event{orderPlacedEvent()},
when: func(o *order.Order) error {
return o.Cancel("customer changed mind")
},
then: []order.Event{
order.OrderCancelled{
OrderID: testOrderID,
Reason: "customer changed mind",
},
},
},
{
name: "cannot cancel a shipped order",
given: []order.Event{
orderPlacedEvent(),
order.OrderConfirmed{OrderID: testOrderID},
order.OrderShipped{
OrderID: testOrderID,
TrackingNumber: "TRACK123",
},
},
when: func(o *order.Order) error {
return o.Cancel("too late")
},
wantErr: true,
},
{
name: "cancel requires non-empty reason",
given: []order.Event{orderPlacedEvent()},
when: func(o *order.Order) error {
return o.Cancel("")
},
wantErr: true,
},
}
runTestCases(t, tests)
}
// runTestCases exécute tous les cas de test avec le pattern Given/When/Then
func runTestCases(t *testing.T, tests []testCase) {
t.Helper()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// GIVEN : reconstruire l'aggregate depuis les événements initiaux
var agg *order.Order
var err error
if len(tt.given) > 0 {
agg, err = order.NewFromEvents(tt.given)
require.NoError(t, err, "failed to reconstruct aggregate from given events")
} else {
agg = &order.Order{} // aggregate vide pour les factory methods
}
// WHEN : exécuter l'opération
err = tt.when(agg)
// THEN : vérifier le résultat
if tt.wantErr {
assert.Error(t, err)
return
}
require.NoError(t, err)
// Vérifier les événements produits
uncommitted := agg.UncommittedEvents()
require.Len(t, uncommitted, len(tt.then),
"expected %d events, got %d", len(tt.then), len(uncommitted))
for i, expected := range tt.then {
actual := uncommitted[i]
// Comparer le type
assert.IsType(t, expected, actual,
"event[%d]: expected type %T, got %T", i, expected, actual)
// Comparer les champs importants (sans les timestamps)
assertEventEquals(t, expected, actual, i)
}
})
}
}
// assertEventEquals compare deux events en ignorant les timestamps auto-générés
func assertEventEquals(t *testing.T, expected, actual order.Event, index int) {
t.Helper()
switch exp := expected.(type) {
case order.OrderConfirmed:
act, ok := actual.(order.OrderConfirmed)
require.True(t, ok)
assert.Equal(t, exp.OrderID, act.OrderID, "event[%d]: OrderID mismatch", index)
case order.OrderCancelled:
act, ok := actual.(order.OrderCancelled)
require.True(t, ok)
assert.Equal(t, exp.OrderID, act.OrderID)
assert.Equal(t, exp.Reason, act.Reason)
case order.OrderShipped:
act, ok := actual.(order.OrderShipped)
require.True(t, ok)
assert.Equal(t, exp.OrderID, act.OrderID)
assert.Equal(t, exp.TrackingNumber, act.TrackingNumber)
}
}
```
### Tests d'intégration — Event Store et concurrence optimiste
```go
package integration_test
import (
"context"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestOptimisticConcurrencyConflict vérifie que deux saves concurrents conflictuels
// retournent bien ErrConflict
func TestOptimisticConcurrencyConflict(t *testing.T) {
ctx := context.Background()
db := setupTestDB(t) // helper qui crée une DB de test
store := eventstore.NewPostgresEventStore(db)
orderID := uuid.New()
// Créer un aggregate initial
event1 := SerializedEvent{EventType: "OrderPlaced", Payload: []byte(`{}`)}
err := store.Save(ctx, orderID, "order", 0, []SerializedEvent{event1})
require.NoError(t, err)
// Simuler deux modifications concurrentes de la version 1
var wg sync.WaitGroup
results := make([]error, 2)
for i := 0; i < 2; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
event := SerializedEvent{
EventType: "OrderConfirmed",
Payload: []byte(`{}`),
}
// Les deux tentent de passer de version 1 → 2
results[idx] = store.Save(ctx, orderID, "order", 1, []SerializedEvent{event})
}(i)
}
wg.Wait()
// Exactement l'un doit réussir, l'autre doit avoir un conflit
successes := 0
conflicts := 0
for _, err := range results {
if err == nil {
successes++
} else if errors.Is(err, eventstore.ErrConflict) {
conflicts++
}
}
assert.Equal(t, 1, successes, "exactly one save should succeed")
assert.Equal(t, 1, conflicts, "exactly one save should conflict")
}
// TestProjectionIdempotency vérifie qu'une projection est idempotente
func TestProjectionIdempotency(t *testing.T) {
ctx := context.Background()
db := setupTestDB(t)
proj := projection.NewOrderSummaryProjection(db)
orderID := uuid.New()
event := eventstore.StoredEvent{
ID: 1,
EventType: "OrderPlaced",
Payload: mustMarshal(order.OrderPlaced{OrderID: orderID, TotalCents: 3000}),
}
// Traiter le même événement 3 fois
for i := 0; i < 3; i++ {
err := proj.Handle(ctx, event)
require.NoError(t, err, "handle attempt %d failed", i+1)
}
// Vérifier qu'il n'y a qu'une seule ligne dans la projection
var count int
err := db.GetContext(ctx, &count,
"SELECT COUNT(*) FROM order_summary WHERE id = $1", orderID)
require.NoError(t, err)
assert.Equal(t, 1, count, "projection should be idempotent")
}
```
### Tests avec eventual consistency
```go
// TestEventualConsistency vérifie qu'une projection est éventuellement cohérente
func TestEventualConsistency(t *testing.T) {
ctx := context.Background()
// Setup : event store + projection worker
db := setupTestDB(t)
store := eventstore.NewPostgresEventStore(db)
proj := projection.NewOrderSummaryProjection(db)
worker := projection.NewWorker(store, proj, db)
// Lancer le worker en arrière-plan
workerCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
if err := worker.Run(workerCtx); err != nil && !errors.Is(err, context.Canceled) {
t.Errorf("worker failed: %v", err)
}
}()
// Créer une commande
orderID := uuid.New()
o, err := order.Place(orderID, uuid.New(), testItems)
require.NoError(t, err)
repo := NewSnapshotRepository(store, db)
err = repo.Save(ctx, o)
require.NoError(t, err)
// Attendre que la projection soit mise à jour (eventual consistency)
assert.Eventually(t, func() bool {
var count int
err := db.GetContext(ctx, &count,
"SELECT COUNT(*) FROM order_summary WHERE id = $1 AND status = 'pending'",
orderID,
)
return err == nil && count == 1
}, 5*time.Second, 100*time.Millisecond,
"projection should eventually reflect the placed order",
)
}
```
---
## Section 14 : Anti-patterns — ce qu'il ne faut jamais faire
### 1. Property Sourcing — trop granulaire
```go
// ❌ MAUVAIS : Property Sourcing
// Chaque modification de champ = un event séparé
// Résultat : des milliers d'events sans signification métier
type OrderCustomerIDChanged struct {
OrderID uuid.UUID
CustomerID uuid.UUID
}
type OrderTotalAmountChanged struct {
OrderID uuid.UUID
TotalCents int64
}
// ✅ BON : Event métier riche
// L'event capture l'intention, pas le changement de propriété
type OrderPlaced struct {
OrderID uuid.UUID
CustomerID uuid.UUID
Items []OrderItem
TotalCents int64
PlacedAt time.Time
}
```
### 2. CRUD Events — sans signification métier
```go
// ❌ MAUVAIS : Events CRUD
type OrderCreated struct{ OrderID uuid.UUID }
type OrderUpdated struct{ OrderID uuid.UUID; Fields map[string]interface{} }
type OrderDeleted struct{ OrderID uuid.UUID }
// ✅ BON : Events du domaine
type OrderPlaced struct{ /* ... */ }
type OrderConfirmed struct{ /* ... */ }
type OrderCancelled struct{ Reason string /* ... */ }
```
### 3. God Aggregate — contention massive
```go
// ❌ MAUVAIS : Un seul aggregate pour tout le catalogue produit
type ProductCatalog struct {
id uuid.UUID
products []Product // 100 000 produits — lock sur chaque modification !
}
// ✅ BON : Un aggregate par produit
type Product struct {
id uuid.UUID
name string
price int64
// ...
}
```
### 4. Appels externes dans Apply/Transition
```go
// ❌ MAUVAIS : Appel externe dans Apply
// Brise le replay des événements historiques
func (o *Order) applyOrderPlaced(e OrderPlaced) {
o.status = StatusPending
price, _ := http.Get("http://pricing-service/price/" + e.Items[0].ProductID.String()) // ❌ JAMAIS
}
// ✅ BON : Apply ne fait que muter l'état
func (o *Order) applyOrderPlaced(e OrderPlaced) {
o.id = e.OrderID
o.status = StatusPending
o.items = e.Items
o.totalCents = e.TotalCents // Calculé au moment de la commande
}
```
### 5. Event Store utilisé comme Event Bus
```go
// ❌ MAUVAIS : Les projections écoutent directement l'Event Store d'un autre service
// Crée un couplage invisible et difficile à tracer
projectionWorker.Subscribe("payment-service-event-store")
// ✅ BON : Publier des integration events via Outbox → Broker
// Chaque service a ses propres topics publics avec contrat versioned
broker.Subscribe("payment.payment-processed.v1", handler)
```
### 6. Retourner des données depuis les commandes
```go
// ❌ MAUVAIS : Le command handler retourne des données
// Viole CQRS — le write side ne doit pas retourner l'état
func (h *Handler) HandlePlaceOrder(ctx context.Context, cmd PlaceOrder) (*Order, error) {
// ...
return order, nil // ❌ CQRS violation
}
// ✅ BON : Le command handler retourne seulement une erreur
// Le client utilise le Content-Location pour lire l'état via le query side
func (h *Handler) HandlePlaceOrder(ctx context.Context, cmd PlaceOrder) error {
// ...
return nil
}
```
### 7. Projections non-idempotentes
```go
// ❌ MAUVAIS : Pas d'idempotence
func (p *Projection) HandleOrderPlaced(ctx context.Context, e OrderPlaced) error {
_, err := p.db.ExecContext(ctx,
"INSERT INTO order_summary (id, status) VALUES ($1, 'pending')",
e.OrderID,
)
// Si cet event est rejoué, ça plante avec une violation de contrainte unique
return err
}
// ✅ BON : Idempotent avec ON CONFLICT DO NOTHING
func (p *Projection) HandleOrderPlaced(ctx context.Context, e OrderPlaced) error {
_, err := p.db.ExecContext(ctx,
"INSERT INTO order_summary (id, status) VALUES ($1, 'pending') ON CONFLICT (id) DO NOTHING",
e.OrderID,
)
return err
}
```
### 8. Mauvaises boundaries d'aggregates
```go
// ❌ MAUVAIS : Order et Customer dans le même aggregate
// Nécessite de modifier deux "aggregates" dans une transaction
type OrderWithCustomer struct {
orderID uuid.UUID
customer Customer // Entité séparée avec son propre cycle de vie
}
// ✅ BON : Chaque aggregate a son propre cycle de vie
// Order référence Customer par ID seulement
type Order struct {
id uuid.UUID
customerID uuid.UUID // Référence par ID, pas par valeur
}
```
### 9. Snapshots manquants sur aggregates long-lived
```go
// ❌ MAUVAIS : Aggregate avec des années d'historique, jamais de snapshot
// Un compte bancaire avec 50 000 transactions va prendre des secondes à charger
// ✅ BON : Implémenter les snapshots dès que l'aggregate peut dépasser ~100 events
// Voir Section 7 pour l'implémentation complète
```
---
## Section 15 : Quand NE PAS utiliser CQRS/ES
### Critères d'exclusion
CQRS/Event Sourcing apporte une complexité significative. Ne l'utiliser que si les bénéfices justifient ce coût.
| Situation | Recommandation |
|-----------|---------------|
| CRUD simple, pas de règles métier complexes | CRUD standard avec ORM |
| Cohérence forte requise immédiatement | CQRS/ES avec projection synchrone OU architecture plus simple |
| Petite équipe, deadline serrée, pas d'expérience ES | Commencer simple, migrer plus tard |
| Contrainte de stockage forte | ES utilise 3-5x plus d'espace |
| Pas besoin d'audit/historique | Pas justifié |
| Prototype ou MVP | Trop complexe pour valider une idée |
### Signaux positifs pour CQRS/ES
Utiliser CQRS/ES si la réponse est oui à au moins 3 de ces questions :
```
□ Le domaine a des règles métier complexes avec de nombreux invariants ?
□ Un historique complet des changements est requis (audit, compliance) ?
□ Les read models et write models ont des besoins très différents ?
□ Plusieurs services doivent réagir aux changements d'état ?
□ La scalabilité lecture/écriture indépendante est nécessaire ?
□ Le métier parle naturellement en termes d'événements passés ?
□ L'équipe a l'expérience nécessaire pour maintenir l'architecture ?
□ Le système vivra plusieurs années et devra évoluer ?
```
### Alternative graduée
```
Niveau 1 : CRUD + triggers DB pour audit
↓ si règles métier complexes
Niveau 2 : Domain Model + Repository (sans ES)
↓ si historique/audit requis
Niveau 3 : CQRS sans Event Sourcing (séparation write/read model)
↓ si historique COMPLET requis + scalabilité
Niveau 4 : CQRS + Event Sourcing complet
```
---
## Section 16 : Monitoring production et alertes
### Métriques Prometheus
```go
package metrics
import "github.com/prometheus/client_golang/prometheus"
var (
// Compteur de writes dans l'Event Store
EventStoreWritesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "event_store_writes_total",
Help: "Total number of event store write operations",
},
[]string{"aggregate_type", "status"}, // status: success, conflict, error
)
// Durée des writes
EventStoreWriteDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "event_store_write_duration_seconds",
Help: "Duration of event store write operations",
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1},
},
[]string{"aggregate_type"},
)
// Lag des projections (en nombre d'events)
ProjectionLagEvents = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "projection_lag_events",
Help: "Number of events not yet processed by this projection",
},
[]string{"projection_name"},
)
// Lag des projections (en durée)
ProjectionLagDuration = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "projection_lag_seconds",
Help: "Age of the oldest unprocessed event for this projection",
},
[]string{"projection_name"},
)
// Conflits de concurrence optimiste
ConcurrencyConflictsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "optimistic_concurrency_conflicts_total",
Help: "Total number of optimistic concurrency conflicts",
},
[]string{"aggregate_type"},
)
// Outbox messages en attente
OutboxPendingMessages = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "outbox_pending_messages",
Help: "Number of messages pending in the outbox",
},
)
// Durée de replay des aggregates
AggregateReplayDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "aggregate_replay_duration_seconds",
Help: "Duration to replay an aggregate from its events",
Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5},
},
[]string{"aggregate_type"},
)
// Taille des aggregates (nombre d'events)
AggregateEventCount = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "aggregate_event_count",
Help: "Number of events per aggregate at load time",
Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000},
},
[]string{"aggregate_type"},
)
)
```
### Intégration des métriques dans le code
```go
// Dans le repository
func (r *InstrumentedRepository) Save(ctx context.Context, o *order.Order) error {
timer := prometheus.NewTimer(
metrics.EventStoreWriteDuration.WithLabelValues("order"),
)
defer timer.ObserveDuration()
err := r.base.Save(ctx, o)
status := "success"
if err != nil {
if errors.Is(err, order.ErrConflict) {
status = "conflict"
metrics.ConcurrencyConflictsTotal.WithLabelValues("order").Inc()
} else {
status = "error"
}
}
metrics.EventStoreWritesTotal.WithLabelValues("order", status).Inc()
return err
}
// Dans le projection worker — collecte du lag
func (w *ProjectionWorker) collectLagMetrics(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
var lag struct {
LagCount int64 `db:"lag_count"`
LagDuration sql.NullString `db:"lag_duration"`
}
err := w.db.GetContext(ctx, &lag, `
SELECT
MAX(e.id) - pc.last_event_id AS lag_count,
EXTRACT(EPOCH FROM (MAX(e.created_at) - (
SELECT created_at FROM es_event WHERE id = pc.last_event_id
))) AS lag_duration
FROM projection_checkpoint pc, es_event e
WHERE pc.projection_name = $1`,
w.projection.Name(),
)
if err != nil {
continue
}
metrics.ProjectionLagEvents.
WithLabelValues(w.projection.Name()).
Set(float64(lag.LagCount))
if lag.LagDuration.Valid {
duration, _ := strconv.ParseFloat(lag.LagDuration.String, 64)
metrics.ProjectionLagDuration.
WithLabelValues(w.projection.Name()).
Set(duration)
}
}
}
```
### Règles d'alerte Prometheus/Alertmanager
```yaml
# alerting-rules.yml
groups:
- name: event_sourcing
rules:
# Projection lag > 5 minutes : alerte critique
- alert: ProjectionLagTooHigh
expr: projection_lag_seconds > 300
for: 2m
labels:
severity: critical
annotations:
summary: "Projection {{ $labels.projection_name }} is lagging"
description: "Projection lag is {{ $value }}s (threshold: 300s)"
# Outbox > 1000 messages pending
- alert: OutboxBacklogTooLarge
expr: outbox_pending_messages > 1000
for: 5m
labels:
severity: warning
annotations:
summary: "Outbox backlog is growing"
description: "{{ $value }} messages pending in outbox"
# Taux de conflit > 5% sur les 5 dernières minutes
- alert: HighConcurrencyConflictRate
expr: |
rate(optimistic_concurrency_conflicts_total[5m])
/
rate(event_store_writes_total[5m])
> 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "High optimistic concurrency conflict rate"
description: "Conflict rate is {{ $value | humanizePercentage }}"
# Replay lent > 1 seconde (besoin de snapshots ?)
- alert: SlowAggregateReplay
expr: |
histogram_quantile(0.95,
rate(aggregate_replay_duration_seconds_bucket[10m])
) > 1.0
for: 5m
labels:
severity: warning
annotations:
summary: "Aggregate replay is slow"
description: "P95 replay duration is {{ $value }}s — consider adding snapshots"
```
### Dashboard Grafana — requêtes clés
```
# Taux d'écriture dans l'Event Store
rate(event_store_writes_total{status="success"}[1m])
# Taux de conflits de concurrence
rate(optimistic_concurrency_conflicts_total[5m])
# Lag des projections (toutes)
max by(projection_name) (projection_lag_seconds)
# P99 durée de replay
histogram_quantile(0.99, rate(aggregate_replay_duration_seconds_bucket[5m]))
# Taille moyenne des aggregates
histogram_quantile(0.50, rate(aggregate_event_count_bucket[5m]))
```
---
## Section 17 : Checklist complète
### Checklist Aggregate
```
Architecture
□ Tous les champs de l'aggregate sont privés
□ Les quatre accesseurs obligatoires sont présents : ID(), Version(), UncommittedEvents(), ClearChanges()
□ L'interface Event a une méthode isEvent() non exportée
□ Une aggregate = une transaction boundary
Command Methods
□ Tous les invariants sont validés AVANT d'appeler raise()
□ Les erreurs retournées sont des erreurs métier claires
□ Aucun appel externe dans les command methods
raise() et transition()
□ raise() appelle transition() PUIS incrémente version PUIS append à changes
□ transition() ne fait que de la mutation d'état
□ transition() n'a pas de return error
□ transition() n'a aucun appel externe (DB, HTTP, temps actuel, etc.)
□ transition() est déterministe
Reconstruction
□ NewFromEvents(events) fonctionne correctement
□ NewFromSnapshot(snap, events) fonctionne correctement si snapshots implémentés
□ La version de l'aggregate après reconstruction est correcte
```
### Checklist Event Store
```
Schéma
□ Table es_aggregate avec contrainte d'unicité sur (id)
□ Table es_event avec contrainte UNIQUE(aggregate_id, version)
□ Index sur (aggregate_id, version ASC) pour les loads
□ Index sur (id ASC) pour les projections
□ Table es_snapshot si snapshots implémentés
□ Table projection_checkpoint
□ Table outbox si outbox pattern utilisé
□ Table processed_commands si idempotence requise
Save avec optimistic concurrency
□ BEGIN TX avant toute opération
□ UPDATE es_aggregate WHERE version = expected
□ Si 0 rows affectées → retourner ErrConflict
□ INSERT events dans la même TX
□ Contrainte UNIQUE(aggregate_id, version) comme filet de sécurité supplémentaire
□ COMMIT final
Load
□ SELECT WHERE aggregate_id ORDER BY version ASC
□ Gestion correcte du cas "aggregate not found" (0 events)
```
### Checklist Command Handlers
```
□ Commands retournent seulement error, jamais de données
□ Le client fournit l'UUID (pas généré côté serveur dans le handler)
□ Language du domaine dans les noms de commandes (CancelOrder, pas DeleteOrder)
□ Un handler par commande
□ Le pattern Update (load → fn → save) est utilisé pour les modifications
□ Cross-cutting concerns via décorateurs, pas dans le handler métier
□ HTTP: 204 + Content-Location header pour les commands réussies
```
### Checklist Projections
```
□ Toutes les projections sont idempotentes
□ INSERT ON CONFLICT DO NOTHING pour les OrderPlaced-like events
□ UPDATE avec guard (WHERE status != 'already-applied') pour idempotence
□ Le checkpoint est mis à jour dans la MÊME transaction que le read model
□ Les projections ne sont JAMAIS utilisées pour des décisions write-side
□ Un rebuild depuis zéro est possible et testé
□ Les événements inconnus sont ignorés silencieusement
```
### Checklist Outbox
```
□ Messages outbox écrits dans la MÊME transaction que les events
□ Relay utilise FOR UPDATE SKIP LOCKED pour la concurrence
□ Les integration events ont un contrat versioned distinct des domain events
□ La publication sur le broker est idempotente (ou le consumer l'est)
□ Monitoring de l'outbox backlog (alerte si > 1000)
```
### Checklist Idempotence
```
□ Tous les command handlers ont une clé d'idempotence
□ La table processed_commands est utilisée pour la déduplication
□ Les transactions compensatoires sont idempotentes
□ Les projections sont idempotentes (Section 8)
□ Le cleanup des vieilles clés est planifié
```
### Checklist Testing
```
□ Tests unitaires avec pattern Given/When/Then pour chaque command method
□ Cas de succès ET cas d'erreur testés
□ Test de concurrence optimiste (deux saves concurrents)
□ Test d'idempotence des projections (rejouer les mêmes events)
□ Test d'eventual consistency avec assert.Eventually
□ Test de reconstruction depuis snapshot si implémenté
□ Test d'upcasters si versioning implémenté
```
### Checklist Event Versioning
```
□ schema_version présent dans toutes les métadonnées
□ correlation_id et causation_id présents dans les métadonnées
□ Les nouveaux champs utilisent omitempty
□ Un upcaster est écrit pour chaque changement cassant
□ Les anciens events ne sont JAMAIS modifiés ni supprimés
□ Les upcasters sont testés avec des payloads historiques réels
```
### Checklist Monitoring
```
□ Métriques : event_store_writes_total, write_duration
□ Métriques : projection_lag (events et durée)
□ Métriques : concurrency_conflicts_total
□ Métriques : outbox_pending_messages
□ Métriques : aggregate_replay_duration
□ Alerte : projection lag > 5 minutes
□ Alerte : outbox backlog > 1000
□ Alerte : conflict rate > 5%
□ Alerte : P95 replay > 1 seconde
```
### Checklist Architecture générale
```
□ Un aggregate = une transaction (jamais deux dans la même TX DB)
□ Les projections ne sont pas utilisées pour les décisions write-side
□ Les domain events ne sont pas publiés tels quels à l'extérieur
□ Les integration events ont un contrat stable et versionné
□ La livraison est at-least-once → chaque consumer est idempotent
□ Les sagas ont des compensations idempotentes
□ Les snapshots sont implémentés pour les aggregates long-lived
□ Le rebuild de toutes les projections est testé et documenté
```
### Bibliothèques Go recommandées
```go
// Event Sourcing frameworks
// github.com/hallgren/eventsourcing — Simple, bien maintenu
// github.com/looplab/eventhorizon — Complet, opinionated
// github.com/thefabric-io/eventsourcing — Léger
// Messaging
// github.com/ThreeDotsLabs/watermill — Excellent, multi-broker
// Supporte : Kafka, NATS, RabbitMQ, PostgreSQL, etc.
// Workflow complexes (sagas)
// go.temporal.io/sdk — Temporal SDK Go
// PostgreSQL
// github.com/jmoiron/sqlx — Extension de database/sql
// github.com/jackc/pgx/v5 — Driver natif PostgreSQL
// github.com/jackc/pglogrepl — Logical replication (WAL outbox)
// UUID
// github.com/google/uuid — UUID v4/v7
// Tests
// github.com/stretchr/testify — Assert, require, mock
// github.com/testcontainers/testcontainers-go — DB de test réelle
// Métriques
// github.com/prometheus/client_golang — Prometheus
```
---
## Testing
### Tester un Command Handler en isolation
Le command handler dépend d'un event store — injecter une implémentation in-memory pour les tests unitaires.
```go
// InMemoryEventStore — implémentation de test
type InMemoryEventStore struct {
mu sync.Mutex
events map[string][]Event
}
func (s *InMemoryEventStore) Load(ctx context.Context, aggregateID string) ([]Event, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.events[aggregateID], nil
}
func (s *InMemoryEventStore) Append(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error {
s.mu.Lock()
defer s.mu.Unlock()
current := s.events[aggregateID]
if len(current) != expectedVersion {
return ErrOptimisticConcurrency
}
s.events[aggregateID] = append(current, events...)
return nil
}
// Test du command handler
func TestPlaceOrderHandler(t *testing.T) {
store := &InMemoryEventStore{events: make(map[string][]Event)}
handler := NewPlaceOrderHandler(store)
cmd := PlaceOrder{
OrderID: "order-1",
CustomerID: "cust-42",
Items: []OrderItem{{ProductID: "p1", Qty: 2, Price: 1500}},
}
err := handler.Handle(context.Background(), cmd)
require.NoError(t, err)
events, _ := store.Load(context.Background(), "order-1")
require.Len(t, events, 1)
placed, ok := events[0].(OrderPlaced)
require.True(t, ok)
assert.Equal(t, "cust-42", placed.CustomerID)
}
```
### Tester un Projector
Les projectors sont des fonctions pures : event en entrée, état en sortie. Tester chaque transition.
```go
func TestOrderProjector_OrderPlaced(t *testing.T) {
projector := NewOrderProjector()
state := projector.InitialState()
event := OrderPlaced{
OrderID: "order-1",
CustomerID: "cust-42",
TotalCents: 3000,
PlacedAt: time.Now(),
}
newState := projector.Apply(state, event)
order, ok := newState.(OrderReadModel)
require.True(t, ok)
assert.Equal(t, "pending", order.Status)
assert.Equal(t, 3000, order.TotalCents)
}
// Tester une séquence d'événements (replay)
func TestOrderProjector_FullLifecycle(t *testing.T) {
projector := NewOrderProjector()
events := []Event{
OrderPlaced{OrderID: "o1", TotalCents: 1000},
OrderShipped{OrderID: "o1", TrackingCode: "TRACK-123"},
OrderDelivered{OrderID: "o1"},
}
state := projector.Replay(events)
order := state.(OrderReadModel)
assert.Equal(t, "delivered", order.Status)
assert.Equal(t, "TRACK-123", order.TrackingCode)
}
```
### Tester une Saga
Tester que la saga émet les bons commands en réponse aux events.
```go
func TestPaymentSaga_OrderPlaced_EmitsChargeCommand(t *testing.T) {
var capturedCommands []Command
dispatcher := func(cmd Command) error {
capturedCommands = append(capturedCommands, cmd)
return nil
}
saga := NewPaymentSaga(dispatcher)
err := saga.Handle(context.Background(), OrderPlaced{
OrderID: "order-1",
TotalCents: 5000,
PaymentMethodID: "pm-abc",
})
require.NoError(t, err)
require.Len(t, capturedCommands, 1)
charge, ok := capturedCommands[0].(ChargePaymentMethod)
require.True(t, ok)
assert.Equal(t, 5000, charge.AmountCents)
assert.Equal(t, "pm-abc", charge.PaymentMethodID)
}
```
### Tests d'intégration avec Testcontainers
Pour tester l'event store réel avec PostgreSQL.
```go
func TestEventStore_Integration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
ctx := context.Background()
container, err := postgres.Run(ctx,
"postgres:16-alpine",
postgres.WithDatabase("testdb"),
postgres.WithUsername("test"),
postgres.WithPassword("test"),
)
require.NoError(t, err)
defer container.Terminate(ctx)
dsn, _ := container.ConnectionString(ctx, "sslmode=disable")
store := NewPostgresEventStore(dsn)
require.NoError(t, store.Migrate(ctx))
// Tester append + load
events := []Event{OrderPlaced{OrderID: "o1", TotalCents: 1000}}
err = store.Append(ctx, "o1", events, 0)
require.NoError(t, err)
loaded, err := store.Load(ctx, "o1")
require.NoError(t, err)
require.Len(t, loaded, 1)
}
```
### Tester la concurrence optimiste
```go
func TestEventStore_OptimisticConcurrency(t *testing.T) {
store := &InMemoryEventStore{events: make(map[string][]Event)}
// Premier append : version 0 → 1
err := store.Append(ctx, "agg-1", []Event{evt1}, 0)
require.NoError(t, err)
// Deuxième append concurrent avec version incorrecte
err = store.Append(ctx, "agg-1", []Event{evt2}, 0) // ← devrait être 1
require.ErrorIs(t, err, ErrOptimisticConcurrency)
}
```
---
*Last updated: 2025-03 — Revoir si : Go 1.24+ (nouvelles APIs itertools/rangefunc), PostgreSQL 17+ (changements logical replication), ou émergence d'un framework CQRS/ES Go dominant.*