Le parallélisme en Go — Partie 3 : context, errgroup et patterns de production

Série : Le parallélisme en Go

Goroutines et channels, vous maîtrisez. Maintenant les vraies questions : comment on annule proprement ? Comment on gère les erreurs de plusieurs goroutines sans perdre les détails ? Comment on évite que le programme crash à 3h du matin parce que SIGTERM a coupé une opération en plein milieu ?

Cette partie couvre les outils et patterns que j'utilise en production. Pas de théorie pure — des réponses à des problèmes que j'ai rencontrés pour de vrai.

1. context.Context — annuler des goroutines proprement

Premier réflexe quand on découvre Go : chercher comment « tuer » une goroutine depuis l'extérieur. La réponse est simple et un peu déconcertante : on ne peut pas. C'est voulu. Go n'expose pas de mécanisme d'arrêt forcé parce qu'un arrêt forcé laisse des ressources dans un état inconnu — fichiers ouverts à moitié écrits, transactions non commitées, locks jamais relâchés.

La solution propre, c'est le signal coopératif : on signale à la goroutine qu'elle devrait s'arrêter, et c'est elle qui décide quand c'est safe de le faire. C'est exactement ce que fait context.Context.

Trois constructeurs à connaître :

  • context.WithCancel — annulation manuelle, on appelle cancel() quand on veut
  • context.WithTimeout — annulation automatique après une durée
  • context.WithDeadline — annulation à une heure précise

Dans tous les cas, le pattern est identique : la goroutine écoute ctx.Done(), un channel fermé quand le context est annulé.

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // libère les ressources même si le timeout n'est pas encore atteint

go func() {
    select {
    case <-ctx.Done():
        fmt.Println("annulé :", ctx.Err()) // context.DeadlineExceeded ou Canceled
        return
    case résultat := <-traitement(ctx):
        fmt.Println("résultat :", résultat)
    }
}()

Le defer cancel() est obligatoire, même si le timeout va se déclencher de toute façon. Sans lui, le context et ses ressources associées restent en mémoire jusqu'à l'expiration. Le linter go vet vous le rappellera si vous l'oubliez.

En pratique, pour une requête HTTP qui lance plusieurs goroutines filles :

func handleSearch(w http.ResponseWriter, r *http.Request) {
    ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
    defer cancel()

    // Les deux requêtes parallèles reçoivent le même context.
    // Si le timeout expire, les deux sont annulées automatiquement.
    résultatsDB    := make(chan []Item, 1)
    résultatsCache := make(chan []Item, 1)

    go func() {
        items, err := db.Search(ctx, r.URL.Query().Get("q"))
        if err != nil {
            résultatsDB <- nil
            return
        }
        résultatsDB <- items
    }()

    go func() {
        items, _ := cache.Search(ctx, r.URL.Query().Get("q"))
        résultatsCache <- items
    }()

    db     := <-résultatsDB
    cached := <-résultatsCache
    render(w, merge(db, cached))
}

Règle importante : le context se propage toujours vers le bas, jamais vers le haut. Une fonction peut restreindre le context qu'elle reçoit (ajouter un timeout plus court), mais jamais l'annuler depuis l'intérieur pour signaler quelque chose à l'appelant. Pour ça, on utilise les valeurs de retour normales.

2. errgroup — goroutines + gestion d'erreurs propre

sync.WaitGroup a une limitation bien connue : il ne gère pas les erreurs. Le pattern classique pour pallier ça, c'est d'ajouter un channel d'erreurs :

// Pattern WaitGroup + channel d'erreurs — fonctionne, mais verbeux
var wg sync.WaitGroup
errs := make(chan error, len(urls))

for _, url := range urls {
    url := url
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := télécharger(url); err != nil {
            errs <- err
        }
    }()
}

wg.Wait()
close(errs)

for err := range errs {
    log.Println("erreur :", err)
}

