CQRS en Go — Partie 4 : PostgreSQL comme event store

Série CQRS en Go :

Les trois premières parties ont posé les bases : aggregates immutables, command handlers purs, sagas par chorégraphie. Il reste une question centrale — où persiste-t-on les events ? Cette partie répond à cette question en utilisant PostgreSQL comme event store append-only, sans dépendance supplémentaire.

Pourquoi PostgreSQL et pas EventStoreDB ou Kafka

EventStoreDB est un outil excellent, conçu spécifiquement pour l'event sourcing. Mais c'est une brique supplémentaire à déployer, monitorer, backuper et maintenir. Pour une équipe qui gère déjà PostgreSQL en production, ajouter EventStoreDB signifie doubler la surface opérationnelle pour une base de données.

Kafka est souvent cité dans ce contexte. C'est un bus de messages, pas un event store. La lecture par aggregate ID n'est pas son point fort — Kafka est optimisé pour la consommation séquentielle de partitions, pas pour "donne-moi tous les events de la commande CMD-4521 dans l'ordre". La rétention par défaut est limitée dans le temps. Reconstituer un aggregate depuis Kafka demande du travail non trivial.

PostgreSQL, vous l'avez déjà. ACID natif, JSONB avec indexation, sauvegardes intégrées dans votre infrastructure existante, monitoring que votre équipe connaît, pg_dump, réplication, point-in-time recovery. Pour 90% des projets, une table append-only en PostgreSQL suffit largement. Le passage à un outil dédié se justifie quand vous traitez des millions d'events par seconde — un seuil que la plupart des projets n'atteignent jamais.

La table es_events — le coeur

Tout repose sur une seule table :

CREATE TABLE es_events (
    id            BIGSERIAL PRIMARY KEY,
    aggregate_id  UUID NOT NULL,
    aggregate_type VARCHAR(64) NOT NULL,
    event_type    VARCHAR(128) NOT NULL,
    event_data    JSONB NOT NULL,
    metadata      JSONB DEFAULT '{}',
    version       INT NOT NULL,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE (aggregate_id, version)
);

CREATE INDEX idx_es_events_aggregate ON es_events (aggregate_id, version);

Chaque colonne a un rôle précis :

  • id (BIGSERIAL) : position globale dans le stream. C'est le curseur qu'utilisent les projectors pour savoir où ils en sont. Monotone croissant, jamais réutilisé.
  • aggregate_id : l'identifiant de l'entité concernée — l'UUID de la commande, du paiement, de l'expédition. C'est la clé de lecture principale pour reconstituer un aggregate.
  • aggregate_type : le type de l'aggregate — "order", "payment", "shipping". Permet de filtrer les events par domaine et d'éviter les collisions d'UUID entre types différents.
  • event_type : le nom de l'event — "OrderPlaced", "PaymentConfirmed", "ShipmentDispatched". Utilisé pour désérialiser event_data dans le bon type Go.
  • event_data (JSONB) : le payload sérialisé de l'event. JSONB permet l'indexation et les requêtes sur les champs si nécessaire, sans sacrifier la flexibilité.
  • metadata (JSONB) : informations de traçabilité — qui a déclenché l'action (user ID), correlation ID, causation ID, timestamp client. Pas du domaine, mais indispensable en production.
  • version : numéro de séquence par aggregate. Le premier event d'un aggregate est à la version 1, le deuxième à la version 2, et ainsi de suite.

La contrainte UNIQUE (aggregate_id, version) est la pièce centrale de la sécurité concurrente. Elle rend impossible l'insertion de deux events avec la même version sur le même aggregate. C'est le mécanisme d'optimistic locking — pas de verrou en lecture, une violation de contrainte à l'écriture en cas de conflit.

Append — écrire un event

type EventStore struct {
    db *sqlx.DB
}

func (s *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []StorableEvent) error {
    tx, err := s.db.BeginTxx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    for i, event := range events {
        version := expectedVersion + i + 1

        data, err := json.Marshal(event.Data)
        if err != nil {
            return fmt.Errorf("marshal event: %w", err)
        }

        _, err = tx.ExecContext(ctx, `
            INSERT INTO es_events (aggregate_id, aggregate_type, event_type, event_data, metadata, version)
            VALUES ($1, $2, $3, $4, $5, $6)
        `, aggregateID, aggregateType, event.Type, data, event.Metadata, version)

        if err != nil {
            // UNIQUE violation = conflit de version = optimistic locking
            if isUniqueViolation(err) {
                return ErrConcurrencyConflict
            }
            return fmt.Errorf("insert event: %w", err)
        }
    }

    return tx.Commit()
}

Le paramètre expectedVersion est le numéro du dernier event connu au moment où l'aggregate a été chargé. Si le handler produit deux events, ils seront insérés aux versions expectedVersion + 1 et expectedVersion + 2.

