Leçon 9/11 11 min

Concurrence avancée

Pipelines, fan-out/fan-in, annulation avec context.Context et fuites de goroutines : les patterns avancés de concurrence en Go.

Composer des étapes : le pipeline

Vous savez lancer des goroutines et les faire communiquer. L'étape d'après, c'est de les composer : enchaîner des étapes (stages) reliées par des channels, comme une chaîne de montage. Chaque étape reçoit sur un channel d'entrée, travaille, et envoie sur un channel de sortie.

Le vocabulaire officiel (Go blog) : une source (ou producteur) démarre la chaîne, des étapes intermédiaires transforment, un sink (ou consommateur) termine.

// Étape source : émet 1, 2, 3...
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)            // on ferme quand on a fini d'émettre
    }()
    return out
}

// Étape intermédiaire : élève au carré
func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {   // range lit jusqu'à la fermeture de in
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    for n := range sq(gen(2, 3, 4)) {  // sink : on consomme
        fmt.Println(n)                  // 4, 9, 16
    }
}
Un pipeline : la source gen émet des nombres dans un channel, l'étape sq les élève au carré dans un autre channel, et main les consomme. gen() source : 2,3,4 sq() élève au carré main sink : affiche chan int chan int
Chaque étape ferme son channel de sortie quand elle a fini ; le range de l'étape suivante s'arrête alors tout seul.

La règle des pipelines. Celui qui envoie est celui qui ferme le channel (jamais le récepteur). Le consommateur lit en range jusqu'à la fermeture. C'est ce qui propage proprement la fin de la chaîne, sans signal manuel.

Fan-out / fan-in : paralléliser une étape lente

Si une étape est lente (un appel réseau, un calcul lourd), on la démultiplie : plusieurs goroutines lisent le même channel d'entrée (fan-out), puis on fusionne leurs résultats dans un seul channel (fan-in). C'est le worker pool.

// fan-in : fusionne plusieurs channels en un seul
func merge(cs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, c := range cs {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }(c)
    }
    go func() {
        wg.Wait()   // goroutine à part : Wait() bloque jusqu'à la fin
        close(out)  // sinon merge ne pourrait jamais retourner out
    }()
    return out
}

func main() {
    in := gen(2, 3, 4, 5, 6)
    // fan-out : 3 workers lisent le même channel in
    out := merge(sq(in), sq(in), sq(in))
    for n := range out {   // fan-in : on consomme la fusion
        fmt.Println(n)
    }
}
Fan-out fan-in : un channel d'entrée est lu par trois workers en parallèle, dont les sorties sont fusionnées par merge dans un seul channel de sortie. in un channel worker 1 worker 2 worker 3 fan-out merge fan-in out
Fan-out : N workers lisent le même channel. Fan-in : leurs sorties sont fusionnées en une seule.

Annuler proprement : context.Context

Comment dire à des goroutines « arrêtez-vous, on n'a plus besoin de vous » ? Un utilisateur ferme l'onglet, un timeout expire, une requête est abandonnée. La réponse standard en Go, c'est context.Context : un signal d'annulation qu'on propage à toutes les goroutines concernées.

func main() {
    // Annule automatiquement au bout de 2 secondes
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()  // libère les ressources, toujours

    resultat := make(chan int)
    go travail(ctx, resultat)

    select {
    case r := <-resultat:
        fmt.Println("Résultat :", r)
    case <-ctx.Done():               // le contexte est annulé
        fmt.Println("Abandonné :", ctx.Err())  // context deadline exceeded
    }
}

func travail(ctx context.Context, out chan<- int) {
    select {
    case <-time.After(5 * time.Second):  // travail trop long
        out <- 42
    case <-ctx.Done():                   // on écoute l'annulation
        return                          // on s'arrête proprement
    }
}

Convention. Le context.Context se passe toujours en premier argument d'une fonction (func travail(ctx context.Context, ...)). Chaque goroutine qui peut durer écoute ctx.Done() dans un select et s'arrête quand il se ferme. C'est le fil d'annulation qui traverse tout ton programme.

Le piège : les fuites de goroutines

