← Contextes /
sse-streaming-auth.md 286 lignes · 9.4 KB
Personnaliser Télécharger
# CLAUDE.md — SSE avec authentification custom

> Contexte spécialisé pour Claude Code. Coller ce fichier à la racine du projet pour implémenter du streaming SSE authentifié avec fetch + ReadableStream, sans EventSource.

---

## Section 1 : Pourquoi pas EventSource

`EventSource` est l'API native pour consommer du SSE — reconnexion automatique, parsing inclus.
**Mais** : il fait uniquement des requêtes GET, sans headers custom.

```javascript
// EventSource — simple, mais pas d'auth par header possible
const es = new EventSource('/api/v1/jobs/123/sse')
es.onmessage = e => console.log(e.data)
es.onerror = e => console.error('Erreur', e)
```

**Problème :** `X-API-Key`, `Authorization: Bearer` — impossible.

Le contournement habituel (token dans l'URL `?token=xxx`) est à éviter : le token apparaît dans les logs serveur, l'historique navigateur, et les en-têtes `Referer`.

**Règle** : dès qu'il faut de l'auth par header, utiliser `fetch + ReadableStream`.

---

## Section 2 : fetch + ReadableStream — la boucle pump()

Au lieu d'attendre que `response.text()` retourne le body complet, on lit le stream au fur et à mesure via `response.body.getReader()`.

```javascript
var ctrl = new AbortController()

fetch('/api/v1/jobs/' + jobId + '/sse', {
    headers: authHeaders(),   // X-API-Key ou Authorization: Bearer
    signal: ctrl.signal
})
.then(function(r) {
    var reader = r.body.getReader()   // lit le body en streaming
    var decoder = new TextDecoder()
    var buf = ''

    function pump() {
        reader.read().then(function(chunk) {
            if (chunk.done) { return }                        // connexion fermée côté serveur
            buf += decoder.decode(chunk.value, { stream: true })
            var parts = buf.split('\n\n')                     // découpe par événement SSE
            buf = parts.pop()                                 // garde le fragment incomplet
            parts.forEach(parseSSEChunk)                      // traite chaque événement complet
            pump()                                            // rappel récursif asynchrone
        })
    }
    pump()
})
```

**Points clés :**
- `r.body.getReader()` retourne un lecteur de stream — chaque `reader.read()` résout dès que des bytes arrivent, même si le stream est encore ouvert
- `buf.split('\n\n')` — le séparateur d'événements SSE. Un événement complet : `event: chunk\ndata: {"text":"..."}\n\n`
- `buf = parts.pop()` — garde le dernier fragment potentiellement tronqué pour le prochain tour
- La récursion `pump()` est asynchrone (dans `.then()`) : pas de stack overflow, même sur des streams longs

---

## Section 3 : Headers custom et authentification

```javascript
function authHeaders() {
    return {
        'X-API-Key': getApiKey(),
        // ou
        'Authorization': 'Bearer ' + getToken(),
        'Content-Type': 'application/json'
    }
}

// Utilisation dans fetch SSE
fetch(url, {
    headers: authHeaders(),
    signal: ctrl.signal
})
```

---

## Section 4 : TextDecoder avec `{ stream: true }`

**Subtilité critique pour les caractères non-ASCII** (accents, emojis) :

```javascript
// ✅ Correct — permet de découper les chunks UTF-8 sur plusieurs reads
decoder.decode(chunk.value, { stream: true })

// ❌ Incorrect — peut casser les caractères multi-bytes en bout de chunk
decoder.decode(chunk.value)
```

UTF-8 encode les caractères non-ASCII sur 2 à 4 bytes. Un chunk réseau peut se terminer au milieu d'un caractère multi-bytes. Avec `{ stream: true }`, le décodeur maintient un état interne et attend le prochain chunk. Sans cette option, les bytes partiels sont remplacés par le caractère de remplacement Unicode (U+FFFD) et le texte affiché est corrompu.

---

## Section 5 : Parseur SSE manuel

`EventSource` parse automatiquement le format SSE. Avec `fetch`, c'est à implémenter.

Format SSE envoyé par le serveur :

```
event: chunk
data: {"text":"Bonjour"}

event: chunk
data: {"text":" monde"}

event: result
data: {"status":"completed","result":"Bonjour monde"}
```

Parseur :

```javascript
function parseSSEChunk(raw) {
    // raw = "event: chunk\ndata: {\"text\":\"Bonjour\"}"
    var lines = raw.split('\n')
    var eventName = '', dataStr = ''

    for (var k = 0; k < lines.length; k++) {
        if (lines[k].startsWith('event: '))      eventName = lines[k].slice(7)
        else if (lines[k].startsWith('data: '))  dataStr   = lines[k].slice(6)
    }

    if (!dataStr) { return }  // ligne vide ou commentaire SSE

    var data = JSON.parse(dataStr)

    if (eventName === 'chunk') {
        responseBox.textContent += data.text   // affiche au fur et à mesure
    } else if (eventName === 'result') {
        console.log('Terminé :', data.result)
    } else if (eventName === 'error') {
        console.error('Erreur job :', data.message)
    }
}
```

---

## Section 6 : AbortController — annulation et error handling

```javascript
var ctrl = new AbortController()

// Au démarrage du stream
fetch(url, { signal: ctrl.signal, headers: authHeaders() })
    .then(function(r) { /* pump() */ })
    .catch(function(err) {
        if (err.name === 'AbortError') {
            console.log('Stream annulé par l\'utilisateur')
        } else {
            console.error('Erreur réseau :', err)
        }
    })

// Pour annuler (bouton Stop, fermeture modal, navigation)
function stopStream() {
    ctrl.abort()
}
```

L'annulation est propre des deux côtés :
- Navigateur : connexion TCP fermée immédiatement
- Go : `r.Context().Done()` se déclenche, le handler SSE sort de la boucle, `defer Unsubscribe` nettoie le channel. Aucune goroutine ne reste pendue.

---

## Section 7 : Backend SSE en Go

```go
func writeSSEEvent(w http.ResponseWriter, flusher http.Flusher, eventType string, data any) {
    payload, _ := json.Marshal(data)
    fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, payload)
    flusher.Flush() // envoie immédiatement — sans ça, les données restent en buffer
}

func StreamSSE(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming non supporté", http.StatusInternalServerError)
        return
    }

    // Headers obligatoires
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")      // évite l'interception par les proxies
    w.Header().Set("Connection", "keep-alive")

    // S'abonner aux événements du job
    ch := broker.Subscribe(jobID)
    defer broker.Unsubscribe(jobID, ch)

    for {
        select {
        case event, ok := <-ch:
            if !ok {
                return  // channel fermé = job terminé
            }
            writeSSEEvent(w, flusher, event.Type, event.Data)
        case <-r.Context().Done():
            return  // client déconnecté — contexte annulé proprement
        }
    }
}
```

**Contrainte non-négociable** : `flusher.Flush()` après chaque événement. Sans ça, les données restent dans le buffer HTTP et n'arrivent pas en temps réel.

---

## Section 8 : Flux complet bout en bout

Architecture avec deux requêtes fetch distinctes :

```
[User clique Send]
    → fetch POST /api/v1/jobs              → CreateJob → SQLite + Queue
    → fetch GET  /api/v1/jobs/{id}/sse     → StreamSSE → Subscribe(id)
                                                ↕ channel Go
                                            Worker → notify("chunk")
                                                → reader.read() → textContent +=
                                            Worker → notifyAndClose("result")
                                                → channel fermé
                                                → chunk.done = true
                                                → pump() s'arrête
```

Le worker Go traite le job dans une goroutine séparée et pousse les événements dans un channel. Le handler SSE lit ce channel et les envoie au navigateur. Quand le job termine, le channel est fermé, `chunk.done` passe à `true`, et `pump()` s'arrête.

---

## Section 9 : Comparaison EventSource vs fetch + ReadableStream

| | EventSource | fetch + ReadableStream |
|---|---|---|
| Headers custom | Non | Oui |
| Auth Bearer / API Key | Non | Oui |
| Méthode HTTP | GET uniquement | GET, POST, PUT… |
| Reconnexion automatique | Oui | A implémenter manuellement |
| Parsing SSE automatique | Oui | Parseur manuel (~20 lignes) |
| Annulation | `es.close()` | `ctrl.abort()` |
| Support navigateurs | Excellent | Excellent |

**Règle de choix** :
- Flux public sans auth → `EventSource`
- Auth par header (API Key, Bearer token) → `fetch + ReadableStream` obligatoire

---

## Section 10 : Reconnexion manuelle (si nécessaire)

`EventSource` reconnecte automatiquement. Avec `fetch`, c'est à implémenter si besoin :

```javascript
var retries = 0
var MAX_RETRIES = 3

function startStream() {
    ctrl = new AbortController()

    fetch(url, { signal: ctrl.signal, headers: authHeaders() })
        .then(function(r) {
            retries = 0 // reset au succès
            // pump()...
        })
        .catch(function(err) {
            if (err.name === 'AbortError') { return } // annulation volontaire, pas de retry
            if (retries < MAX_RETRIES) {
                retries++
                var delay = Math.pow(2, retries) * 1000 // backoff exponentiel
                setTimeout(startStream, delay)
            } else {
                console.error('Stream failed after', MAX_RETRIES, 'retries')
            }
        })
}
```