← Contextes /
grpc-go-streaming.md 344 lignes · 10.1 KB
Personnaliser Télécharger
# CLAUDE.md — gRPC en Go : streaming temps réel pour microservices

> Contexte spécialisé pour Claude Code. Coller ce fichier à la racine du projet pour implémenter du streaming gRPC entre services Go (server streaming, client streaming, bidirectionnel).

---

## Section 1 : Setup Protobuf et génération de code

Tout commence par le fichier `.proto` — source de vérité partagée entre serveur et clients. Ne jamais écrire les structs Go à la main : elles sont générées par `protoc`.

```protobuf
syntax = "proto3";
package pricefeed;
option go_package = "./pb";

message PriceUpdate {
    string pair      = 1;  // "BTC/USDT"
    string exchange  = 2;  // "binance"
    double bid       = 3;
    double ask       = 4;
    int64  timestamp = 5;  // unix millis
}

message SubscribeRequest {
    repeated string pairs = 1;  // ["BTC/USDT", "ETH/USDT"]
}

service PriceFeed {
    // Server streaming : le client s'abonne, le serveur pousse
    rpc Subscribe(SubscribeRequest) returns (stream PriceUpdate);

    // Unary : dernier prix connu
    rpc GetLatest(SubscribeRequest) returns (PriceUpdate);
}
```

Génération du code Go :

```bash
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       pricefeed.proto
```

Résultat : deux fichiers dans `./pb/` — structs et interfaces serveur/client. **Ne jamais modifier ces fichiers à la main** : ils seront écrasés à la prochaine génération.

---

## Section 2 : Les 4 modes de streaming

Choisir le bon mode selon le pattern de communication.

### Unary — requête / réponse classique

```protobuf
rpc GetLatest(SubscribeRequest) returns (PriceUpdate);
```

Comportement REST classique. Idéal pour lectures ponctuelles. Pas de `stream` dans la signature.

### Server streaming — abonnement à un flux

```protobuf
rpc Subscribe(SubscribeRequest) returns (stream PriceUpdate);
```

Le client envoie **une** requête, le serveur pousse autant de réponses qu'il veut. Parfait pour les feeds : prix, métriques, logs.

### Client streaming — envoi de données en batch

```protobuf
rpc BatchOrders(stream Order) returns (BatchResult);
```

Le client envoie un flux de requêtes, le serveur répond **une seule fois** à la fin. Cas d'usage : ingestion en bulk, flux d'ordres à agréger.

### Bidirectionnel — flux dans les deux sens

```protobuf
rpc MarketDataFeed(stream MarketQuery) returns (stream MarketEvent);
```

Le mode le plus complexe à implémenter et déboguer. Pour la plupart des besoins microservices, server streaming ou unary suffisent. Le bidirectionnel est souvent sur-ingénié.

---

## Section 3 : Implémentation serveur Go

Le serveur implémente l'interface générée par `protoc`. Toujours embedder `pb.UnimplementedXxxServer` pour la compatibilité forward.

```go
type PriceFeedServer struct {
    pb.UnimplementedPriceFeedServer
    updates chan *pb.PriceUpdate
}

func (s *PriceFeedServer) Subscribe(req *pb.SubscribeRequest, stream pb.PriceFeed_SubscribeServer) error {
    pairs := make(map[string]bool)
    for _, p := range req.Pairs {
        pairs[p] = true
    }

    for {
        select {
        case update := <-s.updates:
            if !pairs[update.Pair] {
                continue
            }
            if err := stream.Send(update); err != nil {
                // Client déconnecté — pas une erreur serveur
                return nil
            }
        case <-stream.Context().Done():
            return nil  // Client a annulé la souscription
        }
    }
}
```

**Points critiques :**
- `stream.Send()` retourne une erreur si le client est déconnecté. Retourner `nil` est intentionnel : déconnexion normale, pas une erreur serveur.
- `stream.Context().Done()` catch les cancellations et timeouts. Sans ça, la goroutine continue après déconnexion — goroutine leak.
- Le channel `updates` est alimenté par la goroutine qui collecte les données. Le serveur gRPC ne fait que distribuer.

Démarrage du serveur :

```go
func main() {
    updates := make(chan *pb.PriceUpdate, 100)

    srv := grpc.NewServer()
    pb.RegisterPriceFeedServer(srv, &PriceFeedServer{updates: updates})

    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        slog.Error("failed to listen", "error", err)
        os.Exit(1)
    }

    slog.Info("gRPC server listening", "addr", ":50051")
    if err := srv.Serve(lis); err != nil {
        slog.Error("serve error", "error", err)
        os.Exit(1)
    }
}
```

---

## Section 4 : Implémentation client Go

La connexion gRPC est réutilisable — la créer une fois et l'injecter dans les services qui en ont besoin.

