Skip to content

Commit 35ad63a

Browse files
committed
Support waiting for ACK response
TODO: - Reuse the logic to convert vector<pair<MessageId, ResultCallback> to a map. - Fix the callback in AckGroupingTrackerEnabled not triggered in time
1 parent 14b0451 commit 35ad63a

13 files changed

Lines changed: 301 additions & 213 deletions

include/pulsar/ConsumerConfiguration.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,22 @@ class PULSAR_PUBLIC ConsumerConfiguration {
618618
*/
619619
bool isBatchIndexAckEnabled() const;
620620

621+
/**
622+
* Whether to receive the ACK receipt from broker.
623+
*
624+
* By default, when Consumer::acknowledge is called, it won't wait until the corresponding response from
625+
* broker. After it's enabled, the `acknowledge` method will return a Result that indicates if the
626+
* acknowledgment succeeded.
627+
*
628+
* Default: false
629+
*/
630+
ConsumerConfiguration& setAckReceiptEnabled(bool ackReceiptEnabled);
631+
632+
/**
633+
* The associated getter of setAckReceiptEnabled.
634+
*/
635+
bool isAckReceiptEnabled() const;
636+
621637
friend class PulsarWrapper;
622638
friend class PulsarFriend;
623639

lib/AckGroupingTracker.cc

Lines changed: 65 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,66 +21,89 @@
2121

2222
#include "BitSet.h"
2323
#include "ClientConnection.h"
24+
#include "ClientImpl.h"
2425
#include "Commands.h"
26+
#include "HandlerBase.h"
2527
#include "LogUtils.h"
2628
#include "MessageIdImpl.h"
2729

2830
namespace pulsar {
2931

3032
DECLARE_LOG_OBJECT();
3133

32-
inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const MessageId& msgId,
33-
CommandAck_AckType ackType) {
34-
const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
35-
auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), bitSet, ackType, -1);
36-
cnx->sendCommand(cmd);
37-
LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ", " << msgId.entryId() << "]");
38-
}
39-
40-
bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
41-
const MessageId& msgId, CommandAck_AckType ackType) {
42-
auto cnx = connWeakPtr.lock();
43-
if (cnx == nullptr) {
44-
LOG_DEBUG("Connection is not ready, ACK failed for message - [" << msgId.ledgerId() << ", "
45-
<< msgId.entryId() << "]");
46-
return false;
34+
std::pair<std::shared_ptr<ClientConnection>, uint64_t> AckGroupingTracker::generateAckInfo() const {
35+
auto handler = handler_.lock();
36+
if (!handler) {
37+
LOG_DEBUG("Reference to the HandlerBase is not valid in generateAckInfo");
38+
return std::make_pair(nullptr, 0);
4739
}
48-
sendAck(cnx, consumerId, msgId, ackType);
49-
return true;
50-
}
51-
52-
static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msgIds) {
53-
bool first = true;
54-
for (auto&& msgId : msgIds) {
55-
if (first) {
56-
first = false;
57-
} else {
58-
os << ", ";
40+
auto cnx = handler->getCnx().lock();
41+
if (!cnx) {
42+
LOG_DEBUG("Connection is not ready in generateAckInfo");
43+
return std::make_pair(nullptr, 0);
44+
}
45+
if (waitResponse_) {
46+
auto client = client_.lock();
47+
if (!client) {
48+
LOG_DEBUG("Reference to the ClientImpl is not valid in generateAckInfo");
49+
return std::make_pair(nullptr, 0);
5950
}
60-
os << "[" << msgId << "]";
51+
return std::make_pair(cnx, client->newRequestId());
52+
} else {
53+
return std::make_pair(cnx, 0);
6154
}
62-
return os;
6355
}
6456

65-
bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
66-
const std::set<MessageId>& msgIds) {
67-
auto cnx = connWeakPtr.lock();
68-
if (cnx == nullptr) {
69-
LOG_DEBUG("Connection is not ready, ACK failed.");
70-
return false;
57+
void AckGroupingTracker::doImmediateAck(ClientConnection& cnx, uint64_t requestId, const MessageId& msgId,
58+
ResultCallback callback, CommandAck_AckType ackType) {
59+
const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
60+
auto cmd = Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), bitSet, ackType, -1);
61+
if (waitResponse_) {
62+
cnx.sendRequestWithId(cmd, requestId).addListener([callback](Result result, const ResponseData&) {
63+
if (callback) {
64+
callback(result);
65+
}
66+
});
67+
} else {
68+
cnx.sendCommand(cmd);
69+
callback(ResultOk);
7170
}
71+
}
72+
73+
void AckGroupingTracker::doImmediateAck(ClientConnection& cnx, uint64_t requestId,
74+
const std::map<MessageId, ResultCallback>& msgIdToCallback) {
75+
using Callbacks = std::vector<ResultCallback>;
76+
if (Commands::peerSupportsActiveConsumerListener(cnx.getServerProtocolVersion())) {
77+
std::set<MessageId> msgIds;
78+
std::unique_ptr<Callbacks> callbacks{new Callbacks};
7279

73-
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
74-
auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
75-
cnx->sendCommand(cmd);
76-
LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds);
80+
for (auto&& kv : msgIdToCallback) {
81+
msgIds.emplace(kv.first);
82+
if (kv.second) {
83+
callbacks->emplace_back(kv.second);
84+
}
85+
}
86+
87+
auto cmd = Commands::newMultiMessageAck(consumerId_, msgIds);
88+
if (waitResponse_) {
89+
auto rawPtr = callbacks.release();
90+
cnx.sendRequestWithId(cmd, requestId).addListener([rawPtr](Result result, const ResponseData&) {
91+
std::unique_ptr<Callbacks> callbacks{rawPtr};
92+
for (auto&& callback : *callbacks) {
93+
callback(result);
94+
}
95+
});
96+
} else {
97+
cnx.sendCommand(cmd);
98+
for (auto&& callback : *callbacks) {
99+
callback(ResultOk);
100+
}
101+
}
77102
} else {
78-
// Broker does not support multi-message ACK, use multiple individual ACKs instead.
79-
for (const auto& msgId : msgIds) {
80-
sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
103+
for (auto&& kv : msgIdToCallback) {
104+
doImmediateAck(cnx, requestId, kv.first, kv.second, CommandAck_AckType_Individual);
81105
}
82106
}
83-
return true;
84107
}
85108

86109
} // namespace pulsar

lib/AckGroupingTracker.h

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,21 @@
1919
#ifndef LIB_ACKGROUPINGTRACKER_H_
2020
#define LIB_ACKGROUPINGTRACKER_H_
2121

22+
#include <pulsar/Consumer.h>
2223
#include <pulsar/MessageId.h>
2324

2425
#include <cstdint>
25-
#include <set>
26+
#include <map>
27+
#include <utility> // std::pair
2628

2729
#include "ProtoApiEnums.h"
2830

2931
namespace pulsar {
3032

33+
class ClientImpl;
3134
class ClientConnection;
3235
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
36+
class HandlerBase;
3337

3438
/**
3539
* @class AckGroupingTracker
@@ -38,7 +42,10 @@ using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
3842
*/
3943
class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
4044
public:
41-
AckGroupingTracker() = default;
45+
AckGroupingTracker(std::shared_ptr<ClientImpl> client, std::shared_ptr<HandlerBase> handler,
46+
uint64_t consumerId, bool waitResponse)
47+
: client_(client), handler_(handler), consumerId_(consumerId), waitResponse_(waitResponse) {}
48+
4249
virtual ~AckGroupingTracker() = default;
4350

4451
/**
@@ -59,20 +66,23 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
5966
/**
6067
* Adding message ID into ACK group for individual ACK.
6168
* @param[in] msgId ID of the message to be ACKed.
69+
* @param[in] callback the callback that is triggered when the message is acknowledged
6270
*/
63-
virtual void addAcknowledge(const MessageId& msgId) {}
71+
virtual void addAcknowledge(const MessageId& msgId, ResultCallback callback) {}
6472

6573
/**
6674
* Adding message ID list into ACK group for individual ACK.
67-
* @param[in] msgIds of the message to be ACKed.
75+
* @param[in] msgIdAndCallbacks the list of the pair of the message to be ACKed and its callback
6876
*/
69-
virtual void addAcknowledgeList(const MessageIdList& msgIds) {}
77+
virtual void addAcknowledgeList(
78+
const std::vector<std::pair<MessageId, ResultCallback>>& msgIdAndCallbacks) {}
7079

7180
/**
7281
* Adding message ID into ACK group for cumulative ACK.
7382
* @param[in] msgId ID of the message to be ACKed.
83+
* @param[in] callback the callback that is triggered when the message is acknowledged
7484
*/
75-
virtual void addAcknowledgeCumulative(const MessageId& msgId) {}
85+
virtual void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) {}
7686

7787
/**
7888
* Flush all the pending grouped ACKs (as flush() does), and stop period ACKs sending.
@@ -91,27 +101,18 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
91101
virtual void flushAndClean() {}
92102

93103
protected:
94-
/**
95-
* Immediately send ACK request to broker.
96-
* @param[in] connWeakPtr weak pointer of the client connection.
97-
* @param[in] consumerId ID of the consumer that performs this ACK.
98-
* @param[in] msgId message ID to be ACKed.
99-
* @param[in] ackType ACK type, e.g. cumulative.
100-
* @return true if the ACK is sent successfully, otherwise false.
101-
*/
102-
bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId, const MessageId& msgId,
103-
CommandAck_AckType ackType);
104+
void doImmediateAck(ClientConnection& cnx, uint64_t requestId, const MessageId& msgId,
105+
ResultCallback callback, CommandAck_AckType ackType);
106+
void doImmediateAck(ClientConnection& cnx, uint64_t requestId,
107+
const std::map<MessageId, ResultCallback>& msgIdToCallback);
108+
std::pair<std::shared_ptr<ClientConnection>, uint64_t /* request id */> generateAckInfo() const;
109+
110+
private:
111+
const std::weak_ptr<ClientImpl> client_;
112+
const std::weak_ptr<HandlerBase> handler_;
113+
const uint64_t consumerId_;
114+
const bool waitResponse_;
104115

