# CLAUDE.md — CQRS & Event Sourcing en Go > Contexte spécialisé pour Claude Code. Coller ce fichier à la racine du projet pour guider le travail sur une architecture CQRS/Event Sourcing en Go. --- ## Quand utiliser ce contexte - ✅ Domaine métier complexe avec des règles qui évoluent (comptabilité, assurances, e-commerce avancé) - ✅ Besoin d'audit complet : qui a fait quoi, quand, et avec quelles données - ✅ Ratio lecture/écriture très déséquilibré nécessitant des read models dédiés - ✅ Plusieurs équipes sur des bounded contexts distincts avec coordination via events - ❌ Application CRUD simple (formulaire → BDD → liste) : CQRS ajoute une complexité injustifiée - ❌ MVP ou prototype : la vélocité initiale prime, l'architecture peut évoluer après - ❌ Équipe non familière avec les patterns DDD : la courbe d'apprentissage est élevée --- ## Section 1 : Concepts fondamentaux — modèle mental ### Vue d'ensemble CQRS (Command Query Responsibility Segregation) et Event Sourcing sont deux patterns distincts qui se complètent naturellement mais peuvent être utilisés séparément. **CQRS** : séparer les opérations d'écriture (Commands) des opérations de lecture (Queries). Le write model est optimisé pour les invariants métier ; le read model est optimisé pour l'affichage. **Event Sourcing** : ne jamais stocker l'état courant d'un aggregate. Stocker la séquence d'événements qui ont conduit à cet état. L'état courant se reconstitue en rejouant les événements. ### Le flux complet ``` Client │ ▼ Command (intent métier) │ ▼ Command Handler ├── Charge l'Aggregate depuis l'Event Store ├── Appelle une méthode métier sur l'Aggregate │ ├── Valide les invariants │ └── Produit un ou plusieurs Domain Events └── Sauvegarde les nouveaux événements dans l'Event Store │ ▼ Event Store (PostgreSQL) │ ├──────────────────────┐ ▼ ▼ Projection Worker Outbox Relay (read models) (integration events) │ │ ▼ ▼ Read Database Message Broker (Kafka/NATS) │ │ ▼ ▼ Query Handler Other Services ``` ### Vocabulaire essentiel | Terme | Définition | |-------|-----------| | **Aggregate** | Cluster d'entités formant une unité de cohérence transactionnelle | | **Command** | Intention d'agir — peut échouer. Ex: `PlaceOrder` | | **Domain Event** | Fait passé immuable — ne peut pas échouer. Ex: `OrderPlaced` | | **Event Store** | Base de données append-only des événements | | **Projection** | Construction d'un read model à partir d'événements | | **Snapshot** | Photo de l'état d'un aggregate à un instant T | | **Saga/Process Manager** | Coordination de plusieurs aggregates sur plusieurs transactions | | **Outbox** | Table intermédiaire pour publication fiable des events | | **Optimistic Concurrency** | Détection de conflit via numéro de version | ### Règle d'or Un aggregate = une transaction. Ne jamais modifier deux aggregates dans la même transaction de base de données. Si c'est nécessaire, les boundaries sont mal définies ou il faut une saga. --- ## Section 2 : Aggregate design — structure et invariants ### Structure de base L'aggregate encapsule son état dans des champs privés. Seul l'aggregate lui-même peut modifier son état via les méthodes `Apply`/`Transition`. ```go package order import ( "errors" "time" "github.com/google/uuid" ) // Status représente l'état d'une commande type Status string const ( StatusPending Status = "pending" StatusConfirmed Status = "confirmed" StatusShipped Status = "shipped" StatusCancelled Status = "cancelled" ) // Order est l'aggregate racine pour les commandes type Order struct { // Champs de base de l'aggregate — TOUJOURS présents id uuid.UUID version int // Événements non encore persistés changes []Event // État métier — PRIVÉ, protège les invariants status Status customerID uuid.UUID items []OrderItem totalCents int64 shippedAt *time.Time cancelledAt *time.Time } // OrderItem représente une ligne de commande type OrderItem struct { ProductID uuid.UUID Quantity int PriceCents int64 } ``` ### Accesseurs obligatoires Ces quatre accesseurs sont la "colle" entre l'aggregate et l'infrastructure (Event Store, Repository). ```go // ID retourne l'identifiant de l'aggregate func (o *Order) ID() uuid.UUID { return o.id } // Version retourne la version courante (pour l'optimistic concurrency) func (o *Order) Version() int { return o.version } // UncommittedEvents retourne les événements produits mais pas encore sauvegardés func (o *Order) UncommittedEvents() []Event { // Retourne une copie défensive result := make([]Event, len(o.changes)) copy(result, o.changes) return result } // ClearChanges vide la liste des événements non persistés // Appelé par le Repository APRÈS sauvegarde réussie func (o *Order) ClearChanges() { o.changes = nil } ``` ### Interface Event Tous les domain events implémentent cette interface. La méthode `isEvent()` non exportée empêche les types extérieurs de l'implémenter accidentellement. ```go // Event est l'interface marqueur pour tous les domain events de ce package type Event interface { isEvent() } // AggregateType retourne le type de l'aggregate pour l'Event Store func AggregateType() string { return "order" } ``` ### Reconstruction depuis les événements `NewFromEvents` rejoue tous les événements dans l'ordre pour reconstruire l'état courant. C'est le cœur de l'Event Sourcing. ```go // NewFromEvents reconstruit un Order depuis sa séquence d'événements // Utilisé par le Repository lors du chargement func NewFromEvents(events []Event) (*Order, error) { if len(events) == 0 { return nil, errors.New("cannot reconstruct order from empty event list") } o := &Order{} for _, e := range events { o.transition(e) o.version++ } return o, nil } // NewFromSnapshot reconstruit depuis un snapshot + événements ultérieurs func NewFromSnapshot(snap Snapshot, events []Event) (*Order, error) { o := &Order{ id: snap.AggregateID, version: snap.Version, status: snap.Status, customerID: snap.CustomerID, items: snap.Items, totalCents: snap.TotalCents, } for _, e := range events { o.transition(e) o.version++ } return o, nil } ``` --- ## Section 3 : Aggregate — command methods et Apply/Transition ### Command methods : valider AVANT de lever l'événement La règle absolue : valider **tous les invariants** avant d'appeler `raise()`. Si une validation échoue, retourner une erreur. Ne jamais lever un événement si l'opération ne peut pas aboutir. ```go // Place crée une nouvelle commande // C'est une "factory method" — elle crée l'aggregate func Place(id uuid.UUID, customerID uuid.UUID, items []OrderItem) (*Order, error) { // Validation des invariants AVANT tout if id == uuid.Nil { return nil, errors.New("order id is required") } if customerID == uuid.Nil { return nil, errors.New("customer id is required") } if len(items) == 0 { return nil, errors.New("order must have at least one item") } var totalCents int64 for _, item := range items { if item.Quantity <= 0 { return nil, fmt.Errorf("item %s: quantity must be positive", item.ProductID) } if item.PriceCents <= 0 { return nil, fmt.Errorf("item %s: price must be positive", item.ProductID) } totalCents += int64(item.Quantity) * item.PriceCents } o := &Order{} o.raise(OrderPlaced{ OrderID: id, CustomerID: customerID, Items: items, TotalCents: totalCents, PlacedAt: time.Now().UTC(), }) return o, nil } // Confirm confirme la commande après paiement validé func (o *Order) Confirm() error { if o.status != StatusPending { return fmt.Errorf("cannot confirm order in status %s, expected pending", o.status) } o.raise(OrderConfirmed{ OrderID: o.id, ConfirmedAt: time.Now().UTC(), }) return nil } // Ship marque la commande comme expédiée func (o *Order) Ship(trackingNumber string) error { if o.status != StatusConfirmed { return fmt.Errorf("cannot ship order in status %s, expected confirmed", o.status) } if trackingNumber == "" { return errors.New("tracking number is required") } o.raise(OrderShipped{ OrderID: o.id, TrackingNumber: trackingNumber, ShippedAt: time.Now().UTC(), }) return nil } // Cancel annule la commande func (o *Order) Cancel(reason string) error { if o.status == StatusShipped { return errors.New("cannot cancel a shipped order") } if o.status == StatusCancelled { return errors.New("order is already cancelled") } if reason == "" { return errors.New("cancellation reason is required") } o.raise(OrderCancelled{ OrderID: o.id, Reason: reason, CancelledAt: time.Now().UTC(), }) return nil } ``` ### raise() : le mécanisme interne `raise()` appelle `transition()` (mise à jour de l'état) puis ajoute l'événement à la liste des changements non persistés. ```go // raise applique un événement à l'aggregate et l'ajoute aux changements // JAMAIS appelé directement par du code extérieur func (o *Order) raise(e Event) { o.transition(e) o.version++ o.changes = append(o.changes, e) } ``` ### transition() / Apply : mutation d'état pure `transition()` est la méthode la plus importante de l'Event Sourcing. Elle doit être : - **Pure** : aucun appel externe, aucun effet de bord - **Sans erreur** : si l'event a été produit, il doit pouvoir être appliqué - **Déterministe** : rejouer les mêmes événements donne toujours le même état ```go // transition applique un événement pour modifier l'état interne // RÈGLES ABSOLUES : // 1. Aucun appel externe (DB, HTTP, etc.) // 2. Aucun retour d'erreur // 3. Déterministe — même input, même output // 4. Appelée lors de la création ET lors du replay func (o *Order) transition(e Event) { switch ev := e.(type) { case OrderPlaced: o.id = ev.OrderID o.customerID = ev.CustomerID o.items = ev.Items o.totalCents = ev.TotalCents o.status = StatusPending case OrderConfirmed: o.status = StatusConfirmed case OrderShipped: o.status = StatusShipped now := ev.ShippedAt o.shippedAt = &now case OrderCancelled: o.status = StatusCancelled now := ev.CancelledAt o.cancelledAt = &now } // Événements inconnus ignorés silencieusement (forward compatibility) } ``` ### Domain Events : définition ```go // OrderPlaced — commande passée par le client type OrderPlaced struct { OrderID uuid.UUID `json:"order_id"` CustomerID uuid.UUID `json:"customer_id"` Items []OrderItem `json:"items"` TotalCents int64 `json:"total_cents"` PlacedAt time.Time `json:"placed_at"` } func (OrderPlaced) isEvent() {} // OrderConfirmed — commande confirmée après paiement type OrderConfirmed struct { OrderID uuid.UUID `json:"order_id"` ConfirmedAt time.Time `json:"confirmed_at"` } func (OrderConfirmed) isEvent() {} // OrderShipped — commande expédiée type OrderShipped struct { OrderID uuid.UUID `json:"order_id"` TrackingNumber string `json:"tracking_number"` ShippedAt time.Time `json:"shipped_at"` } func (OrderShipped) isEvent() {} // OrderCancelled — commande annulée type OrderCancelled struct { OrderID uuid.UUID `json:"order_id"` Reason string `json:"reason"` CancelledAt time.Time `json:"cancelled_at"` } func (OrderCancelled) isEvent() {} ``` ### Snapshot state ```go // Snapshot représente l'état sérialisé d'un aggregate à un instant T type Snapshot struct { AggregateID uuid.UUID `json:"aggregate_id"` Version int `json:"version"` Status Status `json:"status"` CustomerID uuid.UUID `json:"customer_id"` Items []OrderItem `json:"items"` TotalCents int64 `json:"total_cents"` CreatedAt time.Time `json:"created_at"` } // TakeSnapshot crée un snapshot de l'état courant func (o *Order) TakeSnapshot() Snapshot { return Snapshot{ AggregateID: o.id, Version: o.version, Status: o.status, CustomerID: o.customerID, Items: o.items, TotalCents: o.totalCents, CreatedAt: time.Now().UTC(), } } ``` --- ## Section 4 : Command handlers — orchestration ### Principes des Command Handlers - Les commandes retournent **uniquement** une erreur, jamais de données - Le client fournit l'UUID de l'entité (ne pas générer côté serveur dans le handler) - Le domain language prime : `CancelOrder` pas `DeleteOrder`, `PlaceOrder` pas `CreateOrder` - Un handler par commande - Les cross-cutting concerns (logging, métriques) via des décorateurs middleware ### Définition des commandes ```go package command import "github.com/google/uuid" // PlaceOrder — intent de passer une commande type PlaceOrder struct { OrderID uuid.UUID CustomerID uuid.UUID Items []OrderItem } // OrderItem dans le contexte d'une commande type OrderItem struct { ProductID uuid.UUID Quantity int PriceCents int64 } // ConfirmOrder — intent de confirmer une commande après paiement type ConfirmOrder struct { OrderID uuid.UUID } // ShipOrder — intent d'expédier une commande type ShipOrder struct { OrderID uuid.UUID TrackingNumber string } // CancelOrder — intent d'annuler une commande // Note: "Cancel" pas "Delete" — langage du domaine type CancelOrder struct { OrderID uuid.UUID Reason string } ``` ### Repository interface ```go package order import ( "context" "github.com/google/uuid" ) // Repository définit le contrat pour la persistance des aggregates Order type Repository interface { // Load charge un aggregate par son ID // Retourne ErrNotFound si l'aggregate n'existe pas Load(ctx context.Context, id uuid.UUID) (*Order, error) // Save persiste les nouveaux événements de l'aggregate // Retourne ErrConflict en cas de conflit de concurrence optimiste Save(ctx context.Context, order *Order) error // Update est un helper: Load + fn + Save dans une seule opération Update(ctx context.Context, id uuid.UUID, fn func(*Order) error) error } var ( ErrNotFound = errors.New("order not found") ErrConflict = errors.New("optimistic concurrency conflict") ) ``` ### Implementation du pattern Update ```go // Update dans le repository PostgreSQL func (r *PostgresRepository) Update( ctx context.Context, id uuid.UUID, fn func(*Order) error, ) error { order, err := r.Load(ctx, id) if err != nil { return fmt.Errorf("load order %s: %w", id, err) } if err := fn(order); err != nil { return err // erreur métier, pas de wrap supplémentaire } if err := r.Save(ctx, order); err != nil { return fmt.Errorf("save order %s: %w", id, err) } return nil } ``` ### Command Handlers ```go package commandhandler import ( "context" "fmt" "myapp/internal/order" "myapp/internal/command" ) // OrderCommandHandler gère toutes les commandes liées aux orders type OrderCommandHandler struct { repo order.Repository } func NewOrderCommandHandler(repo order.Repository) *OrderCommandHandler { return &OrderCommandHandler{repo: repo} } // HandlePlaceOrder traite la commande PlaceOrder func (h *OrderCommandHandler) HandlePlaceOrder(ctx context.Context, cmd command.PlaceOrder) error { // Mapper les items de la commande vers le domaine items := make([]order.OrderItem, len(cmd.Items)) for i, item := range cmd.Items { items[i] = order.OrderItem{ ProductID: item.ProductID, Quantity: item.Quantity, PriceCents: item.PriceCents, } } // Créer l'aggregate (factory method) o, err := order.Place(cmd.OrderID, cmd.CustomerID, items) if err != nil { return fmt.Errorf("place order: %w", err) } // Sauvegarder dans l'Event Store if err := h.repo.Save(ctx, o); err != nil { return fmt.Errorf("save order: %w", err) } return nil } // HandleConfirmOrder traite la commande ConfirmOrder func (h *OrderCommandHandler) HandleConfirmOrder(ctx context.Context, cmd command.ConfirmOrder) error { return h.repo.Update(ctx, cmd.OrderID, func(o *order.Order) error { return o.Confirm() }) } // HandleShipOrder traite la commande ShipOrder func (h *OrderCommandHandler) HandleShipOrder(ctx context.Context, cmd command.ShipOrder) error { return h.repo.Update(ctx, cmd.OrderID, func(o *order.Order) error { return o.Ship(cmd.TrackingNumber) }) } // HandleCancelOrder traite la commande CancelOrder func (h *OrderCommandHandler) HandleCancelOrder(ctx context.Context, cmd command.CancelOrder) error { return h.repo.Update(ctx, cmd.OrderID, func(o *order.Order) error { return o.Cancel(cmd.Reason) }) } ``` ### Décorateurs middleware ```go // LoggingMiddleware ajoute du logging à un command handler type LoggingMiddleware struct { next *OrderCommandHandler logger *slog.Logger } func (m *LoggingMiddleware) HandlePlaceOrder(ctx context.Context, cmd command.PlaceOrder) error { start := time.Now() err := m.next.HandlePlaceOrder(ctx, cmd) m.logger.Info("HandlePlaceOrder", "order_id", cmd.OrderID, "customer_id", cmd.CustomerID, "item_count", len(cmd.Items), "duration_ms", time.Since(start).Milliseconds(), "error", err, ) return err } // MetricsMiddleware ajoute des métriques Prometheus type MetricsMiddleware struct { next *OrderCommandHandler commands *prometheus.CounterVec duration *prometheus.HistogramVec } func (m *MetricsMiddleware) HandlePlaceOrder(ctx context.Context, cmd command.PlaceOrder) error { timer := prometheus.NewTimer(m.duration.WithLabelValues("place_order")) defer timer.ObserveDuration() err := m.next.HandlePlaceOrder(ctx, cmd) status := "success" if err != nil { status = "error" } m.commands.WithLabelValues("place_order", status).Inc() return err } ``` ### HTTP Handler — REST convention ```go // POST /orders → 204 No Content + Location header // Convention REST pour CQRS : pas de body en réponse func (h *HTTPHandler) PlaceOrder(w http.ResponseWriter, r *http.Request) { var req PlaceOrderRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "invalid request body", http.StatusBadRequest) return } // Le CLIENT fournit l'UUID — permet l'idempotence orderID, err := uuid.Parse(req.OrderID) if err != nil { // Générer si non fourni (moins idempotent) orderID = uuid.New() } cmd := command.PlaceOrder{ OrderID: orderID, CustomerID: req.CustomerID, Items: mapItems(req.Items), } if err := h.handler.HandlePlaceOrder(r.Context(), cmd); err != nil { if errors.Is(err, order.ErrConflict) { http.Error(w, "conflict", http.StatusConflict) return } http.Error(w, "internal error", http.StatusInternalServerError) return } // 204 + Content-Location pour que le client sache où lire l'état w.Header().Set("Content-Location", fmt.Sprintf("/orders/%s", orderID)) w.WriteHeader(http.StatusNoContent) } ``` --- ## Section 5 : Event Store — schéma PostgreSQL ### Schéma complet ```sql -- Aggregate registry : une ligne par aggregate, sert à l'optimistic concurrency CREATE TABLE es_aggregate ( id UUID NOT NULL, aggregate_type VARCHAR(64) NOT NULL, version INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (id) ); -- Séquence d'événements : append-only CREATE TABLE es_event ( id BIGSERIAL NOT NULL, -- transaction_id permet de regrouper les events d'une même TX transaction_id XID8 NOT NULL DEFAULT pg_current_xact_id(), aggregate_id UUID NOT NULL, -- version = numéro de séquence dans l'aggregate (1-based) version INTEGER NOT NULL, event_type VARCHAR(128) NOT NULL, -- schema_version pour les upcasters de migration schema_version SMALLINT NOT NULL DEFAULT 1, -- payload contient les données de l'événement sérialisées en JSON payload JSONB NOT NULL, -- metadata : correlation_id, causation_id, user_id, etc. metadata JSONB NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (id), -- Contrainte d'unicité : un seul event par version par aggregate -- C'est le filet de sécurité de l'optimistic concurrency UNIQUE (aggregate_id, version), FOREIGN KEY (aggregate_id) REFERENCES es_aggregate(id) ); -- Snapshots : état sérialisé d'un aggregate à un instant T CREATE TABLE es_snapshot ( aggregate_id UUID NOT NULL, aggregate_type VARCHAR(64) NOT NULL, version INTEGER NOT NULL, payload JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (aggregate_id, version) ); -- Checkpoints : suivi de la progression des projections CREATE TABLE projection_checkpoint ( projection_name VARCHAR(128) NOT NULL, -- last_event_id = ID du dernier es_event traité last_event_id BIGINT NOT NULL DEFAULT 0, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (projection_name) ); -- Outbox : publication fiable d'integration events CREATE TABLE outbox ( id BIGSERIAL NOT NULL, topic VARCHAR(128) NOT NULL, payload JSONB NOT NULL, state VARCHAR(16) NOT NULL DEFAULT 'pending', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), sent_at TIMESTAMPTZ, PRIMARY KEY (id) ); -- Déduplication des commandes pour l'idempotence CREATE TABLE processed_commands ( idempotency_key VARCHAR(128) NOT NULL, processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (idempotency_key) ); -- Index de performance CREATE INDEX es_event_aggregate_id_version ON es_event (aggregate_id, version ASC); CREATE INDEX es_event_id_for_projection ON es_event (id ASC); CREATE INDEX es_event_type_for_rebuild ON es_event (event_type, id ASC); CREATE INDEX outbox_pending ON outbox (state, id ASC) WHERE state = 'pending'; -- Vue utile pour le monitoring du lag des projections CREATE VIEW projection_lag AS SELECT pc.projection_name, pc.last_event_id, MAX(e.id) AS latest_event_id, MAX(e.id) - pc.last_event_id AS lag_count, MAX(e.created_at) - ( SELECT created_at FROM es_event WHERE id = pc.last_event_id ) AS lag_duration FROM projection_checkpoint pc CROSS JOIN es_event e GROUP BY pc.projection_name, pc.last_event_id; ``` ### Types Go pour l'Event Store ```go package eventstore import ( "time" "github.com/google/uuid" ) // StoredEvent représente une ligne de la table es_event type StoredEvent struct { ID int64 `db:"id"` TransactionID uint64 `db:"transaction_id"` AggregateID uuid.UUID `db:"aggregate_id"` Version int `db:"version"` EventType string `db:"event_type"` SchemaVersion int `db:"schema_version"` Payload json.RawMessage `db:"payload"` Metadata Metadata `db:"metadata"` CreatedAt time.Time `db:"created_at"` } // Metadata contient les informations de traçabilité type Metadata struct { CorrelationID string `json:"correlation_id,omitempty"` CausationID string `json:"causation_id,omitempty"` UserID string `json:"user_id,omitempty"` IPAddress string `json:"ip_address,omitempty"` } ``` --- ## Section 6 : Event Store — save, load et optimistic concurrency ### Interface de l'Event Store ```go package eventstore import ( "context" "github.com/google/uuid" ) // EventStore est l'interface principale pour la persistance des événements type EventStore interface { // Save persiste les nouveaux événements d'un aggregate // Retourne ErrConflict si la version attendue ne correspond pas Save(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []SerializedEvent) error // Load charge tous les événements d'un aggregate Load(ctx context.Context, aggregateID uuid.UUID) ([]StoredEvent, error) // LoadFrom charge les événements à partir d'une version donnée (pour les snapshots) LoadFrom(ctx context.Context, aggregateID uuid.UUID, fromVersion int) ([]StoredEvent, error) // LoadForProjection charge tous les événements depuis un ID donné (pour les projections) LoadForProjection(ctx context.Context, fromEventID int64, limit int) ([]StoredEvent, error) } // SerializedEvent représente un événement prêt à être persisté type SerializedEvent struct { EventType string SchemaVersion int Payload []byte Metadata Metadata } var ErrConflict = errors.New("optimistic concurrency conflict") var ErrNotFound = errors.New("aggregate not found") ``` ### Save avec optimistic concurrency La clé de la concurrence optimiste : on tente de mettre à jour `es_aggregate` avec la version attendue. Si 0 lignes affectées, quelqu'un d'autre a modifié l'aggregate entre notre lecture et notre écriture. ```go // Save persiste les événements avec contrôle de concurrence optimiste func (s *PostgresEventStore) Save( ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []SerializedEvent, ) error { if len(events) == 0 { return nil } return s.db.BeginTxx(ctx, nil, func(tx *sqlx.Tx) error { // Étape 1 : Insérer ou mettre à jour l'aggregate avec contrôle de version newVersion := expectedVersion + len(events) var rowsAffected int64 var err error if expectedVersion == 0 { // Premier save : INSERT result, err := tx.ExecContext(ctx, ` INSERT INTO es_aggregate (id, aggregate_type, version) VALUES ($1, $2, $3) ON CONFLICT (id) DO NOTHING`, aggregateID, aggregateType, newVersion, ) if err != nil { return fmt.Errorf("insert aggregate: %w", err) } rowsAffected, err = result.RowsAffected() } else { // Update existant : version doit correspondre result, err := tx.ExecContext(ctx, ` UPDATE es_aggregate SET version = $1 WHERE id = $2 AND version = $3`, newVersion, aggregateID, expectedVersion, ) if err != nil { return fmt.Errorf("update aggregate version: %w", err) } rowsAffected, err = result.RowsAffected() } if err != nil { return err } if rowsAffected == 0 { // Conflit de concurrence : quelqu'un a modifié l'aggregate entre-temps return ErrConflict } // Étape 2 : Insérer les événements for i, event := range events { version := expectedVersion + i + 1 _, err := tx.ExecContext(ctx, ` INSERT INTO es_event (aggregate_id, version, event_type, schema_version, payload, metadata) VALUES ($1, $2, $3, $4, $5, $6)`, aggregateID, version, event.EventType, event.SchemaVersion, event.Payload, event.Metadata, ) if err != nil { // La contrainte UNIQUE(aggregate_id, version) peut aussi déclencher un conflit var pgErr *pgconn.PgError if errors.As(err, &pgErr) && pgErr.Code == "23505" { return ErrConflict } return fmt.Errorf("insert event version %d: %w", version, err) } } return nil }) } ``` ### Helper pour les transactions sqlx ```go // BeginTxx exécute fn dans une transaction, rollback automatique si erreur func (db *DB) BeginTxx(ctx context.Context, opts *sql.TxOptions, fn func(*sqlx.Tx) error) error { tx, err := db.sqlx.BeginTxx(ctx, opts) if err != nil { return fmt.Errorf("begin transaction: %w", err) } defer func() { if p := recover(); p != nil { _ = tx.Rollback() panic(p) // re-panic après rollback } }() if err := fn(tx); err != nil { _ = tx.Rollback() return err } return tx.Commit() } ``` ### Load ```go // Load charge tous les événements d'un aggregate, triés par version ASC func (s *PostgresEventStore) Load( ctx context.Context, aggregateID uuid.UUID, ) ([]StoredEvent, error) { return s.LoadFrom(ctx, aggregateID, 0) } // LoadFrom charge les événements à partir d'une version donnée (pour snapshots) func (s *PostgresEventStore) LoadFrom( ctx context.Context, aggregateID uuid.UUID, fromVersion int, ) ([]StoredEvent, error) { var events []StoredEvent err := s.db.SelectContext(ctx, &events, ` SELECT id, transaction_id, aggregate_id, version, event_type, schema_version, payload, metadata, created_at FROM es_event WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC`, aggregateID, fromVersion, ) if err != nil { return nil, fmt.Errorf("load events for aggregate %s: %w", aggregateID, err) } return events, nil } ``` ### Repository avec sérialisation/désérialisation ```go // PostgresRepository implémente order.Repository type PostgresRepository struct { store EventStore serializer EventSerializer } // Save sérialise les événements et les persiste func (r *PostgresRepository) Save(ctx context.Context, o *order.Order) error { events := o.UncommittedEvents() if len(events) == 0 { return nil } serialized := make([]SerializedEvent, len(events)) for i, e := range events { s, err := r.serializer.Serialize(e) if err != nil { return fmt.Errorf("serialize event %T: %w", e, err) } serialized[i] = s } // expectedVersion = version actuelle - nb d'événements non persistés expectedVersion := o.Version() - len(events) err := r.store.Save(ctx, o.ID(), order.AggregateType(), expectedVersion, serialized) if err != nil { return err // ErrConflict propagé tel quel } o.ClearChanges() return nil } // Load charge et désérialise les événements, reconstruit l'aggregate func (r *PostgresRepository) Load(ctx context.Context, id uuid.UUID) (*order.Order, error) { stored, err := r.store.Load(ctx, id) if err != nil { return nil, fmt.Errorf("load events: %w", err) } if len(stored) == 0 { return nil, order.ErrNotFound } events := make([]order.Event, len(stored)) for i, s := range stored { e, err := r.serializer.Deserialize(s) if err != nil { return nil, fmt.Errorf("deserialize event %s v%d: %w", s.EventType, s.Version, err) } events[i] = e.(order.Event) } return order.NewFromEvents(events) } ``` ### Sérialiseur ```go // EventSerializer gère la sérialisation/désérialisation des événements type EventSerializer struct { upcasters map[string][]Upcaster // pour le versioning } func (s *EventSerializer) Serialize(e order.Event) (SerializedEvent, error) { payload, err := json.Marshal(e) if err != nil { return SerializedEvent{}, fmt.Errorf("marshal event: %w", err) } return SerializedEvent{ EventType: eventType(e), SchemaVersion: 1, Payload: payload, }, nil } func (s *EventSerializer) Deserialize(stored StoredEvent) (interface{}, error) { // Appliquer les upcasters si nécessaire payload := s.upcast(stored.EventType, stored.SchemaVersion, stored.Payload) switch stored.EventType { case "OrderPlaced": var e order.OrderPlaced return e, json.Unmarshal(payload, &e) case "OrderConfirmed": var e order.OrderConfirmed return e, json.Unmarshal(payload, &e) case "OrderShipped": var e order.OrderShipped return e, json.Unmarshal(payload, &e) case "OrderCancelled": var e order.OrderCancelled return e, json.Unmarshal(payload, &e) default: return nil, fmt.Errorf("unknown event type: %s", stored.EventType) } } func eventType(e order.Event) string { t := reflect.TypeOf(e) if t.Kind() == reflect.Ptr { t = t.Elem() } return t.Name() } ``` --- ## Section 7 : Snapshots — éviter le replay intégral ### Quand utiliser les snapshots Les snapshots sont nécessaires quand un aggregate accumule de nombreux événements sur sa durée de vie. Un compte bancaire avec 10 ans d'historique pourrait avoir des milliers d'événements — rejouer tous ces événements à chaque chargement est trop coûteux. **Règle pratique** : implémenter les snapshots quand un aggregate dépasse ~100 événements. ### Schéma et types ```go // SnapshotStore gère la persistance des snapshots type SnapshotStore interface { // Save sauvegarde un snapshot (idempotent : ON CONFLICT DO NOTHING) Save(ctx context.Context, snap StoredSnapshot) error // Load charge le dernier snapshot d'un aggregate // Retourne nil, nil si aucun snapshot n'existe Load(ctx context.Context, aggregateID uuid.UUID) (*StoredSnapshot, error) } // StoredSnapshot représente une ligne de es_snapshot type StoredSnapshot struct { AggregateID uuid.UUID `db:"aggregate_id"` AggregateType string `db:"aggregate_type"` Version int `db:"version"` Payload json.RawMessage `db:"payload"` CreatedAt time.Time `db:"created_at"` } ``` ### Implémentation PostgreSQL ```go // Save sauvegarde un snapshot de manière idempotente func (s *PostgresSnapshotStore) Save(ctx context.Context, snap StoredSnapshot) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO es_snapshot (aggregate_id, aggregate_type, version, payload) VALUES ($1, $2, $3, $4) ON CONFLICT (aggregate_id, version) DO NOTHING`, snap.AggregateID, snap.AggregateType, snap.Version, snap.Payload, ) if err != nil { return fmt.Errorf("save snapshot for %s v%d: %w", snap.AggregateID, snap.Version, err) } return nil } // Load charge le snapshot le plus récent func (s *PostgresSnapshotStore) Load(ctx context.Context, aggregateID uuid.UUID) (*StoredSnapshot, error) { var snap StoredSnapshot err := s.db.GetContext(ctx, &snap, ` SELECT aggregate_id, aggregate_type, version, payload, created_at FROM es_snapshot WHERE aggregate_id = $1 ORDER BY version DESC LIMIT 1`, aggregateID, ) if errors.Is(err, sql.ErrNoRows) { return nil, nil // Aucun snapshot, pas une erreur } if err != nil { return nil, fmt.Errorf("load snapshot for %s: %w", aggregateID, err) } return &snap, nil } ``` ### Repository avec support des snapshots ```go const snapshotInterval = 100 // Prendre un snapshot tous les 100 événements // SnapshotRepository wrap un Repository de base avec support des snapshots type SnapshotRepository struct { base *PostgresRepository snapStore SnapshotStore } // Load charge depuis snapshot + événements ultérieurs func (r *SnapshotRepository) Load(ctx context.Context, id uuid.UUID) (*order.Order, error) { // Essayer de charger un snapshot snap, err := r.snapStore.Load(ctx, id) if err != nil { return nil, fmt.Errorf("load snapshot: %w", err) } if snap == nil { // Pas de snapshot : chargement complet depuis les événements return r.base.Load(ctx, id) } // Charger seulement les événements APRÈS le snapshot stored, err := r.base.store.LoadFrom(ctx, id, snap.Version) if err != nil { return nil, fmt.Errorf("load events after snapshot: %w", err) } // Désérialiser le snapshot var orderSnap order.Snapshot if err := json.Unmarshal(snap.Payload, &orderSnap); err != nil { return nil, fmt.Errorf("deserialize snapshot: %w", err) } // Désérialiser les événements ultérieurs events := make([]order.Event, len(stored)) for i, s := range stored { e, err := r.base.serializer.Deserialize(s) if err != nil { return nil, fmt.Errorf("deserialize event: %w", err) } events[i] = e.(order.Event) } // Reconstruire depuis snapshot + événements return order.NewFromSnapshot(orderSnap, events) } // Save persiste les événements et prend un snapshot si nécessaire func (r *SnapshotRepository) Save(ctx context.Context, o *order.Order) error { // Sauvegarder les événements normalement if err := r.base.Save(ctx, o); err != nil { return err } // Prendre un snapshot si on atteint l'intervalle if o.Version()%snapshotInterval == 0 { snap := o.TakeSnapshot() payload, err := json.Marshal(snap) if err != nil { // Non-fatal : loguer mais ne pas échouer slog.Warn("failed to marshal snapshot", "order_id", o.ID(), "version", o.Version(), "error", err, ) return nil } stored := StoredSnapshot{ AggregateID: o.ID(), AggregateType: order.AggregateType(), Version: o.Version(), Payload: payload, } if err := r.snapStore.Save(ctx, stored); err != nil { // Non-fatal : loguer mais ne pas échouer slog.Warn("failed to save snapshot", "order_id", o.ID(), "version", o.Version(), "error", err, ) } } return nil } ``` --- ## Section 8 : Projections — read models idempotents ### Principes - Les projections construisent des read models optimisés pour les queries - Elles sont **entièrement reconstituables** depuis l'Event Store — elles sont dispensables - Chaque projection doit être **idempotente** : rejouer le même événement ne doit pas altérer l'état - Ne jamais utiliser une projection pour des décisions côté write - Sauvegarder le checkpoint dans la **même transaction** que la mise à jour du read model ### Définition d'une projection ```go package projection import ( "context" "database/sql" "github.com/jmoiron/sqlx" "myapp/internal/eventstore" "myapp/internal/order" ) // OrderSummaryProjection construit une vue résumée des commandes type OrderSummaryProjection struct { db *sqlx.DB } // Name retourne le nom unique de cette projection (clé dans projection_checkpoint) func (p *OrderSummaryProjection) Name() string { return "order_summary" } // Handle traite un événement stocké func (p *OrderSummaryProjection) Handle(ctx context.Context, stored eventstore.StoredEvent) error { switch stored.EventType { case "OrderPlaced": return p.handleOrderPlaced(ctx, stored) case "OrderConfirmed": return p.handleOrderConfirmed(ctx, stored) case "OrderShipped": return p.handleOrderShipped(ctx, stored) case "OrderCancelled": return p.handleOrderCancelled(ctx, stored) } // Événements inconnus ignorés silencieusement return nil } // handleOrderPlaced traite un événement OrderPlaced de manière idempotente func (p *OrderSummaryProjection) handleOrderPlaced( ctx context.Context, stored eventstore.StoredEvent, ) error { var e order.OrderPlaced if err := json.Unmarshal(stored.Payload, &e); err != nil { return fmt.Errorf("unmarshal OrderPlaced: %w", err) } // ON CONFLICT DO NOTHING : idempotent — si déjà inséré, ne rien faire _, err := p.db.ExecContext(ctx, ` INSERT INTO order_summary (id, customer_id, status, total_cents, item_count, placed_at) VALUES ($1, $2, 'pending', $3, $4, $5) ON CONFLICT (id) DO NOTHING`, e.OrderID, e.CustomerID, e.TotalCents, len(e.Items), e.PlacedAt, ) if err != nil { return fmt.Errorf("insert order_summary: %w", err) } return nil } // handleOrderShipped traite un événement OrderShipped func (p *OrderSummaryProjection) handleOrderShipped( ctx context.Context, stored eventstore.StoredEvent, ) error { var e order.OrderShipped if err := json.Unmarshal(stored.Payload, &e); err != nil { return fmt.Errorf("unmarshal OrderShipped: %w", err) } // UPDATE idempotent : WHERE garantit qu'on ne met à jour qu'au bon moment _, err := p.db.ExecContext(ctx, ` UPDATE order_summary SET status = 'shipped', tracking_number = $1, shipped_at = $2 WHERE id = $3 AND status != 'shipped'`, -- idempotence via guard e.TrackingNumber, e.ShippedAt, e.OrderID, ) if err != nil { return fmt.Errorf("update order_summary for ship: %w", err) } return nil } ``` ### Projection Worker ```go // ProjectionWorker exécute une projection en lisant les événements en continu type ProjectionWorker struct { store EventStore projection Projection db *sqlx.DB batchSize int pollInterval time.Duration } // Projection est l'interface implémentée par toutes les projections type Projection interface { Name() string Handle(ctx context.Context, stored StoredEvent) error } // Run lance le worker en boucle jusqu'à annulation du contexte func (w *ProjectionWorker) Run(ctx context.Context) error { slog.Info("projection worker started", "projection", w.projection.Name()) for { select { case <-ctx.Done(): return ctx.Err() default: } processed, err := w.processBatch(ctx) if err != nil { slog.Error("projection batch failed", "projection", w.projection.Name(), "error", err, ) // Attendre avant de réessayer pour éviter le busy-loop select { case <-ctx.Done(): return ctx.Err() case <-time.After(5 * time.Second): } continue } if processed == 0 { // Rien à traiter : attendre le prochain poll select { case <-ctx.Done(): return ctx.Err() case <-time.After(w.pollInterval): } } } } // processBatch traite un lot d'événements dans une transaction unique func (w *ProjectionWorker) processBatch(ctx context.Context) (int, error) { // Lire le checkpoint courant var checkpoint int64 err := w.db.GetContext(ctx, &checkpoint, ` SELECT last_event_id FROM projection_checkpoint WHERE projection_name = $1`, w.projection.Name(), ) if errors.Is(err, sql.ErrNoRows) { checkpoint = 0 } else if err != nil { return 0, fmt.Errorf("read checkpoint: %w", err) } // Charger le prochain batch d'événements events, err := w.store.LoadForProjection(ctx, checkpoint, w.batchSize) if err != nil { return 0, fmt.Errorf("load events: %w", err) } if len(events) == 0 { return 0, nil } // Traiter tous les événements dans une seule transaction // Le checkpoint et le read model sont mis à jour atomiquement err = w.db.BeginTxx(ctx, nil, func(tx *sqlx.Tx) error { for _, event := range events { if err := w.projection.Handle(ctx, event); err != nil { return fmt.Errorf("handle event %d (%s): %w", event.ID, event.EventType, err) } } // Mettre à jour le checkpoint dans la MÊME transaction lastID := events[len(events)-1].ID _, err := tx.ExecContext(ctx, ` INSERT INTO projection_checkpoint (projection_name, last_event_id) VALUES ($1, $2) ON CONFLICT (projection_name) DO UPDATE SET last_event_id = $2, updated_at = NOW()`, w.projection.Name(), lastID, ) return err }) if err != nil { return 0, fmt.Errorf("process batch: %w", err) } slog.Info("projection batch processed", "projection", w.projection.Name(), "count", len(events), "last_event_id", events[len(events)-1].ID, ) return len(events), nil } ``` ### Rebuild d'une projection ```go // Rebuild recrée entièrement une projection depuis le début // Utile après un bug ou un changement de schéma func (w *ProjectionWorker) Rebuild(ctx context.Context) error { slog.Info("rebuilding projection", "projection", w.projection.Name()) // Réinitialiser le checkpoint _, err := w.db.ExecContext(ctx, ` INSERT INTO projection_checkpoint (projection_name, last_event_id) VALUES ($1, 0) ON CONFLICT (projection_name) DO UPDATE SET last_event_id = 0`, w.projection.Name(), ) if err != nil { return fmt.Errorf("reset checkpoint: %w", err) } // Optionnel : vider le read model avant de reconstruire // (selon si la projection est idempotente ou non) // Relancer le worker normalement return w.Run(ctx) } ``` --- ## Section 9 : Idempotence — déduplication des commandes ### Problème Dans un système distribué, les commandes peuvent être reçues plusieurs fois (retry du client, at-least-once delivery, timeout réseau suivi d'une réémission). Chaque command handler doit être idempotent. ### Stratégie : clé d'idempotence Le client génère une clé unique (UUID) par opération et la renvoie lors des retries. Le serveur mémorise les commandes déjà traitées. ```go // IdempotencyKey est une clé unique fournie par le client // Format recommandé : "resource-type:uuid" ou simplement un UUID v4 type IdempotencyKey string // CommandWithIdempotency wrap une commande avec sa clé d'idempotence type CommandWithIdempotency[T any] struct { Key IdempotencyKey Command T } ``` ### Déduplication en base de données ```go // IdempotencyStore gère la table processed_commands type IdempotencyStore struct { db *sqlx.DB } // Claim tente de "réclamer" une clé d'idempotence // Retourne true si la clé est nouvelle (commande à traiter) // Retourne false si déjà traitée (réponse idempotente) func (s *IdempotencyStore) Claim(ctx context.Context, key IdempotencyKey) (bool, error) { result, err := s.db.ExecContext(ctx, ` INSERT INTO processed_commands (idempotency_key) VALUES ($1) ON CONFLICT (idempotency_key) DO NOTHING`, string(key), ) if err != nil { return false, fmt.Errorf("claim idempotency key: %w", err) } rows, err := result.RowsAffected() if err != nil { return false, err } return rows > 0, nil // true = nouvelle commande } ``` ### Middleware d'idempotence pour les handlers ```go // IdempotentCommandHandler wrap un handler avec déduplication type IdempotentCommandHandler[T any] struct { inner func(context.Context, T) error idempStore *IdempotencyStore } func (h *IdempotentCommandHandler[T]) Handle( ctx context.Context, cmd CommandWithIdempotency[T], ) error { // Tenter de réclamer la clé isNew, err := h.idempStore.Claim(ctx, cmd.Key) if err != nil { return fmt.Errorf("check idempotency: %w", err) } if !isNew { // Commande déjà traitée : réponse idempotente slog.Info("duplicate command ignored", "idempotency_key", cmd.Key) return nil } // Nouvelle commande : traiter normalement return h.inner(ctx, cmd.Command) } ``` ### Dans le HTTP Handler ```go // POST /orders avec support de l'idempotence func (h *HTTPHandler) PlaceOrder(w http.ResponseWriter, r *http.Request) { // La clé d'idempotence est dans le header HTTP // Convention: Idempotency-Key: idempKey := r.Header.Get("Idempotency-Key") if idempKey == "" { // Générer une clé basée sur les données si non fournie // (moins robuste que la clé fournie par le client) idempKey = uuid.New().String() } var req PlaceOrderRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "invalid body", http.StatusBadRequest) return } cmd := command.PlaceOrder{ OrderID: req.OrderID, CustomerID: req.CustomerID, Items: mapItems(req.Items), } wrapped := CommandWithIdempotency[command.PlaceOrder]{ Key: IdempotencyKey(idempKey), Command: cmd, } if err := h.idempHandler.Handle(r.Context(), wrapped); err != nil { handleCommandError(w, err) return } w.Header().Set("Content-Location", "/orders/"+req.OrderID.String()) w.WriteHeader(http.StatusNoContent) } ``` ### Nettoyage des vieilles clés ```go // CleanupOldKeys supprime les clés d'idempotence plus vieilles que duration // À exécuter en tâche périodique (ex: toutes les heures) func (s *IdempotencyStore) CleanupOldKeys(ctx context.Context, olderThan time.Duration) (int64, error) { result, err := s.db.ExecContext(ctx, ` DELETE FROM processed_commands WHERE processed_at < $1`, time.Now().Add(-olderThan), ) if err != nil { return 0, fmt.Errorf("cleanup idempotency keys: %w", err) } count, _ := result.RowsAffected() return count, nil } ``` --- ## Section 10 : Outbox pattern — publication fiable d'events ### Problème Si on publie un event sur Kafka APRÈS avoir commité la transaction DB, il y a un risque de perte (crash entre les deux). L'Outbox pattern résout ce problème en écrivant l'event dans la même transaction que les données. ### Distinction événements domaine vs intégration | Type | Scope | Audience | Format | |------|-------|----------|--------| | **Domain Event** | Aggregate interne | Même bounded context | Riche en données métier | | **Integration Event** | Publié à l'extérieur | Autres services | Contrat stable, versioning strict | Ne pas publier les domain events bruts à l'extérieur. Les transformer en integration events. ### Écriture dans l'Outbox (même TX que les événements) ```go // OutboxEntry représente un message en attente de publication type OutboxEntry struct { Topic string Payload []byte } // SaveWithOutbox sauvegarde les événements ET les messages outbox dans la même TX func (s *PostgresEventStore) SaveWithOutbox( ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []SerializedEvent, outboxEntries []OutboxEntry, ) error { return s.db.BeginTxx(ctx, nil, func(tx *sqlx.Tx) error { // 1. Sauvegarder les événements (avec optimistic concurrency) if err := s.saveInTx(ctx, tx, aggregateID, aggregateType, expectedVersion, events); err != nil { return err } // 2. Écrire les messages outbox dans LA MÊME TRANSACTION for _, entry := range outboxEntries { _, err := tx.ExecContext(ctx, ` INSERT INTO outbox (topic, payload) VALUES ($1, $2)`, entry.Topic, entry.Payload, ) if err != nil { return fmt.Errorf("insert outbox entry for topic %s: %w", entry.Topic, err) } } return nil }) } ``` ### Outbox Relay — publication vers le broker ```go // OutboxRelay lit les messages pending et les publie sur le broker type OutboxRelay struct { db *sqlx.DB publisher MessagePublisher batchSize int } // MessagePublisher est l'interface vers le broker (Kafka, NATS, etc.) type MessagePublisher interface { Publish(ctx context.Context, topic string, payload []byte) error } // Run exécute le relay en boucle func (r *OutboxRelay) Run(ctx context.Context) error { for { select { case <-ctx.Done(): return ctx.Err() default: } published, err := r.publishBatch(ctx) if err != nil { slog.Error("outbox relay batch failed", "error", err) select { case <-ctx.Done(): return ctx.Err() case <-time.After(5 * time.Second): } continue } if published == 0 { select { case <-ctx.Done(): return ctx.Err() case <-time.After(500 * time.Millisecond): } } } } // publishBatch publie un lot de messages outbox avec FOR UPDATE SKIP LOCKED func (r *OutboxRelay) publishBatch(ctx context.Context) (int, error) { // FOR UPDATE SKIP LOCKED : permet plusieurs relay workers en parallèle // sans conflit — chaque worker prend des lignes différentes rows, err := r.db.QueryxContext(ctx, ` SELECT id, topic, payload FROM outbox WHERE state = 'pending' ORDER BY id ASC LIMIT $1 FOR UPDATE SKIP LOCKED`, r.batchSize, ) if err != nil { return 0, fmt.Errorf("query outbox: %w", err) } defer rows.Close() type entry struct { ID int64 `db:"id"` Topic string `db:"topic"` Payload []byte `db:"payload"` } var entries []entry for rows.Next() { var e entry if err := rows.StructScan(&e); err != nil { return 0, fmt.Errorf("scan outbox row: %w", err) } entries = append(entries, e) } published := 0 for _, e := range entries { // Publier sur le broker if err := r.publisher.Publish(ctx, e.Topic, e.Payload); err != nil { slog.Error("failed to publish outbox message", "id", e.ID, "topic", e.Topic, "error", err, ) continue // Resilient execution : continuer avec les autres } // Marquer comme envoyé _, err := r.db.ExecContext(ctx, ` UPDATE outbox SET state = 'sent', sent_at = NOW() WHERE id = $1`, e.ID, ) if err != nil { slog.Error("failed to mark outbox message as sent", "id", e.ID, "error", err, ) continue } published++ } return published, nil } ``` ### Alternative : WAL-based outbox avec pglogrepl Pour une latence minimale (< 100ms), utiliser PostgreSQL logical replication au lieu du polling : ```go // WALOutboxListener écoute le WAL PostgreSQL pour une latence ultra-basse // Nécessite : wal_level = logical dans postgresql.conf // Utilise la librairie : github.com/jackc/pglogrepl type WALOutboxListener struct { conn *pgconn.PgConn publisher MessagePublisher slotName string } // Note : implémentation simplifiée — voir pglogrepl pour le code complet func (l *WALOutboxListener) Start(ctx context.Context) error { // Créer un slot de réplication logique si inexistant // Puis écouter les changements sur la table outbox // Beaucoup plus complexe à implémenter mais latence < 100ms vs polling // Recommandé seulement pour les cas nécessitant une très faible latence return nil } ``` --- ## Section 11 : Sagas — chorégraphie et orchestration ### Chorégraphie Chaque service écoute les événements et réagit de manière autonome. Couplage faible mais difficile à tracer. ``` OrderService PaymentService InventoryService │ │ │ │── OrderPlaced ──────▶│ │ │ │── PaymentProcessed ─▶│ │ │ │── InventoryReserved ──▶ (OrderService réagit) │◀── InventoryReserved ─────────────────────│ ``` ```go // PaymentService écoute OrderPlaced et traite le paiement type PaymentEventHandler struct { paymentService PaymentService } func (h *PaymentEventHandler) HandleOrderPlaced(ctx context.Context, e OrderPlaced) error { // Traiter le paiement if err := h.paymentService.ProcessPayment(ctx, e.OrderID, e.TotalCents); err != nil { // Publier un PaymentFailed pour que OrderService compense return h.publishPaymentFailed(ctx, e.OrderID, err.Error()) } return h.publishPaymentProcessed(ctx, e.OrderID) } ``` ### Orchestration avec Process Manager L'orchestrateur est un aggregate event-sourcé qui coordonne la séquence. ```go // OrderFulfillmentSaga est le Process Manager pour l'expédition d'une commande type OrderFulfillmentSaga struct { id uuid.UUID version int changes []SagaEvent // État de la saga orderID uuid.UUID status SagaStatus retryCount int compensation []CompensationStep } type SagaStatus string const ( SagaStarted SagaStatus = "started" SagaCompleted SagaStatus = "completed" SagaFailed SagaStatus = "failed" SagaCompensating SagaStatus = "compensating" ) // Start démarre la saga pour une commande func StartOrderFulfillmentSaga(sagaID, orderID uuid.UUID) (*OrderFulfillmentSaga, error) { s := &OrderFulfillmentSaga{} s.raise(SagaStarted{ SagaID: sagaID, OrderID: orderID, StartedAt: time.Now().UTC(), }) return s, nil } // HandlePaymentProcessed réagit à un paiement réussi func (s *OrderFulfillmentSaga) HandlePaymentProcessed(paymentID uuid.UUID) error { if s.status != SagaStarted { return fmt.Errorf("unexpected payment in status %s", s.status) } s.raise(PaymentConfirmedInSaga{ SagaID: s.id, PaymentID: paymentID, }) return nil } // HandlePaymentFailed déclenche la compensation func (s *OrderFulfillmentSaga) HandlePaymentFailed(reason string) error { s.raise(SagaCompensationStarted{ SagaID: s.id, Reason: reason, }) return nil } func (s *OrderFulfillmentSaga) transition(e SagaEvent) { switch ev := e.(type) { case SagaStarted: s.id = ev.SagaID s.orderID = ev.OrderID s.status = SagaStarted case PaymentConfirmedInSaga: s.status = SagaCompleted case SagaCompensationStarted: s.status = SagaCompensating // Enregistrer les étapes de compensation à exécuter s.compensation = []CompensationStep{ {Type: "cancel_order", ResourceID: s.orderID}, } } } ``` ### Compensation — transactions compensatoires Toutes les transactions compensatoires doivent être **idempotentes**. ```go // CompensationExecutor exécute les compensations type CompensationExecutor struct { orderRepo order.Repository // autres repos... } // Execute exécute une étape de compensation de manière idempotente func (e *CompensationExecutor) Execute(ctx context.Context, step CompensationStep) error { switch step.Type { case "cancel_order": // Idempotent : si déjà annulée, l'erreur est ignorée err := e.orderRepo.Update(ctx, step.ResourceID, func(o *order.Order) error { err := o.Cancel("saga compensation") if errors.Is(err, order.ErrAlreadyCancelled) { return nil // Déjà compensé — OK } return err }) return err default: return fmt.Errorf("unknown compensation type: %s", step.Type) } } ``` ### Note sur Temporal Pour les sagas complexes (multi-step, longs timeouts, retry sophistiqués), considérer [Temporal](https://temporal.io) : ```go // Avec Temporal, une saga devient un simple workflow Go func OrderFulfillmentWorkflow(ctx workflow.Context, orderID string) error { // Temporal gère automatiquement : // - La persistance de l'état // - Les retries avec backoff // - Les timeouts // - La compensation en cas d'échec ao := workflow.ActivityOptions{ StartToCloseTimeout: 10 * time.Minute, RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 3, }, } ctx = workflow.WithActivityOptions(ctx, ao) var paymentResult PaymentResult if err := workflow.ExecuteActivity(ctx, ProcessPayment, orderID).Get(ctx, &paymentResult); err != nil { // Compensation automatique return workflow.ExecuteActivity(ctx, CancelOrder, orderID).Get(ctx, nil) } return workflow.ExecuteActivity(ctx, ReserveInventory, orderID).Get(ctx, nil) } ``` --- ## Section 12 : Event versioning — schéma faible et upcasters ### Stratégie 1 : schéma faible (préférée) Les changements additifs ne nécessitent pas de versioning si on utilise `omitempty`. Le schéma JSON est naturellement extensible. ```go // Version 1 originale type OrderPlaced struct { OrderID uuid.UUID `json:"order_id"` CustomerID uuid.UUID `json:"customer_id"` Items []OrderItem `json:"items"` TotalCents int64 `json:"total_cents"` PlacedAt time.Time `json:"placed_at"` } // Version 2 : ajout d'un champ optionnel (backward compatible) // Les anciens events qui ne contiennent pas PromoCode auront "" // schema_version reste à 1 — aucun upcaster nécessaire type OrderPlaced struct { OrderID uuid.UUID `json:"order_id"` CustomerID uuid.UUID `json:"customer_id"` Items []OrderItem `json:"items"` TotalCents int64 `json:"total_cents"` PlacedAt time.Time `json:"placed_at"` // Nouveau champ optionnel — backward compatible PromoCode string `json:"promo_code,omitempty"` // Autre ajout optionnel Channel string `json:"channel,omitempty"` // "web", "mobile", "api" } ``` ### Stratégie 2 : upcasters pour changements cassants Quand un changement est incompatible (renommage de champ, changement de type), utiliser un upcaster. ```go // Upcaster transforme un event d'une version vers la suivante type Upcaster interface { EventType() string FromVersion() int ToVersion() int Upcast(payload json.RawMessage) (json.RawMessage, error) } // OrderPlacedV1ToV2Upcaster : migration du champ "amount" → "total_cents" // (changement cassant : renommage + conversion centimes) type OrderPlacedV1ToV2Upcaster struct{} func (u *OrderPlacedV1ToV2Upcaster) EventType() string { return "OrderPlaced" } func (u *OrderPlacedV1ToV2Upcaster) FromVersion() int { return 1 } func (u *OrderPlacedV1ToV2Upcaster) ToVersion() int { return 2 } func (u *OrderPlacedV1ToV2Upcaster) Upcast(payload json.RawMessage) (json.RawMessage, error) { // Lire l'ancienne structure var v1 struct { OrderID string `json:"order_id"` CustomerID string `json:"customer_id"` Amount float64 `json:"amount"` // ancien: montant en euros PlacedAt time.Time `json:"placed_at"` } if err := json.Unmarshal(payload, &v1); err != nil { return nil, fmt.Errorf("unmarshal v1: %w", err) } // Transformer vers la nouvelle structure v2 := struct { OrderID string `json:"order_id"` CustomerID string `json:"customer_id"` TotalCents int64 `json:"total_cents"` // nouveau: montant en centimes PlacedAt time.Time `json:"placed_at"` }{ OrderID: v1.OrderID, CustomerID: v1.CustomerID, TotalCents: int64(v1.Amount * 100), // conversion PlacedAt: v1.PlacedAt, } return json.Marshal(v2) } // UpcasterChain applique une chaîne d'upcasters type UpcasterChain struct { upcasters map[string][]Upcaster // eventType → []Upcaster triés par version } func (c *UpcasterChain) Upcast(eventType string, fromVersion int, payload json.RawMessage) (json.RawMessage, int, error) { upcasters, ok := c.upcasters[eventType] if !ok { return payload, fromVersion, nil // Pas d'upcaster nécessaire } currentVersion := fromVersion currentPayload := payload for _, u := range upcasters { if u.FromVersion() < currentVersion { continue // Déjà appliqué } if u.FromVersion() != currentVersion { break // Gap dans la chaîne } var err error currentPayload, err = u.Upcast(currentPayload) if err != nil { return nil, 0, fmt.Errorf("upcast %s v%d→v%d: %w", eventType, u.FromVersion(), u.ToVersion(), err) } currentVersion = u.ToVersion() } return currentPayload, currentVersion, nil } ``` ### Métadonnées essentielles Toujours inclure ces champs dans les métadonnées de chaque événement : ```go // EventMetadata — traçabilité et corrélation type EventMetadata struct { // CorrelationID : ID de la requête originale, propagé à travers tous les services CorrelationID string `json:"correlation_id"` // CausationID : ID de l'événement ou commande qui a causé cet événement CausationID string `json:"causation_id"` // SchemaVersion : version du schéma de l'événement pour les upcasters SchemaVersion int `json:"schema_version"` // UserID : qui a déclenché l'action UserID string `json:"user_id,omitempty"` // Timestamp : quand l'événement a été créé (précision nanoseconde) Timestamp time.Time `json:"timestamp"` } // RÈGLES ABSOLUES pour le versioning : // 1. Ne JAMAIS modifier un événement déjà persisté // 2. Ne JAMAIS supprimer des champs (utiliser omitempty) // 3. Ne JAMAIS changer le sens d'un champ existant // 4. TOUJOURS bumper schema_version pour les changements cassants // 5. TOUJOURS écrire un upcaster pour chaque changement cassant ``` --- ## Section 13 : Testing — Given/When/Then et intégration ### Tests unitaires d'aggregate avec Given/When/Then ```go package order_test import ( "testing" "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "myapp/internal/order" ) // testCase structure le pattern Given/When/Then type testCase struct { name string given []order.Event // état initial (events rejoués) when func(*order.Order) error // opération à tester then []order.Event // events attendus wantErr bool } // customerID et orderID réutilisables dans les tests var ( testOrderID = uuid.MustParse("00000000-0000-0000-0000-000000000001") testCustomerID = uuid.MustParse("00000000-0000-0000-0000-000000000002") testProductID = uuid.MustParse("00000000-0000-0000-0000-000000000003") testItems = []order.OrderItem{{ ProductID: testProductID, Quantity: 2, PriceCents: 1500, }} ) // orderPlacedEvent crée un OrderPlaced pour les tests func orderPlacedEvent() order.OrderPlaced { return order.OrderPlaced{ OrderID: testOrderID, CustomerID: testCustomerID, Items: testItems, TotalCents: 3000, PlacedAt: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), } } func TestOrderConfirm(t *testing.T) { tests := []testCase{ { name: "confirm a pending order", given: []order.Event{orderPlacedEvent()}, when: func(o *order.Order) error { return o.Confirm() }, then: []order.Event{ order.OrderConfirmed{OrderID: testOrderID}, }, }, { name: "cannot confirm already confirmed order", given: []order.Event{ orderPlacedEvent(), order.OrderConfirmed{OrderID: testOrderID}, }, when: func(o *order.Order) error { return o.Confirm() }, wantErr: true, }, { name: "cannot confirm cancelled order", given: []order.Event{ orderPlacedEvent(), order.OrderCancelled{ OrderID: testOrderID, Reason: "customer request", }, }, when: func(o *order.Order) error { return o.Confirm() }, wantErr: true, }, } runTestCases(t, tests) } func TestOrderCancel(t *testing.T) { tests := []testCase{ { name: "cancel a pending order", given: []order.Event{orderPlacedEvent()}, when: func(o *order.Order) error { return o.Cancel("customer changed mind") }, then: []order.Event{ order.OrderCancelled{ OrderID: testOrderID, Reason: "customer changed mind", }, }, }, { name: "cannot cancel a shipped order", given: []order.Event{ orderPlacedEvent(), order.OrderConfirmed{OrderID: testOrderID}, order.OrderShipped{ OrderID: testOrderID, TrackingNumber: "TRACK123", }, }, when: func(o *order.Order) error { return o.Cancel("too late") }, wantErr: true, }, { name: "cancel requires non-empty reason", given: []order.Event{orderPlacedEvent()}, when: func(o *order.Order) error { return o.Cancel("") }, wantErr: true, }, } runTestCases(t, tests) } // runTestCases exécute tous les cas de test avec le pattern Given/When/Then func runTestCases(t *testing.T, tests []testCase) { t.Helper() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // GIVEN : reconstruire l'aggregate depuis les événements initiaux var agg *order.Order var err error if len(tt.given) > 0 { agg, err = order.NewFromEvents(tt.given) require.NoError(t, err, "failed to reconstruct aggregate from given events") } else { agg = &order.Order{} // aggregate vide pour les factory methods } // WHEN : exécuter l'opération err = tt.when(agg) // THEN : vérifier le résultat if tt.wantErr { assert.Error(t, err) return } require.NoError(t, err) // Vérifier les événements produits uncommitted := agg.UncommittedEvents() require.Len(t, uncommitted, len(tt.then), "expected %d events, got %d", len(tt.then), len(uncommitted)) for i, expected := range tt.then { actual := uncommitted[i] // Comparer le type assert.IsType(t, expected, actual, "event[%d]: expected type %T, got %T", i, expected, actual) // Comparer les champs importants (sans les timestamps) assertEventEquals(t, expected, actual, i) } }) } } // assertEventEquals compare deux events en ignorant les timestamps auto-générés func assertEventEquals(t *testing.T, expected, actual order.Event, index int) { t.Helper() switch exp := expected.(type) { case order.OrderConfirmed: act, ok := actual.(order.OrderConfirmed) require.True(t, ok) assert.Equal(t, exp.OrderID, act.OrderID, "event[%d]: OrderID mismatch", index) case order.OrderCancelled: act, ok := actual.(order.OrderCancelled) require.True(t, ok) assert.Equal(t, exp.OrderID, act.OrderID) assert.Equal(t, exp.Reason, act.Reason) case order.OrderShipped: act, ok := actual.(order.OrderShipped) require.True(t, ok) assert.Equal(t, exp.OrderID, act.OrderID) assert.Equal(t, exp.TrackingNumber, act.TrackingNumber) } } ``` ### Tests d'intégration — Event Store et concurrence optimiste ```go package integration_test import ( "context" "sync" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // TestOptimisticConcurrencyConflict vérifie que deux saves concurrents conflictuels // retournent bien ErrConflict func TestOptimisticConcurrencyConflict(t *testing.T) { ctx := context.Background() db := setupTestDB(t) // helper qui crée une DB de test store := eventstore.NewPostgresEventStore(db) orderID := uuid.New() // Créer un aggregate initial event1 := SerializedEvent{EventType: "OrderPlaced", Payload: []byte(`{}`)} err := store.Save(ctx, orderID, "order", 0, []SerializedEvent{event1}) require.NoError(t, err) // Simuler deux modifications concurrentes de la version 1 var wg sync.WaitGroup results := make([]error, 2) for i := 0; i < 2; i++ { wg.Add(1) go func(idx int) { defer wg.Done() event := SerializedEvent{ EventType: "OrderConfirmed", Payload: []byte(`{}`), } // Les deux tentent de passer de version 1 → 2 results[idx] = store.Save(ctx, orderID, "order", 1, []SerializedEvent{event}) }(i) } wg.Wait() // Exactement l'un doit réussir, l'autre doit avoir un conflit successes := 0 conflicts := 0 for _, err := range results { if err == nil { successes++ } else if errors.Is(err, eventstore.ErrConflict) { conflicts++ } } assert.Equal(t, 1, successes, "exactly one save should succeed") assert.Equal(t, 1, conflicts, "exactly one save should conflict") } // TestProjectionIdempotency vérifie qu'une projection est idempotente func TestProjectionIdempotency(t *testing.T) { ctx := context.Background() db := setupTestDB(t) proj := projection.NewOrderSummaryProjection(db) orderID := uuid.New() event := eventstore.StoredEvent{ ID: 1, EventType: "OrderPlaced", Payload: mustMarshal(order.OrderPlaced{OrderID: orderID, TotalCents: 3000}), } // Traiter le même événement 3 fois for i := 0; i < 3; i++ { err := proj.Handle(ctx, event) require.NoError(t, err, "handle attempt %d failed", i+1) } // Vérifier qu'il n'y a qu'une seule ligne dans la projection var count int err := db.GetContext(ctx, &count, "SELECT COUNT(*) FROM order_summary WHERE id = $1", orderID) require.NoError(t, err) assert.Equal(t, 1, count, "projection should be idempotent") } ``` ### Tests avec eventual consistency ```go // TestEventualConsistency vérifie qu'une projection est éventuellement cohérente func TestEventualConsistency(t *testing.T) { ctx := context.Background() // Setup : event store + projection worker db := setupTestDB(t) store := eventstore.NewPostgresEventStore(db) proj := projection.NewOrderSummaryProjection(db) worker := projection.NewWorker(store, proj, db) // Lancer le worker en arrière-plan workerCtx, cancel := context.WithCancel(ctx) defer cancel() go func() { if err := worker.Run(workerCtx); err != nil && !errors.Is(err, context.Canceled) { t.Errorf("worker failed: %v", err) } }() // Créer une commande orderID := uuid.New() o, err := order.Place(orderID, uuid.New(), testItems) require.NoError(t, err) repo := NewSnapshotRepository(store, db) err = repo.Save(ctx, o) require.NoError(t, err) // Attendre que la projection soit mise à jour (eventual consistency) assert.Eventually(t, func() bool { var count int err := db.GetContext(ctx, &count, "SELECT COUNT(*) FROM order_summary WHERE id = $1 AND status = 'pending'", orderID, ) return err == nil && count == 1 }, 5*time.Second, 100*time.Millisecond, "projection should eventually reflect the placed order", ) } ``` --- ## Section 14 : Anti-patterns — ce qu'il ne faut jamais faire ### 1. Property Sourcing — trop granulaire ```go // ❌ MAUVAIS : Property Sourcing // Chaque modification de champ = un event séparé // Résultat : des milliers d'events sans signification métier type OrderCustomerIDChanged struct { OrderID uuid.UUID CustomerID uuid.UUID } type OrderTotalAmountChanged struct { OrderID uuid.UUID TotalCents int64 } // ✅ BON : Event métier riche // L'event capture l'intention, pas le changement de propriété type OrderPlaced struct { OrderID uuid.UUID CustomerID uuid.UUID Items []OrderItem TotalCents int64 PlacedAt time.Time } ``` ### 2. CRUD Events — sans signification métier ```go // ❌ MAUVAIS : Events CRUD type OrderCreated struct{ OrderID uuid.UUID } type OrderUpdated struct{ OrderID uuid.UUID; Fields map[string]interface{} } type OrderDeleted struct{ OrderID uuid.UUID } // ✅ BON : Events du domaine type OrderPlaced struct{ /* ... */ } type OrderConfirmed struct{ /* ... */ } type OrderCancelled struct{ Reason string /* ... */ } ``` ### 3. God Aggregate — contention massive ```go // ❌ MAUVAIS : Un seul aggregate pour tout le catalogue produit type ProductCatalog struct { id uuid.UUID products []Product // 100 000 produits — lock sur chaque modification ! } // ✅ BON : Un aggregate par produit type Product struct { id uuid.UUID name string price int64 // ... } ``` ### 4. Appels externes dans Apply/Transition ```go // ❌ MAUVAIS : Appel externe dans Apply // Brise le replay des événements historiques func (o *Order) applyOrderPlaced(e OrderPlaced) { o.status = StatusPending price, _ := http.Get("http://pricing-service/price/" + e.Items[0].ProductID.String()) // ❌ JAMAIS } // ✅ BON : Apply ne fait que muter l'état func (o *Order) applyOrderPlaced(e OrderPlaced) { o.id = e.OrderID o.status = StatusPending o.items = e.Items o.totalCents = e.TotalCents // Calculé au moment de la commande } ``` ### 5. Event Store utilisé comme Event Bus ```go // ❌ MAUVAIS : Les projections écoutent directement l'Event Store d'un autre service // Crée un couplage invisible et difficile à tracer projectionWorker.Subscribe("payment-service-event-store") // ✅ BON : Publier des integration events via Outbox → Broker // Chaque service a ses propres topics publics avec contrat versioned broker.Subscribe("payment.payment-processed.v1", handler) ``` ### 6. Retourner des données depuis les commandes ```go // ❌ MAUVAIS : Le command handler retourne des données // Viole CQRS — le write side ne doit pas retourner l'état func (h *Handler) HandlePlaceOrder(ctx context.Context, cmd PlaceOrder) (*Order, error) { // ... return order, nil // ❌ CQRS violation } // ✅ BON : Le command handler retourne seulement une erreur // Le client utilise le Content-Location pour lire l'état via le query side func (h *Handler) HandlePlaceOrder(ctx context.Context, cmd PlaceOrder) error { // ... return nil } ``` ### 7. Projections non-idempotentes ```go // ❌ MAUVAIS : Pas d'idempotence func (p *Projection) HandleOrderPlaced(ctx context.Context, e OrderPlaced) error { _, err := p.db.ExecContext(ctx, "INSERT INTO order_summary (id, status) VALUES ($1, 'pending')", e.OrderID, ) // Si cet event est rejoué, ça plante avec une violation de contrainte unique return err } // ✅ BON : Idempotent avec ON CONFLICT DO NOTHING func (p *Projection) HandleOrderPlaced(ctx context.Context, e OrderPlaced) error { _, err := p.db.ExecContext(ctx, "INSERT INTO order_summary (id, status) VALUES ($1, 'pending') ON CONFLICT (id) DO NOTHING", e.OrderID, ) return err } ``` ### 8. Mauvaises boundaries d'aggregates ```go // ❌ MAUVAIS : Order et Customer dans le même aggregate // Nécessite de modifier deux "aggregates" dans une transaction type OrderWithCustomer struct { orderID uuid.UUID customer Customer // Entité séparée avec son propre cycle de vie } // ✅ BON : Chaque aggregate a son propre cycle de vie // Order référence Customer par ID seulement type Order struct { id uuid.UUID customerID uuid.UUID // Référence par ID, pas par valeur } ``` ### 9. Snapshots manquants sur aggregates long-lived ```go // ❌ MAUVAIS : Aggregate avec des années d'historique, jamais de snapshot // Un compte bancaire avec 50 000 transactions va prendre des secondes à charger // ✅ BON : Implémenter les snapshots dès que l'aggregate peut dépasser ~100 events // Voir Section 7 pour l'implémentation complète ``` --- ## Section 15 : Quand NE PAS utiliser CQRS/ES ### Critères d'exclusion CQRS/Event Sourcing apporte une complexité significative. Ne l'utiliser que si les bénéfices justifient ce coût. | Situation | Recommandation | |-----------|---------------| | CRUD simple, pas de règles métier complexes | CRUD standard avec ORM | | Cohérence forte requise immédiatement | CQRS/ES avec projection synchrone OU architecture plus simple | | Petite équipe, deadline serrée, pas d'expérience ES | Commencer simple, migrer plus tard | | Contrainte de stockage forte | ES utilise 3-5x plus d'espace | | Pas besoin d'audit/historique | Pas justifié | | Prototype ou MVP | Trop complexe pour valider une idée | ### Signaux positifs pour CQRS/ES Utiliser CQRS/ES si la réponse est oui à au moins 3 de ces questions : ``` □ Le domaine a des règles métier complexes avec de nombreux invariants ? □ Un historique complet des changements est requis (audit, compliance) ? □ Les read models et write models ont des besoins très différents ? □ Plusieurs services doivent réagir aux changements d'état ? □ La scalabilité lecture/écriture indépendante est nécessaire ? □ Le métier parle naturellement en termes d'événements passés ? □ L'équipe a l'expérience nécessaire pour maintenir l'architecture ? □ Le système vivra plusieurs années et devra évoluer ? ``` ### Alternative graduée ``` Niveau 1 : CRUD + triggers DB pour audit ↓ si règles métier complexes Niveau 2 : Domain Model + Repository (sans ES) ↓ si historique/audit requis Niveau 3 : CQRS sans Event Sourcing (séparation write/read model) ↓ si historique COMPLET requis + scalabilité Niveau 4 : CQRS + Event Sourcing complet ``` --- ## Section 16 : Monitoring production et alertes ### Métriques Prometheus ```go package metrics import "github.com/prometheus/client_golang/prometheus" var ( // Compteur de writes dans l'Event Store EventStoreWritesTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "event_store_writes_total", Help: "Total number of event store write operations", }, []string{"aggregate_type", "status"}, // status: success, conflict, error ) // Durée des writes EventStoreWriteDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "event_store_write_duration_seconds", Help: "Duration of event store write operations", Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1}, }, []string{"aggregate_type"}, ) // Lag des projections (en nombre d'events) ProjectionLagEvents = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "projection_lag_events", Help: "Number of events not yet processed by this projection", }, []string{"projection_name"}, ) // Lag des projections (en durée) ProjectionLagDuration = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "projection_lag_seconds", Help: "Age of the oldest unprocessed event for this projection", }, []string{"projection_name"}, ) // Conflits de concurrence optimiste ConcurrencyConflictsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "optimistic_concurrency_conflicts_total", Help: "Total number of optimistic concurrency conflicts", }, []string{"aggregate_type"}, ) // Outbox messages en attente OutboxPendingMessages = prometheus.NewGauge( prometheus.GaugeOpts{ Name: "outbox_pending_messages", Help: "Number of messages pending in the outbox", }, ) // Durée de replay des aggregates AggregateReplayDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "aggregate_replay_duration_seconds", Help: "Duration to replay an aggregate from its events", Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5}, }, []string{"aggregate_type"}, ) // Taille des aggregates (nombre d'events) AggregateEventCount = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "aggregate_event_count", Help: "Number of events per aggregate at load time", Buckets: []float64{1, 5, 10, 25, 50, 100, 250, 500, 1000}, }, []string{"aggregate_type"}, ) ) ``` ### Intégration des métriques dans le code ```go // Dans le repository func (r *InstrumentedRepository) Save(ctx context.Context, o *order.Order) error { timer := prometheus.NewTimer( metrics.EventStoreWriteDuration.WithLabelValues("order"), ) defer timer.ObserveDuration() err := r.base.Save(ctx, o) status := "success" if err != nil { if errors.Is(err, order.ErrConflict) { status = "conflict" metrics.ConcurrencyConflictsTotal.WithLabelValues("order").Inc() } else { status = "error" } } metrics.EventStoreWritesTotal.WithLabelValues("order", status).Inc() return err } // Dans le projection worker — collecte du lag func (w *ProjectionWorker) collectLagMetrics(ctx context.Context) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: } var lag struct { LagCount int64 `db:"lag_count"` LagDuration sql.NullString `db:"lag_duration"` } err := w.db.GetContext(ctx, &lag, ` SELECT MAX(e.id) - pc.last_event_id AS lag_count, EXTRACT(EPOCH FROM (MAX(e.created_at) - ( SELECT created_at FROM es_event WHERE id = pc.last_event_id ))) AS lag_duration FROM projection_checkpoint pc, es_event e WHERE pc.projection_name = $1`, w.projection.Name(), ) if err != nil { continue } metrics.ProjectionLagEvents. WithLabelValues(w.projection.Name()). Set(float64(lag.LagCount)) if lag.LagDuration.Valid { duration, _ := strconv.ParseFloat(lag.LagDuration.String, 64) metrics.ProjectionLagDuration. WithLabelValues(w.projection.Name()). Set(duration) } } } ``` ### Règles d'alerte Prometheus/Alertmanager ```yaml # alerting-rules.yml groups: - name: event_sourcing rules: # Projection lag > 5 minutes : alerte critique - alert: ProjectionLagTooHigh expr: projection_lag_seconds > 300 for: 2m labels: severity: critical annotations: summary: "Projection {{ $labels.projection_name }} is lagging" description: "Projection lag is {{ $value }}s (threshold: 300s)" # Outbox > 1000 messages pending - alert: OutboxBacklogTooLarge expr: outbox_pending_messages > 1000 for: 5m labels: severity: warning annotations: summary: "Outbox backlog is growing" description: "{{ $value }} messages pending in outbox" # Taux de conflit > 5% sur les 5 dernières minutes - alert: HighConcurrencyConflictRate expr: | rate(optimistic_concurrency_conflicts_total[5m]) / rate(event_store_writes_total[5m]) > 0.05 for: 5m labels: severity: warning annotations: summary: "High optimistic concurrency conflict rate" description: "Conflict rate is {{ $value | humanizePercentage }}" # Replay lent > 1 seconde (besoin de snapshots ?) - alert: SlowAggregateReplay expr: | histogram_quantile(0.95, rate(aggregate_replay_duration_seconds_bucket[10m]) ) > 1.0 for: 5m labels: severity: warning annotations: summary: "Aggregate replay is slow" description: "P95 replay duration is {{ $value }}s — consider adding snapshots" ``` ### Dashboard Grafana — requêtes clés ``` # Taux d'écriture dans l'Event Store rate(event_store_writes_total{status="success"}[1m]) # Taux de conflits de concurrence rate(optimistic_concurrency_conflicts_total[5m]) # Lag des projections (toutes) max by(projection_name) (projection_lag_seconds) # P99 durée de replay histogram_quantile(0.99, rate(aggregate_replay_duration_seconds_bucket[5m])) # Taille moyenne des aggregates histogram_quantile(0.50, rate(aggregate_event_count_bucket[5m])) ``` --- ## Section 17 : Checklist complète ### Checklist Aggregate ``` Architecture □ Tous les champs de l'aggregate sont privés □ Les quatre accesseurs obligatoires sont présents : ID(), Version(), UncommittedEvents(), ClearChanges() □ L'interface Event a une méthode isEvent() non exportée □ Une aggregate = une transaction boundary Command Methods □ Tous les invariants sont validés AVANT d'appeler raise() □ Les erreurs retournées sont des erreurs métier claires □ Aucun appel externe dans les command methods raise() et transition() □ raise() appelle transition() PUIS incrémente version PUIS append à changes □ transition() ne fait que de la mutation d'état □ transition() n'a pas de return error □ transition() n'a aucun appel externe (DB, HTTP, temps actuel, etc.) □ transition() est déterministe Reconstruction □ NewFromEvents(events) fonctionne correctement □ NewFromSnapshot(snap, events) fonctionne correctement si snapshots implémentés □ La version de l'aggregate après reconstruction est correcte ``` ### Checklist Event Store ``` Schéma □ Table es_aggregate avec contrainte d'unicité sur (id) □ Table es_event avec contrainte UNIQUE(aggregate_id, version) □ Index sur (aggregate_id, version ASC) pour les loads □ Index sur (id ASC) pour les projections □ Table es_snapshot si snapshots implémentés □ Table projection_checkpoint □ Table outbox si outbox pattern utilisé □ Table processed_commands si idempotence requise Save avec optimistic concurrency □ BEGIN TX avant toute opération □ UPDATE es_aggregate WHERE version = expected □ Si 0 rows affectées → retourner ErrConflict □ INSERT events dans la même TX □ Contrainte UNIQUE(aggregate_id, version) comme filet de sécurité supplémentaire □ COMMIT final Load □ SELECT WHERE aggregate_id ORDER BY version ASC □ Gestion correcte du cas "aggregate not found" (0 events) ``` ### Checklist Command Handlers ``` □ Commands retournent seulement error, jamais de données □ Le client fournit l'UUID (pas généré côté serveur dans le handler) □ Language du domaine dans les noms de commandes (CancelOrder, pas DeleteOrder) □ Un handler par commande □ Le pattern Update (load → fn → save) est utilisé pour les modifications □ Cross-cutting concerns via décorateurs, pas dans le handler métier □ HTTP: 204 + Content-Location header pour les commands réussies ``` ### Checklist Projections ``` □ Toutes les projections sont idempotentes □ INSERT ON CONFLICT DO NOTHING pour les OrderPlaced-like events □ UPDATE avec guard (WHERE status != 'already-applied') pour idempotence □ Le checkpoint est mis à jour dans la MÊME transaction que le read model □ Les projections ne sont JAMAIS utilisées pour des décisions write-side □ Un rebuild depuis zéro est possible et testé □ Les événements inconnus sont ignorés silencieusement ``` ### Checklist Outbox ``` □ Messages outbox écrits dans la MÊME transaction que les events □ Relay utilise FOR UPDATE SKIP LOCKED pour la concurrence □ Les integration events ont un contrat versioned distinct des domain events □ La publication sur le broker est idempotente (ou le consumer l'est) □ Monitoring de l'outbox backlog (alerte si > 1000) ``` ### Checklist Idempotence ``` □ Tous les command handlers ont une clé d'idempotence □ La table processed_commands est utilisée pour la déduplication □ Les transactions compensatoires sont idempotentes □ Les projections sont idempotentes (Section 8) □ Le cleanup des vieilles clés est planifié ``` ### Checklist Testing ``` □ Tests unitaires avec pattern Given/When/Then pour chaque command method □ Cas de succès ET cas d'erreur testés □ Test de concurrence optimiste (deux saves concurrents) □ Test d'idempotence des projections (rejouer les mêmes events) □ Test d'eventual consistency avec assert.Eventually □ Test de reconstruction depuis snapshot si implémenté □ Test d'upcasters si versioning implémenté ``` ### Checklist Event Versioning ``` □ schema_version présent dans toutes les métadonnées □ correlation_id et causation_id présents dans les métadonnées □ Les nouveaux champs utilisent omitempty □ Un upcaster est écrit pour chaque changement cassant □ Les anciens events ne sont JAMAIS modifiés ni supprimés □ Les upcasters sont testés avec des payloads historiques réels ``` ### Checklist Monitoring ``` □ Métriques : event_store_writes_total, write_duration □ Métriques : projection_lag (events et durée) □ Métriques : concurrency_conflicts_total □ Métriques : outbox_pending_messages □ Métriques : aggregate_replay_duration □ Alerte : projection lag > 5 minutes □ Alerte : outbox backlog > 1000 □ Alerte : conflict rate > 5% □ Alerte : P95 replay > 1 seconde ``` ### Checklist Architecture générale ``` □ Un aggregate = une transaction (jamais deux dans la même TX DB) □ Les projections ne sont pas utilisées pour les décisions write-side □ Les domain events ne sont pas publiés tels quels à l'extérieur □ Les integration events ont un contrat stable et versionné □ La livraison est at-least-once → chaque consumer est idempotent □ Les sagas ont des compensations idempotentes □ Les snapshots sont implémentés pour les aggregates long-lived □ Le rebuild de toutes les projections est testé et documenté ``` ### Bibliothèques Go recommandées ```go // Event Sourcing frameworks // github.com/hallgren/eventsourcing — Simple, bien maintenu // github.com/looplab/eventhorizon — Complet, opinionated // github.com/thefabric-io/eventsourcing — Léger // Messaging // github.com/ThreeDotsLabs/watermill — Excellent, multi-broker // Supporte : Kafka, NATS, RabbitMQ, PostgreSQL, etc. // Workflow complexes (sagas) // go.temporal.io/sdk — Temporal SDK Go // PostgreSQL // github.com/jmoiron/sqlx — Extension de database/sql // github.com/jackc/pgx/v5 — Driver natif PostgreSQL // github.com/jackc/pglogrepl — Logical replication (WAL outbox) // UUID // github.com/google/uuid — UUID v4/v7 // Tests // github.com/stretchr/testify — Assert, require, mock // github.com/testcontainers/testcontainers-go — DB de test réelle // Métriques // github.com/prometheus/client_golang — Prometheus ``` --- ## Testing ### Tester un Command Handler en isolation Le command handler dépend d'un event store — injecter une implémentation in-memory pour les tests unitaires. ```go // InMemoryEventStore — implémentation de test type InMemoryEventStore struct { mu sync.Mutex events map[string][]Event } func (s *InMemoryEventStore) Load(ctx context.Context, aggregateID string) ([]Event, error) { s.mu.Lock() defer s.mu.Unlock() return s.events[aggregateID], nil } func (s *InMemoryEventStore) Append(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error { s.mu.Lock() defer s.mu.Unlock() current := s.events[aggregateID] if len(current) != expectedVersion { return ErrOptimisticConcurrency } s.events[aggregateID] = append(current, events...) return nil } // Test du command handler func TestPlaceOrderHandler(t *testing.T) { store := &InMemoryEventStore{events: make(map[string][]Event)} handler := NewPlaceOrderHandler(store) cmd := PlaceOrder{ OrderID: "order-1", CustomerID: "cust-42", Items: []OrderItem{{ProductID: "p1", Qty: 2, Price: 1500}}, } err := handler.Handle(context.Background(), cmd) require.NoError(t, err) events, _ := store.Load(context.Background(), "order-1") require.Len(t, events, 1) placed, ok := events[0].(OrderPlaced) require.True(t, ok) assert.Equal(t, "cust-42", placed.CustomerID) } ``` ### Tester un Projector Les projectors sont des fonctions pures : event en entrée, état en sortie. Tester chaque transition. ```go func TestOrderProjector_OrderPlaced(t *testing.T) { projector := NewOrderProjector() state := projector.InitialState() event := OrderPlaced{ OrderID: "order-1", CustomerID: "cust-42", TotalCents: 3000, PlacedAt: time.Now(), } newState := projector.Apply(state, event) order, ok := newState.(OrderReadModel) require.True(t, ok) assert.Equal(t, "pending", order.Status) assert.Equal(t, 3000, order.TotalCents) } // Tester une séquence d'événements (replay) func TestOrderProjector_FullLifecycle(t *testing.T) { projector := NewOrderProjector() events := []Event{ OrderPlaced{OrderID: "o1", TotalCents: 1000}, OrderShipped{OrderID: "o1", TrackingCode: "TRACK-123"}, OrderDelivered{OrderID: "o1"}, } state := projector.Replay(events) order := state.(OrderReadModel) assert.Equal(t, "delivered", order.Status) assert.Equal(t, "TRACK-123", order.TrackingCode) } ``` ### Tester une Saga Tester que la saga émet les bons commands en réponse aux events. ```go func TestPaymentSaga_OrderPlaced_EmitsChargeCommand(t *testing.T) { var capturedCommands []Command dispatcher := func(cmd Command) error { capturedCommands = append(capturedCommands, cmd) return nil } saga := NewPaymentSaga(dispatcher) err := saga.Handle(context.Background(), OrderPlaced{ OrderID: "order-1", TotalCents: 5000, PaymentMethodID: "pm-abc", }) require.NoError(t, err) require.Len(t, capturedCommands, 1) charge, ok := capturedCommands[0].(ChargePaymentMethod) require.True(t, ok) assert.Equal(t, 5000, charge.AmountCents) assert.Equal(t, "pm-abc", charge.PaymentMethodID) } ``` ### Tests d'intégration avec Testcontainers Pour tester l'event store réel avec PostgreSQL. ```go func TestEventStore_Integration(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } ctx := context.Background() container, err := postgres.Run(ctx, "postgres:16-alpine", postgres.WithDatabase("testdb"), postgres.WithUsername("test"), postgres.WithPassword("test"), ) require.NoError(t, err) defer container.Terminate(ctx) dsn, _ := container.ConnectionString(ctx, "sslmode=disable") store := NewPostgresEventStore(dsn) require.NoError(t, store.Migrate(ctx)) // Tester append + load events := []Event{OrderPlaced{OrderID: "o1", TotalCents: 1000}} err = store.Append(ctx, "o1", events, 0) require.NoError(t, err) loaded, err := store.Load(ctx, "o1") require.NoError(t, err) require.Len(t, loaded, 1) } ``` ### Tester la concurrence optimiste ```go func TestEventStore_OptimisticConcurrency(t *testing.T) { store := &InMemoryEventStore{events: make(map[string][]Event)} // Premier append : version 0 → 1 err := store.Append(ctx, "agg-1", []Event{evt1}, 0) require.NoError(t, err) // Deuxième append concurrent avec version incorrecte err = store.Append(ctx, "agg-1", []Event{evt2}, 0) // ← devrait être 1 require.ErrorIs(t, err, ErrOptimisticConcurrency) } ``` --- *Last updated: 2025-03 — Revoir si : Go 1.24+ (nouvelles APIs itertools/rangefunc), PostgreSQL 17+ (changements logical replication), ou émergence d'un framework CQRS/ES Go dominant.*