Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 253 additions & 7 deletions internal/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,41 @@ type handlers struct {
// nil (dev/test path), upload() falls back to a detached goroutine.
workq *workq.Pool

// Upload progress tracking
// Upload progress tracking. jobProgress is the legacy plain-string
// channel — kept so existing tests / callers without a job channel
// still see the most recent state. jobEvents holds the structured
// per-phase event log used by the SSE handler.
uploadMu sync.Mutex
jobProgress map[string]string
jobEvents map[string]*jobProgressLog
jobCounter atomic.Int64
}

// jobProgressLog accumulates structured upload events for one job.
// Append is O(1); the SSE handler walks the events slice from a
// per-stream cursor, so multiple streams can replay the same log
// independently. done is set when a terminal event lands; the
// handler uses it to flush the remaining tail and close the stream.
type jobProgressLog struct {
	mu     sync.Mutex    // guards events, done, and the cond's wait queue
	events []uploadEvent // append-only event history for this job
	done   bool          // true once a terminal (Done=true) event is recorded
	cond   *sync.Cond    // broadcast on every append/finish; wraps mu
}

// uploadEvent is the JSON shape streamed over GET /api/upload/progress.
// Field order intentionally matches the wire format documented in
// docs/rest-api.md so the UI hook can rely on it. Do not rename the
// json tags — they are the public SSE contract.
type uploadEvent struct {
	JobID       string `json:"job_id"`                // upload job this event belongs to
	File        string `json:"file,omitempty"`        // base name of the file being processed, if any
	Phase       string `json:"phase"`                 // e.g. "queued", "indexing", "finalize", "done", "error", "cancelled"
	ChunksDone  int    `json:"chunks_done,omitempty"` // pipeline progress numerator
	ChunksTotal int    `json:"chunks_total,omitempty"`// pipeline progress denominator
	Message     string `json:"message,omitempty"`     // human-readable status line
	Done        bool   `json:"done"`                  // terminal event; stream closes after this
	Error       string `json:"error,omitempty"`       // error text when Phase is "error"
}

