downstreamadapter: preserve remove upgrade during close #4815

hongyunyan wants to merge 16 commits into pingcap:master
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
📒 Files selected for processing (20)
📝 Walkthrough

This PR refactors changefeed removal cleanup to execute asynchronously using atomic flags and scheduled background tasks. The pending message queue semantics are updated to return messages directly from `Pop`.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant DispatcherManager
    participant AtomicFlags
    participant Scheduler
    participant RedoMeta
    participant MySQLSink
    User->>DispatcherManager: TryClose(removeChangefeed=true)
    DispatcherManager->>AtomicFlags: Set removeChangefeedRequested=true
    DispatcherManager->>Scheduler: Schedule cleanup task
    DispatcherManager->>User: Return success
    Scheduler->>Scheduler: Wait for removal condition
    Scheduler->>AtomicFlags: Check removeChangefeedRequested & !removeChangefeedCleaned
    alt Redo Mode
        Scheduler->>RedoMeta: closeRedoMeta(removeChangefeed=true)
        RedoMeta->>RedoMeta: Cleanup(context.Background())
        RedoMeta-->>Scheduler: Return error or nil
    else MySQL Sink
        Scheduler->>MySQLSink: CleanupRemovedChangefeed()
        MySQLSink->>MySQLSink: Create temp DB connection
        MySQLSink->>MySQLSink: RemoveDDLTsItem()
        MySQLSink-->>Scheduler: Return result
    end
    Scheduler->>AtomicFlags: Set removeChangefeedCleaned=true
```
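The flag handshake in the diagram reduces to a sticky "requested" flag set on close and a "cleaned" flag set once the background task finishes. A minimal runnable sketch of that pattern, with illustrative names (not the PR's actual identifiers):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cleaner mirrors the handshake in the diagram: a "requested" flag set by
// TryClose and a "cleaned" flag set once the background task finishes.
// All names here are illustrative, not the PR's actual identifiers.
type cleaner struct {
	removeRequested atomic.Bool
	removeCleaned   atomic.Bool
	cleanupRunning  atomic.Bool
	wg              sync.WaitGroup
}

func (c *cleaner) TryClose(removeChangefeed bool) {
	if removeChangefeed {
		c.removeRequested.Store(true) // sticky: survives a later TryClose(false)
	}
	// Schedule the cleanup at most once, even if TryClose is called again.
	if c.removeRequested.Load() && !c.removeCleaned.Load() &&
		c.cleanupRunning.CompareAndSwap(false, true) {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			// ... redo-meta cleanup / ddl_ts removal would run here ...
			c.removeCleaned.Store(true)
		}()
	}
}

func main() {
	c := &cleaner{}
	c.TryClose(true)
	c.wg.Wait()
	fmt.Println(c.removeCleaned.Load()) // prints "true"
}
```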
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (warnings)
Code Review
This pull request refactors the changefeed closure and removal logic to ensure that cleanup tasks, such as redo metadata and sink-specific state, are handled reliably even if requested after the initial close. It introduces a state-tracking mechanism in the pendingMessageQueue to manage in-flight and queued requests separately. Feedback identifies a potential premature exit in the Pop method of the message queue when encountering stale entries and a possible resource leak because closeRedoMeta is no longer called during a normal close path.
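The in-flight/queued split described above can be sketched as a small two-slot state per key; a hypothetical shape, not the PR's exact code:

```go
package main

import "fmt"

type closeReq struct{ removed bool }

// pendingState sketches the queued/inFlight split described above: an
// upgrade replaces only the queued slot, never the request a worker is
// already processing. Names are illustrative, not the PR's exact code.
type pendingState struct {
	inFlight *closeReq
	queued   *closeReq
}

// upgrade keeps the stronger removed=true semantics for the next round.
func (s *pendingState) upgrade(req *closeReq) {
	if s.queued == nil || (!s.queued.removed && req.removed) {
		s.queued = req
	}
}

func main() {
	s := &pendingState{inFlight: &closeReq{removed: false}}
	s.upgrade(&closeReq{removed: true}) // arrives while the first close is in flight
	fmt.Println(s.inFlight.removed, s.queued.removed) // prints "false true"
}
```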
```diff
 func (q *pendingMessageQueue) Pop() (pendingMessageKey, bool) {
-	return q.queue.Get()
+	key, ok := q.queue.Get()
+	if !ok {
+		return pendingMessageKey{}, false
+	}
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	state := q.pending[key]
+	if state == nil || state.queued == nil {
+		return pendingMessageKey{}, false
+	}
+	state.inFlight = state.queued
+	state.queued = nil
+	return key, true
 }
```
Returning false from Pop when state.queued is nil (but the channel is not closed) can cause the orchestrator loop to exit prematurely, as it typically interprets ok=false as a signal that the queue is closed. While the current TryEnqueue logic is designed to prevent this state, it is safer to handle stale entries by continuing to the next key in the channel rather than returning false.
Suggested change:

```diff
 func (q *pendingMessageQueue) Pop() (pendingMessageKey, bool) {
-	key, ok := q.queue.Get()
-	if !ok {
-		return pendingMessageKey{}, false
-	}
-	q.mu.Lock()
-	defer q.mu.Unlock()
-	state := q.pending[key]
-	if state == nil || state.queued == nil {
-		return pendingMessageKey{}, false
-	}
-	state.inFlight = state.queued
-	state.queued = nil
-	return key, true
+	for {
+		key, ok := q.queue.Get()
+		if !ok {
+			return pendingMessageKey{}, false
+		}
+		q.mu.Lock()
+		state := q.pending[key]
+		if state != nil && state.queued != nil {
+			state.inFlight = state.queued
+			state.queued = nil
+			q.mu.Unlock()
+			return key, true
+		}
+		q.mu.Unlock()
+	}
 }
```
```diff
 if e.IsRedoEnabled() {
-	e.redoSink.Close(removeChangefeed)
-	// FIXME: cleanup redo log when remove the changefeed
-	e.closeRedoMeta(removeChangefeed)
+	e.redoSink.Close(false)
 }
```
The call to e.closeRedoMeta(removeChangefeed) was previously unconditional when redo was enabled. In the new implementation, closeRedoMeta(true) is only called inside cleanupRemovedChangefeed, which is skipped if removeChangefeedRequested is false. This means for a normal close, closeRedoMeta is never called. If closeRedoMeta(false) is required to release resources or stop background tasks associated with redo metadata, this change could lead to resource leaks.
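The reviewer's concern can be illustrated with a minimal close path that always releases redo-meta resources while leaving remove-specific cleanup to the async path. This is a hypothetical sketch using stand-in types, not the PR's actual implementation:

```go
package main

import "fmt"

// Minimal stand-ins for the PR's types; names are illustrative only.
type manager struct {
	redoEnabled bool
	calls       []string // records what the close path did
}

func (m *manager) redoSinkClose(remove bool) {
	m.calls = append(m.calls, fmt.Sprintf("redoSink.Close(%v)", remove))
}

func (m *manager) closeRedoMeta(remove bool) {
	m.calls = append(m.calls, fmt.Sprintf("closeRedoMeta(%v)", remove))
}

// close sketches the reviewer's point: redo-meta resources should be
// released on every close, while remove-specific cleanup (the PR's
// runRemoveChangefeedCleanup) stays on the async path.
func (m *manager) close(removeChangefeed bool) {
	if m.redoEnabled {
		m.redoSinkClose(false)
		m.closeRedoMeta(false) // always stop background tasks / release resources
	}
	if removeChangefeed {
		m.calls = append(m.calls, "schedule remove cleanup")
	}
}

func main() {
	m := &manager{redoEnabled: true}
	m.close(false) // a normal close still releases redo meta
	fmt.Println(m.calls)
}
```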
Actionable comments posted: 1
🧹 Nitpick comments (2)
downstreamadapter/sink/mysql/sink.go (2)
446: Wrap the return error with `errors.Trace()`.

♻️ Proposed fix

```diff
-	return cleanupWriter.RemoveDDLTsItem()
+	return errors.Trace(cleanupWriter.RemoveDDLTsItem())
```
430-437: Wrap errors from library calls with `errors.Trace()`. Per coding guidelines, errors from third-party or library calls should be wrapped immediately to attach a stack trace for debugging.

♻️ Proposed fix

```diff
 dsnStr, err := mysql.GenerateDSN(context.Background(), s.cfg)
 if err != nil {
-	return err
+	return errors.Trace(err)
 }
 db, err := mysql.CreateMysqlDBConn(dsnStr)
 if err != nil {
-	return err
+	return errors.Trace(err)
 }
```
Actionable comment:

In `downstreamadapter/sink/mysql/sink.go`, around lines 445-446: the `cleanupWriter` created via `mysql.NewWriter` is never closed, leaking its internal resources. Call `cleanupWriter.Close()` (e.g., `defer cleanupWriter.Close()` immediately after creating it, or right after calling `RemoveDDLTsItem()`) so its statement cache, ticker, context cancellation, and DML session are released, mirroring how `dmlWriter` and `ddlWriter` are handled in the `Close()` method.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 385a1c47-a65f-44c0-8db4-781b7b87bb72
📒 Files selected for processing (5)
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_test.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
- downstreamadapter/dispatcherorchestrator/helper.go
- downstreamadapter/sink/mysql/sink.go
🧹 Nitpick comments (1)
downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go (1)
146-159: Consider: fragile reflection-based assertion.

The `reflect.TypeOf(*state).NumField() == 1` assertion will break if the `pendingQueueState` struct gains additional fields. While this documents the current contract, consider adding a comment explaining the intent, or replace it with a direct field check if the goal is just to verify `queued` is set correctly.

💡 Alternative approach

```diff
 q.mu.Lock()
 state := q.pending[key]
 q.mu.Unlock()
 require.NotNil(t, state)
-require.Equal(t, 1, reflect.TypeOf(*state).NumField())
+// Verify only the queued field is populated with the new message.
+// This documents that Pop removes the entry and TryEnqueue creates fresh state.
 require.Same(t, msg2, state.queued)
```
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5bf6bff2-8b08-4b0b-83ef-8122edd07d9a
📒 Files selected for processing (19)
- cmd/storage-consumer/main.go
- downstreamadapter/dispatcher/mock_sink_helper_test.go
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_test.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
- downstreamadapter/dispatcherorchestrator/helper.go
- downstreamadapter/sink/blackhole/sink.go
- downstreamadapter/sink/cloudstorage/dml_writers_test.go
- downstreamadapter/sink/cloudstorage/sink.go
- downstreamadapter/sink/cloudstorage/sink_test.go
- downstreamadapter/sink/kafka/sink.go
- downstreamadapter/sink/kafka/sink_test.go
- downstreamadapter/sink/mock/sink_mock.go
- downstreamadapter/sink/mysql/sink.go
- downstreamadapter/sink/mysql/sink_test.go
- downstreamadapter/sink/pulsar/sink.go
- downstreamadapter/sink/redo/sink.go
- downstreamadapter/sink/sink.go
✅ Files skipped from review due to trivial changes (2)
- downstreamadapter/sink/cloudstorage/dml_writers_test.go
- downstreamadapter/sink/cloudstorage/sink_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatcherorchestrator/helper.go
- downstreamadapter/dispatchermanager/dispatcher_manager_test.go
🧹 Nitpick comments (2)
downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go (1)
82-100: Prefer deterministic synchronization over `time.Sleep` in this test.

Using `time.Sleep` at line 92 can introduce flakiness under slow CI scheduling; use a start signal from the goroutine instead.

Suggested deterministic refactor

```diff
 func TestPendingMessageQueue_PopReturnsAfterClose(t *testing.T) {
 	t.Parallel()
 	q := newPendingMessageQueue()
 	doneCh := make(chan bool, 1)
+	startedCh := make(chan struct{})
 	go func() {
+		close(startedCh)
 		_, _, ok := q.Pop()
 		doneCh <- ok
 	}()
-	time.Sleep(10 * time.Millisecond)
+	<-startedCh
 	q.Close()
 	select {
 	case ok := <-doneCh:
 		require.False(t, ok)
 	case <-time.After(time.Second):
 		require.FailNow(t, "Pop did not return after context cancel")
 	}
 }
```

As per coding guidelines, `**/*_test.go`: "favor deterministic tests and use testify/require".
downstreamadapter/dispatchermanager/dispatcher_manager.go (1)
941-963: Consider adding a retry mechanism for failed remove cleanup.

When `runRemoveChangefeedCleanup()` fails, the error is logged but `removeChangefeedCleaned` stays `false`. Since the orchestrator (per context snippet 3) deletes the manager reference after `TryClose` returns `true`, there's no path to retry the cleanup unless `TryClose(true)` is called externally again before deletion. This may be acceptable for "best-effort" semantics, but if `ddl_ts` cleanup is critical for correctness, consider:
- Adding a bounded retry loop within the goroutine, or
- Returning a cleanup error channel that callers can monitor
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 85b8ed1d-eee7-4997-9a16-dba4441cddce
📒 Files selected for processing (6)
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
- downstreamadapter/dispatchermanager/dispatcher_manager_test.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
- downstreamadapter/dispatcherorchestrator/helper.go
- downstreamadapter/sink/mysql/sink.go
✅ Files skipped from review due to trivial changes (1)
- downstreamadapter/dispatcherorchestrator/helper.go
🚧 Files skipped from review as they are similar to previous changes (1)
- downstreamadapter/sink/mysql/sink.go
```go
if e.IsRedoEnabled() {
	e.redoSink.Close(removeChangefeed)
	// FIXME: cleanup redo log when remove the changefeed
	e.closeRedoMeta(removeChangefeed)
```
It moves into runRemoveChangefeedCleanup, just like remove ddl_ts record.
```diff
 func (m *DispatcherOrchestrator) handleMessages() {
 	for {
-		key, ok := m.msgQueue.Pop()
+		_, msg, ok := m.msgQueue.Pop()
```
Suggested change:

```diff
-		_, msg, ok := m.msgQueue.Pop()
+		msg, ok := m.msgQueue.Pop()
```
[APPROVAL NOTIFIER] This PR is APPROVED

This pull-request has been approved by: lidezhu, wk989898. The full list of commands accepted by this bot can be found here; the pull request process is described here.
What problem does this PR solve?
Issue Number: close #4825
What is changed and how it works?
Background
The dispatcher orchestrator de-duplicates close requests by changefeed and message type. A later `removed=true` close request could overwrite the pending entry for the same key while the earlier `removed=false` request was already in flight.

Motivation
That overwrite was not re-queued, and `Done(key)` later deleted the upgraded request together with the in-flight one. As a result, the stronger remove semantics could be dropped silently and the downstream cleanup path would only execute the normal close flow.

Summary of changes
- Track separate `queued` and `inFlight` slots so an upgrade never overwrites the request a worker is already processing
- Preserve `removed=true` close requests for the next round and keep the in-flight request stable until `Done`
- Have `DispatcherManager` keep `removed=true` as a sticky close requirement and finish remove-only cleanup even after the base close path has completed

Validation
- make fmt
- go test ./downstreamadapter/dispatcherorchestrator
- go test ./downstreamadapter/dispatchermanager
- go test --tags=intest ./downstreamadapter/sink/mysql

Summary by CodeRabbit
Release Notes
Bug Fixes