Série CQRS en Go :
- Partie 1 : l'aggregate, Transition() et Clone()
- Partie 2 : command handlers sans effet de bord
- Partie 3 : sagas et chorégraphie par events
- Partie 4 : PostgreSQL comme event store
Les trois premières parties ont posé les bases : aggregates immutables, command handlers purs, sagas par chorégraphie. Il reste une question centrale — où persiste-t-on les events ? Cette partie répond à cette question en utilisant PostgreSQL comme event store append-only, sans dépendance supplémentaire.
Pourquoi PostgreSQL et pas EventStoreDB ou Kafka
EventStoreDB est un outil excellent, conçu spécifiquement pour l'event sourcing. Mais c'est une brique supplémentaire à déployer, monitorer, backuper et maintenir. Pour une équipe qui gère déjà PostgreSQL en production, ajouter EventStoreDB signifie doubler la surface opérationnelle pour une base de données.
Kafka est souvent cité dans ce contexte. C'est un bus de messages, pas un event store. La lecture par aggregate ID n'est pas son point fort — Kafka est optimisé pour la consommation séquentielle de partitions, pas pour "donne-moi tous les events de la commande CMD-4521 dans l'ordre". La rétention par défaut est limitée dans le temps. Reconstituer un aggregate depuis Kafka demande du travail non trivial.
PostgreSQL, vous l'avez déjà. ACID natif, JSONB avec indexation, sauvegardes intégrées dans votre infrastructure existante, monitoring que votre équipe connaît, pg_dump, réplication, point-in-time recovery. Pour 90% des projets, une table append-only en PostgreSQL suffit largement. Le passage à un outil dédié se justifie quand vous traitez des millions d'events par seconde — un seuil que la plupart des projets n'atteignent jamais.
La table es_events — le coeur
Tout repose sur une seule table :
CREATE TABLE es_events (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
version INT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (aggregate_id, version)
);
CREATE INDEX idx_es_events_aggregate ON es_events (aggregate_id, version);
Chaque colonne a un rôle précis :
- id (BIGSERIAL) : position globale dans le stream. C'est le curseur qu'utilisent les projectors pour savoir où ils en sont. Monotone croissant, jamais réutilisé.
- aggregate_id : l'identifiant de l'entité concernée — l'UUID de la commande, du paiement, de l'expédition. C'est la clé de lecture principale pour reconstituer un aggregate.
- aggregate_type : le type de l'aggregate — "order", "payment", "shipping". Permet de filtrer les events par domaine et d'éviter les collisions d'UUID entre types différents.
-
event_type : le nom de l'event — "OrderPlaced", "PaymentConfirmed", "ShipmentDispatched".
Utilisé pour désérialiser
event_datadans le bon type Go. - event_data (JSONB) : le payload sérialisé de l'event. JSONB permet l'indexation et les requêtes sur les champs si nécessaire, sans sacrifier la flexibilité.
- metadata (JSONB) : informations de traçabilité — qui a déclenché l'action (user ID), correlation ID, causation ID, timestamp client. Pas du domaine, mais indispensable en production.
- version : numéro de séquence par aggregate. Le premier event d'un aggregate est à la version 1, le deuxième à la version 2, et ainsi de suite.
La contrainte UNIQUE (aggregate_id, version) est la pièce centrale de la sécurité concurrente.
Elle rend impossible l'insertion de deux events avec la même version sur le même aggregate. C'est le
mécanisme d'optimistic locking — pas de verrou en lecture, une violation de contrainte à l'écriture en
cas de conflit.
Append — écrire un event
type EventStore struct {
db *sqlx.DB
}
func (s *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []StorableEvent) error {
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
for i, event := range events {
version := expectedVersion + i + 1
data, err := json.Marshal(event.Data)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
_, err = tx.ExecContext(ctx, `
INSERT INTO es_events (aggregate_id, aggregate_type, event_type, event_data, metadata, version)
VALUES ($1, $2, $3, $4, $5, $6)
`, aggregateID, aggregateType, event.Type, data, event.Metadata, version)
if err != nil {
// UNIQUE violation = conflit de version = optimistic locking
if isUniqueViolation(err) {
return ErrConcurrencyConflict
}
return fmt.Errorf("insert event: %w", err)
}
}
return tx.Commit()
}
Le paramètre expectedVersion est le numéro du dernier event connu au moment où l'aggregate
a été chargé. Si le handler produit deux events, ils seront insérés aux versions
expectedVersion + 1 et expectedVersion + 2.
Si une autre goroutine a écrit un event entre le chargement et l'écriture, la contrainte UNIQUE entre en
jeu — PostgreSQL rejette l'insertion avec une violation de contrainte, traduite en
ErrConcurrencyConflict. Le client peut réessayer : il rechargera l'aggregate avec le nouvel
event, recalculera le state, et réexécutera le handler. C'est l'optimistic locking — pas de lock en
lecture, juste une vérification à l'écriture.
Load — recharger un aggregate
func (s *EventStore) Load(ctx context.Context, aggregateID uuid.UUID) ([]StoredEvent, error) {
var events []StoredEvent
err := s.db.SelectContext(ctx, &events, `
SELECT id, aggregate_id, aggregate_type, event_type, event_data, metadata, version, created_at
FROM es_events
WHERE aggregate_id = $1
ORDER BY version ASC
`, aggregateID)
if err != nil {
return nil, fmt.Errorf("loading events: %w", err)
}
return events, nil
}
Le state de l'aggregate n'est jamais stocké directement. Il est toujours recalculé en rejouant les events
dans l'ordre, en appelant Transition() sur chacun d'eux (voir partie 1). C'est le replay.
Le résultat est déterministe — les mêmes events donnent toujours le même state. C'est ce qui rend le
système testable et auditable.
Les subscriptions — alimenter les projections
Les projectors — les composants qui maintiennent les vues de lecture — doivent être notifiés des nouveaux events. La solution la plus simple : une table de positions.
CREATE TABLE es_subscriptions (
subscriber_id VARCHAR(128) PRIMARY KEY,
last_event_id BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Chaque projector enregistre l'identifiant du dernier event traité. Au démarrage ou après un crash, il repart de cette position. Le pattern de polling :
type Subscriber struct {
db *sqlx.DB
subscriberID string
handler EventHandler
}
func (s *Subscriber) Poll(ctx context.Context) error {
// 1. Lire la dernière position traitée
var lastID int64
s.db.GetContext(ctx, &lastID,
"SELECT last_event_id FROM es_subscriptions WHERE subscriber_id = $1",
s.subscriberID,
)
// 2. Charger les nouveaux events
var events []StoredEvent
s.db.SelectContext(ctx, &events,
"SELECT * FROM es_events WHERE id > $1 ORDER BY id ASC LIMIT 100",
lastID,
)
// 3. Traiter dans une transaction
for _, event := range events {
tx, _ := s.db.BeginTxx(ctx, nil)
if err := s.handler.Handle(ctx, tx, event); err != nil {
tx.Rollback()
return fmt.Errorf("handling event %d: %w", event.ID, err)
}
// 4. Mettre à jour la position dans la même transaction
tx.ExecContext(ctx,
"UPDATE es_subscriptions SET last_event_id = $1, updated_at = NOW() WHERE subscriber_id = $2",
event.ID, s.subscriberID,
)
tx.Commit()
}
return nil
}
Le détail critique est dans le point 4 : la mise à jour de la position et le traitement de l'event sont dans la même transaction. Si le process crash entre le traitement et la mise à jour de la position, l'event sera retraité au prochain poll. C'est de l'at-least-once delivery — les handlers doivent être idempotents pour absorber les doublons sans effet de bord.
L'idempotence en pratique : un handler qui insère une ligne dans une vue de lecture peut utiliser
INSERT ... ON CONFLICT DO NOTHING, ou vérifier si la ligne existe déjà. La clé de déduplication
est généralement l'event_id ou une combinaison de champs métier uniques.
Notifications temps réel avec LISTEN/NOTIFY
Le polling introduit un délai — au mieux quelques centaines de millisecondes si le poll tourne fréquemment, au pire plusieurs secondes. Pour des vues de lecture qui doivent être réactives, PostgreSQL offre un mécanisme natif : LISTEN/NOTIFY.
// Côté event store : notifier après chaque INSERT
func (s *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []StorableEvent) error {
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
// ... insert events ...
// Notifier les subscribers
if _, err := tx.ExecContext(ctx, "NOTIFY es_events_channel"); err != nil {
return fmt.Errorf("notify: %w", err)
}
return tx.Commit()
}
// Côté subscriber : écouter au lieu de poller
func (s *Subscriber) Listen(ctx context.Context) error {
conn, err := s.db.Conn(ctx)
if err != nil {
return fmt.Errorf("acquire conn: %w", err)
}
defer conn.Close()
if _, err := conn.ExecContext(ctx, "LISTEN es_events_channel"); err != nil {
return fmt.Errorf("listen: %w", err)
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Attendre notification (bloquant avec timeout)
_, err := conn.WaitForNotification(ctx)
if err != nil {
return fmt.Errorf("wait notification: %w", err)
}
// Poll immédiatement après la notification
if err := s.Poll(ctx); err != nil {
return err
}
}
}
La notification est envoyée dans la transaction de l'Append — elle n'est délivrée aux listeners que si la transaction est committée. Pas de fausse alerte en cas de rollback.
En production, combiner les deux approches : LISTEN pour la réactivité en temps normal, polling périodique (toutes les 5 à 30 secondes) comme filet de sécurité en cas de reconnexion ou de notification manquée. La connexion dédiée au LISTEN ne doit pas être prise dans le pool de connexions standard — elle occupe une connexion en permanence.
Le command gateway — assembler le tout
Avec l'event store en place, le flux complet d'une command devient explicite :
type CommandGateway struct {
store *EventStore
}
func (gw *CommandGateway) Execute(ctx context.Context, aggregateID uuid.UUID, cmd Command, handler CommandHandler) error {
// 1. Charger les events
storedEvents, err := gw.store.Load(ctx, aggregateID)
if err != nil {
return fmt.Errorf("load aggregate: %w", err)
}
// 2. Reconstruire le state
state := Replay(storedEvents)
currentVersion := len(storedEvents)
// 3. Exécuter le handler (pur, sans side effect)
newEvents, err := handler.Handle(ctx, state, cmd)
if err != nil {
return err
}
// 4. Persister les nouveaux events
return gw.store.Append(ctx, aggregateID, cmd.AggregateType(), currentVersion, newEvents)
}
Si deux commands arrivent simultanément sur le même aggregate, la deuxième échoue avec
ErrConcurrencyConflict. Le client peut réessayer — il rechargera l'aggregate incluant les
events de la première command, recalculera le state, et réexécutera le handler sur le state à jour. En
pratique, les conflits réels sont rares sur des aggregates bien délimités ; la plupart des opérations
concurrentes touchent des aggregates différents.
C'est aussi ce pattern qui rend les command handlers testables sans infrastructure (partie 2) : le gateway injecte le state, le handler produit des events, le gateway persiste. Chaque responsabilité est isolée.
Performance — quand les events s'accumulent
Un aggregate avec quelques dizaines d'events se recharge en quelques microsecondes. Un aggregate avec 10 000 events — une commande longue durée, un compte utilisateur très actif — commence à peser sur le replay. La solution est les snapshots.
CREATE TABLE es_snapshots (
aggregate_id UUID PRIMARY KEY,
aggregate_type VARCHAR(64) NOT NULL,
version INT NOT NULL,
state_data JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Un snapshot est une capture du state serialisé à un instant donné, associé à sa version. Le chargement avec snapshot se déroule en deux étapes :
- Charger le snapshot le plus récent pour l'aggregate (s'il existe)
- Charger uniquement les events postérieurs à la version du snapshot
- Rejouer les events partiels sur le state du snapshot
La règle pratique la plus simple : créer un snapshot tous les 100 events. La plupart des aggregates n'atteignent jamais cette limite — un aggregate e-commerce typique génère 5 à 20 events sur sa durée de vie. Les snapshots ne sont nécessaires que pour les aggregates à haut volume : comptes, wallets, inventaires mis à jour des centaines de fois par jour.
Important : les snapshots sont une optimisation, pas une modification du modèle de données. Les events restent la source de vérité. Un snapshot corrompu ou supprimé n'est pas une perte de données — l'aggregate peut être reconstitué depuis le début des events.
Résumé de la série
En quatre parties, nous avons construit une implémentation CQRS complète en Go :
-
Partie 1 — L'aggregate : state immutable,
Transition()pour appliquer les events,Clone()pour le dry-run. La base qui rend tout le reste testable. - Partie 2 — Les command handlers : fonctions pures qui prennent un state et une command, retournent des events ou une erreur. Pas de base de données, pas d'HTTP, testables en isolation totale.
- Partie 3 — Les sagas et la chorégraphie : coordination entre aggregates par events publiés. Chaque service réagit à ce qui l'intéresse, sans couplage direct. Les sagas gèrent les compensations en cas d'échec distribué.
- Partie 4 — L'event store : PostgreSQL comme infrastructure de persistance. Table append-only, optimistic locking par contrainte UNIQUE, subscriptions pour les projectors, LISTEN/NOTIFY pour la réactivité, snapshots pour la performance à grande échelle.
Ces quatre patterns se combinent : le command gateway charge via l'event store, appelle le handler pur, persiste les events produits. Les subscribers lisent le stream global et maintiennent les vues de lecture. Les sagas écoutent certains event types et déclenchent des commands sur d'autres aggregates.
Le résultat est un système auditable par construction — l'historique complet est dans les events —, testable à chaque couche, et opérable avec l'infrastructure PostgreSQL que vous gérez déjà.