CQRS+ES : le pubsub bridge pour les outcomes commands et l'audit log atomique

Dans les articles précédents de cette série, on a couvert les couches réseau et sécurité du service d'authentification : PKCS#12, timing oracle, mTLS, CRL. Plongeons maintenant dans l'architecture applicative. Le service est construit en CQRS + Event Sourcing, et deux patterns non-évidents méritent un article.

L'invariant event-XOR-error

Dans un système CQRS/ES bien discipliné, un command handler fait exactement une chose : il émet un event OU il retourne une erreur. Jamais les deux. Jamais aucun.

func (h *LoginHandler) Handle(cmd LoginCommand) ([]Event, error) {
    user, err := h.repo.Load(cmd.UserID)
    if err != nil {
        return nil, err // erreur infra = pas d'event
    }

    if !user.VerifyPassword(cmd.Password) {
        return []Event{LoginFailed{UserID: cmd.UserID}}, nil // event, pas erreur
    }

    return []Event{LoginSucceeded{UserID: cmd.UserID}}, nil // event, pas erreur
}

Cette discipline est cruciale : les erreurs sont des problèmes d'infrastructure (DB down, timeout réseau). Les résultats métier sont des events (login réussi, login échoué). Mélanger les deux casse la traçabilité et rend les projectors imprévisibles.

Le problème : comment informer le caller ?

Le command handler émet un event LoginSucceeded. L'event est persisté dans l'event store. Un projector le consomme et met à jour la read model. Tout ça est asynchrone.

Mais le HTTP handler qui a envoyé la commande a besoin d'une réponse maintenant. L'utilisateur attend devant son écran. Comment lui dire "ton login a réussi, voici ton cookie de session" ?

La tentation : mettre le résultat dans une erreur typée.

// NE FAIS PAS CA
if user.VerifyPassword(cmd.Password) {
    return nil, &LoginResult{Success: true, SessionID: "abc"}
    // Ca casse l'invariant : c'est un resultat metier, pas une erreur infra
}

Ça casse la séparation "erreur = infra" vs "résultat métier = event". Les middlewares d'erreur, les retry policies, les circuit breakers — tout est calibré sur l'hypothèse que error != nil signifie un problème, pas un succès.

Le pattern pubsub bridge

La solution : le projector republie l'event sur un canal pubsub avec le CorrelationID de la commande. Le caller s'abonne avant d'envoyer la commande, filtre par CorrelationID, et traduit l'event en valeur de retour.

func (h *HTTPLoginHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    cmd := LoginCommand{
        UserID:        extractUserID(r),
        Password:      r.FormValue("password"),
        CorrelationID: uuid.New().String(),
    }

    // S'abonner AVANT d'envoyer la commande
    sub := h.pubsub.Subscribe(cmd.CorrelationID)
    defer sub.Close()

    // Envoyer la commande
    if err := h.bus.Dispatch(cmd); err != nil {
        http.Error(w, "Internal error", 500)
        return
    }

    // Attendre l'event avec timeout
    select {
    case event := <-sub.Events():
        switch e := event.(type) {
        case LoginSucceeded:
            setSessionCookie(w, e.SessionID)
            http.Redirect(w, r, "/dashboard", 302)
        case LoginFailed:
            renderLoginPage(w, "Invalid credentials")
        }
    case <-time.After(5 * time.Second):
        http.Error(w, "Timeout", 504)
    }
}

Le flux complet :

  1. HTTP handler génère un CorrelationID et s'abonne au pubsub
  2. HTTP handler dispatch la commande
  3. Command handler émet un event (LoginSucceeded ou LoginFailed)
  4. Event est persisté dans l'event store
  5. Projector consomme l'event, le republie sur le pubsub avec le CorrelationID
  6. HTTP handler reçoit l'event via le pubsub, traduit en réponse HTTP

L'invariant event-XOR-error est préservé. Le caller reçoit une réponse synchrone. Le projector reste le seul point de transformation event → side-effect.

L'audit log atomique

Deuxième pattern : l'audit log. Dans un projector, tu fais souvent plusieurs choses en réponse à un event : mettre à jour la read model, écrire une entrée d'audit, envoyer une notification, parfois déclencher un logout.

Le piège : si la projection métier réussit et l'audit échoue, tu as une divergence. L'utilisateur a été connecté, mais l'audit log ne le sait pas. Ou pire : l'audit dit "login à 14:03" mais la session a été créée à 14:02 parce que l'audit a été retenté.

Le pattern : tous les side-effects dans une seule transaction DB, sauf le logout post-tx qui est best-effort.

func (p *LoginProjector) Handle(event LoginSucceeded) error {
    tx, err := p.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 1. Mettre a jour la read model
    if err := p.updateSession(tx, event); err != nil {
        return err
    }

    // 2. Ecrire l'audit log - DANS la meme transaction
    if err := p.writeAuditEntry(tx, AuditEntry{
        Action:    "login_succeeded",
        UserID:    event.UserID,
        Timestamp: event.Timestamp,
        IP:        event.IP,
    }); err != nil {
        return err
    }

    // 3. Publier sur le pubsub bridge - DANS la meme transaction
    // (utilise pg_notify ou outbox pattern)
    if err := p.publishOutbox(tx, event); err != nil {
        return err
    }

    // Commit atomique : tout ou rien
    if err := tx.Commit(); err != nil {
        return err
    }

    // 4. Best-effort : notifier, cleanup, etc.
    // Si ca echoue, la transaction est deja commitee
    go p.notifySecurityTeam(event)

    return nil
}

Login failures sur unknown users : slog only

Un piège corollaire : les login failures sur des utilisateurs qui n'existent pas. Si tu écris une entrée d'audit pour chaque tentative, un brute-forcer qui essaie 10 millions d'emails random produit 10 millions de rows dans audit_events.

La règle : les login failures sur des comptes existants méritent un audit (c'est un signal de sécurité). Les login failures sur des comptes inexistants sont du bruit — slog.Warn et c'est tout. Le rate limiter par IP en amont limite le volume des logs eux-mêmes.

Conclusion

Le pubsub bridge résout le problème du "retour synchrone dans un système asynchrone" sans casser l'invariant event-XOR-error. L'audit log atomique force le "tout ou rien" sur les side-effects critiques.

Les deux patterns ont un point commun : ils imposent une discipline sur les frontières transactionnelles. Dans un système event-sourcé, ces frontières sont le seul filet de sécurité entre "le système est cohérent" et "on ne sait plus ce qui s'est passé".

L'architecture est en place, la sécurité est auditée. Mais comment l'audit lui-même a été conduit ? La méthodologie — audit itératif en passes, faux positifs auto-générés, et comment transformer les probes en tests de régression — c'est le sujet du prochain article.

Commentaires (0)