Composer des étapes : le pipeline
Vous savez lancer des goroutines et les faire communiquer. L'étape d'après, c'est de les composer : enchaîner des étapes (stages) reliées par des channels, comme une chaîne de montage. Chaque étape reçoit sur un channel d'entrée, travaille, et envoie sur un channel de sortie.
Le vocabulaire officiel (Go blog) : une source (ou producteur) démarre la chaîne, des étapes intermédiaires transforment, un sink (ou consommateur) termine.
// Étape source : émet 1, 2, 3...
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out) // on ferme quand on a fini d'émettre
}()
return out
}
// Étape intermédiaire : élève au carré
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in { // range lit jusqu'à la fermeture de in
out <- n * n
}
close(out)
}()
return out
}
func main() {
for n := range sq(gen(2, 3, 4)) { // sink : on consomme
fmt.Println(n) // 4, 9, 16
}
}
range de l'étape suivante s'arrête alors tout seul.La règle des pipelines. Celui qui envoie est celui qui ferme le channel (jamais le récepteur). Le consommateur lit en range jusqu'à la fermeture. C'est ce qui propage proprement la fin de la chaîne, sans signal manuel.
Fan-out / fan-in : paralléliser une étape lente
Si une étape est lente (un appel réseau, un calcul lourd), on la démultiplie : plusieurs goroutines lisent le même channel d'entrée (fan-out), puis on fusionne leurs résultats dans un seul channel (fan-in). C'est le worker pool.
// fan-in : fusionne plusieurs channels en un seul
func merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, c := range cs {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}(c)
}
go func() {
wg.Wait() // goroutine à part : Wait() bloque jusqu'à la fin
close(out) // sinon merge ne pourrait jamais retourner out
}()
return out
}
func main() {
in := gen(2, 3, 4, 5, 6)
// fan-out : 3 workers lisent le même channel in
out := merge(sq(in), sq(in), sq(in))
for n := range out { // fan-in : on consomme la fusion
fmt.Println(n)
}
}
Annuler proprement : context.Context
Comment dire à des goroutines « arrêtez-vous, on n'a plus besoin de vous » ? Un utilisateur ferme l'onglet, un timeout expire, une requête est abandonnée. La réponse standard en Go, c'est context.Context : un signal d'annulation qu'on propage à toutes les goroutines concernées.
func main() {
// Annule automatiquement au bout de 2 secondes
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // libère les ressources, toujours
resultat := make(chan int)
go travail(ctx, resultat)
select {
case r := <-resultat:
fmt.Println("Résultat :", r)
case <-ctx.Done(): // le contexte est annulé
fmt.Println("Abandonné :", ctx.Err()) // context deadline exceeded
}
}
func travail(ctx context.Context, out chan<- int) {
select {
case <-time.After(5 * time.Second): // travail trop long
out <- 42
case <-ctx.Done(): // on écoute l'annulation
return // on s'arrête proprement
}
}
Convention. Le context.Context se passe toujours en premier argument d'une fonction (func travail(ctx context.Context, ...)). Chaque goroutine qui peut durer écoute ctx.Done() dans un select et s'arrête quand il se ferme. C'est le fil d'annulation qui traverse tout ton programme.
Le piège : les fuites de goroutines
Une goroutine bloquée sur un channel que personne ne lira jamais ne se termine jamais. Elle reste en mémoire, pour toujours. Répétez l'erreur à chaque requête, et la mémoire grimpe lentement jusqu'au crash. C'est une fuite de goroutine.
On lance go func() { val := <-ch }() sur un channel non bufférisé, mais plus personne n'écrira jamais dans ch. La fonction principale continue et se termine. Avant de dérouler : le ramasse-miettes (GC) de Go va-t-il nettoyer automatiquement cette goroutine bloquée ? Que devient-elle ?
Voir la réponse
Non, le GC ne la nettoie pas. Une goroutine ne se termine que lorsque sa fonction retourne. Bloquée indéfiniment sur <-ch (aucune valeur n'arrivera jamais), elle ne retourne jamais. Pour le runtime, elle est simplement « en attente », pas morte : elle conserve sa pile et tout ce qu'elle référence en mémoire. C'est une fuite de goroutine. Multipliée (une par requête), elle épuise la mémoire sans erreur visible. La parade : garantir une porte de sortie à chaque goroutine, par exemple fermer le channel quand la production est finie, propager un context annulable (<-ctx.Done()), ou ajouter un cas select de sortie. Règle : avant chaque go func(), demandez-vous toujours « comment et quand va-t-elle se terminer ? ».
func chercher() int {
ch := make(chan int) // channel NON bufferisé
go func() {
ch <- travailLong() // ⛔ bloque ici pour toujours...
}()
return <-ch // si on ne lit qu'une fois, ok ;
} // mais avec un timeout qui part avant,
// la goroutine reste bloquée sur ch <-
Le scénario classique : on ajoute un select avec timeout côté lecteur, on part sur le timeout, et la goroutine reste coincée à vouloir envoyer dans ch que plus personne ne lit. Deux parades :
- donner au channel un buffer (
make(chan int, 1)) pour que l'envoi ne bloque pas ; - passer un
contextet faire écouterctx.Done()à la goroutine pour qu'elle abandonne.
Toute goroutine lancée doit avoir un chemin garanti vers sa fin. Avant d'écrire go func(), demandez-vous : « qui va lire ce qu'elle envoie, et que se passe-t-il si le lecteur disparaît ? ». Une goroutine sans sortie garantie est une fuite en sommeil.
Pour aller plus loin sur la détection (pprof, goleak) et les 4 patterns qui fuient le plus, j'ai écrit un article dédié : Goroutine leaks en Go : détecter, comprendre, corriger.
À toi de jouer : répare la fuite
Le code ci-dessous lance une goroutine qui envoie dans un channel non bufferisé, mais le select peut partir sur le timeout : la goroutine reste alors bloquée à jamais. Corrige-le (un buffer suffit ici), puis exécute :
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int) // ← à corriger : make(chan int, 1)
go func() {
time.Sleep(50 * time.Millisecond)
ch <- 42 // si le select est déjà parti, bloque pour toujours
}()
select {
case r := <-ch:
fmt.Println("Résultat :", r)
case <-time.After(100 * time.Millisecond):
fmt.Println("Timeout")
}
}
La correction. Remplace make(chan int) par make(chan int, 1). Avec un buffer de 1, l'envoi ch <- 42 réussit même si plus personne ne lit : la goroutine dépose sa valeur et se termine, au lieu de rester bloquée. À noter : un buffer ne supprime pas le blocage en général, il le diffère (l'envoi bloque quand le buffer est plein) ; ici il n'y a qu'une valeur à envoyer, donc ça suffit. Diagnostiquer la fuite est une chose ; la réparer proprement en est une autre.
Composing stages: the pipeline
You can launch goroutines and make them communicate. The next step is to compose them: chain stages connected by channels, like an assembly line. Each stage receives on an input channel, works, and sends on an output channel.
The official vocabulary (Go blog): a source (or producer) starts the chain, intermediate stages transform, a sink (or consumer) ends it.
// Source stage: emits 1, 2, 3...
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out) // close when done emitting
}()
return out
}
// Intermediate stage: square the numbers
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in { // range reads until in is closed
out <- n * n
}
close(out)
}()
return out
}
func main() {
for n := range sq(gen(2, 3, 4)) { // sink: we consume
fmt.Println(n) // 4, 9, 16
}
}
range then stops on its own.The pipeline rule. The one who sends is the one who closes the channel (never the receiver). The consumer reads with range until it closes. That's what cleanly propagates the end of the chain, with no manual signal.
Fan-out / fan-in: parallelize a slow stage
If a stage is slow (a network call, a heavy computation), you multiply it: several goroutines read the same input channel (fan-out), then you merge their results into a single channel (fan-in). That's the worker pool.
// fan-in: merge several channels into one
func merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, c := range cs {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}(c)
}
go func() {
wg.Wait() // separate goroutine: Wait() blocks until done
close(out) // otherwise merge could never return out
}()
return out
}
func main() {
in := gen(2, 3, 4, 5, 6)
// fan-out: 3 workers read the same channel in
out := merge(sq(in), sq(in), sq(in))
for n := range out { // fan-in: we consume the merge
fmt.Println(n)
}
}
Cancelling cleanly: context.Context
How do you tell goroutines "stop, we don't need you anymore"? A user closes the tab, a timeout fires, a request is abandoned. The standard answer in Go is context.Context: a cancellation signal you propagate to every goroutine involved.
func main() {
// Auto-cancel after 2 seconds
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // release resources, always
result := make(chan int)
go work(ctx, result)
select {
case r := <-result:
fmt.Println("Result:", r)
case <-ctx.Done(): // the context was cancelled
fmt.Println("Aborted:", ctx.Err()) // context deadline exceeded
}
}
func work(ctx context.Context, out chan<- int) {
select {
case <-time.After(5 * time.Second): // work takes too long
out <- 42
case <-ctx.Done(): // we listen for cancellation
return // stop cleanly
}
}
Convention. The context.Context is always passed as the first argument of a function (func work(ctx context.Context, ...)). Every goroutine that can run long listens to ctx.Done() in a select and stops when it closes. It's the cancellation thread running through your whole program.
The pitfall: goroutine leaks
A goroutine blocked on a channel that no one will ever read never finishes. It stays in memory, forever. Repeat the mistake on every request and memory creeps up until it crashes. That's a goroutine leak.
We launch go func() { val := <-ch }() on an unbuffered channel, but no one will ever write into ch again. The main function carries on and finishes. Before unfolding: will Go's garbage collector (GC) automatically clean up this blocked goroutine? What happens to it?
See the answer
No, the GC does not clean it up. A goroutine only ends when its function returns. Blocked indefinitely on <-ch (no value will ever arrive), it never returns. From the runtime's perspective, it is simply "waiting", not dead: it keeps its stack and everything it references in memory. That is a goroutine leak. Repeated on every request, it drains memory with no immediate visible error. The fix: give every goroutine a guaranteed exit — close the channel when production is done, propagate a cancellable context (<-ctx.Done()), or add a select exit case. Rule: before every go func(), always ask "how and when will this goroutine finish?".
func search() int {
ch := make(chan int) // UNbuffered channel
go func() {
ch <- longWork() // blocks here forever...
}()
return <-ch // fine if we read exactly once;
} // but with a timeout that fires first,
// the goroutine stays stuck on ch <-
The classic scenario: you add a select with a timeout on the reader side, the timeout wins, and the goroutine stays stuck trying to send into ch that no one reads anymore. Two fixes:
- give the channel a buffer (
make(chan int, 1)) so the send doesn't block; - pass a
contextand have the goroutine listen toctx.Done()so it can give up.
Every goroutine you launch must have a guaranteed path to its end. Before writing go func(), ask: "who will read what it sends, and what happens if the reader disappears?". A goroutine with no guaranteed exit is a leak waiting to happen.
For detection (pprof, goleak) and the 4 patterns that leak the most, I wrote a dedicated article: Goroutine leaks in Go: detect, understand, fix.
Your turn: fix the leak
The code below launches a goroutine that sends into an unbuffered channel, but the select can fire on the timeout: the goroutine then stays blocked forever. Fix it (a buffer is enough here), then run it:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int) // ← fix this: make(chan int, 1)
go func() {
time.Sleep(50 * time.Millisecond)
ch <- 42 // if the select already left, blocks forever
}()
select {
case r := <-ch:
fmt.Println("Result:", r)
case <-time.After(100 * time.Millisecond):
fmt.Println("Timeout")
}
}
The fix. Replace make(chan int) with make(chan int, 1). With a buffer of 1, the send ch <- 42 succeeds even if no one reads: the goroutine drops its value and finishes, instead of staying blocked. Note: a buffer doesn't remove blocking in general, it defers it (the send blocks once the buffer is full); here there's only one value to send, so it's enough. Diagnosing the leak is one thing; repairing it cleanly is another.
🎯 Pratique
S'entraîner (clique pour ouvrir) :
✨ Prompt IA
Copiez ce prompt dans Claude ou ChatGPT :
Écris un worker pool en Go : un pipeline qui lit des URLs sur un channel, les télécharge avec 5 workers (fan-out), fusionne les résultats (fan-in), et s'annule proprement via un context.WithTimeout. Explique où une goroutine pourrait fuiter.
💬 Ré-explique sans regarder
L'IA te rend un worker pool avec fan-out/fan-in. Explique avec tes mots : que fait le fan-out, que fait le fan-in, et qui doit close() le channel de sortie ?
fan-out lance plusieurs goroutines (workers) qui lisent le même channel d'entrée, pour traiter les éléments en parallèle ; le fan-in fusionne leurs sorties dans un seul channel via merge. C'est le merge (l'émetteur de la sortie) qui doit close() le channel de sortie, et seulement une fois que tous les workers ont fini (wg.Wait() avant close).⚖️ Juge le code de l'IA
L'IA propose ce code pour lancer un calcul avec timeout. L'accepter ou le rejeter, et pourquoi ?
func calcul() int {
ch := make(chan int)
go func() {
ch <- travailLong()
}()
select {
case r := <-ch:
return r
case <-time.After(time.Second):
return -1
}
}
ch n'est pas bufferisé. Si le time.After gagne, calcul retourne -1, mais la goroutine reste bloquée pour toujours sur ch <- travailLong() : personne ne lira jamais cette valeur. À chaque appel en timeout, une goroutine fuit. Le correctif : make(chan int, 1) (l'envoi ne bloque plus), ou passer un context que la goroutine écoute.🧠 Rappel libre
Sans remonter : que reçoit une goroutine sur ctx.Done(), et qu'est-elle censée faire à ce moment-là ?
ctx.Done() renvoie un channel qui se ferme quand le contexte est annulé (timeout atteint ou cancel() appelé). Une réception sur un channel fermé débloque immédiatement : la goroutine détecte l'annulation dans un select et doit s'arrêter proprement (faire return, libérer ses ressources) au lieu de continuer un travail devenu inutile.Vous savez composer, paralléliser et annuler des goroutines proprement. On met tout ça au service du web : un vrai serveur HTTP et une API REST en Go, avec ses routes et ses réponses JSON.
Leçon 10 : HTTP et API REST →