Skip to content

Commit de562df

Browse files
authored
Merge pull request #2502 from keboola/vb/DMD-922/llm-export-samples-security
DMD-922 - Add samples for llm export
2 parents 63bfe1b + f5f7c52 commit de562df

9 files changed

Lines changed: 898 additions & 8 deletions

File tree

internal/pkg/llm/twinformat/fetcher.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@ import (
44
"context"
55
"sort"
66
"strings"
7+
"sync"
78
"time"
89

910
"github.com/keboola/keboola-sdk-go/v2/pkg/keboola"
11+
"golang.org/x/sync/errgroup"
12+
"golang.org/x/sync/semaphore"
1013

1114
"github.com/keboola/keboola-as-code/internal/pkg/llm/twinformat/configparser"
1215
"github.com/keboola/keboola-as-code/internal/pkg/log"
1316
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
17+
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
1418
)
1519

1620
// FetcherDependencies defines the dependencies required by the Fetcher.
@@ -183,6 +187,162 @@ type componentsResult struct {
183187
ComponentConfigs []*configparser.ComponentConfig
184188
}
185189

190+
// TableSample represents a sample of table data.
191+
type TableSample struct {
192+
TableID keboola.TableID
193+
Columns []string
194+
Rows [][]string
195+
}
196+
197+
// RowCount returns the number of rows in the sample.
198+
func (s *TableSample) RowCount() int {
199+
return len(s.Rows)
200+
}
201+
202+
// FetchTableSample fetches a sample of data from a table.
203+
func (f *Fetcher) FetchTableSample(ctx context.Context, tableKey keboola.TableKey, limit uint) (sample *TableSample, err error) {
204+
ctx, span := f.telemetry.Tracer().Start(ctx, "keboola.go.twinformat.fetcher.FetchTableSample")
205+
defer span.End(&err)
206+
207+
f.logger.Debugf(ctx, "Fetching sample for table %s (limit: %d)", tableKey.TableID, limit)
208+
209+
// Fetch table preview using the SDK.
210+
// Don't pass WithLimitRows when limit is 0, as it may have different semantics than omitting.
211+
var preview *keboola.TablePreview
212+
if limit == 0 {
213+
preview, err = f.api.PreviewTableRequest(tableKey).Send(ctx)
214+
} else {
215+
preview, err = f.api.PreviewTableRequest(tableKey, keboola.WithLimitRows(limit)).Send(ctx)
216+
}
217+
if err != nil {
218+
return nil, err
219+
}
220+
221+
sample = &TableSample{
222+
TableID: tableKey.TableID,
223+
Columns: preview.Columns,
224+
Rows: preview.Rows,
225+
}
226+
227+
f.logger.Debugf(ctx, "Fetched %d rows for table %s", sample.RowCount(), tableKey.TableID)
228+
229+
return sample, nil
230+
}
231+
232+
// indexedSample pairs a fetched sample with the index of its source table in
// the input slice, so results collected concurrently can later be sorted
// back into the caller's original table order.
type indexedSample struct {
	index  int
	sample *TableSample
}
237+
238+
// sampleFetchCollector collects results from concurrent sample fetching.
239+
type sampleFetchCollector struct {
240+
mu sync.Mutex
241+
results []indexedSample
242+
failedCount int
243+
}
244+
245+
// recordFailure increments the failure count.
246+
func (c *sampleFetchCollector) recordFailure() {
247+
c.mu.Lock()
248+
defer c.mu.Unlock()
249+
c.failedCount++
250+
}
251+
252+
// recordSuccess records a successfully fetched sample.
253+
func (c *sampleFetchCollector) recordSuccess(idx int, sample *TableSample) {
254+
c.mu.Lock()
255+
defer c.mu.Unlock()
256+
c.results = append(c.results, indexedSample{index: idx, sample: sample})
257+
}
258+
259+
// FetchTableSamples fetches samples for multiple tables concurrently.
//
// At most maxTables tables (the first ones in the input slice) are sampled,
// with up to maxConcurrency requests in flight at once. Failed tables are
// skipped rather than aborting the whole operation: on partial failure the
// successfully fetched samples are returned together with a non-nil error.
// The returned samples preserve the order of the input slice.
func (f *Fetcher) FetchTableSamples(ctx context.Context, tables []*keboola.Table, limit uint, maxTables int) (samples []*TableSample, err error) {
	ctx, span := f.telemetry.Tracer().Start(ctx, "keboola.go.twinformat.fetcher.FetchTableSamples")
	defer span.End(&err)

	// Guard against non-positive maxTables to avoid panics from negative slice capacities.
	if maxTables <= 0 {
		return []*TableSample{}, nil
	}

	// Limit tables to fetch.
	tablesToFetch := tables
	if len(tablesToFetch) > maxTables {
		tablesToFetch = tablesToFetch[:maxTables]
	}

	f.logger.Infof(ctx, "Fetching samples for %d tables concurrently (limit: %d rows each)", len(tablesToFetch), limit)

	// Use bounded concurrency to respect API rate limits.
	const maxConcurrency = 10

	sem := semaphore.NewWeighted(maxConcurrency)
	group, groupCtx := errgroup.WithContext(ctx)
	collector := &sampleFetchCollector{
		results: make([]indexedSample, 0, len(tablesToFetch)),
	}

	for i, table := range tablesToFetch {
		// Capture per-iteration copies for the closure (needed pre-Go 1.22;
		// harmless on newer versions).
		idx, t := i, table
		group.Go(func() error {
			// Acquire semaphore - blocks until slot available or context cancelled.
			if err := sem.Acquire(groupCtx, 1); err != nil {
				collector.recordFailure()
				return nil // Don't propagate - we want partial results
			}
			defer sem.Release(1)

			f.fetchTableSample(groupCtx, collector, idx, t, limit)
			return nil
		})
	}

	// Wait for all goroutines to complete. Every goroutine returns nil, so the
	// group error is intentionally ignored; failures are counted in collector.
	_ = group.Wait()

	// If context was cancelled and no samples were fetched, propagate the cancellation error.
	if len(collector.results) == 0 && ctx.Err() != nil {
		return nil, ctx.Err()
	}

	// Sort by original index to preserve order.
	sort.Slice(collector.results, func(i, j int) bool {
		return collector.results[i].index < collector.results[j].index
	})

	samples = make([]*TableSample, 0, len(collector.results))
	for _, r := range collector.results {
		samples = append(samples, r.sample)
	}

	f.logger.Infof(ctx, "Fetched samples for %d tables (%d failed)", len(samples), collector.failedCount)

	// Return error if any tables failed to fetch, but still return partial results.
	if collector.failedCount > 0 {
		return samples, errors.Errorf("failed to fetch samples for %d of %d tables", collector.failedCount, len(tablesToFetch))
	}

	return samples, nil
}
328+
329+
// fetchTableSample fetches a sample for a single table and records the result.
330+
func (f *Fetcher) fetchTableSample(ctx context.Context, collector *sampleFetchCollector, idx int, t *keboola.Table, limit uint) {
331+
tableKey := keboola.TableKey{
332+
BranchID: t.BranchID,
333+
TableID: t.TableID,
334+
}
335+
336+
sample, err := f.FetchTableSample(ctx, tableKey, limit)
337+
if err != nil {
338+
f.logger.Warnf(ctx, "Failed to fetch sample for table %s: %v", t.TableID, err)
339+
collector.recordFailure()
340+
return
341+
}
342+
343+
collector.recordSuccess(idx, sample)
344+
}
345+
186346
// fetchAllComponents fetches all components and extracts transformation and component configs.
187347
// This makes a single API call and returns all data needed for processing.
188348
func (f *Fetcher) fetchAllComponents(ctx context.Context, branchID keboola.BranchID) (result *componentsResult, err error) {

internal/pkg/llm/twinformat/fetcher_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,3 +475,149 @@ func TestFetchTableLastImporter_NoImportEvents(t *testing.T) {
475475
// No import events, should return empty
476476
assert.Empty(t, componentID)
477477
}
478+
479+
func TestFetchTableSample(t *testing.T) {
480+
t.Parallel()
481+
482+
fetcher, transport := newTestFetcher(t)
483+
branchID := keboola.BranchID(123)
484+
485+
tableKey := keboola.TableKey{
486+
BranchID: branchID,
487+
TableID: keboola.TableID{
488+
BucketID: keboola.BucketID{Stage: keboola.BucketStageIn, BucketName: "test"},
489+
TableName: "sample_table",
490+
},
491+
}
492+
493+
// Mock table preview response - Storage API returns CSV format
494+
csvData := "\"id\",\"name\",\"value\"\n\"1\",\"Alice\",\"100\"\n\"2\",\"Bob\",\"200\"\n\"3\",\"Charlie\",\"300\"\n"
495+
transport.RegisterResponder(
496+
http.MethodGet,
497+
`=~/v2/storage/branch/123/tables/in.test.sample_table/data-preview`,
498+
httpmock.NewStringResponder(200, csvData),
499+
)
500+
501+
sample, err := fetcher.FetchTableSample(t.Context(), tableKey, 100)
502+
require.NoError(t, err)
503+
504+
assert.Equal(t, tableKey.TableID, sample.TableID)
505+
assert.Equal(t, []string{"id", "name", "value"}, sample.Columns)
506+
assert.Len(t, sample.Rows, 3)
507+
assert.Equal(t, 3, sample.RowCount())
508+
assert.Equal(t, []string{"1", "Alice", "100"}, sample.Rows[0])
509+
}
510+
511+
func TestFetchTableSamples(t *testing.T) {
512+
t.Parallel()
513+
514+
fetcher, transport := newTestFetcher(t)
515+
branchID := keboola.BranchID(123)
516+
517+
tables := []*keboola.Table{
518+
{
519+
TableKey: keboola.TableKey{
520+
BranchID: branchID,
521+
TableID: keboola.TableID{
522+
BucketID: keboola.BucketID{Stage: keboola.BucketStageIn, BucketName: "bucket"},
523+
TableName: "table1",
524+
},
525+
},
526+
},
527+
{
528+
TableKey: keboola.TableKey{
529+
BranchID: branchID,
530+
TableID: keboola.TableID{
531+
BucketID: keboola.BucketID{Stage: keboola.BucketStageIn, BucketName: "bucket"},
532+
TableName: "table2",
533+
},
534+
},
535+
},
536+
{
537+
TableKey: keboola.TableKey{
538+
BranchID: branchID,
539+
TableID: keboola.TableID{
540+
BucketID: keboola.BucketID{Stage: keboola.BucketStageIn, BucketName: "bucket"},
541+
TableName: "table3",
542+
},
543+
},
544+
},
545+
}
546+
547+
// Mock preview responses for each table - Storage API returns CSV format
548+
transport.RegisterResponder(
549+
http.MethodGet,
550+
`=~/v2/storage/branch/123/tables/in.bucket.table1/data-preview`,
551+
httpmock.NewStringResponder(200, "\"col1\"\n\"val1\"\n"),
552+
)
553+
transport.RegisterResponder(
554+
http.MethodGet,
555+
`=~/v2/storage/branch/123/tables/in.bucket.table2/data-preview`,
556+
httpmock.NewStringResponder(200, "\"col2\"\n\"val2\"\n"),
557+
)
558+
transport.RegisterResponder(
559+
http.MethodGet,
560+
`=~/v2/storage/branch/123/tables/in.bucket.table3/data-preview`,
561+
httpmock.NewStringResponder(200, "\"col3\"\n\"val3\"\n"),
562+
)
563+
564+
// Test with maxTables=2 (should only fetch first 2 tables)
565+
samples, err := fetcher.FetchTableSamples(t.Context(), tables, 100, 2)
566+
require.NoError(t, err)
567+
568+
assert.Len(t, samples, 2)
569+
assert.Equal(t, "in.bucket.table1", samples[0].TableID.String())
570+
assert.Equal(t, "in.bucket.table2", samples[1].TableID.String())
571+
}
572+
573+
func TestFetchTableSamples_SkipsFailedTables(t *testing.T) {
574+
t.Parallel()
575+
576+
fetcher, transport := newTestFetcher(t)
577+
branchID := keboola.BranchID(123)
578+
579+
tables := []*keboola.Table{
580+
{
581+
TableKey: keboola.TableKey{
582+
BranchID: branchID,
583+
TableID: keboola.TableID{
584+
BucketID: keboola.BucketID{Stage: keboola.BucketStageIn, BucketName: "bucket"},
585+
TableName: "good_table",
586+
},
587+
},
588+
},
589+
{
590+
TableKey: keboola.TableKey{
591+
BranchID: branchID,
592+
TableID: keboola.TableID{
593+
BucketID: keboola.BucketID{Stage: keboola.BucketStageIn, BucketName: "bucket"},
594+
TableName: "bad_table",
595+
},
596+
},
597+
},
598+
}
599+
600+
// First table succeeds - Storage API returns CSV format
601+
transport.RegisterResponder(
602+
http.MethodGet,
603+
`=~/v2/storage/branch/123/tables/in.bucket.good_table/data-preview`,
604+
httpmock.NewStringResponder(200, "\"col1\"\n\"val1\"\n"),
605+
)
606+
607+
// Second table fails
608+
transport.RegisterResponder(
609+
http.MethodGet,
610+
`=~/v2/storage/branch/123/tables/in.bucket.bad_table/data-preview`,
611+
httpmock.NewStringResponder(500, "Internal Server Error"),
612+
)
613+
614+
samples, err := fetcher.FetchTableSamples(t.Context(), tables, 100, 10)
615+
616+
// Should return error indicating partial failure, but still return partial results.
617+
require.Error(t, err)
618+
assert.Contains(t, err.Error(), "failed to fetch samples for 1 of 2 tables")
619+
620+
// Should have 1 sample (failed table is skipped)
621+
assert.Len(t, samples, 1)
622+
assert.Equal(t, "in.bucket.good_table", samples[0].TableID.String())
623+
}

0 commit comments

Comments
 (0)