105-
/**
106-
* Immediately send a set of ACK requests one by one to the broker, it only supports individual
107-
* ACK.
108-
* @param[in] connWeakPtr weak pointer of the client connection.
109-
* @param[in] consumerId ID of the consumer that performs this ACK.
110-
* @param[in] msgIds message IDs to be ACKed.
111-
* @return true if the ACK is sent successfully, otherwise false.
112-
*/
113-
bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
114-
const std::set<MessageId>& msgIds);
115116
}; // class AckGroupingTracker
116117

117118
using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;

lib/AckGroupingTrackerDisabled.cc

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,72 @@
2020
#include "AckGroupingTrackerDisabled.h"
2121

2222
#include "HandlerBase.h"
23-
#include "LogUtils.h"
2423
#include "ProtoApiEnums.h"
2524

2625
namespace pulsar {
2726

28-
DECLARE_LOG_OBJECT();
29-
30-
AckGroupingTrackerDisabled::AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId)
31-
: AckGroupingTracker(), handler_(handler), consumerId_(consumerId) {
32-
LOG_INFO("ACK grouping is disabled.");
27+
void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId, ResultCallback callback) {
28+
auto pair = generateAckInfo();
29+
auto cnx = pair.first;
30+
if (!cnx) {
31+
if (callback) {
32+
callback(ResultNotConnected);
33+
}
34+
return;
35+
}
36+
auto requestId = pair.second;
37+
doImmediateAck(*cnx, requestId, msgId, callback, CommandAck_AckType_Individual);
3338
}
3439