Si une autre goroutine a écrit un event entre le chargement et l'écriture, la contrainte UNIQUE entre en jeu — PostgreSQL rejette l'insertion avec une violation de contrainte, traduite en ErrConcurrencyConflict. Le client peut réessayer : il rechargera l'aggregate avec le nouvel event, recalculera le state, et réexécutera le handler. C'est l'optimistic locking — pas de lock en lecture, juste une vérification à l'écriture.

Load — recharger un aggregate

func (s *EventStore) Load(ctx context.Context, aggregateID uuid.UUID) ([]StoredEvent, error) {
    var events []StoredEvent

    err := s.db.SelectContext(ctx, &events, `
        SELECT id, aggregate_id, aggregate_type, event_type, event_data, metadata, version, created_at
        FROM es_events
        WHERE aggregate_id = $1
        ORDER BY version ASC
    `, aggregateID)

    if err != nil {
        return nil, fmt.Errorf("loading events: %w", err)
    }

    return events, nil
}

Le state de l'aggregate n'est jamais stocké directement. Il est toujours recalculé en rejouant les events dans l'ordre, en appelant Transition() sur chacun d'eux (voir partie 1). C'est le replay. Le résultat est déterministe — les mêmes events donnent toujours le même state. C'est ce qui rend le système testable et auditable.

Les subscriptions — alimenter les projections

Les projectors — les composants qui maintiennent les vues de lecture — doivent être notifiés des nouveaux events. La solution la plus simple : une table de positions.

CREATE TABLE es_subscriptions (
    subscriber_id  VARCHAR(128) PRIMARY KEY,
    last_event_id  BIGINT NOT NULL DEFAULT 0,
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Chaque projector enregistre l'identifiant du dernier event traité. Au démarrage ou après un crash, il repart de cette position. Le pattern de polling :

type Subscriber struct {
    db           *sqlx.DB
    subscriberID string
    handler      EventHandler
}

func (s *Subscriber) Poll(ctx context.Context) error {
    // 1. Lire la dernière position traitée
    var lastID int64
    s.db.GetContext(ctx, &lastID,
        "SELECT last_event_id FROM es_subscriptions WHERE subscriber_id = $1",
        s.subscriberID,
    )

    // 2. Charger les nouveaux events
    var events []StoredEvent
    s.db.SelectContext(ctx, &events,
        "SELECT * FROM es_events WHERE id > $1 ORDER BY id ASC LIMIT 100",
        lastID,
    )

    // 3. Traiter dans une transaction
    for _, event := range events {
        tx, _ := s.db.BeginTxx(ctx, nil)

        if err := s.handler.Handle(ctx, tx, event); err != nil {
            tx.Rollback()
            return fmt.Errorf("handling event %d: %w", event.ID, err)
        }

        // 4. Mettre à jour la position dans la même transaction
        tx.ExecContext(ctx,
            "UPDATE es_subscriptions SET last_event_id = $1, updated_at = NOW() WHERE subscriber_id = $2",
            event.ID, s.subscriberID,
        )

        tx.Commit()
    }

    return nil
}

Le détail critique est dans le point 4 : la mise à jour de la position et le traitement de l'event sont dans la même transaction. Si le process crash entre le traitement et la mise à jour de la position, l'event sera retraité au prochain poll. C'est de l'at-least-once delivery — les handlers doivent être idempotents pour absorber les doublons sans effet de bord.

L'idempotence en pratique : un handler qui insère une ligne dans une vue de lecture peut utiliser INSERT ... ON CONFLICT DO NOTHING, ou vérifier si la ligne existe déjà. La clé de déduplication est généralement l'event_id ou une combinaison de champs métier uniques.

Notifications temps réel avec LISTEN/NOTIFY

Le polling introduit un délai — au mieux quelques centaines de millisecondes si le poll tourne fréquemment, au pire plusieurs secondes. Pour des vues de lecture qui doivent être réactives, PostgreSQL offre un mécanisme natif : LISTEN/NOTIFY.

// Côté event store : notifier après chaque INSERT
func (s *EventStore) Append(ctx context.Context, aggregateID uuid.UUID, aggregateType string, expectedVersion int, events []StorableEvent) error {
    tx, err := s.db.BeginTxx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    // ... insert events ...

    // Notifier les subscribers
    if _, err := tx.ExecContext(ctx, "NOTIFY es_events_channel"); err != nil {
        return fmt.Errorf("notify: %w", err)
    }

    return tx.Commit()
}

// Côté subscriber : écouter au lieu de poller
func (s *Subscriber) Listen(ctx context.Context) error {
    conn, err := s.db.Conn(ctx)
    if err != nil {
        return fmt.Errorf("acquire conn: %w", err)
    }
    defer conn.Close()

    if _, err := conn.ExecContext(ctx, "LISTEN es_events_channel"); err != nil {
        return fmt.Errorf("listen: %w", err)
    }

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        // Attendre notification (bloquant avec timeout)
        _, err := conn.WaitForNotification(ctx)
        if err != nil {
            return fmt.Errorf("wait notification: %w", err)
        }

        // Poll immédiatement après la notification
        if err := s.Poll(ctx); err != nil {
            return err
        }
    }
}

