Skip to content

Commit 04f48b4

Browse files
vojtabiberleclaude
andcommitted
DMD-921 - Refactor FetchTableSamples goroutine into helper struct
- Extract goroutine body into sampleFetchWorker struct with fetchTable method - Add recordFailure and recordSuccess methods with defer Unlock pattern - Improves readability and testability of concurrent fetching logic Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 5e52c2b commit 04f48b4

1 file changed

Lines changed: 74 additions & 48 deletions

File tree

internal/pkg/llm/twinformat/fetcher.go

Lines changed: 74 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,64 @@ func (f *Fetcher) FetchTableSample(ctx context.Context, tableKey keboola.TableKe
227227
return sample, nil
228228
}
229229

230+
// sampleFetchWorker holds shared state for concurrent sample fetching.
231+
type sampleFetchWorker struct {
232+
fetcher *Fetcher
233+
ctx context.Context
234+
limit uint
235+
semaphore chan struct{}
236+
237+
mu sync.Mutex
238+
results []indexedSample
239+
failedCount int
240+
}
241+
242+
// indexedSample pairs a sample with its original index for ordering.
243+
type indexedSample struct {
244+
index int
245+
sample *TableSample
246+
}
247+
248+
// fetchTable fetches a sample for a single table and records the result.
249+
func (w *sampleFetchWorker) fetchTable(idx int, t *keboola.Table) {
250+
// Acquire semaphore.
251+
select {
252+
case w.semaphore <- struct{}{}:
253+
defer func() { <-w.semaphore }()
254+
case <-w.ctx.Done():
255+
w.recordFailure()
256+
return
257+
}
258+
259+
tableKey := keboola.TableKey{
260+
BranchID: t.BranchID,
261+
TableID: t.TableID,
262+
}
263+
264+
sample, fetchErr := w.fetcher.FetchTableSample(w.ctx, tableKey, w.limit)
265+
if fetchErr != nil {
266+
w.fetcher.logger.Warnf(w.ctx, "Failed to fetch sample for table %s: %v", t.TableID, fetchErr)
267+
w.recordFailure()
268+
return
269+
}
270+
271+
w.recordSuccess(idx, sample)
272+
}
273+
274+
// recordFailure increments the failure count.
275+
func (w *sampleFetchWorker) recordFailure() {
276+
w.mu.Lock()
277+
defer w.mu.Unlock()
278+
w.failedCount++
279+
}
280+
281+
// recordSuccess records a successfully fetched sample.
282+
func (w *sampleFetchWorker) recordSuccess(idx int, sample *TableSample) {
283+
w.mu.Lock()
284+
defer w.mu.Unlock()
285+
w.results = append(w.results, indexedSample{index: idx, sample: sample})
286+
}
287+
230288
// FetchTableSamples fetches samples for multiple tables concurrently.
231289
func (f *Fetcher) FetchTableSamples(ctx context.Context, tables []*keboola.Table, limit uint, maxTables int) (samples []*TableSample, err error) {
232290
ctx, span := f.telemetry.Tracer().Start(ctx, "keboola.go.twinformat.fetcher.FetchTableSamples")
@@ -248,77 +306,45 @@ func (f *Fetcher) FetchTableSamples(ctx context.Context, tables []*keboola.Table
248306
// Use bounded concurrency to respect API rate limits.
249307
const maxConcurrency = 5
250308

251-
type indexedSample struct {
252-
index int
253-
sample *TableSample
309+
worker := &sampleFetchWorker{
310+
fetcher: f,
311+
ctx: ctx,
312+
limit: limit,
313+
semaphore: make(chan struct{}, maxConcurrency),
314+
results: make([]indexedSample, 0, len(tablesToFetch)),
254315
}
255316

256-
var (
257-
mu sync.Mutex
258-
wg sync.WaitGroup
259-
semaphore = make(chan struct{}, maxConcurrency)
260-
results = make([]indexedSample, 0, len(tablesToFetch))
261-
failedCount int
262-
)
263-
317+
var wg sync.WaitGroup
264318
for i, table := range tablesToFetch {
265319
wg.Add(1)
266320
go func(idx int, t *keboola.Table) {
267321
defer wg.Done()
268-
269-
// Acquire semaphore.
270-
select {
271-
case semaphore <- struct{}{}:
272-
defer func() { <-semaphore }()
273-
case <-ctx.Done():
274-
mu.Lock()
275-
failedCount++
276-
mu.Unlock()
277-
return
278-
}
279-
280-
tableKey := keboola.TableKey{
281-
BranchID: t.BranchID,
282-
TableID: t.TableID,
283-
}
284-
285-
sample, fetchErr := f.FetchTableSample(ctx, tableKey, limit)
286-
if fetchErr != nil {
287-
f.logger.Warnf(ctx, "Failed to fetch sample for table %s: %v", t.TableID, fetchErr)
288-
mu.Lock()
289-
failedCount++
290-
mu.Unlock()
291-
return
292-
}
293-
294-
mu.Lock()
295-
results = append(results, indexedSample{index: idx, sample: sample})
296-
mu.Unlock()
322+
worker.fetchTable(idx, t)
297323
}(i, table)
298324
}
299325

300326
wg.Wait()
301327

302328
// If context was cancelled and no samples were fetched, propagate the cancellation error.
303-
if len(results) == 0 && ctx.Err() != nil {
329+
if len(worker.results) == 0 && ctx.Err() != nil {
304330
return nil, ctx.Err()
305331
}
306332

307333
// Sort by original index to preserve order.
308-
sort.Slice(results, func(i, j int) bool {
309-
return results[i].index < results[j].index
334+
sort.Slice(worker.results, func(i, j int) bool {
335+
return worker.results[i].index < worker.results[j].index
310336
})
311337

312-
samples = make([]*TableSample, 0, len(results))
313-
for _, r := range results {
338+
samples = make([]*TableSample, 0, len(worker.results))
339+
for _, r := range worker.results {
314340
samples = append(samples, r.sample)
315341
}
316342

317-
f.logger.Infof(ctx, "Fetched samples for %d tables (%d failed)", len(samples), failedCount)
343+
f.logger.Infof(ctx, "Fetched samples for %d tables (%d failed)", len(samples), worker.failedCount)
318344

319345
// Return error if any tables failed to fetch, but still return partial results.
320-
if failedCount > 0 {
321-
return samples, errors.Errorf("failed to fetch samples for %d of %d tables", failedCount, len(tablesToFetch))
346+
if worker.failedCount > 0 {
347+
return samples, errors.Errorf("failed to fetch samples for %d of %d tables", worker.failedCount, len(tablesToFetch))
322348
}
323349

324350
return samples, nil

0 commit comments

Comments
 (0)