diff --git a/internal/api/handlers.go b/internal/api/handlers.go
index a7c1bdc..751d496 100644
--- a/internal/api/handlers.go
+++ b/internal/api/handlers.go
@@ -44,12 +44,41 @@ type handlers struct {
 	// nil (dev/test path), upload() falls back to a detached goroutine.
 	workq *workq.Pool
 
-	// Upload progress tracking
+	// Upload progress tracking. jobProgress is the legacy plain-string
+	// map — kept so existing tests and callers that don't pass a job_id
+	// still see the most recent state. jobEvents holds the structured
+	// per-phase event log used by the SSE handler.
 	uploadMu    sync.Mutex
 	jobProgress map[string]string
+	jobEvents   map[string]*jobProgressLog
 	jobCounter  atomic.Int64
 }
 
+// jobProgressLog accumulates structured upload events for one job.
+// Append is O(1); the SSE handler walks the slice from a per-stream
+// cursor. done is set when a terminal event lands; the handler uses
+// it to flush and close the stream.
+type jobProgressLog struct {
+	mu     sync.Mutex
+	events []uploadEvent
+	done   bool
+	cond   *sync.Cond
+}
+
+// uploadEvent is the JSON shape streamed over GET /api/upload/progress.
+// Field order intentionally matches the wire format documented in
+// docs/rest-api.md so the UI hook can rely on it.
+type uploadEvent struct {
+	JobID       string `json:"job_id"`
+	File        string `json:"file,omitempty"`
+	Phase       string `json:"phase"`
+	ChunksDone  int    `json:"chunks_done,omitempty"`
+	ChunksTotal int    `json:"chunks_total,omitempty"`
+	Message     string `json:"message,omitempty"`
+	Done        bool   `json:"done"`
+	Error       string `json:"error,omitempty"`
+}
+
 // resolveStore is the single entry point for every doc handler. It
 // pulls the slug off ctx and returns the matching per-project store.
 // A missing/empty slug or open failure becomes a 500 — the project
@@ -575,27 +604,103 @@ func (h *handlers) upload(w http.ResponseWriter, r *http.Request) {
 	slog.Info("📦 upload job queued", "job_id", jobID, "files", len(paths), "project", slug)
 	h.setProgress(jobID, fmt.Sprintf("queued: %d files", len(paths)))
+	h.appendEvent(jobID, uploadEvent{
+		JobID:   jobID,
+		Phase:   "queued",
+		Message: fmt.Sprintf("queued: %d files", len(paths)),
+	})
 
 	job := func(ctx context.Context) {
 		defer os.RemoveAll(tmpDir)
 		pl := pipeline.New(st, h.provider, h.cfg)
+
+		// Per-job pipeline progress channel. Buffered so a slow SSE
+		// consumer never stalls indexing; the pipeline emit() helper is
+		// also non-blocking, so this is belt and braces.
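+		//
+		// Event flow, as a sketch: pipeline emit() -> progressCh ->
+		// relay goroutine below -> jobProgressLog (appendEvent) ->
+		// SSE handler -> browser hook.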
+		progressCh := make(chan pipeline.ProgressEvent, 32)
+		stopRelay := make(chan struct{})
+		var relayDone sync.WaitGroup
+		relayDone.Add(1)
+		go func() {
+			defer relayDone.Done()
+			for {
+				select {
+				case evt, ok := <-progressCh:
+					if !ok {
+						return
+					}
+					errStr := ""
+					if evt.Error != nil {
+						errStr = evt.Error.Error()
+					}
+					h.appendEvent(jobID, uploadEvent{
+						JobID:       jobID,
+						File:        evt.File,
+						Phase:       evt.Phase,
+						ChunksDone:  evt.ChunksDone,
+						ChunksTotal: evt.ChunksTotal,
+						Message:     evt.Message,
+						Done:        evt.Done,
+						Error:       errStr,
+					})
+				case <-stopRelay:
+					return
+				}
+			}
+		}()
+
 		for _, p := range paths {
 			if ctx.Err() != nil {
 				slog.Warn("🛑 upload indexing cancelled on shutdown", "job_id", jobID, "file", filepath.Base(p))
 				h.setProgress(jobID, "cancelled")
+				h.finishEvent(jobID, uploadEvent{
+					JobID:   jobID,
+					File:    filepath.Base(p),
+					Phase:   "cancelled",
+					Message: "cancelled on shutdown",
+					Done:    true,
+				})
+				close(stopRelay)
+				relayDone.Wait()
 				return
 			}
 			slog.Info("📦 upload indexing file", "job_id", jobID, "file", filepath.Base(p))
 			h.setProgress(jobID, fmt.Sprintf("indexing: %s", filepath.Base(p)))
-			if err := pl.IndexPath(ctx, p, pipeline.IndexOptions{}); err != nil {
+			h.appendEvent(jobID, uploadEvent{
+				JobID:   jobID,
+				File:    filepath.Base(p),
+				Phase:   "indexing",
+				Message: "indexing " + filepath.Base(p),
+			})
+			if err := pl.IndexPath(ctx, p, pipeline.IndexOptions{Progress: progressCh}); err != nil {
 				slog.Error("❌ upload indexing failed", "job_id", jobID, "file", filepath.Base(p), "err", err)
 				h.setProgress(jobID, fmt.Sprintf("error: %v", err))
+				h.finishEvent(jobID, uploadEvent{
+					JobID:   jobID,
+					File:    filepath.Base(p),
+					Phase:   "error",
+					Message: err.Error(),
+					Done:    true,
+					Error:   err.Error(),
+				})
+				close(stopRelay)
+				relayDone.Wait()
 				return
 			}
 		}
 
 		h.setProgress(jobID, "finalizing")
+		h.appendEvent(jobID, uploadEvent{
+			JobID:   jobID,
+			Phase:   "finalize",
+			Message: "running community detection and summaries",
+		})
 		if err := pl.Finalize(ctx, false, true); err != nil {
 			slog.Warn("⚠️ upload finalization failed", "job_id", jobID, "err", err)
+			h.appendEvent(jobID, uploadEvent{
+				JobID:   jobID,
+				Phase:   "finalize",
+				Message: "finalize warning: " + err.Error(),
+			})
 		}
 
 		// Invalidate the vector index for this project so the next
 		// search rebuild picks up the newly-indexed chunks.
@@ -604,6 +709,14 @@ func (h *handlers) upload(w http.ResponseWriter, r *http.Request) {
 		}
 		slog.Info("✅ upload job complete", "job_id", jobID, "files", len(paths), "project", slug)
 		h.setProgress(jobID, "done")
+		h.finishEvent(jobID, uploadEvent{
+			JobID:   jobID,
+			Phase:   "done",
+			Message: fmt.Sprintf("indexed %d files", len(paths)),
+			Done:    true,
+		})
+		close(stopRelay)
+		relayDone.Wait()
 	}
 
 	if h.workq == nil {
@@ -613,11 +726,25 @@ func (h *handlers) upload(w http.ResponseWriter, r *http.Request) {
 	if err := h.workq.Submit(job); err != nil {
 		if errors.Is(err, workq.ErrQueueFull) {
 			h.setProgress(jobID, "rejected: indexing queue full")
+			h.finishEvent(jobID, uploadEvent{
+				JobID:   jobID,
+				Phase:   "error",
+				Message: "indexing queue full; retry later",
+				Done:    true,
+				Error:   "indexing queue full",
+			})
 			w.Header().Set("Retry-After", "30")
 			writeError(w, r, http.StatusServiceUnavailable, "indexing queue full; retry later", nil)
 			return
 		}
 		h.setProgress(jobID, "rejected: server unavailable")
+		h.finishEvent(jobID, uploadEvent{
+			JobID:   jobID,
+			Phase:   "error",
+			Message: "server shutting down",
+			Done:    true,
+			Error:   "server shutting down",
+		})
 		writeError(w, r, http.StatusServiceUnavailable, "server shutting down", err)
 		return
 	}
@@ -666,10 +793,69 @@ func (h *handlers) clearProgress(jobID string) {
 	delete(h.jobProgress, jobID)
 }
 
+// appendEvent records a structured upload event for jobID. Wakes any
+// SSE streamers blocked on the cond. The Done flag must NOT be set here —
+// terminal events go through finishEvent so the cleanup path stays
+// in one place.
+func (h *handlers) appendEvent(jobID string, evt uploadEvent) {
+	if jobID == "" {
+		return
+	}
+	log := h.getOrCreateEventLog(jobID)
+	log.mu.Lock()
+	log.events = append(log.events, evt)
+	log.cond.Broadcast()
+	log.mu.Unlock()
+}
+
+// finishEvent appends a terminal event and marks the log done.
+// SSE streamers wake, flush the final event, and disconnect.
+func (h *handlers) finishEvent(jobID string, evt uploadEvent) {
+	if jobID == "" {
+		return
+	}
+	log := h.getOrCreateEventLog(jobID)
+	log.mu.Lock()
+	evt.Done = true
+	log.events = append(log.events, evt)
+	log.done = true
+	log.cond.Broadcast()
+	log.mu.Unlock()
+}
+
+// getOrCreateEventLog returns the per-job structured log, allocating
+// on first access. Holds uploadMu only briefly.
+func (h *handlers) getOrCreateEventLog(jobID string) *jobProgressLog {
+	h.uploadMu.Lock()
+	defer h.uploadMu.Unlock()
+	if h.jobEvents == nil {
+		h.jobEvents = make(map[string]*jobProgressLog)
+	}
+	log, ok := h.jobEvents[jobID]
+	if !ok {
+		log = &jobProgressLog{}
+		log.cond = sync.NewCond(&log.mu)
+		h.jobEvents[jobID] = log
+	}
+	return log
+}
+
+// clearEventLog drops the per-job structured log. Called after the SSE
+// stream observes a terminal state so the map does not grow unbounded.
+func (h *handlers) clearEventLog(jobID string) {
+	if jobID == "" {
+		return
+	}
+	h.uploadMu.Lock()
+	defer h.uploadMu.Unlock()
+	delete(h.jobEvents, jobID)
+}
+
 func (h *handlers) uploadProgress(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "text/event-stream")
 	w.Header().Set("Cache-Control", "no-cache")
 	w.Header().Set("Connection", "keep-alive")
+	w.Header().Set("X-Accel-Buffering", "no")
 
 	flusher, ok := w.(http.Flusher)
 	if !ok {
@@ -677,10 +863,72 @@ func (h *handlers) uploadProgress(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	// P1-1: filter by job_id. When present, only emit events for the
-	// specific job and terminate when THAT job reaches done/error.
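+	// Example exchange, with illustrative values (shape is uploadEvent):
+	//
+	//	GET /api/upload/progress?job_id=job-1
+	//	data: {"job_id":"job-1","file":"doc.md","phase":"embed","chunks_done":3,"chunks_total":5,"done":false}
+	//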
+	// Filter by job_id when present. Without one, fall back to the
+	// legacy plain-text relay so older clients still see something.
 	jobID := r.URL.Query().Get("job_id")
+	if jobID == "" {
+		h.uploadProgressLegacy(w, r, flusher)
+		return
+	}
+
+	log := h.getOrCreateEventLog(jobID)
+	ctx := r.Context()
+	cursor := 0
+
+	// Wake the cond from a watcher goroutine when the client cancels;
+	// otherwise sync.Cond.Wait would block forever.
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+	go func() {
+		select {
+		case <-ctx.Done():
+			log.mu.Lock()
+			log.cond.Broadcast()
+			log.mu.Unlock()
+		case <-doneCh:
+		}
+	}()
+
+	enc := json.NewEncoder(w)
+	for {
+		log.mu.Lock()
+		for cursor >= len(log.events) && !log.done && ctx.Err() == nil {
+			log.cond.Wait()
+		}
+		// Snapshot the events we have not yet flushed.
+		batch := append([]uploadEvent(nil), log.events[cursor:]...)
+		cursor = len(log.events)
+		terminal := log.done
+		log.mu.Unlock()
+
+		if ctx.Err() != nil {
+			return
+		}
+
+		for _, evt := range batch {
+			if _, err := w.Write([]byte("data: ")); err != nil {
+				return
+			}
+			if err := enc.Encode(evt); err != nil {
+				return
+			}
+			// json.Encoder appends a newline; SSE needs a blank line.
+			if _, err := w.Write([]byte("\n")); err != nil {
+				return
+			}
+			flusher.Flush()
+		}
+
+		if terminal {
+			h.clearEventLog(jobID)
+			return
+		}
+	}
+}
+
+// uploadProgressLegacy preserves the Wave-A plain-text behaviour for
+// callers that connect without a job_id query parameter.
+func (h *handlers) uploadProgressLegacy(w http.ResponseWriter, r *http.Request, flusher http.Flusher) {
 	ctx := r.Context()
 	lastMsg := ""
 	ticker := time.NewTicker(500 * time.Millisecond)
@@ -691,14 +939,12 @@ func (h *handlers) uploadProgress(w http.ResponseWriter, r *http.Request) {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			msg, _ := h.progressForJob(jobID)
-
+			msg, _ := h.progressForJob("")
 			if msg != "" && msg != lastMsg {
 				fmt.Fprintf(w, "data: %s\n\n", msg)
 				flusher.Flush()
 				lastMsg = msg
 				if msg == "done" || strings.HasPrefix(msg, "error:") {
-					h.clearProgress(jobID)
 					return
 				}
 			}
diff --git a/internal/api/upload_progress_test.go b/internal/api/upload_progress_test.go
index e332d66..245d26e 100644
--- a/internal/api/upload_progress_test.go
+++ b/internal/api/upload_progress_test.go
@@ -2,6 +2,7 @@ package api
 
 import (
 	"context"
+	"encoding/json"
 	"net/http"
 	"net/http/httptest"
 	"strings"
@@ -17,9 +18,13 @@ import (
 func TestUploadProgress_JobIDFiltering(t *testing.T) {
 	h := &handlers{}
 
-	// Seed two jobs in progress.
+	// Seed two jobs in progress via the structured event log (the new
+	// canonical store) AND the legacy plain-string map (kept for the
+	// progressForJob fall-back).
 	h.setProgress("job-A", "indexing: a.md")
 	h.setProgress("job-B", "indexing: b.md")
+	h.appendEvent("job-A", uploadEvent{JobID: "job-A", Phase: "indexing", File: "a.md", Message: "indexing: a.md"})
+	h.appendEvent("job-B", uploadEvent{JobID: "job-B", Phase: "indexing", File: "b.md", Message: "indexing: b.md"})
 
 	// Verify progressForJob filters correctly.
 	if m, _ := h.progressForJob("job-A"); m != "indexing: a.md" {
@@ -53,12 +58,11 @@ func TestUploadProgress_JobIDFiltering(t *testing.T) {
 	defer cancelA()
 
 	// Complete job B; recA must not terminate.
-	time.Sleep(700 * time.Millisecond) // one tick at 500 ms
-	h.setProgress("job-B", "done")
+	time.Sleep(200 * time.Millisecond)
+	h.finishEvent("job-B", uploadEvent{JobID: "job-B", Phase: "done", Message: "done"})
 
-	// Wait another tick; A should still be streaming (the goroutine
-	// should not have finished yet).
-	time.Sleep(700 * time.Millisecond)
+	// Wait long enough for any spurious wake; A should still be streaming.
+	time.Sleep(200 * time.Millisecond)
 	select {
 	case <-streamDone(wgA):
 		t.Fatalf("stream A terminated when job B completed; SSE filtering broken. body=%q",
 			recA.Body.String())
 	default:
 	}
 
@@ -68,7 +72,7 @@ func TestUploadProgress_JobIDFiltering(t *testing.T) {
 	// Now complete job A — stream should terminate.
-	h.setProgress("job-A", "done")
+	h.finishEvent("job-A", uploadEvent{JobID: "job-A", Phase: "done", Message: "done"})
 	select {
 	case <-streamDone(wgA):
 		// ok
@@ -77,19 +81,112 @@ func TestUploadProgress_JobIDFiltering(t *testing.T) {
 		t.Fatalf("stream A did not terminate after job A done")
 	}
 
-	// Verify job A was pruned from the map.
-	if _, ok := h.progressForJob("job-A"); ok {
-		t.Fatalf("job A not cleared from progress map after done")
+	// Verify job A was pruned from the structured event map.
+	h.uploadMu.Lock()
+	_, present := h.jobEvents["job-A"]
+	h.uploadMu.Unlock()
+	if present {
+		t.Fatalf("job A not cleared from event map after done")
 	}
 
-	// Job A's body should contain its own events but never "indexing: b.md".
+	// Job A's body should contain its own events but never job B's.
 	body := recA.Body.String()
-	if !strings.Contains(body, "indexing: a.md") {
+	if !strings.Contains(body, "a.md") {
 		t.Errorf("stream A missed its own event; body=%q", body)
 	}
-	if strings.Contains(body, "indexing: b.md") {
+	if strings.Contains(body, "b.md") {
 		t.Errorf("stream A leaked job B's event; body=%q", body)
 	}
+
+	// Each data: line must be a valid JSON event with job_id == "job-A".
+	for _, line := range strings.Split(body, "\n") {
+		if !strings.HasPrefix(line, "data: ") {
+			continue
+		}
+		var evt uploadEvent
+		if err := json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &evt); err != nil {
+			t.Errorf("malformed JSON event %q: %v", line, err)
+			continue
+		}
+		if evt.JobID != "job-A" {
+			t.Errorf("event leaked from job %q on stream A: %+v", evt.JobID, evt)
+		}
+	}
+}
+
+// TestUploadProgress_StructuredEventFormat asserts the SSE wire format
+// the UI hook depends on: every emission is a `data: {json}\n\n` frame
+// whose JSON parses into uploadEvent and surfaces phase/file/chunk
+// counters from the pipeline ProgressEvent.
+func TestUploadProgress_StructuredEventFormat(t *testing.T) {
+	h := &handlers{}
+
+	// Pre-seed a small phase sequence mirroring what indexFile emits.
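+	// Each append below becomes one SSE frame on the wire, e.g.
+	// (illustrative):
+	//   data: {"job_id":"job-X","phase":"chunk","file":"doc.md","chunks_total":5,"done":false}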
+	h.appendEvent("job-X", uploadEvent{JobID: "job-X", Phase: "queued", Message: "queued: 1 files"})
+	h.appendEvent("job-X", uploadEvent{JobID: "job-X", Phase: "chunk", File: "doc.md", ChunksTotal: 5, Message: "split into 5 chunks"})
+	h.appendEvent("job-X", uploadEvent{JobID: "job-X", Phase: "embed", File: "doc.md", ChunksDone: 5, ChunksTotal: 5, Message: "embedded 5/5 chunks"})
+	h.appendEvent("job-X", uploadEvent{JobID: "job-X", Phase: "extract_entities", File: "doc.md", Message: "extracting entities and relationships"})
+	h.appendEvent("job-X", uploadEvent{JobID: "job-X", Phase: "extract_claims", File: "doc.md", Message: "claims extracted"})
+	h.appendEvent("job-X", uploadEvent{JobID: "job-X", Phase: "structure", File: "doc.md", Message: "structure summary complete"})
+	h.finishEvent("job-X", uploadEvent{JobID: "job-X", Phase: "done", Message: "indexed 1 files"})
+
+	req := httptest.NewRequest(http.MethodGet, "/api/upload/progress?job_id=job-X", nil)
+	rec := httptest.NewRecorder()
+	h.uploadProgress(rec, req)
+
+	body := rec.Body.String()
+	if ct := rec.Header().Get("Content-Type"); ct != "text/event-stream" {
+		t.Errorf("Content-Type = %q; want text/event-stream", ct)
+	}
+
+	var phases []string
+	for _, line := range strings.Split(body, "\n") {
+		if !strings.HasPrefix(line, "data: ") {
+			continue
+		}
+		var evt uploadEvent
+		if err := json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &evt); err != nil {
+			t.Fatalf("malformed JSON event %q: %v", line, err)
+		}
+		phases = append(phases, evt.Phase)
+		if evt.JobID != "job-X" {
+			t.Errorf("event %q has wrong job_id %q", evt.Phase, evt.JobID)
+		}
+		if evt.Phase == "embed" {
+			if evt.ChunksDone != 5 || evt.ChunksTotal != 5 {
+				t.Errorf("embed phase chunk counts = %d/%d; want 5/5", evt.ChunksDone, evt.ChunksTotal)
+			}
+		}
+	}
+
+	// Every seeded phase, including the terminal one, must appear.
+	wantPhases := []string{"queued", "chunk", "embed", "extract_entities", "extract_claims", "structure", "done"}
+	for _, want := range wantPhases {
+		found := false
+		for _, p := range phases {
+			if p == want {
+				found = true
+				break
+			}
+		}
+		if !found {
+			t.Errorf("missing phase %q in stream; saw %v", want, phases)
+		}
+	}
+
+	// The terminal event must set done:true.
+	lastEvent := uploadEvent{}
+	for _, line := range strings.Split(body, "\n") {
+		if !strings.HasPrefix(line, "data: ") {
+			continue
+		}
+		var evt uploadEvent
+		_ = json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &evt)
+		lastEvent = evt
+	}
+	if !lastEvent.Done {
+		t.Errorf("terminal event missing done:true: %+v", lastEvent)
+	}
+}
 
 // streamDone returns a chan that closes when the handler goroutine
diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go
index 65e6fd5..9a9a47c 100644
--- a/internal/pipeline/pipeline.go
+++ b/internal/pipeline/pipeline.go
@@ -40,11 +40,34 @@ func timeStage(stage string, fn func() error) error {
 }
 
 // ProgressEvent sent over progress channel.
+//
+// Phase is one of: "queued", "load", "chunk", "embed", "extract_entities",
+// "extract_relationships", "extract_claims", "structure", "finalize",
+// "done", "error". Done=true marks the terminal event for a job.
+//
+// ChunksDone / ChunksTotal are populated for the "embed" phase and any
+// chunk-aware phases; other phases leave them at 0. File is the basename
+// of the file currently being processed (empty for job-wide events).
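+//
+// A typical embed-phase event, with illustrative values:
+//
+//	ProgressEvent{Phase: "embed", File: "doc.md", ChunksDone: 3,
+//		ChunksTotal: 5, Message: "embedded 3/5 chunks"}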
 type ProgressEvent struct {
-	Phase   string
-	Message string
-	Done    bool
-	Error   error
+	Phase       string
+	Message     string
+	File        string
+	ChunksDone  int
+	ChunksTotal int
+	Done        bool
+	Error       error
+}
+
+// emit sends evt on opts.Progress when the channel is non-nil. The send
+// is non-blocking, so a slow consumer never stalls the pipeline.
+func emit(opts IndexOptions, evt ProgressEvent) {
+	if opts.Progress == nil {
+		return
+	}
+	select {
+	case opts.Progress <- evt:
+	default:
+	}
 }
 
 // Pipeline orchestrates the 5-phase GraphRAG pipeline.
@@ -210,6 +233,8 @@ func (p *Pipeline) indexFile(ctx context.Context, path string, opts IndexOptions
 		return fmt.Errorf("resolve absolute path: %w", err)
 	}
 	path = absPath
+	fileBase := filepath.Base(path)
+	emit(opts, ProgressEvent{Phase: "load", File: fileBase, Message: "loading " + fileBase})
 
 	// Phase-5 cheap short-circuit: if mtime matches a previously-indexed
 	// snapshot, skip hashing AND reading the whole file. Only useful when
@@ -276,6 +301,12 @@ func (p *Pipeline) indexFile(ctx context.Context, path string, opts IndexOptions
 	}
 	slog.Debug("📄 indexing file", "path", path, "version", nextVersion, "chunks", len(chunks), "doc_type", doc.DocType)
+	emit(opts, ProgressEvent{
+		Phase:       "chunk",
+		File:        fileBase,
+		ChunksTotal: len(chunks),
+		Message:     fmt.Sprintf("split into %d chunks", len(chunks)),
+	})
 
 	docID := uuid.New().String()
 	if err := p.store.UpsertDocument(ctx, &store.Document{
@@ -317,17 +348,38 @@ func (p *Pipeline) indexFile(ctx context.Context, path string, opts IndexOptions
 	// graph-only flow); chunks are still persisted, downstream extraction
 	// uses raw text rather than vectors. CLAUDE.md guarantees this no-op path.
 	if p.embedder != nil {
+		emit(opts, ProgressEvent{
+			Phase:       "embed",
+			File:        fileBase,
+			ChunksDone:  0,
+			ChunksTotal: len(texts),
+			Message:     fmt.Sprintf("embedding 0/%d chunks", len(texts)),
+		})
 		vecs, err := p.embedder.EmbedTexts(ctx, texts)
 		if err != nil {
 			return fmt.Errorf("embed: %w", err)
 		}
 		slog.Debug("📊 chunks embedded", "path", path, "chunks", len(vecs))
+		emit(opts, ProgressEvent{
+			Phase:       "embed",
+			File:        fileBase,
+			ChunksDone:  len(vecs),
+			ChunksTotal: len(texts),
+			Message:     fmt.Sprintf("embedded %d/%d chunks", len(vecs), len(texts)),
+		})
 		if err := p.store.BatchUpsertEmbeddings(ctx, p.provider.ModelID(), chunkIDs, vecs); err != nil {
 			return fmt.Errorf("batch store embeddings: %w", err)
 		}
 	} else {
 		slog.Debug("⏭️ skipping embedding (provider=none)", "path", path, "chunks", len(texts))
+		emit(opts, ProgressEvent{
+			Phase:       "embed",
+			File:        fileBase,
+			ChunksDone:  len(texts),
+			ChunksTotal: len(texts),
+			Message:     "embedding skipped (no provider)",
+		})
 	}
 
 	// Phase 2: Run graph extraction, claims, and structured doc in parallel
@@ -339,25 +391,61 @@ func (p *Pipeline) indexFile(ctx context.Context, path string, opts IndexOptions
 	)
 
 	if p.cfg.Indexing.ExtractGraph {
+		emit(opts, ProgressEvent{
+			Phase:   "extract_entities",
+			File:    fileBase,
+			Message: "extracting entities and relationships",
+		})
 		wg2.Add(1)
 		go func() {
 			defer wg2.Done()
 			graphErr = p.extractGraph(ctx, docID, texts)
+			if graphErr == nil {
+				emit(opts, ProgressEvent{
+					Phase:   "extract_relationships",
+					File:    fileBase,
+					Message: "entities and relationships extracted",
+				})
+			}
 		}()
 	}
 
 	if p.cfg.Indexing.ExtractClaims {
+		emit(opts, ProgressEvent{
+			Phase:   "extract_claims",
+			File:    fileBase,
+			Message: "extracting claims",
+		})
 		wg2.Add(1)
 		go func() {
 			defer wg2.Done()
 			claimsErr = p.extractClaims(ctx, docID, texts)
+			if claimsErr == nil {
+				emit(opts, ProgressEvent{
+					Phase:   "extract_claims",
"extract_claims", + File: fileBase, + Message: "claims extracted", + }) + } }() } + emit(opts, ProgressEvent{ + Phase: "structure", + File: fileBase, + Message: "summarising document structure", + }) wg2.Add(1) go func() { defer wg2.Done() structureErr = p.structureDocument(ctx, docID, doc.Content) + if structureErr == nil { + emit(opts, ProgressEvent{ + Phase: "structure", + File: fileBase, + Message: "structure summary complete", + }) + } }() wg2.Wait() diff --git a/ui/src/hooks/api/__tests__/useUploadProgress.test.tsx b/ui/src/hooks/api/__tests__/useUploadProgress.test.tsx new file mode 100644 index 0000000..7c84c62 --- /dev/null +++ b/ui/src/hooks/api/__tests__/useUploadProgress.test.tsx @@ -0,0 +1,64 @@ +import { describe, it, expect } from "vitest"; +import { parseSseChunk } from "../useUploadProgress"; + +describe("parseSseChunk", () => { + it("parses a single complete JSON SSE frame", () => { + const input = + `data: {"job_id":"job-1","phase":"queued","done":false,"message":"queued: 1 files"}\n\n`; + const { events, rest } = parseSseChunk(input); + expect(rest).toBe(""); + expect(events).toHaveLength(1); + expect(events[0]).toMatchObject({ + job_id: "job-1", + phase: "queued", + done: false, + message: "queued: 1 files", + }); + }); + + it("parses multiple frames in one chunk and preserves order", () => { + const input = + `data: {"job_id":"j","phase":"chunk","chunks_total":5,"done":false}\n\n` + + `data: {"job_id":"j","phase":"embed","chunks_done":3,"chunks_total":5,"done":false}\n\n` + + `data: {"job_id":"j","phase":"done","done":true}\n\n`; + const { events, rest } = parseSseChunk(input); + expect(rest).toBe(""); + expect(events).toHaveLength(3); + expect(events.map((e) => e.phase)).toEqual(["chunk", "embed", "done"]); + expect(events[1]?.chunks_done).toBe(3); + expect(events[2]?.done).toBe(true); + }); + + it("returns the unfinished tail when a frame is split across chunks", () => { + const head = `data: {"job_id":"j","phase":"chunk","done":false}\n\ndata: {"job_id":"j",`; + const { events, rest } = parseSseChunk(head); + expect(events).toHaveLength(1); + expect(rest).toBe(`data: {"job_id":"j",`); + + const tail = rest + `"phase":"done","done":true}\n\n`; + const second = parseSseChunk(tail); + expect(second.events).toHaveLength(1); + expect(second.events[0]?.phase).toBe("done"); + expect(second.rest).toBe(""); + }); + + it("treats plain-text legacy frames as synthetic message events", () => { + const input = `data: queued: 1 files\n\ndata: error: boom\n\n`; + const { events } = parseSseChunk(input); + expect(events).toHaveLength(2); + expect(events[0]?.phase).toBe("message"); + expect(events[0]?.message).toBe("queued: 1 files"); + expect(events[0]?.done).toBe(false); + expect(events[1]?.phase).toBe("message"); + expect(events[1]?.done).toBe(true); + expect(events[1]?.error).toBe("boom"); + }); + + it("ignores non-data SSE lines (comments, retry, id)", () => { + const input = + `: heartbeat\nretry: 1000\nid: 42\ndata: {"job_id":"j","phase":"done","done":true}\n\n`; + const { events } = parseSseChunk(input); + expect(events).toHaveLength(1); + expect(events[0]?.phase).toBe("done"); + }); +}); diff --git a/ui/src/hooks/api/keys.ts b/ui/src/hooks/api/keys.ts index 16b27e4..d1841e4 100644 --- a/ui/src/hooks/api/keys.ts +++ b/ui/src/hooks/api/keys.ts @@ -14,4 +14,5 @@ export const qk = { entities: (project: string) => ["entities", project] as const, communities: (project: string) => ["communities", project] as const, activity: (project: string) => ["activity", project] as const, + 
 };
diff --git a/ui/src/hooks/api/useUploadProgress.ts b/ui/src/hooks/api/useUploadProgress.ts
new file mode 100644
index 0000000..c0f8bbb
--- /dev/null
+++ b/ui/src/hooks/api/useUploadProgress.ts
@@ -0,0 +1,228 @@
+import { useEffect, useRef, useState, useSyncExternalStore } from "react";
+
+// UploadProgressEvent mirrors api/handlers.go uploadEvent. Keep the
+// field names in sync — the SSE stream is the contract.
+export interface UploadProgressEvent {
+  job_id: string;
+  file?: string;
+  phase: string;
+  chunks_done?: number;
+  chunks_total?: number;
+  message?: string;
+  done: boolean;
+  error?: string;
+}
+
+export interface UploadProgressState {
+  jobId: string;
+  file: string;
+  phase: string;
+  chunksDone: number;
+  chunksTotal: number;
+  message: string;
+  done: boolean;
+  error: string;
+  history: UploadProgressEvent[];
+}
+
+const empty: UploadProgressState = {
+  jobId: "",
+  file: "",
+  phase: "",
+  chunksDone: 0,
+  chunksTotal: 0,
+  message: "",
+  done: false,
+  error: "",
+  history: [],
+};
+
+function reduce(prev: UploadProgressState, evt: UploadProgressEvent): UploadProgressState {
+  return {
+    jobId: evt.job_id,
+    file: evt.file ?? prev.file,
+    phase: evt.phase,
+    chunksDone: evt.chunks_done ?? 0,
+    chunksTotal: evt.chunks_total ?? prev.chunksTotal,
+    message: evt.message ?? "",
+    done: evt.done,
+    error: evt.error ?? "",
+    history: [...prev.history, evt],
+  };
+}
+
+// parseSseChunk extracts complete `data: {json}\n\n` frames from a
+// streaming buffer. Returns the events parsed and the leftover tail
+// (an unfinished frame). Exported for unit tests.
+export function parseSseChunk(buffer: string): {
+  events: UploadProgressEvent[];
+  rest: string;
+} {
+  const events: UploadProgressEvent[] = [];
+  let cursor = 0;
+  while (cursor < buffer.length) {
+    const sep = buffer.indexOf("\n\n", cursor);
+    if (sep === -1) break;
+    const frame = buffer.slice(cursor, sep);
+    cursor = sep + 2;
+    // SSE allows multi-line frames; we only care about `data: ...` lines.
+    const dataLines: string[] = [];
+    for (const line of frame.split("\n")) {
+      if (line.startsWith("data: ")) {
+        dataLines.push(line.slice(6));
+      } else if (line.startsWith("data:")) {
+        dataLines.push(line.slice(5));
+      }
+    }
+    if (dataLines.length === 0) continue;
+    const payload = dataLines.join("\n").trim();
+    if (!payload) continue;
+    try {
+      const parsed = JSON.parse(payload) as UploadProgressEvent;
+      // Guard the shape: JSON that parses but is not an event object
+      // (no string phase) is dropped rather than surfaced.
+      if (typeof parsed === "object" && parsed && typeof parsed.phase === "string") {
+        events.push(parsed);
+      }
+    } catch {
+      // Plain-text legacy frame — surface it as a synthetic event so
+      // older servers still produce *something*.
+      events.push({
+        job_id: "",
+        phase: "message",
+        message: payload,
+        done: payload === "done" || payload.startsWith("error:"),
+        error: payload.startsWith("error:") ? payload.slice(7).trim() : undefined,
+      });
+    }
+  }
+  return { events, rest: buffer.slice(cursor) };
+}
+
+// Tiny per-job store that React subscribes to via useSyncExternalStore.
+// We use a class-free shape to keep the bundle slim.
+type Listener = () => void;
+function createStore() {
+  const states = new Map<string, UploadProgressState>();
+  const listeners = new Set<Listener>();
+  // Cache the per-job "no events yet" state so get() returns a stable
+  // reference; useSyncExternalStore compares snapshots with Object.is,
+  // and a fresh object on every call would re-render forever.
+  const empties = new Map<string, UploadProgressState>();
+  const emptyFor = (jobId: string): UploadProgressState => {
+    let s = empties.get(jobId);
+    if (!s) {
+      s = { ...empty, jobId };
+      empties.set(jobId, s);
+    }
+    return s;
+  };
+  return {
+    get(jobId: string): UploadProgressState {
+      return states.get(jobId) ?? emptyFor(jobId);
+    },
+    snapshot(): Map<string, UploadProgressState> {
+      return states;
+    },
+    apply(jobId: string, evt: UploadProgressEvent) {
+      const prev = states.get(jobId) ?? { ...empty, jobId };
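+      // reduce() folds evt into the running state and appends it to history.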
+      states.set(jobId, reduce(prev, evt));
+      listeners.forEach((l) => l());
+    },
+    reset(jobId: string) {
+      states.delete(jobId);
+      listeners.forEach((l) => l());
+    },
+    subscribe(l: Listener) {
+      listeners.add(l);
+      return () => {
+        listeners.delete(l);
+      };
+    },
+  };
+}
+
+const sharedStore = createStore();
+
+// useUploadProgress connects to GET /api/upload/progress for jobId.
+// Returns the latest reduced state; `null` when jobId is empty/undefined.
+// The connection auto-closes once the stream emits done:true.
+export function useUploadProgress(jobId: string | null | undefined): UploadProgressState | null {
+  const activeJobRef = useRef<string | null>(null);
+  const [, setTick] = useState(0);
+
+  const state = useSyncExternalStore(
+    sharedStore.subscribe,
+    () => (jobId ? sharedStore.get(jobId) : null),
+    () => (jobId ? sharedStore.get(jobId) : null),
+  );
+
+  useEffect(() => {
+    if (!jobId) {
+      activeJobRef.current = null;
+      return;
+    }
+    if (activeJobRef.current === jobId) return;
+    activeJobRef.current = jobId;
+
+    const url = `/api/upload/progress?job_id=${encodeURIComponent(jobId)}`;
+    let closed = false;
+    let buffer = "";
+
+    // fetch + ReadableStream rather than EventSource: we keep the exact
+    // byte stream the handler emits (EventSource auto-strips the `data: `
+    // prefix), so parseSseChunk can be shared with the unit tests, and we
+    // control credentials and headers explicitly.
+    const controller = new AbortController();
+    fetch(url, {
+      method: "GET",
+      credentials: "include",
+      headers: { Accept: "text/event-stream" },
+      signal: controller.signal,
+    })
+      .then(async (res) => {
+        if (!res.ok || !res.body) {
+          sharedStore.apply(jobId, {
+            job_id: jobId,
+            phase: "error",
+            message: `progress stream failed: HTTP ${res.status}`,
+            done: true,
+            error: `HTTP ${res.status}`,
+          });
+          return;
+        }
+        const reader = res.body.getReader();
+        const decoder = new TextDecoder();
+        while (!closed) {
+          const { value, done } = await reader.read();
+          if (done) break;
+          buffer += decoder.decode(value, { stream: true });
+          const { events, rest } = parseSseChunk(buffer);
+          buffer = rest;
+          for (const evt of events) {
+            sharedStore.apply(jobId, evt);
+            if (evt.done) {
+              closed = true;
+              controller.abort();
+              break;
+            }
+          }
+        }
+      })
+      .catch((err) => {
+        if (!closed && err?.name !== "AbortError") {
+          sharedStore.apply(jobId, {
+            job_id: jobId,
+            phase: "error",
+            message: String(err),
+            done: true,
+            error: String(err),
+          });
+        }
+      });
+
+    setTick((t) => t + 1);
+    return () => {
+      closed = true;
+      controller.abort();
+      // Allow a reconnect if the same job mounts again later.
+      activeJobRef.current = null;
+    };
+  }, [jobId]);
+
+  return state;
+}
+
+// resetUploadProgress drops the stored state for a job. Useful when the
+// upload modal closes and the caller wants the next upload to start
+// from a clean slate.
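+//
+// Typical call site, as used by the modal-close effect in UploadModal:
+//
+//   useEffect(() => {
+//     if (!open && jobId) resetUploadProgress(jobId);
+//   }, [open, jobId]);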
+export function resetUploadProgress(jobId: string): void {
+  sharedStore.reset(jobId);
+}
diff --git a/ui/src/routes/documents/UploadModal.tsx b/ui/src/routes/documents/UploadModal.tsx
index 8c28805..be0784a 100644
--- a/ui/src/routes/documents/UploadModal.tsx
+++ b/ui/src/routes/documents/UploadModal.tsx
@@ -1,45 +1,183 @@
 import { Dialog, DialogContent, DialogHeader, DialogTitle } from "@/components/ui/dialog";
 import { useProjectStore } from "@/stores/project";
 import { apiFetch } from "@/lib/api-client";
-import { useState } from "react";
+import { useEffect, useMemo, useState } from "react";
 import { useQueryClient } from "@tanstack/react-query";
 import { qk } from "@/hooks/api/keys";
+import {
+  resetUploadProgress,
+  useUploadProgress,
+  type UploadProgressEvent,
+} from "@/hooks/api/useUploadProgress";
+
+interface UploadResponse {
+  job_id: string;
+  status: string;
+}
+
+interface PerFileState {
+  phase: string;
+  message: string;
+  chunksDone: number;
+  chunksTotal: number;
+  error: string;
+}
+
+const PHASE_LABELS: Record<string, string> = {
+  queued: "Queued",
+  load: "Loading",
+  chunk: "Chunking",
+  indexing: "Indexing",
+  embed: "Embedding",
+  extract_entities: "Extracting entities",
+  extract_relationships: "Linking relationships",
+  extract_claims: "Extracting claims",
+  structure: "Summarising",
+  finalize: "Finalising graph",
+  done: "Done",
+  error: "Failed",
+  cancelled: "Cancelled",
+};
+
+function reduceFiles(events: UploadProgressEvent[]): Map<string, PerFileState> {
+  const byFile = new Map<string, PerFileState>();
+  for (const evt of events) {
+    const key = evt.file || "(job)";
+    const prev = byFile.get(key) ?? {
+      phase: "",
+      message: "",
+      chunksDone: 0,
+      chunksTotal: 0,
+      error: "",
+    };
+    byFile.set(key, {
+      phase: evt.phase,
+      message: evt.message ?? prev.message,
+      chunksDone: evt.chunks_done ?? prev.chunksDone,
+      chunksTotal: evt.chunks_total ?? prev.chunksTotal,
+      error: evt.error ?? prev.error,
+    });
+  }
+  return byFile;
+}
 
 export function UploadModal({ open, onOpenChange }: { open: boolean; onOpenChange: (v: boolean) => void }) {
   const project = useProjectStore((s) => s.slug);
   const qc = useQueryClient();
   const [busy, setBusy] = useState(false);
   const [err, setErr] = useState<string | null>(null);
+  const [jobId, setJobId] = useState<string | null>(null);
+
+  const progress = useUploadProgress(jobId);
+
+  // When the SSE stream finishes (success or failure), refresh the
+  // documents list and stop showing "uploading…". The modal stays open
+  // so the user can read the final phase before dismissing.
+  useEffect(() => {
+    if (!progress?.done) return;
+    setBusy(false);
+    qc.invalidateQueries({ queryKey: qk.docs(project) });
+    qc.invalidateQueries({ queryKey: qk.stats(project) });
+  }, [progress?.done, qc, project]);
+
+  // Reset state when the modal closes so a re-open starts clean.
+  useEffect(() => {
+    if (open) return;
+    if (jobId) resetUploadProgress(jobId);
+    setJobId(null);
+    setErr(null);
+  }, [open, jobId]);
 
   async function onFiles(files: FileList | null) {
     if (!files || files.length === 0) return;
-    setBusy(true); setErr(null);
+    setBusy(true);
+    setErr(null);
+    setJobId(null);
     try {
       const fd = new FormData();
       for (const f of Array.from(files)) fd.append("files", f, f.name);
-      await apiFetch(`/api/upload?project=${encodeURIComponent(project)}`, { method: "POST", body: fd });
-      qc.invalidateQueries({ queryKey: qk.docs(project) });
-      qc.invalidateQueries({ queryKey: qk.stats(project) });
-      onOpenChange(false);
+      const res = await apiFetch<UploadResponse>(
+        `/api/upload?project=${encodeURIComponent(project)}`,
+        { method: "POST", body: fd },
+      );
+      setJobId(res.job_id);
     } catch (e) {
       setErr((e as Error).message);
-    } finally { setBusy(false); }
+      setBusy(false);
+    }
   }
 
+  const perFile = useMemo(
+    () => (progress ? reduceFiles(progress.history) : new Map<string, PerFileState>()),
+    [progress],
+  );
+
   return (
     <Dialog open={open} onOpenChange={onOpenChange}>
       <DialogContent>
-        <DialogHeader><DialogTitle>Upload documents</DialogTitle></DialogHeader>
+        <DialogHeader>
+          <DialogTitle>Upload documents</DialogTitle>
+        </DialogHeader>
         <input
          type="file"
          multiple
          onChange={(e) => onFiles(e.currentTarget.files)}
          className="block w-full text-sm"
+         aria-label="Choose files to upload"
        />
-        {busy && <div className="mt-3 text-sm text-muted-foreground">Uploading…</div>}
-        {err && <div className="mt-3 text-sm text-red-500">{err}</div>}
+        {err && (
+          <div className="mt-3 text-sm text-red-500">
+            {err}
+          </div>
+        )}
+        {jobId && (
+          <div className="mt-4 space-y-2 text-sm">
+            {[...perFile.entries()].map(([file, state]) => (
+              <div key={file}>
+                <div className="flex items-center justify-between gap-2">
+                  <span className="truncate">{file}</span>
+                  <span className="text-muted-foreground">
+                    {PHASE_LABELS[state.phase] ?? state.phase}
+                  </span>
+                </div>
+                {state.chunksTotal > 0 && state.phase === "embed" && (
+                  <div className="text-xs text-muted-foreground">
+                    embedding {state.chunksDone}/{state.chunksTotal} chunks
+                  </div>
+                )}
+                {state.message && state.phase !== "embed" && (
+                  <div className="text-xs text-muted-foreground">
+                    {state.message}
+                  </div>
+                )}
+                {state.error && (
+                  <div className="text-xs text-red-500">
+                    {state.error}
+                  </div>
+                )}
+              </div>
+            ))}
+          </div>
+        )}
+        {busy && !jobId && (
+          <div className="mt-3 text-sm text-muted-foreground">Uploading…</div>
+        )}
       </DialogContent>
     </Dialog>
   );
 }