Skip to content

Commit dc1b107

Browse files
BewareMyPowerTechnoboy-
authored andcommitted
[fix][client] Fix multi-topics consumer could receive old messages after seek (#21945)
1 parent 313eae5 commit dc1b107

2 files changed

Lines changed: 125 additions & 21 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.pulsar.client.api.Message;
3535
import org.apache.pulsar.client.api.MessageId;
3636
import org.apache.pulsar.client.api.MessageIdAdv;
37+
import org.apache.pulsar.client.api.MessageListener;
3738
import org.apache.pulsar.client.api.MessageRouter;
3839
import org.apache.pulsar.client.api.MessageRoutingMode;
3940
import org.apache.pulsar.client.api.Producer;
@@ -57,22 +58,27 @@
5758
import org.testng.Assert;
5859
import org.testng.annotations.AfterMethod;
5960
import org.testng.annotations.BeforeMethod;
61+
import org.testng.annotations.DataProvider;
6062
import org.testng.annotations.Test;
61-
6263
import java.util.ArrayList;
64+
import java.util.Arrays;
65+
import java.util.Comparator;
6366
import java.util.Set;
6467
import java.util.HashMap;
6568
import java.util.HashSet;
6669
import java.util.List;
6770
import java.util.Map;
6871
import java.util.Optional;
72+
import java.util.TreeSet;
6973
import java.util.concurrent.CompletableFuture;
74+
import java.util.concurrent.CopyOnWriteArrayList;
7075
import java.util.concurrent.CountDownLatch;
7176
import java.util.concurrent.ExecutorService;
7277
import java.util.concurrent.Executors;
7378
import java.util.concurrent.Future;
7479
import java.util.concurrent.TimeUnit;
7580
import java.util.concurrent.atomic.AtomicInteger;
81+
import java.util.function.Function;
7682
import java.util.stream.Collectors;
7783
import java.util.stream.IntStream;
7884

@@ -1394,4 +1400,76 @@ public Map<String, String> getActiveConsumers() {
13941400
}
13951401
}
13961402

1403+
@DataProvider
1404+
public static Object[][] seekByFunction() {
1405+
return new Object[][] {
1406+
{ true }, { false }
1407+
};
1408+
}
1409+
1410+
@Test(timeOut = 30000, dataProvider = "seekByFunction")
1411+
public void testSeekToNewerPosition(boolean seekByFunction) throws Exception {
1412+
final var topic1 = TopicName.get(newTopicName()).toString()
1413+
.replace("my-property", "public").replace("my-ns", "default");
1414+
final var topic2 = TopicName.get(newTopicName()).toString()
1415+
.replace("my-property", "public").replace("my-ns", "default");
1416+
@Cleanup final var producer1 = pulsarClient.newProducer(Schema.STRING).topic(topic1).create();
1417+
@Cleanup final var producer2 = pulsarClient.newProducer(Schema.STRING).topic(topic2).create();
1418+
producer1.send("1-0");
1419+
producer2.send("2-0");
1420+
producer1.send("1-1");
1421+
producer2.send("2-1");
1422+
final var consumer1 = pulsarClient.newConsumer(Schema.STRING)
1423+
.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub")
1424+
.ackTimeout(1, TimeUnit.SECONDS)
1425+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
1426+
final var timestamps = new ArrayList<Long>();
1427+
for (int i = 0; i < 4; i++) {
1428+
timestamps.add(consumer1.receive().getPublishTime());
1429+
}
1430+
timestamps.sort(Comparator.naturalOrder());
1431+
final var timestamp = timestamps.get(2);
1432+
consumer1.close();
1433+
1434+
final Function<Consumer<String>, CompletableFuture<Void>> seekAsync = consumer -> {
1435+
final var future = seekByFunction ? consumer.seekAsync(__ -> timestamp) : consumer.seekAsync(timestamp);
1436+
assertEquals(((ConsumerBase<String>) consumer).getIncomingMessageSize(), 0L);
1437+
assertEquals(((ConsumerBase<String>) consumer).getTotalIncomingMessages(), 0);
1438+
assertTrue(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().isEmpty());
1439+
return future;
1440+
};
1441+
1442+
@Cleanup final var consumer2 = pulsarClient.newConsumer(Schema.STRING)
1443+
.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub-2")
1444+
.ackTimeout(1, TimeUnit.SECONDS)
1445+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
1446+
seekAsync.apply(consumer2).get();
1447+
final var values = new TreeSet<String>();
1448+
for (int i = 0; i < 2; i++) {
1449+
values.add(consumer2.receive().getValue());
1450+
}
1451+
assertEquals(values, new TreeSet<>(Arrays.asList("1-1", "2-1")));
1452+
1453+
final var valuesInListener = new CopyOnWriteArrayList<String>();
1454+
@Cleanup final var consumer3 = pulsarClient.newConsumer(Schema.STRING)
1455+
.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub-3")
1456+
.messageListener((MessageListener<String>) (__, msg) -> valuesInListener.add(msg.getValue()))
1457+
.ackTimeout(1, TimeUnit.SECONDS)
1458+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
1459+
seekAsync.apply(consumer3).get();
1460+
if (valuesInListener.isEmpty()) {
1461+
Awaitility.await().untilAsserted(() -> assertEquals(valuesInListener.size(), 2));
1462+
assertEquals(valuesInListener.stream().sorted().toList(), Arrays.asList("1-1", "2-1"));
1463+
} // else: consumer3 has passed messages to the listener before seek, in this case we cannot assume anything
1464+
1465+
@Cleanup final var consumer4 = pulsarClient.newConsumer(Schema.STRING)
1466+
.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub-4")
1467+
.ackTimeout(1, TimeUnit.SECONDS)
1468+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
1469+
seekAsync.apply(consumer4).get();
1470+
final var valuesInReceiveAsync = new ArrayList<String>();
1471+
valuesInReceiveAsync.add(consumer4.receiveAsync().get().getValue());
1472+
valuesInReceiveAsync.add(consumer4.receiveAsync().get().getValue());
1473+
assertEquals(valuesInReceiveAsync.stream().sorted().toList(), Arrays.asList("1-1", "2-1"));
1474+
}
13971475
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.function.Function;
5050
import java.util.stream.Collectors;
5151
import java.util.stream.IntStream;
52+
import javax.annotation.Nullable;
5253
import org.apache.commons.lang3.tuple.Pair;
5354
import org.apache.pulsar.client.api.BatchReceivePolicy;
5455
import org.apache.pulsar.client.api.Consumer;
@@ -101,7 +102,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
101102
private final MultiTopicConsumerStatsRecorderImpl stats;
102103
private final ConsumerConfigurationData<T> internalConfig;
103104

