Skip to content

Commit 7a0471e

Browse files
authored
Merge pull request #2530 from keboola/devin/1772005948-psgo-181-gcs-upload-expired-credentials
fix(stream): check credential expiration before slice upload, handle non-retryable errors [PSGO-181]
2 parents fef67d4 + 5215e2f commit 7a0471e

13 files changed

Lines changed: 320 additions & 37 deletions

File tree

docs/e2e_tests.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ TEST_KBC_PROJECTS_LOCK_HOST=redis+tls://redis:6380
290290

291291
## Generate new unique ID
292292

293-
If a ENV placeholder in the form `^TEST_NEW_TICKET_\d+$` is found, it is replaced with new ID/ticket [generated by API](https://keboola.docs.apiary.io/#reference/tickets/generate-unique-id/generate-new-id).
293+
If an ENV placeholder in the form `^TEST_NEW_TICKET_\d+$` is found, it is replaced with a new ID/ticket generated by the API.
294294
- E.g. `%%TEST_NEW_TICKET_1%%`
295295
- The value is generated when the first occurrence is found.
296296
- All occurrences are replaced with the same value.

internal/pkg/model/object.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ func (m BranchMetadata) ToOrderedMap() *orderedmap.OrderedMap {
362362
return res
363363
}
364364

365-
// Branch https://keboola.docs.apiary.io/#reference/development-branches/branches/list-branches
365+
// Branch represents a development branch.
366366
type Branch struct {
367367
BranchKey
368368
Name string `json:"name" validate:"required" diff:"true" metaFile:"true"`
@@ -473,7 +473,7 @@ func (m ConfigMetadata) InstanceID() string {
473473
return m[instanceIDMetadataKey]
474474
}
475475

476-
// Config https://keboola.docs.apiary.io/#reference/components-and-configurations/component-configurations/list-configurations
476+
// Config represents a component configuration.
477477
type Config struct {
478478
ConfigKey
479479
Name string `json:"name" validate:"required" diff:"true" metaFile:"true"`
@@ -528,7 +528,7 @@ func (c *ConfigWithRows) ToAPIObject(changeDescription string, changedFields Cha
528528
return out, append(changedFields.Slice(), "changeDescription")
529529
}
530530

531-
// ConfigRow https://keboola.docs.apiary.io/#reference/components-and-configurations/component-configurations/list-configurations
531+
// ConfigRow represents a configuration row.
532532
type ConfigRow struct {
533533
ConfigRowKey
534534
Name string `json:"name" diff:"true" metaFile:"true"`

internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/dgraph-io/ristretto/v2"
9+
"github.com/jonboulle/clockwork"
910
"github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt"
1011
"github.com/keboola/keboola-sdk-go/v2/pkg/keboola"
1112
etcd "go.etcd.io/etcd/client/v3"
@@ -45,6 +46,7 @@ const (
4546

4647
type Bridge struct {
4748
logger log.Logger
49+
clock clockwork.Clock
4850
config keboolasink.Config
4951
client etcd.KV
5052
schema schema.Schema
@@ -68,6 +70,7 @@ type jobData struct {
6870

6971
type dependencies interface {
7072
Logger() log.Logger
73+
Clock() clockwork.Clock
7174
EtcdClient() *etcd.Client
7275
EtcdSerde() *serde.Serde
7376
Process() *servicectx.Process
@@ -105,6 +108,7 @@ func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) (*B
105108

106109
b := &Bridge{
107110
logger: d.Logger().WithComponent("keboola.bridge"),
111+
clock: d.Clock(),
108112
config: config,
109113
client: d.EtcdClient(),
110114
schema: schema.New(d.EtcdSerde()),

internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func (b *Bridge) deleteCredentialsOnFileDelete() {
144144
}
145145

146146
func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statistics.Value) error {
147-
start := time.Now()
147+
start := b.clock.Now()
148148

149149
// Get authorization token
150150
existingToken, err := b.schema.Token().ForSink(file.SinkKey).GetOrErr(b.client).Do(ctx).ResultOrErr()

internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/slice.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ import (
1515
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
1616
)
1717

18+
// ErrCredentialsExpired is returned when the upload credentials for a file have expired.
19+
// This is a non-retryable error because the credentials will never become valid again without external intervention.
20+
var ErrCredentialsExpired = errors.New("upload credentials have expired")
21+
1822
func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, slice plugin.Slice, stats statistics.Value) error {
1923
// Skip upload if the slice is empty.
2024
// The state is anyway switched to the SliceUploaded by the operator.
@@ -23,20 +27,47 @@ func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, sli
2327
return nil
2428
}
2529

26-
start := time.Now()
30+
start := b.clock.Now()
2731

2832
reader, err := volume.OpenReader(slice.SliceKey, slice.LocalStorage, slice.EncodingCompression, slice.StagingStorage.Compression)
2933
if err != nil {
3034
b.logger.Warnf(ctx, "unable to open reader: %v", err)
3135
return err
3236
}
3337

38+
// A failure to close the reader is not fatal; it is only logged as a warning.
39+
// Register this defer immediately after opening to prevent resource leaks on early returns
40+
defer func() {
41+
err := reader.Close(ctx)
42+
if err != nil {
43+
b.logger.Warnf(ctx, "unable to close reader: %v", err)
44+
return
45+
}
46+
}()
47+
3448
// Get authorization token
3549
existingToken, err := b.schema.Token().ForSink(slice.SinkKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
3650
if err != nil {
3751
return err
3852
}
3953

54+
// Get file details to check expiration before decryption
55+
keboolaFile, err := b.schema.File().ForFile(slice.FileKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
56+
if err != nil {
57+
return err
58+
}
59+
60+
// Check credential expiration before decryption to avoid unnecessary crypto/KMS operations.
61+
// Expired credentials will never succeed, so we fail fast with a non-retryable error
62+
// to avoid wasting API calls and generating excessive error logs.
63+
expiration := keboolaFile.Expiration()
64+
if !expiration.IsZero() && b.clock.Now().After(expiration.Time()) {
65+
return model.NewNonRetryableError(errors.Errorf(
66+
"%w: credentials for file %s expired at %s",
67+
ErrCredentialsExpired, slice.FileKey.String(), expiration.String(),
68+
))
69+
}
70+
4071
// Prepare encryption metadata
4172
metadata := cloudencrypt.Metadata{"sink": slice.SinkKey.String()}
4273

@@ -62,21 +93,6 @@ func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, sli
6293
}
6394
}()
6495

65-
// Error when closing the reader is not a fatal error
66-
defer func() {
67-
err := reader.Close(ctx)
68-
if err != nil {
69-
b.logger.Warnf(ctx, "unable to close reader: %v", err)
70-
return
71-
}
72-
}()
73-
74-
// Get file details
75-
keboolaFile, err := b.schema.File().ForFile(slice.FileKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
76-
if err != nil {
77-
return err
78-
}
79-
8096
// Decrypt file upload credentials
8197
var credentials keboola.FileUploadCredentials
8298
if keboolaFile.EncryptedCredentials != "" {
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package model
2+
3+
// NonRetryableError wraps an error to indicate that the operation should not be retried.
4+
// For example, expired credentials will never succeed on retry without external intervention.
5+
type NonRetryableError struct {
6+
Err error
7+
}
8+
9+
func NewNonRetryableError(err error) *NonRetryableError {
10+
return &NonRetryableError{Err: err}
11+
}
12+
13+
func (e *NonRetryableError) Error() string {
14+
return e.Err.Error()
15+
}
16+
17+
func (e *NonRetryableError) Unwrap() error {
18+
return e.Err
19+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package model_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
8+
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
9+
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
10+
)
11+
12+
func TestNonRetryableError_Error(t *testing.T) {
13+
t.Parallel()
14+
15+
baseErr := errors.New("test error message")
16+
nonRetryableErr := model.NewNonRetryableError(baseErr)
17+
18+
assert.Equal(t, "test error message", nonRetryableErr.Error())
19+
}
20+
21+
func TestNonRetryableError_Unwrap(t *testing.T) {
22+
t.Parallel()
23+
24+
baseErr := errors.New("wrapped error")
25+
nonRetryableErr := model.NewNonRetryableError(baseErr)
26+
27+
assert.Equal(t, baseErr, nonRetryableErr.Unwrap())
28+
}
29+
30+
func TestNonRetryableError_ErrorsAs(t *testing.T) {
31+
t.Parallel()
32+
33+
// Test that errors.As correctly identifies NonRetryableError through wrapping
34+
baseErr := errors.New("base error")
35+
nonRetryableErr := model.NewNonRetryableError(baseErr)
36+
wrappedErr := errors.Errorf("wrapped: %w", nonRetryableErr)
37+
38+
var target *model.NonRetryableError
39+
assert.True(t, errors.As(wrappedErr, &target))
40+
assert.Equal(t, nonRetryableErr, target)
41+
assert.Equal(t, "base error", target.Err.Error())
42+
}
43+
44+
func TestNonRetryableError_ErrorsAs_Negative(t *testing.T) {
45+
t.Parallel()
46+
47+
// Test that a regular error is not identified as NonRetryableError
48+
regularErr := errors.New("regular error")
49+
50+
var target *model.NonRetryableError
51+
assert.False(t, errors.As(regularErr, &target))
52+
assert.Nil(t, target)
53+
}

internal/pkg/service/stream/storage/model/repository/slice/slice_retry.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,13 @@ func (r *Repository) IncrementRetryAttempt(sliceKey model.SliceKey, now time.Tim
1515
return slice, nil
1616
})
1717
}
18+
19+
// IncrementNonRetryableAttempt increments the retry attempt and schedules the next attempt after a fixed interval instead of exponential backoff.
20+
// This is used for errors that will never succeed (e.g., expired credentials) but should still be
21+
// periodically reported in logs to keep the issue visible.
22+
func (r *Repository) IncrementNonRetryableAttempt(sliceKey model.SliceKey, now time.Time, reason string, fixedInterval time.Duration) *op.AtomicOp[model.Slice] {
23+
return r.update(sliceKey, now, func(slice model.Slice) (model.Slice, error) {
24+
slice.IncrementNonRetryableAttempt(now, reason, fixedInterval)
25+
return slice, nil
26+
})
27+
}

internal/pkg/service/stream/storage/model/retry.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,23 @@ func (v *Retryable) IncrementRetryAttempt(backoff RetryBackoff, failedAt time.Ti
9393
v.RetryAfter = &retryAfterUTC
9494
}
9595

96+
// IncrementNonRetryableAttempt increments the retry attempt counter using a fixed interval
97+
// instead of exponential backoff. This is used for errors that will never succeed (e.g., expired credentials)
98+
// but should still be periodically reported in logs.
99+
func (v *Retryable) IncrementNonRetryableAttempt(failedAt time.Time, reason string, fixedInterval time.Duration) {
100+
v.RetryAttempt += 1
101+
v.RetryReason = reason
102+
103+
failedAtUTC := utctime.From(failedAt)
104+
if v.FirstFailedAt == nil {
105+
v.FirstFailedAt = &failedAtUTC
106+
}
107+
v.LastFailedAt = &failedAtUTC
108+
109+
retryAfterUTC := utctime.From(failedAt.Add(fixedInterval))
110+
v.RetryAfter = &retryAfterUTC
111+
}
112+
96113
func (v *Retryable) ResetRetry() {
97114
v.RetryAttempt = 0
98115
v.RetryReason = ""

internal/pkg/service/stream/storage/model/retry_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,99 @@ func TestRetryBackoff_RetryAt_Random(t *testing.T) {
238238
now = retryAt
239239
}
240240
}
241+
242+
func TestRetryable_IncrementNonRetryableAttempt(t *testing.T) {
243+
t.Parallel()
244+
245+
fixedInterval := 2 * time.Hour
246+
v := Retryable{}
247+
248+
// First attempt
249+
v.IncrementNonRetryableAttempt(utctime.MustParse("2000-01-01T00:00:00.000Z").Time(), "credential expired", fixedInterval)
250+
assert.Equal(t, Retryable{
251+
RetryAttempt: 1,
252+
RetryReason: "credential expired",
253+
FirstFailedAt: ptr.Ptr(utctime.MustParse("2000-01-01T00:00:00.000Z")),
254+
LastFailedAt: ptr.Ptr(utctime.MustParse("2000-01-01T00:00:00.000Z")),
255+
RetryAfter: ptr.Ptr(utctime.MustParse("2000-01-01T02:00:00.000Z")), // +2 hours
256+
}, v)
257+
258+
// Second attempt - FirstFailedAt should remain unchanged
259+
v.IncrementNonRetryableAttempt(utctime.MustParse("2000-01-01T02:00:00.000Z").Time(), "credential expired", fixedInterval)
260+
assert.Equal(t, Retryable{
261+
RetryAttempt: 2,
262+
RetryReason: "credential expired",
263+
FirstFailedAt: ptr.Ptr(utctime.MustParse("2000-01-01T00:00:00.000Z")), // unchanged
264+
LastFailedAt: ptr.Ptr(utctime.MustParse("2000-01-01T02:00:00.000Z")),
265+
RetryAfter: ptr.Ptr(utctime.MustParse("2000-01-01T04:00:00.000Z")), // +2 hours (fixed)
266+
}, v)
267+
268+
// Third attempt - still fixed interval
269+
v.IncrementNonRetryableAttempt(utctime.MustParse("2000-01-01T04:00:00.000Z").Time(), "credential expired", fixedInterval)
270+
assert.Equal(t, Retryable{
271+
RetryAttempt: 3,
272+
RetryReason: "credential expired",
273+
FirstFailedAt: ptr.Ptr(utctime.MustParse("2000-01-01T00:00:00.000Z")), // unchanged
274+
LastFailedAt: ptr.Ptr(utctime.MustParse("2000-01-01T04:00:00.000Z")),
275+
RetryAfter: ptr.Ptr(utctime.MustParse("2000-01-01T06:00:00.000Z")), // +2 hours (fixed)
276+
}, v)
277+
}
278+
279+
func TestRetryable_IncrementNonRetryableAttempt_VsExponential(t *testing.T) {
280+
t.Parallel()
281+
282+
// This test demonstrates the difference between exponential backoff (IncrementRetryAttempt)
283+
// and fixed interval (IncrementNonRetryableAttempt)
284+
285+
backoff := NoRandomizationBackoff()
286+
fixedInterval := 2 * time.Hour
287+
288+
exponential := Retryable{}
289+
fixed := Retryable{}
290+
291+
baseTime := utctime.MustParse("2000-01-01T00:00:00.000Z").Time()
292+
293+
// First attempt - both use their base interval
294+
exponential.IncrementRetryAttempt(backoff, baseTime, "retryable error")
295+
fixed.IncrementNonRetryableAttempt(baseTime, "non-retryable error", fixedInterval)
296+
297+
assert.Equal(t, "2000-01-01T00:02:00.000Z", exponential.RetryAfter.String()) // +2 min (exponential base)
298+
assert.Equal(t, "2000-01-01T02:00:00.000Z", fixed.RetryAfter.String()) // +2 hours (fixed)
299+
300+
// Second attempt - exponential increases, fixed stays same
301+
exponential.IncrementRetryAttempt(backoff, baseTime.Add(2*time.Minute), "retryable error")
302+
fixed.IncrementNonRetryableAttempt(baseTime.Add(2*time.Hour), "non-retryable error", fixedInterval)
303+
304+
assert.Equal(t, "2000-01-01T00:10:00.000Z", exponential.RetryAfter.String()) // +8 min (exponential x4)
305+
assert.Equal(t, "2000-01-01T04:00:00.000Z", fixed.RetryAfter.String()) // +2 hours (fixed)
306+
307+
// Third attempt - exponential continues to grow, fixed stays same
308+
exponential.IncrementRetryAttempt(backoff, baseTime.Add(10*time.Minute), "retryable error")
309+
fixed.IncrementNonRetryableAttempt(baseTime.Add(4*time.Hour), "non-retryable error", fixedInterval)
310+
311+
assert.Equal(t, "2000-01-01T00:42:00.000Z", exponential.RetryAfter.String()) // +32 min (exponential x4)
312+
assert.Equal(t, "2000-01-01T06:00:00.000Z", fixed.RetryAfter.String()) // +2 hours (fixed)
313+
314+
// Verify FirstFailedAt remains unchanged for both
315+
assert.Equal(t, baseTime, exponential.FirstFailedAt.Time())
316+
assert.Equal(t, baseTime, fixed.FirstFailedAt.Time())
317+
}
318+
319+
func TestRetryable_IncrementNonRetryableAttempt_TerminalInterval(t *testing.T) {
320+
t.Parallel()
321+
322+
// Test with a very long "terminal" interval (simulating max attempts reached)
323+
terminalInterval := 10 * 365 * 24 * time.Hour // ~10 years
324+
v := Retryable{}
325+
326+
baseTime := utctime.MustParse("2000-01-01T00:00:00.000Z").Time()
327+
v.IncrementNonRetryableAttempt(baseTime, "max attempts reached", terminalInterval)
328+
329+
// Verify the retry time is exactly the failure time plus the terminal interval
330+
expectedRetryAfter := utctime.From(baseTime.Add(terminalInterval))
331+
assert.Equal(t, expectedRetryAfter.String(), v.RetryAfter.String())
332+
333+
// Verify it's far in the future (more than 9 years)
334+
nineYearsLater := baseTime.Add(9 * 365 * 24 * time.Hour)
335+
assert.True(t, v.RetryAfter.Time().After(nineYearsLater), "RetryAfter should be more than 9 years in the future")
336+
}

0 commit comments

Comments
 (0)