Skip to content

Commit a1746a9

Browse files
committed
implemented
1 parent 7862234 commit a1746a9

File tree

4 files changed

+116
-16
lines changed

4 files changed

+116
-16
lines changed

lib/ClientImpl.cc

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,11 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
144144
}
145145

146146
LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) {
147+
Lock lock(mutex_);
147148
if (redirectedClusterURI.empty()) {
148149
return lookupServicePtr_;
149150
}
150151

151-
Lock lock(mutex_);
152152
auto it = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
153153
if (it == redirectedClusterLookupServicePtrs_.end()) {
154154
auto lookup = createLookup(redirectedClusterURI);
@@ -180,20 +180,21 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon
180180

181181
if (autoDownloadSchema) {
182182
auto self = shared_from_this();
183-
lookupServicePtr_->getSchema(topicName).addListener(
183+
auto lookup = getLookup();
184+
lookup->getSchema(topicName).addListener(
184185
[self, topicName, callback](Result res, const SchemaInfo& topicSchema) {
185186
if (res != ResultOk) {
186187
callback(res, Producer());
187188
return;
188189
}
189190
ProducerConfiguration conf;
190191
conf.setSchema(topicSchema);
191-
self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
192+
self->getLookup()->getPartitionMetadataAsync(topicName).addListener(
192193
std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1,
193194
std::placeholders::_2, topicName, conf, callback));
194195
});
195196
} else {
196-
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
197+
getLookup()->getPartitionMetadataAsync(topicName).addListener(
197198
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
198199
std::placeholders::_2, topicName, conf, callback));
199200
}
@@ -266,7 +267,7 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st
266267
}
267268

268269
MessageId msgId(startMessageId);
269-
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
270+
getLookup()->getPartitionMetadataAsync(topicName).addListener(
270271
std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1,
271272
std::placeholders::_2, topicName, msgId, conf, callback));
272273
}
@@ -379,7 +380,8 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const
379380
return;
380381
}
381382

382-
lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
383+
getLookup()
384+
->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
383385
.addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(),
384386
std::placeholders::_1, std::placeholders::_2, regexPattern, mode,
385387
subscriptionName, conf, callback));
@@ -403,7 +405,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace
403405

404406
consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), regexPattern, mode,
405407
*matchTopics, subscriptionName, conf,
406-
lookupServicePtr_, interceptors);
408+
getLookup(), interceptors);
407409

408410
consumer->getConsumerCreatedFuture().addListener(
409411
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
@@ -450,7 +452,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& originalTopics,
450452
auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
451453

452454
ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
453-
shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_, interceptors);
455+
shared_from_this(), topics, subscriptionName, topicNamePtr, conf, getLookup(), interceptors);
454456

455457
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
456458
shared_from_this(), std::placeholders::_1,
@@ -480,7 +482,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& sub
480482
}
481483
}
482484

483-
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
485+
getLookup()->getPartitionMetadataAsync(topicName).addListener(
484486
std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1,
485487
std::placeholders::_2, topicName, subscriptionName, conf, callback));
486488
}
@@ -505,7 +507,7 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti
505507
}
506508
consumer = std::make_shared<MultiTopicsConsumerImpl>(
507509
shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf,
508-
lookupServicePtr_, interceptors);
510+
getLookup(), interceptors);
509511
} else {
510512
auto consumerImpl = std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
511513
subscriptionName, conf,
@@ -658,7 +660,7 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, const GetP
658660
return;
659661
}
660662
}
661-
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
663+
getLookup()->getPartitionMetadataAsync(topicName).addListener(
662664
std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1,
663665
std::placeholders::_2, topicName, callback));
664666
}
@@ -674,7 +676,7 @@ void ClientImpl::closeAsync(const CloseCallback& callback) {
674676
state_ = Closing;
675677

676678
memoryLimitController_.close();
677-
lookupServicePtr_->close();
679+
getLookup()->close();
678680
for (const auto& it : redirectedClusterLookupServicePtrs_) {
679681
it.second->close();
680682
}
@@ -776,7 +778,7 @@ void ClientImpl::shutdown() {
776778
<< " consumers have been shutdown.");
777779
}
778780

