Skip to content

Commit e66b7ee

Browse files
junmuzymuzammil
authored andcommitted
Ensuring JDBC connection is not stale before using it
1 parent e3314ed commit e66b7ee

3 files changed

Lines changed: 192 additions & 0 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,19 @@ public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
6565
continue;
6666
}
6767
try {
68+
client = reconnect(client);
6869
return action.run(client);
6970
} finally {
7071
clients.addFirst(client);
7172
}
7273
}
7374
}
7475

76+
/** Validate and potentially replace a client before use. */
77+
protected C reconnect(C client) {
78+
return client;
79+
}
80+
7581
@Override
7682
public void execute(ExecuteAction<C, E> action) throws E, InterruptedException {
7783
run(

paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
import org.apache.paimon.client.ClientPool;
2222

23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
2326
import java.sql.Connection;
2427
import java.sql.DriverManager;
2528
import java.sql.SQLException;
@@ -32,12 +35,16 @@
3235
/** Client pool for jdbc. */
3336
public class JdbcClientPool extends ClientPool.ClientPoolImpl<Connection, SQLException> {
3437

38+
private static final Logger LOG = LoggerFactory.getLogger(JdbcClientPool.class);
3539
private static final Pattern PROTOCOL_PATTERN = Pattern.compile("jdbc:([^:]+):(.*)");
40+
private static final int CONNECTION_VALIDATION_TIMEOUT_SECONDS = 5;
3641

3742
private final String protocol;
43+
private final Supplier<Connection> connectionSupplier;
3844

3945
public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
4046
super(poolSize, clientSupplier(dbUrl, props));
47+
this.connectionSupplier = clientSupplier(dbUrl, props);
4148
Matcher matcher = PROTOCOL_PATTERN.matcher(dbUrl);
4249
if (matcher.matches()) {
4350
this.protocol = matcher.group(1);
@@ -46,6 +53,23 @@ public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
4653
}
4754
}
4855

56+
@Override
57+
protected Connection reconnect(Connection connection) {
58+
try {
59+
if (connection.isClosed()
60+
|| !connection.isValid(CONNECTION_VALIDATION_TIMEOUT_SECONDS)) {
61+
LOG.warn("Stale JDBC connection detected, creating a new connection.");
62+
closeQuietly(connection);
63+
return connectionSupplier.get();
64+
}
65+
} catch (SQLException e) {
66+
LOG.warn("Failed to validate JDBC connection, creating a new connection.", e);
67+
closeQuietly(connection);
68+
return connectionSupplier.get();
69+
}
70+
return connection;
71+
}
72+
4973
private static Supplier<Connection> clientSupplier(String dbUrl, Map<String, String> props) {
5074
return () -> {
5175
try {
@@ -70,4 +94,14 @@ protected void close(Connection client) {
7094
throw new RuntimeException("Failed to close connection", e);
7195
}
7296
}
97+
98+
private static void closeQuietly(Connection connection) {
99+
try {
100+
if (connection != null && !connection.isClosed()) {
101+
connection.close();
102+
}
103+
} catch (SQLException e) {
104+
LOG.debug("Failed to close stale connection", e);
105+
}
106+
}
73107
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.jdbc;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import java.sql.Connection;
24+
import java.sql.SQLException;
25+
import java.util.Collections;
26+
import java.util.UUID;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
/** Tests for {@link JdbcClientPool} connection validation. */
32+
public class JdbcClientPoolTest {
33+
34+
private JdbcClientPool createPool(int poolSize) {
35+
String dbUrl =
36+
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "");
37+
return new JdbcClientPool(poolSize, dbUrl, Collections.emptyMap());
38+
}
39+
40+
@Test
41+
public void testValidConnectionIsReused() throws SQLException, InterruptedException {
42+
JdbcClientPool pool = createPool(1);
43+
try {
44+
AtomicReference<Connection> firstConn = new AtomicReference<>();
45+
AtomicReference<Connection> secondConn = new AtomicReference<>();
46+
47+
pool.run(
48+
connection -> {
49+
firstConn.set(connection);
50+
return null;
51+
});
52+
53+
pool.run(
54+
connection -> {
55+
secondConn.set(connection);
56+
return null;
57+
});
58+
59+
assertThat(secondConn.get()).isSameAs(firstConn.get());
60+
} finally {
61+
pool.close();
62+
}
63+
}
64+
65+
@Test
66+
public void testClosedConnectionIsReplaced() throws SQLException, InterruptedException {
67+
JdbcClientPool pool = createPool(1);
68+
try {
69+
AtomicReference<Connection> firstConn = new AtomicReference<>();
70+
AtomicReference<Connection> secondConn = new AtomicReference<>();
71+
72+
// Get the connection and close it to simulate a stale connection
73+
pool.run(
74+
connection -> {
75+
firstConn.set(connection);
76+
connection.close();
77+
return null;
78+
});
79+
80+
// The pool should detect the closed connection and create a new one
81+
pool.run(
82+
connection -> {
83+
secondConn.set(connection);
84+
return null;
85+
});
86+
87+
assertThat(secondConn.get()).isNotSameAs(firstConn.get());
88+
assertThat(secondConn.get().isClosed()).isFalse();
89+
} finally {
90+
pool.close();
91+
}
92+
}
93+
94+
@Test
95+
public void testReplacedConnectionIsReturnedToPool() throws SQLException, InterruptedException {
96+
JdbcClientPool pool = createPool(1);
97+
try {
98+
AtomicReference<Connection> replacedConn = new AtomicReference<>();
99+
AtomicReference<Connection> thirdConn = new AtomicReference<>();
100+
101+
// Close the connection to trigger replacement
102+
pool.run(
103+
connection -> {
104+
connection.close();
105+
return null;
106+
});
107+
108+
// This call gets the replacement connection
109+
pool.run(
110+
connection -> {
111+
replacedConn.set(connection);
112+
return null;
113+
});
114+
115+
// The replacement should be reused since it's valid
116+
pool.run(
117+
connection -> {
118+
thirdConn.set(connection);
119+
return null;
120+
});
121+
122+
assertThat(thirdConn.get()).isSameAs(replacedConn.get());
123+
} finally {
124+
pool.close();
125+
}
126+
}
127+
128+
@Test
129+
public void testActionIsExecutedOnValidConnection() throws SQLException, InterruptedException {
130+
JdbcClientPool pool = createPool(1);
131+
try {
132+
// Close the connection to simulate staleness
133+
pool.run(
134+
connection -> {
135+
connection.close();
136+
return null;
137+
});
138+
139+
// The action should receive a valid connection and succeed
140+
boolean result =
141+
pool.run(
142+
connection -> {
143+
// Execute a real SQL statement to verify the connection works
144+
return connection.prepareStatement("SELECT 1").execute();
145+
});
146+
147+
assertThat(result).isTrue();
148+
} finally {
149+
pool.close();
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)