Commit 8cf7b35

Matovidlo and claude committed
fix(stream): address PR #2530 review comments - terminal state, clock injection, error messages, tests
This commit addresses all review feedback from PR #2530:

1. Fix terminal state loop by updating etcd with far-future RetryAfter when max attempts reached
2. Inject clock into Bridge to make expiration check testable
3. Move expiration check before decryption to avoid unnecessary crypto ops
4. Fix error messages (file import -> slice upload)
5. Add comprehensive test coverage for NonRetryableError and IncrementNonRetryableAttempt

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 055a5a4 commit 8cf7b35

6 files changed

Lines changed: 195 additions & 23 deletions


internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go

Lines changed: 4 additions & 0 deletions
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/dgraph-io/ristretto/v2"
+	"github.com/jonboulle/clockwork"
 	"github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt"
 	"github.com/keboola/keboola-sdk-go/v2/pkg/keboola"
 	etcd "go.etcd.io/etcd/client/v3"
@@ -45,6 +46,7 @@ const (
 
 type Bridge struct {
 	logger log.Logger
+	clock  clockwork.Clock
 	config keboolasink.Config
 	client etcd.KV
 	schema schema.Schema
@@ -68,6 +70,7 @@ type jobData struct {
 
 type dependencies interface {
 	Logger() log.Logger
+	Clock() clockwork.Clock
 	EtcdClient() *etcd.Client
 	EtcdSerde() *serde.Serde
 	Process() *servicectx.Process
@@ -105,6 +108,7 @@ func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) (*B
 
 	b := &Bridge{
 		logger: d.Logger().WithComponent("keboola.bridge"),
+		clock:  d.Clock(),
 		config: config,
 		client: d.EtcdClient(),
 		schema: schema.New(d.EtcdSerde()),

internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go

Lines changed: 1 addition & 1 deletion
@@ -145,7 +145,7 @@ func (b *Bridge) deleteCredentialsOnFileDelete() {
 }
 
 func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statistics.Value) error {
-	start := time.Now()
+	start := b.clock.Now()
 
 	// Get authorization token
 	existingToken, err := b.schema.Token().ForSink(file.SinkKey).GetOrErr(b.client).Do(ctx).ResultOrErr()

internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/slice.go

Lines changed: 18 additions & 18 deletions
@@ -27,7 +27,7 @@ func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, sli
 		return nil
 	}
 
-	start := time.Now()
+	start := b.clock.Now()
 
 	reader, err := volume.OpenReader(slice.SliceKey, slice.LocalStorage, slice.EncodingCompression, slice.StagingStorage.Compression)
 	if err != nil {
@@ -41,6 +41,23 @@ func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, sli
 		return err
 	}
 
+	// Get file details to check expiration before decryption
+	keboolaFile, err := b.schema.File().ForFile(slice.FileKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
+	if err != nil {
+		return err
+	}
+
+	// Check credential expiration before decryption to avoid unnecessary crypto/KMS operations.
+	// Expired credentials will never succeed, so we fail fast with a non-retryable error
+	// to avoid wasting API calls and generating excessive error logs.
+	expiration := keboolaFile.Expiration()
+	if !expiration.IsZero() && b.clock.Now().After(expiration.Time()) {
+		return model.NewNonRetryableError(errors.Errorf(
+			"%w: credentials for file %s expired at %s",
+			ErrCredentialsExpired, slice.FileKey.String(), expiration.String(),
+		))
+	}
+
 	// Prepare encryption metadata
 	metadata := cloudencrypt.Metadata{"sink": slice.SinkKey.String()}
 
@@ -75,12 +92,6 @@ func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, sli
 		}
 	}()
 
-	// Get file details
-	keboolaFile, err := b.schema.File().ForFile(slice.FileKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
-	if err != nil {
-		return err
-	}
-
 	// Decrypt file upload credentials
 	var credentials keboola.FileUploadCredentials
 	if keboolaFile.EncryptedCredentials != "" {
@@ -97,17 +108,6 @@ func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, sli
 		credentials = *keboolaFile.UploadCredentials
 	}
 
-	// Check credential expiration before attempting upload.
-	// Expired credentials will never succeed, so we fail fast with a non-retryable error
-	// to avoid wasting API calls and generating excessive error logs.
-	expiration := keboolaFile.Expiration()
-	if !expiration.IsZero() && time.Now().After(expiration.Time()) {
-		return model.NewNonRetryableError(errors.Errorf(
-			"%w: credentials for file %s expired at %s",
-			ErrCredentialsExpired, slice.FileKey.String(), expiration.String(),
-		))
-	}
-
 	// Upload slice
 	uploader, err := keboola.NewUploadSliceWriter(ctx, &credentials, slice.StagingStorage.Path)
 	if err != nil {
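
Because the error text wraps ErrCredentialsExpired via %w and is then wrapped again in a NonRetryableError, callers can still match both layers. A brief sketch of caller-side classification, assuming the project's errors package is stdlib-compatible (which the new tests below rely on); uploadErr is a hypothetical stand-in for the error returned by uploadSlice:

	// Hypothetical caller-side classification, for illustration only.
	if errors.Is(uploadErr, ErrCredentialsExpired) {
		// Expired credentials can never recover; no point in exponential backoff.
	}
	var nonRetryable *model.NonRetryableError
	if errors.As(uploadErr, &nonRetryable) {
		// Route to the fixed-interval retry path instead of exponential backoff.
	}
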
Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+package model_test
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
+	"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
+)
+
+func TestNonRetryableError_Error(t *testing.T) {
+	t.Parallel()
+
+	baseErr := errors.New("test error message")
+	nonRetryableErr := model.NewNonRetryableError(baseErr)
+
+	assert.Equal(t, "test error message", nonRetryableErr.Error())
+}
+
+func TestNonRetryableError_Unwrap(t *testing.T) {
+	t.Parallel()
+
+	baseErr := errors.New("wrapped error")
+	nonRetryableErr := model.NewNonRetryableError(baseErr)
+
+	assert.Equal(t, baseErr, nonRetryableErr.Unwrap())
+}
+
+func TestNonRetryableError_ErrorsAs(t *testing.T) {
+	t.Parallel()
+
+	// Test that errors.As correctly identifies NonRetryableError through wrapping
+	baseErr := errors.New("base error")
+	nonRetryableErr := model.NewNonRetryableError(baseErr)
+	wrappedErr := errors.Errorf("wrapped: %w", nonRetryableErr)
+
+	var target *model.NonRetryableError
+	assert.True(t, errors.As(wrappedErr, &target))
+	assert.Equal(t, nonRetryableErr, target)
+	assert.Equal(t, "base error", target.Err.Error())
+}
+
+func TestNonRetryableError_ErrorsAs_Negative(t *testing.T) {
+	t.Parallel()
+
+	// Test that a regular error is not identified as NonRetryableError
+	regularErr := errors.New("regular error")
+
+	var target *model.NonRetryableError
+	assert.False(t, errors.As(regularErr, &target))
+	assert.Nil(t, target)
+}
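
These tests pin down the error type's contract: Error() delegates to the wrapped error, Unwrap() exposes it, and errors.As matches a *NonRetryableError target through further wrapping. A plausible shape consistent with that contract; the actual definition lives in the model package and is not part of this diff:

	// Sketch only: inferred from the tests above, not the committed definition.
	type NonRetryableError struct {
		Err error // exported, per target.Err.Error() in the tests
	}

	func NewNonRetryableError(err error) *NonRetryableError {
		return &NonRetryableError{Err: err}
	}

	func (e *NonRetryableError) Error() string { return e.Err.Error() }
	func (e *NonRetryableError) Unwrap() error { return e.Err }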

internal/pkg/service/stream/storage/model/retry_test.go

Lines changed: 96 additions & 0 deletions
@@ -239,3 +239,99 @@ func TestRetryBackoff_RetryAt_Random(t *testing.T) {
 		now = retryAt
 	}
 }
+
+func TestRetryable_IncrementNonRetryableAttempt(t *testing.T) {
+	t.Parallel()
+
+	fixedInterval := 2 * time.Hour
+	v := Retryable{}
+
+	// First attempt
+	v.IncrementNonRetryableAttempt(utctime.MustParse("2000-01-01T00:00:00.000Z").Time(), "credential expired", fixedInterval)
+	assert.Equal(t, Retryable{
+		RetryAttempt:  1,
+		RetryReason:   "credential expired",
+		FirstFailedAt: ptr.Ptr(utctime.MustParse("2000-01-01T00:00:00.000Z")),
+		LastFailedAt:  ptr.Ptr(utctime.MustParse("2000-01-01T00:00:00.000Z")),
+		RetryAfter:    ptr.Ptr(utctime.MustParse("2000-01-01T02:00:00.000Z")), // +2 hours
+	}, v)
+
+	// Second attempt - FirstFailedAt should remain unchanged
+	v.IncrementNonRetryableAttempt(utctime.MustParse("2000-01-01T02:00:00.000Z").Time(), "credential expired", fixedInterval)
+	assert.Equal(t, Retryable{
+		RetryAttempt:  2,
+		RetryReason:   "credential expired",
+		FirstFailedAt: ptr.Ptr(utctime.MustParse("2000-01-01T00:00:00.000Z")), // unchanged
+		LastFailedAt:  ptr.Ptr(utctime.MustParse("2000-01-01T02:00:00.000Z")),
+		RetryAfter:    ptr.Ptr(utctime.MustParse("2000-01-01T04:00:00.000Z")), // +2 hours (fixed)
+	}, v)
+
+	// Third attempt - still fixed interval
+	v.IncrementNonRetryableAttempt(utctime.MustParse("2000-01-01T04:00:00.000Z").Time(), "credential expired", fixedInterval)
+	assert.Equal(t, Retryable{
+		RetryAttempt:  3,
+		RetryReason:   "credential expired",
+		FirstFailedAt: ptr.Ptr(utctime.MustParse("2000-01-01T00:00:00.000Z")), // unchanged
+		LastFailedAt:  ptr.Ptr(utctime.MustParse("2000-01-01T04:00:00.000Z")),
+		RetryAfter:    ptr.Ptr(utctime.MustParse("2000-01-01T06:00:00.000Z")), // +2 hours (fixed)
+	}, v)
+}
+
+func TestRetryable_IncrementNonRetryableAttempt_VsExponential(t *testing.T) {
+	t.Parallel()
+
+	// This test demonstrates the difference between exponential backoff (IncrementRetryAttempt)
+	// and fixed interval (IncrementNonRetryableAttempt)
+
+	backoff := NoRandomizationBackoff()
+	fixedInterval := 2 * time.Hour
+
+	exponential := Retryable{}
+	fixed := Retryable{}
+
+	baseTime := utctime.MustParse("2000-01-01T00:00:00.000Z").Time()
+
+	// First attempt - both use their base interval
+	exponential.IncrementRetryAttempt(backoff, baseTime, "retryable error")
+	fixed.IncrementNonRetryableAttempt(baseTime, "non-retryable error", fixedInterval)
+
+	assert.Equal(t, "2000-01-01T00:02:00.000Z", exponential.RetryAfter.String()) // +2 min (exponential base)
+	assert.Equal(t, "2000-01-01T02:00:00.000Z", fixed.RetryAfter.String())       // +2 hours (fixed)
+
+	// Second attempt - exponential increases, fixed stays the same
+	exponential.IncrementRetryAttempt(backoff, baseTime.Add(2*time.Minute), "retryable error")
+	fixed.IncrementNonRetryableAttempt(baseTime.Add(2*time.Hour), "non-retryable error", fixedInterval)
+
+	assert.Equal(t, "2000-01-01T00:10:00.000Z", exponential.RetryAfter.String()) // +8 min (exponential x4)
+	assert.Equal(t, "2000-01-01T04:00:00.000Z", fixed.RetryAfter.String())       // +2 hours (fixed)
+
+	// Third attempt - exponential continues to grow, fixed stays the same
+	exponential.IncrementRetryAttempt(backoff, baseTime.Add(10*time.Minute), "retryable error")
+	fixed.IncrementNonRetryableAttempt(baseTime.Add(4*time.Hour), "non-retryable error", fixedInterval)
+
+	assert.Equal(t, "2000-01-01T00:42:00.000Z", exponential.RetryAfter.String()) // +32 min (exponential x4)
+	assert.Equal(t, "2000-01-01T06:00:00.000Z", fixed.RetryAfter.String())       // +2 hours (fixed)
+
+	// Verify FirstFailedAt remains unchanged for both
+	assert.Equal(t, baseTime, exponential.FirstFailedAt.Time())
+	assert.Equal(t, baseTime, fixed.FirstFailedAt.Time())
+}
+
+func TestRetryable_IncrementNonRetryableAttempt_TerminalInterval(t *testing.T) {
+	t.Parallel()
+
+	// Test with a very long "terminal" interval (simulating max attempts reached)
+	terminalInterval := 10 * 365 * 24 * time.Hour // ~10 years
+	v := Retryable{}
+
+	baseTime := utctime.MustParse("2000-01-01T00:00:00.000Z").Time()
+	v.IncrementNonRetryableAttempt(baseTime, "max attempts reached", terminalInterval)
+
+	// Verify the far-future retry time (should be at least 9 years in the future)
+	expectedRetryAfter := utctime.From(baseTime.Add(terminalInterval))
+	assert.Equal(t, expectedRetryAfter.String(), v.RetryAfter.String())
+
+	// Verify it's far in the future (more than 9 years)
+	nineYearsLater := baseTime.Add(9 * 365 * 24 * time.Hour)
+	assert.True(t, v.RetryAfter.Time().After(nineYearsLater), "RetryAfter should be more than 9 years in the future")
+}
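
The expectations above fully determine the fixed-interval semantics: the attempt counter and timestamps advance on every call, FirstFailedAt is set only once, and RetryAfter is always now + interval with no backoff growth. A sketch of a method body consistent with these tests; the committed implementation is not shown in this diff:

	// Sketch only: inferred from the test expectations, not the committed code.
	func (v *Retryable) IncrementNonRetryableAttempt(now time.Time, reason string, interval time.Duration) {
		v.RetryAttempt++
		v.RetryReason = reason
		if v.FirstFailedAt == nil {
			v.FirstFailedAt = ptr.Ptr(utctime.From(now)) // set once, never updated
		}
		v.LastFailedAt = ptr.Ptr(utctime.From(now))
		v.RetryAfter = ptr.Ptr(utctime.From(now.Add(interval))) // fixed step, no exponential growth
	}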

internal/pkg/service/stream/storage/node/readernode/sliceupload/operator.go

Lines changed: 23 additions & 4 deletions
@@ -42,10 +42,16 @@ const dbOperationTimeout = 30 * time.Second
 const nonRetryableRetryInterval = 2 * time.Hour
 
 // maxNonRetryableAttempts limits the number of retry reports for non-retryable errors.
-// After this many attempts, the slice stops being retried. With a 2-hour interval,
-// 50 attempts span ~4 days, giving operators enough time to intervene.
+// After this many attempts, the slice stops being retried by setting RetryAfter to a
+// far-future value (~10 years). With a 2-hour interval, 50 attempts span ~4 days,
+// giving operators enough time to intervene before retries are disabled.
 const maxNonRetryableAttempts = 50
 
+// terminalRetryInterval is used when maxNonRetryableAttempts is reached.
+// This far-future interval (10 years) effectively stops retry attempts while
+// keeping the slice in a queryable state in etcd for operators to investigate.
+const terminalRetryInterval = 10 * 365 * 24 * time.Hour
+
 type operator struct {
 	config stagingConfig.OperatorConfig
 	clock  clockwork.Clock
@@ -326,6 +332,19 @@ func (o *operator) handleUploadError(ctx context.Context, slice *sliceData, err
 	// If non-retryable and max attempts reached, log and stop retrying.
 	if isNonRetryable && slice.Retry.RetryAttempt >= maxNonRetryableAttempts {
 		o.logger.Errorf(ctx, "slice upload permanently failed after %d attempts: %s", slice.Retry.RetryAttempt, err.Error())
+
+		// Update etcd with a far-future RetryAfter to prevent retry after pod restarts
+		dbCtx, dbCancel := context.WithTimeoutCause(context.WithoutCancel(ctx), dbOperationTimeout, errors.New("terminal retry update timeout"))
+		defer dbCancel()
+
+		_, rErr := o.storage.Slice().IncrementNonRetryableAttempt(slice.SliceKey, o.clock.Now(), err.Error(), terminalRetryInterval).Do(dbCtx).ResultOrErr()
+		if rErr != nil {
+			o.logger.Errorf(ctx, "cannot update slice with terminal retry interval: %s", rErr)
+			return
+		}
+
+		o.logger.Infof(ctx, "slice retry disabled until %s, requires manual intervention",
+			o.clock.Now().Add(terminalRetryInterval).Format(time.RFC3339))
 		return
 	}
 
@@ -348,7 +367,7 @@ func (o *operator) handleUploadError(ctx context.Context, slice *sliceData, err
 func (o *operator) handleNonRetryableError(ctx context.Context, dbCtx context.Context, slice *sliceData, err error) {
 	sliceEntity, rErr := o.storage.Slice().IncrementNonRetryableAttempt(slice.SliceKey, o.clock.Now(), err.Error(), nonRetryableRetryInterval).Do(dbCtx).ResultOrErr()
 	if rErr != nil {
-		o.logger.Errorf(ctx, "cannot increment file import retry: %s", rErr)
+		o.logger.Errorf(ctx, "cannot increment slice upload retry: %s", rErr)
 		return
 	}
 	o.logger.Warnf(ctx, "slice upload failed with non-retryable error (attempt %d/%d), will report again after %s",
@@ -359,7 +378,7 @@ func (o *operator) handleRetryableError(ctx context.Context, dbCtx context.Co
 func (o *operator) handleRetryableError(ctx context.Context, dbCtx context.Context, slice *sliceData, err error) {
 	sliceEntity, rErr := o.storage.Slice().IncrementRetryAttempt(slice.SliceKey, o.clock.Now(), err.Error()).Do(dbCtx).ResultOrErr()
 	if rErr != nil {
-		o.logger.Errorf(ctx, "cannot increment file import retry: %s", rErr)
+		o.logger.Errorf(ctx, "cannot increment slice upload retry: %s", rErr)
 		return
 	}
 	o.logger.Infof(ctx, "slice upload will be retried after %q", sliceEntity.RetryAfter.String())
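
The far-future RetryAfter fixes the terminal-state loop because retry eligibility is derived from etcd state rather than in-memory counters: the earlier code logged and returned without persisting anything, so a restarted pod would resume retrying. Note also that the terminal update runs on context.WithoutCancel, so the etcd write still lands even if the triggering context is being torn down, bounded by dbOperationTimeout. A hypothetical sketch of the scheduler-side predicate this relies on (the actual scheduling code is elsewhere and not part of this commit):

	// Hypothetical eligibility check, for illustration; not code from this commit.
	func retryDue(now time.Time, retry model.Retryable) bool {
		// A failed slice is re-attempted only once RetryAfter has passed. With
		// RetryAfter pushed ~10 years out, the slice is effectively parked but
		// stays queryable in etcd for operators to inspect and manually reset.
		return retry.RetryAfter != nil && now.After(retry.RetryAfter.Time())
	}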
