Commit 2bc647e

Merge pull request #2529 from keboola/devin/PSGO-184-1772005379-fix-file-close-lock-timeout
fix: use dedicated context for lock acquisition in closeFile (PSGO-184)
2 parents: e6741c5 + 13264ac

2 files changed: 88 additions & 4 deletions

internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go

Lines changed: 12 additions & 4 deletions
@@ -476,8 +476,12 @@ func (o *operator) closeFile(ctx context.Context, file *fileData) {
 	dbCtx, dbCancel := context.WithTimeoutCause(context.WithoutCancel(ctx), dbOperationTimeout, errors.New("switch to importing timeout"))
 	defer dbCancel()
 
-	// Lock all file operations
-	lock, unlock, lockErr := clusterlock.LockFile(ctx, o.locks, o.logger, file.FileKey)
+	// Lock all file operations.
+	// Use a fresh context - waitForFileClosing may have consumed the FileCloseTimeout budget.
+	lockCtx, lockCancel := context.WithTimeoutCause(context.WithoutCancel(ctx), dbOperationTimeout, errors.New("file close lock acquisition timeout"))
+	defer lockCancel()
+
+	lock, unlock, lockErr := clusterlock.LockFile(lockCtx, o.locks, o.logger, file.FileKey)
 	if lockErr != nil {
 		o.logger.Errorf(ctx, `file close error: %s`, lockErr)
 		return
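
The fix relies on two Go 1.21 context primitives: context.WithoutCancel keeps the parent's values but drops its deadline and cancellation, and context.WithTimeoutCause attaches a fresh budget with a descriptive cause. A minimal, self-contained sketch of the pattern (timeouts and names below are illustrative, not taken from this repository):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	// A parent whose budget is already spent, like ctx after waitForFileClosing
	// has consumed the FileCloseTimeout.
	parent, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
	time.Sleep(time.Millisecond) // let the parent deadline expire
	fmt.Println(parent.Err())    // context deadline exceeded

	// Detach from the expired deadline but keep the parent's values,
	// then attach a fresh budget with an explicit cause.
	lockCtx, lockCancel := context.WithTimeoutCause(
		context.WithoutCancel(parent),
		5*time.Second,
		errors.New("file close lock acquisition timeout"),
	)
	defer lockCancel()
	fmt.Println(lockCtx.Err()) // <nil> - the detached context is live
}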
@@ -507,8 +511,12 @@ func (o *operator) closeFile(ctx context.Context, file *fileData) {
 
 	// If there is an error, increment retry delay
 	if err != nil {
-		o.logger.Error(dbCtx, err.Error())
-		fileEntity, rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(dbCtx).ResultOrErr()
+		retryCtx, retryCancel := context.WithTimeoutCause(context.WithoutCancel(ctx), dbOperationTimeout, errors.New("retry increment timeout"))
+		defer retryCancel()
+
+		o.logger.Error(ctx, err.Error())
+
+		fileEntity, rErr := o.storage.File().IncrementRetryAttempt(file.FileKey, o.clock.Now(), err.Error()).RequireLock(lock).Do(retryCtx).ResultOrErr()
 		if rErr != nil {
 			o.logger.Errorf(ctx, "cannot increment file close retry: %s", rErr)
 			return
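
Both new contexts use WithTimeoutCause rather than plain WithTimeout, so when a deadline does fire, logs can tell which budget ran out. A small sketch, assuming Go 1.21+, of how the cause surfaces:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeoutCause(
		context.Background(),
		10*time.Millisecond,
		errors.New("retry increment timeout"),
	)
	defer cancel()

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context deadline exceeded (generic)
	fmt.Println(context.Cause(ctx)) // retry increment timeout (specific)
}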

internal/pkg/service/stream/storage/node/coordinator/filerotation/operator_test.go

Lines changed: 76 additions & 0 deletions
@@ -33,6 +33,8 @@ import (
 // fileExpirationDiff defines the time between file opening and file import trigger.
 const fileExpirationDiff = time.Minute
 
+const shortFileCloseTimeout = 10 * time.Second
+
 func TestFileRotation(t *testing.T) {
 	t.Parallel()
 
@@ -182,6 +184,71 @@ func TestFileRotation(t *testing.T) {
 	ts.logger.AssertNoErrorMessage(t)
 }
 
+// TestCloseFile_LockAcquisitionAfterWaitTimeout is a regression test for PSGO-184: when
+// waitForFileClosing exhausts the FileCloseTimeout, the subsequent lock acquisition must use
+// its own fresh context, not the expired one, so IncrementRetryAttempt can still be called.
+func TestCloseFile_LockAcquisitionAfterWaitTimeout(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithTimeout(t.Context(), 60*time.Second)
+	defer cancel()
+
+	ts := setupWithCloseTimeout(t, ctx, shortFileCloseTimeout)
+	defer ts.teardown(t)
+	ts.prepareFixtures(t, ctx)
+
+	// Trigger file rotation to put the old file into FileClosing state.
+	ts.clk.Advance(fileExpirationDiff)
+	ts.triggerCheck(t, true, `
+{"level":"info","message":"rotating file, import conditions met: expiration threshold met, expiration: 2000-01-01T00:31:00.000Z, remains: %s, threshold: 30m0s","file.id":"%s","component":"storage.node.operator.file.rotation"}
+`)
+
+	files, err := ts.dependencies.StorageRepository().File().ListIn(ts.sink.SinkKey).Do(ctx).All()
+	require.NoError(t, err)
+	require.Len(t, files, 2)
+	require.Equal(t, model.FileClosing, files[0].State)
+	require.Equal(t, model.FileWriting, files[1].State)
+
+	slices, err := ts.dependencies.StorageRepository().Slice().ListIn(ts.sink.SinkKey).Do(ctx).All()
+	require.NoError(t, err)
+	require.Len(t, slices, 4)
+	// ListIn returns slices in lexicographic order by etcd key (oldest file first).
+	require.Equal(t, files[0].FileKey, slices[0].FileKey)
+	require.Equal(t, model.SliceClosing, slices[0].State)
+	require.Equal(t, files[0].FileKey, slices[1].FileKey)
+	require.Equal(t, model.SliceClosing, slices[1].State)
+	require.Equal(t, files[1].FileKey, slices[2].FileKey)
+	require.Equal(t, model.SliceWriting, slices[2].State)
+	require.Equal(t, files[1].FileKey, slices[3].FileKey)
+	require.Equal(t, model.SliceWriting, slices[3].State)
+
+	closingFileKey := files[0].FileKey
+	ts.logger.Truncate()
+
+	// Trigger a close attempt; waitForFileClosing will block until FileCloseTimeout (10s) expires.
+	// Afterwards, lock acquisition and IncrementRetryAttempt must still succeed.
+	// Uses real wall-clock time - the 10s timeout cannot be faked with clockwork.FakeClock.
+	ts.clk.Advance(ts.interval)
+
+	assert.EventuallyWithT(t, func(c *assert.CollectT) {
+		ts.logger.AssertJSONMessages(c, `
+{"level":"error","message":"error when waiting for file slices upload:\n- context deadline exceeded","component":"storage.node.operator.file.rotation"}
+{"level":"info","message":"file closing will be retried after %s","component":"storage.node.operator.file.rotation"}
+`)
+	}, 15*time.Second, 100*time.Millisecond)
+
+	file, err := ts.dependencies.StorageRepository().File().Get(closingFileKey).Do(ctx).ResultOrErr()
+	require.NoError(t, err)
+	require.Equal(t, model.FileClosing, file.State)
+	require.Positive(t, file.RetryAttempt, "retry attempt should be incremented, proving lock was acquired after waitForFileClosing timeout")
+
+	warnAndErrorLogs := ts.logger.WarnAndErrorMessages()
+	assert.NotContains(t, warnAndErrorLogs, "cannot acquire lock", "lock acquisition should not fail when using a dedicated context")
+
+	ts.dependencies.Process().Shutdown(ctx, errors.New("bye bye"))
+	ts.dependencies.Process().WaitForShutdown()
+}
+
 type testState struct {
 	interval      time.Duration
 	importTrigger targetConfig.ImportTrigger
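
A note on timing in the test above: the rotation check is triggered deterministically through the clockwork fake clock, but the 10-second FileCloseTimeout elapses on the real wall clock, which is why the outcome is polled with assert.EventuallyWithT over a 15-second real window. A minimal sketch of that mixed fake/real pattern, assuming testify and clockwork (the sleep below is a stand-in for FileCloseTimeout):

package example

import (
	"testing"
	"time"

	"github.com/jonboulle/clockwork"
	"github.com/stretchr/testify/assert"
)

func TestMixedClocks(t *testing.T) {
	clk := clockwork.NewFakeClock()
	done := make(chan struct{})

	// The component finishes only after a real-time delay that a fake
	// clock cannot advance.
	go func() {
		time.Sleep(200 * time.Millisecond)
		close(done)
	}()

	// Deterministic trigger: advance the fake clock past the check interval.
	clk.Advance(time.Minute)

	// Poll on real time until the asynchronous outcome is observable.
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		select {
		case <-done:
		default:
			c.Errorf("operation not finished yet")
		}
	}, 5*time.Second, 50*time.Millisecond)
}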
@@ -196,6 +263,11 @@ type testState struct {
 
 func setup(t *testing.T, ctx context.Context) *testState {
 	t.Helper()
+	return setupWithCloseTimeout(t, ctx, 0)
+}
+
+func setupWithCloseTimeout(t *testing.T, ctx context.Context, closeTimeout time.Duration) *testState {
+	t.Helper()
 
 	importTrigger := targetConfig.ImportTrigger{
 		Count: 50000,
@@ -219,6 +291,9 @@ func setup(t *testing.T, ctx context.Context) *testState {
 			Trigger: importTrigger,
 		}
 		cfg.Storage.Level.Target.Operator.FileRotationCheckInterval = duration.From(conditionsCheckInterval)
+		if closeTimeout > 0 {
+			cfg.Storage.Level.Target.Operator.FileCloseTimeout = duration.From(closeTimeout)
+		}
 	}, commonDeps.WithClock(clk))
 	client := mock.TestEtcdClient()
 
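In setupWithCloseTimeout, the zero value means "keep the default", so existing setup callers are untouched. The same guard, extracted into a hypothetical helper (withDefault is not in the repository; it is shown only to name the pattern):

// withDefault returns the override when it is set, otherwise the fallback -
// the zero-value guard applied to FileCloseTimeout above. Hypothetical helper.
func withDefault(override, fallback time.Duration) time.Duration {
	if override > 0 {
		return override
	}
	return fallback
}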
@@ -289,4 +364,5 @@ func (ts *testState) prepareFixtures(t *testing.T, ctx context.Context) {
 	require.NoError(t, ts.dependencies.DefinitionRepository().Source().Create(&source, ts.clk.Now(), test.ByUser(), "create").Do(ctx).Err())
 	require.NoError(t, ts.dependencies.DefinitionRepository().Sink().Create(&ts.sink, ts.clk.Now(), test.ByUser(), "create").Do(ctx).Err())
 	ts.waitForFilesSync(t)
+	ts.logger.Truncate()
 }
