Skip to content

Commit 2b2a6a8

Browse files
committed
[fix][client] Fix multi-topics consumer could receive old messages after seek
1 parent 6347315 commit 2b2a6a8

2 files changed

Lines changed: 107 additions & 19 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,22 @@
2929

3030
import com.google.common.collect.Lists;
3131
import java.util.ArrayList;
32+
import java.util.Arrays;
3233
import java.util.Collections;
34+
import java.util.Comparator;
3335
import java.util.HashMap;
3436
import java.util.List;
3537
import java.util.Map;
38+
import java.util.TreeSet;
39+
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.CopyOnWriteArrayList;
3641
import java.util.concurrent.ExecutionException;
3742
import java.util.concurrent.ExecutorService;
3843
import java.util.concurrent.ScheduledExecutorService;
3944
import java.util.concurrent.TimeUnit;
4045
import java.util.concurrent.TimeoutException;
4146
import java.util.concurrent.atomic.AtomicLong;
47+
import java.util.function.Function;
4248
import java.util.stream.Collectors;
4349
import java.util.stream.IntStream;
4450
import lombok.Cleanup;
@@ -371,4 +377,66 @@ public void testMultipleIOThreads() throws PulsarAdminException, PulsarClientExc
371377
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
372378
assertTrue(consumer.isConnected());
373379
}
380+
381+
@DataProvider
382+
public static Object[][] seekByFunction() {
383+
return new Object[][] {
384+
{ true }, { false }
385+
};
386+
}
387+
388+
@Test(timeOut = 30000, dataProvider = "seekByFunction")
389+
public void testSeekByTimestamp(boolean seekByFunction) throws Exception {
390+
final var topic1 = TopicName.get(newTopicName()).toString();
391+
final var topic2 = TopicName.get(newTopicName()).toString();
392+
@Cleanup final var producer1 = pulsarClient.newProducer(Schema.STRING).topic(topic1).create();
393+
@Cleanup final var producer2 = pulsarClient.newProducer(Schema.STRING).topic(topic2).create();
394+
producer1.send("1-0");
395+
producer2.send("2-0");
396+
producer1.send("1-1");
397+
producer2.send("2-1");
398+
final var consumer1 = pulsarClient.newConsumer(Schema.STRING)
399+
.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub")
400+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
401+
final var timestamps = new ArrayList<Long>();
402+
for (int i = 0; i < 4; i++) {
403+
timestamps.add(consumer1.receive().getPublishTime());
404+
}
405+
timestamps.sort(Comparator.naturalOrder());
406+
final var timestamp = timestamps.get(2);
407+
consumer1.close();
408+
409+
final Function<Consumer<String>, CompletableFuture<Void>> seekAsync = consumer ->
410+
seekByFunction ? consumer.seekAsync(__ -> timestamp) : consumer.seekAsync(timestamp);
411+
412+
@Cleanup final var consumer2 = pulsarClient.newConsumer(Schema.STRING)
413+
.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub-2")
414+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
415+
seekAsync.apply(consumer2).get();
416+
final var values = new TreeSet<String>();
417+
for (int i = 0; i < 2; i++) {
418+
values.add(consumer2.receive().getValue());
419+
}
420+
assertEquals(values, new TreeSet<>(Arrays.asList("1-1", "2-1")));
421+
422+
final var valuesInListener = new CopyOnWriteArrayList<String>();
423+
@Cleanup final var consumer3 = pulsarClient.newConsumer(Schema.STRING)
424+
.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub-3")
425+
.messageListener((MessageListener<String>) (__, msg) -> valuesInListener.add(msg.getValue()))
426+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
427+
seekAsync.apply(consumer3).get();
428+
if (valuesInListener.isEmpty()) {
429+
Awaitility.await().untilAsserted(() -> assertEquals(valuesInListener.size(), 2));
430+
assertEquals(valuesInListener.stream().sorted().toList(), Arrays.asList("1-1", "2-1"));
431+
} // else: consumer3 has passed messages to the listener before seek, in this case we cannot assume anything
432+
433+
@Cleanup final var consumer4 = pulsarClient.newConsumer(Schema.STRING)
434+
.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub-4")
435+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
436+
seekAsync.apply(consumer4).get();
437+
final var valuesInReceiveAsync = new ArrayList<String>();
438+
valuesInReceiveAsync.add(consumer4.receiveAsync().get().getValue());
439+
valuesInReceiveAsync.add(consumer4.receiveAsync().get().getValue());
440+
assertEquals(valuesInReceiveAsync.stream().sorted().toList(), Arrays.asList("1-1", "2-1"));
441+
}
374442
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.function.Function;
5050
import java.util.stream.Collectors;
5151
import java.util.stream.IntStream;
52+
import javax.annotation.Nullable;
5253
import org.apache.commons.lang3.tuple.Pair;
5354
import org.apache.pulsar.client.api.BatchReceivePolicy;
5455
import org.apache.pulsar.client.api.Consumer;
@@ -101,7 +102,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
101102
private final MultiTopicConsumerStatsRecorderImpl stats;
102103
private final ConsumerConfigurationData<T> internalConfig;
103104