La notification est envoyée dans la transaction de l'Append — elle n'est délivrée aux listeners que si la transaction est committée. Pas de fausse alerte en cas de rollback.

En production, combiner les deux approches : LISTEN pour la réactivité en temps normal, polling périodique (toutes les 5 à 30 secondes) comme filet de sécurité en cas de reconnexion ou de notification manquée. La connexion dédiée au LISTEN ne doit pas être prise dans le pool de connexions standard — elle occupe une connexion en permanence.

Le command gateway — assembler le tout

Avec l'event store en place, le flux complet d'une command devient explicite :

type CommandGateway struct {
    store *EventStore
}

func (gw *CommandGateway) Execute(ctx context.Context, aggregateID uuid.UUID, cmd Command, handler CommandHandler) error {
    // 1. Charger les events
    storedEvents, err := gw.store.Load(ctx, aggregateID)
    if err != nil {
        return fmt.Errorf("load aggregate: %w", err)
    }

    // 2. Reconstruire le state
    state := Replay(storedEvents)
    currentVersion := len(storedEvents)

    // 3. Exécuter le handler (pur, sans side effect)
    newEvents, err := handler.Handle(ctx, state, cmd)
    if err != nil {
        return err
    }

    // 4. Persister les nouveaux events
    return gw.store.Append(ctx, aggregateID, cmd.AggregateType(), currentVersion, newEvents)
}

Si deux commands arrivent simultanément sur le même aggregate, la deuxième échoue avec ErrConcurrencyConflict. Le client peut réessayer — il rechargera l'aggregate incluant les events de la première command, recalculera le state, et réexécutera le handler sur le state à jour. En pratique, les conflits réels sont rares sur des aggregates bien délimités ; la plupart des opérations concurrentes touchent des aggregates différents.

C'est aussi ce pattern qui rend les command handlers testables sans infrastructure (partie 2) : le gateway injecte le state, le handler produit des events, le gateway persiste. Chaque responsabilité est isolée.

Performance — quand les events s'accumulent

Un aggregate avec quelques dizaines d'events se recharge en quelques microsecondes. Un aggregate avec 10 000 events — une commande longue durée, un compte utilisateur très actif — commence à peser sur le replay. La solution est les snapshots.

CREATE TABLE es_snapshots (
    aggregate_id   UUID PRIMARY KEY,
    aggregate_type VARCHAR(64) NOT NULL,
    version        INT NOT NULL,
    state_data     JSONB NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Un snapshot est une capture du state serialisé à un instant donné, associé à sa version. Le chargement avec snapshot se déroule en deux étapes :

  • Charger le snapshot le plus récent pour l'aggregate (s'il existe)
  • Charger uniquement les events postérieurs à la version du snapshot
  • Rejouer les events partiels sur le state du snapshot

La règle pratique la plus simple : créer un snapshot tous les 100 events. La plupart des aggregates n'atteignent jamais cette limite — un aggregate e-commerce typique génère 5 à 20 events sur sa durée de vie. Les snapshots ne sont nécessaires que pour les aggregates à haut volume : comptes, wallets, inventaires mis à jour des centaines de fois par jour.

Important : les snapshots sont une optimisation, pas une modification du modèle de données. Les events restent la source de vérité. Un snapshot corrompu ou supprimé n'est pas une perte de données — l'aggregate peut être reconstitué depuis le début des events.

Résumé de la série

En quatre parties, nous avons construit une implémentation CQRS complète en Go :

  • Partie 1 — L'aggregate : state immutable, Transition() pour appliquer les events, Clone() pour le dry-run. La base qui rend tout le reste testable.
  • Partie 2 — Les command handlers : fonctions pures qui prennent un state et une command, retournent des events ou une erreur. Pas de base de données, pas d'HTTP, testables en isolation totale.
  • Partie 3 — Les sagas et la chorégraphie : coordination entre aggregates par events publiés. Chaque service réagit à ce qui l'intéresse, sans couplage direct. Les sagas gèrent les compensations en cas d'échec distribué.
  • Partie 4 — L'event store : PostgreSQL comme infrastructure de persistance. Table append-only, optimistic locking par contrainte UNIQUE, subscriptions pour les projectors, LISTEN/NOTIFY pour la réactivité, snapshots pour la performance à grande échelle.

Ces quatre patterns se combinent : le command gateway charge via l'event store, appelle le handler pur, persiste les events produits. Les subscribers lisent le stream global et maintiennent les vues de lecture. Les sagas écoutent certains event types et déclenchent des commands sur d'autres aggregates.

Le résultat est un système auditable par construction — l'historique complet est dans les events —, testable à chaque couche, et opérable avec l'infrastructure PostgreSQL que vous gérez déjà.

📄 CLAUDE.md associé

Commentaires (0)