35-
void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
36-
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual);
37-
}
40+
void AckGroupingTrackerDisabled::addAcknowledgeList(
41+
const std::vector<std::pair<MessageId, ResultCallback>>& msgIdAndCallbacks) {
42+
auto pair = generateAckInfo();
43+
auto cnx = pair.first;
44+
if (!cnx) {
45+
for (auto&& kv : msgIdAndCallbacks) {
46+
auto callback = kv.second;
47+
if (callback) {
48+
callback(ResultNotConnected);
49+
}
50+
}
51+
return;
52+
}
3853

39-
void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds) {
40-
std::set<MessageId> msgIdSet;
41-
for (auto&& msgId : msgIds) {
42-
msgIdSet.emplace(msgId);
54+
// TODO: reuse the same logic in AckGroupingTrackerEnabled
55+
auto requestId = pair.second;
56+
std::map<MessageId, ResultCallback> msgIdToCallback;
57+
for (auto&& kv : msgIdAndCallbacks) {
58+
auto&& msgId = kv.first;
59+
auto callback = kv.second;
60+
auto pair = msgIdToCallback.emplace(msgId, callback);
61+
62+
// `msgId` is already cached, combine `callback` with the existing callback
63+
auto previousCallback = pair.first->second;
64+
if (callback) {
65+
auto previousCallback = pair.first->second;
66+
pair.first->second = [previousCallback, callback](Result result) {
67+
if (previousCallback) {
68+
previousCallback(result);
69+
}
70+
callback(result);
71+
};
72+
}
4373
}
44-
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet);
74+
75+
doImmediateAck(*cnx, requestId, msgIdToCallback);
4576
}
4677

