4343import java .util .Optional ;
4444import java .util .Random ;
4545import java .util .Set ;
46- import java .util .StringJoiner ;
4746import java .util .concurrent .CompletableFuture ;
4847import java .util .concurrent .ConcurrentHashMap ;
4948import java .util .concurrent .ConcurrentMap ;
5049import java .util .concurrent .TimeUnit ;
5150import java .util .function .Supplier ;
5251import java .util .stream .Collectors ;
52+ import lombok .AllArgsConstructor ;
53+ import lombok .EqualsAndHashCode ;
5354import org .apache .commons .lang3 .StringUtils ;
5455import org .apache .pulsar .client .api .PulsarClientException ;
5556import org .apache .pulsar .client .api .PulsarClientException .InvalidServiceURL ;
@@ -65,7 +66,7 @@ public class ConnectionPool implements AutoCloseable {
6566
6667 public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 60 ;
6768
68- protected final ConcurrentMap <String , CompletableFuture <ClientCnx >> pool ;
69+ protected final ConcurrentMap <Key , CompletableFuture <ClientCnx >> pool ;
6970
7071 private final Bootstrap bootstrap ;
7172 private final PulsarChannelInitializer channelInitializerHandler ;
@@ -88,6 +89,15 @@ public class ConnectionPool implements AutoCloseable {
8889 /** Async release useless connections task. **/
8990 private ScheduledFuture asyncReleaseUselessConnectionsTask ;
9091
92+
93+ @ AllArgsConstructor
94+ @ EqualsAndHashCode
95+ private static class Key {
96+ private final InetSocketAddress logicalAddress ;
97+ private final InetSocketAddress physicalAddress ;
98+ private final int randomKey ;
99+ }
100+
91101 public ConnectionPool (ClientConfigurationData conf , EventLoopGroup eventLoopGroup ) throws PulsarClientException {
92102 this (conf , eventLoopGroup , () -> new ClientCnx (conf , eventLoopGroup ));
93103 }
@@ -202,13 +212,8 @@ void closeAllConnections() {
202212 });
203213 }
204214
205- private String getKey (InetSocketAddress logicalAddress ,
206- InetSocketAddress physicalAddress , final int randomKey ) {
207- StringJoiner sj = new StringJoiner ("#" );
208- sj .add (logicalAddress .toString ());
209- sj .add (physicalAddress .toString ());
210- sj .add (String .valueOf (randomKey ));
211- return sj .toString ();
215+ private Key getKey (InetSocketAddress logicalAddress , InetSocketAddress physicalAddress , final int randomKey ) {
216+ return new Key (logicalAddress , physicalAddress , randomKey );
212217 }
213218 /**
214219 * Get a connection from the pool.
@@ -233,7 +238,7 @@ public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddre
233238 // Disable pooling
234239 return createConnection (logicalAddress , physicalAddress , -1 );
235240 }
236- String key = getKey (logicalAddress , physicalAddress , randomKey );
241+ Key key = getKey (logicalAddress , physicalAddress , randomKey );
237242 CompletableFuture <ClientCnx > completableFuture = pool
238243 .computeIfAbsent (key , k -> createConnection (logicalAddress , physicalAddress , randomKey ));
239244 if (completableFuture .isCompletedExceptionally ()) {
@@ -449,7 +454,7 @@ public void close() throws Exception {
449454 private void cleanupConnection (InetSocketAddress logicalAddress ,
450455 InetSocketAddress physicalAddress , int connectionKey ,
451456 CompletableFuture <ClientCnx > connectionFuture ) {
452- String key = getKey (logicalAddress , physicalAddress , connectionKey );
457+ Key key = getKey (logicalAddress , physicalAddress , connectionKey );
453458 pool .remove (key , connectionFuture );
454459 }
455460
@@ -465,7 +470,7 @@ public void doMarkAndReleaseUselessConnections(){
465470 return ;
466471 }
467472 List <Runnable > releaseIdleConnectionTaskList = new ArrayList <>();
468- for (Map .Entry <String , CompletableFuture <ClientCnx >> entry :
473+ for (Map .Entry <Key , CompletableFuture <ClientCnx >> entry :
469474 pool .entrySet ()){
470475 CompletableFuture <ClientCnx > future = entry .getValue ();
471476 // Ensure connection has been connected.
0 commit comments