Commit 47237a5
committed
[fix][client] Fix seeking might not work for multi-topics when startMessageId is earliest
### Motivation
The multi-topics consumer supports seeking all internal consumers when
accepting a timestamp as the argument. However, the multi-topics
consumer could still read from earliest after seeking to a specific
position. There are two reasons:
- `incomingMessages` is not cleared before `seek(long timestamp)`
- duplicated messages could still be received before a seek operation is
done
### Modifications
For a multi-topics consumer, before the seek operation is done, do not
call `messageReceived` method. Once the seek operation is done, collect
all messages from internal consumers and call `messageReceived` method
for each one.
Add `testSeekByTimestamp` to verify `receive`, `receiveAsync` and
message listener all work for seeking by timestamp on a multi-topics
consumer.1 parent 6347315 commit 47237a5
2 files changed
Lines changed: 101 additions & 18 deletions
File tree
- pulsar-broker/src/test/java/org/apache/pulsar/client/api
- pulsar-client/src/main/java/org/apache/pulsar/client/impl
Lines changed: 68 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
29 | 29 | | |
30 | 30 | | |
31 | 31 | | |
| 32 | + | |
32 | 33 | | |
| 34 | + | |
33 | 35 | | |
34 | 36 | | |
35 | 37 | | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
36 | 41 | | |
37 | 42 | | |
38 | 43 | | |
39 | 44 | | |
40 | 45 | | |
41 | 46 | | |
| 47 | + | |
42 | 48 | | |
43 | 49 | | |
44 | 50 | | |
| |||
371 | 377 | | |
372 | 378 | | |
373 | 379 | | |
| 380 | + | |
| 381 | + | |
| 382 | + | |
| 383 | + | |
| 384 | + | |
| 385 | + | |
| 386 | + | |
| 387 | + | |
| 388 | + | |
| 389 | + | |
| 390 | + | |
| 391 | + | |
| 392 | + | |
| 393 | + | |
| 394 | + | |
| 395 | + | |
| 396 | + | |
| 397 | + | |
| 398 | + | |
| 399 | + | |
| 400 | + | |
| 401 | + | |
| 402 | + | |
| 403 | + | |
| 404 | + | |
| 405 | + | |
| 406 | + | |
| 407 | + | |
| 408 | + | |
| 409 | + | |
| 410 | + | |
| 411 | + | |
| 412 | + | |
| 413 | + | |
| 414 | + | |
| 415 | + | |
| 416 | + | |
| 417 | + | |
| 418 | + | |
| 419 | + | |
| 420 | + | |
| 421 | + | |
| 422 | + | |
| 423 | + | |
| 424 | + | |
| 425 | + | |
| 426 | + | |
| 427 | + | |
| 428 | + | |
| 429 | + | |
| 430 | + | |
| 431 | + | |
| 432 | + | |
| 433 | + | |
| 434 | + | |
| 435 | + | |
| 436 | + | |
| 437 | + | |
| 438 | + | |
| 439 | + | |
| 440 | + | |
| 441 | + | |
374 | 442 | | |
Lines changed: 33 additions & 18 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
49 | 49 | | |
50 | 50 | | |
51 | 51 | | |
| 52 | + | |
52 | 53 | | |
53 | 54 | | |
54 | 55 | | |
| |||
78 | 79 | | |
79 | 80 | | |
80 | 81 | | |
| 82 | + | |
81 | 83 | | |
82 | 84 | | |
83 | 85 | | |
| |||
252 | 254 | | |
253 | 255 | | |
254 | 256 | | |
255 | | - | |
| 257 | + | |
256 | 258 | | |
257 | 259 | | |
258 | 260 | | |
| |||
748 | 750 | | |
749 | 751 | | |
750 | 752 | | |
751 | | - | |
752 | | - | |
753 | | - | |
754 | | - | |
755 | | - | |
756 | | - | |
| 753 | + | |
757 | 754 | | |
758 | 755 | | |
759 | 756 | | |
| |||
775 | 772 | | |
776 | 773 | | |
777 | 774 | | |
778 | | - | |
779 | 775 | | |
780 | | - | |
781 | | - | |
782 | | - | |
| 776 | + | |
783 | 777 | | |
784 | | - | |
| 778 | + | |
| 779 | + | |
| 780 | + | |
| 781 | + | |
785 | 782 | | |
| 783 | + | |
| 784 | + | |
| 785 | + | |
| 786 | + | |
| 787 | + | |
| 788 | + | |
786 | 789 | | |
| 790 | + | |
| 791 | + | |
| 792 | + | |
| 793 | + | |
| 794 | + | |
| 795 | + | |
| 796 | + | |
| 797 | + | |
| 798 | + | |
| 799 | + | |
787 | 800 | | |
788 | 801 | | |
789 | | - | |
790 | 802 | | |
791 | 803 | | |
792 | | - | |
793 | | - | |
794 | | - | |
795 | | - | |
796 | | - | |
| 804 | + | |
| 805 | + | |
| 806 | + | |
| 807 | + | |
| 808 | + | |
| 809 | + | |
| 810 | + | |
| 811 | + | |
797 | 812 | | |
798 | 813 | | |
799 | 814 | | |
| |||
0 commit comments