Commit 1030f7b

aksOps and claude committed
Implement index/enrich/serve three-command architecture for memory optimization
Phase B memory optimization: split the monolithic `analyze` pipeline into three discrete commands. `index` writes to H2 only, using batched streaming (default 500 files per batch), keeping peak memory bounded. `enrich` loads the H2 data into Neo4j and runs the linkers and layer classifier. `serve` reads the pre-enriched Neo4j graph.

- Add IndexCommand with --batch-size flag and batched H2 writes
- Add EnrichCommand that bulk-loads H2 -> Neo4j with linkers + classifier
- Add Analyzer.runBatchedIndex() for memory-efficient batched processing
- Enhance AnalysisCache with getNodeCount/getEdgeCount/storeBatchResults
- Add batchSize config property to CodeIqConfig
- Update CodeIqCli to register index and enrich subcommands
- Update CodeIqApplication to handle index/enrich command profiles
- Keep analyze as a backward-compatible legacy command
- All 1,191 tests pass (including 12 new tests)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a40098e · commit 1030f7b

11 files changed

Lines changed: 1184 additions & 2 deletions
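With this split, building the graph becomes a three-step workflow. Sketched invocations, assuming the CLI binary is named `code-iq` (the subcommand names and the 500-file default come from this commit; the binary name is an assumption): `code-iq index . --batch-size 500` streams detector results into H2, `code-iq enrich` bulk-loads H2 into Neo4j and runs the linkers and layer classifier, and `code-iq serve` serves the pre-enriched graph.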


src/main/java/io/github/randomcodespace/iq/CodeIqApplication.java

Lines changed: 13 additions & 1 deletion
```diff
@@ -48,12 +48,24 @@ public static void main(String[] args) {
         var app = new SpringApplication(CodeIqApplication.class);
         app.setBannerMode(org.springframework.boot.Banner.Mode.OFF);

-        // Detect if "serve" is among the arguments
+        // Detect command from arguments
         boolean isServe = Arrays.stream(args)
                 .anyMatch(arg -> "serve".equalsIgnoreCase(arg));
+        boolean isIndex = Arrays.stream(args)
+                .anyMatch(arg -> "index".equalsIgnoreCase(arg));
+        boolean isEnrich = Arrays.stream(args)
+                .anyMatch(arg -> "enrich".equalsIgnoreCase(arg));

         if (isServe) {
             app.setAdditionalProfiles("serving");
+        } else if (isIndex) {
+            app.setAdditionalProfiles("indexing");
+            // Index command: no web server, no Neo4j
+            app.setWebApplicationType(org.springframework.boot.WebApplicationType.NONE);
+        } else if (isEnrich) {
+            // Enrich command: no web server, Neo4j started programmatically
+            app.setAdditionalProfiles("indexing");
+            app.setWebApplicationType(org.springframework.boot.WebApplicationType.NONE);
         } else {
             app.setAdditionalProfiles("indexing");
             // Disable web server for non-serve commands
```
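The profiles selected here presumably gate which beans Spring instantiates for each command. A minimal sketch of that standard `@Profile` pattern, using a hypothetical `GraphStore` abstraction that is not part of this commit:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

// Hypothetical store abstraction; stands in for whatever the project
// actually wires, and is not part of this commit.
interface GraphStore {}

@Configuration
class GraphStoreConfig {

    // Instantiated only under the "serving" profile, so `index` and
    // `enrich` (which run with the "indexing" profile) never create it.
    @Bean
    @Profile("serving")
    GraphStore servingStore() {
        return new GraphStore() {}; // stand-in for a Neo4j-backed store
    }

    // Active counterpart while indexing; results go to H2 instead.
    @Bean
    @Profile("indexing")
    GraphStore indexingStore() {
        return new GraphStore() {}; // stand-in for an H2-backed store
    }
}
```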

src/main/java/io/github/randomcodespace/iq/analyzer/Analyzer.java

Lines changed: 244 additions & 0 deletions
```diff
@@ -12,6 +12,7 @@
 import io.github.randomcodespace.iq.detector.DetectorResult;
 import io.github.randomcodespace.iq.detector.DetectorUtils;
 import io.github.randomcodespace.iq.grammar.AntlrParserFactory;
+import io.github.randomcodespace.iq.model.CodeEdge;
 import io.github.randomcodespace.iq.model.CodeNode;
 import io.github.randomcodespace.iq.model.NodeKind;
 import org.slf4j.Logger;
@@ -347,6 +348,249 @@ private AnalysisResult runWithCache(Path root, Integer parallelism, AnalysisCache
         );
     }

+    /**
+     * Execute the indexing pipeline with batched streaming to H2.
+     * <p>
+     * Unlike {@link #run}, this method does NOT hold all nodes/edges in memory.
+     * It processes files in batches and flushes each batch to H2, then releases
+     * the batch memory. No linkers, layer classification, or Neo4j are used.
+     *
+     * @param repoPath    root of the repository to analyze
+     * @param parallelism max parallel threads, or null for adaptive (virtual threads)
+     * @param batchSize   number of files per H2 flush batch
+     * @param incremental if true, use file content hashing to skip unchanged files
+     * @param onProgress  optional callback for progress reporting (may be null)
+     * @return the analysis result containing graph data and statistics
+     */
+    public AnalysisResult runBatchedIndex(Path repoPath, Integer parallelism, int batchSize,
+                                          boolean incremental, Consumer<String> onProgress) {
+        Instant start = Instant.now();
+        Consumer<String> report = onProgress != null ? onProgress : msg -> {};
+
+        final Path root = repoPath.toAbsolutePath().normalize();
+
+        // Always use H2 cache as the primary store during indexing
+        Path cachePath = root.resolve(config.getCacheDir()).resolve("analysis-cache.db");
+        AnalysisCache cache;
+        try {
+            cache = new AnalysisCache(cachePath);
+        } catch (Exception e) {
+            log.error("Failed to open H2 store at {}", cachePath, e);
+            return new AnalysisResult(0, 0, 0, 0,
+                    Map.of(), Map.of(), Map.of(), Map.of(), Duration.ZERO);
+        }
+
+        try {
+            return runBatchedWithCache(root, parallelism, batchSize, incremental, cache, report, start);
+        } finally {
+            cache.close();
+        }
+    }
+
+    private AnalysisResult runBatchedWithCache(Path root, Integer parallelism, int batchSize,
+                                               boolean incremental, AnalysisCache cache,
+                                               Consumer<String> report, Instant start) {
+        // 0. Load project config for pipeline filtering
+        ProjectConfig projectConfig = ProjectConfigLoader.loadProjectConfig(root);
+        DetectorRegistry effectiveRegistry = registry;
+
+        if (projectConfig.hasDetectorCategoryFilter()) {
+            effectiveRegistry = effectiveRegistry.filterByCategories(
+                    projectConfig.getDetectorCategories());
+            report.accept("Detector categories: " + projectConfig.getDetectorCategories());
+        }
+        if (projectConfig.hasDetectorIncludeFilter()) {
+            effectiveRegistry = effectiveRegistry.filterByNames(
+                    projectConfig.getDetectorInclude());
+            report.accept("Detector include: " + projectConfig.getDetectorInclude());
+        }
+        if (parallelism == null && projectConfig.getPipelineParallelism() != null) {
+            parallelism = projectConfig.getPipelineParallelism();
+            report.accept("Pipeline parallelism: " + parallelism + " (from config)");
+        }
+
+        // 1. Discover files
+        report.accept("Discovering files...");
+        List<DiscoveredFile> files = fileDiscovery.discover(root);
+
+        if (projectConfig.hasLanguageFilter()) {
+            Set<String> allowedLanguages = new HashSet<>(projectConfig.getLanguages());
+            files = files.stream()
+                    .filter(f -> allowedLanguages.contains(f.language()))
+                    .toList();
+            report.accept("Language filter active: " + projectConfig.getLanguages());
+        }
+        if (projectConfig.hasExcludePatterns()) {
+            List<String> excludes = projectConfig.getExclude();
+            files = files.stream()
+                    .filter(f -> !matchesAnyExclude(f.path().toString(), excludes))
+                    .toList();
+            report.accept("Exclude patterns: " + excludes);
+        }
+
+        int totalFiles = files.size();
+        report.accept("Found " + totalFiles + " files");
+
+        // Compute language breakdown
+        Map<String, Integer> languageBreakdown = new HashMap<>();
+        for (DiscoveredFile f : files) {
+            languageBreakdown.merge(f.language(), 1, Integer::sum);
+        }
+
+        // 2. Process files in batches
+        report.accept("Indexing " + totalFiles + " files in batches of " + batchSize + "...");
+
+        final DetectorRegistry detectorRegistry = effectiveRegistry;
+        int totalNodesWritten = 0;
+        int totalEdgesWritten = 0;
+        int filesAnalyzed = 0;
+        int cacheHits = 0;
+        int batchNumber = 0;
+        Map<String, Integer> nodeBreakdown = new HashMap<>();
+        Map<String, Integer> edgeBreakdown = new HashMap<>();
+        Map<String, Integer> frameworkBreakdown = new HashMap<>();
+
+        // Clear previous index data if not incremental
+        if (!incremental) {
+            cache.clear();
+        }
+
+        List<DiscoveredFile> batch = new ArrayList<>(batchSize);
+        for (int fileIdx = 0; fileIdx < files.size(); fileIdx++) {
+            batch.add(files.get(fileIdx));
+
+            if (batch.size() >= batchSize || fileIdx == files.size() - 1) {
+                batchNumber++;
+                report.accept("Processing batch " + batchNumber + " (" + batch.size() + " files)...");
+
+                // Analyze batch in parallel
+                DetectorResult[] resultSlots = new DetectorResult[batch.size()];
+                int[] batchCacheHits = {0};
+
+                var executorService = parallelism != null && parallelism > 0
+                        ? Executors.newFixedThreadPool(parallelism)
+                        : Executors.newVirtualThreadPerTaskExecutor();
+                try (var executor = executorService) {
+                    List<Future<?>> futures = new ArrayList<>(batch.size());
+                    for (int i = 0; i < batch.size(); i++) {
+                        final int idx = i;
+                        final DiscoveredFile file = batch.get(idx);
+                        futures.add(executor.submit(() -> {
+                            if (incremental) {
+                                try {
+                                    Path absPath = root.resolve(file.path());
+                                    String hash = FileHasher.hash(absPath);
+                                    if (cache.isCached(hash)) {
+                                        var cached = cache.loadCachedResults(hash);
+                                        if (cached != null) {
+                                            resultSlots[idx] = DetectorResult.of(cached.nodes(), cached.edges());
+                                            synchronized (batchCacheHits) {
+                                                batchCacheHits[0]++;
+                                            }
+                                            return null;
+                                        }
+                                    }
+                                    DetectorResult result = analyzeFile(file, root, detectorRegistry);
+                                    resultSlots[idx] = result;
+                                    if (result != null && (!result.nodes().isEmpty() || !result.edges().isEmpty())) {
+                                        cache.storeResults(hash, file.path().toString(), file.language(),
+                                                result.nodes(), result.edges());
+                                    }
+                                } catch (IOException e) {
+                                    log.debug("Could not hash file {}", file.path(), e);
+                                    resultSlots[idx] = analyzeFile(file, root, detectorRegistry);
+                                }
+                            } else {
+                                resultSlots[idx] = analyzeFile(file, root, detectorRegistry);
+                            }
+                            return null;
+                        }));
+                    }
+
+                    // Collect in order
+                    for (int i = 0; i < futures.size(); i++) {
+                        try {
+                            futures.get(i).get();
+                        } catch (ExecutionException e) {
+                            log.warn("Analysis failed for {}", batch.get(i).path(), e.getCause());
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            log.warn("Analysis interrupted for {}", batch.get(i).path());
+                        }
+                    }
+                }
+
+                cacheHits += batchCacheHits[0];
+
+                // Collect batch results and flush non-cached to H2
+                List<CodeNode> batchNodes = new ArrayList<>();
+                List<CodeEdge> batchEdges = new ArrayList<>();
+                int batchFilesAnalyzed = 0;
+
+                for (int i = 0; i < resultSlots.length; i++) {
+                    DetectorResult result = resultSlots[i];
+                    if (result != null && (!result.nodes().isEmpty() || !result.edges().isEmpty())) {
+                        batchFilesAnalyzed++;
+                        // Only store non-incremental results (incremental already stored above)
+                        if (!incremental) {
+                            batchNodes.addAll(result.nodes());
+                            batchEdges.addAll(result.edges());
+                        }
+                        // Track breakdowns
+                        for (CodeNode node : result.nodes()) {
+                            nodeBreakdown.merge(node.getKind().getValue(), 1, Integer::sum);
+                            Object fw = node.getProperties().get("framework");
+                            if (fw != null && !fw.toString().isEmpty()) {
+                                frameworkBreakdown.merge(fw.toString(), 1, Integer::sum);
+                            }
+                        }
+                        for (var edge : result.edges()) {
+                            edgeBreakdown.merge(edge.getKind().getValue(), 1, Integer::sum);
+                        }
+                        totalNodesWritten += result.nodes().size();
+                        totalEdgesWritten += result.edges().size();
+                    }
+                }
+
+                filesAnalyzed += batchFilesAnalyzed;
+
+                // For non-incremental mode, batch-flush to H2
+                if (!incremental && (!batchNodes.isEmpty() || !batchEdges.isEmpty())) {
+                    String batchId = "batch:" + batchNumber + ":" + System.nanoTime();
+                    cache.storeBatchResults(batchId, "batch-" + batchNumber,
+                            "mixed", batchNodes, batchEdges);
+                }
+
+                // Release batch memory
+                batch.clear();
+            }
+        }
+
+        if (cacheHits > 0) {
+            report.accept("Cache hits: " + cacheHits + " / " + totalFiles + " files");
+        }
+
+        // Record run
+        String commitSha = getGitHead(root);
+        cache.recordRun(commitSha, filesAnalyzed);
+
+        Duration elapsed = Duration.between(start, Instant.now());
+        report.accept("Index complete - " + totalNodesWritten + " nodes, "
+                + totalEdgesWritten + " edges written to H2");
+
+        return new AnalysisResult(
+                totalFiles,
+                filesAnalyzed,
+                totalNodesWritten,
+                totalEdgesWritten,
+                languageBreakdown,
+                nodeBreakdown,
+                edgeBreakdown,
+                frameworkBreakdown,
+                elapsed
+        );
+    }
+
     /**
      * Check whether a file is minified (e.g. *.min.js, *.bundle.js) and large
      * enough that running detectors would be wasteful.
```
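For reference, a caller would drive the new batched pipeline roughly like this. The sketch uses only the signature added above; the wrapper class and the injected `Analyzer` are assumptions, and 500 mirrors the default batch size named in the commit message:

```java
import java.nio.file.Path;

import io.github.randomcodespace.iq.analyzer.Analyzer;

class IndexRunner {

    private final Analyzer analyzer; // assumed to come from the Spring context

    IndexRunner(Analyzer analyzer) {
        this.analyzer = analyzer;
    }

    void runFullIndex() {
        analyzer.runBatchedIndex(
                Path.of(".").toAbsolutePath(), // repoPath
                null,                          // parallelism: null => adaptive virtual threads
                500,                           // batchSize: default named in the commit message
                false,                         // incremental: false clears the H2 cache first
                System.out::println);          // onProgress: print each progress message
    }
}
```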

src/main/java/io/github/randomcodespace/iq/cache/AnalysisCache.java

Lines changed: 40 additions & 0 deletions
```diff
@@ -463,6 +463,46 @@ private long countTable(String table) throws SQLException {
         }
     }

+    /**
+     * Return the total number of cached nodes.
+     */
+    public long getNodeCount() {
+        try {
+            return countTable("nodes");
+        } catch (SQLException e) {
+            log.debug("Failed to count nodes", e);
+            return 0;
+        }
+    }
+
+    /**
+     * Return the total number of cached edges.
+     */
+    public long getEdgeCount() {
+        try {
+            return countTable("edges");
+        } catch (SQLException e) {
+            log.debug("Failed to count edges", e);
+            return 0;
+        }
+    }
+
+    /**
+     * Store a batch of nodes and edges from the analyzer, keyed by a synthetic batch hash.
+     * Used during batched indexing where the Analyzer flushes results per batch
+     * rather than per file content hash.
+     *
+     * @param batchId  unique batch identifier
+     * @param filePath representative file path for the batch
+     * @param language representative language
+     * @param nodes    nodes to store
+     * @param edges    edges to store
+     */
+    public void storeBatchResults(String batchId, String filePath, String language,
+                                  List<CodeNode> nodes, List<CodeEdge> edges) {
+        storeResults(batchId, filePath, language, nodes, edges);
+    }
+
     /**
      * Load all cached nodes across all files.
      *
```
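EnrichCommand itself is among the 11 changed files but is not shown in this excerpt, so here is a hedged sketch of how the new accessors might back a preflight check before bulk-loading into Neo4j. The `.codeiq` directory name is an assumption (the Analyzer actually resolves it from `config.getCacheDir()`):

```java
import java.nio.file.Path;

import io.github.randomcodespace.iq.cache.AnalysisCache;

class EnrichPreflight {

    // Sketch: verify that `index` left data in H2 before enriching.
    static void checkIndex(Path root) throws Exception {
        AnalysisCache cache = new AnalysisCache(
                root.resolve(".codeiq").resolve("analysis-cache.db")); // dir name assumed
        try {
            long nodes = cache.getNodeCount();
            long edges = cache.getEdgeCount();
            if (nodes == 0 && edges == 0) {
                throw new IllegalStateException("H2 store is empty - run 'index' first");
            }
            System.out.printf("H2 store ready: %d nodes, %d edges%n", nodes, edges);
        } finally {
            cache.close(); // mirror the Analyzer's open/close discipline
        }
    }
}
```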

src/main/java/io/github/randomcodespace/iq/cli/AnalyzeCommand.java

Lines changed: 5 additions & 1 deletion
```diff
@@ -17,10 +17,14 @@

 /**
  * Scan a codebase and build a knowledge graph.
+ * <p>
+ * This is the legacy command that uses in-memory graph building with Neo4j.
+ * For memory-efficient indexing, use {@code index} instead.
+ * Kept as backward-compatible alias.
  */
 @Component
 @Command(name = "analyze", mixinStandardHelpOptions = true,
-        description = "Scan codebase and build knowledge graph")
+        description = "Scan codebase and build knowledge graph (legacy; prefer 'index' for large codebases)")
 public class AnalyzeCommand implements Callable<Integer> {

     @Parameters(index = "0", defaultValue = ".", description = "Path to codebase root")
```

src/main/java/io/github/randomcodespace/iq/cli/CodeIqCli.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -16,6 +16,8 @@
     description = "Intelligent code graph discovery and analysis CLI",
     subcommands = {
         AnalyzeCommand.class,
+        IndexCommand.class,
+        EnrichCommand.class,
         ServeCommand.class,
         GraphCommand.class,
         QueryCommand.class,
```
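The registered IndexCommand is not part of this excerpt. Modeled on the AnalyzeCommand pattern above and the `--batch-size` flag named in the commit message, its skeleton plausibly looks like this sketch (field names, defaults, and wiring are assumptions, not the committed file):

```java
import java.util.concurrent.Callable;

import org.springframework.stereotype.Component;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

// Sketch only: the real IndexCommand exists in this commit but is not shown.
@Component
@Command(name = "index", mixinStandardHelpOptions = true,
        description = "Stream detector results to H2 in memory-bounded batches")
class IndexCommand implements Callable<Integer> {

    @Parameters(index = "0", defaultValue = ".", description = "Path to codebase root")
    private String path;

    @Option(names = "--batch-size", defaultValue = "500",
            description = "Files per H2 flush batch")
    private int batchSize;

    @Override
    public Integer call() {
        // Would delegate `path` and `batchSize` to Analyzer.runBatchedIndex(...)
        // as added in this commit.
        return 0;
    }
}
```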