Ça marche, mais c'est du boilerplate que vous allez réécrire à chaque fois. golang.org/x/sync/errgroup condense ça en quelques lignes :

import "golang.org/x/sync/errgroup"

g, ctx := errgroup.WithContext(context.Background())

for _, url := range urls {
    url := url // capture de la variable de boucle
    g.Go(func() error {
        return télécharger(ctx, url)
    })
}

// Wait bloque jusqu'à ce que toutes les goroutines aient terminé.
// Il retourne la première erreur non-nil rencontrée.
if err := g.Wait(); err != nil {
    log.Fatal("au moins une erreur :", err)
}

Avec errgroup.WithContext, si une goroutine retourne une erreur, le context est annulé automatiquement. Toutes les autres goroutines qui écoutent ctx.Done() s'arrêtent proprement. C'est le comportement « fail fast » : dès qu'une partie échoue, on ne continue pas inutilement.

Une nuance importante : g.Wait() retourne la première erreur, pas toutes. Si vous avez besoin de collecter toutes les erreurs pour les logger ou les retourner au client, le pattern WaitGroup + channel reste pertinent. errgroup est adapté aux cas où une seule erreur suffit à invalider le tout.

3. Graceful shutdown

SIGTERM arrive — déploiement, redémarrage, kill propre — et là deux comportements possibles. Le mauvais : le process s'arrête immédiatement, le job en cours est perdu à mi-chemin, la base de données a une transaction ouverte qui va rester jusqu'au timeout. Le bon : les goroutines finissent ce qu'elles font, libèrent leurs ressources, puis le process quitte.

Depuis Go 1.16, signal.NotifyContext simplifie beaucoup ça :

import (
    "context"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop() // désenregistre le handler de signal

    // ctx est annulé dès que SIGTERM ou Ctrl+C arrive.
    // Toutes les goroutines qui reçoivent ce context s'arrêtent proprement.
    if err := runApplication(ctx); err != nil {
        log.Fatal(err)
    }
}

Le point clé : ce context unique descend dans toute l'application. Pas besoin de plomberie supplémentaire. Quand SIGTERM arrive, le signal se propage automatiquement à tout ce qui écoute ctx.Done().

Pour un worker qui traite des jobs en queue, le graceful shutdown ressemble à ça :

func worker(ctx context.Context, jobs <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            slog.Info("worker: arrêt propre")
            return
        case job, ok := <-jobs:
            if !ok {
                return // channel fermé, plus de jobs à traiter
            }
            // Le job en cours va jusqu'au bout.
            // On passe ctx aux opérations internes (DB, HTTP)
            // pour qu'elles respectent aussi l'annulation.
            if err := processJob(ctx, job); err != nil {
                slog.Error("job échoué", "id", job.ID, "error", err)
            }
        }
    }
}

Subtilité : dans un select, si les deux cases sont prêts simultanément (un job disponible ET le context annulé), Go en choisit un au hasard. Pour garantir qu'on finit le job en cours avant de s'arrêter, il faut vérifier ctx.Done() seulement entre les jobs, pas pendant leur traitement — c'est ce que fait ce pattern : processJob va jusqu'au bout, et c'est au prochain tour de boucle que le select détecte l'annulation.

4. sync.Mutex et sync.RWMutex

Les channels sont élégants, mais ils ont un coût. Pour partager de l'état entre goroutines, un mutex est souvent plus adapté — notamment quand le « partage » se résume à lire ou écrire dans une map ou une struct.

sync.RWMutex distingue les lecteurs des écrivains. Plusieurs goroutines peuvent lire en même temps ; une seule peut écrire, et elle attend que tous les lecteurs en cours aient fini. C'est le pattern idéal pour un cache en mémoire :

type Cache struct {
    mu    sync.RWMutex
    store map[string]string
}

func NewCache() *Cache {
    return &Cache{store: make(map[string]string)}
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock() // plusieurs lecteurs simultanés autorisés
    defer c.mu.RUnlock()
    val, ok := c.store[key]
    return val, ok
}

