Subscription: support consumer timeout and idle disconnect #17293
Expose connectionTimeoutInMs across subscription consumer builders and handshake/session setup. Close idle server-side consumers based on heartbeat inactivity and reuse closeConsumer to unsubscribe topics before dropping the consumer.
Pull request overview
This PR extends IoTDB’s subscription system to (1) propagate a consumer-side connectionTimeoutInMs through the subscription consumer/provider/session handshake flow, and (2) add a server-side idle-disconnect mechanism driven by subscription inactivity, reusing the existing closeConsumer(...) unsubscribe+drop flow.
Changes:
- Add server-side timeout checking for idle subscription consumers and trigger server-side close when inactive.
- Propagate connectionTimeoutInMs from subscription consumer builders into subscription sessions and handshake attributes.
- Introduce new consumer config keys/getters for connection timeout and heartbeat interval consumption.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java | Tracks last activity/in-flight requests and implements server-side timeout close for idle consumers. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java | Adds handleTimeout() to the receiver interface. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java | Tracks active receivers and periodically invokes timeout checks. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java | Exposes fluent connectionTimeoutInMs(...) on tree push consumer builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java | Passes connectionTimeoutInMs (and heartbeat interval) into provider construction. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java | Exposes fluent connectionTimeoutInMs(...) on tree pull consumer builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java | Passes connectionTimeoutInMs (and heartbeat interval) into provider construction. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java | Propagates connection timeout into the subscription session builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java | Exposes fluent connectionTimeoutInMs(...) on table push consumer builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java | Passes connectionTimeoutInMs (and heartbeat interval) into provider construction. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java | Exposes fluent connectionTimeoutInMs(...) on table pull consumer builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java | Passes connectionTimeoutInMs (and heartbeat interval) into provider construction. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java | Propagates connection timeout into the subscription session builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java | Adds covariant fluent connectionTimeoutInMs(...) override for push builders. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java | Adds covariant fluent connectionTimeoutInMs(...) override for pull builders. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java | Stores heartbeat/connection timeout and includes them in handshake attributes; passes timeout into session builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java | Adds builder field + setter for connectionTimeoutInMs. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java | Stores connectionTimeoutInMs, loads it from properties, passes it into provider, and reports it. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java | Adds session builder setter for connectionTimeoutInMs. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java | Adds session builder setter for connectionTimeoutInMs. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java | Opens subscription session using connection timeout. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java | Initializes connectionTimeoutInMs from builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java | Adds connectionTimeoutInMs field to the base session builder. |
| iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java | Defines new consumer attribute key/default for connection timeout. |
| iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java | Adds getters for heartbeat interval and connection timeout. |
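The "covariant fluent override" entries in the table can be illustrated with a minimal sketch. The class names, the `pollTimeoutMs` option, and the `0L` defaults below are hypothetical stand-ins, not the PR's actual code; only the `connectionTimeoutInMs(...)` setter name comes from the summary above.

```java
// Hypothetical stand-in for a base consumer builder; not the PR's real class.
abstract class BaseConsumerBuilderSketch {
  // Placeholder default (assumption); the real default is defined in ConsumerConstant.
  protected long connectionTimeoutInMs = 0L;

  public BaseConsumerBuilderSketch connectionTimeoutInMs(final long connectionTimeoutInMs) {
    this.connectionTimeoutInMs = connectionTimeoutInMs;
    return this;
  }

  public long getConnectionTimeoutInMs() {
    return connectionTimeoutInMs;
  }
}

// Hypothetical stand-in for a pull consumer builder.
final class PullConsumerBuilderSketch extends BaseConsumerBuilderSketch {
  // Hypothetical subclass-only option, used to show why the return type matters.
  private long pollTimeoutMs = 0L;

  // Covariant override: the narrowed return type keeps the call chain typed as
  // the subclass, so subclass-only setters can follow without a cast.
  @Override
  public PullConsumerBuilderSketch connectionTimeoutInMs(final long connectionTimeoutInMs) {
    super.connectionTimeoutInMs(connectionTimeoutInMs);
    return this;
  }

  public PullConsumerBuilderSketch pollTimeoutMs(final long pollTimeoutMs) {
    this.pollTimeoutMs = pollTimeoutMs;
    return this;
  }

  public long getPollTimeoutMs() {
    return pollTimeoutMs;
  }
}
```

Without the override, `new PullConsumerBuilderSketch().connectionTimeoutInMs(5_000L)` would be typed as the base builder and a following `pollTimeoutMs(...)` call would not compile.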
| return getReceiver(reqVersion).handle(req); | ||
| final SubscriptionReceiver receiver = getReceiver(reqVersion); | ||
| activeReceivers.add(receiver); | ||
| receiver.handleTimeout(); |
receiver.handleTimeout() is invoked before receiver.handle(req). Because SubscriptionReceiverV1 only updates lastActivityTimeMs inside handle() (via beforeHandle), a request that arrives right after a long idle period (including a HEARTBEAT) can be treated as timed-out and closed before the receiver records that new activity. This can cause spurious server-side closes and subsequent requests failing with missing-consumer state.
Consider removing the per-request handleTimeout() call (rely on the scheduled checker), or move timeout evaluation to after the request has updated activity (e.g., inside SubscriptionReceiverV1.handle() after beforeHandle).
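The ordering problem described in this comment can be shown with a small sketch. `IdleTrackerSketch` and both handler methods are hypothetical; only the `lastActivityTimeMs` name and the "inactive longer than the timeout" comparison are taken from the diff and the comment. Time is passed in explicitly so the behaviour is deterministic.

```java
// Hypothetical sketch of per-consumer idle tracking; not the PR's real receiver.
final class IdleTrackerSketch {
  private final long timeoutMs;
  private volatile long lastActivityTimeMs;

  IdleTrackerSketch(final long timeoutMs, final long nowMs) {
    this.timeoutMs = timeoutMs;
    this.lastActivityTimeMs = nowMs;
  }

  void recordActivity(final long nowMs) {
    lastActivityTimeMs = nowMs;
  }

  // Mirrors the diff's guard: no timeout while inactiveMs <= timeoutMs.
  boolean isTimedOut(final long nowMs) {
    final long inactiveMs = nowMs - lastActivityTimeMs;
    return inactiveMs > timeoutMs;
  }

  // Order flagged in the review: the timeout check runs before the incoming
  // request has refreshed lastActivityTimeMs. Returns true if it would close.
  boolean handleCheckingFirst(final long nowMs) {
    final boolean wouldClose = isTimedOut(nowMs);
    recordActivity(nowMs);
    return wouldClose;
  }

  // Suggested order: record the new activity first, then evaluate the timeout.
  boolean handleRecordingFirst(final long nowMs) {
    recordActivity(nowMs);
    return isTimedOut(nowMs);
  }
}
```

With a 1000 ms timeout and a request arriving after 5000 ms of silence, the first variant reports a close even though the consumer has just spoken, while the second does not.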
| receiver.handleTimeout(); |
| if (inactiveMs <= timeoutMs) { | ||
| return; | ||
| } | ||
| clearSharedConsumerState(); | ||
| } |
handleTimeout() clears the shared consumer state (clearSharedConsumerState()) before attempting closeConsumer(...). If closeConsumer fails (e.g., ConfigNode RPC issue or unsubscribe error), the receiver has already discarded the config, so subsequent timeout checks won’t retry and the consumer may remain leaked/subscribed.
Consider only clearing shared state after a successful close, or retaining a “pending close” state to retry with backoff.
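One way to read the first suggestion is sketched below. Everything here is hypothetical: the consumer config is reduced to a `String`, and `CloseAction` stands in for whatever `closeConsumer(...)` does against the ConfigNode. The sketch omits the backoff the comment mentions and only shows that state is kept when the close fails.

```java
// Hypothetical stand-in for the real close path; may fail, e.g. on an RPC error.
interface CloseAction {
  void close(String consumerConfig) throws Exception;
}

// Hypothetical sketch: clear consumer state only after a successful close.
final class TimeoutCloseSketch {
  private final CloseAction closeAction;
  private String consumerConfig; // null means "no consumer state held"

  TimeoutCloseSketch(final String consumerConfig, final CloseAction closeAction) {
    this.consumerConfig = consumerConfig;
    this.closeAction = closeAction;
  }

  boolean hasConsumerState() {
    return consumerConfig != null;
  }

  // Returns true if the consumer was closed and its state cleared.
  synchronized boolean handleTimeout() {
    if (consumerConfig == null) {
      return false;
    }
    try {
      closeAction.close(consumerConfig);
    } catch (final Exception e) {
      // Keep the state so a later timeout check can retry the close.
      return false;
    }
    consumerConfig = null; // cleared only after the close succeeded
    return true;
  }
}
```

A failed close leaves `hasConsumerState()` true, so the next scheduled check still knows which consumer to close.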
| LOGGER.info( | ||
| "Subscription: consumer {} is inactive for {} ms, exceeding timeout {} ms, close it on server side.", | ||
| consumerConfig, | ||
| inactiveMs, | ||
| timeoutMs); |
Timeout-triggered closeConsumer(consumerConfig) can race with a concurrent HANDSHAKE that re-creates/activates a consumer with the same consumerGroupId/consumerId. Since closeConsumer drops the consumer by IDs via ConfigNode, it can end up closing the newly created consumer if the handshake happens while the timeout close is in progress.
Consider guarding handleTimeout with a per-consumer/receiver “epoch” (incremented on activateConsumer) and only closing if the epoch is unchanged, or serialize timeout closes vs. handshakes with a dedicated close-in-progress flag/lock.
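The epoch idea can be sketched as follows. The class and method names are hypothetical, and the sketch assumes the close itself can run under the same lock as activation, which may not hold for a remote ConfigNode call in the real code.

```java
// Hypothetical sketch of an epoch guard between handshakes and timeout closes.
final class EpochGuardSketch {
  private long epoch = 0L;
  private boolean active = false;

  // Called on handshake: every activation starts a new epoch.
  synchronized long activate() {
    active = true;
    return ++epoch;
  }

  synchronized long currentEpoch() {
    return epoch;
  }

  synchronized boolean isActive() {
    return active;
  }

  // Called by the timeout checker with the epoch it observed when it decided
  // the consumer was idle. The close is skipped if a handshake ran in between.
  synchronized boolean closeIfUnchanged(final long observedEpoch) {
    if (!active || epoch != observedEpoch) {
      return false;
    }
    active = false; // the real close would happen here
    return true;
  }
}
```

If a handshake re-activates the consumer after the checker sampled the epoch, `closeIfUnchanged` returns false and the new consumer survives.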
Codecov Report
Additional details and impacted files

| Coverage Diff | master | #17293 | +/- |
|---|---|---|---|
| Coverage | 39.76% | 39.75% | -0.01% |
| Complexity | 282 | 282 | |
| Files | 5100 | 5100 | |
| Lines | 342150 | 342233 | +83 |
| Branches | 43596 | 43597 | +1 |
| Hits | 136044 | 136065 | +21 |
| Misses | 206106 | 206168 | +62 |
Consider adding a unit test for this.
Added a lightweight UT in SubscriptionReceiverV1Test under datanode. It covers the timeout threshold calculation and the non-timeout branches for recently active consumers / in-flight requests. Verified with mvn -pl iotdb-core/datanode -am -DskipITs -DskipIntegrationTests -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dtest=SubscriptionReceiverV1Test test.
Summary
- Expose connectionTimeoutInMs for all subscription consumers and pass it through builder, session open, and handshake.
- Close idle server-side consumers based on heartbeat inactivity.
- Reuse the existing closeConsumer(...) flow so timeout-triggered disconnect also unsubscribes subscribed topics before dropping the consumer.

Verification
- mvn -T 8 spotless:apply -P with-integration-tests
- MAVEN_OPTS='-XX:ReservedCodeCacheSize=512m' mvn -T 8 clean package -P with-integration-tests -DskipUTs -pl integration-test,distribution -DfailIfNoTests=false -am -U

This PR was primarily authored with Codex using gpt-5.4 xhigh and then hand-reviewed by me. I AM responsible for every change made in this PR. I aimed to keep it aligned with our goals, though I may have missed minor issues. Please flag anything that feels off; I'll fix it quickly.