← Contextes /
cqrs-event-sourcing-go.md 3343 lignes · 104 KB
Personnaliser Télécharger
# CLAUDE.md — CQRS & Event Sourcing en Go

> Contexte spécialisé pour Claude Code. Coller ce fichier à la racine du projet pour guider le travail sur une architecture CQRS/Event Sourcing en Go.

---

## Quand utiliser ce contexte
- ✅ Domaine métier complexe avec des règles qui évoluent (comptabilité, assurances, e-commerce avancé)
- ✅ Besoin d'audit complet : qui a fait quoi, quand, et avec quelles données
- ✅ Ratio lecture/écriture très déséquilibré nécessitant des read models dédiés
- ✅ Plusieurs équipes sur des bounded contexts distincts avec coordination via events
- ❌ Application CRUD simple (formulaire → BDD → liste) : CQRS ajoute une complexité injustifiée
- ❌ MVP ou prototype : la vélocité initiale prime, l'architecture peut évoluer après
- ❌ Équipe non familière avec les patterns DDD : la courbe d'apprentissage est élevée

---

## Section 1 : Concepts fondamentaux — modèle mental

### Vue d'ensemble

CQRS (Command Query Responsibility Segregation) et Event Sourcing sont deux patterns distincts qui se complètent naturellement mais peuvent être utilisés séparément.

**CQRS** : séparer les opérations d'écriture (Commands) des opérations de lecture (Queries). Le write model est optimisé pour les invariants métier ; le read model est optimisé pour l'affichage.

**Event Sourcing** : ne jamais stocker l'état courant d'un aggregate. Stocker la séquence d'événements qui ont conduit à cet état. L'état courant se reconstitue en rejouant les événements.

### Le flux complet

```
Client
  │
  ▼
Command (intent métier)
  │
  ▼
Command Handler
  ├── Charge l'Aggregate depuis l'Event Store
  ├── Appelle une méthode métier sur l'Aggregate
  │     ├── Valide les invariants
  │     └── Produit un ou plusieurs Domain Events
  └── Sauvegarde les nouveaux événements dans l'Event Store
                │
                ▼
        Event Store (PostgreSQL)
                │
                ├──────────────────────┐
                ▼                      ▼
        Projection Worker        Outbox Relay
        (read models)            (integration events)
                │                      │
                ▼                      ▼
        Read Database          Message Broker (Kafka/NATS)
                │                      │
                ▼                      ▼
        Query Handler          Other Services
```

### Vocabulaire essentiel

| Terme | Définition |
|-------|-----------|
| **Aggregate** | Cluster d'entités formant une unité de cohérence transactionnelle |
| **Command** | Intention d'agir — peut échouer. Ex: `PlaceOrder` |
| **Domain Event** | Fait passé immuable — ne peut pas échouer. Ex: `OrderPlaced` |
| **Event Store** | Base de données append-only des événements |
| **Projection** | Construction d'un read model à partir d'événements |
| **Snapshot** | Photo de l'état d'un aggregate à un instant T |
| **Saga/Process Manager** | Coordination de plusieurs aggregates sur plusieurs transactions |
| **Outbox** | Table intermédiaire pour publication fiable des events |
| **Optimistic Concurrency** | Détection de conflit via numéro de version |

### Règle d'or

Un aggregate = une transaction. Ne jamais modifier deux aggregates dans la même transaction de base de données. Si c'est nécessaire, les boundaries sont mal définies ou il faut une saga.

---

## Section 2 : Aggregate design — structure et invariants

### Structure de base

L'aggregate encapsule son état dans des champs privés. Seul l'aggregate lui-même peut modifier son état via les méthodes `Apply`/`Transition`.

```go
package order

import (
    "errors"
    "time"

    "github.com/google/uuid"
)

// Status représente l'état d'une commande
type Status string

const (
    StatusPending   Status = "pending"
    StatusConfirmed Status = "confirmed"
    StatusShipped   Status = "shipped"
    StatusCancelled Status = "cancelled"
)

// Order est l'aggregate racine pour les commandes
type Order struct {
    // Champs de base de l'aggregate — TOUJOURS présents
    id      uuid.UUID
    version int

    // Événements non encore persistés
    changes []Event

    // État métier — PRIVÉ, protège les invariants
    status     Status
    customerID uuid.UUID
    items      []OrderItem
    totalCents int64
    shippedAt  *time.Time
    cancelledAt *time.Time
}

// OrderItem représente une ligne de commande
type OrderItem struct {
    ProductID  uuid.UUID
    Quantity   int
    PriceCents int64
}
```

### Accesseurs obligatoires

Ces quatre accesseurs sont la "colle" entre l'aggregate et l'infrastructure (Event Store, Repository).

```go
// ID retourne l'identifiant de l'aggregate
func (o *Order) ID() uuid.UUID {
    return o.id
}

// Version retourne la version courante (pour l'optimistic concurrency)
func (o *Order) Version() int {
    return o.version
}

// UncommittedEvents retourne les événements produits mais pas encore sauvegardés
func (o *Order) UncommittedEvents() []Event {
    // Retourne une copie défensive
    result := make([]Event, len(o.changes))
    copy(result, o.changes)
    return result
}

// ClearChanges vide la liste des événements non persistés
// Appelé par le Repository APRÈS sauvegarde réussie
func (o *Order) ClearChanges() {
    o.changes = nil
}
```

### Interface Event

Tous les domain events implémentent cette interface. La méthode `isEvent()` non exportée empêche les types extérieurs de l'implémenter accidentellement.

```go
// Event est l'interface marqueur pour tous les domain events de ce package
type Event interface {
    isEvent()
}

// AggregateType retourne le type de l'aggregate pour l'Event Store
func AggregateType() string {
    return "order"
}
```

### Reconstruction depuis les événements

`NewFromEvents` rejoue tous les événements dans l'ordre pour reconstruire l'état courant. C'est le cœur de l'Event Sourcing.

```go
// NewFromEvents reconstruit un Order depuis sa séquence d'événements
// Utilisé par le Repository lors du chargement
func NewFromEvents(events []Event) (*Order, error) {
    if len(events) == 0 {
        return nil, errors.New("cannot reconstruct order from empty event list")
    }

    o := &Order{}
    for _, e := range events {
        o.transition(e)
        o.version++
    }
    return o, nil
}

// NewFromSnapshot reconstruit depuis un snapshot + événements ultérieurs
func NewFromSnapshot(snap Snapshot, events []Event) (*Order, error) {
    o := &Order{
        id:         snap.AggregateID,
        version:    snap.Version,
        status:     snap.Status,
        customerID: snap.CustomerID,
        items:      snap.Items,
        totalCents: snap.TotalCents,
    }
    for _, e := range events {
        o.transition(e)
        o.version++
    }
    return o, nil
}
```

---

## Section 3 : Aggregate — command methods et Apply/Transition

### Command methods : valider AVANT de lever l'événement

La règle absolue : valider **tous les invariants** avant d'appeler `raise()`. Si une validation échoue, retourner une erreur. Ne jamais lever un événement si l'opération ne peut pas aboutir.

```go
// Place crée une nouvelle commande
// C'est une "factory method" — elle crée l'aggregate
func Place(id uuid.UUID, customerID uuid.UUID, items []OrderItem) (*Order, error) {
    // Validation des invariants AVANT tout
    if id == uuid.Nil {
        return nil, errors.New("order id is required")
    }
    if customerID == uuid.Nil {
        return nil, errors.New("customer id is required")
    }
    if len(items) == 0 {
        return nil, errors.New("order must have at least one item")
    }

    var totalCents int64
    for _, item := range items {
        if item.Quantity <= 0 {
            return nil, fmt.Errorf("item %s: quantity must be positive", item.ProductID)
        }
        if item.PriceCents <= 0 {
            return nil, fmt.Errorf("item %s: price must be positive", item.ProductID)
        }
        totalCents += int64(item.Quantity) * item.PriceCents
    }

    o := &Order{}
    o.raise(OrderPlaced{
        OrderID:    id,
        CustomerID: customerID,
        Items:      items,
        TotalCents: totalCents,
        PlacedAt:   time.Now().UTC(),
    })
    return o, nil
}

// Confirm confirme la commande après paiement validé
func (o *Order) Confirm() error {
    if o.status != StatusPending {
        return fmt.Errorf("cannot confirm order in status %s, expected pending", o.status)
    }
    o.raise(OrderConfirmed{
        OrderID:     o.id,
        ConfirmedAt: time.Now().UTC(),
    })
    return nil
}

// Ship marque la commande comme expédiée
func (o *Order) Ship(trackingNumber string) error {
    if o.status != StatusConfirmed {
        return fmt.Errorf("cannot ship order in status %s, expected confirmed", o.status)
    }
    if trackingNumber == "" {
        return errors.New("tracking number is required")
    }
    o.raise(OrderShipped{
        OrderID:       o.id,
        TrackingNumber: trackingNumber,
        ShippedAt:     time.Now().UTC(),
    })
    return nil
}

// Cancel annule la commande
func (o *Order) Cancel(reason string) error {
    if o.status == StatusShipped {
        return errors.New("cannot cancel a shipped order")
    }
    if o.status == StatusCancelled {
        return errors.New("order is already cancelled")
    }
    if reason == "" {
        return errors.New("cancellation reason is required")
    }
    o.raise(OrderCancelled{
        OrderID:     o.id,
        Reason:      reason,
        CancelledAt: time.Now().UTC(),
    })
    return nil
}
```

### raise() : le mécanisme interne