Une goroutine bloquée sur un channel que personne ne lira jamais ne se termine jamais. Elle reste en mémoire, pour toujours. Répétez l'erreur à chaque requête, et la mémoire grimpe lentement jusqu'au crash. C'est une fuite de goroutine.

Prédisez avant de lire

On lance go func() { val := <-ch }() sur un channel non bufférisé, mais plus personne n'écrira jamais dans ch. La fonction principale continue et se termine. Avant de dérouler : le ramasse-miettes (GC) de Go va-t-il nettoyer automatiquement cette goroutine bloquée ? Que devient-elle ?

Voir la réponse

Non, le GC ne la nettoie pas. Une goroutine ne se termine que lorsque sa fonction retourne. Bloquée indéfiniment sur <-ch (aucune valeur n'arrivera jamais), elle ne retourne jamais. Pour le runtime, elle est simplement « en attente », pas morte : elle conserve sa pile et tout ce qu'elle référence en mémoire. C'est une fuite de goroutine. Multipliée (une par requête), elle épuise la mémoire sans erreur visible. La parade : garantir une porte de sortie à chaque goroutine, par exemple fermer le channel quand la production est finie, propager un context annulable (<-ctx.Done()), ou ajouter un cas select de sortie. Règle : avant chaque go func(), demandez-vous toujours « comment et quand va-t-elle se terminer ? ».

func chercher() int {
    ch := make(chan int)        // channel NON bufferisé
    go func() {
        ch <- travailLong()     // ⛔ bloque ici pour toujours...
    }()
    return <-ch                 // si on ne lit qu'une fois, ok ;
}                               // mais avec un timeout qui part avant,
                                // la goroutine reste bloquée sur ch <-

Le scénario classique : on ajoute un select avec timeout côté lecteur, on part sur le timeout, et la goroutine reste coincée à vouloir envoyer dans ch que plus personne ne lit. Deux parades :

  • donner au channel un buffer (make(chan int, 1)) pour que l'envoi ne bloque pas ;
  • passer un context et faire écouter ctx.Done() à la goroutine pour qu'elle abandonne.

Toute goroutine lancée doit avoir un chemin garanti vers sa fin. Avant d'écrire go func(), demandez-vous : « qui va lire ce qu'elle envoie, et que se passe-t-il si le lecteur disparaît ? ». Une goroutine sans sortie garantie est une fuite en sommeil.

Pour aller plus loin sur la détection (pprof, goleak) et les 4 patterns qui fuient le plus, j'ai écrit un article dédié : Goroutine leaks en Go : détecter, comprendre, corriger.

À toi de jouer : répare la fuite

Le code ci-dessous lance une goroutine qui envoie dans un channel non bufferisé, mais le select peut partir sur le timeout : la goroutine reste alors bloquée à jamais. Corrige-le (un buffer suffit ici), puis exécute :

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int) // ← à corriger : make(chan int, 1)
    go func() {
        time.Sleep(50 * time.Millisecond)
        ch <- 42 // si le select est déjà parti, bloque pour toujours
    }()

    select {
    case r := <-ch:
        fmt.Println("Résultat :", r)
    case <-time.After(100 * time.Millisecond):
        fmt.Println("Timeout")
    }
}

La correction. Remplace make(chan int) par make(chan int, 1). Avec un buffer de 1, l'envoi ch <- 42 réussit même si plus personne ne lit : la goroutine dépose sa valeur et se termine, au lieu de rester bloquée. À noter : un buffer ne supprime pas le blocage en général, il le diffère (l'envoi bloque quand le buffer est plein) ; ici il n'y a qu'une valeur à envoyer, donc ça suffit. Diagnostiquer la fuite est une chose ; la réparer proprement en est une autre.

Composing stages: the pipeline

You can launch goroutines and make them communicate. The next step is to compose them: chain stages connected by channels, like an assembly line. Each stage receives on an input channel, works, and sends on an output channel.

The official vocabulary (Go blog): a source (or producer) starts the chain, intermediate stages transform, a sink (or consumer) ends it.

// Source stage: emits 1, 2, 3...
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)            // close when done emitting
    }()
    return out
}