func (c *Cache) Set(key, val string) {
    c.mu.Lock() // écriture exclusive — attend que tous les lecteurs aient fini
    defer c.mu.Unlock()
    c.store[key] = val
}

func (c *Cache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.store, key)
}

Règle mnémotechnique : si la logique de coordination s'exprime naturellement comme « envoie ça à quelqu'un qui va le traiter », utilisez un channel. Si c'est « plusieurs goroutines accèdent à la même donnée », utilisez un mutex.

5. sync.Once — initialisation thread-safe

Cas classique : une ressource coûteuse à initialiser — connexion DB, chargement d'un fichier de config, parsing d'un certificat TLS — qu'on veut créer au premier besoin, une seule fois, même si plusieurs goroutines arrivent simultanément.

type DBPool struct {
    once sync.Once
    pool *sql.DB
    err  error
}

func (d *DBPool) Get() (*sql.DB, error) {
    d.once.Do(func() {
        d.pool, d.err = sql.Open("postgres", os.Getenv("DATABASE_URL"))
    })
    return d.pool, d.err
}

var globalDB DBPool

func getDB() (*sql.DB, error) {
    return globalDB.Get()
}

sync.Once garantit que la fonction passée à Do ne s'exécute qu'une seule fois, même si dix goroutines appellent Get() simultanément. Les neuf autres attendent que la première ait fini, puis récupèrent le résultat stocké.

Attention : si la fonction échoue, elle ne sera pas ré-exécutée. Si vous avez besoin d'une initialisation avec retry, sync.Once ne suffit pas — il faudra gérer ça manuellement avec un mutex et un flag d'état.

6. Cas réel — worker pool de production

Un exemple complet qui combine tout ce qu'on a vu. C'est la structure que j'utilise pour les systèmes de traitement de jobs : importation de données, envoi d'emails, génération de rapports en batch.

package main

import (
    "context"
    "errors"
    "fmt"
    "log/slog"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

type Job struct {
    ID      string
    Payload []byte
}

type Stats struct {
    mu      sync.Mutex
    success int
    failed  int
}

func (s *Stats) RecordSuccess() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.success++
}

func (s *Stats) RecordFailure() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.failed++
}

func (s *Stats) Log() {
    s.mu.Lock()
    defer s.mu.Unlock()
    slog.Info("statistiques finales", "succès", s.success, "échecs", s.failed)
}

func processJob(ctx context.Context, job Job) error {
    // Simule une opération qui respecte le context (requête DB, appel HTTP...)
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(100 * time.Millisecond):
    }

    if job.ID == "bad-job" {
        return errors.New("payload invalide")
    }
    return nil
}

func runWorker(ctx context.Context, id int, jobs <-chan Job, stats *Stats, wg *sync.WaitGroup) {
    defer wg.Done()
    slog.Info("worker démarré", "worker", id)

    for {
        select {
        case <-ctx.Done():
            slog.Info("worker arrêté proprement", "worker", id)
            return
        case job, ok := <-jobs:
            if !ok {
                slog.Info("worker terminé (queue vide)", "worker", id)
                return
            }

            start := time.Now()
            err := processJob(ctx, job)
            duration := time.Since(start)

            if err != nil {
                // Partial failure : on logue et on continue.
                // Un job raté n'impacte pas les autres.
                slog.Error("job échoué",
                    "worker", id,
                    "job_id", job.ID,
                    "duration_ms", duration.Milliseconds(),
                    "error", err,
                )
                stats.RecordFailure()
            } else {
                slog.Info("job traité",
                    "worker", id,
                    "job_id", job.ID,
                    "duration_ms", duration.Milliseconds(),
                )
                stats.RecordSuccess()
            }
        }
    }
}