`raise()` appelle `transition()` (mise à jour de l'état) puis ajoute l'événement à la liste des changements non persistés.

```go
// raise applique un événement à l'aggregate et l'ajoute aux changements
// JAMAIS appelé directement par du code extérieur
func (o *Order) raise(e Event) {
    o.transition(e)
    o.version++
    o.changes = append(o.changes, e)
}
```

### transition() / Apply : mutation d'état pure

`transition()` est la méthode la plus importante de l'Event Sourcing. Elle doit être :
- **Pure** : aucun appel externe, aucun effet de bord
- **Sans erreur** : si l'event a été produit, il doit pouvoir être appliqué
- **Déterministe** : rejouer les mêmes événements donne toujours le même état

```go
// transition applique un événement pour modifier l'état interne
// RÈGLES ABSOLUES :
//   1. Aucun appel externe (DB, HTTP, etc.)
//   2. Aucun retour d'erreur
//   3. Déterministe — même input, même output
//   4. Appelée lors de la création ET lors du replay
func (o *Order) transition(e Event) {
    switch ev := e.(type) {
    case OrderPlaced:
        o.id = ev.OrderID
        o.customerID = ev.CustomerID
        o.items = ev.Items
        o.totalCents = ev.TotalCents
        o.status = StatusPending

    case OrderConfirmed:
        o.status = StatusConfirmed

    case OrderShipped:
        o.status = StatusShipped
        now := ev.ShippedAt
        o.shippedAt = &now

    case OrderCancelled:
        o.status = StatusCancelled
        now := ev.CancelledAt
        o.cancelledAt = &now
    }
    // Événements inconnus ignorés silencieusement (forward compatibility)
}
```

### Domain Events : définition

```go
// OrderPlaced — commande passée par le client
type OrderPlaced struct {
    OrderID    uuid.UUID   `json:"order_id"`
    CustomerID uuid.UUID   `json:"customer_id"`
    Items      []OrderItem `json:"items"`
    TotalCents int64       `json:"total_cents"`
    PlacedAt   time.Time   `json:"placed_at"`
}
func (OrderPlaced) isEvent() {}

// OrderConfirmed — commande confirmée après paiement
type OrderConfirmed struct {
    OrderID     uuid.UUID `json:"order_id"`
    ConfirmedAt time.Time `json:"confirmed_at"`
}
func (OrderConfirmed) isEvent() {}

// OrderShipped — commande expédiée
type OrderShipped struct {
    OrderID        uuid.UUID `json:"order_id"`
    TrackingNumber string    `json:"tracking_number"`
    ShippedAt      time.Time `json:"shipped_at"`
}
func (OrderShipped) isEvent() {}

// OrderCancelled — commande annulée
type OrderCancelled struct {
    OrderID     uuid.UUID `json:"order_id"`
    Reason      string    `json:"reason"`
    CancelledAt time.Time `json:"cancelled_at"`
}
func (OrderCancelled) isEvent() {}
```

### Snapshot state

```go
// Snapshot représente l'état sérialisé d'un aggregate à un instant T
type Snapshot struct {
    AggregateID uuid.UUID   `json:"aggregate_id"`
    Version     int         `json:"version"`
    Status      Status      `json:"status"`
    CustomerID  uuid.UUID   `json:"customer_id"`
    Items       []OrderItem `json:"items"`
    TotalCents  int64       `json:"total_cents"`
    CreatedAt   time.Time   `json:"created_at"`
}

// TakeSnapshot crée un snapshot de l'état courant
func (o *Order) TakeSnapshot() Snapshot {
    return Snapshot{
        AggregateID: o.id,
        Version:     o.version,
        Status:      o.status,
        CustomerID:  o.customerID,
        Items:       o.items,
        TotalCents:  o.totalCents,
        CreatedAt:   time.Now().UTC(),
    }
}
```

---

## Section 4 : Command handlers — orchestration

### Principes des Command Handlers

- Les commandes retournent **uniquement** une erreur, jamais de données
- Le client fournit l'UUID de l'entité (ne pas générer côté serveur dans le handler)
- Le domain language prime : `CancelOrder` pas `DeleteOrder`, `PlaceOrder` pas `CreateOrder`
- Un handler par commande
- Les cross-cutting concerns (logging, métriques) via des décorateurs middleware

### Définition des commandes

```go
package command

import "github.com/google/uuid"

// PlaceOrder — intent de passer une commande
type PlaceOrder struct {
    OrderID    uuid.UUID
    CustomerID uuid.UUID
    Items      []OrderItem
}

// OrderItem dans le contexte d'une commande
type OrderItem struct {
    ProductID  uuid.UUID
    Quantity   int
    PriceCents int64
}

// ConfirmOrder — intent de confirmer une commande après paiement
type ConfirmOrder struct {
    OrderID uuid.UUID
}

// ShipOrder — intent d'expédier une commande
type ShipOrder struct {
    OrderID        uuid.UUID
    TrackingNumber string
}

// CancelOrder — intent d'annuler une commande
// Note: "Cancel" pas "Delete" — langage du domaine
type CancelOrder struct {
    OrderID uuid.UUID
    Reason  string
}
```

### Repository interface

```go
package order

import (
    "context"
    "github.com/google/uuid"
)

// Repository définit le contrat pour la persistance des aggregates Order
type Repository interface {
    // Load charge un aggregate par son ID
    // Retourne ErrNotFound si l'aggregate n'existe pas
    Load(ctx context.Context, id uuid.UUID) (*Order, error)

    // Save persiste les nouveaux événements de l'aggregate
    // Retourne ErrConflict en cas de conflit de concurrence optimiste
    Save(ctx context.Context, order *Order) error

    // Update est un helper: Load + fn + Save dans une seule opération
    Update(ctx context.Context, id uuid.UUID, fn func(*Order) error) error
}

var (
    ErrNotFound = errors.New("order not found")
    ErrConflict = errors.New("optimistic concurrency conflict")
)
```

### Implementation du pattern Update

```go
// Update dans le repository PostgreSQL
func (r *PostgresRepository) Update(
    ctx context.Context,
    id uuid.UUID,
    fn func(*Order) error,
) error {
    order, err := r.Load(ctx, id)
    if err != nil {
        return fmt.Errorf("load order %s: %w", id, err)
    }

    if err := fn(order); err != nil {
        return err // erreur métier, pas de wrap supplémentaire
    }

    if err := r.Save(ctx, order); err != nil {
        return fmt.Errorf("save order %s: %w", id, err)
    }

    return nil
}
```

### Command Handlers

```go
package commandhandler

import (
    "context"
    "fmt"

    "myapp/internal/order"
    "myapp/internal/command"
)

// OrderCommandHandler gère toutes les commandes liées aux orders
type OrderCommandHandler struct {
    repo order.Repository
}

func NewOrderCommandHandler(repo order.Repository) *OrderCommandHandler {
    return &OrderCommandHandler{repo: repo}
}

// HandlePlaceOrder traite la commande PlaceOrder
func (h *OrderCommandHandler) HandlePlaceOrder(ctx context.Context, cmd command.PlaceOrder) error {
    // Mapper les items de la commande vers le domaine
    items := make([]order.OrderItem, len(cmd.Items))
    for i, item := range cmd.Items {
        items[i] = order.OrderItem{
            ProductID:  item.ProductID,
            Quantity:   item.Quantity,
            PriceCents: item.PriceCents,
        }
    }

    // Créer l'aggregate (factory method)
    o, err := order.Place(cmd.OrderID, cmd.CustomerID, items)
    if err != nil {
        return fmt.Errorf("place order: %w", err)
    }

    // Sauvegarder dans l'Event Store
    if err := h.repo.Save(ctx, o); err != nil {
        return fmt.Errorf("save order: %w", err)
    }

    return nil
}

// HandleConfirmOrder traite la commande ConfirmOrder
func (h *OrderCommandHandler) HandleConfirmOrder(ctx context.Context, cmd command.ConfirmOrder) error {
    return h.repo.Update(ctx, cmd.OrderID, func(o *order.Order) error {
        return o.Confirm()
    })
}

// HandleShipOrder traite la commande ShipOrder
func (h *OrderCommandHandler) HandleShipOrder(ctx context.Context, cmd command.ShipOrder) error {
    return h.repo.Update(ctx, cmd.OrderID, func(o *order.Order) error {
        return o.Ship(cmd.TrackingNumber)
    })
}

// HandleCancelOrder traite la commande CancelOrder
func (h *OrderCommandHandler) HandleCancelOrder(ctx context.Context, cmd command.CancelOrder) error {
    return h.repo.Update(ctx, cmd.OrderID, func(o *order.Order) error {
        return o.Cancel(cmd.Reason)
    })
}
```

### Décorateurs middleware

```go
// LoggingMiddleware ajoute du logging à un command handler
type LoggingMiddleware struct {
    next   *OrderCommandHandler
    logger *slog.Logger
}

func (m *LoggingMiddleware) HandlePlaceOrder(ctx context.Context, cmd command.PlaceOrder) error {
    start := time.Now()
    err := m.next.HandlePlaceOrder(ctx, cmd)
    m.logger.Info("HandlePlaceOrder",
        "order_id", cmd.OrderID,
        "customer_id", cmd.CustomerID,
        "item_count", len(cmd.Items),
        "duration_ms", time.Since(start).Milliseconds(),
        "error", err,
    )
    return err
}

// MetricsMiddleware ajoute des métriques Prometheus
type MetricsMiddleware struct {
    next     *OrderCommandHandler
    commands *prometheus.CounterVec
    duration *prometheus.HistogramVec
}

func (m *MetricsMiddleware) HandlePlaceOrder(ctx context.Context, cmd command.PlaceOrder) error {
    timer := prometheus.NewTimer(m.duration.WithLabelValues("place_order"))
    defer timer.ObserveDuration()

    err := m.next.HandlePlaceOrder(ctx, cmd)
    status := "success"
    if err != nil {
        status = "error"
    }
    m.commands.WithLabelValues("place_order", status).Inc()
    return err
}
```

### HTTP Handler — REST convention

```go
// POST /orders → 204 No Content + Location header
// Convention REST pour CQRS : pas de body en réponse
func (h *HTTPHandler) PlaceOrder(w http.ResponseWriter, r *http.Request) {
    var req PlaceOrderRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid request body", http.StatusBadRequest)
        return
    }

    // Le CLIENT fournit l'UUID — permet l'idempotence
    orderID, err := uuid.Parse(req.OrderID)
    if err != nil {
        // Générer si non fourni (moins idempotent)
        orderID = uuid.New()
    }

    cmd := command.PlaceOrder{
        OrderID:    orderID,
        CustomerID: req.CustomerID,
        Items:      mapItems(req.Items),
    }

    if err := h.handler.HandlePlaceOrder(r.Context(), cmd); err != nil {
        if errors.Is(err, order.ErrConflict) {
            http.Error(w, "conflict", http.StatusConflict)
            return
        }
        http.Error(w, "internal error", http.StatusInternalServerError)
        return
    }

    // 204 + Content-Location pour que le client sache où lire l'état
    w.Header().Set("Content-Location", fmt.Sprintf("/orders/%s", orderID))
    w.WriteHeader(http.StatusNoContent)
}
```

---

## Section 5 : Event Store — schéma PostgreSQL

### Schéma complet

```sql
-- Aggregate registry : une ligne par aggregate, sert à l'optimistic concurrency
CREATE TABLE es_aggregate (
    id             UUID        NOT NULL,
    aggregate_type VARCHAR(64) NOT NULL,
    version        INTEGER     NOT NULL DEFAULT 0,

    PRIMARY KEY (id)
);

-- Séquence d'événements : append-only
CREATE TABLE es_event (
    id              BIGSERIAL   NOT NULL,
    -- transaction_id permet de regrouper les events d'une même TX
    transaction_id  XID8        NOT NULL DEFAULT pg_current_xact_id(),
    aggregate_id    UUID        NOT NULL,
    -- version = numéro de séquence dans l'aggregate (1-based)
    version         INTEGER     NOT NULL,
    event_type      VARCHAR(128) NOT NULL,
    -- schema_version pour les upcasters de migration
    schema_version  SMALLINT    NOT NULL DEFAULT 1,
    -- payload contient les données de l'événement sérialisées en JSON
    payload         JSONB       NOT NULL,
    -- metadata : correlation_id, causation_id, user_id, etc.
    metadata        JSONB       NOT NULL DEFAULT '{}',
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    PRIMARY KEY (id),
    -- Contrainte d'unicité : un seul event par version par aggregate
    -- C'est le filet de sécurité de l'optimistic concurrency
    UNIQUE (aggregate_id, version),
    FOREIGN KEY (aggregate_id) REFERENCES es_aggregate(id)
);

-- Snapshots : état sérialisé d'un aggregate à un instant T
CREATE TABLE es_snapshot (
    aggregate_id   UUID        NOT NULL,
    aggregate_type VARCHAR(64) NOT NULL,
    version        INTEGER     NOT NULL,
    payload        JSONB       NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    PRIMARY KEY (aggregate_id, version)
);

-- Checkpoints : suivi de la progression des projections
CREATE TABLE projection_checkpoint (
    projection_name VARCHAR(128) NOT NULL,
    -- last_event_id = ID du dernier es_event traité
    last_event_id   BIGINT       NOT NULL DEFAULT 0,
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    PRIMARY KEY (projection_name)
);

-- Outbox : publication fiable d'integration events
CREATE TABLE outbox (
    id         BIGSERIAL    NOT NULL,
    topic      VARCHAR(128) NOT NULL,
    payload    JSONB        NOT NULL,
    state      VARCHAR(16)  NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    sent_at    TIMESTAMPTZ,

    PRIMARY KEY (id)
);

-- Déduplication des commandes pour l'idempotence
CREATE TABLE processed_commands (
    idempotency_key VARCHAR(128) NOT NULL,
    processed_at    TIMESTAMPTZ  NOT NULL DEFAULT NOW(),

    PRIMARY KEY (idempotency_key)
);

-- Index de performance
CREATE INDEX es_event_aggregate_id_version
    ON es_event (aggregate_id, version ASC);

CREATE INDEX es_event_id_for_projection
    ON es_event (id ASC);

CREATE INDEX es_event_type_for_rebuild
    ON es_event (event_type, id ASC);

CREATE INDEX outbox_pending
    ON outbox (state, id ASC)
    WHERE state = 'pending';

-- Vue utile pour le monitoring du lag des projections
CREATE VIEW projection_lag AS
SELECT
    pc.projection_name,
    pc.last_event_id,
    MAX(e.id) AS latest_event_id,
    MAX(e.id) - pc.last_event_id AS lag_count,
    MAX(e.created_at) - (
        SELECT created_at FROM es_event WHERE id = pc.last_event_id
    ) AS lag_duration
FROM projection_checkpoint pc
CROSS JOIN es_event e
GROUP BY pc.projection_name, pc.last_event_id;
```

### Types Go pour l'Event Store

```go
package eventstore

import (
    "time"
    "github.com/google/uuid"
)

// StoredEvent représente une ligne de la table es_event
type StoredEvent struct {
    ID            int64           `db:"id"`
    TransactionID uint64          `db:"transaction_id"`
    AggregateID   uuid.UUID       `db:"aggregate_id"`
    Version       int             `db:"version"`
    EventType     string          `db:"event_type"`
    SchemaVersion int             `db:"schema_version"`
    Payload       json.RawMessage `db:"payload"`
    Metadata      Metadata        `db:"metadata"`
    CreatedAt     time.Time       `db:"created_at"`
}

// Metadata contient les informations de traçabilité
type Metadata struct {
    CorrelationID string `json:"correlation_id,omitempty"`
    CausationID   string `json:"causation_id,omitempty"`
    UserID        string `json:"user_id,omitempty"`
    IPAddress     string `json:"ip_address,omitempty"`
}
```

---

## Section 6 : Event Store — save, load et optimistic concurrency

### Interface de l'Event Store

```go
package eventstore

import (
    "context"
    "github.com/google/uuid"
)

// EventStore est l'interface principale pour la persistance des événements
type EventStore interface {
    // Save persiste les nouveaux événements d'un aggregate
    // Retourne ErrConflict si la version attendue ne correspond pas
    Save(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []SerializedEvent) error

    // Load charge tous les événements d'un aggregate
    Load(ctx context.Context, aggregateID uuid.UUID) ([]StoredEvent, error)

    // LoadFrom charge les événements à partir d'une version donnée (pour les snapshots)
    LoadFrom(ctx context.Context, aggregateID uuid.UUID, fromVersion int) ([]StoredEvent, error)

    // LoadForProjection charge tous les événements depuis un ID donné (pour les projections)
    LoadForProjection(ctx context.Context, fromEventID int64, limit int) ([]StoredEvent, error)
}

// SerializedEvent représente un événement prêt à être persisté
type SerializedEvent struct {
    EventType     string
    SchemaVersion int
    Payload       []byte
    Metadata      Metadata
}

var ErrConflict  = errors.New("optimistic concurrency conflict")
var ErrNotFound  = errors.New("aggregate not found")
```

### Save avec optimistic concurrency

La clé de la concurrence optimiste : on tente de mettre à jour `es_aggregate` avec la version attendue. Si 0 lignes affectées, quelqu'un d'autre a modifié l'aggregate entre notre lecture et notre écriture.

```go
// Save persiste les événements avec contrôle de concurrence optimiste
func (s *PostgresEventStore) Save(
    ctx context.Context,
    aggregateID uuid.UUID,
    aggregateType string,
    expectedVersion int,
    events []SerializedEvent,
) error {
    if len(events) == 0 {
        return nil
    }

    return s.db.BeginTxx(ctx, nil, func(tx *sqlx.Tx) error {
        // Étape 1 : Insérer ou mettre à jour l'aggregate avec contrôle de version
        newVersion := expectedVersion + len(events)

        var rowsAffected int64
        var err error

        if expectedVersion == 0 {
            // Premier save : INSERT
            result, err := tx.ExecContext(ctx, `
                INSERT INTO es_aggregate (id, aggregate_type, version)
                VALUES ($1, $2, $3)
                ON CONFLICT (id) DO NOTHING`,
                aggregateID, aggregateType, newVersion,
            )
            if err != nil {
                return fmt.Errorf("insert aggregate: %w", err)
            }
            rowsAffected, err = result.RowsAffected()
        } else {
            // Update existant : version doit correspondre
            result, err := tx.ExecContext(ctx, `
                UPDATE es_aggregate
                SET version = $1
                WHERE id = $2 AND version = $3`,
                newVersion, aggregateID, expectedVersion,
            )
            if err != nil {
                return fmt.Errorf("update aggregate version: %w", err)
            }
            rowsAffected, err = result.RowsAffected()
        }

        if err != nil {
            return err
        }
        if rowsAffected == 0 {
            // Conflit de concurrence : quelqu'un a modifié l'aggregate entre-temps
            return ErrConflict
        }

        // Étape 2 : Insérer les événements
        for i, event := range events {
            version := expectedVersion + i + 1
            _, err := tx.ExecContext(ctx, `
                INSERT INTO es_event
                    (aggregate_id, version, event_type, schema_version, payload, metadata)
                VALUES
                    ($1, $2, $3, $4, $5, $6)`,
                aggregateID,
                version,
                event.EventType,
                event.SchemaVersion,
                event.Payload,
                event.Metadata,
            )
            if err != nil {
                // La contrainte UNIQUE(aggregate_id, version) peut aussi déclencher un conflit
                var pgErr *pgconn.PgError
                if errors.As(err, &pgErr) && pgErr.Code == "23505" {
                    return ErrConflict
                }
                return fmt.Errorf("insert event version %d: %w", version, err)
            }
        }

        return nil
    })
}
```

### Helper pour les transactions sqlx

```go
// BeginTxx exécute fn dans une transaction, rollback automatique si erreur
func (db *DB) BeginTxx(ctx context.Context, opts *sql.TxOptions, fn func(*sqlx.Tx) error) error {
    tx, err := db.sqlx.BeginTxx(ctx, opts)
    if err != nil {
        return fmt.Errorf("begin transaction: %w", err)
    }
    defer func() {
        if p := recover(); p != nil {
            _ = tx.Rollback()
            panic(p) // re-panic après rollback
        }
    }()

    if err := fn(tx); err != nil {
        _ = tx.Rollback()
        return err
    }

    return tx.Commit()
}
```

### Load

```go
// Load charge tous les événements d'un aggregate, triés par version ASC
func (s *PostgresEventStore) Load(
    ctx context.Context,
    aggregateID uuid.UUID,
) ([]StoredEvent, error) {
    return s.LoadFrom(ctx, aggregateID, 0)
}

// LoadFrom charge les événements à partir d'une version donnée (pour snapshots)
func (s *PostgresEventStore) LoadFrom(
    ctx context.Context,
    aggregateID uuid.UUID,
    fromVersion int,
) ([]StoredEvent, error) {
    var events []StoredEvent
    err := s.db.SelectContext(ctx, &events, `
        SELECT
            id, transaction_id, aggregate_id, version,
            event_type, schema_version, payload, metadata, created_at
        FROM es_event
        WHERE aggregate_id = $1
          AND version > $2
        ORDER BY version ASC`,
        aggregateID, fromVersion,
    )
    if err != nil {
        return nil, fmt.Errorf("load events for aggregate %s: %w", aggregateID, err)
    }
    return events, nil
}
```

### Repository avec sérialisation/désérialisation

```go
// PostgresRepository implémente order.Repository
type PostgresRepository struct {
    store      EventStore
    serializer EventSerializer
}

// Save sérialise les événements et les persiste
func (r *PostgresRepository) Save(ctx context.Context, o *order.Order) error {
    events := o.UncommittedEvents()
    if len(events) == 0 {
        return nil
    }

    serialized := make([]SerializedEvent, len(events))
    for i, e := range events {
        s, err := r.serializer.Serialize(e)
        if err != nil {
            return fmt.Errorf("serialize event %T: %w", e, err)
        }
        serialized[i] = s
    }

    // expectedVersion = version actuelle - nb d'événements non persistés
    expectedVersion := o.Version() - len(events)

    err := r.store.Save(ctx, o.ID(), order.AggregateType(), expectedVersion, serialized)
    if err != nil {
        return err // ErrConflict propagé tel quel
    }

    o.ClearChanges()
    return nil
}

// Load charge et désérialise les événements, reconstruit l'aggregate
func (r *PostgresRepository) Load(ctx context.Context, id uuid.UUID) (*order.Order, error) {
    stored, err := r.store.Load(ctx, id)
    if err != nil {
        return nil, fmt.Errorf("load events: %w", err)
    }
    if len(stored) == 0 {
        return nil, order.ErrNotFound
    }

    events := make([]order.Event, len(stored))
    for i, s := range stored {
        e, err := r.serializer.Deserialize(s)
        if err != nil {
            return nil, fmt.Errorf("deserialize event %s v%d: %w", s.EventType, s.Version, err)
        }
        events[i] = e.(order.Event)
    }

    return order.NewFromEvents(events)
}
```

### Sérialiseur

```go
// EventSerializer gère la sérialisation/désérialisation des événements
type EventSerializer struct {
    upcasters map[string][]Upcaster // pour le versioning
}

func (s *EventSerializer) Serialize(e order.Event) (SerializedEvent, error) {
    payload, err := json.Marshal(e)
    if err != nil {
        return SerializedEvent{}, fmt.Errorf("marshal event: %w", err)
    }
    return SerializedEvent{
        EventType:     eventType(e),
        SchemaVersion: 1,
        Payload:       payload,
    }, nil
}

func (s *EventSerializer) Deserialize(stored StoredEvent) (interface{}, error) {
    // Appliquer les upcasters si nécessaire
    payload := s.upcast(stored.EventType, stored.SchemaVersion, stored.Payload)

    switch stored.EventType {
    case "OrderPlaced":
        var e order.OrderPlaced
        return e, json.Unmarshal(payload, &e)
    case "OrderConfirmed":
        var e order.OrderConfirmed
        return e, json.Unmarshal(payload, &e)
    case "OrderShipped":
        var e order.OrderShipped
        return e, json.Unmarshal(payload, &e)
    case "OrderCancelled":
        var e order.OrderCancelled
        return e, json.Unmarshal(payload, &e)
    default:
        return nil, fmt.Errorf("unknown event type: %s", stored.EventType)
    }
}

func eventType(e order.Event) string {
    t := reflect.TypeOf(e)
    if t.Kind() == reflect.Ptr {
        t = t.Elem()
    }
    return t.Name()
}
```

---

## Section 7 : Snapshots — éviter le replay intégral

### Quand utiliser les snapshots

Les snapshots sont nécessaires quand un aggregate accumule de nombreux événements sur sa durée de vie. Un compte bancaire avec 10 ans d'historique pourrait avoir des milliers d'événements — rejouer tous ces événements à chaque chargement est trop coûteux.

**Règle pratique** : implémenter les snapshots quand un aggregate dépasse ~100 événements.

### Schéma et types

```go
// SnapshotStore gère la persistance des snapshots
type SnapshotStore interface {
    // Save sauvegarde un snapshot (idempotent : ON CONFLICT DO NOTHING)
    Save(ctx context.Context, snap StoredSnapshot) error

    // Load charge le dernier snapshot d'un aggregate
    // Retourne nil, nil si aucun snapshot n'existe
    Load(ctx context.Context, aggregateID uuid.UUID) (*StoredSnapshot, error)
}

// StoredSnapshot représente une ligne de es_snapshot
type StoredSnapshot struct {
    AggregateID   uuid.UUID       `db:"aggregate_id"`
    AggregateType string          `db:"aggregate_type"`
    Version       int             `db:"version"`
    Payload       json.RawMessage `db:"payload"`
    CreatedAt     time.Time       `db:"created_at"`
}
```

### Implémentation PostgreSQL

```go
// Save sauvegarde un snapshot de manière idempotente
func (s *PostgresSnapshotStore) Save(ctx context.Context, snap StoredSnapshot) error {
    _, err := s.db.ExecContext(ctx, `
        INSERT INTO es_snapshot (aggregate_id, aggregate_type, version, payload)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (aggregate_id, version) DO NOTHING`,
        snap.AggregateID,
        snap.AggregateType,
        snap.Version,
        snap.Payload,
    )
    if err != nil {
        return fmt.Errorf("save snapshot for %s v%d: %w", snap.AggregateID, snap.Version, err)
    }
    return nil
}

// Load charge le snapshot le plus récent
func (s *PostgresSnapshotStore) Load(ctx context.Context, aggregateID uuid.UUID) (*StoredSnapshot, error) {
    var snap StoredSnapshot
    err := s.db.GetContext(ctx, &snap, `
        SELECT aggregate_id, aggregate_type, version, payload, created_at
        FROM es_snapshot
        WHERE aggregate_id = $1
        ORDER BY version DESC
        LIMIT 1`,
        aggregateID,
    )
    if errors.Is(err, sql.ErrNoRows) {
        return nil, nil // Aucun snapshot, pas une erreur
    }
    if err != nil {
        return nil, fmt.Errorf("load snapshot for %s: %w", aggregateID, err)
    }
    return &snap, nil
}
```

### Repository avec support des snapshots

```go
const snapshotInterval = 100 // Prendre un snapshot tous les 100 événements

// SnapshotRepository wrap un Repository de base avec support des snapshots
type SnapshotRepository struct {
    base      *PostgresRepository
    snapStore SnapshotStore
}

// Load charge depuis snapshot + événements ultérieurs
func (r *SnapshotRepository) Load(ctx context.Context, id uuid.UUID) (*order.Order, error) {
    // Essayer de charger un snapshot
    snap, err := r.snapStore.Load(ctx, id)
    if err != nil {
        return nil, fmt.Errorf("load snapshot: %w", err)
    }

    if snap == nil {
        // Pas de snapshot : chargement complet depuis les événements
        return r.base.Load(ctx, id)
    }

    // Charger seulement les événements APRÈS le snapshot
    stored, err := r.base.store.LoadFrom(ctx, id, snap.Version)
    if err != nil {
        return nil, fmt.Errorf("load events after snapshot: %w", err)
    }

    // Désérialiser le snapshot
    var orderSnap order.Snapshot
    if err := json.Unmarshal(snap.Payload, &orderSnap); err != nil {
        return nil, fmt.Errorf("deserialize snapshot: %w", err)
    }

    // Désérialiser les événements ultérieurs
    events := make([]order.Event, len(stored))
    for i, s := range stored {
        e, err := r.base.serializer.Deserialize(s)
        if err != nil {
            return nil, fmt.Errorf("deserialize event: %w", err)
        }
        events[i] = e.(order.Event)
    }

    // Reconstruire depuis snapshot + événements
    return order.NewFromSnapshot(orderSnap, events)
}

// Save persiste les événements et prend un snapshot si nécessaire
func (r *SnapshotRepository) Save(ctx context.Context, o *order.Order) error {
    // Sauvegarder les événements normalement
    if err := r.base.Save(ctx, o); err != nil {
        return err
    }

    // Prendre un snapshot si on atteint l'intervalle
    if o.Version()%snapshotInterval == 0 {
        snap := o.TakeSnapshot()
        payload, err := json.Marshal(snap)
        if err != nil {
            // Non-fatal : loguer mais ne pas échouer
            slog.Warn("failed to marshal snapshot",
                "order_id", o.ID(),
                "version", o.Version(),
                "error", err,
            )
            return nil
        }

        stored := StoredSnapshot{
            AggregateID:   o.ID(),
            AggregateType: order.AggregateType(),
            Version:       o.Version(),
            Payload:       payload,
        }
        if err := r.snapStore.Save(ctx, stored); err != nil {
            // Non-fatal : loguer mais ne pas échouer
            slog.Warn("failed to save snapshot",
                "order_id", o.ID(),
                "version", o.Version(),
                "error", err,
            )
        }
    }

    return nil
}
```

---

## Section 8 : Projections — read models idempotents

### Principes

- Les projections construisent des read models optimisés pour les queries
- Elles sont **entièrement reconstituables** depuis l'Event Store — elles sont dispensables
- Chaque projection doit être **idempotente** : rejouer le même événement ne doit pas altérer l'état
- Ne jamais utiliser une projection pour des décisions côté write
- Sauvegarder le checkpoint dans la **même transaction** que la mise à jour du read model

### Définition d'une projection

```go
package projection

import (
    "context"
    "database/sql"

    "github.com/jmoiron/sqlx"
    "myapp/internal/eventstore"
    "myapp/internal/order"
)

// OrderSummaryProjection construit une vue résumée des commandes
type OrderSummaryProjection struct {
    db *sqlx.DB
}

// Name retourne le nom unique de cette projection (clé dans projection_checkpoint)
func (p *OrderSummaryProjection) Name() string {
    return "order_summary"
}

// Handle traite un événement stocké
func (p *OrderSummaryProjection) Handle(ctx context.Context, stored eventstore.StoredEvent) error {
    switch stored.EventType {
    case "OrderPlaced":
        return p.handleOrderPlaced(ctx, stored)
    case "OrderConfirmed":
        return p.handleOrderConfirmed(ctx, stored)
    case "OrderShipped":
        return p.handleOrderShipped(ctx, stored)
    case "OrderCancelled":
        return p.handleOrderCancelled(ctx, stored)
    }
    // Événements inconnus ignorés silencieusement
    return nil
}

// handleOrderPlaced traite un événement OrderPlaced de manière idempotente
func (p *OrderSummaryProjection) handleOrderPlaced(
    ctx context.Context,
    stored eventstore.StoredEvent,
) error {
    var e order.OrderPlaced
    if err := json.Unmarshal(stored.Payload, &e); err != nil {
        return fmt.Errorf("unmarshal OrderPlaced: %w", err)
    }

    // ON CONFLICT DO NOTHING : idempotent — si déjà inséré, ne rien faire
    _, err := p.db.ExecContext(ctx, `
        INSERT INTO order_summary
            (id, customer_id, status, total_cents, item_count, placed_at)
        VALUES
            ($1, $2, 'pending', $3, $4, $5)
        ON CONFLICT (id) DO NOTHING`,
        e.OrderID,
        e.CustomerID,
        e.TotalCents,
        len(e.Items),
        e.PlacedAt,
    )
    if err != nil {
        return fmt.Errorf("insert order_summary: %w", err)
    }
    return nil
}

// handleOrderShipped traite un événement OrderShipped
func (p *OrderSummaryProjection) handleOrderShipped(
    ctx context.Context,
    stored eventstore.StoredEvent,
) error {
    var e order.OrderShipped
    if err := json.Unmarshal(stored.Payload, &e); err != nil {
        return fmt.Errorf("unmarshal OrderShipped: %w", err)
    }

    // UPDATE idempotent : WHERE garantit qu'on ne met à jour qu'au bon moment
    _, err := p.db.ExecContext(ctx, `
        UPDATE order_summary
        SET
            status = 'shipped',
            tracking_number = $1,
            shipped_at = $2
        WHERE id = $3
          AND status != 'shipped'`,  -- idempotence via guard
        e.TrackingNumber,
        e.ShippedAt,
        e.OrderID,
    )
    if err != nil {
        return fmt.Errorf("update order_summary for ship: %w", err)
    }
    return nil
}
```

### Projection Worker

```go
// ProjectionWorker exécute une projection en lisant les événements en continu
type ProjectionWorker struct {
    store      EventStore
    projection Projection
    db         *sqlx.DB
    batchSize  int
    pollInterval time.Duration
}

// Projection est l'interface implémentée par toutes les projections
type Projection interface {
    Name() string
    Handle(ctx context.Context, stored StoredEvent) error
}

// Run lance le worker en boucle jusqu'à annulation du contexte
func (w *ProjectionWorker) Run(ctx context.Context) error {
    slog.Info("projection worker started", "projection", w.projection.Name())

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        processed, err := w.processBatch(ctx)
        if err != nil {
            slog.Error("projection batch failed",
                "projection", w.projection.Name(),
                "error", err,
            )
            // Attendre avant de réessayer pour éviter le busy-loop
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(5 * time.Second):
            }
            continue
        }

        if processed == 0 {
            // Rien à traiter : attendre le prochain poll
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(w.pollInterval):
            }
        }
    }
}

// processBatch traite un lot d'événements dans une transaction unique
func (w *ProjectionWorker) processBatch(ctx context.Context) (int, error) {
    // Lire le checkpoint courant
    var checkpoint int64
    err := w.db.GetContext(ctx, &checkpoint, `
        SELECT last_event_id
        FROM projection_checkpoint
        WHERE projection_name = $1`,
        w.projection.Name(),
    )
    if errors.Is(err, sql.ErrNoRows) {
        checkpoint = 0
    } else if err != nil {
        return 0, fmt.Errorf("read checkpoint: %w", err)
    }

    // Charger le prochain batch d'événements
    events, err := w.store.LoadForProjection(ctx, checkpoint, w.batchSize)
    if err != nil {
        return 0, fmt.Errorf("load events: %w", err)
    }
    if len(events) == 0 {
        return 0, nil
    }

    // Traiter tous les événements dans une seule transaction
    // Le checkpoint et le read model sont mis à jour atomiquement
    err = w.db.BeginTxx(ctx, nil, func(tx *sqlx.Tx) error {
        for _, event := range events {
            if err := w.projection.Handle(ctx, event); err != nil {
                return fmt.Errorf("handle event %d (%s): %w", event.ID, event.EventType, err)
            }
        }

        // Mettre à jour le checkpoint dans la MÊME transaction
        lastID := events[len(events)-1].ID
        _, err := tx.ExecContext(ctx, `
            INSERT INTO projection_checkpoint (projection_name, last_event_id)
            VALUES ($1, $2)
            ON CONFLICT (projection_name)
            DO UPDATE SET last_event_id = $2, updated_at = NOW()`,
            w.projection.Name(),
            lastID,
        )
        return err
    })

    if err != nil {
        return 0, fmt.Errorf("process batch: %w", err)
    }

    slog.Info("projection batch processed",
        "projection", w.projection.Name(),
        "count", len(events),
        "last_event_id", events[len(events)-1].ID,
    )

    return len(events), nil
}
```

### Rebuild d'une projection

```go
// Rebuild recrée entièrement une projection depuis le début
// Utile après un bug ou un changement de schéma
func (w *ProjectionWorker) Rebuild(ctx context.Context) error {
    slog.Info("rebuilding projection", "projection", w.projection.Name())

    // Réinitialiser le checkpoint
    _, err := w.db.ExecContext(ctx, `
        INSERT INTO projection_checkpoint (projection_name, last_event_id)
        VALUES ($1, 0)
        ON CONFLICT (projection_name)
        DO UPDATE SET last_event_id = 0`,
        w.projection.Name(),
    )
    if err != nil {
        return fmt.Errorf("reset checkpoint: %w", err)
    }

    // Optionnel : vider le read model avant de reconstruire
    // (selon si la projection est idempotente ou non)

    // Relancer le worker normalement
    return w.Run(ctx)
}
```

---

## Section 9 : Idempotence — déduplication des commandes

### Problème

Dans un système distribué, les commandes peuvent être reçues plusieurs fois (retry du client, at-least-once delivery, timeout réseau suivi d'une réémission). Chaque command handler doit être idempotent.

### Stratégie : clé d'idempotence

Le client génère une clé unique (UUID) par opération et la renvoie lors des retries. Le serveur mémorise les commandes déjà traitées.

```go
// IdempotencyKey est une clé unique fournie par le client
// Format recommandé : "resource-type:uuid" ou simplement un UUID v4
type IdempotencyKey string

// CommandWithIdempotency wrap une commande avec sa clé d'idempotence
type CommandWithIdempotency[T any] struct {
    Key     IdempotencyKey
    Command T
}
```

### Déduplication en base de données

```go
// IdempotencyStore gère la table processed_commands
type IdempotencyStore struct {
    db *sqlx.DB
}

// Claim tente de "réclamer" une clé d'idempotence
// Retourne true si la clé est nouvelle (commande à traiter)
// Retourne false si déjà traitée (réponse idempotente)
func (s *IdempotencyStore) Claim(ctx context.Context, key IdempotencyKey) (bool, error) {
    result, err := s.db.ExecContext(ctx, `
        INSERT INTO processed_commands (idempotency_key)
        VALUES ($1)
        ON CONFLICT (idempotency_key) DO NOTHING`,
        string(key),
    )
    if err != nil {
        return false, fmt.Errorf("claim idempotency key: %w", err)
    }

    rows, err := result.RowsAffected()
    if err != nil {
        return false, err
    }

    return rows > 0, nil // true = nouvelle commande
}
```

### Middleware d'idempotence pour les handlers

```go
// IdempotentCommandHandler wrap un handler avec déduplication
type IdempotentCommandHandler[T any] struct {
    inner      func(context.Context, T) error
    idempStore *IdempotencyStore
}

func (h *IdempotentCommandHandler[T]) Handle(
    ctx context.Context,
    cmd CommandWithIdempotency[T],
) error {
    // Tenter de réclamer la clé
    isNew, err := h.idempStore.Claim(ctx, cmd.Key)
    if err != nil {
        return fmt.Errorf("check idempotency: %w", err)
    }

    if !isNew {
        // Commande déjà traitée : réponse idempotente
        slog.Info("duplicate command ignored", "idempotency_key", cmd.Key)
        return nil
    }

    // Nouvelle commande : traiter normalement
    return h.inner(ctx, cmd.Command)
}
```

### Dans le HTTP Handler

```go
// POST /orders avec support de l'idempotence
func (h *HTTPHandler) PlaceOrder(w http.ResponseWriter, r *http.Request) {
    // La clé d'idempotence est dans le header HTTP
    // Convention: Idempotency-Key: <uuid>
    idempKey := r.Header.Get("Idempotency-Key")
    if idempKey == "" {
        // Générer une clé basée sur les données si non fournie
        // (moins robuste que la clé fournie par le client)
        idempKey = uuid.New().String()
    }

    var req PlaceOrderRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid body", http.StatusBadRequest)
        return
    }

    cmd := command.PlaceOrder{
        OrderID:    req.OrderID,
        CustomerID: req.CustomerID,
        Items:      mapItems(req.Items),
    }

    wrapped := CommandWithIdempotency[command.PlaceOrder]{
        Key:     IdempotencyKey(idempKey),
        Command: cmd,
    }

    if err := h.idempHandler.Handle(r.Context(), wrapped); err != nil {
        handleCommandError(w, err)
        return
    }

    w.Header().Set("Content-Location", "/orders/"+req.OrderID.String())
    w.WriteHeader(http.StatusNoContent)
}
```

### Nettoyage des vieilles clés

```go
// CleanupOldKeys supprime les clés d'idempotence plus vieilles que duration
// À exécuter en tâche périodique (ex: toutes les heures)
func (s *IdempotencyStore) CleanupOldKeys(ctx context.Context, olderThan time.Duration) (int64, error) {
    result, err := s.db.ExecContext(ctx, `
        DELETE FROM processed_commands
        WHERE processed_at < $1`,
        time.Now().Add(-olderThan),
    )
    if err != nil {
        return 0, fmt.Errorf("cleanup idempotency keys: %w", err)
    }
    count, _ := result.RowsAffected()
    return count, nil
}
```

---

## Section 10 : Outbox pattern — publication fiable d'events

### Problème

Si on publie un event sur Kafka APRÈS avoir commité la transaction DB, il y a un risque de perte (crash entre les deux). L'Outbox pattern résout ce problème en écrivant l'event dans la même transaction que les données.

### Distinction événements domaine vs intégration

| Type | Scope | Audience | Format |
|------|-------|----------|--------|
| **Domain Event** | Aggregate interne | Même bounded context | Riche en données métier |
| **Integration Event** | Publié à l'extérieur | Autres services | Contrat stable, versioning strict |

Ne pas publier les domain events bruts à l'extérieur. Les transformer en integration events.

### Écriture dans l'Outbox (même TX que les événements)

```go
// OutboxEntry représente un message en attente de publication
type OutboxEntry struct {
    Topic   string
    Payload []byte
}

// SaveWithOutbox sauvegarde les événements ET les messages outbox dans la même TX
func (s *PostgresEventStore) SaveWithOutbox(
    ctx context.Context,
    aggregateID uuid.UUID,
    aggregateType string,
    expectedVersion int,
    events []SerializedEvent,
    outboxEntries []OutboxEntry,
) error {
    return s.db.BeginTxx(ctx, nil, func(tx *sqlx.Tx) error {
        // 1. Sauvegarder les événements (avec optimistic concurrency)
        if err := s.saveInTx(ctx, tx, aggregateID, aggregateType, expectedVersion, events); err != nil {
            return err
        }

        // 2. Écrire les messages outbox dans LA MÊME TRANSACTION
        for _, entry := range outboxEntries {
            _, err := tx.ExecContext(ctx, `
                INSERT INTO outbox (topic, payload)
                VALUES ($1, $2)`,
                entry.Topic,
                entry.Payload,
            )
            if err != nil {
                return fmt.Errorf("insert outbox entry for topic %s: %w", entry.Topic, err)
            }
        }

        return nil
    })
}
```

### Outbox Relay — publication vers le broker

```go
// OutboxRelay lit les messages pending et les publie sur le broker
type OutboxRelay struct {
    db        *sqlx.DB
    publisher MessagePublisher
    batchSize int
}

// MessagePublisher est l'interface vers le broker (Kafka, NATS, etc.)
type MessagePublisher interface {
    Publish(ctx context.Context, topic string, payload []byte) error
}

// Run exécute le relay en boucle
func (r *OutboxRelay) Run(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        published, err := r.publishBatch(ctx)
        if err != nil {
            slog.Error("outbox relay batch failed", "error", err)
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(5 * time.Second):
            }
            continue
        }

        if published == 0 {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(500 * time.Millisecond):
            }
        }
    }
}

// publishBatch publie un lot de messages outbox avec FOR UPDATE SKIP LOCKED
func (r *OutboxRelay) publishBatch(ctx context.Context) (int, error) {
    // FOR UPDATE SKIP LOCKED : permet plusieurs relay workers en parallèle
    // sans conflit — chaque worker prend des lignes différentes
    rows, err := r.db.QueryxContext(ctx, `
        SELECT id, topic, payload
        FROM outbox
        WHERE state = 'pending'
        ORDER BY id ASC
        LIMIT $1
        FOR UPDATE SKIP LOCKED`,
        r.batchSize,
    )
    if err != nil {
        return 0, fmt.Errorf("query outbox: %w", err)
    }
    defer rows.Close()

    type entry struct {
        ID      int64  `db:"id"`
        Topic   string `db:"topic"`
        Payload []byte `db:"payload"`
    }

    var entries []entry
    for rows.Next() {
        var e entry
        if err := rows.StructScan(&e); err != nil {
            return 0, fmt.Errorf("scan outbox row: %w", err)
        }
        entries = append(entries, e)
    }

    published := 0
    for _, e := range entries {
        // Publier sur le broker
        if err := r.publisher.Publish(ctx, e.Topic, e.Payload); err != nil {
            slog.Error("failed to publish outbox message",
                "id", e.ID,
                "topic", e.Topic,
                "error", err,
            )
            continue // Resilient execution : continuer avec les autres
        }

        // Marquer comme envoyé
        _, err := r.db.ExecContext(ctx, `
            UPDATE outbox
            SET state = 'sent', sent_at = NOW()
            WHERE id = $1`,
            e.ID,
        )
        if err != nil {
            slog.Error("failed to mark outbox message as sent",
                "id", e.ID,
                "error", err,
            )
            continue
        }
        published++
    }

    return published, nil
}
```

### Alternative : WAL-based outbox avec pglogrepl

Pour une latence minimale (< 100ms), utiliser PostgreSQL logical replication au lieu du polling :

```go
// WALOutboxListener écoute le WAL PostgreSQL pour une latence ultra-basse
// Nécessite : wal_level = logical dans postgresql.conf
// Utilise la librairie : github.com/jackc/pglogrepl

type WALOutboxListener struct {
    conn      *pgconn.PgConn
    publisher MessagePublisher
    slotName  string
}

// Note : implémentation simplifiée — voir pglogrepl pour le code complet
func (l *WALOutboxListener) Start(ctx context.Context) error {
    // Créer un slot de réplication logique si inexistant
    // Puis écouter les changements sur la table outbox
    // Beaucoup plus complexe à implémenter mais latence < 100ms vs polling
    // Recommandé seulement pour les cas nécessitant une très faible latence
    return nil
}
```

---

## Section 11 : Sagas — chorégraphie et orchestration

### Chorégraphie

Chaque service écoute les événements et réagit de manière autonome. Couplage faible mais difficile à tracer.

```
OrderService         PaymentService        InventoryService
     │                     │                     │
     │── OrderPlaced ──────▶│                     │
     │                      │── PaymentProcessed ─▶│
     │                      │                     │── InventoryReserved ──▶ (OrderService réagit)
     │◀── InventoryReserved ─────────────────────│
```

```go
// PaymentService écoute OrderPlaced et traite le paiement
type PaymentEventHandler struct {
    paymentService PaymentService
}

func (h *PaymentEventHandler) HandleOrderPlaced(ctx context.Context, e OrderPlaced) error {
    // Traiter le paiement
    if err := h.paymentService.ProcessPayment(ctx, e.OrderID, e.TotalCents); err != nil {
        // Publier un PaymentFailed pour que OrderService compense
        return h.publishPaymentFailed(ctx, e.OrderID, err.Error())
    }
    return h.publishPaymentProcessed(ctx, e.OrderID)
}
```

### Orchestration avec Process Manager

L'orchestrateur est un aggregate event-sourcé qui coordonne la séquence.

```go
// OrderFulfillmentSaga est le Process Manager pour l'expédition d'une commande
type OrderFulfillmentSaga struct {
    id      uuid.UUID
    version int
    changes []SagaEvent

    // État de la saga
    orderID      uuid.UUID
    status       SagaStatus
    retryCount   int
    compensation []CompensationStep
}

type SagaStatus string

const (
    SagaStarted    SagaStatus = "started"
    SagaCompleted  SagaStatus = "completed"
    SagaFailed     SagaStatus = "failed"
    SagaCompensating SagaStatus = "compensating"
)

// Start démarre la saga pour une commande
func StartOrderFulfillmentSaga(sagaID, orderID uuid.UUID) (*OrderFulfillmentSaga, error) {
    s := &OrderFulfillmentSaga{}
    s.raise(SagaStarted{
        SagaID:    sagaID,
        OrderID:   orderID,
        StartedAt: time.Now().UTC(),
    })
    return s, nil
}

// HandlePaymentProcessed réagit à un paiement réussi
func (s *OrderFulfillmentSaga) HandlePaymentProcessed(paymentID uuid.UUID) error {
    if s.status != SagaStarted {
        return fmt.Errorf("unexpected payment in status %s", s.status)
    }
    s.raise(PaymentConfirmedInSaga{
        SagaID:    s.id,
        PaymentID: paymentID,
    })
    return nil
}

// HandlePaymentFailed déclenche la compensation
func (s *OrderFulfillmentSaga) HandlePaymentFailed(reason string) error {
    s.raise(SagaCompensationStarted{
        SagaID: s.id,
        Reason: reason,
    })
    return nil
}

func (s *OrderFulfillmentSaga) transition(e SagaEvent) {
    switch ev := e.(type) {
    case SagaStarted:
        s.id = ev.SagaID
        s.orderID = ev.OrderID
        s.status = SagaStarted
    case PaymentConfirmedInSaga:
        s.status = SagaCompleted
    case SagaCompensationStarted:
        s.status = SagaCompensating
        // Enregistrer les étapes de compensation à exécuter
        s.compensation = []CompensationStep{
            {Type: "cancel_order", ResourceID: s.orderID},
        }
    }
}
```

### Compensation — transactions compensatoires

Toutes les transactions compensatoires doivent être **idempotentes**.

```go
// CompensationExecutor exécute les compensations
type CompensationExecutor struct {
    orderRepo  order.Repository
    // autres repos...
}

// Execute exécute une étape de compensation de manière idempotente
func (e *CompensationExecutor) Execute(ctx context.Context, step CompensationStep) error {
    switch step.Type {
    case "cancel_order":
        // Idempotent : si déjà annulée, l'erreur est ignorée
        err := e.orderRepo.Update(ctx, step.ResourceID, func(o *order.Order) error {
            err := o.Cancel("saga compensation")
            if errors.Is(err, order.ErrAlreadyCancelled) {
                return nil // Déjà compensé — OK
            }
            return err
        })
        return err
    default:
        return fmt.Errorf("unknown compensation type: %s", step.Type)
    }
}
```

### Note sur Temporal

Pour les sagas complexes (multi-step, longs timeouts, retry sophistiqués), considérer [Temporal](https://temporal.io) :

```go
// Avec Temporal, une saga devient un simple workflow Go
func OrderFulfillmentWorkflow(ctx workflow.Context, orderID string) error {
    // Temporal gère automatiquement :
    // - La persistance de l'état
    // - Les retries avec backoff
    // - Les timeouts
    // - La compensation en cas d'échec

    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts: 3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    var paymentResult PaymentResult
    if err := workflow.ExecuteActivity(ctx, ProcessPayment, orderID).Get(ctx, &paymentResult); err != nil {
        // Compensation automatique
        return workflow.ExecuteActivity(ctx, CancelOrder, orderID).Get(ctx, nil)
    }

    return workflow.ExecuteActivity(ctx, ReserveInventory, orderID).Get(ctx, nil)
}
```

---

## Section 12 : Event versioning — schéma faible et upcasters

### Stratégie 1 : schéma faible (préférée)

Les changements additifs ne nécessitent pas de versioning si on utilise `omitempty`. Le schéma JSON est naturellement extensible.

```go
// Version 1 originale
type OrderPlaced struct {
    OrderID    uuid.UUID   `json:"order_id"`
    CustomerID uuid.UUID   `json:"customer_id"`
    Items      []OrderItem `json:"items"`
    TotalCents int64       `json:"total_cents"`
    PlacedAt   time.Time   `json:"placed_at"`
}

// Version 2 : ajout d'un champ optionnel (backward compatible)
// Les anciens events qui ne contiennent pas PromoCode auront ""
// schema_version reste à 1 — aucun upcaster nécessaire
type OrderPlaced struct {
    OrderID    uuid.UUID   `json:"order_id"`
    CustomerID uuid.UUID   `json:"customer_id"`
    Items      []OrderItem `json:"items"`
    TotalCents int64       `json:"total_cents"`
    PlacedAt   time.Time   `json:"placed_at"`
    // Nouveau champ optionnel — backward compatible
    PromoCode  string      `json:"promo_code,omitempty"`
    // Autre ajout optionnel
    Channel    string      `json:"channel,omitempty"` // "web", "mobile", "api"
}
```

### Stratégie 2 : upcasters pour changements cassants

Quand un changement est incompatible (renommage de champ, changement de type), utiliser un upcaster.

```go
// Upcaster transforme un event d'une version vers la suivante
type Upcaster interface {
    EventType() string
    FromVersion() int
    ToVersion() int
    Upcast(payload json.RawMessage) (json.RawMessage, error)
}

// OrderPlacedV1ToV2Upcaster : migration du champ "amount" → "total_cents"
// (changement cassant : renommage + conversion centimes)
type OrderPlacedV1ToV2Upcaster struct{}

func (u *OrderPlacedV1ToV2Upcaster) EventType() string  { return "OrderPlaced" }
func (u *OrderPlacedV1ToV2Upcaster) FromVersion() int   { return 1 }
func (u *OrderPlacedV1ToV2Upcaster) ToVersion() int     { return 2 }

func (u *OrderPlacedV1ToV2Upcaster) Upcast(payload json.RawMessage) (json.RawMessage, error) {
    // Lire l'ancienne structure
    var v1 struct {
        OrderID    string    `json:"order_id"`
        CustomerID string    `json:"customer_id"`
        Amount     float64   `json:"amount"`      // ancien: montant en euros
        PlacedAt   time.Time `json:"placed_at"`
    }
    if err := json.Unmarshal(payload, &v1); err != nil {
        return nil, fmt.Errorf("unmarshal v1: %w", err)
    }

    // Transformer vers la nouvelle structure
    v2 := struct {
        OrderID    string    `json:"order_id"`
        CustomerID string    `json:"customer_id"`
        TotalCents int64     `json:"total_cents"` // nouveau: montant en centimes
        PlacedAt   time.Time `json:"placed_at"`
    }{
        OrderID:    v1.OrderID,
        CustomerID: v1.CustomerID,
        TotalCents: int64(v1.Amount * 100), // conversion
        PlacedAt:   v1.PlacedAt,
    }

    return json.Marshal(v2)
}

// UpcasterChain applique une chaîne d'upcasters
type UpcasterChain struct {
    upcasters map[string][]Upcaster // eventType → []Upcaster triés par version
}

func (c *UpcasterChain) Upcast(eventType string, fromVersion int, payload json.RawMessage) (json.RawMessage, int, error) {
    upcasters, ok := c.upcasters[eventType]
    if !ok {
        return payload, fromVersion, nil // Pas d'upcaster nécessaire
    }

    currentVersion := fromVersion
    currentPayload := payload

    for _, u := range upcasters {
        if u.FromVersion() < currentVersion {
            continue // Déjà appliqué
        }
        if u.FromVersion() != currentVersion {
            break // Gap dans la chaîne
        }

        var err error
        currentPayload, err = u.Upcast(currentPayload)
        if err != nil {
            return nil, 0, fmt.Errorf("upcast %s v%d→v%d: %w",
                eventType, u.FromVersion(), u.ToVersion(), err)
        }
        currentVersion = u.ToVersion()
    }

    return currentPayload, currentVersion, nil
}
```

### Métadonnées essentielles

Toujours inclure ces champs dans les métadonnées de chaque événement :

```go
// EventMetadata — traçabilité et corrélation
type EventMetadata struct {
    // CorrelationID : ID de la requête originale, propagé à travers tous les services
    CorrelationID string `json:"correlation_id"`
    // CausationID : ID de l'événement ou commande qui a causé cet événement
    CausationID string `json:"causation_id"`
    // SchemaVersion : version du schéma de l'événement pour les upcasters
    SchemaVersion int `json:"schema_version"`
    // UserID : qui a déclenché l'action
    UserID string `json:"user_id,omitempty"`
    // Timestamp : quand l'événement a été créé (précision nanoseconde)
    Timestamp time.Time `json:"timestamp"`
}

// RÈGLES ABSOLUES pour le versioning :
// 1. Ne JAMAIS modifier un événement déjà persisté
// 2. Ne JAMAIS supprimer des champs (utiliser omitempty)
// 3. Ne JAMAIS changer le sens d'un champ existant
// 4. TOUJOURS bumper schema_version pour les changements cassants
// 5. TOUJOURS écrire un upcaster pour chaque changement cassant
```

---

## Section 13 : Testing — Given/When/Then et intégration

### Tests unitaires d'aggregate avec Given/When/Then

```go
package order_test

import (
    "testing"
    "time"

    "github.com/google/uuid"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "myapp/internal/order"
)

// testCase structure le pattern Given/When/Then
type testCase struct {
    name    string
    given   []order.Event            // état initial (events rejoués)
    when    func(*order.Order) error // opération à tester
    then    []order.Event            // events attendus
    wantErr bool
}

// customerID et orderID réutilisables dans les tests
var (
    testOrderID    = uuid.MustParse("00000000-0000-0000-0000-000000000001")
    testCustomerID = uuid.MustParse("00000000-0000-0000-0000-000000000002")
    testProductID  = uuid.MustParse("00000000-0000-0000-0000-000000000003")
    testItems      = []order.OrderItem{{
        ProductID:  testProductID,
        Quantity:   2,
        PriceCents: 1500,
    }}
)

// orderPlacedEvent crée un OrderPlaced pour les tests
func orderPlacedEvent() order.OrderPlaced {
    return order.OrderPlaced{
        OrderID:    testOrderID,
        CustomerID: testCustomerID,
        Items:      testItems,
        TotalCents: 3000,
        PlacedAt:   time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
    }
}

func TestOrderConfirm(t *testing.T) {
    tests := []testCase{
        {
            name:  "confirm a pending order",
            given: []order.Event{orderPlacedEvent()},
            when: func(o *order.Order) error {
                return o.Confirm()
            },
            then: []order.Event{
                order.OrderConfirmed{OrderID: testOrderID},
            },
        },
        {
            name: "cannot confirm already confirmed order",
            given: []order.Event{
                orderPlacedEvent(),
                order.OrderConfirmed{OrderID: testOrderID},
            },
            when: func(o *order.Order) error {
                return o.Confirm()
            },
            wantErr: true,
        },
        {
            name: "cannot confirm cancelled order",
            given: []order.Event{
                orderPlacedEvent(),
                order.OrderCancelled{
                    OrderID: testOrderID,
                    Reason:  "customer request",
                },
            },
            when: func(o *order.Order) error {
                return o.Confirm()
            },
            wantErr: true,
        },
    }

    runTestCases(t, tests)
}

func TestOrderCancel(t *testing.T) {
    tests := []testCase{
        {
            name:  "cancel a pending order",
            given: []order.Event{orderPlacedEvent()},
            when: func(o *order.Order) error {
                return o.Cancel("customer changed mind")
            },
            then: []order.Event{
                order.OrderCancelled{
                    OrderID: testOrderID,
                    Reason:  "customer changed mind",
                },
            },
        },
        {
            name: "cannot cancel a shipped order",
            given: []order.Event{
                orderPlacedEvent(),
                order.OrderConfirmed{OrderID: testOrderID},
                order.OrderShipped{
                    OrderID:        testOrderID,
                    TrackingNumber: "TRACK123",
                },
            },
            when: func(o *order.Order) error {
                return o.Cancel("too late")
            },
            wantErr: true,
        },
        {
            name: "cancel requires non-empty reason",
            given: []order.Event{orderPlacedEvent()},
            when: func(o *order.Order) error {
                return o.Cancel("")
            },
            wantErr: true,
        },
    }

    runTestCases(t, tests)
}

// runTestCases exécute tous les cas de test avec le pattern Given/When/Then
func runTestCases(t *testing.T, tests []testCase) {
    t.Helper()
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // GIVEN : reconstruire l'aggregate depuis les événements initiaux
            var agg *order.Order
            var err error

            if len(tt.given) > 0 {
                agg, err = order.NewFromEvents(tt.given)
                require.NoError(t, err, "failed to reconstruct aggregate from given events")
            } else {
                agg = &order.Order{} // aggregate vide pour les factory methods
            }

            // WHEN : exécuter l'opération
            err = tt.when(agg)

            // THEN : vérifier le résultat
            if tt.wantErr {
                assert.Error(t, err)
                return
            }
            require.NoError(t, err)

            // Vérifier les événements produits
            uncommitted := agg.UncommittedEvents()
            require.Len(t, uncommitted, len(tt.then),
                "expected %d events, got %d", len(tt.then), len(uncommitted))

            for i, expected := range tt.then {
                actual := uncommitted[i]
                // Comparer le type
                assert.IsType(t, expected, actual,
                    "event[%d]: expected type %T, got %T", i, expected, actual)
                // Comparer les champs importants (sans les timestamps)
                assertEventEquals(t, expected, actual, i)
            }
        })
    }
}

// assertEventEquals compare deux events en ignorant les timestamps auto-générés
func assertEventEquals(t *testing.T, expected, actual order.Event, index int) {
    t.Helper()
    switch exp := expected.(type) {
    case order.OrderConfirmed:
        act, ok := actual.(order.OrderConfirmed)
        require.True(t, ok)
        assert.Equal(t, exp.OrderID, act.OrderID, "event[%d]: OrderID mismatch", index)

    case order.OrderCancelled:
        act, ok := actual.(order.OrderCancelled)
        require.True(t, ok)
        assert.Equal(t, exp.OrderID, act.OrderID)
        assert.Equal(t, exp.Reason, act.Reason)

    case order.OrderShipped:
        act, ok := actual.(order.OrderShipped)
        require.True(t, ok)
        assert.Equal(t, exp.OrderID, act.OrderID)
        assert.Equal(t, exp.TrackingNumber, act.TrackingNumber)
    }
}
```

### Tests d'intégration — Event Store et concurrence optimiste

```go
package integration_test

import (
    "context"
    "sync"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

// TestOptimisticConcurrencyConflict vérifie que deux saves concurrents conflictuels
// retournent bien ErrConflict
func TestOptimisticConcurrencyConflict(t *testing.T) {
    ctx := context.Background()
    db := setupTestDB(t) // helper qui crée une DB de test
    store := eventstore.NewPostgresEventStore(db)

    orderID := uuid.New()

    // Créer un aggregate initial
    event1 := SerializedEvent{EventType: "OrderPlaced", Payload: []byte(`{}`)}
    err := store.Save(ctx, orderID, "order", 0, []SerializedEvent{event1})
    require.NoError(t, err)

    // Simuler deux modifications concurrentes de la version 1
    var wg sync.WaitGroup
    results := make([]error, 2)

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            event := SerializedEvent{
                EventType: "OrderConfirmed",
                Payload:   []byte(`{}`),
            }
            // Les deux tentent de passer de version 1 → 2
            results[idx] = store.Save(ctx, orderID, "order", 1, []SerializedEvent{event})
        }(i)
    }

    wg.Wait()

    // Exactement l'un doit réussir, l'autre doit avoir un conflit
    successes := 0
    conflicts := 0
    for _, err := range results {
        if err == nil {
            successes++
        } else if errors.Is(err, eventstore.ErrConflict) {
            conflicts++
        }
    }

    assert.Equal(t, 1, successes, "exactly one save should succeed")
    assert.Equal(t, 1, conflicts, "exactly one save should conflict")
}

// TestProjectionIdempotency vérifie qu'une projection est idempotente
func TestProjectionIdempotency(t *testing.T) {
    ctx := context.Background()
    db := setupTestDB(t)
    proj := projection.NewOrderSummaryProjection(db)

    orderID := uuid.New()
    event := eventstore.StoredEvent{
        ID:        1,
        EventType: "OrderPlaced",
        Payload:   mustMarshal(order.OrderPlaced{OrderID: orderID, TotalCents: 3000}),
    }

    // Traiter le même événement 3 fois
    for i := 0; i < 3; i++ {
        err := proj.Handle(ctx, event)
        require.NoError(t, err, "handle attempt %d failed", i+1)
    }

    // Vérifier qu'il n'y a qu'une seule ligne dans la projection
    var count int
    err := db.GetContext(ctx, &count,
        "SELECT COUNT(*) FROM order_summary WHERE id = $1", orderID)
    require.NoError(t, err)
    assert.Equal(t, 1, count, "projection should be idempotent")
}
```

### Tests avec eventual consistency

```go
// TestEventualConsistency vérifie qu'une projection est éventuellement cohérente
func TestEventualConsistency(t *testing.T) {
    ctx := context.Background()

    // Setup : event store + projection worker
    db := setupTestDB(t)
    store := eventstore.NewPostgresEventStore(db)
    proj := projection.NewOrderSummaryProjection(db)
    worker := projection.NewWorker(store, proj, db)

    // Lancer le worker en arrière-plan
    workerCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    go func() {
        if err := worker.Run(workerCtx); err != nil && !errors.Is(err, context.Canceled) {
            t.Errorf("worker failed: %v", err)
        }
    }()

    // Créer une commande
    orderID := uuid.New()
    o, err := order.Place(orderID, uuid.New(), testItems)
    require.NoError(t, err)

    repo := NewSnapshotRepository(store, db)
    err = repo.Save(ctx, o)
    require.NoError(t, err)

    // Attendre que la projection soit mise à jour (eventual consistency)
    assert.Eventually(t, func() bool {
        var count int
        err := db.GetContext(ctx, &count,
            "SELECT COUNT(*) FROM order_summary WHERE id = $1 AND status = 'pending'",
            orderID,
        )
        return err == nil && count == 1
    }, 5*time.Second, 100*time.Millisecond,
        "projection should eventually reflect the placed order",
    )
}
```

---

## Section 14 : Anti-patterns — ce qu'il ne faut jamais faire

### 1. Property Sourcing — trop granulaire

```go
// ❌ MAUVAIS : Property Sourcing
// Chaque modification de champ = un event séparé
// Résultat : des milliers d'events sans signification métier
type OrderCustomerIDChanged struct {
    OrderID    uuid.UUID
    CustomerID uuid.UUID
}
type OrderTotalAmountChanged struct {
    OrderID    uuid.UUID
    TotalCents int64
}

// ✅ BON : Event métier riche
// L'event capture l'intention, pas le changement de propriété
type OrderPlaced struct {
    OrderID    uuid.UUID
    CustomerID uuid.UUID
    Items      []OrderItem
    TotalCents int64
    PlacedAt   time.Time
}
```

### 2. CRUD Events — sans signification métier

```go
// ❌ MAUVAIS : Events CRUD
type OrderCreated struct{ OrderID uuid.UUID }
type OrderUpdated struct{ OrderID uuid.UUID; Fields map[string]interface{} }
type OrderDeleted struct{ OrderID uuid.UUID }

// ✅ BON : Events du domaine
type OrderPlaced struct{ /* ... */ }
type OrderConfirmed struct{ /* ... */ }
type OrderCancelled struct{ Reason string /* ... */ }
```

### 3. God Aggregate — contention massive

```go
// ❌ MAUVAIS : Un seul aggregate pour tout le catalogue produit
type ProductCatalog struct {
    id       uuid.UUID
    products []Product // 100 000 produits — lock sur chaque modification !
}

// ✅ BON : Un aggregate par produit
type Product struct {
    id    uuid.UUID
    name  string
    price int64
    // ...
}
```

### 4. Appels externes dans Apply/Transition

```go
// ❌ MAUVAIS : Appel externe dans Apply
// Brise le replay des événements historiques
func (o *Order) applyOrderPlaced(e OrderPlaced) {
    o.status = StatusPending
    price, _ := http.Get("http://pricing-service/price/" + e.Items[0].ProductID.String()) // ❌ JAMAIS
}

// ✅ BON : Apply ne fait que muter l'état
func (o *Order) applyOrderPlaced(e OrderPlaced) {
    o.id = e.OrderID
    o.status = StatusPending
    o.items = e.Items
    o.totalCents = e.TotalCents // Calculé au moment de la commande
}
```

### 5. Event Store utilisé comme Event Bus

```go
// ❌ MAUVAIS : Les projections écoutent directement l'Event Store d'un autre service
// Crée un couplage invisible et difficile à tracer
projectionWorker.Subscribe("payment-service-event-store")

// ✅ BON : Publier des integration events via Outbox → Broker
// Chaque service a ses propres topics publics avec contrat versioned
broker.Subscribe("payment.payment-processed.v1", handler)
```

### 6. Retourner des données depuis les commandes

```go
// ❌ MAUVAIS : Le command handler retourne des données
// Viole CQRS — le write side ne doit pas retourner l'état
func (h *Handler) HandlePlaceOrder(ctx context.Context, cmd PlaceOrder) (*Order, error) {
    // ...
    return order, nil // ❌ CQRS violation
}

// ✅ BON : Le command handler retourne seulement une erreur
// Le client utilise le Content-Location pour lire l'état via le query side
func (h *Handler) HandlePlaceOrder(ctx context.Context, cmd PlaceOrder) error {
    // ...
    return nil
}
```

### 7. Projections non-idempotentes

```go
// ❌ MAUVAIS : Pas d'idempotence
func (p *Projection) HandleOrderPlaced(ctx context.Context, e OrderPlaced) error {
    _, err := p.db.ExecContext(ctx,
        "INSERT INTO order_summary (id, status) VALUES ($1, 'pending')",
        e.OrderID,
    )
    // Si cet event est rejoué, ça plante avec une violation de contrainte unique
    return err
}

// ✅ BON : Idempotent avec ON CONFLICT DO NOTHING
func (p *Projection) HandleOrderPlaced(ctx context.Context, e OrderPlaced) error {
    _, err := p.db.ExecContext(ctx,
        "INSERT INTO order_summary (id, status) VALUES ($1, 'pending') ON CONFLICT (id) DO NOTHING",
        e.OrderID,
    )
    return err
}
```

### 8. Mauvaises boundaries d'aggregates

```go
// ❌ MAUVAIS : Order et Customer dans le même aggregate
// Nécessite de modifier deux "aggregates" dans une transaction
type OrderWithCustomer struct {
    orderID    uuid.UUID
    customer   Customer // Entité séparée avec son propre cycle de vie
}

// ✅ BON : Chaque aggregate a son propre cycle de vie
// Order référence Customer par ID seulement
type Order struct {
    id         uuid.UUID
    customerID uuid.UUID // Référence par ID, pas par valeur
}
```

### 9. Snapshots manquants sur aggregates long-lived

```go
// ❌ MAUVAIS : Aggregate avec des années d'historique, jamais de snapshot
// Un compte bancaire avec 50 000 transactions va prendre des secondes à charger

// ✅ BON : Implémenter les snapshots dès que l'aggregate peut dépasser ~100 events
// Voir Section 7 pour l'implémentation complète
```

---

## Section 15 : Quand NE PAS utiliser CQRS/ES

### Critères d'exclusion

CQRS/Event Sourcing apporte une complexité significative. Ne l'utiliser que si les bénéfices justifient ce coût.

| Situation | Recommandation |
|-----------|---------------|
| CRUD simple, pas de règles métier complexes | CRUD standard avec ORM |
| Cohérence forte requise immédiatement | CQRS/ES avec projection synchrone OU architecture plus simple |
| Petite équipe, deadline serrée, pas d'expérience ES | Commencer simple, migrer plus tard |
| Contrainte de stockage forte | ES utilise 3-5x plus d'espace |
| Pas besoin d'audit/historique | Pas justifié |
| Prototype ou MVP | Trop complexe pour valider une idée |

### Signaux positifs pour CQRS/ES

Utiliser CQRS/ES si la réponse est oui à au moins 3 de ces questions :

```
□ Le domaine a des règles métier complexes avec de nombreux invariants ?
□ Un historique complet des changements est requis (audit, compliance) ?
□ Les read models et write models ont des besoins très différents ?
□ Plusieurs services doivent réagir aux changements d'état ?
□ La scalabilité lecture/écriture indépendante est nécessaire ?
□ Le métier parle naturellement en termes d'événements passés ?
□ L'équipe a l'expérience nécessaire pour maintenir l'architecture ?
□ Le système vivra plusieurs années et devra évoluer ?
```

### Alternative graduée

```
Niveau 1 : CRUD + triggers DB pour audit
    ↓ si règles métier complexes
Niveau 2 : Domain Model + Repository (sans ES)
    ↓ si historique/audit requis
Niveau 3 : CQRS sans Event Sourcing (séparation write/read model)
    ↓ si historique COMPLET requis + scalabilité
Niveau 4 : CQRS + Event Sourcing complet
```

---

## Section 16 : Monitoring production et alertes

### Métriques Prometheus

```go
package metrics

import "github.com/prometheus/client_golang/prometheus"

var (
    // Compteur de writes dans l'Event Store
    EventStoreWritesTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "event_store_writes_total",
            Help: "Total number of event store write operations",
        },
        []string{"aggregate_type", "status"}, // status: success, conflict, error
    )

    // Durée des writes
    EventStoreWriteDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "event_store_write_duration_seconds",
            Help:    "Duration of event store write operations",
            Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1},
        },
        []string{"aggregate_type"},
    )

    // Lag des projections (en nombre d'events)
    ProjectionLagEvents = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "projection_lag_events",
            Help: "Number of events not yet processed by this projection",
        },
        []string{"projection_name"},
    )

    // Lag des projections (en durée)
    ProjectionLagDuration = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "projection_lag_seconds",
            Help: "Age of the oldest unprocessed event for this projection",
        },
        []string{"projection_name"},
    )

    // Conflits de concurrence optimiste
    ConcurrencyConflictsTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "optimistic_concurrency_conflicts_total",
            Help: "Total number of optimistic concurrency conflicts",
        },
        []string{"aggregate_type"},
    )

    // Outbox messages en attente
    OutboxPendingMessages = prometheus.NewGauge(
        prometheus.GaugeOpts{
            Name: "outbox_pending_messages",
            Help: "Number of messages pending in the outbox",
        },
    )

    // Durée de replay des aggregates
    AggregateReplayDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "aggregate_replay_duration_seconds",
            Help:    "Duration to replay an aggregate from its events",
            Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5},
        },
        []string{"aggregate_type"},
    )

    // Taille des aggregates (nombre d'events)
    AggregateEventCount = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "aggregate_event_count",
            Help:    "Number of events per aggregate at load time",
            Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000},
        },
        []string{"aggregate_type"},
    )
)
```

### Intégration des métriques dans le code

```go
// Dans le repository
func (r *InstrumentedRepository) Save(ctx context.Context, o *order.Order) error {
    timer := prometheus.NewTimer(
        metrics.EventStoreWriteDuration.WithLabelValues("order"),
    )
    defer timer.ObserveDuration()

    err := r.base.Save(ctx, o)

    status := "success"
    if err != nil {
        if errors.Is(err, order.ErrConflict) {
            status = "conflict"
            metrics.ConcurrencyConflictsTotal.WithLabelValues("order").Inc()
        } else {
            status = "error"
        }
    }
    metrics.EventStoreWritesTotal.WithLabelValues("order", status).Inc()

    return err
}

// Dans le projection worker — collecte du lag
func (w *ProjectionWorker) collectLagMetrics(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
        }

        var lag struct {
            LagCount    int64          `db:"lag_count"`
            LagDuration sql.NullString `db:"lag_duration"`
        }
        err := w.db.GetContext(ctx, &lag, `
            SELECT
                MAX(e.id) - pc.last_event_id AS lag_count,
                EXTRACT(EPOCH FROM (MAX(e.created_at) - (
                    SELECT created_at FROM es_event WHERE id = pc.last_event_id
                ))) AS lag_duration
            FROM projection_checkpoint pc, es_event e
            WHERE pc.projection_name = $1`,
            w.projection.Name(),
        )
        if err != nil {
            continue
        }

        metrics.ProjectionLagEvents.
            WithLabelValues(w.projection.Name()).
            Set(float64(lag.LagCount))

        if lag.LagDuration.Valid {
            duration, _ := strconv.ParseFloat(lag.LagDuration.String, 64)
            metrics.ProjectionLagDuration.
                WithLabelValues(w.projection.Name()).
                Set(duration)
        }
    }
}
```

### Règles d'alerte Prometheus/Alertmanager

```yaml
# alerting-rules.yml
groups:
  - name: event_sourcing
    rules:
      # Projection lag > 5 minutes : alerte critique
      - alert: ProjectionLagTooHigh
        expr: projection_lag_seconds > 300
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Projection {{ $labels.projection_name }} is lagging"
          description: "Projection lag is {{ $value }}s (threshold: 300s)"

      # Outbox > 1000 messages pending
      - alert: OutboxBacklogTooLarge
        expr: outbox_pending_messages > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Outbox backlog is growing"
          description: "{{ $value }} messages pending in outbox"

      # Taux de conflit > 5% sur les 5 dernières minutes
      - alert: HighConcurrencyConflictRate
        expr: |
          rate(optimistic_concurrency_conflicts_total[5m])
          /
          rate(event_store_writes_total[5m])
          > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High optimistic concurrency conflict rate"
          description: "Conflict rate is {{ $value | humanizePercentage }}"

      # Replay lent > 1 seconde (besoin de snapshots ?)
      - alert: SlowAggregateReplay
        expr: |
          histogram_quantile(0.95,
            rate(aggregate_replay_duration_seconds_bucket[10m])
          ) > 1.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Aggregate replay is slow"
          description: "P95 replay duration is {{ $value }}s — consider adding snapshots"
```

### Dashboard Grafana — requêtes clés

```
# Taux d'écriture dans l'Event Store
rate(event_store_writes_total{status="success"}[1m])

# Taux de conflits de concurrence
rate(optimistic_concurrency_conflicts_total[5m])

# Lag des projections (toutes)
max by(projection_name) (projection_lag_seconds)

# P99 durée de replay
histogram_quantile(0.99, rate(aggregate_replay_duration_seconds_bucket[5m]))

# Taille moyenne des aggregates
histogram_quantile(0.50, rate(aggregate_event_count_bucket[5m]))
```

---

## Section 17 : Checklist complète

### Checklist Aggregate

```
Architecture
□ Tous les champs de l'aggregate sont privés
□ Les quatre accesseurs obligatoires sont présents : ID(), Version(), UncommittedEvents(), ClearChanges()
□ L'interface Event a une méthode isEvent() non exportée
□ Une aggregate = une transaction boundary

Command Methods
□ Tous les invariants sont validés AVANT d'appeler raise()
□ Les erreurs retournées sont des erreurs métier claires
□ Aucun appel externe dans les command methods

raise() et transition()
□ raise() appelle transition() PUIS incrémente version PUIS append à changes
□ transition() ne fait que de la mutation d'état
□ transition() n'a pas de return error
□ transition() n'a aucun appel externe (DB, HTTP, temps actuel, etc.)
□ transition() est déterministe

Reconstruction
□ NewFromEvents(events) fonctionne correctement
□ NewFromSnapshot(snap, events) fonctionne correctement si snapshots implémentés
□ La version de l'aggregate après reconstruction est correcte
```

### Checklist Event Store

```
Schéma
□ Table es_aggregate avec contrainte d'unicité sur (id)
□ Table es_event avec contrainte UNIQUE(aggregate_id, version)
□ Index sur (aggregate_id, version ASC) pour les loads
□ Index sur (id ASC) pour les projections
□ Table es_snapshot si snapshots implémentés
□ Table projection_checkpoint
□ Table outbox si outbox pattern utilisé
□ Table processed_commands si idempotence requise

Save avec optimistic concurrency
□ BEGIN TX avant toute opération
□ UPDATE es_aggregate WHERE version = expected
□ Si 0 rows affectées → retourner ErrConflict
□ INSERT events dans la même TX
□ Contrainte UNIQUE(aggregate_id, version) comme filet de sécurité supplémentaire
□ COMMIT final

Load
□ SELECT WHERE aggregate_id ORDER BY version ASC
□ Gestion correcte du cas "aggregate not found" (0 events)
```

### Checklist Command Handlers

```
□ Commands retournent seulement error, jamais de données
□ Le client fournit l'UUID (pas généré côté serveur dans le handler)
□ Language du domaine dans les noms de commandes (CancelOrder, pas DeleteOrder)
□ Un handler par commande
□ Le pattern Update (load → fn → save) est utilisé pour les modifications
□ Cross-cutting concerns via décorateurs, pas dans le handler métier
□ HTTP: 204 + Content-Location header pour les commands réussies
```

### Checklist Projections

```
□ Toutes les projections sont idempotentes
□ INSERT ON CONFLICT DO NOTHING pour les OrderPlaced-like events
□ UPDATE avec guard (WHERE status != 'already-applied') pour idempotence
□ Le checkpoint est mis à jour dans la MÊME transaction que le read model
□ Les projections ne sont JAMAIS utilisées pour des décisions write-side
□ Un rebuild depuis zéro est possible et testé
□ Les événements inconnus sont ignorés silencieusement
```

### Checklist Outbox

```
□ Messages outbox écrits dans la MÊME transaction que les events
□ Relay utilise FOR UPDATE SKIP LOCKED pour la concurrence
□ Les integration events ont un contrat versioned distinct des domain events
□ La publication sur le broker est idempotente (ou le consumer l'est)
□ Monitoring de l'outbox backlog (alerte si > 1000)
```

### Checklist Idempotence

```
□ Tous les command handlers ont une clé d'idempotence
□ La table processed_commands est utilisée pour la déduplication
□ Les transactions compensatoires sont idempotentes
□ Les projections sont idempotentes (Section 8)
□ Le cleanup des vieilles clés est planifié
```

### Checklist Testing

```
□ Tests unitaires avec pattern Given/When/Then pour chaque command method
□ Cas de succès ET cas d'erreur testés
□ Test de concurrence optimiste (deux saves concurrents)
□ Test d'idempotence des projections (rejouer les mêmes events)
□ Test d'eventual consistency avec assert.Eventually
□ Test de reconstruction depuis snapshot si implémenté
□ Test d'upcasters si versioning implémenté
```

### Checklist Event Versioning

```
□ schema_version présent dans toutes les métadonnées
□ correlation_id et causation_id présents dans les métadonnées
□ Les nouveaux champs utilisent omitempty
□ Un upcaster est écrit pour chaque changement cassant
□ Les anciens events ne sont JAMAIS modifiés ni supprimés
□ Les upcasters sont testés avec des payloads historiques réels
```

### Checklist Monitoring

```
□ Métriques : event_store_writes_total, write_duration
□ Métriques : projection_lag (events et durée)
□ Métriques : concurrency_conflicts_total
□ Métriques : outbox_pending_messages
□ Métriques : aggregate_replay_duration
□ Alerte : projection lag > 5 minutes
□ Alerte : outbox backlog > 1000
□ Alerte : conflict rate > 5%
□ Alerte : P95 replay > 1 seconde
```

### Checklist Architecture générale

```
□ Un aggregate = une transaction (jamais deux dans la même TX DB)
□ Les projections ne sont pas utilisées pour les décisions write-side
□ Les domain events ne sont pas publiés tels quels à l'extérieur
□ Les integration events ont un contrat stable et versionné
□ La livraison est at-least-once → chaque consumer est idempotent
□ Les sagas ont des compensations idempotentes
□ Les snapshots sont implémentés pour les aggregates long-lived
□ Le rebuild de toutes les projections est testé et documenté
```

### Bibliothèques Go recommandées

```go
// Event Sourcing frameworks
// github.com/hallgren/eventsourcing    — Simple, bien maintenu
// github.com/looplab/eventhorizon      — Complet, opinionated
// github.com/thefabric-io/eventsourcing — Léger

// Messaging
// github.com/ThreeDotsLabs/watermill   — Excellent, multi-broker
//   Supporte : Kafka, NATS, RabbitMQ, PostgreSQL, etc.

// Workflow complexes (sagas)
// go.temporal.io/sdk                   — Temporal SDK Go

// PostgreSQL
// github.com/jmoiron/sqlx              — Extension de database/sql
// github.com/jackc/pgx/v5              — Driver natif PostgreSQL
// github.com/jackc/pglogrepl           — Logical replication (WAL outbox)

// UUID
// github.com/google/uuid               — UUID v4/v7

// Tests
// github.com/stretchr/testify          — Assert, require, mock
// github.com/testcontainers/testcontainers-go — DB de test réelle

// Métriques
// github.com/prometheus/client_golang  — Prometheus
```

---

## Testing

### Tester un Command Handler en isolation

Le command handler dépend d'un event store — injecter une implémentation in-memory pour les tests unitaires.

```go
// InMemoryEventStore — implémentation de test
type InMemoryEventStore struct {
    mu     sync.Mutex
    events map[string][]Event
}

func (s *InMemoryEventStore) Load(ctx context.Context, aggregateID string) ([]Event, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.events[aggregateID], nil
}

func (s *InMemoryEventStore) Append(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    current := s.events[aggregateID]
    if len(current) != expectedVersion {
        return ErrOptimisticConcurrency
    }
    s.events[aggregateID] = append(current, events...)
    return nil
}

// Test du command handler
func TestPlaceOrderHandler(t *testing.T) {
    store := &InMemoryEventStore{events: make(map[string][]Event)}
    handler := NewPlaceOrderHandler(store)

    cmd := PlaceOrder{
        OrderID:    "order-1",
        CustomerID: "cust-42",
        Items:      []OrderItem{{ProductID: "p1", Qty: 2, Price: 1500}},
    }

    err := handler.Handle(context.Background(), cmd)
    require.NoError(t, err)

    events, _ := store.Load(context.Background(), "order-1")
    require.Len(t, events, 1)
    placed, ok := events[0].(OrderPlaced)
    require.True(t, ok)
    assert.Equal(t, "cust-42", placed.CustomerID)
}
```

### Tester un Projector

Les projectors sont des fonctions pures : event en entrée, état en sortie. Tester chaque transition.

```go
func TestOrderProjector_OrderPlaced(t *testing.T) {
    projector := NewOrderProjector()
    state := projector.InitialState()

    event := OrderPlaced{
        OrderID:    "order-1",
        CustomerID: "cust-42",
        TotalCents: 3000,
        PlacedAt:   time.Now(),
    }

    newState := projector.Apply(state, event)

    order, ok := newState.(OrderReadModel)
    require.True(t, ok)
    assert.Equal(t, "pending", order.Status)
    assert.Equal(t, 3000, order.TotalCents)
}

// Tester une séquence d'événements (replay)
func TestOrderProjector_FullLifecycle(t *testing.T) {
    projector := NewOrderProjector()
    events := []Event{
        OrderPlaced{OrderID: "o1", TotalCents: 1000},
        OrderShipped{OrderID: "o1", TrackingCode: "TRACK-123"},
        OrderDelivered{OrderID: "o1"},
    }

    state := projector.Replay(events)

    order := state.(OrderReadModel)
    assert.Equal(t, "delivered", order.Status)
    assert.Equal(t, "TRACK-123", order.TrackingCode)
}
```

### Tester une Saga

Tester que la saga émet les bons commands en réponse aux events.

```go
func TestPaymentSaga_OrderPlaced_EmitsChargeCommand(t *testing.T) {
    var capturedCommands []Command
    dispatcher := func(cmd Command) error {
        capturedCommands = append(capturedCommands, cmd)
        return nil
    }

    saga := NewPaymentSaga(dispatcher)
    err := saga.Handle(context.Background(), OrderPlaced{
        OrderID:    "order-1",
        TotalCents: 5000,
        PaymentMethodID: "pm-abc",
    })

    require.NoError(t, err)
    require.Len(t, capturedCommands, 1)
    charge, ok := capturedCommands[0].(ChargePaymentMethod)
    require.True(t, ok)
    assert.Equal(t, 5000, charge.AmountCents)
    assert.Equal(t, "pm-abc", charge.PaymentMethodID)
}
```

### Tests d'intégration avec Testcontainers

Pour tester l'event store réel avec PostgreSQL.

```go
func TestEventStore_Integration(t *testing.T) {
    if testing.Short() {
        t.Skip("skipping integration test")
    }

    ctx := context.Background()
    container, err := postgres.Run(ctx,
        "postgres:16-alpine",
        postgres.WithDatabase("testdb"),
        postgres.WithUsername("test"),
        postgres.WithPassword("test"),
    )
    require.NoError(t, err)
    defer container.Terminate(ctx)

    dsn, _ := container.ConnectionString(ctx, "sslmode=disable")
    store := NewPostgresEventStore(dsn)
    require.NoError(t, store.Migrate(ctx))

    // Tester append + load
    events := []Event{OrderPlaced{OrderID: "o1", TotalCents: 1000}}
    err = store.Append(ctx, "o1", events, 0)
    require.NoError(t, err)

    loaded, err := store.Load(ctx, "o1")
    require.NoError(t, err)
    require.Len(t, loaded, 1)
}
```

### Tester la concurrence optimiste

```go
func TestEventStore_OptimisticConcurrency(t *testing.T) {
    store := &InMemoryEventStore{events: make(map[string][]Event)}

    // Premier append : version 0 → 1
    err := store.Append(ctx, "agg-1", []Event{evt1}, 0)
    require.NoError(t, err)

    // Deuxième append concurrent avec version incorrecte
    err = store.Append(ctx, "agg-1", []Event{evt2}, 0) // ← devrait être 1
    require.ErrorIs(t, err, ErrOptimisticConcurrency)
}
```

---

*Last updated: 2025-03 — Revoir si : Go 1.24+ (nouvelles APIs itertools/rangefunc), PostgreSQL 17+ (changements logical replication), ou émergence d'un framework CQRS/ES Go dominant.*