// Intermediate stage: square the numbers
func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {   // range reads until in is closed
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    for n := range sq(gen(2, 3, 4)) {  // sink: we consume
        fmt.Println(n)                  // 4, 9, 16
    }
}
A pipeline: the source gen emits numbers into a channel, the sq stage squares them into another channel, and main consumes them. gen() source: 2,3,4 sq() squares them main sink: prints chan int chan int
Each stage closes its output channel when done; the next stage's range then stops on its own.

The pipeline rule. The one who sends is the one who closes the channel (never the receiver). The consumer reads with range until it closes. That's what cleanly propagates the end of the chain, with no manual signal.

Fan-out / fan-in: parallelize a slow stage

If a stage is slow (a network call, a heavy computation), you multiply it: several goroutines read the same input channel (fan-out), then you merge their results into a single channel (fan-in). That's the worker pool.

// fan-in: merge several channels into one
func merge(cs ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, c := range cs {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }(c)
    }
    go func() {
        wg.Wait()   // separate goroutine: Wait() blocks until done
        close(out)  // otherwise merge could never return out
    }()
    return out
}

func main() {
    in := gen(2, 3, 4, 5, 6)
    // fan-out: 3 workers read the same channel in
    out := merge(sq(in), sq(in), sq(in))
    for n := range out {   // fan-in: we consume the merge
        fmt.Println(n)
    }
}
Fan-out fan-in: one input channel is read by three workers in parallel, whose outputs are merged by merge into a single output channel. in one channel worker 1 worker 2 worker 3 fan-out merge fan-in out
Fan-out: N workers read the same channel. Fan-in: their outputs are merged into one.

Cancelling cleanly: context.Context

How do you tell goroutines "stop, we don't need you anymore"? A user closes the tab, a timeout fires, a request is abandoned. The standard answer in Go is context.Context: a cancellation signal you propagate to every goroutine involved.

func main() {
    // Auto-cancel after 2 seconds
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()  // release resources, always

    result := make(chan int)
    go work(ctx, result)

    select {
    case r := <-result:
        fmt.Println("Result:", r)
    case <-ctx.Done():               // the context was cancelled
        fmt.Println("Aborted:", ctx.Err())  // context deadline exceeded
    }
}

func work(ctx context.Context, out chan<- int) {
    select {
    case <-time.After(5 * time.Second):  // work takes too long
        out <- 42
    case <-ctx.Done():                   // we listen for cancellation
        return                          // stop cleanly
    }
}

Convention. The context.Context is always passed as the first argument of a function (func work(ctx context.Context, ...)). Every goroutine that can run long listens to ctx.Done() in a select and stops when it closes. It's the cancellation thread running through your whole program.

The pitfall: goroutine leaks

A goroutine blocked on a channel that no one will ever read never finishes. It stays in memory, forever. Repeat the mistake on every request and memory creeps up until it crashes. That's a goroutine leak.

Predict before reading

We launch go func() { val := <-ch }() on an unbuffered channel, but no one will ever write into ch again. The main function carries on and finishes. Before unfolding: will Go's garbage collector (GC) automatically clean up this blocked goroutine? What happens to it?

See the answer

No, the GC does not clean it up. A goroutine only ends when its function returns. Blocked indefinitely on <-ch (no value will ever arrive), it never returns. From the runtime's perspective, it is simply "waiting", not dead: it keeps its stack and everything it references in memory. That is a goroutine leak. Repeated on every request, it drains memory with no immediate visible error. The fix: give every goroutine a guaranteed exit — close the channel when production is done, propagate a cancellable context (<-ctx.Done()), or add a select exit case. Rule: before every go func(), always ask "how and when will this goroutine finish?".

func search() int {
    ch := make(chan int)        // UNbuffered channel
    go func() {
        ch <- longWork()        // blocks here forever...
    }()
    return <-ch                 // fine if we read exactly once;
}                               // but with a timeout that fires first,
                                // the goroutine stays stuck on ch <-

The classic scenario: you add a select with a timeout on the reader side, the timeout wins, and the goroutine stays stuck trying to send into ch that no one reads anymore. Two fixes:

  • give the channel a buffer (make(chan int, 1)) so the send doesn't block;
  • pass a context and have the goroutine listen to ctx.Done() so it can give up.

Every goroutine you launch must have a guaranteed path to its end. Before writing go func(), ask: "who will read what it sends, and what happens if the reader disappears?". A goroutine with no guaranteed exit is a leak waiting to happen.

