# CLAUDE.md — Concurrence & Parallélisme en Go
> Contexte spécialisé pour Claude Code. Coller ce fichier à la racine du projet pour guider le travail sur la concurrence en Go.
---
## Quand utiliser ce contexte
- ✅ Worker pools pour traiter des lots de tâches CPU-bound ou I/O-bound en parallèle
- ✅ Pipelines de traitement de données avec étapes productrices/consommatrices
- ✅ Services long-running avec goroutines de fond (jobs, schedulers, watchers)
- ✅ Diagnostic de goroutine leaks ou de race conditions sur un codebase existant
- ❌ Tâches séquentielles simples : ajouter des goroutines n'améliore pas la lisibilité
- ❌ Parallélisme sur des opérations très courtes (< microseconde) : overhead des channels dépasse le gain
- ❌ Code mono-goroutine sans concurrence réelle : ne pas anticiper "au cas où"
---
## Section 1 : Goroutines — règles fondamentales
### Coût et cycle de vie
Chaque goroutine démarre avec une stack d'environ **2 KB** (extensible dynamiquement jusqu'à 1 GB par défaut). Elles sont multiplexées sur les threads OS par le scheduler Go (M:N threading) — pas de coût de thread OS à chaque création.
Mais "pas cher à créer" ne veut pas dire "gratuit à laisser fuir". Règles absolues :
1. **Toujours savoir qui possède une goroutine et qui l'arrête**
2. **Jamais de fire-and-forget sans suivi de completion**
3. **Un panic dans une goroutine crashe tout le processus** si non récupéré à l'intérieur de cette goroutine
### Sizing des worker pools
| Type de travail | Taille recommandée |
|----------------|-------------------|
| CPU-bound | `runtime.NumCPU()` |
| I/O-bound | `runtime.NumCPU() * 2` ou plus (benchmarker) |
| Mixte | Commencer à `NumCPU() * 2`, ajuster selon profiling |
### Pattern SafeGo — wrapper avec recover
Ne jamais lancer une goroutine dans du code de production sans wrapper de panic :
```go
// safeGo lance une goroutine avec recover automatique.
// fn est la fonction à exécuter, onPanic est appelé si panic.
func safeGo(fn func(), onPanic func(r any)) {
go func() {
defer func() {
if r := recover(); r != nil {
if onPanic != nil {
onPanic(r)
}
}
}()
fn()
}()
}
// Usage dans un service
func (s *EmailService) sendAsync(ctx context.Context, msg Message) {
safeGo(func() {
if err := s.send(ctx, msg); err != nil {
slog.Error("async send failed", "msg_id", msg.ID, "error", err)
}
}, func(r any) {
slog.Error("panic in sendAsync", "recover", r, "msg_id", msg.ID)
})
}
```
### Goroutine avec ownership clair
```go
// ✅ BON : la goroutine s'arrête sur signal du contexte
func (w *Worker) Run(ctx context.Context) error {
for {
select {
case job := <-w.jobs:
if err := w.process(ctx, job); err != nil {
slog.Error("job failed", "error", err)
}
case <-ctx.Done():
return ctx.Err() // ✅ sortie propre
}
}
}
// ❌ MAUVAIS : goroutine orpheline sans moyen de l'arrêter
func startWorker() {
go func() {
for {
processNextJob() // bloque pour toujours
}
}()
}
```
### Tracking via WaitGroup
```go
// ✅ BON : toujours tracker les goroutines lancées
func processAll(ctx context.Context, items []Item) error {
var wg sync.WaitGroup
for _, item := range items {
wg.Add(1) // ✅ Add AVANT le go, pas dedans
go func(it Item) {
defer wg.Done()
if err := process(ctx, it); err != nil {
slog.Error("failed", "item", it.ID, "error", err)
}
}(item) // ✅ passer en paramètre, pas capturer
}
wg.Wait()
return nil
}
```
### Goroutines longues durée — pattern daemon
```go
// Goroutine de fond avec restart automatique
func (s *Scheduler) Start(ctx context.Context) {
go func() {
for {
s.runLoop(ctx)
select {
case <-ctx.Done():
return // arrêt propre demandé
case <-time.After(5 * time.Second):
slog.Info("scheduler restarting after crash")
}
}
}()
}
func (s *Scheduler) runLoop(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
slog.Error("scheduler panic", "recover", r)
}
}()
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.tick(ctx)
case <-ctx.Done():
return
}
}
}
```
---
## Section 2 : Channels — patterns et pièges
### Règles fondamentales des channels
```
RÈGLE 1 : C'est l'émetteur qui ferme, jamais le récepteur
RÈGLE 2 : Fermer un channel déjà fermé → panic
RÈGLE 3 : Envoyer sur un channel fermé → panic
RÈGLE 4 : Recevoir d'un channel fermé → zero value immédiat
RÈGLE 5 : Utiliser les types directionnels dans les signatures
```
### Types directionnels
```go
// ✅ BON : types directionnels dans les signatures
func produce(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
defer close(ch) // l'émetteur ferme
for i := 0; ; i++ {
select {
case ch <- i:
case <-ctx.Done():
return
}
}
}()
return ch
}
func consume(ctx context.Context, in <-chan int) {
for {
select {
case v, ok := <-in:
if !ok {
return // channel fermé
}
fmt.Println(v)
case <-ctx.Done():
return
}
}
}
```
### Buffered vs unbuffered
```go
// Unbuffered : synchronisation stricte émetteur/récepteur
ch := make(chan int)
// L'émetteur bloque jusqu'à ce que le récepteur lise
// Buffered : découplage temporel
ch := make(chan int, 100)
// L'émetteur peut envoyer 100 valeurs sans que le récepteur soit prêt
// Règle : commencer unbuffered, ajouter buffer seulement si profiling justifie
// Un buffer cache souvent un vrai problème de design
```
### Done channel — broadcast d'annulation
```go
// Done channel : fermer pour broadcaster à N goroutines simultanément
func fanOutWithDone(ctx context.Context, in <-chan Work) {
done := ctx.Done() // récupérer une fois, pas à chaque itération
for i := 0; i < runtime.NumCPU(); i++ {
go func() {
for {
select {
case w, ok := <-in:
if !ok {
return
}
process(w)
case <-done:
return // tous les workers reçoivent ce signal simultanément
}
}
}()
}
}
```
### Pattern Pipeline
```go
// gen : source de données
func gen(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out
}
// sq : transformation
func sq(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}
// Usage : pipeline composable
func runPipeline(ctx context.Context) {
for v := range sq(ctx, sq(ctx, gen(ctx, 2, 3, 4))) {
fmt.Println(v) // 16, 81, 256
}
}
```
### Fan-In — merge de plusieurs channels
```go
// merge reçoit N channels en entrée, émet tout sur un seul channel de sortie
func merge(ctx context.Context, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int, 1)
output := func(c <-chan int) {
defer wg.Done()
for v := range c {
select {
case merged <- v:
case <-ctx.Done():
return
}
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Fermer merged quand tous les inputs sont épuisés
go func() {
wg.Wait()
close(merged)
}()
return merged
}
```
### Fan-Out — distribution de travail
```go
// distribute envoie chaque valeur à l'un des N workers (round-robin implicite via select)
func distribute(ctx context.Context, in <-chan Job, n int) []<-chan Job {
channels := make([]chan Job, n)
outputs := make([]<-chan Job, n)
for i := range channels {
channels[i] = make(chan Job, 10)
outputs[i] = channels[i]
}
go func() {
defer func() {
for _, ch := range channels {
close(ch)
}
}()
i := 0
for job := range in {
select {
case channels[i%n] <- job:
i++
case <-ctx.Done():
return
}
}
}()
return outputs
}
```
### Semaphore via channel bufferisé
```go
// Limiter les accès concurrents à une ressource externe
type Semaphore chan struct{}
func NewSemaphore(n int) Semaphore {
return make(Semaphore, n)
}
func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }
// Usage : limiter à 5 requêtes HTTP simultanées
sem := NewSemaphore(5)
for _, url := range urls {
url := url
go func() {
sem.Acquire()
defer sem.Release()
fetch(url)
}()
}
```
---
## Section 3 : Primitives sync (Mutex, WaitGroup, Once, Pool)
### Mutex — règles strictes
```go
// RÈGLE 1 : jamais copier un Mutex (passer par pointeur)
// RÈGLE 2 : toujours defer Unlock immédiatement après Lock
// RÈGLE 3 : ne pas tenir un Mutex pendant des opérations lentes (I/O, réseau)
type Cache struct {
mu sync.RWMutex // RWMutex si lectures >> écritures
items map[string]Item
}
// ✅ BON : RLock pour les lectures
func (c *Cache) Get(key string) (Item, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
v, ok := c.items[key]
return v, ok
}
// ✅ BON : Lock pour les écritures
func (c *Cache) Set(key string, item Item) {
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = item
}
// ❌ MAUVAIS : lock tenu pendant une opération réseau
func (c *Cache) SetFromDB(key string) {
c.mu.Lock()
defer c.mu.Unlock()
item := fetchFromDB(key) // ← DB call sous le lock = contention maximale
c.items[key] = item
}
// ✅ BON : fetch sans lock, write avec lock
func (c *Cache) SetFromDBCorrect(key string) error {
item, err := fetchFromDB(key) // ← pas de lock ici
if err != nil {
return err
}
c.mu.Lock()
c.items[key] = item
c.mu.Unlock()
return nil
}
```
### Sharded locking — haute contention
Quand un seul Mutex devient un goulot d'étranglement (profiling montre >50% du temps en `sync.Mutex.Lock`) :
```go
const numShards = 256
type ShardedCache struct {
shards [numShards]struct {
sync.RWMutex
items map[string]any
}
}
func (c *ShardedCache) shard(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32()) % numShards
}
func (c *ShardedCache) Get(key string) (any, bool) {
s := c.shard(key)
c.shards[s].RLock()
defer c.shards[s].RUnlock()
v, ok := c.shards[s].items[key]
return v, ok
}
func (c *ShardedCache) Set(key string, value any) {
s := c.shard(key)
c.shards[s].Lock()
defer c.shards[s].Unlock()
if c.shards[s].items == nil {
c.shards[s].items = make(map[string]any)
}
c.shards[s].items[key] = value
}
```
### WaitGroup — timing critique
```go
// ✅ BON : Add AVANT le goroutine launch
var wg sync.WaitGroup
for _, task := range tasks {
wg.Add(1) // ← ici, avant go
go func(t Task) {
defer wg.Done()
process(t)
}(task)
}
wg.Wait()
// ❌ MAUVAIS : Add à l'intérieur de la goroutine (race condition)
for _, task := range tasks {
go func(t Task) {
wg.Add(1) // ← race : wg.Wait() peut s'exécuter avant ce Add
defer wg.Done()
process(t)
}(task)
}
```
### sync.Once — singleton thread-safe
```go
type DBPool struct {
once sync.Once
pool *sql.DB
err error
}
func (d *DBPool) Get() (*sql.DB, error) {
d.once.Do(func() {
d.pool, d.err = sql.Open("postgres", os.Getenv("DATABASE_URL"))
if d.err == nil {
d.err = d.pool.Ping()
}
})
return d.pool, d.err
}
// Variante : reset possible (Once ne peut pas être réinitialisé, utiliser atomic)
type ResettableSingleton struct {
val atomic.Pointer[ExpensiveResource]
}
func (r *ResettableSingleton) Get() *ExpensiveResource {
if v := r.val.Load(); v != nil {
return v
}
v := newExpensiveResource()
r.val.CompareAndSwap(nil, v)
return r.val.Load()
}
```
### sync.Pool — réutilisation d'allocations
```go
// Pool pour buffers (cas d'usage principal)
var bufPool = sync.Pool{
New: func() any {
return bytes.NewBuffer(make([]byte, 0, 4096))
},
}
func encode(data any) ([]byte, error) {
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset() // ✅ TOUJOURS reset avant utilisation
defer bufPool.Put(buf) // ✅ rendre après utilisation
if err := json.NewEncoder(buf).Encode(data); err != nil {
return nil, fmt.Errorf("encode: %w", err)
}
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// PIÈGE : ne stocker que des pointeurs dans Pool, pas des valeurs
// Le GC peut collecter les objets du Pool après 2 cycles GC
// Ne pas mettre de connexions réseau ou de fichiers dans un Pool
```
### sync.Map — cas d'usage limités
```go
// sync.Map est optimisée pour deux cas précis :
// 1. Write-once, read-many (ex: registre de handlers)
// 2. Clés disjointes entre goroutines (ex: chaque goroutine écrit ses propres clés)
var registry sync.Map
// Enregistrer les handlers au démarrage (write-once)
func init() {
registry.Store("user.created", handleUserCreated)
registry.Store("order.placed", handleOrderPlaced)
}
// Lecture fréquente pendant le runtime
func dispatch(event string, payload any) {
if handler, ok := registry.Load(event); ok {
handler.(func(any))(payload)
}
}
// ⚠️ Pour un cache général avec writes/reads mélangés : utiliser RWMutex + map
```
### Atomic operations — compteurs simples
```go
// Pour les compteurs sans logique complexe, atomic est plus performant que Mutex
type Metrics struct {
requests atomic.Int64
errors atomic.Int64
inFlight atomic.Int64
}
func (m *Metrics) RecordRequest() func(err error) {
m.requests.Add(1)
m.inFlight.Add(1)
return func(err error) {
m.inFlight.Add(-1)
if err != nil {
m.errors.Add(1)
}
}
}
```
---
## Section 4 : context.Context — propagation et annulation
### Règles fondamentales
```
RÈGLE 1 : context.Context = premier paramètre, nommé ctx
RÈGLE 2 : ne JAMAIS stocker un context dans une struct
RÈGLE 3 : toujours defer cancel() immédiatement après création
RÈGLE 4 : context.TODO() = dette technique (chercher TODO(odilon) dans le code)
RÈGLE 5 : ne jamais utiliser context.Background() dans un handler HTTP
RÈGLE 6 : les valeurs dans le contexte = métadonnées de request (trace ID, auth token)
JAMAIS de logique métier dans les valeurs contexte
```
### Hiérarchie des contextes
```
context.Background()
└── context.WithCancel() — annulation manuelle
└── context.WithTimeout() — deadline relative
└── context.WithDeadline() — deadline absolue
└── context.WithValue() — métadonnées
```
### Propagation correcte
```go
// ✅ BON : ctx propagé à tous les niveaux
func (s *OrderService) PlaceOrder(ctx context.Context, req PlaceOrderRequest) (*Order, error) {
if err := s.validateStock(ctx, req.Items); err != nil {
return nil, fmt.Errorf("stock check: %w", err)
}
order, err := s.repo.CreateOrder(ctx, req)
if err != nil {
return nil, fmt.Errorf("create order: %w", err)
}
if err := s.notifier.Send(ctx, order); err != nil {
// log mais ne pas échouer (notification = best-effort)
slog.Warn("notification failed", "order_id", order.ID, "error", err)
}
return order, nil
}
// ❌ MAUVAIS : context.Background() ignore l'annulation du client
func (s *OrderService) BadPlaceOrder(ctx context.Context, req PlaceOrderRequest) (*Order, error) {
if err := s.validateStock(context.Background(), req.Items); err != nil { // ← perd l'annulation
return nil, err
}
// ...
}
```
### defer cancel() — immédiatement
```go
// ✅ BON : defer cancel() sur la même ligne que la création
func processWithTimeout(parent context.Context) error {
ctx, cancel := context.WithTimeout(parent, 30*time.Second)
defer cancel() // ← immédiatement, avant toute autre ligne
return doWork(ctx)
}
// ❌ MAUVAIS : cancel() oublié = context leak
func processWithTimeoutBad(parent context.Context) error {
ctx, cancel := context.WithTimeout(parent, 30*time.Second)
_ = cancel // ← oublié de defer
return doWork(ctx)
}
```
### Clés de contexte — typage strict
```go
// ✅ BON : type non-exporté pour éviter les collisions
type contextKey int
const (
contextKeyTraceID contextKey = iota
contextKeyUserID
contextKeyTenant
)
func WithTraceID(ctx context.Context, traceID string) context.Context {
return context.WithValue(ctx, contextKeyTraceID, traceID)
}
func TraceIDFromContext(ctx context.Context) (string, bool) {
v, ok := ctx.Value(contextKeyTraceID).(string)
return v, ok
}
// ❌ MAUVAIS : clé string = risque de collision entre packages
ctx = context.WithValue(ctx, "userID", "123") // ← collisions avec autres packages
```
### Deadline budget — vérification avant opération longue
```go
// Vérifier qu'on a assez de budget avant une opération longue
func fetchWithBudget(ctx context.Context, url string) ([]byte, error) {
deadline, ok := ctx.Deadline()
if ok {
remaining := time.Until(deadline)
if remaining < 500*time.Millisecond {
return nil, fmt.Errorf("insufficient budget: %v remaining", remaining)
}
}
// Les enfants ne peuvent que réduire la deadline, jamais l'étendre
childCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(childCtx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
// Distinguer annulation de timeout
if errors.Is(err, context.Canceled) {
return nil, fmt.Errorf("request canceled by caller: %w", err)
}
if errors.Is(err, context.DeadlineExceeded) {
return nil, fmt.Errorf("request timed out: %w", err)
}
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
```
### context.WithoutCancel (Go 1.21+) — travail post-request
```go
// Cas : cleanup/audit après fin de request HTTP
// Le contexte de la request sera annulé quand le handler retourne
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
result, err := h.service.Process(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(result)
// Audit log post-request : utiliser un context sans annulation
// pour que le log survive après la fin du handler
auditCtx := context.WithoutCancel(r.Context()) // Go 1.21+
go func() {
if err := h.auditor.Log(auditCtx, result); err != nil {
slog.Error("audit log failed", "error", err)
}
}()
}
```
### Vérification dans les boucles CPU-bound
```go
// Pour les boucles longues qui ne font pas d'I/O, vérifier ctx manuellement
func processLargeDataset(ctx context.Context, items []Item) ([]Result, error) {
results := make([]Result, 0, len(items))
for i, item := range items {
// Vérifier le contexte tous les 1000 items (pas à chaque itération)
if i%1000 == 0 {
if err := ctx.Err(); err != nil {
return results, fmt.Errorf("canceled after %d items: %w", i, err)
}
}
result, err := transform(item)
if err != nil {
slog.Warn("transform failed, skipping", "item", item.ID, "error", err)
continue
}
results = append(results, result)
}
return results, nil
}
```
---
## Section 5 : errgroup — WaitGroup + erreurs + contexte
### Pattern de base
`golang.org/x/sync/errgroup` combine WaitGroup + propagation d'erreur + annulation de contexte.
```go
import "golang.org/x/sync/errgroup"
func fetchAll(ctx context.Context, urls []string) ([][]byte, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([][]byte, len(urls))
for i, url := range urls {
i, url := i, url // ✅ capturer pour la goroutine (Go < 1.22)
g.Go(func() error {
body, err := fetchURL(ctx, url)
if err != nil {
return fmt.Errorf("fetch %s: %w", url, err)
}
results[i] = body // ✅ indices disjoints, pas de race
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return results, nil
}
```
### SetLimit — concurrence bornée (Go 1.22+)
```go
func processImages(ctx context.Context, paths []string) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(runtime.NumCPU()) // ← limiter la concurrence
for _, path := range paths {
path := path
g.Go(func() error {
return resizeImage(ctx, path)
})
}
return g.Wait()
}
```
### Fan-out avec collecte de résultats
```go
// Collecte de résultats avec mutex quand les indices ne sont pas disjoints
func aggregateData(ctx context.Context, sources []DataSource) ([]Record, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)
var mu sync.Mutex
var all []Record
for _, src := range sources {
src := src
g.Go(func() error {
records, err := src.Fetch(ctx)
if err != nil {
return fmt.Errorf("source %s: %w", src.Name(), err)
}
mu.Lock()
all = append(all, records...)
mu.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return all, nil
}
```
### Annulation automatique sur première erreur
```go
// errgroup.WithContext annule automatiquement le ctx si une goroutine retourne une erreur.
// Les autres goroutines doivent surveiller ctx.Done() pour s'arrêter proprement.
func parallelSearch(ctx context.Context, query string, backends []SearchBackend) (*Result, error) {
g, ctx := errgroup.WithContext(ctx)
results := make(chan *Result, len(backends))
for _, b := range backends {
b := b
g.Go(func() error {
r, err := b.Search(ctx, query) // ← ctx sera annulé si un autre échoue
if err != nil {
if errors.Is(err, context.Canceled) {
return nil // annulation normale, pas une vraie erreur
}
return fmt.Errorf("backend %s: %w", b.Name(), err)
}
select {
case results <- r:
case <-ctx.Done():
}
return nil
})
}
go func() {
g.Wait()
close(results)
}()
// Prendre le premier résultat reçu
for r := range results {
return r, nil
}
return nil, g.Wait()
}
```
### TryGo — non-bloquant
```go
// TryGo lance seulement si la limite n'est pas atteinte (retourne false sinon)
func submitBestEffort(g *errgroup.Group, fn func() error) bool {
return g.TryGo(fn)
}
```
---
## Section 6 : Worker Pool — patterns production
### Pattern standard
```go
type Job struct {
ID string
Payload any
}
type Result struct {
JobID string
Value any
Err error
}
func RunWorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
results := make(chan Result, numWorkers*2)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
return // channel fermé, plus de travail
}
result := processJob(ctx, job)
select {
case results <- result:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
}
// Fermer results quand tous les workers ont terminé
go func() {
wg.Wait()
close(results)
}()
return results
}
func processJob(ctx context.Context, job Job) Result {
// Simulation d'un traitement avec timeout par job
jobCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
value, err := doWork(jobCtx, job.Payload)
return Result{JobID: job.ID, Value: value, Err: err}
}
```
### Production : processeur d'images avec semaphore
```go
import "golang.org/x/sync/semaphore"
type ImageProcessor struct {
sem *semaphore.Weighted
maxBytes int64 // budget mémoire total
}
func NewImageProcessor(maxConcurrent int, maxMemoryMB int64) *ImageProcessor {
return &ImageProcessor{
sem: semaphore.NewWeighted(maxMemoryMB * 1024 * 1024),
maxBytes: maxMemoryMB * 1024 * 1024,
}
}
func (p *ImageProcessor) ProcessAll(ctx context.Context, paths []string) error {
var wg sync.WaitGroup
errs := make(chan error, len(paths))
for _, path := range paths {
path := path
// Estimer le coût en mémoire de cette image
info, err := os.Stat(path)
if err != nil {
errs <- fmt.Errorf("stat %s: %w", path, err)
continue
}
cost := info.Size() * 4 // facteur décompression
// Acquérir le budget mémoire (bloque si trop d'images en parallèle)
if err := p.sem.Acquire(ctx, cost); err != nil {
return fmt.Errorf("context canceled: %w", err)
}
wg.Add(1)
go func() {
defer wg.Done()
defer p.sem.Release(cost)
if err := p.processImage(ctx, path); err != nil {
errs <- fmt.Errorf("process %s: %w", path, err)
}
}()
}
wg.Wait()
close(errs)
var allErrors []error
for err := range errs {
allErrors = append(allErrors, err)
}
return errors.Join(allErrors...)
}
func (p *ImageProcessor) processImage(ctx context.Context, path string) error {
// traitement réel ici
return nil
}
```
### Worker pool avec résultats partiels (resilient execution)
```go
type BatchResult struct {
Processed int
Failed int
Errors []error
}
func ProcessBatch(ctx context.Context, items []Item) BatchResult {
jobs := make(chan Item, len(items))
for _, item := range items {
jobs <- item
}
close(jobs)
type outcome struct {
item Item
err error
}
outcomes := make(chan outcome, len(items))
var wg sync.WaitGroup
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range jobs {
err := process(ctx, item)
outcomes <- outcome{item: item, err: err}
}
}()
}
go func() {
wg.Wait()
close(outcomes)
}()
var result BatchResult
for o := range outcomes {
if o.err != nil {
result.Failed++
result.Errors = append(result.Errors, fmt.Errorf("item %s: %w", o.item.ID, o.err))
slog.Error("item failed, continuing", "item", o.item.ID, "error", o.err)
} else {
result.Processed++
}
}
return result
}
```
---
## Section 7 : Goroutine leaks — détection et prévention
### Causes racines
| Cause | Description | Fix |
|-------|-------------|-----|
| Channel bloqué sans sender | goroutine attend sur `<-ch`, ch n'a plus de sender | Ajouter done channel ou fermer ch |
| Channel bloqué sans receiver | goroutine attend sur `ch <-`, personne ne lit | Bufferiser ou ajouter ctx.Done() |
| Boucle infinie | `for {}` sans condition de sortie | Ajouter `case <-ctx.Done()` |
| cancel() jamais appelé | contexte WithTimeout non nettoyé | `defer cancel()` systématique |
| WaitGroup sous-incrémentée | `Done()` appelé plus de fois que `Add()` | `Add()` avant `go`, comptage rigoureux |
| close() oublié | goroutines rangent sur channel jamais fermé | Fermer côté émetteur |
### goleak — détection en tests
```go
import "go.uber.org/goleak"
// Package-level : détecter les leaks sur toute la suite de tests
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
// Test unitaire : détecter les leaks d'un test précis
func TestMyWorker(t *testing.T) {
defer goleak.VerifyNone(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w := NewWorker()
w.Start(ctx)
// ... test ...
// Si Start lance une goroutine qui ne se termine pas → goleak échoue le test
}
// Ignorer les goroutines connues (ex: goroutines de bibliothèques tierces)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m,
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"),
)
}
```
### Monitoring runtime — Prometheus
```go
import (
"runtime"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var goroutineGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "app_goroutines_total",
Help: "Current number of goroutines",
})
// Collecter toutes les 10 secondes
func StartGoroutineMetrics(ctx context.Context) {
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
goroutineGauge.Set(float64(runtime.NumGoroutine()))
case <-ctx.Done():
return
}
}
}()
}
// Heuristique santé : 50 connexions → <200 goroutines normalement
// Si NumGoroutine() croît sans limite → leak probable
```
### pprof — analyse en production
```go
import _ "net/http/pprof"
// Ajouter un serveur pprof (sur port interne uniquement !)
go func() {
slog.Info("pprof server started", "addr", "localhost:6060")
if err := http.ListenAndServe("localhost:6060", nil); err != nil {
slog.Error("pprof server failed", "error", err)
}
}()
```
```bash
# Voir les goroutines actives
curl -s localhost:6060/debug/pprof/goroutine?debug=2 | head -100
# Snapshot goroutines dans un fichier
go tool pprof http://localhost:6060/debug/pprof/goroutine
```
### Exemple de leak classique et fix
```go
// ❌ MAUVAIS : goroutine leak si timeout atteint
func leaky(ctx context.Context) error {
result := make(chan int) // unbuffered
go func() {
time.Sleep(10 * time.Second)
result <- 42 // bloque pour toujours si le caller est parti
}()
select {
case v := <-result:
fmt.Println(v)
return nil
case <-ctx.Done():
return ctx.Err() // ← goroutine ci-dessus reste bloquée !
}
}
// ✅ BON : channel bufferisé, goroutine peut toujours écrire
func fixed(ctx context.Context) error {
result := make(chan int, 1) // bufferisé → goroutine ne bloque pas
go func() {
time.Sleep(10 * time.Second)
result <- 42 // écriture non-bloquante grâce au buffer
}()
select {
case v := <-result:
fmt.Println(v)
return nil
case <-ctx.Done():
return ctx.Err() // goroutine terminera d'elle-même et le GC nettoiera
}
}
```
---
## Section 8 : Data races — les 7 patterns Uber
Toujours activer le race detector :
```bash
go test -race ./...
go run -race main.go
GORACE="halt_on_error=1" go test -race ./... # stopper au premier race détecté
```
### Pattern 1 : Capture de variable de boucle (Go < 1.22)
```go
// ❌ MAUVAIS (Go < 1.22) : toutes les goroutines capturent le même 'i'
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // race : i peut valoir 5 quand la goroutine démarre
}()
}
// ✅ FIX : passer en paramètre
for i := 0; i < 5; i++ {
i := i // reshadow (Go < 1.22)
go func() {
fmt.Println(i) // copie locale
}()
}
// ✅ Go 1.22+ : chaque itération crée sa propre variable i
// Le fix automatique est dans le compilateur — mais toujours nommer explicitement
```
### Pattern 2 : Map non protégée
```go
// ❌ MAUVAIS : map concurrente sans synchronisation
var cache = map[string]int{}
func inc(key string) {
cache[key]++ // race : read-modify-write non-atomique
}
// ✅ FIX option A : RWMutex
var (
cacheMu sync.RWMutex
cache = map[string]int{}
)
func inc(key string) {
cacheMu.Lock()
defer cacheMu.Unlock()
cache[key]++
}
// ✅ FIX option B : sync.Map (write-heavy → option A est plus rapide)
var cache sync.Map
func inc(key string) {
v, _ := cache.LoadOrStore(key, new(int64))
atomic.AddInt64(v.(*int64), 1)
}
```
### Pattern 3 : Mutex copié par valeur
```go
// ❌ MAUVAIS : copie de Mutex invalide son état interne
type Counter struct {
mu sync.Mutex
value int
}
func badCopy(c Counter) { // ← copie par valeur, Mutex copié aussi
c.mu.Lock() // comportement indéfini
c.value++
c.mu.Unlock()
}
// ✅ FIX : toujours passer par pointeur
func goodPointer(c *Counter) {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
```
### Pattern 4 : Variable err partagée
```go
// ❌ MAUVAIS : plusieurs goroutines écrivent dans la même err
var err error
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
err = doA() // race !
}()
go func() {
defer wg.Done()
err = doB() // race !
}()
wg.Wait()
// ✅ FIX : errgroup ou channels d'erreurs
g, _ := errgroup.WithContext(ctx)
g.Go(func() error { return doA() })
g.Go(func() error { return doB() })
if err := g.Wait(); err != nil {
return err
}
```
### Pattern 5 : Race sur slice append
```go
// ❌ MAUVAIS : append concurrent sur la même slice
var results []int
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
results = append(results, i*i) // race sur len/cap/pointeur
}()
}
wg.Wait()
// ✅ FIX option A : mutex
var mu sync.Mutex
for i := 0; i < 10; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
mu.Lock()
results = append(results, i*i)
mu.Unlock()
}()
}
// ✅ FIX option B : indices pré-alloués (pas de append)
results := make([]int, 10)
for i := 0; i < 10; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
results[i] = i * i // indices disjoints → pas de race
}()
}
```
### Pattern 6 : Race struct fields
```go
// ❌ MAUVAIS : deux goroutines écrivent dans des champs différents de la même struct
type Config struct {
Host string
Port int
}
var cfg Config
go func() { cfg.Host = "localhost" }() // race
go func() { cfg.Port = 8080 }() // race (même struct, même cache line)
// ✅ FIX : construire la struct en dehors, puis pointer swap atomique
var cfgPtr atomic.Pointer[Config]
go func() {
newCfg := &Config{Host: "localhost", Port: 8080}
cfgPtr.Store(newCfg)
}()
```
### Pattern 7 : Send et close non synchronisés
```go
// ❌ MAUVAIS : close et send depuis des goroutines différentes sans coordination
ch := make(chan int)
go func() { ch <- 1 }() // peut paniquer si close arrive en premier
go func() { close(ch) }() // peut paniquer si send arrive après
// ✅ FIX : une seule goroutine ferme, les autres savent qu'elles ne doivent plus envoyer
ch := make(chan int, 1)
done := make(chan struct{})
go func() {
select {
case ch <- 1:
case <-done: // ne pas envoyer si fermé
}
}()
close(done) // signal aux senders
// puis fermer ch séparément depuis le seul owner
```
---
## Section 9 : Singleflight et déduplication
### Problème : cache stampede
Quand un cache expire, des centaines de goroutines lancent simultanément la même requête DB coûteuse.
```go
import "golang.org/x/sync/singleflight"
type WeatherService struct {
cache *Cache
db *DB
group singleflight.Group
}
// GetWeather déduplique les appels simultanés pour la même ville
func (s *WeatherService) GetWeather(ctx context.Context, city string) (*Weather, error) {
// 1. Essayer le cache d'abord
if w, ok := s.cache.Get(city); ok {
return w.(*Weather), nil
}
// 2. Dédupliquer : une seule goroutine fait la requête, les autres attendent
v, err, shared := s.group.Do(city, func() (any, error) {
slog.Debug("cache miss, fetching from DB", "city", city)
w, err := s.db.GetWeather(ctx, city)
if err != nil {
return nil, err
}
s.cache.Set(city, w, 5*time.Minute)
return w, nil
})
if err != nil {
return nil, err
}
if shared {
slog.Debug("request deduplicated", "city", city)
}
return v.(*Weather), nil
}
```
### DoChan — version non-bloquante avec contexte
```go
// DoChan permet d'attendre avec le context (annulable)
func (s *WeatherService) GetWeatherCtx(ctx context.Context, city string) (*Weather, error) {
ch := s.group.DoChan(city, func() (any, error) {
return s.db.GetWeather(context.Background(), city) // noter: Background car le travail continue même si un caller annule
})
select {
case result := <-ch:
if result.Err != nil {
return nil, result.Err
}
return result.Val.(*Weather), nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
```
### Forget — invalider le in-flight
```go
// Utile après une mise à jour : forcer un nouveau fetch même si une requête est en cours
func (s *WeatherService) Invalidate(city string) {
s.cache.Delete(city)
s.group.Forget(city) // prochaine requête ne sera pas dédupliquée avec l'in-flight
}
```
### Cas d'usage singleflight
| Scénario | Bénéfice |
|----------|----------|
| Cache miss concurrent | 1 seule requête DB au lieu de N |
| Token refresh OAuth | 1 seul appel API au lieu de N simultanés |
| Calcul coûteux partagé | 1 seule exécution, résultat partagé |
| Configuration reload | Pas de double fetch sous charge |
---
## Section 10 : Graceful shutdown
### Pattern complet pour serveur HTTP
```go
func main() {
ctx, stop := signal.NotifyContext(context.Background(),
os.Interrupt,
syscall.SIGTERM,
)
defer stop()
srv := &http.Server{
Addr: ":8080",
Handler: buildRouter(),
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}
// Démarrer en arrière-plan
srvErr := make(chan error, 1)
go func() {
slog.Info("server starting", "addr", srv.Addr)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
srvErr <- err
}
}()
// Attendre signal ou erreur fatale
select {
case err := <-srvErr:
slog.Error("server error", "error", err)
os.Exit(1)
case <-ctx.Done():
slog.Info("shutdown signal received")
stop() // libérer les ressources du signal.NotifyContext
}
// Laisser 30 secondes aux requêtes en cours de se terminer
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
slog.Error("graceful shutdown failed", "error", err)
// Force close si timeout
if err := srv.Close(); err != nil {
slog.Error("force close failed", "error", err)
}
os.Exit(1)
}
slog.Info("server stopped gracefully")
}
```
### Shutdown multi-composants ordonné
```go
type App struct {
server *http.Server
workers []*Worker
db *sql.DB
cache *redis.Client
shutdownCh chan struct{}
}
func (a *App) Shutdown(ctx context.Context) error {
close(a.shutdownCh) // signal à tous les composants
// 1. Arrêter le serveur HTTP (plus de nouvelles requêtes)
if err := a.server.Shutdown(ctx); err != nil {
slog.Error("http server shutdown", "error", err)
}
// 2. Arrêter les workers (drainer les jobs en cours)
var wg sync.WaitGroup
for _, w := range a.workers {
wg.Add(1)
go func(worker *Worker) {
defer wg.Done()
worker.Stop()
}(w)
}
workerDone := make(chan struct{})
go func() {
wg.Wait()
close(workerDone)
}()
select {
case <-workerDone:
slog.Info("all workers stopped")
case <-ctx.Done():
slog.Warn("workers forced stop due to timeout")
}
// 3. Fermer les connexions (DB, cache) après le flush des workers
if err := a.db.Close(); err != nil {
slog.Error("db close", "error", err)
}
if err := a.cache.Close(); err != nil {
slog.Error("cache close", "error", err)
}
return nil
}
```
### Kubernetes — terminationGracePeriodSeconds
En Kubernetes, le pod reçoit `SIGTERM`, puis après `terminationGracePeriodSeconds` (défaut 30s) un `SIGKILL`.
```yaml
spec:
terminationGracePeriodSeconds: 60 # plus long si jobs longs
containers:
- name: app
lifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 5"] # laisser le LB supprimer le pod avant SIGTERM
```
```go
// Dans le code : timeout de shutdown = terminationGracePeriodSeconds - marge
const shutdownTimeout = 55 * time.Second // terminationGracePeriodSeconds = 60
```
---
## Section 11 : Circuit breaker et rate limiting
### Circuit breaker avec sony/gobreaker
```go
import "github.com/sony/gobreaker/v2"
type PaymentGateway struct {
cb *gobreaker.CircuitBreaker[*PaymentResponse]
}
func NewPaymentGateway(client *http.Client) *PaymentGateway {
settings := gobreaker.Settings{
Name: "payment-gateway",
MaxRequests: 3, // requêtes en half-open avant de décider
Interval: 60 * time.Second, // reset des compteurs si fermé
Timeout: 30 * time.Second, // durée open avant half-open
ReadyToTrip: func(counts gobreaker.Counts) bool {
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
return counts.Requests >= 10 && failureRatio >= 0.6
},
OnStateChange: func(name string, from, to gobreaker.State) {
slog.Warn("circuit breaker state change",
"name", name,
"from", from,
"to", to,
)
},
IsSuccessful: func(err error) bool {
// Certaines erreurs ne doivent pas ouvrir le circuit (ex: validation)
if err == nil {
return true
}
var validationErr *ValidationError
return errors.As(err, &validationErr)
},
}
return &PaymentGateway{
cb: gobreaker.NewCircuitBreaker[*PaymentResponse](settings),
}
}
func (g *PaymentGateway) Charge(ctx context.Context, req ChargeRequest) (*PaymentResponse, error) {
return g.cb.Execute(func() (*PaymentResponse, error) {
return g.doCharge(ctx, req)
})
}
```
### États du circuit breaker
```
Closed (normal) → trop d'erreurs → Open (rejette tout)
Open → après Timeout → Half-Open (teste quelques requêtes)
Half-Open → succès → Closed
Half-Open → échec → Open
```
### Rate limiter — token bucket
```go
import "golang.org/x/time/rate"
// Limiter basique : 100 req/sec avec burst de 20
limiter := rate.NewLimiter(rate.Limit(100), 20)
func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
return
}
h.handle(w, r)
}
// Limiter avec wait (bloque jusqu'à token disponible)
func (c *APIClient) Call(ctx context.Context, req Request) (*Response, error) {
if err := c.limiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("rate limiter: %w", err)
}
return c.doCall(ctx, req)
}
// Rate limiter par clé (ex: par IP, par user)
type PerKeyLimiter struct {
mu sync.RWMutex
limiters map[string]*rate.Limiter
r rate.Limit
burst int
}
func (l *PerKeyLimiter) Allow(key string) bool {
l.mu.RLock()
lim, ok := l.limiters[key]
l.mu.RUnlock()
if !ok {
l.mu.Lock()
// Double-check pattern
if lim, ok = l.limiters[key]; !ok {
lim = rate.NewLimiter(l.r, l.burst)
l.limiters[key] = lim
}
l.mu.Unlock()
}
return lim.Allow()
}
// Limiter avec reservation — prévoir le délai avant de commencer le travail
func scheduledWork(ctx context.Context, limiter *rate.Limiter) error {
r := limiter.Reserve()
if !r.OK() {
return errors.New("rate limiter: impossible to satisfy")
}
delay := r.Delay()
if delay > 0 {
select {
case <-time.After(delay):
case <-ctx.Done():
r.Cancel()
return ctx.Err()
}
}
return doWork(ctx)
}
```
---
## Section 12 : Tester du code concurrent
### Race detector — obligatoire en CI
```bash
# CI pipeline : toujours -race
go test -race ./...
# Avec GORACE pour avoir plus d'infos
GORACE="log_path=/tmp/race halt_on_error=1" go test -race ./...
# Arrêter au premier race pour ne pas noyer les logs
GORACE="halt_on_error=1" go test -race ./...
```
### goleak — détection de goroutine leaks
```go
import "go.uber.org/goleak"
// Au niveau package : couvre tous les tests
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
// Pour un test précis
func TestWorkerPool(t *testing.T) {
defer goleak.VerifyNone(t)
ctx, cancel := context.WithCancel(context.Background())
pool := NewWorkerPool(ctx, 4)
// Lancer du travail
pool.Submit(func() { time.Sleep(10 * time.Millisecond) })
// Annuler et attendre l'arrêt propre
cancel()
pool.Wait()
// goleak vérifie ici qu'aucune goroutine n'a fui
}
```
### Ne jamais utiliser time.Sleep dans les tests
```go
// ❌ MAUVAIS : time.Sleep fragile (trop court en CI lent, trop long localement)
func TestAsync(t *testing.T) {
service.StartAsync()
time.Sleep(100 * time.Millisecond) // flaky !
assert.Equal(t, 1, service.Count())
}
// ✅ BON : attendre avec polling + timeout
func waitUntil(t *testing.T, condition func() bool, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if condition() {
return
}
time.Sleep(5 * time.Millisecond)
}
t.Fatalf("condition not met after %v", timeout)
}
func TestAsync(t *testing.T) {
service.StartAsync()
waitUntil(t, func() bool { return service.Count() == 1 }, 5*time.Second)
}
// ✅ ENCORE MIEUX : channel de notification
func TestAsyncWithChannel(t *testing.T) {
done := make(chan struct{})
service.OnComplete(func() { close(done) })
service.StartAsync()
select {
case <-done:
// succès
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for completion")
}
}
```
### Tests parallèles et table-driven
```go
func TestConcurrentCache(t *testing.T) {
tests := []struct {
name string
keys []string
values []string
}{
{
name: "single key",
keys: []string{"k1"},
values: []string{"v1"},
},
{
name: "multiple keys",
keys: []string{"k1", "k2", "k3"},
values: []string{"v1", "v2", "v3"},
},
}
for _, tt := range tests {
tt := tt // Go < 1.22
t.Run(tt.name, func(t *testing.T) {
t.Parallel() // ✅ tests en parallèle
cache := NewCache()
var wg sync.WaitGroup
for i, key := range tt.keys {
wg.Add(1)
key, value := key, tt.values[i]
go func() {
defer wg.Done()
cache.Set(key, value)
}()
}
wg.Wait()
for i, key := range tt.keys {
got, ok := cache.Get(key)
if !ok {
t.Errorf("key %s not found", key)
continue
}
if got != tt.values[i] {
t.Errorf("got %s, want %s", got, tt.values[i])
}
}
})
}
}
```
### Tester l'annulation par contexte
```go
func TestCancellation(t *testing.T) {
t.Run("respects context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
started := make(chan struct{})
finished := make(chan error, 1)
go func() {
close(started)
finished <- longRunningWork(ctx)
}()
<-started // attendre que le travail démarre
cancel() // annuler
select {
case err := <-finished:
if !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled, got %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("work did not stop after cancellation")
}
})
t.Run("respects deadline", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := longRunningWork(ctx)
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("expected DeadlineExceeded, got %v", err)
}
})
}
```
### Stress testing — augmenter la probabilité de races
```go
// -count=1000 répète le test 1000 fois pour maximiser l'exposition des races
// go test -race -count=1000 -run TestMyFunc ./...
// Test de charge concurrent
func TestConcurrentWrites(t *testing.T) {
if testing.Short() {
t.Skip("skipping stress test in short mode")
}
cache := NewCache()
const goroutines = 100
const opsPerGoroutine = 1000
var wg sync.WaitGroup
for g := 0; g < goroutines; g++ {
wg.Add(1)
g := g
go func() {
defer wg.Done()
for i := 0; i < opsPerGoroutine; i++ {
key := fmt.Sprintf("key-%d-%d", g, i)
cache.Set(key, i)
cache.Get(key)
}
}()
}
wg.Wait()
}
```
### testing/synctest (Go 1.24 — expérimental)
```go
// testing/synctest permet de contrôler le temps dans les tests
// sans time.Sleep réel. Encore expérimental en Go 1.24.
//go:build go1.24
import "testing/synctest"
func TestWithFakeTime(t *testing.T) {
synctest.Run(func() {
done := make(chan struct{})
go func() {
time.Sleep(5 * time.Minute) // ne bloque pas réellement
close(done)
}()
synctest.Wait() // avancer le temps jusqu'au prochain event
select {
case <-done:
// le Sleep de 5 minutes s'est "écoulé" instantanément
default:
t.Fatal("goroutine not done")
}
})
}
```
---
## Section 13 : Anti-patterns — tableau récapitulatif
### Tableau des anti-patterns
| Anti-pattern | Problème | Fix |
|-------------|----------|-----|
| `go func() { heavy() }()` sans tracking | Goroutine orpheline, leak | WaitGroup ou errgroup avec suivi |
| `close(ch)` depuis le receiver | Panic si le sender envoie après | Toujours fermer côté sender |
| `close(ch)` depuis plusieurs goroutines | Double-close = panic | `sync.Once` pour le close |
| Mutex dans une struct copiée par valeur | Mutex invalide | Passer par pointeur `*MyStruct` |
| `defer mu.Unlock()` absent | Deadlock si panic entre Lock et Unlock | Toujours `defer mu.Unlock()` |
| Lock tenu pendant appel réseau/DB | Contention excessive | Fetch sans lock, puis lock pour write |
| `wg.Add(1)` à l'intérieur de la goroutine | Race avec `wg.Wait()` | `wg.Add(1)` avant `go func()` |
| `context.Background()` dans un handler | Ignore l'annulation du client | Utiliser `r.Context()` |
| Valeurs métier dans `context.WithValue` | Couplage caché, lisibilité nulle | Paramètres de fonction explicites |
| `time.Sleep` dans les tests | Tests flaky en CI lent | Channels de notification + timeout |
| Map concurrente sans mutex | Race condition (panic runtime) | `sync.RWMutex` ou `sync.Map` |
| Channel unbuffered dans goroutine qui peut ne pas recevoir | Goroutine leak | Channel bufferisé `make(chan T, 1)` |
| `cancel()` non appelé | Context et ressources leakent | `defer cancel()` immédiatement |
| `context.TODO()` laissé en production | Dette technique invisible | Chercher TODO régulièrement |
| Panic non récupéré dans goroutine | Crash tout le process | Wrapper `safeGo` avec `recover()` |
| select sans `default` dans boucle | Bloque si tous les channels vides | Ajouter `case <-ctx.Done()` |
| `select` avec `default` dans boucle active | Busy-wait = CPU à 100% | `time.After` ou condition variable |
| Ignorer le retour de `errgroup.TryGo` | Travail silencieusement perdu | Checker le bool retourné |
| Worker pool sans drain des channels | Deadlock à l'arrêt | Fermer jobs channel, drainer results |
| Goroutine qui lit `results` avant la fin | Lecture partielle | `wg.Wait()` puis lire |
| `append` concurrent sans mutex | Race sur pointeur/len/cap | Mutex ou indices pré-alloués |
| singleflight sans `Forget` après invalidation | Résultat périmé partagé | `group.Forget(key)` après invalidation |
| Rate limiter global partagé entre tests | Tests interdépendants | Créer un limiter par test |
| `go test` sans `-race` | Races non détectées | Ajouter `-race` dans CI systématiquement |
### Anti-patterns visuels — les plus courants
```go
// ❌ Anti-pattern 1 : goroutine fire-and-forget
go processJob(job) // pas de tracking, pas de récupération d'erreur
// ✅ Fix
g.Go(func() error { return processJob(ctx, job) })
// ---
// ❌ Anti-pattern 2 : channel partiellement drainé
results := make(chan Result, 100)
for _, job := range jobs {
go func(j Job) { results <- process(j) }(job)
}
// Si certaines goroutines n'ont pas de place dans le channel → leak
// ✅ Fix : WaitGroup + close proper
go func() { wg.Wait(); close(results) }()
for r := range results { collect(r) }
// ---
// ❌ Anti-pattern 3 : double close
close(done)
close(done) // panic: close of closed channel
// ✅ Fix
var closeOnce sync.Once
closeOnce.Do(func() { close(done) })
// ---
// ❌ Anti-pattern 4 : goroutine lancée dans init()
func init() {
go backgroundTask() // impossible à arrêter proprement
}
// ✅ Fix : exposer un Start/Stop
type Service struct { stopCh chan struct{} }
func (s *Service) Start() { go s.run(s.stopCh) }
func (s *Service) Stop() { close(s.stopCh) }
```
---
## Section 14 : Checklist production
### Checklist goroutines et channels
- [ ] Chaque goroutine a un owner identifié (qui la démarre, qui l'arrête)
- [ ] Aucune goroutine fire-and-forget sans tracking (WaitGroup, errgroup, ou channel résultat)
- [ ] Toutes les goroutines s'arrêtent sur `ctx.Done()` ou fermeture de channel
- [ ] `safeGo` wrapper avec `recover()` sur toutes les goroutines de prod
- [ ] Channel fermé uniquement côté sender (jamais receiver)
- [ ] `sync.Once` ou coordination explicite si plusieurs goroutines peuvent fermer
- [ ] Types directionnels dans les signatures (`<-chan T`, `chan<- T`)
- [ ] Channels bufferisés là où les goroutines doivent continuer sans bloquer
### Checklist sync primitives
- [ ] Aucun Mutex copié par valeur (passer par pointeur)
- [ ] `defer mu.Unlock()` sur chaque `mu.Lock()` sans exception
- [ ] Aucun I/O ou appel réseau sous un Mutex
- [ ] `wg.Add(N)` avant le `go func()`, jamais à l'intérieur
- [ ] `defer cancel()` immédiatement après chaque `WithCancel/WithTimeout/WithDeadline`
- [ ] Clés de contexte = types non-exportés (jamais strings)
- [ ] Aucune valeur métier dans les valeurs de contexte
### Checklist tests
- [ ] `-race` actif dans le CI pour tous les tests
- [ ] `goleak.VerifyTestMain(m)` dans chaque package avec goroutines
- [ ] Aucun `time.Sleep` dans les tests (remplacer par channels + timeout)
- [ ] Tests concurrents avec `t.Parallel()` sur les tests table-driven
- [ ] Tests d'annulation de contexte (Canceled et DeadlineExceeded)
- [ ] Stress test `go test -race -count=100` pour les structures partagées critiques
### Checklist monitoring production
- [ ] `runtime.NumGoroutine()` exposé en Prometheus (alerte si croissance continue)
- [ ] `/debug/pprof` disponible sur port interne (jamais exposé public)
- [ ] Logs structurés (slog) avec contexte sur chaque erreur de goroutine
- [ ] Circuit breaker sur chaque dépendance externe (DB, API, cache)
- [ ] Rate limiter sur les endpoints publics
- [ ] Graceful shutdown avec drain des workers avant fermeture DB
### Checklist code review concurrence
- [ ] Toute nouvelle goroutine : propriété claire, arrêt garanti
- [ ] Toute nouvelle map partagée : protégée par RWMutex ou sync.Map
- [ ] Tout nouveau canal : fermeture côté sender documentée
- [ ] Tout appel DB/HTTP : utilise les méthodes `*Context` (jamais sans ctx)
- [ ] Tout nouveau Mutex : dans une struct passée par pointeur
- [ ] `go test -race ./...` passe sans avertissements
- [ ] `goleak.VerifyNone(t)` passe pour le test du composant concerné
### Librairies recommandées
| Librairie | Import | Usage |
|-----------|--------|-------|
| errgroup | `golang.org/x/sync/errgroup` | WaitGroup + erreurs + annulation |
| semaphore | `golang.org/x/sync/semaphore` | Concurrence pondérée |
| singleflight | `golang.org/x/sync/singleflight` | Déduplication de requêtes |
| rate | `golang.org/x/time/rate` | Token bucket rate limiting |
| goleak | `go.uber.org/goleak` | Détection goroutine leaks en tests |
| conc | `github.com/sourcegraph/conc` | Goroutines avec panic recovery intégré |
| gobreaker | `github.com/sony/gobreaker/v2` | Circuit breaker production |
| synctest | `testing/synctest` | Contrôle du temps dans les tests (Go 1.24+) |
```bash
go get golang.org/x/sync@latest
go get golang.org/x/time@latest
go get go.uber.org/goleak@latest
go get github.com/sourcegraph/conc@latest
go get github.com/sony/gobreaker/v2@latest
```
### Références
- [The Go Memory Model](https://go.dev/ref/mem) — spec officielle
- [Concurrency Patterns in Go](https://go.dev/talks/2012/concurrency.slide) — Rob Pike
- [Rethinking Classical Concurrency Patterns](https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view) — Bryan Mills
- [Uber Go Style Guide — Goroutines](https://github.com/uber-go/guide/blob/master/style.md#goroutines)
- [go.uber.org/goleak](https://pkg.go.dev/go.uber.org/goleak)
- [github.com/sourcegraph/conc](https://github.com/sourcegraph/conc)
---
## Testing
### Race detector — première ligne de défense
Toujours lancer les tests avec `-race`. Le race detector instrumente le binaire et détecte les accès concurrents non protégés.
```bash
go test -race ./...
# Pour un test spécifique
go test -race -run TestWorkerPool ./internal/jobs/
# En CI : ajouter -race systématiquement
go test -race -count=1 -timeout=60s ./...
```
Le race detector ralentit l'exécution (~2-10x) mais est indispensable. Ne jamais désactiver en CI.
### Tester un worker pool
```go
func TestWorkerPool_ProcessesAllItems(t *testing.T) {
var (
mu sync.Mutex
processed []int
)
items := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
pool := NewWorkerPool(3) // 3 workers
err := pool.Run(context.Background(), items, func(item int) error {
mu.Lock()
processed = append(processed, item)
mu.Unlock()
return nil
})
require.NoError(t, err)
assert.Len(t, processed, len(items))
assert.ElementsMatch(t, items, processed) // ordre non garanti
}
func TestWorkerPool_HandlesPartialErrors(t *testing.T) {
items := []int{1, 2, 3, 4, 5}
pool := NewWorkerPool(2)
err := pool.Run(context.Background(), items, func(item int) error {
if item%2 == 0 {
return fmt.Errorf("item %d failed", item)
}
return nil
})
// Le pool doit retourner les erreurs mais avoir traité tous les items
require.Error(t, err)
var jobErrs *JobErrors
require.ErrorAs(t, err, &jobErrs)
assert.Len(t, jobErrs.Errors, 2) // items 2 et 4
}
```
### Tester l'annulation via context
```go
func TestWorkerPool_RespectsContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// Annuler après 50ms
go func() {
time.Sleep(50 * time.Millisecond)
cancel()
}()
items := make([]int, 100) // beaucoup d'items
processed := atomic.Int32{}
pool := NewWorkerPool(2)
err := pool.Run(ctx, items, func(item int) error {
time.Sleep(10 * time.Millisecond) // travail lent
processed.Add(1)
return nil
})
require.ErrorIs(t, err, context.Canceled)
// Seule une fraction des items a été traitée
assert.Less(t, int(processed.Load()), len(items))
}
```
### Détecter les goroutine leaks avec goleak
```go
import "go.uber.org/goleak"
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m) // Vérifie qu'aucune goroutine ne leak après chaque test
}
// Ou par test individuel
func TestMyService_NoLeak(t *testing.T) {
defer goleak.VerifyNone(t)
svc := NewMyService()
svc.Start()
// ... test ...
svc.Stop() // Si Stop() ne termine pas toutes les goroutines → goleak signale le leak
}
```
### Tester avec timeout pour éviter les deadlocks
```go
func TestChannel_NoDeadlock(t *testing.T) {
done := make(chan struct{})
go func() {
defer close(done)
result := processWithChannels() // fonction à tester
assert.Equal(t, expected, result)
}()
select {
case <-done:
// OK
case <-time.After(5 * time.Second):
t.Fatal("test timed out — possible deadlock")
}
}
```
### sync/synctest (Go 1.24+)
Go 1.24 introduit `testing/synctest` pour tester du code concurrent avec un scheduler déterministe.
```go
// go test -run TestWithFakeTime
func TestRetryWithBackoff(t *testing.T) {
synctest.Run(func() {
attempts := 0
svc := NewRetryService(func() error {
attempts++
if attempts < 3 {
return errors.New("transient error")
}
return nil
}, WithBackoff(time.Second))
go svc.Run(context.Background())
synctest.Wait() // avance le temps virtuel jusqu'à la prochaine goroutine bloquée
assert.Equal(t, 1, attempts)
synctest.Wait() // après le premier backoff
assert.Equal(t, 2, attempts)
synctest.Wait()
assert.Equal(t, 3, attempts)
})
}
```
---
*Last updated: 2025-03 — Revoir si : Go 1.24+ (sync/synctest GA, rangefunc), changements du scheduler Go, ou nouvelles APIs errgroup/singleflight.*