Skip to content

Commit 5c10e23

Browse files
Matovidloclaude
andcommitted
fix(stream): prevent reader leak and ensure timestamp consistency in error handling
- Move defer reader.Close() immediately after OpenReader to prevent resource leaks on early returns - Capture clock.Now() once in terminal retry handling to ensure timestamp consistency between etcd and logs Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 8cf7b35 commit 5c10e23

2 files changed

Lines changed: 14 additions & 11 deletions

File tree

  • internal/pkg/service/stream

internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/slice.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, sli
3535
return err
3636
}
3737

38+
// Error when closing the reader is not a fatal error
39+
// Register this defer immediately after opening to prevent resource leaks on early returns
40+
defer func() {
41+
err := reader.Close(ctx)
42+
if err != nil {
43+
b.logger.Warnf(ctx, "unable to close reader: %v", err)
44+
return
45+
}
46+
}()
47+
3848
// Get authorization token
3949
existingToken, err := b.schema.Token().ForSink(slice.SinkKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
4050
if err != nil {
@@ -83,15 +93,6 @@ func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, sli
8393
}
8494
}()
8595

86-
// Error when closing the reader is not a fatal error
87-
defer func() {
88-
err := reader.Close(ctx)
89-
if err != nil {
90-
b.logger.Warnf(ctx, "unable to close reader: %v", err)
91-
return
92-
}
93-
}()
94-
9596
// Decrypt file upload credentials
9697
var credentials keboola.FileUploadCredentials
9798
if keboolaFile.EncryptedCredentials != "" {

internal/pkg/service/stream/storage/node/readernode/sliceupload/operator.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,14 +337,16 @@ func (o *operator) handleUploadError(ctx context.Context, slice *sliceData, err
337337
dbCtx, dbCancel := context.WithTimeoutCause(context.WithoutCancel(ctx), dbOperationTimeout, errors.New("terminal retry update timeout"))
338338
defer dbCancel()
339339

340-
_, rErr := o.storage.Slice().IncrementNonRetryableAttempt(slice.SliceKey, o.clock.Now(), err.Error(), terminalRetryInterval).Do(dbCtx).ResultOrErr()
340+
// Capture the timestamp once to ensure consistency between etcd update and log message
341+
now := o.clock.Now()
342+
_, rErr := o.storage.Slice().IncrementNonRetryableAttempt(slice.SliceKey, now, err.Error(), terminalRetryInterval).Do(dbCtx).ResultOrErr()
341343
if rErr != nil {
342344
o.logger.Errorf(ctx, "cannot update slice with terminal retry interval: %s", rErr)
343345
return
344346
}
345347

346348
o.logger.Infof(ctx, "slice retry disabled until %s, requires manual intervention",
347-
o.clock.Now().Add(terminalRetryInterval).Format(time.RFC3339))
349+
now.Add(terminalRetryInterval).Format(time.RFC3339))
348350
return
349351
}
350352

0 commit comments

Comments
 (0)