104-
private volatile MessageIdAdv startMessageId;
105+
private final MessageIdAdv startMessageId;
106+
private volatile boolean duringSeek = false;
105107
private final long startMessageRollbackDurationInSec;
106108
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
107109
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
@@ -235,6 +237,10 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
235237
}
236238

237239
private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchReceive) {
240+
if (duringSeek) {
241+
log.info("[{}] Pause receiving messages for topic {} due to seek", subscription, consumer.getTopic());
242+
return;
243+
}
238244
CompletableFuture<List<Message<T>>> messagesFuture;
239245
if (batchReceive) {
240246
messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl<T>) msgs).getMessageList());
@@ -252,8 +258,12 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
252258
}
253259
// Process the message, add to the queue and trigger listener or async callback
254260
messages.forEach(msg -> {
255-
if (isValidConsumerEpoch((MessageImpl<T>) msg)) {
261+
final boolean skipDueToSeek = duringSeek;
262+
if (isValidConsumerEpoch((MessageImpl<T>) msg) && !skipDueToSeek) {
256263
messageReceived(consumer, msg);
264+
} else if (skipDueToSeek) {
265+
log.info("[{}] [{}] Skip processing message {} received during seek", topic, subscription,
266+
msg.getMessageId());
257267
}
258268
});
259269

@@ -748,17 +758,12 @@ public void seek(Function<String, Object> function) throws PulsarClientException
748758

749759
@Override
750760
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
751-
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
752-
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(function)));
753-
unAckedMessageTracker.clear();
754-
incomingMessages.clear();
755-
resetIncomingMessageSize();
756-
return FutureUtil.waitForAll(futures);
761+
return seekAllAsync(consumer -> consumer.seekAsync(function));
757762
}
758763

759764
@Override
760765
public CompletableFuture<Void> seekAsync(MessageId messageId) {
761-
final Consumer<T> internalConsumer;
766+
final ConsumerImpl<T> internalConsumer;
762767
if (messageId instanceof TopicMessageId) {
763768
TopicMessageId topicMessageId = (TopicMessageId) messageId;
764769
internalConsumer = consumers.get(topicMessageId.getOwnerTopic());
@@ -775,25 +780,46 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
775780
);
776781
}
777782

778-
final CompletableFuture<Void> seekFuture;
779783
if (internalConsumer == null) {
780-
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
781-
consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(messageId)));
782-
seekFuture = FutureUtil.waitForAll(futures);
784+
return seekAllAsync(consumer -> consumer.seekAsync(messageId));
783785
} else {
784-
seekFuture = internalConsumer.seekAsync(messageId);
786+
return seekAsyncInternal(Collections.singleton(internalConsumer), __ -> __.seekAsync(messageId));
785787
}
788+
}
789+
790+
@Override
791+
public CompletableFuture<Void> seekAsync(long timestamp) {
792+
return seekAllAsync(consumer -> consumer.seekAsync(timestamp));
793+
}
786794

795+
private CompletableFuture<Void> seekAsyncInternal(Collection<ConsumerImpl<T>> consumers,
796+
Function<ConsumerImpl<T>, CompletableFuture<Void>> seekFunc) {
797+
beforeSeek();
798+
final CompletableFuture<Void> future = new CompletableFuture<>();
799+
FutureUtil.waitForAll(consumers.stream().map(seekFunc).collect(Collectors.toList()))
800+
.whenComplete((__, e) -> afterSeek(future, e));
801+
return future;
802+
}
803+
804+
private CompletableFuture<Void> seekAllAsync(Function<ConsumerImpl<T>, CompletableFuture<Void>> seekFunc) {
805+
return seekAsyncInternal(consumers.values(), seekFunc);
806+
}
807+
808+
private void beforeSeek() {
809+
duringSeek = true;
787810
unAckedMessageTracker.clear();
788811
clearIncomingMessages();
789-
return seekFuture;
790812
}
791813

792-
@Override
793-
public CompletableFuture<Void> seekAsync(long timestamp) {
794-
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
795-
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
796-
return FutureUtil.waitForAll(futures);
814+
private void afterSeek(CompletableFuture<Void> seekFuture, @Nullable Throwable throwable) {
815+
duringSeek = false;
816+
log.info("[{}] Resume receiving messages for {} since seek is done", subscription, consumers.keySet());
817+
startReceivingMessages(new ArrayList<>(consumers.values()));
818+
if (throwable == null) {
819+
seekFuture.complete(null);
820+
} else {
821+
seekFuture.completeExceptionally(throwable);
822+
}
797823
}
798824

799825
@Override

0 commit comments

Comments
 (0)