779-
lookupServicePtr_->close();
781+
getLookup()->close();
780782
if (!pool_.close()) {
781783
// pool_ has already been closed. It means shutdown() has been called before.
782784
return;
@@ -857,9 +859,39 @@ std::chrono::nanoseconds ClientImpl::getOperationTimeout(const ClientConfigurati
857859
void ClientImpl::updateConnectionInfo(const std::string& serviceUrl,
858860
const std::optional<const AuthenticationPtr>& authentication,
859861
const std::optional<std::string>& tlsTrustCertsFilePath) {
860-
// TODO:
861-
// 1. Reset the `lookupServicePtr_` with the new serviceUrl and auth parameters, and close the old one.
862-
// 2. Close all connections in `pool_`
862+
LookupServicePtr oldLookupServicePtr;
863+
std::unordered_map<std::string, LookupServicePtr> oldRedirectedLookupServicePtrs;
864+
865+
{
866+
Lock lock(mutex_);
867+
if (state_ != Open) {
868+
LOG_ERROR("Client is not open, cannot update connection info");
869+
return;
870+
}
871+
872+
if (authentication.has_value()) {
873+
clientConfiguration_.setAuth(*authentication);
874+
}
875+
if (tlsTrustCertsFilePath.has_value()) {
876+
clientConfiguration_.setTlsTrustCertsFilePath(*tlsTrustCertsFilePath);
877+
}
878+
clientConfiguration_.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)));
879+
880+
oldLookupServicePtr = std::move(lookupServicePtr_);
881+
oldRedirectedLookupServicePtrs = std::move(redirectedClusterLookupServicePtrs_);
882+
883+
lookupServicePtr_ = createLookup(serviceUrl);
884+
redirectedClusterLookupServicePtrs_.clear();
885+
}
886+
887+
if (oldLookupServicePtr) {
888+
oldLookupServicePtr->close();
889+
}
890+
for (const auto& it : oldRedirectedLookupServicePtrs) {
891+
it.second->close();
892+
}
893+
894+
pool_.resetConnections(clientConfiguration_.getAuthPtr(), clientConfiguration_);
863895
}
864896

865897
} /* namespace pulsar */

lib/ConnectionPool.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,21 @@ bool ConnectionPool::close() {
6767
return true;
6868
}
6969

70+
void ConnectionPool::resetConnections(const AuthenticationPtr& authentication,
71+
const ClientConfiguration& conf) {
72+
std::unique_lock<std::recursive_mutex> lock(mutex_);
73+
authentication_ = authentication;
74+
clientConfiguration_ = conf;
75+
76+
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
77+
auto& cnx = cnxIt->second;
78+
if (cnx) {
79+
cnx->close(ResultDisconnected, false);
80+
}
81+
}
82+
pool_.clear();
83+
}
84+
7085
static const std::string getKey(const std::string& logicalAddress, const std::string& physicalAddress,
7186
size_t keySuffix) {
7287
std::stringstream ss;

lib/ConnectionPool.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ class PULSAR_PUBLIC ConnectionPool {
5151
*/
5252
bool close();
5353

54+
/**
55+
* Close all existing connections and update the authentication and configuration.
56+
* Unlike close(), the pool remains open for new connections.
57+
*/
58+
void resetConnections(const AuthenticationPtr& authentication, const ClientConfiguration& conf);
59+
5460
void remove(const std::string& logicalAddress, const std::string& physicalAddress, size_t keySuffix,
5561
ClientConnection* value);
5662

tests/ClientTest.cc

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,3 +506,50 @@ TEST(ClientTest, testNoRetry) {
506506
ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms";
507507
}
508508
}
509+
510+
TEST(ClientTest, testUpdateConnectionInfo) {
511+
const std::string cluster1Url = "pulsar://localhost:6650";
512+
const std::string cluster2Url = "pulsar://localhost:6652";
513+
const std::string topic1 = "testUpdateConnectionInfo-cluster1-" + std::to_string(time(nullptr));
514+
const std::string topic2 = "testUpdateConnectionInfo-cluster2-" + std::to_string(time(nullptr));
515+
516+
Client client(cluster1Url);
517+
518+
// Produce and consume on cluster 1
519+
Producer producer1;
520+
ASSERT_EQ(ResultOk, client.createProducer(topic1, producer1));
521+
MessageId msgId;
522+
ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("msg-on-cluster1").build(), msgId));
523+
producer1.close();
524+
525+
// Verify there are connections in the pool
526+
auto connections = PulsarFriend::getConnections(client);
527+
ASSERT_FALSE(connections.empty());
528+
529+
// Switch to cluster 2
530+
client.updateConnectionInfo(cluster2Url, std::nullopt, std::nullopt);
531+
532+
// Previous connections should have been closed
533+
for (const auto &cnx : connections) {
534+
ASSERT_TRUE(cnx->isClosed());
535+
}
536+
537+
// Produce and consume on cluster 2 using the same client
538+
Producer producer2;
539+
ASSERT_EQ(ResultOk, client.createProducer(topic2, producer2));
540+
ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("msg-on-cluster2").build(), msgId));
541+
542+
Consumer consumer2;
543+
ASSERT_EQ(ResultOk, client.subscribe(topic2, "sub", consumer2));
544+
Message msg;
545+
ASSERT_EQ(ResultOk, consumer2.receive(msg, 5000));
546+
ASSERT_EQ("msg-on-cluster2", msg.getDataAsString());
547+
548+
// Verify connection pool now has connections to cluster 2
549+
auto newConnections = PulsarFriend::getConnections(client);
550+
ASSERT_FALSE(newConnections.empty());
551+
552+
consumer2.close();
553+
producer2.close();
554+
client.close();
555+
}

0 commit comments

Comments
 (0)