Skip to content

Commit 4de958c

Browse files
merlimatyangl
authored andcommitted
On multi-topic consumer, we shouldn't keep checking the partitioned metadata (apache#10708)
* On multi-topic consumer, we shouldn't keep checking the partitioned metadata * Added NON_PARTITIONED constant * Removed assertion that is now invalid * Fixed handling of deleted partitioned topic * Fixed re-subscribing same topic
1 parent 72e33bd commit 4de958c

6 files changed

Lines changed: 153 additions & 93 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -183,20 +183,20 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception {
183183

184184
// 4. verify consumer get methods, to get right number of partitions and topics.
185185
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
186-
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
186+
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
187187
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
188188

189189
assertEquals(topics.size(), 6);
190190
assertEquals(consumers.size(), 6);
191-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);
191+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
192192

193193
topics.forEach(topic -> log.debug("topic: {}", topic));
194194
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
195195

196196
IntStream.range(0, topics.size()).forEach(index ->
197197
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
198198

199-
((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
199+
((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
200200

201201
// 5. produce data
202202
for (int i = 0; i < totalMessages / 3; i++) {
@@ -286,20 +286,20 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio
286286

287287
// 4. verify consumer get methods, to get right number of partitions and topics.
288288
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
289-
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
289+
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
290290
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
291291

292292
assertEquals(topics.size(), 1);
293293
assertEquals(consumers.size(), 1);
294-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 1);
294+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0);
295295

296296
topics.forEach(topic -> log.debug("topic: {}", topic));
297297
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
298298

299299
IntStream.range(0, topics.size()).forEach(index ->
300300
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
301301

302-
((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
302+
((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
303303

304304
// 5. produce data
305305
for (int i = 0; i < totalMessages / 4; i++) {
@@ -377,20 +377,20 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {
377377

378378
// 4. verify consumer get methods, to get right number of partitions and topics.
379379
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
380-
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
380+
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
381381
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
382382

383383
assertEquals(topics.size(), 7);
384384
assertEquals(consumers.size(), 7);
385-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 4);
385+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
386386

387387
topics.forEach(topic -> log.debug("topic: {}", topic));
388388
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
389389

390390
IntStream.range(0, topics.size()).forEach(index ->
391391
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
392392

393-
((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
393+
((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
394394

395395
// 5. produce data
396396
for (int i = 0; i < totalMessages / 4; i++) {
@@ -508,9 +508,9 @@ public void testStartEmptyPatternConsumer() throws Exception {
508508

509509
// 3. verify consumer get methods, to get 5 number of partitions and topics.
510510
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
511-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 5);
511+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 5);
512512
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 5);
513-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 2);
513+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
514514

515515
// 4. create producer
516516
String messagePredicate = "my-message-" + key + "-";
@@ -537,9 +537,9 @@ public void testStartEmptyPatternConsumer() throws Exception {
537537

538538
// 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3.
539539
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
540-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6);
540+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6);
541541
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
542-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);
542+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
543543

544544

545545
// 7. produce data
@@ -614,9 +614,9 @@ public void testAutoSubscribePatternConsumer() throws Exception {
614614

615615
// 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
616616
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
617-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6);
617+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6);
618618
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
619-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);
619+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
620620

621621
// 5. produce data to topic 1,2,3; verify should receive all the message
622622
for (int i = 0; i < totalMessages / 3; i++) {
@@ -649,9 +649,9 @@ public void testAutoSubscribePatternConsumer() throws Exception {
649649
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
650650
consumer1.run(consumer1.getRecheckPatternTimeout());
651651
Thread.sleep(100);
652-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 10);
652+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 10);
653653
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 10);
654-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 4);
654+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 3);
655655

656656
// 8. produce data to topic3 and topic4, verify should receive all the message
657657
for (int i = 0; i < totalMessages / 2; i++) {
@@ -723,9 +723,9 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {
723723

724724
// 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
725725
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
726-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6);
726+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 6);
727727
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6);
728-
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3);
728+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 2);
729729

730730
// 5. produce data to topic 1,2,3; verify should receive all the message
731731
for (int i = 0; i < totalMessages / 3; i++) {
@@ -757,9 +757,9 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {
757757
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
758758
consumer1.run(consumer1.getRecheckPatternTimeout());
759759
Thread.sleep(100);
760-
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2);
760+
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitions().size(), 2);
761761
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2);
762-
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 1);
762+
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 1);
763763

764764
// 8. produce data to topic2, verify should receive all the message
765765
for (int i = 0; i < totalMessages; i++) {
@@ -808,7 +808,7 @@ public void testTopicDeletion() throws Exception {
808808

809809
// 4. verify consumer get methods
810810
assertSame(consumerImpl.getPattern(), pattern);
811-
assertEquals(consumerImpl.getTopics().size(), 2);
811+
assertEquals(consumerImpl.getPartitionedTopics().size(), 0);
812812

813813
producer1.send("msg-1");
814814

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public void testGetConsumersAndGetTopics() throws Exception {
148148
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
149149
assertTrue(consumer.getTopic().startsWith(MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
150150

151-
List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics();
151+
List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitions();
152152
List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
153153

154154
topics.forEach(topic -> log.info("topic: {}", topic));
@@ -157,7 +157,7 @@ public void testGetConsumersAndGetTopics() throws Exception {
157157
IntStream.range(0, 6).forEach(index ->
158158
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
159159

160-
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 3);
160+
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2);
161161

162162
consumer.unsubscribe();
163163
consumer.close();
@@ -563,12 +563,12 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception {
563563
assertEquals(messageSet, totalMessages * 2 / 3);
564564

565565
// 7. use getter to verify internal topics number after un-subscribe topic3
566-
List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics();
566+
List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitions();
567567
List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
568568

569569
assertEquals(topics.size(), 3);
570570
assertEquals(consumers.size(), 3);
571-
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 2);
571+
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 1);
572572

573573
// 8. re-subscribe topic3
574574
CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3, true);
@@ -594,12 +594,12 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception {
594594
assertEquals(messageSet, totalMessages);
595595

596596
// 11. use getter to verify internal topics number after subscribe topic3
597-
topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics();
597+
topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitions();
598598
consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
599599

600600
assertEquals(topics.size(), 6);
601601
assertEquals(consumers.size(), 6);
602-
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 3);
602+
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2);
603603

604604
consumer.unsubscribe();
605605
consumer.close();
@@ -1181,20 +1181,20 @@ public void testAutoDiscoverMultiTopicsPartitions() throws Exception {
11811181
.subscribe();
11821182

11831183
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
1184-
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 3);
1184+
Assert.assertEquals(consumer.getConsumers().size(), 3);
11851185

11861186
admin.topics().deletePartitionedTopic(topicName, true);
11871187
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
11881188
Awaitility.await().untilAsserted(() -> {
11891189
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
1190-
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 0);
1190+
Assert.assertEquals(consumer.getConsumers().size(), 0);
11911191
});
11921192

11931193
admin.topics().createPartitionedTopic(topicName, 7);
11941194
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
11951195
Awaitility.await().untilAsserted(() -> {
11961196
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
1197-
Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 7);
1197+
Assert.assertEquals(consumer.getConsumers().size(), 7);
11981198
});
11991199
}
12001200

0 commit comments

Comments
 (0)