# CLAUDE.md — Concurrence & Parallélisme en Go > Contexte spécialisé pour Claude Code. Coller ce fichier à la racine du projet pour guider le travail sur la concurrence en Go. --- ## Quand utiliser ce contexte - ✅ Worker pools pour traiter des lots de tâches CPU-bound ou I/O-bound en parallèle - ✅ Pipelines de traitement de données avec étapes productrices/consommatrices - ✅ Services long-running avec goroutines de fond (jobs, schedulers, watchers) - ✅ Diagnostic de goroutine leaks ou de race conditions sur un codebase existant - ❌ Tâches séquentielles simples : ajouter des goroutines n'améliore pas la lisibilité - ❌ Parallélisme sur des opérations très courtes (< microseconde) : overhead des channels dépasse le gain - ❌ Code mono-goroutine sans concurrence réelle : ne pas anticiper "au cas où" --- ## Section 1 : Goroutines — règles fondamentales ### Coût et cycle de vie Chaque goroutine démarre avec une stack d'environ **2 KB** (extensible dynamiquement jusqu'à 1 GB par défaut). Elles sont multiplexées sur les threads OS par le scheduler Go (M:N threading) — pas de coût de thread OS à chaque création. Mais "pas cher à créer" ne veut pas dire "gratuit à laisser fuir". Règles absolues : 1. **Toujours savoir qui possède une goroutine et qui l'arrête** 2. **Jamais de fire-and-forget sans suivi de completion** 3. **Un panic dans une goroutine crashe tout le processus** si non récupéré à l'intérieur de cette goroutine ### Sizing des worker pools | Type de travail | Taille recommandée | |----------------|-------------------| | CPU-bound | `runtime.NumCPU()` | | I/O-bound | `runtime.NumCPU() * 2` ou plus (benchmarker) | | Mixte | Commencer à `NumCPU() * 2`, ajuster selon profiling | ### Pattern SafeGo — wrapper avec recover Ne jamais lancer une goroutine dans du code de production sans wrapper de panic : ```go // safeGo lance une goroutine avec recover automatique. // fn est la fonction à exécuter, onPanic est appelé si panic. func safeGo(fn func(), onPanic func(r any)) { go func() { defer func() { if r := recover(); r != nil { if onPanic != nil { onPanic(r) } } }() fn() }() } // Usage dans un service func (s *EmailService) sendAsync(ctx context.Context, msg Message) { safeGo(func() { if err := s.send(ctx, msg); err != nil { slog.Error("async send failed", "msg_id", msg.ID, "error", err) } }, func(r any) { slog.Error("panic in sendAsync", "recover", r, "msg_id", msg.ID) }) } ``` ### Goroutine avec ownership clair ```go // ✅ BON : la goroutine s'arrête sur signal du contexte func (w *Worker) Run(ctx context.Context) error { for { select { case job := <-w.jobs: if err := w.process(ctx, job); err != nil { slog.Error("job failed", "error", err) } case <-ctx.Done(): return ctx.Err() // ✅ sortie propre } } } // ❌ MAUVAIS : goroutine orpheline sans moyen de l'arrêter func startWorker() { go func() { for { processNextJob() // bloque pour toujours } }() } ``` ### Tracking via WaitGroup ```go // ✅ BON : toujours tracker les goroutines lancées func processAll(ctx context.Context, items []Item) error { var wg sync.WaitGroup for _, item := range items { wg.Add(1) // ✅ Add AVANT le go, pas dedans go func(it Item) { defer wg.Done() if err := process(ctx, it); err != nil { slog.Error("failed", "item", it.ID, "error", err) } }(item) // ✅ passer en paramètre, pas capturer } wg.Wait() return nil } ``` ### Goroutines longues durée — pattern daemon ```go // Goroutine de fond avec restart automatique func (s *Scheduler) Start(ctx context.Context) { go func() { for { s.runLoop(ctx) select { case <-ctx.Done(): return // arrêt propre demandé case <-time.After(5 * time.Second): slog.Info("scheduler restarting after crash") } } }() } func (s *Scheduler) runLoop(ctx context.Context) { defer func() { if r := recover(); r != nil { slog.Error("scheduler panic", "recover", r) } }() ticker := time.NewTicker(s.interval) defer ticker.Stop() for { select { case <-ticker.C: s.tick(ctx) case <-ctx.Done(): return } } } ``` --- ## Section 2 : Channels — patterns et pièges ### Règles fondamentales des channels ``` RÈGLE 1 : C'est l'émetteur qui ferme, jamais le récepteur RÈGLE 2 : Fermer un channel déjà fermé → panic RÈGLE 3 : Envoyer sur un channel fermé → panic RÈGLE 4 : Recevoir d'un channel fermé → zero value immédiat RÈGLE 5 : Utiliser les types directionnels dans les signatures ``` ### Types directionnels ```go // ✅ BON : types directionnels dans les signatures func produce(ctx context.Context) <-chan int { ch := make(chan int) go func() { defer close(ch) // l'émetteur ferme for i := 0; ; i++ { select { case ch <- i: case <-ctx.Done(): return } } }() return ch } func consume(ctx context.Context, in <-chan int) { for { select { case v, ok := <-in: if !ok { return // channel fermé } fmt.Println(v) case <-ctx.Done(): return } } } ``` ### Buffered vs unbuffered ```go // Unbuffered : synchronisation stricte émetteur/récepteur ch := make(chan int) // L'émetteur bloque jusqu'à ce que le récepteur lise // Buffered : découplage temporel ch := make(chan int, 100) // L'émetteur peut envoyer 100 valeurs sans que le récepteur soit prêt // Règle : commencer unbuffered, ajouter buffer seulement si profiling justifie // Un buffer cache souvent un vrai problème de design ``` ### Done channel — broadcast d'annulation ```go // Done channel : fermer pour broadcaster à N goroutines simultanément func fanOutWithDone(ctx context.Context, in <-chan Work) { done := ctx.Done() // récupérer une fois, pas à chaque itération for i := 0; i < runtime.NumCPU(); i++ { go func() { for { select { case w, ok := <-in: if !ok { return } process(w) case <-done: return // tous les workers reçoivent ce signal simultanément } } }() } } ``` ### Pattern Pipeline ```go // gen : source de données func gen(ctx context.Context, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-ctx.Done(): return } } }() return out } // sq : transformation func sq(ctx context.Context, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-ctx.Done(): return } } }() return out } // Usage : pipeline composable func runPipeline(ctx context.Context) { for v := range sq(ctx, sq(ctx, gen(ctx, 2, 3, 4))) { fmt.Println(v) // 16, 81, 256 } } ``` ### Fan-In — merge de plusieurs channels ```go // merge reçoit N channels en entrée, émet tout sur un seul channel de sortie func merge(ctx context.Context, cs ...<-chan int) <-chan int { var wg sync.WaitGroup merged := make(chan int, 1) output := func(c <-chan int) { defer wg.Done() for v := range c { select { case merged <- v: case <-ctx.Done(): return } } } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Fermer merged quand tous les inputs sont épuisés go func() { wg.Wait() close(merged) }() return merged } ``` ### Fan-Out — distribution de travail ```go // distribute envoie chaque valeur à l'un des N workers (round-robin implicite via select) func distribute(ctx context.Context, in <-chan Job, n int) []<-chan Job { channels := make([]chan Job, n) outputs := make([]<-chan Job, n) for i := range channels { channels[i] = make(chan Job, 10) outputs[i] = channels[i] } go func() { defer func() { for _, ch := range channels { close(ch) } }() i := 0 for job := range in { select { case channels[i%n] <- job: i++ case <-ctx.Done(): return } } }() return outputs } ``` ### Semaphore via channel bufferisé ```go // Limiter les accès concurrents à une ressource externe type Semaphore chan struct{} func NewSemaphore(n int) Semaphore { return make(Semaphore, n) } func (s Semaphore) Acquire() { s <- struct{}{} } func (s Semaphore) Release() { <-s } // Usage : limiter à 5 requêtes HTTP simultanées sem := NewSemaphore(5) for _, url := range urls { url := url go func() { sem.Acquire() defer sem.Release() fetch(url) }() } ``` --- ## Section 3 : Primitives sync (Mutex, WaitGroup, Once, Pool) ### Mutex — règles strictes ```go // RÈGLE 1 : jamais copier un Mutex (passer par pointeur) // RÈGLE 2 : toujours defer Unlock immédiatement après Lock // RÈGLE 3 : ne pas tenir un Mutex pendant des opérations lentes (I/O, réseau) type Cache struct { mu sync.RWMutex // RWMutex si lectures >> écritures items map[string]Item } // ✅ BON : RLock pour les lectures func (c *Cache) Get(key string) (Item, bool) { c.mu.RLock() defer c.mu.RUnlock() v, ok := c.items[key] return v, ok } // ✅ BON : Lock pour les écritures func (c *Cache) Set(key string, item Item) { c.mu.Lock() defer c.mu.Unlock() c.items[key] = item } // ❌ MAUVAIS : lock tenu pendant une opération réseau func (c *Cache) SetFromDB(key string) { c.mu.Lock() defer c.mu.Unlock() item := fetchFromDB(key) // ← DB call sous le lock = contention maximale c.items[key] = item } // ✅ BON : fetch sans lock, write avec lock func (c *Cache) SetFromDBCorrect(key string) error { item, err := fetchFromDB(key) // ← pas de lock ici if err != nil { return err } c.mu.Lock() c.items[key] = item c.mu.Unlock() return nil } ``` ### Sharded locking — haute contention Quand un seul Mutex devient un goulot d'étranglement (profiling montre >50% du temps en `sync.Mutex.Lock`) : ```go const numShards = 256 type ShardedCache struct { shards [numShards]struct { sync.RWMutex items map[string]any } } func (c *ShardedCache) shard(key string) int { h := fnv.New32a() h.Write([]byte(key)) return int(h.Sum32()) % numShards } func (c *ShardedCache) Get(key string) (any, bool) { s := c.shard(key) c.shards[s].RLock() defer c.shards[s].RUnlock() v, ok := c.shards[s].items[key] return v, ok } func (c *ShardedCache) Set(key string, value any) { s := c.shard(key) c.shards[s].Lock() defer c.shards[s].Unlock() if c.shards[s].items == nil { c.shards[s].items = make(map[string]any) } c.shards[s].items[key] = value } ``` ### WaitGroup — timing critique ```go // ✅ BON : Add AVANT le goroutine launch var wg sync.WaitGroup for _, task := range tasks { wg.Add(1) // ← ici, avant go go func(t Task) { defer wg.Done() process(t) }(task) } wg.Wait() // ❌ MAUVAIS : Add à l'intérieur de la goroutine (race condition) for _, task := range tasks { go func(t Task) { wg.Add(1) // ← race : wg.Wait() peut s'exécuter avant ce Add defer wg.Done() process(t) }(task) } ``` ### sync.Once — singleton thread-safe ```go type DBPool struct { once sync.Once pool *sql.DB err error } func (d *DBPool) Get() (*sql.DB, error) { d.once.Do(func() { d.pool, d.err = sql.Open("postgres", os.Getenv("DATABASE_URL")) if d.err == nil { d.err = d.pool.Ping() } }) return d.pool, d.err } // Variante : reset possible (Once ne peut pas être réinitialisé, utiliser atomic) type ResettableSingleton struct { val atomic.Pointer[ExpensiveResource] } func (r *ResettableSingleton) Get() *ExpensiveResource { if v := r.val.Load(); v != nil { return v } v := newExpensiveResource() r.val.CompareAndSwap(nil, v) return r.val.Load() } ``` ### sync.Pool — réutilisation d'allocations ```go // Pool pour buffers (cas d'usage principal) var bufPool = sync.Pool{ New: func() any { return bytes.NewBuffer(make([]byte, 0, 4096)) }, } func encode(data any) ([]byte, error) { buf := bufPool.Get().(*bytes.Buffer) buf.Reset() // ✅ TOUJOURS reset avant utilisation defer bufPool.Put(buf) // ✅ rendre après utilisation if err := json.NewEncoder(buf).Encode(data); err != nil { return nil, fmt.Errorf("encode: %w", err) } result := make([]byte, buf.Len()) copy(result, buf.Bytes()) return result, nil } // PIÈGE : ne stocker que des pointeurs dans Pool, pas des valeurs // Le GC peut collecter les objets du Pool après 2 cycles GC // Ne pas mettre de connexions réseau ou de fichiers dans un Pool ``` ### sync.Map — cas d'usage limités ```go // sync.Map est optimisée pour deux cas précis : // 1. Write-once, read-many (ex: registre de handlers) // 2. Clés disjointes entre goroutines (ex: chaque goroutine écrit ses propres clés) var registry sync.Map // Enregistrer les handlers au démarrage (write-once) func init() { registry.Store("user.created", handleUserCreated) registry.Store("order.placed", handleOrderPlaced) } // Lecture fréquente pendant le runtime func dispatch(event string, payload any) { if handler, ok := registry.Load(event); ok { handler.(func(any))(payload) } } // ⚠️ Pour un cache général avec writes/reads mélangés : utiliser RWMutex + map ``` ### Atomic operations — compteurs simples ```go // Pour les compteurs sans logique complexe, atomic est plus performant que Mutex type Metrics struct { requests atomic.Int64 errors atomic.Int64 inFlight atomic.Int64 } func (m *Metrics) RecordRequest() func(err error) { m.requests.Add(1) m.inFlight.Add(1) return func(err error) { m.inFlight.Add(-1) if err != nil { m.errors.Add(1) } } } ``` --- ## Section 4 : context.Context — propagation et annulation ### Règles fondamentales ``` RÈGLE 1 : context.Context = premier paramètre, nommé ctx RÈGLE 2 : ne JAMAIS stocker un context dans une struct RÈGLE 3 : toujours defer cancel() immédiatement après création RÈGLE 4 : context.TODO() = dette technique (chercher TODO(odilon) dans le code) RÈGLE 5 : ne jamais utiliser context.Background() dans un handler HTTP RÈGLE 6 : les valeurs dans le contexte = métadonnées de request (trace ID, auth token) JAMAIS de logique métier dans les valeurs contexte ``` ### Hiérarchie des contextes ``` context.Background() └── context.WithCancel() — annulation manuelle └── context.WithTimeout() — deadline relative └── context.WithDeadline() — deadline absolue └── context.WithValue() — métadonnées ``` ### Propagation correcte ```go // ✅ BON : ctx propagé à tous les niveaux func (s *OrderService) PlaceOrder(ctx context.Context, req PlaceOrderRequest) (*Order, error) { if err := s.validateStock(ctx, req.Items); err != nil { return nil, fmt.Errorf("stock check: %w", err) } order, err := s.repo.CreateOrder(ctx, req) if err != nil { return nil, fmt.Errorf("create order: %w", err) } if err := s.notifier.Send(ctx, order); err != nil { // log mais ne pas échouer (notification = best-effort) slog.Warn("notification failed", "order_id", order.ID, "error", err) } return order, nil } // ❌ MAUVAIS : context.Background() ignore l'annulation du client func (s *OrderService) BadPlaceOrder(ctx context.Context, req PlaceOrderRequest) (*Order, error) { if err := s.validateStock(context.Background(), req.Items); err != nil { // ← perd l'annulation return nil, err } // ... } ``` ### defer cancel() — immédiatement ```go // ✅ BON : defer cancel() sur la même ligne que la création func processWithTimeout(parent context.Context) error { ctx, cancel := context.WithTimeout(parent, 30*time.Second) defer cancel() // ← immédiatement, avant toute autre ligne return doWork(ctx) } // ❌ MAUVAIS : cancel() oublié = context leak func processWithTimeoutBad(parent context.Context) error { ctx, cancel := context.WithTimeout(parent, 30*time.Second) _ = cancel // ← oublié de defer return doWork(ctx) } ``` ### Clés de contexte — typage strict ```go // ✅ BON : type non-exporté pour éviter les collisions type contextKey int const ( contextKeyTraceID contextKey = iota contextKeyUserID contextKeyTenant ) func WithTraceID(ctx context.Context, traceID string) context.Context { return context.WithValue(ctx, contextKeyTraceID, traceID) } func TraceIDFromContext(ctx context.Context) (string, bool) { v, ok := ctx.Value(contextKeyTraceID).(string) return v, ok } // ❌ MAUVAIS : clé string = risque de collision entre packages ctx = context.WithValue(ctx, "userID", "123") // ← collisions avec autres packages ``` ### Deadline budget — vérification avant opération longue ```go // Vérifier qu'on a assez de budget avant une opération longue func fetchWithBudget(ctx context.Context, url string) ([]byte, error) { deadline, ok := ctx.Deadline() if ok { remaining := time.Until(deadline) if remaining < 500*time.Millisecond { return nil, fmt.Errorf("insufficient budget: %v remaining", remaining) } } // Les enfants ne peuvent que réduire la deadline, jamais l'étendre childCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() req, err := http.NewRequestWithContext(childCtx, http.MethodGet, url, nil) if err != nil { return nil, err } resp, err := http.DefaultClient.Do(req) if err != nil { // Distinguer annulation de timeout if errors.Is(err, context.Canceled) { return nil, fmt.Errorf("request canceled by caller: %w", err) } if errors.Is(err, context.DeadlineExceeded) { return nil, fmt.Errorf("request timed out: %w", err) } return nil, err } defer resp.Body.Close() return io.ReadAll(resp.Body) } ``` ### context.WithoutCancel (Go 1.21+) — travail post-request ```go // Cas : cleanup/audit après fin de request HTTP // Le contexte de la request sera annulé quand le handler retourne func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { result, err := h.service.Process(r.Context()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(result) // Audit log post-request : utiliser un context sans annulation // pour que le log survive après la fin du handler auditCtx := context.WithoutCancel(r.Context()) // Go 1.21+ go func() { if err := h.auditor.Log(auditCtx, result); err != nil { slog.Error("audit log failed", "error", err) } }() } ``` ### Vérification dans les boucles CPU-bound ```go // Pour les boucles longues qui ne font pas d'I/O, vérifier ctx manuellement func processLargeDataset(ctx context.Context, items []Item) ([]Result, error) { results := make([]Result, 0, len(items)) for i, item := range items { // Vérifier le contexte tous les 1000 items (pas à chaque itération) if i%1000 == 0 { if err := ctx.Err(); err != nil { return results, fmt.Errorf("canceled after %d items: %w", i, err) } } result, err := transform(item) if err != nil { slog.Warn("transform failed, skipping", "item", item.ID, "error", err) continue } results = append(results, result) } return results, nil } ``` --- ## Section 5 : errgroup — WaitGroup + erreurs + contexte ### Pattern de base `golang.org/x/sync/errgroup` combine WaitGroup + propagation d'erreur + annulation de contexte. ```go import "golang.org/x/sync/errgroup" func fetchAll(ctx context.Context, urls []string) ([][]byte, error) { g, ctx := errgroup.WithContext(ctx) results := make([][]byte, len(urls)) for i, url := range urls { i, url := i, url // ✅ capturer pour la goroutine (Go < 1.22) g.Go(func() error { body, err := fetchURL(ctx, url) if err != nil { return fmt.Errorf("fetch %s: %w", url, err) } results[i] = body // ✅ indices disjoints, pas de race return nil }) } if err := g.Wait(); err != nil { return nil, err } return results, nil } ``` ### SetLimit — concurrence bornée (Go 1.22+) ```go func processImages(ctx context.Context, paths []string) error { g, ctx := errgroup.WithContext(ctx) g.SetLimit(runtime.NumCPU()) // ← limiter la concurrence for _, path := range paths { path := path g.Go(func() error { return resizeImage(ctx, path) }) } return g.Wait() } ``` ### Fan-out avec collecte de résultats ```go // Collecte de résultats avec mutex quand les indices ne sont pas disjoints func aggregateData(ctx context.Context, sources []DataSource) ([]Record, error) { g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) var mu sync.Mutex var all []Record for _, src := range sources { src := src g.Go(func() error { records, err := src.Fetch(ctx) if err != nil { return fmt.Errorf("source %s: %w", src.Name(), err) } mu.Lock() all = append(all, records...) mu.Unlock() return nil }) } if err := g.Wait(); err != nil { return nil, err } return all, nil } ``` ### Annulation automatique sur première erreur ```go // errgroup.WithContext annule automatiquement le ctx si une goroutine retourne une erreur. // Les autres goroutines doivent surveiller ctx.Done() pour s'arrêter proprement. func parallelSearch(ctx context.Context, query string, backends []SearchBackend) (*Result, error) { g, ctx := errgroup.WithContext(ctx) results := make(chan *Result, len(backends)) for _, b := range backends { b := b g.Go(func() error { r, err := b.Search(ctx, query) // ← ctx sera annulé si un autre échoue if err != nil { if errors.Is(err, context.Canceled) { return nil // annulation normale, pas une vraie erreur } return fmt.Errorf("backend %s: %w", b.Name(), err) } select { case results <- r: case <-ctx.Done(): } return nil }) } go func() { g.Wait() close(results) }() // Prendre le premier résultat reçu for r := range results { return r, nil } return nil, g.Wait() } ``` ### TryGo — non-bloquant ```go // TryGo lance seulement si la limite n'est pas atteinte (retourne false sinon) func submitBestEffort(g *errgroup.Group, fn func() error) bool { return g.TryGo(fn) } ``` --- ## Section 6 : Worker Pool — patterns production ### Pattern standard ```go type Job struct { ID string Payload any } type Result struct { JobID string Value any Err error } func RunWorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result { results := make(chan Result, numWorkers*2) var wg sync.WaitGroup for i := 0; i < numWorkers; i++ { wg.Add(1) go func() { defer wg.Done() for { select { case job, ok := <-jobs: if !ok { return // channel fermé, plus de travail } result := processJob(ctx, job) select { case results <- result: case <-ctx.Done(): return } case <-ctx.Done(): return } } }() } // Fermer results quand tous les workers ont terminé go func() { wg.Wait() close(results) }() return results } func processJob(ctx context.Context, job Job) Result { // Simulation d'un traitement avec timeout par job jobCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() value, err := doWork(jobCtx, job.Payload) return Result{JobID: job.ID, Value: value, Err: err} } ``` ### Production : processeur d'images avec semaphore ```go import "golang.org/x/sync/semaphore" type ImageProcessor struct { sem *semaphore.Weighted maxBytes int64 // budget mémoire total } func NewImageProcessor(maxConcurrent int, maxMemoryMB int64) *ImageProcessor { return &ImageProcessor{ sem: semaphore.NewWeighted(maxMemoryMB * 1024 * 1024), maxBytes: maxMemoryMB * 1024 * 1024, } } func (p *ImageProcessor) ProcessAll(ctx context.Context, paths []string) error { var wg sync.WaitGroup errs := make(chan error, len(paths)) for _, path := range paths { path := path // Estimer le coût en mémoire de cette image info, err := os.Stat(path) if err != nil { errs <- fmt.Errorf("stat %s: %w", path, err) continue } cost := info.Size() * 4 // facteur décompression // Acquérir le budget mémoire (bloque si trop d'images en parallèle) if err := p.sem.Acquire(ctx, cost); err != nil { return fmt.Errorf("context canceled: %w", err) } wg.Add(1) go func() { defer wg.Done() defer p.sem.Release(cost) if err := p.processImage(ctx, path); err != nil { errs <- fmt.Errorf("process %s: %w", path, err) } }() } wg.Wait() close(errs) var allErrors []error for err := range errs { allErrors = append(allErrors, err) } return errors.Join(allErrors...) } func (p *ImageProcessor) processImage(ctx context.Context, path string) error { // traitement réel ici return nil } ``` ### Worker pool avec résultats partiels (resilient execution) ```go type BatchResult struct { Processed int Failed int Errors []error } func ProcessBatch(ctx context.Context, items []Item) BatchResult { jobs := make(chan Item, len(items)) for _, item := range items { jobs <- item } close(jobs) type outcome struct { item Item err error } outcomes := make(chan outcome, len(items)) var wg sync.WaitGroup for i := 0; i < runtime.NumCPU(); i++ { wg.Add(1) go func() { defer wg.Done() for item := range jobs { err := process(ctx, item) outcomes <- outcome{item: item, err: err} } }() } go func() { wg.Wait() close(outcomes) }() var result BatchResult for o := range outcomes { if o.err != nil { result.Failed++ result.Errors = append(result.Errors, fmt.Errorf("item %s: %w", o.item.ID, o.err)) slog.Error("item failed, continuing", "item", o.item.ID, "error", o.err) } else { result.Processed++ } } return result } ``` --- ## Section 7 : Goroutine leaks — détection et prévention ### Causes racines | Cause | Description | Fix | |-------|-------------|-----| | Channel bloqué sans sender | goroutine attend sur `<-ch`, ch n'a plus de sender | Ajouter done channel ou fermer ch | | Channel bloqué sans receiver | goroutine attend sur `ch <-`, personne ne lit | Bufferiser ou ajouter ctx.Done() | | Boucle infinie | `for {}` sans condition de sortie | Ajouter `case <-ctx.Done()` | | cancel() jamais appelé | contexte WithTimeout non nettoyé | `defer cancel()` systématique | | WaitGroup sous-incrémentée | `Done()` appelé plus de fois que `Add()` | `Add()` avant `go`, comptage rigoureux | | close() oublié | goroutines rangent sur channel jamais fermé | Fermer côté émetteur | ### goleak — détection en tests ```go import "go.uber.org/goleak" // Package-level : détecter les leaks sur toute la suite de tests func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } // Test unitaire : détecter les leaks d'un test précis func TestMyWorker(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() w := NewWorker() w.Start(ctx) // ... test ... // Si Start lance une goroutine qui ne se termine pas → goleak échoue le test } // Ignorer les goroutines connues (ex: goroutines de bibliothèques tierces) func TestMain(m *testing.M) { goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"), ) } ``` ### Monitoring runtime — Prometheus ```go import ( "runtime" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) var goroutineGauge = promauto.NewGauge(prometheus.GaugeOpts{ Name: "app_goroutines_total", Help: "Current number of goroutines", }) // Collecter toutes les 10 secondes func StartGoroutineMetrics(ctx context.Context) { go func() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: goroutineGauge.Set(float64(runtime.NumGoroutine())) case <-ctx.Done(): return } } }() } // Heuristique santé : 50 connexions → <200 goroutines normalement // Si NumGoroutine() croît sans limite → leak probable ``` ### pprof — analyse en production ```go import _ "net/http/pprof" // Ajouter un serveur pprof (sur port interne uniquement !) go func() { slog.Info("pprof server started", "addr", "localhost:6060") if err := http.ListenAndServe("localhost:6060", nil); err != nil { slog.Error("pprof server failed", "error", err) } }() ``` ```bash # Voir les goroutines actives curl -s localhost:6060/debug/pprof/goroutine?debug=2 | head -100 # Snapshot goroutines dans un fichier go tool pprof http://localhost:6060/debug/pprof/goroutine ``` ### Exemple de leak classique et fix ```go // ❌ MAUVAIS : goroutine leak si timeout atteint func leaky(ctx context.Context) error { result := make(chan int) // unbuffered go func() { time.Sleep(10 * time.Second) result <- 42 // bloque pour toujours si le caller est parti }() select { case v := <-result: fmt.Println(v) return nil case <-ctx.Done(): return ctx.Err() // ← goroutine ci-dessus reste bloquée ! } } // ✅ BON : channel bufferisé, goroutine peut toujours écrire func fixed(ctx context.Context) error { result := make(chan int, 1) // bufferisé → goroutine ne bloque pas go func() { time.Sleep(10 * time.Second) result <- 42 // écriture non-bloquante grâce au buffer }() select { case v := <-result: fmt.Println(v) return nil case <-ctx.Done(): return ctx.Err() // goroutine terminera d'elle-même et le GC nettoiera } } ``` --- ## Section 8 : Data races — les 7 patterns Uber Toujours activer le race detector : ```bash go test -race ./... go run -race main.go GORACE="halt_on_error=1" go test -race ./... # stopper au premier race détecté ``` ### Pattern 1 : Capture de variable de boucle (Go < 1.22) ```go // ❌ MAUVAIS (Go < 1.22) : toutes les goroutines capturent le même 'i' for i := 0; i < 5; i++ { go func() { fmt.Println(i) // race : i peut valoir 5 quand la goroutine démarre }() } // ✅ FIX : passer en paramètre for i := 0; i < 5; i++ { i := i // reshadow (Go < 1.22) go func() { fmt.Println(i) // copie locale }() } // ✅ Go 1.22+ : chaque itération crée sa propre variable i // Le fix automatique est dans le compilateur — mais toujours nommer explicitement ``` ### Pattern 2 : Map non protégée ```go // ❌ MAUVAIS : map concurrente sans synchronisation var cache = map[string]int{} func inc(key string) { cache[key]++ // race : read-modify-write non-atomique } // ✅ FIX option A : RWMutex var ( cacheMu sync.RWMutex cache = map[string]int{} ) func inc(key string) { cacheMu.Lock() defer cacheMu.Unlock() cache[key]++ } // ✅ FIX option B : sync.Map (write-heavy → option A est plus rapide) var cache sync.Map func inc(key string) { v, _ := cache.LoadOrStore(key, new(int64)) atomic.AddInt64(v.(*int64), 1) } ``` ### Pattern 3 : Mutex copié par valeur ```go // ❌ MAUVAIS : copie de Mutex invalide son état interne type Counter struct { mu sync.Mutex value int } func badCopy(c Counter) { // ← copie par valeur, Mutex copié aussi c.mu.Lock() // comportement indéfini c.value++ c.mu.Unlock() } // ✅ FIX : toujours passer par pointeur func goodPointer(c *Counter) { c.mu.Lock() defer c.mu.Unlock() c.value++ } ``` ### Pattern 4 : Variable err partagée ```go // ❌ MAUVAIS : plusieurs goroutines écrivent dans la même err var err error var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() err = doA() // race ! }() go func() { defer wg.Done() err = doB() // race ! }() wg.Wait() // ✅ FIX : errgroup ou channels d'erreurs g, _ := errgroup.WithContext(ctx) g.Go(func() error { return doA() }) g.Go(func() error { return doB() }) if err := g.Wait(); err != nil { return err } ``` ### Pattern 5 : Race sur slice append ```go // ❌ MAUVAIS : append concurrent sur la même slice var results []int var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) i := i go func() { defer wg.Done() results = append(results, i*i) // race sur len/cap/pointeur }() } wg.Wait() // ✅ FIX option A : mutex var mu sync.Mutex for i := 0; i < 10; i++ { wg.Add(1) i := i go func() { defer wg.Done() mu.Lock() results = append(results, i*i) mu.Unlock() }() } // ✅ FIX option B : indices pré-alloués (pas de append) results := make([]int, 10) for i := 0; i < 10; i++ { wg.Add(1) i := i go func() { defer wg.Done() results[i] = i * i // indices disjoints → pas de race }() } ``` ### Pattern 6 : Race struct fields ```go // ❌ MAUVAIS : deux goroutines écrivent dans des champs différents de la même struct type Config struct { Host string Port int } var cfg Config go func() { cfg.Host = "localhost" }() // race go func() { cfg.Port = 8080 }() // race (même struct, même cache line) // ✅ FIX : construire la struct en dehors, puis pointer swap atomique var cfgPtr atomic.Pointer[Config] go func() { newCfg := &Config{Host: "localhost", Port: 8080} cfgPtr.Store(newCfg) }() ``` ### Pattern 7 : Send et close non synchronisés ```go // ❌ MAUVAIS : close et send depuis des goroutines différentes sans coordination ch := make(chan int) go func() { ch <- 1 }() // peut paniquer si close arrive en premier go func() { close(ch) }() // peut paniquer si send arrive après // ✅ FIX : une seule goroutine ferme, les autres savent qu'elles ne doivent plus envoyer ch := make(chan int, 1) done := make(chan struct{}) go func() { select { case ch <- 1: case <-done: // ne pas envoyer si fermé } }() close(done) // signal aux senders // puis fermer ch séparément depuis le seul owner ``` --- ## Section 9 : Singleflight et déduplication ### Problème : cache stampede Quand un cache expire, des centaines de goroutines lancent simultanément la même requête DB coûteuse. ```go import "golang.org/x/sync/singleflight" type WeatherService struct { cache *Cache db *DB group singleflight.Group } // GetWeather déduplique les appels simultanés pour la même ville func (s *WeatherService) GetWeather(ctx context.Context, city string) (*Weather, error) { // 1. Essayer le cache d'abord if w, ok := s.cache.Get(city); ok { return w.(*Weather), nil } // 2. Dédupliquer : une seule goroutine fait la requête, les autres attendent v, err, shared := s.group.Do(city, func() (any, error) { slog.Debug("cache miss, fetching from DB", "city", city) w, err := s.db.GetWeather(ctx, city) if err != nil { return nil, err } s.cache.Set(city, w, 5*time.Minute) return w, nil }) if err != nil { return nil, err } if shared { slog.Debug("request deduplicated", "city", city) } return v.(*Weather), nil } ``` ### DoChan — version non-bloquante avec contexte ```go // DoChan permet d'attendre avec le context (annulable) func (s *WeatherService) GetWeatherCtx(ctx context.Context, city string) (*Weather, error) { ch := s.group.DoChan(city, func() (any, error) { return s.db.GetWeather(context.Background(), city) // noter: Background car le travail continue même si un caller annule }) select { case result := <-ch: if result.Err != nil { return nil, result.Err } return result.Val.(*Weather), nil case <-ctx.Done(): return nil, ctx.Err() } } ``` ### Forget — invalider le in-flight ```go // Utile après une mise à jour : forcer un nouveau fetch même si une requête est en cours func (s *WeatherService) Invalidate(city string) { s.cache.Delete(city) s.group.Forget(city) // prochaine requête ne sera pas dédupliquée avec l'in-flight } ``` ### Cas d'usage singleflight | Scénario | Bénéfice | |----------|----------| | Cache miss concurrent | 1 seule requête DB au lieu de N | | Token refresh OAuth | 1 seul appel API au lieu de N simultanés | | Calcul coûteux partagé | 1 seule exécution, résultat partagé | | Configuration reload | Pas de double fetch sous charge | --- ## Section 10 : Graceful shutdown ### Pattern complet pour serveur HTTP ```go func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, ) defer stop() srv := &http.Server{ Addr: ":8080", Handler: buildRouter(), ReadTimeout: 15 * time.Second, WriteTimeout: 15 * time.Second, IdleTimeout: 60 * time.Second, } // Démarrer en arrière-plan srvErr := make(chan error, 1) go func() { slog.Info("server starting", "addr", srv.Addr) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { srvErr <- err } }() // Attendre signal ou erreur fatale select { case err := <-srvErr: slog.Error("server error", "error", err) os.Exit(1) case <-ctx.Done(): slog.Info("shutdown signal received") stop() // libérer les ressources du signal.NotifyContext } // Laisser 30 secondes aux requêtes en cours de se terminer shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := srv.Shutdown(shutdownCtx); err != nil { slog.Error("graceful shutdown failed", "error", err) // Force close si timeout if err := srv.Close(); err != nil { slog.Error("force close failed", "error", err) } os.Exit(1) } slog.Info("server stopped gracefully") } ``` ### Shutdown multi-composants ordonné ```go type App struct { server *http.Server workers []*Worker db *sql.DB cache *redis.Client shutdownCh chan struct{} } func (a *App) Shutdown(ctx context.Context) error { close(a.shutdownCh) // signal à tous les composants // 1. Arrêter le serveur HTTP (plus de nouvelles requêtes) if err := a.server.Shutdown(ctx); err != nil { slog.Error("http server shutdown", "error", err) } // 2. Arrêter les workers (drainer les jobs en cours) var wg sync.WaitGroup for _, w := range a.workers { wg.Add(1) go func(worker *Worker) { defer wg.Done() worker.Stop() }(w) } workerDone := make(chan struct{}) go func() { wg.Wait() close(workerDone) }() select { case <-workerDone: slog.Info("all workers stopped") case <-ctx.Done(): slog.Warn("workers forced stop due to timeout") } // 3. Fermer les connexions (DB, cache) après le flush des workers if err := a.db.Close(); err != nil { slog.Error("db close", "error", err) } if err := a.cache.Close(); err != nil { slog.Error("cache close", "error", err) } return nil } ``` ### Kubernetes — terminationGracePeriodSeconds En Kubernetes, le pod reçoit `SIGTERM`, puis après `terminationGracePeriodSeconds` (défaut 30s) un `SIGKILL`. ```yaml spec: terminationGracePeriodSeconds: 60 # plus long si jobs longs containers: - name: app lifecycle: preStop: exec: command: ["/bin/sh", "-c", "sleep 5"] # laisser le LB supprimer le pod avant SIGTERM ``` ```go // Dans le code : timeout de shutdown = terminationGracePeriodSeconds - marge const shutdownTimeout = 55 * time.Second // terminationGracePeriodSeconds = 60 ``` --- ## Section 11 : Circuit breaker et rate limiting ### Circuit breaker avec sony/gobreaker ```go import "github.com/sony/gobreaker/v2" type PaymentGateway struct { cb *gobreaker.CircuitBreaker[*PaymentResponse] } func NewPaymentGateway(client *http.Client) *PaymentGateway { settings := gobreaker.Settings{ Name: "payment-gateway", MaxRequests: 3, // requêtes en half-open avant de décider Interval: 60 * time.Second, // reset des compteurs si fermé Timeout: 30 * time.Second, // durée open avant half-open ReadyToTrip: func(counts gobreaker.Counts) bool { failureRatio := float64(counts.TotalFailures) / float64(counts.Requests) return counts.Requests >= 10 && failureRatio >= 0.6 }, OnStateChange: func(name string, from, to gobreaker.State) { slog.Warn("circuit breaker state change", "name", name, "from", from, "to", to, ) }, IsSuccessful: func(err error) bool { // Certaines erreurs ne doivent pas ouvrir le circuit (ex: validation) if err == nil { return true } var validationErr *ValidationError return errors.As(err, &validationErr) }, } return &PaymentGateway{ cb: gobreaker.NewCircuitBreaker[*PaymentResponse](settings), } } func (g *PaymentGateway) Charge(ctx context.Context, req ChargeRequest) (*PaymentResponse, error) { return g.cb.Execute(func() (*PaymentResponse, error) { return g.doCharge(ctx, req) }) } ``` ### États du circuit breaker ``` Closed (normal) → trop d'erreurs → Open (rejette tout) Open → après Timeout → Half-Open (teste quelques requêtes) Half-Open → succès → Closed Half-Open → échec → Open ``` ### Rate limiter — token bucket ```go import "golang.org/x/time/rate" // Limiter basique : 100 req/sec avec burst de 20 limiter := rate.NewLimiter(rate.Limit(100), 20) func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !limiter.Allow() { http.Error(w, "rate limit exceeded", http.StatusTooManyRequests) return } h.handle(w, r) } // Limiter avec wait (bloque jusqu'à token disponible) func (c *APIClient) Call(ctx context.Context, req Request) (*Response, error) { if err := c.limiter.Wait(ctx); err != nil { return nil, fmt.Errorf("rate limiter: %w", err) } return c.doCall(ctx, req) } // Rate limiter par clé (ex: par IP, par user) type PerKeyLimiter struct { mu sync.RWMutex limiters map[string]*rate.Limiter r rate.Limit burst int } func (l *PerKeyLimiter) Allow(key string) bool { l.mu.RLock() lim, ok := l.limiters[key] l.mu.RUnlock() if !ok { l.mu.Lock() // Double-check pattern if lim, ok = l.limiters[key]; !ok { lim = rate.NewLimiter(l.r, l.burst) l.limiters[key] = lim } l.mu.Unlock() } return lim.Allow() } // Limiter avec reservation — prévoir le délai avant de commencer le travail func scheduledWork(ctx context.Context, limiter *rate.Limiter) error { r := limiter.Reserve() if !r.OK() { return errors.New("rate limiter: impossible to satisfy") } delay := r.Delay() if delay > 0 { select { case <-time.After(delay): case <-ctx.Done(): r.Cancel() return ctx.Err() } } return doWork(ctx) } ``` --- ## Section 12 : Tester du code concurrent ### Race detector — obligatoire en CI ```bash # CI pipeline : toujours -race go test -race ./... # Avec GORACE pour avoir plus d'infos GORACE="log_path=/tmp/race halt_on_error=1" go test -race ./... # Arrêter au premier race pour ne pas noyer les logs GORACE="halt_on_error=1" go test -race ./... ``` ### goleak — détection de goroutine leaks ```go import "go.uber.org/goleak" // Au niveau package : couvre tous les tests func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } // Pour un test précis func TestWorkerPool(t *testing.T) { defer goleak.VerifyNone(t) ctx, cancel := context.WithCancel(context.Background()) pool := NewWorkerPool(ctx, 4) // Lancer du travail pool.Submit(func() { time.Sleep(10 * time.Millisecond) }) // Annuler et attendre l'arrêt propre cancel() pool.Wait() // goleak vérifie ici qu'aucune goroutine n'a fui } ``` ### Ne jamais utiliser time.Sleep dans les tests ```go // ❌ MAUVAIS : time.Sleep fragile (trop court en CI lent, trop long localement) func TestAsync(t *testing.T) { service.StartAsync() time.Sleep(100 * time.Millisecond) // flaky ! assert.Equal(t, 1, service.Count()) } // ✅ BON : attendre avec polling + timeout func waitUntil(t *testing.T, condition func() bool, timeout time.Duration) { t.Helper() deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { if condition() { return } time.Sleep(5 * time.Millisecond) } t.Fatalf("condition not met after %v", timeout) } func TestAsync(t *testing.T) { service.StartAsync() waitUntil(t, func() bool { return service.Count() == 1 }, 5*time.Second) } // ✅ ENCORE MIEUX : channel de notification func TestAsyncWithChannel(t *testing.T) { done := make(chan struct{}) service.OnComplete(func() { close(done) }) service.StartAsync() select { case <-done: // succès case <-time.After(5 * time.Second): t.Fatal("timeout waiting for completion") } } ``` ### Tests parallèles et table-driven ```go func TestConcurrentCache(t *testing.T) { tests := []struct { name string keys []string values []string }{ { name: "single key", keys: []string{"k1"}, values: []string{"v1"}, }, { name: "multiple keys", keys: []string{"k1", "k2", "k3"}, values: []string{"v1", "v2", "v3"}, }, } for _, tt := range tests { tt := tt // Go < 1.22 t.Run(tt.name, func(t *testing.T) { t.Parallel() // ✅ tests en parallèle cache := NewCache() var wg sync.WaitGroup for i, key := range tt.keys { wg.Add(1) key, value := key, tt.values[i] go func() { defer wg.Done() cache.Set(key, value) }() } wg.Wait() for i, key := range tt.keys { got, ok := cache.Get(key) if !ok { t.Errorf("key %s not found", key) continue } if got != tt.values[i] { t.Errorf("got %s, want %s", got, tt.values[i]) } } }) } } ``` ### Tester l'annulation par contexte ```go func TestCancellation(t *testing.T) { t.Run("respects context cancellation", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) started := make(chan struct{}) finished := make(chan error, 1) go func() { close(started) finished <- longRunningWork(ctx) }() <-started // attendre que le travail démarre cancel() // annuler select { case err := <-finished: if !errors.Is(err, context.Canceled) { t.Errorf("expected context.Canceled, got %v", err) } case <-time.After(2 * time.Second): t.Fatal("work did not stop after cancellation") } }) t.Run("respects deadline", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() err := longRunningWork(ctx) if !errors.Is(err, context.DeadlineExceeded) { t.Errorf("expected DeadlineExceeded, got %v", err) } }) } ``` ### Stress testing — augmenter la probabilité de races ```go // -count=1000 répète le test 1000 fois pour maximiser l'exposition des races // go test -race -count=1000 -run TestMyFunc ./... // Test de charge concurrent func TestConcurrentWrites(t *testing.T) { if testing.Short() { t.Skip("skipping stress test in short mode") } cache := NewCache() const goroutines = 100 const opsPerGoroutine = 1000 var wg sync.WaitGroup for g := 0; g < goroutines; g++ { wg.Add(1) g := g go func() { defer wg.Done() for i := 0; i < opsPerGoroutine; i++ { key := fmt.Sprintf("key-%d-%d", g, i) cache.Set(key, i) cache.Get(key) } }() } wg.Wait() } ``` ### testing/synctest (Go 1.24 — expérimental) ```go // testing/synctest permet de contrôler le temps dans les tests // sans time.Sleep réel. Encore expérimental en Go 1.24. //go:build go1.24 import "testing/synctest" func TestWithFakeTime(t *testing.T) { synctest.Run(func() { done := make(chan struct{}) go func() { time.Sleep(5 * time.Minute) // ne bloque pas réellement close(done) }() synctest.Wait() // avancer le temps jusqu'au prochain event select { case <-done: // le Sleep de 5 minutes s'est "écoulé" instantanément default: t.Fatal("goroutine not done") } }) } ``` --- ## Section 13 : Anti-patterns — tableau récapitulatif ### Tableau des anti-patterns | Anti-pattern | Problème | Fix | |-------------|----------|-----| | `go func() { heavy() }()` sans tracking | Goroutine orpheline, leak | WaitGroup ou errgroup avec suivi | | `close(ch)` depuis le receiver | Panic si le sender envoie après | Toujours fermer côté sender | | `close(ch)` depuis plusieurs goroutines | Double-close = panic | `sync.Once` pour le close | | Mutex dans une struct copiée par valeur | Mutex invalide | Passer par pointeur `*MyStruct` | | `defer mu.Unlock()` absent | Deadlock si panic entre Lock et Unlock | Toujours `defer mu.Unlock()` | | Lock tenu pendant appel réseau/DB | Contention excessive | Fetch sans lock, puis lock pour write | | `wg.Add(1)` à l'intérieur de la goroutine | Race avec `wg.Wait()` | `wg.Add(1)` avant `go func()` | | `context.Background()` dans un handler | Ignore l'annulation du client | Utiliser `r.Context()` | | Valeurs métier dans `context.WithValue` | Couplage caché, lisibilité nulle | Paramètres de fonction explicites | | `time.Sleep` dans les tests | Tests flaky en CI lent | Channels de notification + timeout | | Map concurrente sans mutex | Race condition (panic runtime) | `sync.RWMutex` ou `sync.Map` | | Channel unbuffered dans goroutine qui peut ne pas recevoir | Goroutine leak | Channel bufferisé `make(chan T, 1)` | | `cancel()` non appelé | Context et ressources leakent | `defer cancel()` immédiatement | | `context.TODO()` laissé en production | Dette technique invisible | Chercher TODO régulièrement | | Panic non récupéré dans goroutine | Crash tout le process | Wrapper `safeGo` avec `recover()` | | select sans `default` dans boucle | Bloque si tous les channels vides | Ajouter `case <-ctx.Done()` | | `select` avec `default` dans boucle active | Busy-wait = CPU à 100% | `time.After` ou condition variable | | Ignorer le retour de `errgroup.TryGo` | Travail silencieusement perdu | Checker le bool retourné | | Worker pool sans drain des channels | Deadlock à l'arrêt | Fermer jobs channel, drainer results | | Goroutine qui lit `results` avant la fin | Lecture partielle | `wg.Wait()` puis lire | | `append` concurrent sans mutex | Race sur pointeur/len/cap | Mutex ou indices pré-alloués | | singleflight sans `Forget` après invalidation | Résultat périmé partagé | `group.Forget(key)` après invalidation | | Rate limiter global partagé entre tests | Tests interdépendants | Créer un limiter par test | | `go test` sans `-race` | Races non détectées | Ajouter `-race` dans CI systématiquement | ### Anti-patterns visuels — les plus courants ```go // ❌ Anti-pattern 1 : goroutine fire-and-forget go processJob(job) // pas de tracking, pas de récupération d'erreur // ✅ Fix g.Go(func() error { return processJob(ctx, job) }) // --- // ❌ Anti-pattern 2 : channel partiellement drainé results := make(chan Result, 100) for _, job := range jobs { go func(j Job) { results <- process(j) }(job) } // Si certaines goroutines n'ont pas de place dans le channel → leak // ✅ Fix : WaitGroup + close proper go func() { wg.Wait(); close(results) }() for r := range results { collect(r) } // --- // ❌ Anti-pattern 3 : double close close(done) close(done) // panic: close of closed channel // ✅ Fix var closeOnce sync.Once closeOnce.Do(func() { close(done) }) // --- // ❌ Anti-pattern 4 : goroutine lancée dans init() func init() { go backgroundTask() // impossible à arrêter proprement } // ✅ Fix : exposer un Start/Stop type Service struct { stopCh chan struct{} } func (s *Service) Start() { go s.run(s.stopCh) } func (s *Service) Stop() { close(s.stopCh) } ``` --- ## Section 14 : Checklist production ### Checklist goroutines et channels - [ ] Chaque goroutine a un owner identifié (qui la démarre, qui l'arrête) - [ ] Aucune goroutine fire-and-forget sans tracking (WaitGroup, errgroup, ou channel résultat) - [ ] Toutes les goroutines s'arrêtent sur `ctx.Done()` ou fermeture de channel - [ ] `safeGo` wrapper avec `recover()` sur toutes les goroutines de prod - [ ] Channel fermé uniquement côté sender (jamais receiver) - [ ] `sync.Once` ou coordination explicite si plusieurs goroutines peuvent fermer - [ ] Types directionnels dans les signatures (`<-chan T`, `chan<- T`) - [ ] Channels bufferisés là où les goroutines doivent continuer sans bloquer ### Checklist sync primitives - [ ] Aucun Mutex copié par valeur (passer par pointeur) - [ ] `defer mu.Unlock()` sur chaque `mu.Lock()` sans exception - [ ] Aucun I/O ou appel réseau sous un Mutex - [ ] `wg.Add(N)` avant le `go func()`, jamais à l'intérieur - [ ] `defer cancel()` immédiatement après chaque `WithCancel/WithTimeout/WithDeadline` - [ ] Clés de contexte = types non-exportés (jamais strings) - [ ] Aucune valeur métier dans les valeurs de contexte ### Checklist tests - [ ] `-race` actif dans le CI pour tous les tests - [ ] `goleak.VerifyTestMain(m)` dans chaque package avec goroutines - [ ] Aucun `time.Sleep` dans les tests (remplacer par channels + timeout) - [ ] Tests concurrents avec `t.Parallel()` sur les tests table-driven - [ ] Tests d'annulation de contexte (Canceled et DeadlineExceeded) - [ ] Stress test `go test -race -count=100` pour les structures partagées critiques ### Checklist monitoring production - [ ] `runtime.NumGoroutine()` exposé en Prometheus (alerte si croissance continue) - [ ] `/debug/pprof` disponible sur port interne (jamais exposé public) - [ ] Logs structurés (slog) avec contexte sur chaque erreur de goroutine - [ ] Circuit breaker sur chaque dépendance externe (DB, API, cache) - [ ] Rate limiter sur les endpoints publics - [ ] Graceful shutdown avec drain des workers avant fermeture DB ### Checklist code review concurrence - [ ] Toute nouvelle goroutine : propriété claire, arrêt garanti - [ ] Toute nouvelle map partagée : protégée par RWMutex ou sync.Map - [ ] Tout nouveau canal : fermeture côté sender documentée - [ ] Tout appel DB/HTTP : utilise les méthodes `*Context` (jamais sans ctx) - [ ] Tout nouveau Mutex : dans une struct passée par pointeur - [ ] `go test -race ./...` passe sans avertissements - [ ] `goleak.VerifyNone(t)` passe pour le test du composant concerné ### Librairies recommandées | Librairie | Import | Usage | |-----------|--------|-------| | errgroup | `golang.org/x/sync/errgroup` | WaitGroup + erreurs + annulation | | semaphore | `golang.org/x/sync/semaphore` | Concurrence pondérée | | singleflight | `golang.org/x/sync/singleflight` | Déduplication de requêtes | | rate | `golang.org/x/time/rate` | Token bucket rate limiting | | goleak | `go.uber.org/goleak` | Détection goroutine leaks en tests | | conc | `github.com/sourcegraph/conc` | Goroutines avec panic recovery intégré | | gobreaker | `github.com/sony/gobreaker/v2` | Circuit breaker production | | synctest | `testing/synctest` | Contrôle du temps dans les tests (Go 1.24+) | ```bash go get golang.org/x/sync@latest go get golang.org/x/time@latest go get go.uber.org/goleak@latest go get github.com/sourcegraph/conc@latest go get github.com/sony/gobreaker/v2@latest ``` ### Références - [The Go Memory Model](https://go.dev/ref/mem) — spec officielle - [Concurrency Patterns in Go](https://go.dev/talks/2012/concurrency.slide) — Rob Pike - [Rethinking Classical Concurrency Patterns](https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view) — Bryan Mills - [Uber Go Style Guide — Goroutines](https://github.com/uber-go/guide/blob/master/style.md#goroutines) - [go.uber.org/goleak](https://pkg.go.dev/go.uber.org/goleak) - [github.com/sourcegraph/conc](https://github.com/sourcegraph/conc) --- ## Testing ### Race detector — première ligne de défense Toujours lancer les tests avec `-race`. Le race detector instrumente le binaire et détecte les accès concurrents non protégés. ```bash go test -race ./... # Pour un test spécifique go test -race -run TestWorkerPool ./internal/jobs/ # En CI : ajouter -race systématiquement go test -race -count=1 -timeout=60s ./... ``` Le race detector ralentit l'exécution (~2-10x) mais est indispensable. Ne jamais désactiver en CI. ### Tester un worker pool ```go func TestWorkerPool_ProcessesAllItems(t *testing.T) { var ( mu sync.Mutex processed []int ) items := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} pool := NewWorkerPool(3) // 3 workers err := pool.Run(context.Background(), items, func(item int) error { mu.Lock() processed = append(processed, item) mu.Unlock() return nil }) require.NoError(t, err) assert.Len(t, processed, len(items)) assert.ElementsMatch(t, items, processed) // ordre non garanti } func TestWorkerPool_HandlesPartialErrors(t *testing.T) { items := []int{1, 2, 3, 4, 5} pool := NewWorkerPool(2) err := pool.Run(context.Background(), items, func(item int) error { if item%2 == 0 { return fmt.Errorf("item %d failed", item) } return nil }) // Le pool doit retourner les erreurs mais avoir traité tous les items require.Error(t, err) var jobErrs *JobErrors require.ErrorAs(t, err, &jobErrs) assert.Len(t, jobErrs.Errors, 2) // items 2 et 4 } ``` ### Tester l'annulation via context ```go func TestWorkerPool_RespectsContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // Annuler après 50ms go func() { time.Sleep(50 * time.Millisecond) cancel() }() items := make([]int, 100) // beaucoup d'items processed := atomic.Int32{} pool := NewWorkerPool(2) err := pool.Run(ctx, items, func(item int) error { time.Sleep(10 * time.Millisecond) // travail lent processed.Add(1) return nil }) require.ErrorIs(t, err, context.Canceled) // Seule une fraction des items a été traitée assert.Less(t, int(processed.Load()), len(items)) } ``` ### Détecter les goroutine leaks avec goleak ```go import "go.uber.org/goleak" func TestMain(m *testing.M) { goleak.VerifyTestMain(m) // Vérifie qu'aucune goroutine ne leak après chaque test } // Ou par test individuel func TestMyService_NoLeak(t *testing.T) { defer goleak.VerifyNone(t) svc := NewMyService() svc.Start() // ... test ... svc.Stop() // Si Stop() ne termine pas toutes les goroutines → goleak signale le leak } ``` ### Tester avec timeout pour éviter les deadlocks ```go func TestChannel_NoDeadlock(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) result := processWithChannels() // fonction à tester assert.Equal(t, expected, result) }() select { case <-done: // OK case <-time.After(5 * time.Second): t.Fatal("test timed out — possible deadlock") } } ``` ### sync/synctest (Go 1.24+) Go 1.24 introduit `testing/synctest` pour tester du code concurrent avec un scheduler déterministe. ```go // go test -run TestWithFakeTime func TestRetryWithBackoff(t *testing.T) { synctest.Run(func() { attempts := 0 svc := NewRetryService(func() error { attempts++ if attempts < 3 { return errors.New("transient error") } return nil }, WithBackoff(time.Second)) go svc.Run(context.Background()) synctest.Wait() // avance le temps virtuel jusqu'à la prochaine goroutine bloquée assert.Equal(t, 1, attempts) synctest.Wait() // après le premier backoff assert.Equal(t, 2, attempts) synctest.Wait() assert.Equal(t, 3, attempts) }) } ``` --- *Last updated: 2025-03 — Revoir si : Go 1.24+ (sync/synctest GA, rangefunc), changements du scheduler Go, ou nouvelles APIs errgroup/singleflight.*