En HTTP, l'idempotence c'est simple : on stocke la clé, on retourne le cache. Dans un système CQRS/Event Sourcing, c'est plus subtil. La command peut être idempotente, mais l'event qu'elle génère ? La projection qui le consomme ? L'idempotence doit traverser tout le stack.
Si vous n'avez pas lu la partie 1, elle couvre les bases et l'implémentation HTTP en Go — avec un store PostgreSQL complet qui remplace Redis. Ici on monte d'un cran : command store, optimistic locking, projections idempotentes, outbox pattern. Ce sont les patterns qu'on retrouve dans les systèmes financiers, les plateformes e-commerce, partout où un doublon de traitement coûte de l'argent ou de la confiance. Tout repose sur PostgreSQL — pas de Redis, pas de broker externe pour l'idempotence.
Le problème spécifique à Event Sourcing
En Event Sourcing, l'état de l'application est le stream d'events. Le solde d'un compte n'est pas une
colonne en base — c'est la somme des events CompteCrédité et CompteDébité depuis
l'ouverture. Si le même event est enregistré deux fois, l'état reconstruit est faux. Et souvent irréparable
sans intervention manuelle.
Scénario concret sur un système de paiement :
- La command
DébiterCompte(montant=100, idempotencyKey=abc123)arrive - L'event
CompteDébité(montant=100)est enregistré, le solde passe de 200 à 100 - Le réseau coupe. Le client retry. La même command arrive une deuxième fois
- Sans protection : un deuxième
CompteDébité(montant=100)est enregistré, le solde tombe à 0 - L'utilisateur a été débité deux fois pour un seul achat
Dans une base relationnelle classique, on aurait pu corriger avec un UPDATE. En Event Sourcing, on ne modifie jamais le passé — on ne peut qu'ajouter un event correctif, ce qui complique l'audit et le support. Mieux vaut ne pas laisser entrer le doublon.
Idempotence des commands avec l'idempotency key
Le principe est le même qu'en HTTP : associer chaque command à une clé unique fournie par le client, et vérifier avant traitement si cette clé a déjà été vue. La différence, c'est qu'on stocke le résultat dans un command store dédié, et que la sauvegarde doit être atomique avec les events.
CREATE TABLE processed_commands (
idempotency_key UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
command_type VARCHAR(100) NOT NULL,
result_event_id UUID,
processed_at TIMESTAMPTZ DEFAULT NOW()
);
Le handler suit un schéma en trois étapes :
func (h *PaymentHandler) Handle(ctx context.Context, cmd DébiterCompte) error {
// 1. Vérifier si déjà traité
existing, err := h.commandStore.Get(ctx, cmd.IdempotencyKey)
if err == nil {
// Déjà traité — retourner succès silencieux
_ = existing
return nil
}
if !errors.Is(err, ErrNotFound) {
return fmt.Errorf("checking idempotency: %w", err)
}
// 2. Traiter la command
events, err := h.aggregate.Handle(ctx, cmd)
if err != nil {
return err
}
// 3. Sauvegarder events + marquer command comme traitée — ATOMIQUEMENT
return h.store.SaveEventsAndMarkCommand(ctx, events, cmd.IdempotencyKey)
}
L'atomicité de l'étape 3 est non-négociable. Si on sauvegarde les events dans une transaction et qu'on marque la command dans une deuxième transaction séparée, un crash entre les deux donne : events en base, command non marquée. Au retry suivant, le handler ne voit pas la clé, traite à nouveau, enregistre un doublon d'events. Les deux opérations doivent vivre dans la même transaction PostgreSQL.
Optimistic locking — détecter les conflits concurrents
L'idempotency key protège contre les doublons d'une même command. Elle ne protège pas contre deux commands différentes qui modifient le même aggregate en même temps. C'est le rôle de l'optimistic locking.
Chaque aggregate a un numéro de version. Quand un client lit l'état d'un aggregate, il reçoit sa version courante. Quand il envoie une command, il inclut cette version. Si entre-temps quelqu'un d'autre a écrit sur le même aggregate, les versions ne correspondent plus et la command est rejetée — à charge du client de relire l'état et de décider si sa command tient toujours.
CREATE TABLE account_events (
id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
version INT NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(aggregate_id, version) -- contrainte clé
);
La contrainte UNIQUE(aggregate_id, version) fait tout le travail. Si deux commandes concurrentes
tentent d'écrire la version 5 du même aggregate, PostgreSQL en laisse passer une et rejette l'autre avec une
violation d'unicité.
type DébiterCompte struct {
AggregateID uuid.UUID
ExpectedVersion int // version que le client a lue
Montant float64
IdempotencyKey uuid.UUID
}
func (s *EventStore) AppendEvents(
ctx context.Context,
aggregateID uuid.UUID,
expectedVersion int,
events []Event,
) error {
for i, event := range events {
_, err := s.db.ExecContext(ctx,
`INSERT INTO account_events (id, aggregate_id, version, event_type, payload)
VALUES ($1, $2, $3, $4, $5)`,
event.ID,
aggregateID,
expectedVersion+i+1,
event.Type,
event.Payload,
)
if isUniqueViolation(err) {
return ErrVersionConflict // l'appelant peut retry avec la nouvelle version
}
if err != nil {
return fmt.Errorf("appending event: %w", err)
}
}
return nil
}
En retournant ErrVersionConflict, on laisse à l'appelant la décision de retry ou de remonter
l'erreur à l'utilisateur. Dans un système financier, on remonte souvent une erreur explicite ("votre opération
a été annulée car le compte a été modifié entre-temps") plutôt que de retenter silencieusement.
Idempotence des projections
Les projections consomment le stream d'events pour construire des vues dénormalisées — le solde courant d'un compte, la liste des commandes d'un client, etc. Avec Kafka ou tout système at-least-once, le même event peut arriver plusieurs fois. La projection doit produire le même résultat qu'elle le voie une ou dix fois.
Approche 1 : tracking de la position dans le stream
Chaque projection mémorise l'ID du dernier event qu'elle a traité. Les events antérieurs ou égaux au checkpoint sont ignorés.
CREATE TABLE projection_checkpoints (
projection_name VARCHAR(100) PRIMARY KEY,
last_event_id UUID NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
Cette approche fonctionne bien quand les events sont ordonnés et qu'on peut comparer les IDs (UUIDs v7 ou séquences). Elle suppose que l'event store garantit un ordre stable, ce qui est généralement vrai par aggregate mais peut l'être moins entre aggregates différents.
Approche 2 : upsert idempotent
Plutôt que de filtrer les doublons en amont, on écrit de façon à ce que réécrire la même donnée soit inoffensif.
L'instruction INSERT ON CONFLICT DO UPDATE de PostgreSQL est taillée pour ça :
INSERT INTO account_balances (account_id, balance, last_event_id)
VALUES ($1, $2, $3)
ON CONFLICT (account_id) DO UPDATE
SET balance = EXCLUDED.balance,
last_event_id = EXCLUDED.last_event_id
WHERE account_balances.last_event_id < EXCLUDED.last_event_id;
-- La clause WHERE évite d'écraser un état plus récent avec un event plus ancien
La clause WHERE est importante. Sans elle, si un event ancien arrive après un event récent (ce qui
peut arriver avec des partitions Kafka multiples), on écrase un état correct avec un état périmé. Avec elle,
on ne met à jour que si l'event entrant est plus récent que ce qu'on a déjà.
En pratique, les deux approches sont complémentaires : le checkpoint évite de rejouer des milliers d'events inutilement, et l'upsert idempotent agit comme filet de sécurité pour les doublons qui passent quand même.
L'Outbox pattern — publier des events sans perdre de données
Supposons que votre système sauvegarde les events en PostgreSQL et les publie sur Kafka pour les autres services. Si le process crash après le commit PostgreSQL mais avant la publication Kafka, l'event est perdu du point de vue des consommateurs. Si on inverse l'ordre, on peut publier un event que la transaction va ensuite rollbacker. Il est impossible de faire un commit atomique sur deux systèmes indépendants sans coordinateur distribué.
L'Outbox pattern contourne le problème sans two-phase commit :
- Dans la même transaction PostgreSQL : sauvegarder l'event et l'écrire dans une table
outbox - Un worker dédié lit
outboxet publie sur Kafka - Une fois publié avec succès, marquer la ligne comme envoyée
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
published BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW(),
published_at TIMESTAMPTZ
);
func (s *Store) SaveEventAndOutbox(ctx context.Context, tx *sqlx.Tx, event Event) error {
// 1. Sauvegarder l'event dans l'event store
if err := s.saveEvent(ctx, tx, event); err != nil {
return err
}
// 2. Écrire dans l'outbox — même transaction, même commit ou même rollback
_, err := tx.ExecContext(ctx,
`INSERT INTO outbox (event_type, payload) VALUES ($1, $2)`,
event.Type,
event.Payload,
)
return err
}
// Worker séparé — tourne en arrière-plan
func (w *OutboxWorker) Run(ctx context.Context) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := w.publishPending(ctx); err != nil {
slog.Error("outbox publish failed", "error", err)
}
case <-ctx.Done():
return
}
}
}
func (w *OutboxWorker) publishPending(ctx context.Context) error {
rows, err := w.db.QueryContext(ctx,
`SELECT id, event_type, payload FROM outbox
WHERE published = FALSE
ORDER BY created_at
LIMIT 100`,
)
if err != nil {
return fmt.Errorf("querying outbox: %w", err)
}
defer rows.Close()
for rows.Next() {
var id uuid.UUID
var eventType string
var payload []byte
if err := rows.Scan(&id, &eventType, &payload); err != nil {
return fmt.Errorf("scanning outbox row: %w", err)
}
if err := w.kafka.Publish(ctx, eventType, payload); err != nil {
return fmt.Errorf("publishing event %s: %w", id, err)
}
_, err = w.db.ExecContext(ctx,
`UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = $1`,
id,
)
if err != nil {
return fmt.Errorf("marking event published %s: %w", id, err)
}
}
return rows.Err()
}
Le worker peut publier le même event deux fois s'il crash après la publication Kafka mais avant le
UPDATE outbox. C'est du at-least-once delivery, pas exactly-once. Les consommateurs Kafka
doivent donc être idempotents — ce que l'approche upsert décrite en section précédente garantit.
Pour les systèmes à fort volume, le polling toutes les 100ms peut être remplacé par PostgreSQL LISTEN/NOTIFY pour déclencher le worker immédiatement après chaque insertion en outbox, sans attendre le prochain tick.
Récapitulatif — qui protège quoi
| Problème | Solution |
|---|---|
| Command reçue deux fois (retry client) | Idempotency key dans processed_commands |
| Deux commands concurrentes sur le même aggregate | Optimistic locking (contrainte UNIQUE sur la version) |
| Event publié en double par le broker | Projection idempotente (checkpoint ou upsert) |
| Crash entre sauvegarde et publication | Outbox pattern (atomicité via transaction) |
Chaque couche protège contre un type de doublon différent. L'idempotency key ne remplace pas l'optimistic locking, et l'outbox ne rend pas la projection idempotente — ce sont des garanties orthogonales qui couvrent chacune un point de défaillance distinct.
Conclusion
L'idempotence dans CQRS/Event Sourcing n'est pas un détail qu'on ajoute après coup — c'est ce qui sépare un système qui marche en démo d'un système qui tient en prod. Les retries arrivent. Les processes crashent. Kafka re-délivre des messages. Ce n'est pas de la paranoïa, c'est le comportement normal de tout système distribué sous charge.
La bonne nouvelle : ces quatre patterns sont bien connus, bien supportés par PostgreSQL, et s'assemblent proprement en Go. Une fois en place, ils fonctionnent silencieusement — et c'est exactement ce qu'on attend d'eux. Le ticket "paiement en double" qui n'arrive jamais, c'est ça le vrai succès.