Skip to content

Commit f5cbc70

Browse files
vojtabiberle and claude committed
DMD-921 - Refactor FetchTableSamples to use errgroup with semaphore
- Use errgroup.WithContext + semaphore.Weighted instead of spawning all goroutines upfront with a channel-based semaphore
- Remove context from struct - pass to methods as first parameter
- Increase maxConcurrency from 5 to 10
- Extract fetchTableSample method on Fetcher (not worker struct)
- Rename worker to collector (only holds results, not logic)

This follows the pattern from internal/pkg/state/local/workers.go and addresses the anti-pattern of creating N goroutines that block on semaphore acquisition.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 49f929a commit f5cbc70

1 file changed

Lines changed: 60 additions & 67 deletions

File tree

internal/pkg/llm/twinformat/fetcher.go

Lines changed: 60 additions & 67 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,8 @@ import (
88
"time"
99

1010
"github.com/keboola/keboola-sdk-go/v2/pkg/keboola"
11+
"golang.org/x/sync/errgroup"
12+
"golang.org/x/sync/semaphore"
1113

1214
"github.com/keboola/keboola-as-code/internal/pkg/llm/twinformat/configparser"
1315
"github.com/keboola/keboola-as-code/internal/pkg/log"
@@ -227,62 +229,31 @@ func (f *Fetcher) FetchTableSample(ctx context.Context, tableKey keboola.TableKe
227229
return sample, nil
228230
}
229231

230-
// sampleFetchWorker holds shared state for concurrent sample fetching.
231-
type sampleFetchWorker struct {
232-
fetcher *Fetcher
233-
ctx context.Context
234-
limit uint
235-
semaphore chan struct{}
236-
237-
mu sync.Mutex
238-
results []indexedSample
239-
failedCount int
240-
}
241-
242232
// indexedSample pairs a sample with its original index for ordering.
243233
type indexedSample struct {
244234
index int
245235
sample *TableSample
246236
}
247237

248-
// fetchTable fetches a sample for a single table and records the result.
249-
func (w *sampleFetchWorker) fetchTable(idx int, t *keboola.Table) {
250-
// Acquire semaphore.
251-
select {
252-
case w.semaphore <- struct{}{}:
253-
defer func() { <-w.semaphore }()
254-
case <-w.ctx.Done():
255-
w.recordFailure()
256-
return
257-
}
258-
259-
tableKey := keboola.TableKey{
260-
BranchID: t.BranchID,
261-
TableID: t.TableID,
262-
}
263-
264-
sample, fetchErr := w.fetcher.FetchTableSample(w.ctx, tableKey, w.limit)
265-
if fetchErr != nil {
266-
w.fetcher.logger.Warnf(w.ctx, "Failed to fetch sample for table %s: %v", t.TableID, fetchErr)
267-
w.recordFailure()
268-
return
269-
}
270-
271-
w.recordSuccess(idx, sample)
238+
// sampleFetchCollector collects results from concurrent sample fetching.
239+
type sampleFetchCollector struct {
240+
mu sync.Mutex
241+
results []indexedSample
242+
failedCount int
272243
}
273244

274245
// recordFailure increments the failure count.
275-
func (w *sampleFetchWorker) recordFailure() {
276-
w.mu.Lock()
277-
defer w.mu.Unlock()
278-
w.failedCount++
246+
func (c *sampleFetchCollector) recordFailure() {
247+
c.mu.Lock()
248+
defer c.mu.Unlock()
249+
c.failedCount++
279250
}
280251

281252
// recordSuccess records a successfully fetched sample.
282-
func (w *sampleFetchWorker) recordSuccess(idx int, sample *TableSample) {
283-
w.mu.Lock()
284-
defer w.mu.Unlock()
285-
w.results = append(w.results, indexedSample{index: idx, sample: sample})
253+
func (c *sampleFetchCollector) recordSuccess(idx int, sample *TableSample) {
254+
c.mu.Lock()
255+
defer c.mu.Unlock()
256+
c.results = append(c.results, indexedSample{index: idx, sample: sample})
286257
}
287258

288259
// FetchTableSamples fetches samples for multiple tables concurrently.
@@ -304,52 +275,74 @@ func (f *Fetcher) FetchTableSamples(ctx context.Context, tables []*keboola.Table
304275
f.logger.Infof(ctx, "Fetching samples for %d tables concurrently (limit: %d rows each)", len(tablesToFetch), limit)
305276

306277
// Use bounded concurrency to respect API rate limits.
307-
const maxConcurrency = 5
308-
309-
worker := &sampleFetchWorker{
310-
fetcher: f,
311-
ctx: ctx,
312-
limit: limit,
313-
semaphore: make(chan struct{}, maxConcurrency),
314-
results: make([]indexedSample, 0, len(tablesToFetch)),
278+
const maxConcurrency = 10
279+
280+
sem := semaphore.NewWeighted(maxConcurrency)
281+
group, groupCtx := errgroup.WithContext(ctx)
282+
collector := &sampleFetchCollector{
283+
results: make([]indexedSample, 0, len(tablesToFetch)),
315284
}
316285

317-
var wg sync.WaitGroup
318286
for i, table := range tablesToFetch {
319-
wg.Add(1)
320-
go func(idx int, t *keboola.Table) {
321-
defer wg.Done()
322-
worker.fetchTable(idx, t)
323-
}(i, table)
287+
idx, t := i, table
288+
group.Go(func() error {
289+
// Acquire semaphore - blocks until slot available or context cancelled.
290+
if err := sem.Acquire(groupCtx, 1); err != nil {
291+
collector.recordFailure()
292+
return nil // Don't propagate - we want partial results
293+
}
294+
defer sem.Release(1)
295+
296+
f.fetchTableSample(groupCtx, collector, idx, t, limit)
297+
return nil
298+
})
324299
}
325300

326-
wg.Wait()
301+
// Wait for all goroutines to complete.
302+
_ = group.Wait()
327303

328304
// If context was cancelled and no samples were fetched, propagate the cancellation error.
329-
if len(worker.results) == 0 && ctx.Err() != nil {
305+
if len(collector.results) == 0 && ctx.Err() != nil {
330306
return nil, ctx.Err()
331307
}
332308

333309
// Sort by original index to preserve order.
334-
sort.Slice(worker.results, func(i, j int) bool {
335-
return worker.results[i].index < worker.results[j].index
310+
sort.Slice(collector.results, func(i, j int) bool {
311+
return collector.results[i].index < collector.results[j].index
336312
})
337313

338-
samples = make([]*TableSample, 0, len(worker.results))
339-
for _, r := range worker.results {
314+
samples = make([]*TableSample, 0, len(collector.results))
315+
for _, r := range collector.results {
340316
samples = append(samples, r.sample)
341317
}
342318

343-
f.logger.Infof(ctx, "Fetched samples for %d tables (%d failed)", len(samples), worker.failedCount)
319+
f.logger.Infof(ctx, "Fetched samples for %d tables (%d failed)", len(samples), collector.failedCount)
344320

345321
// Return error if any tables failed to fetch, but still return partial results.
346-
if worker.failedCount > 0 {
347-
return samples, errors.Errorf("failed to fetch samples for %d of %d tables", worker.failedCount, len(tablesToFetch))
322+
if collector.failedCount > 0 {
323+
return samples, errors.Errorf("failed to fetch samples for %d of %d tables", collector.failedCount, len(tablesToFetch))
348324
}
349325

350326
return samples, nil
351327
}
352328

329+
// fetchTableSample fetches a sample of table t via FetchTableSample (passing limit through) and records the outcome in collector: on error it logs a warning and counts a failure, otherwise it stores the sample together with idx.
330+
func (f *Fetcher) fetchTableSample(ctx context.Context, collector *sampleFetchCollector, idx int, t *keboola.Table, limit uint) {
331+
tableKey := keboola.TableKey{
332+
BranchID: t.BranchID,
333+
TableID: t.TableID,
334+
}
335+
336+
sample, err := f.FetchTableSample(ctx, tableKey, limit)
337+
if err != nil {
338+
f.logger.Warnf(ctx, "Failed to fetch sample for table %s: %v", t.TableID, err)
339+
collector.recordFailure()
340+
return
341+
}
342+
343+
collector.recordSuccess(idx, sample)
344+
}
345+
353346
// fetchAllComponents fetches all components and extracts transformation and component configs.
354347
// This makes a single API call and returns all data needed for processing.
355348
func (f *Fetcher) fetchAllComponents(ctx context.Context, branchID keboola.BranchID) (result *componentsResult, err error) {

0 commit comments

Comments
 (0)