func main() {
    // Graceful shutdown : ctx annulé sur SIGTERM ou Ctrl+C
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    const numWorkers = 4
    jobs  := make(chan Job, 100) // buffer pour absorber les pics
    stats := &Stats{}

    var wg sync.WaitGroup
    for i := range numWorkers {
        wg.Add(1)
        go runWorker(ctx, i+1, jobs, stats, &wg)
    }

    // Producteur de jobs
    // En pratique : lecture depuis Redis, SQS, Kafka, etc.
    go func() {
        defer close(jobs) // signal aux workers : plus rien à distribuer
        for i := range 20 {
            select {
            case <-ctx.Done():
                slog.Info("producteur arrêté, jobs restants abandonnés")
                return
            case jobs <- Job{ID: fmt.Sprintf("job-%d", i)}:
            }
        }
    }()

    wg.Wait() // attend que tous les workers aient fini proprement
    stats.Log()
    slog.Info("shutdown complet")
}

Ce que ce code garantit en production :

  • SIGTERM arrive → le producteur s'arrête de distribuer de nouveaux jobs
  • Les workers finissent le job en cours avant de quitter
  • Un job qui échoue ne bloque pas les autres (partial failure pattern)
  • Les statistiques sont protégées par mutex et disponibles en fin de run
  • Aucune goroutine ne leak : toutes se terminent avant le wg.Wait()

7. Détecter les problèmes

Même avec les bons patterns, les bugs de concurrence arrivent. Trois outils indispensables :

Le race detector

À activer systématiquement en développement et dans la CI. Il instrumente le binaire pour signaler les accès concurrents non protégés au moment où ils se produisent :

go run -race ./...
go test -race ./...

Le coût est réel (5x à 10x plus lent, 5x à 10x plus de mémoire), donc on n'active pas -race en prod. Mais en tests, il est indispensable — une race condition silencieuse en dev devient un incident en prod sous charge.

Surveiller le nombre de goroutines

import "runtime"

// Dans un handler de debug, ou dans un ticker toutes les 30s
slog.Info("goroutines actives", "count", runtime.NumGoroutine())

Si ce nombre croît sans plafonner sur un process qui traite un flux constant, vous avez probablement un leak. Pour aller plus loin sur la détection et la correction, j'ai écrit un article dédié : Goroutine leaks : détecter et corriger.

pprof en production

import _ "net/http/pprof"

// Dans main(), en parallèle du serveur principal.
// Ne jamais exposer publiquement — loopback uniquement.
go func() {
    log.Println(http.ListenAndServe("localhost:6060", nil))
}()

Depuis votre machine, via un tunnel SSH :

// Profil goroutines : voir toutes les goroutines bloquées avec leur stack
go tool pprof http://localhost:6060/debug/pprof/goroutine

// Dans l'interface interactive
(pprof) top    // goroutines les plus représentées
(pprof) web    // graphe SVG des stacks (nécessite graphviz)

Le profil goroutine de pprof est l'outil le plus puissant pour diagnostiquer un comportement anormal en production. Il montre exactement où chaque goroutine est bloquée, avec les stacks complètes. En quelques secondes, vous savez si le problème vient d'un lock contenu, d'un channel qui ne reçoit jamais, ou d'une requête DB sans timeout.

Ce que vous savez faire maintenant

En trois parties, vous êtes passé des bases aux patterns de production :

  • Partie 1 — goroutines, WaitGroup, race conditions et le race detector
  • Partie 2 — channels buffered et unbuffered, select, worker pool
  • Partie 3 — erreurs en contexte concurrent et gestion des panics

La concurrence en Go n'est pas magiquement simple, mais elle est cohérente. Les mêmes patterns reviennent dans tous les projets : context qui descend, channels qui coordonnent, mutex qui protège l'état partagé. Une fois internalisés, écrire du code concurrent lisible et robuste devient naturel.

Pour aller plus loin, les ressources que je recommande vraiment :

Commentaires (0)