```go
func connectPriceFeed(ctx context.Context, addr string) error {
    conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        return fmt.Errorf("dial: %w", err)
    }
    defer conn.Close()

    client := pb.NewPriceFeedClient(conn)
    stream, err := client.Subscribe(ctx, &pb.SubscribeRequest{
        Pairs: []string{"BTC/USDT", "ETH/USDT"},
    })
    if err != nil {
        return fmt.Errorf("subscribe: %w", err)
    }

    for {
        update, err := stream.Recv()
        if err == io.EOF {
            return nil  // Serveur a fermé le stream proprement
        }
        if err != nil {
            return fmt.Errorf("recv: %w", err)
        }
        slog.Info("price update",
            "pair", update.Pair,
            "bid", update.Bid,
            "ask", update.Ask,
            "exchange", update.Exchange,
        )
    }
}
```

**Notes importantes :**
- `insecure.NewCredentials()` : pour dev local et communications intra-cluster (mTLS géré par le service mesh). En production sur réseau public : utiliser `credentials.NewClientTLSFromFile()` ou `credentials.NewTLS()`.
- La boucle `stream.Recv()` est bloquante sans consommer de CPU — HTTP/2 gère ça, pas de polling actif.
- Quand le contexte est annulé, `Recv()` retourne une erreur et la boucle s'arrête proprement.

---

## Section 5 : Error handling gRPC

gRPC a son propre système de codes d'erreur. Les mapper correctement évite les malentendus entre services.

```go
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/status"

// Côté serveur : retourner des erreurs typées
func (s *PriceFeedServer) GetLatest(ctx context.Context, req *pb.SubscribeRequest) (*pb.PriceUpdate, error) {
    if len(req.Pairs) == 0 {
        return nil, status.Error(codes.InvalidArgument, "at least one pair required")
    }

    update, ok := s.cache.Get(req.Pairs[0])
    if !ok {
        return nil, status.Errorf(codes.NotFound, "no data for pair %s", req.Pairs[0])
    }

    return update, nil
}

// Côté client : inspecter le code d'erreur
update, err := client.GetLatest(ctx, req)
if err != nil {
    st, ok := status.FromError(err)
    if ok {
        switch st.Code() {
        case codes.NotFound:
            // Pas d'erreur critique — la paire n'est pas encore disponible
            return nil
        case codes.Unavailable:
            // Serveur temporairement indisponible — retry
            return fmt.Errorf("server unavailable, retry later: %w", err)
        default:
            return fmt.Errorf("grpc error %s: %w", st.Code(), err)
        }
    }
    return fmt.Errorf("unexpected error: %w", err)
}
```

**Codes fréquents :**
- `codes.OK` — succès
- `codes.InvalidArgument` — paramètre invalide (équivalent HTTP 400)
- `codes.NotFound` — ressource absente (équivalent HTTP 404)
- `codes.Unavailable` — service temporairement indisponible, retryable (équivalent HTTP 503)
- `codes.DeadlineExceeded` — timeout de contexte dépassé

---

## Section 6 : Testing gRPC

Tester les services gRPC avec `bufconn` — un listener in-memory qui évite les vraies connexions réseau.

```go
import "google.golang.org/grpc/test/bufconn"

const bufSize = 1024 * 1024

func setupTestServer(t *testing.T) (pb.PriceFeedClient, func()) {
    t.Helper()

    lis := bufconn.Listen(bufSize)
    srv := grpc.NewServer()

    updates := make(chan *pb.PriceUpdate, 10)
    pb.RegisterPriceFeedServer(srv, &PriceFeedServer{updates: updates})

    go srv.Serve(lis)

    conn, err := grpc.DialContext(context.Background(), "bufnet",
        grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
            return lis.DialContext(ctx)
        }),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        t.Fatalf("dial bufconn: %v", err)
    }

    client := pb.NewPriceFeedClient(conn)

    cleanup := func() {
        conn.Close()
        srv.GracefulStop()
    }

    return client, cleanup
}

func TestSubscribe(t *testing.T) {
    client, cleanup := setupTestServer(t)
    defer cleanup()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    stream, err := client.Subscribe(ctx, &pb.SubscribeRequest{Pairs: []string{"BTC/USDT"}})
    if err != nil {
        t.Fatalf("subscribe: %v", err)
    }

    // Injecter un update et vérifier la réception
    // ...
}
```

---

## Section 7 : Deployment et debugging

Outils pour déboguer en production sans Postman :

```bash
# Installer grpcurl
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest

# Lister les services disponibles
grpcurl -plaintext localhost:50051 list

# Lister les méthodes d'un service
grpcurl -plaintext localhost:50051 list pricefeed.PriceFeed

# Appel unary
grpcurl -plaintext -d '{"pairs": ["BTC/USDT"]}' localhost:50051 pricefeed.PriceFeed/GetLatest

# Server streaming (ctrl+C pour arrêter)
grpcurl -plaintext -d '{"pairs": ["BTC/USDT", "ETH/USDT"]}' localhost:50051 pricefeed.PriceFeed/Subscribe
```

Graceful shutdown :

```go
srv := grpc.NewServer()
// ...

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

slog.Info("shutting down gRPC server")
srv.GracefulStop()  // Attend que les streams en cours se terminent
```

**Quand choisir gRPC vs REST :**
- gRPC : services internes Go, besoin de streaming, contrats stricts entre équipes
- REST : API publiques, clients navigateur, clients tiers non-Go
- Ne pas gRPCifier une API publique — `grpc-web` ajoute de la complexité sans bénéfice notable