47-
void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
48-
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative);
78+
void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) {
79+
auto pair = generateAckInfo();
80+
auto cnx = pair.first;
81+
if (!cnx) {
82+
if (callback) {
83+
callback(ResultNotConnected);
84+
}
85+
return;
86+
}
87+
auto requestId = pair.second;
88+
doImmediateAck(*cnx, requestId, msgId, callback, CommandAck_AckType_Cumulative);
4989
}
5090

5191
} // namespace pulsar

lib/AckGroupingTrackerDisabled.h

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,10 @@
1919
#ifndef LIB_ACKGROUPINGTRACKERDISABLED_H_
2020
#define LIB_ACKGROUPINGTRACKERDISABLED_H_
2121

22-
#include <cstdint>
23-
2422
#include "AckGroupingTracker.h"
2523

2624
namespace pulsar {
2725

28-
class HandlerBase;
29-
3026
/**
3127
* @class AckGroupingTrackerDisabled
3228
* ACK grouping tracker that does not tracker or group ACK requests. The ACK requests are diretly
@@ -36,23 +32,11 @@ class AckGroupingTrackerDisabled : public AckGroupingTracker {
3632
public:
3733
virtual ~AckGroupingTrackerDisabled() = default;
3834

39-
/**
40-
* Constructing ACK grouping tracker for peresistent topics that disabled ACK grouping.
41-
* @param[in] handler the connection handler.
42-
* @param[in] consumerId consumer ID that this tracker belongs to.
43-
*/
44-
AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);
45-
46-
void addAcknowledge(const MessageId& msgId) override;
47-
void addAcknowledgeList(const MessageIdList& msgIds) override;
48-
void addAcknowledgeCumulative(const MessageId& msgId) override;
49-
50-
private:
51-
//! The connection handler.
52-
HandlerBase& handler_;
35+
using AckGroupingTracker::AckGroupingTracker;
5336

54-
//! ID of the consumer that this tracker belongs to.
55-
uint64_t consumerId_;
37+
void addAcknowledge(const MessageId&, ResultCallback) override;
38+
void addAcknowledgeList(const std::vector<std::pair<MessageId, ResultCallback>>&) override;
39+
void addAcknowledgeCumulative(const MessageId&, ResultCallback) override;
5640
}; // class AckGroupingTrackerDisabled
5741

5842
} // namespace pulsar

0 commit comments

Comments
 (0)