// resolveStore is the single entry point for every doc handler. It
// pulls the slug off ctx and returns the matching per-project store.
// A missing/empty slug or open failure becomes a 500 — the project
Expand Down Expand Up @@ -575,27 +604,103 @@ func (h *handlers) upload(w http.ResponseWriter, r *http.Request) {
slog.Info("📦 upload job queued", "job_id", jobID, "files", len(paths), "project", slug)

h.setProgress(jobID, fmt.Sprintf("queued: %d files", len(paths)))
h.appendEvent(jobID, uploadEvent{
JobID: jobID,
Phase: "queued",
Message: fmt.Sprintf("queued: %d files", len(paths)),
})

job := func(ctx context.Context) {
defer os.RemoveAll(tmpDir)
pl := pipeline.New(st, h.provider, h.cfg)

// Per-job pipeline progress channel. Buffered so a slow SSE
// consumer never stalls indexing; the pipeline emit() helper is
// also non-blocking so we are belt-and-braces here.
progressCh := make(chan pipeline.ProgressEvent, 32)
stopRelay := make(chan struct{})
var relayDone sync.WaitGroup
relayDone.Add(1)
go func() {
defer relayDone.Done()
for {
select {
case evt, ok := <-progressCh:
if !ok {
return
}
errStr := ""
if evt.Error != nil {
errStr = evt.Error.Error()
}
h.appendEvent(jobID, uploadEvent{
JobID: jobID,
File: evt.File,
Phase: evt.Phase,
ChunksDone: evt.ChunksDone,
ChunksTotal: evt.ChunksTotal,
Message: evt.Message,
Done: evt.Done,
Error: errStr,
})
case <-stopRelay:
return
}
}
}()

for _, p := range paths {
if ctx.Err() != nil {
slog.Warn("🛑 upload indexing cancelled on shutdown", "job_id", jobID, "file", filepath.Base(p))
h.setProgress(jobID, "cancelled")
h.finishEvent(jobID, uploadEvent{
JobID: jobID,
File: filepath.Base(p),
Phase: "cancelled",
Message: "cancelled on shutdown",
Done: true,
})
close(stopRelay)
relayDone.Wait()
return
}
slog.Info("📦 upload indexing file", "job_id", jobID, "file", filepath.Base(p))
h.setProgress(jobID, fmt.Sprintf("indexing: %s", filepath.Base(p)))
if err := pl.IndexPath(ctx, p, pipeline.IndexOptions{}); err != nil {
h.appendEvent(jobID, uploadEvent{
JobID: jobID,
File: filepath.Base(p),
Phase: "indexing",
Message: "indexing " + filepath.Base(p),
})
if err := pl.IndexPath(ctx, p, pipeline.IndexOptions{Progress: progressCh}); err != nil {
slog.Error("❌ upload indexing failed", "job_id", jobID, "file", filepath.Base(p), "err", err)
h.setProgress(jobID, fmt.Sprintf("error: %v", err))
h.finishEvent(jobID, uploadEvent{
JobID: jobID,
File: filepath.Base(p),
Phase: "error",
Message: err.Error(),
Done: true,
Error: err.Error(),
})
close(stopRelay)
relayDone.Wait()
return
}
}
h.setProgress(jobID, "finalizing")
h.appendEvent(jobID, uploadEvent{
JobID: jobID,
Phase: "finalize",
Message: "running community detection and summaries",
})
if err := pl.Finalize(ctx, false, true); err != nil {
slog.Warn("⚠️ upload finalization failed", "job_id", jobID, "err", err)
h.appendEvent(jobID, uploadEvent{
JobID: jobID,
Phase: "finalize",
Message: "finalize warning: " + err.Error(),
})
}
// Invalidate the vector index for this project so the next
// search rebuild picks up the newly-indexed chunks.
Expand All @@ -604,6 +709,14 @@ func (h *handlers) upload(w http.ResponseWriter, r *http.Request) {
}
slog.Info("✅ upload job complete", "job_id", jobID, "files", len(paths), "project", slug)
h.setProgress(jobID, "done")
h.finishEvent(jobID, uploadEvent{
JobID: jobID,
Phase: "done",
Message: fmt.Sprintf("indexed %d files", len(paths)),
Done: true,
})
close(stopRelay)
relayDone.Wait()
}

if h.workq == nil {
Expand All @@ -613,11 +726,25 @@ func (h *handlers) upload(w http.ResponseWriter, r *http.Request) {
if err := h.workq.Submit(job); err != nil {
if errors.Is(err, workq.ErrQueueFull) {
h.setProgress(jobID, "rejected: indexing queue full")
h.finishEvent(jobID, uploadEvent{
JobID: jobID,
Phase: "error",
Message: "indexing queue full; retry later",
Done: true,
Error: "indexing queue full",
})
w.Header().Set("Retry-After", "30")
writeError(w, r, http.StatusServiceUnavailable, "indexing queue full; retry later", nil)
return
}
h.setProgress(jobID, "rejected: server unavailable")
h.finishEvent(jobID, uploadEvent{
JobID: jobID,
Phase: "error",
Message: "server shutting down",
Done: true,
Error: "server shutting down",
})
writeError(w, r, http.StatusServiceUnavailable, "server shutting down", err)
return
}
Expand Down Expand Up @@ -666,21 +793,142 @@ func (h *handlers) clearProgress(jobID string) {
delete(h.jobProgress, jobID)
}

// appendEvent records a structured upload event for jobID and wakes
// any SSE streamers blocked on the log's condition variable. The
// Done flag must NOT be set here — terminal events go through
// finishEvent so the cleanup path stays in one place.
func (h *handlers) appendEvent(jobID string, evt uploadEvent) {
	if jobID == "" {
		return
	}
	jl := h.getOrCreateEventLog(jobID)
	jl.mu.Lock()
	defer jl.mu.Unlock()
	jl.events = append(jl.events, evt)
	jl.cond.Broadcast()
}

// finishEvent appends a terminal event (forcing Done=true) and marks
// the log done, so SSE streamers wake, flush the final event, and
// disconnect.
func (h *handlers) finishEvent(jobID string, evt uploadEvent) {
	if jobID == "" {
		return
	}
	// evt is a value copy; stamping Done here never mutates the caller's event.
	evt.Done = true
	jl := h.getOrCreateEventLog(jobID)
	jl.mu.Lock()
	defer jl.mu.Unlock()
	jl.events = append(jl.events, evt)
	jl.done = true
	jl.cond.Broadcast()
}

// getOrCreateEventLog returns the per-job structured log, allocating
// both the map and the log lazily on first access. uploadMu is held
// only for the map lookup/insert, never while appending events.
func (h *handlers) getOrCreateEventLog(jobID string) *jobProgressLog {
	h.uploadMu.Lock()
	defer h.uploadMu.Unlock()
	if h.jobEvents == nil {
		h.jobEvents = make(map[string]*jobProgressLog)
	}
	if jl, ok := h.jobEvents[jobID]; ok {
		return jl
	}
	jl := &jobProgressLog{}
	jl.cond = sync.NewCond(&jl.mu)
	h.jobEvents[jobID] = jl
	return jl
}

// clearEventLog drops the per-job structured log. Called after the SSE
// stream observes a terminal state so the jobEvents map does not grow
// without bound. Deleting a key that is absent is a no-op.
func (h *handlers) clearEventLog(jobID string) {
	if jobID == "" {
		return
	}
	h.uploadMu.Lock()
	delete(h.jobEvents, jobID)
	h.uploadMu.Unlock()
}

// uploadProgress streams upload job events to the client as
// Server-Sent Events. With a job_id query parameter it replays the
// structured per-job event log and follows it until a terminal event;
// without one it falls back to the legacy plain-text relay.
func (h *handlers) uploadProgress(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	// Disable proxy buffering (nginx) so events reach the client promptly.
	w.Header().Set("X-Accel-Buffering", "no")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return
	}

	// Filter by job_id when present: emit only that job's events and
	// terminate when it reaches a terminal (done/error/cancelled) state.
	// Without one, fall back to the legacy plain-text relay so older
	// clients still see something.
	jobID := r.URL.Query().Get("job_id")
	if jobID == "" {
		h.uploadProgressLegacy(w, r, flusher)
		return
	}

	// NOTE(review): an unknown job_id allocates an empty log and blocks
	// until the client disconnects — presumably intentional so a client
	// may connect before the job's first event; confirm.
	log := h.getOrCreateEventLog(jobID)
	ctx := r.Context()
	cursor := 0

	// Wake the cond from a watcher goroutine when the client cancels;
	// otherwise sync.Cond.Wait would block forever. doneCh (closed via
	// defer) reclaims the watcher when this handler returns first.
	doneCh := make(chan struct{})
	defer close(doneCh)
	go func() {
		select {
		case <-ctx.Done():
			// Lock before broadcasting so the waiter cannot miss the
			// wake-up between its predicate check and cond.Wait.
			log.mu.Lock()
			log.cond.Broadcast()
			log.mu.Unlock()
		case <-doneCh:
		}
	}()

	enc := json.NewEncoder(w)
	for {
		log.mu.Lock()
		// Wait until new events arrive, the job finishes, or the
		// client goes away (watcher goroutine broadcasts on cancel).
		for cursor >= len(log.events) && !log.done && ctx.Err() == nil {
			log.cond.Wait()
		}
		// Snapshot the events we have not yet flushed.
		batch := append([]uploadEvent(nil), log.events[cursor:]...)
		cursor = len(log.events)
		terminal := log.done
		log.mu.Unlock()

		if ctx.Err() != nil {
			return
		}

		// Write each event as one SSE "data:" frame.
		for _, evt := range batch {
			if _, err := w.Write([]byte("data: ")); err != nil {
				return
			}
			if err := enc.Encode(evt); err != nil {
				return
			}
			// json.Encoder appends a newline; SSE needs a blank line.
			if _, err := w.Write([]byte("\n")); err != nil {
				return
			}
			flusher.Flush()
		}

		if terminal {
			// NOTE(review): the first stream to observe the terminal
			// state deletes the log; a second concurrent or late stream
			// for the same job would recreate an empty log and block —
			// confirm single-consumer is the intended model.
			h.clearEventLog(jobID)
			return
		}
	}
}

// uploadProgressLegacy preserves the Wave-A plain-text behaviour for
// callers that connect without a job_id query parameter.
func (h *handlers) uploadProgressLegacy(w http.ResponseWriter, r *http.Request, flusher http.Flusher) {
ctx := r.Context()
lastMsg := ""
ticker := time.NewTicker(500 * time.Millisecond)
Expand All @@ -691,14 +939,12 @@ func (h *handlers) uploadProgress(w http.ResponseWriter, r *http.Request) {
case <-ctx.Done():
return
case <-ticker.C:
msg, _ := h.progressForJob(jobID)

msg, _ := h.progressForJob("")
if msg != "" && msg != lastMsg {
fmt.Fprintf(w, "data: %s\n\n", msg)
flusher.Flush()
lastMsg = msg
if msg == "done" || strings.HasPrefix(msg, "error:") {
h.clearProgress(jobID)
return
}
}
Expand Down
Loading
Loading