Skip to content
Merged
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
Expand All @@ -64,7 +65,7 @@ public class ConnectionPool implements AutoCloseable {

public static final int IDLE_DETECTION_INTERVAL_SECONDS_MIN = 60;

protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
protected final ConcurrentMap<Key, CompletableFuture<ClientCnx>> pool;

private final Bootstrap bootstrap;
private final PulsarChannelInitializer channelInitializerHandler;
Expand All @@ -87,6 +88,14 @@ public class ConnectionPool implements AutoCloseable {
/** Async release useless connections task. **/
private ScheduledFuture asyncReleaseUselessConnectionsTask;


@Value
private static class Key {
InetSocketAddress logicalAddress;
InetSocketAddress physicalAddress;
int randomKey;
}

public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
}
Expand Down Expand Up @@ -185,7 +194,7 @@ public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress addres
}

void closeAllConnections() {
pool.values().forEach(map -> map.values().forEach(future -> {
pool.values().forEach(future -> {
if (future.isDone()) {
if (!future.isCompletedExceptionally()) {
// Connection was already created successfully, the join will not throw any exception
Expand All @@ -198,10 +207,9 @@ void closeAllConnections() {
// succeed
future.thenAccept(ClientCnx::close);
}
}));
});
}

/**
/**
* Get a connection from the pool.
* <p>
* The connection can either be created or be coming from the pool itself.
Expand All @@ -222,59 +230,52 @@ public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddre
InetSocketAddress physicalAddress, final int randomKey) {
if (maxConnectionsPerHosts == 0) {
// Disable pooling
return createConnection(logicalAddress, physicalAddress, -1);
return createConnection(new Key(logicalAddress, physicalAddress, -1));
}

final ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
CompletableFuture<ClientCnx> completableFuture = innerPool
.computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
Key key = new Key(logicalAddress, physicalAddress, randomKey);
CompletableFuture<ClientCnx> completableFuture = pool.computeIfAbsent(key, k -> createConnection(key));
if (completableFuture.isCompletedExceptionally()) {
// we cannot cache a failed connection, so we remove it from the pool
// there is a race condition in which
// cleanupConnection is called before caching this result
// and so the clean up fails
cleanupConnection(logicalAddress, randomKey, completableFuture);
pool.remove(key, completableFuture);
return completableFuture;
}

return completableFuture.thenCompose(clientCnx -> {
// If connection already release, create a new one.
if (clientCnx.getIdleState().isReleased()) {
cleanupConnection(logicalAddress, randomKey, completableFuture);
return innerPool
.computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
pool.remove(key, completableFuture);
return pool.computeIfAbsent(key, k -> createConnection(key));
}
// Try use exists connection.
if (clientCnx.getIdleState().tryMarkUsingAndClearIdleTime()) {
return CompletableFuture.completedFuture(clientCnx);
} else {
// If connection already release, create a new one.
cleanupConnection(logicalAddress, randomKey, completableFuture);
return innerPool
.computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey));
pool.remove(key, completableFuture);
return pool.computeIfAbsent(key, k -> createConnection(key));
}
});
}

private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress, int connectionKey) {
private CompletableFuture<ClientCnx> createConnection(Key key) {
if (log.isDebugEnabled()) {
log.debug("Connection for {} not found in cache", logicalAddress);
log.debug("Connection for {} not found in cache", key.logicalAddress);
}

final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();

// Trigger async connect to broker
createConnection(logicalAddress, physicalAddress).thenAccept(channel -> {
createConnection(key.logicalAddress, key.physicalAddress).thenAccept(channel -> {
log.info("[{}] Connected to server", channel);

channel.closeFuture().addListener(v -> {
// Remove connection from pool when it gets closed
if (log.isDebugEnabled()) {
log.debug("Removing closed connection from pool: {}", v);
}
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
pool.remove(key, cnxFuture);
});

// We are connected to broker, but need to wait until the connect/connected handshake is
Expand All @@ -300,14 +301,14 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
// CompletableFuture is cached into the "pool" map,
// it is not enough to clean it here, we need to clean it
// in the "pool" map when the CompletableFuture is cached
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
pool.remove(key, cnxFuture);
cnx.ctx().close();
return null;
});
}).exceptionally(exception -> {
eventLoopGroup.execute(() -> {
log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getMessage());
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
log.warn("Failed to open connection to {} : {}", key.physicalAddress, exception.getMessage());
pool.remove(key, cnxFuture);
cnxFuture.completeExceptionally(new PulsarClientException(exception));
});
return null;
Expand Down Expand Up @@ -439,17 +440,9 @@ public void close() throws Exception {
}
}

private void cleanupConnection(InetSocketAddress address, int connectionKey,
CompletableFuture<ClientCnx> connectionFuture) {
ConcurrentMap<Integer, CompletableFuture<ClientCnx>> map = pool.get(address);
if (map != null) {
map.remove(connectionKey, connectionFuture);
}
}

@VisibleForTesting
int getPoolSize() {
return pool.values().stream().mapToInt(Map::size).sum();
return pool.size();
}

private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
Expand All @@ -459,11 +452,8 @@ public void doMarkAndReleaseUselessConnections(){
return;
}
List<Runnable> releaseIdleConnectionTaskList = new ArrayList<>();
for (Map.Entry<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> entry :
pool.entrySet()){
ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool = entry.getValue();
for (Map.Entry<Integer, CompletableFuture<ClientCnx>> entry0 : innerPool.entrySet()) {
CompletableFuture<ClientCnx> future = entry0.getValue();
for (Map.Entry<Key, CompletableFuture<ClientCnx>> entry : pool.entrySet()) {
CompletableFuture<ClientCnx> future = entry.getValue();
// Ensure connection has been connected.
if (!future.isDone()) {
continue;
Expand All @@ -481,18 +471,17 @@ public void doMarkAndReleaseUselessConnections(){
if (clientCnx.getIdleState().isReleasing()) {
releaseIdleConnectionTaskList.add(() -> {
if (clientCnx.getIdleState().tryMarkReleasedAndCloseConnection()) {
cleanupConnection(entry.getKey(), entry0.getKey(), future);
pool.remove(entry.getKey(), future);
}
});
}
}
}
// Do release idle connections.
releaseIdleConnectionTaskList.forEach(Runnable::run);
}

public Set<CompletableFuture<ClientCnx>> getConnections() {
return Collections.unmodifiableSet(
pool.values().stream().flatMap(n -> n.values().stream()).collect(Collectors.toSet()));
pool.values().stream().collect(Collectors.toSet()));
}
}