104-
private volatile MessageIdAdv startMessageId;
105+
private final MessageIdAdv startMessageId;
106+
private volatile boolean duringSeek = false;
105107
private final long startMessageRollbackDurationInSec;
106108
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
107109
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
@@ -235,6 +237,10 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
235237
}
236238

237239
private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchReceive) {
240+
if (duringSeek) {
241+
log.info("[{}] Pause receiving messages for topic {} due to seek", subscription, consumer.getTopic());
242+
return;
243+
}
238244
CompletableFuture<List<Message<T>>> messagesFuture;
239245
if (batchReceive) {
240246
messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl<T>) msgs).getMessageList());
@@ -252,7 +258,7 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
252258
}
253259
// Process the message, add to the queue and trigger listener or async callback
254260
messages.forEach(msg -> {
255-
if (isValidConsumerEpoch((MessageImpl<T>) msg)) {
261+
if (isValidConsumerEpoch((MessageImpl<T>) msg) && !duringSeek) {
256262
messageReceived(consumer, msg);
257263
}
258264
});
@@ -748,12 +754,7 @@ public void seek(Function<String, Object> function) throws PulsarClientException
748754

749755
@Override
750756
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
751-
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
752-
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(function)));
753-
unAckedMessageTracker.clear();
754-
incomingMessages.clear();
755-
resetIncomingMessageSize();
756-
return FutureUtil.waitForAll(futures);
757+
return seekAllAsync(consumer -> consumer.seekAsync(function));
757758
}
758759

759760
@Override
@@ -775,25 +776,44 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
775776
);
776777
}
777778

778-
final CompletableFuture<Void> seekFuture;
779779
if (internalConsumer == null) {
780-
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
781-
consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(messageId)));
782-
seekFuture = FutureUtil.waitForAll(futures);
780+
return seekAllAsync(consumer -> consumer.seekAsync(messageId));
783781
} else {
784-
seekFuture = internalConsumer.seekAsync(messageId);
782+
beforeSeek();
783+
final CompletableFuture<Void> future = new CompletableFuture<>();
784+
internalConsumer.seekAsync(messageId).whenComplete((__, e) -> afterSeek(future, e));
785+
return future;
785786
}
787+
}
786788

789+
@Override
790+
public CompletableFuture<Void> seekAsync(long timestamp) {
791+
return seekAllAsync(consumer -> consumer.seekAsync(timestamp));
792+
}
793+
794+
private CompletableFuture<Void> seekAllAsync(Function<ConsumerImpl<T>, CompletableFuture<Void>> seekFunc) {
795+
beforeSeek();
796+
final CompletableFuture<Void> future = new CompletableFuture<>();
797+
FutureUtil.waitForAll(consumers.values().stream().map(seekFunc).collect(Collectors.toList()))
798+
.whenComplete((__, e) -> afterSeek(future, e));
799+
return future;
800+
}
801+
802+
private void beforeSeek() {
803+
duringSeek = true;
787804
unAckedMessageTracker.clear();
788805
clearIncomingMessages();
789-
return seekFuture;
790806
}
791807

792-
@Override
793-
public CompletableFuture<Void> seekAsync(long timestamp) {
794-
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
795-
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
796-
return FutureUtil.waitForAll(futures);
808+
private void afterSeek(CompletableFuture<Void> seekFuture, @Nullable Throwable throwable) {
809+
duringSeek = false;
810+
log.info("[{}] Resume receiving messages for {} since seek is done", subscription, consumers.keySet());
811+
startReceivingMessages(new ArrayList<>(consumers.values()));
812+
if (throwable == null) {
813+
seekFuture.complete(null);
814+
} else {
815+
seekFuture.completeExceptionally(throwable);
816+
}
797817
}
798818

799819
@Override

0 commit comments

Comments
 (0)