For detection (pprof, goleak) and the 4 patterns that leak the most, I wrote a dedicated article: Goroutine leaks in Go: detect, understand, fix.

Your turn: fix the leak

The code below launches a goroutine that sends into an unbuffered channel, but the select can fire on the timeout: the goroutine then stays blocked forever. Fix it (a buffer is enough here), then run it:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int) // ← fix this: make(chan int, 1)
    go func() {
        time.Sleep(50 * time.Millisecond)
        ch <- 42 // if the select already left, blocks forever
    }()

    select {
    case r := <-ch:
        fmt.Println("Result:", r)
    case <-time.After(100 * time.Millisecond):
        fmt.Println("Timeout")
    }
}

The fix. Replace make(chan int) with make(chan int, 1). With a buffer of 1, the send ch <- 42 succeeds even if no one reads: the goroutine drops its value and finishes, instead of staying blocked. Note: a buffer doesn't remove blocking in general, it defers it (the send blocks once the buffer is full); here there's only one value to send, so it's enough. Diagnosing the leak is one thing; repairing it cleanly is another.

🎯 Pratique

S'entraîner (clique pour ouvrir) :

Prompt IA
Avec l'IA

Copiez ce prompt dans Claude ou ChatGPT :

Écris un worker pool en Go : un pipeline qui lit des URLs sur un channel, les télécharge avec 5 workers (fan-out), fusionne les résultats (fan-in), et s'annule proprement via un context.WithTimeout. Explique où une goroutine pourrait fuiter.
💬 Ré-explique sans regarder
Ré-explique sans regarder

L'IA te rend un worker pool avec fan-out/fan-in. Explique avec tes mots : que fait le fan-out, que fait le fan-in, et qui doit close() le channel de sortie ?

Une bonne explication dit : le fan-out lance plusieurs goroutines (workers) qui lisent le même channel d'entrée, pour traiter les éléments en parallèle ; le fan-in fusionne leurs sorties dans un seul channel via merge. C'est le merge (l'émetteur de la sortie) qui doit close() le channel de sortie, et seulement une fois que tous les workers ont fini (wg.Wait() avant close).
⚖️ Juge le code de l'IA
Accepter ou rejeter le code de l'IA

L'IA propose ce code pour lancer un calcul avec timeout. L'accepter ou le rejeter, et pourquoi ?

func calcul() int {
    ch := make(chan int)
    go func() {
        ch <- travailLong()
    }()
    select {
    case r := <-ch:
        return r
    case <-time.After(time.Second):
        return -1
    }
}
À rejeter : fuite de goroutine. Le channel ch n'est pas bufferisé. Si le time.After gagne, calcul retourne -1, mais la goroutine reste bloquée pour toujours sur ch <- travailLong() : personne ne lira jamais cette valeur. À chaque appel en timeout, une goroutine fuit. Le correctif : make(chan int, 1) (l'envoi ne bloque plus), ou passer un context que la goroutine écoute.
🧠 Rappel libre
Rappel libre

Sans remonter : que reçoit une goroutine sur ctx.Done(), et qu'est-elle censée faire à ce moment-là ?

ctx.Done() renvoie un channel qui se ferme quand le contexte est annulé (timeout atteint ou cancel() appelé). Une réception sur un channel fermé débloque immédiatement : la goroutine détecte l'annulation dans un select et doit s'arrêter proprement (faire return, libérer ses ressources) au lieu de continuer un travail devenu inutile.
Dans un pipeline, qui doit fermer (close) un channel ?
À quoi sert context.Context ?
Une goroutine envoie dans un channel non bufferisé, mais le lecteur est parti sur un timeout. Que se passe-t-il, et comment l'éviter ?
Prochaine étape

Vous savez composer, paralléliser et annuler des goroutines proprement. On met tout ça au service du web : un vrai serveur HTTP et une API REST en Go, avec ses routes et ses réponses JSON.

Leçon 10 : HTTP et API REST →

Une erreur dans cette leçon, un passage flou, une question ? Écrivez-moi : chaque retour améliore ce cours.

Besoin d'un développeur pour votre projet ?

Réponse sous 24h · Sans engagement