Skip to content

Commit 3ddf1ea

Browse files
ArtDuakudiyar
authored andcommitted
Fix discovery losing connection problem
Losing address connections in discovery if restart happens faster than discovery task Closes #404
1 parent 54beef6 commit 3ddf1ea

5 files changed

Lines changed: 40 additions & 50 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## [Unreleased]
44

5+
### Bugfixes
6+
- Fix losing address connections in discovery if restart happens faster than discovery task ([#404](https://github.com/tarantool/cartridge-springdata/issues/404))
7+
58
## [0.12.0] - 2023-06-09
69

710
### Internal and API changes

src/main/java/io/tarantool/driver/cluster/AbstractDiscoveryClusterAddressProvider.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,13 @@ public AbstractDiscoveryClusterAddressProvider(TarantoolClusterDiscoveryConfig d
3636
protected void startDiscoveryTask() throws TarantoolClientException {
3737
Runnable discoveryTask = () -> {
3838
try {
39-
Collection<TarantoolServerAddress> currentAddresses = this.addressesHolder.get();
40-
Collection<TarantoolServerAddress> addresses = discoverAddresses();
41-
setAddresses(addresses);
42-
43-
if (currentAddresses != null
44-
&& addresses.size() != currentAddresses.size()
45-
|| !addresses.equals(currentAddresses)) {
46-
this.refreshCallback.get().run();
47-
}
39+
setAddresses(discoverAddresses());
4840
} finally {
4941
if (initLatch.getCount() > 0) {
5042
initLatch.countDown();
5143
}
5244
}
45+
this.refreshCallback.get().run();
5346
};
5447

5548
this.scheduledExecutorService.scheduleWithFixedDelay(

src/main/java/io/tarantool/driver/core/connection/AbstractTarantoolConnectionManager.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Collection;
1818
import java.util.Collections;
1919
import java.util.HashMap;
20+
import java.util.HashSet;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Objects;
@@ -95,6 +96,25 @@ public boolean refresh() {
9596
return connectionMode.compareAndSet(ConnectionMode.OFF, ConnectionMode.PARTIAL);
9697
}
9798

99+
protected boolean areAddressesChanged() {
100+
Collection<TarantoolServerAddress> addresses = getAddresses();
101+
if (addresses == null) {
102+
logger.debug("The list of server addresses is not defined");
103+
return true;
104+
}
105+
return !connectionRegistry.keySet().equals(new HashSet<>(addresses));
106+
}
107+
108+
protected boolean areConnectionsAlive() {
109+
for (List<TarantoolConnection> connections : connectionRegistry.values()) {
110+
int isAliveConnections = (int) connections.stream().filter(TarantoolConnection::isConnected).count();
111+
if (isAliveConnections != config.getConnections()) {
112+
return false;
113+
}
114+
}
115+
return true;
116+
}
117+
98118
private CompletableFuture<TarantoolConnection> getConnectionInternal() {
99119
CompletableFuture<TarantoolConnection> result;
100120
ConnectionMode currentMode = connectionMode.get();

src/main/java/io/tarantool/driver/core/connection/TarantoolClusterConnectionManager.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,13 @@ public TarantoolClusterConnectionManager(
3030
TarantoolClusterAddressProvider addressProvider) {
3131
super(config, connectionFactory, listeners);
3232
this.addressProvider = addressProvider;
33-
this.addressProvider.setRefreshCallback(super::refresh);
33+
this.addressProvider.setRefreshCallback(this::refreshIfConditionsChanged);
34+
}
35+
36+
private void refreshIfConditionsChanged() {
37+
if (areAddressesChanged() || !areConnectionsAlive()) {
38+
super.refresh();
39+
}
3440
}
3541

3642
@Override

src/test/java/io/tarantool/driver/integration/ProxyTarantoolClientIT.java

Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.tarantool.driver.integration;
22

3-
import java.lang.reflect.InvocationTargetException;
4-
import java.lang.reflect.Method;
53
import java.util.ArrayList;
64
import java.util.Arrays;
75
import java.util.Collections;
@@ -12,12 +10,10 @@
1210
import java.util.concurrent.ExecutionException;
1311
import java.util.concurrent.ExecutorService;
1412
import java.util.concurrent.Executors;
15-
import java.util.concurrent.atomic.AtomicInteger;
1613

1714
import org.junit.jupiter.api.BeforeAll;
1815
import org.junit.jupiter.api.BeforeEach;
1916
import org.junit.jupiter.api.Test;
20-
import org.testcontainers.shaded.org.apache.commons.lang3.reflect.FieldUtils;
2117
import static org.junit.jupiter.api.Assertions.assertEquals;
2218
import static org.junit.jupiter.api.Assertions.assertFalse;
2319
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -32,6 +28,7 @@
3228
import io.tarantool.driver.api.TarantoolResult;
3329
import io.tarantool.driver.api.TarantoolServerAddress;
3430
import io.tarantool.driver.api.conditions.Conditions;
31+
import io.tarantool.driver.api.connection.TarantoolConnection;
3532
import io.tarantool.driver.api.metadata.TarantoolIndexMetadata;
3633
import io.tarantool.driver.api.metadata.TarantoolIndexType;
3734
import io.tarantool.driver.api.metadata.TarantoolMetadataOperations;
@@ -52,7 +49,6 @@
5249
import io.tarantool.driver.core.ClusterTarantoolTupleClient;
5350
import io.tarantool.driver.core.ProxyTarantoolTupleClient;
5451
import io.tarantool.driver.core.RetryingTarantoolTupleClient;
55-
import io.tarantool.driver.core.connection.TarantoolClusterConnectionManager;
5652
import io.tarantool.driver.core.tuple.TarantoolTupleImpl;
5753
import io.tarantool.driver.exceptions.TarantoolInternalException;
5854
import io.tarantool.driver.exceptions.TarantoolNoSuchProcedureException;
@@ -84,9 +80,7 @@ public class ProxyTarantoolClientIT extends SharedCartridgeContainer {
8480
private static final String PK_FIELD_NAME = "profile_id";
8581
private static final String BUCKET_ID_FIELD_NAME = "bucket_id";
8682
private static final int DEFAULT_TIMEOUT = 5 * 1000;
87-
private static TestWrappedClusterAddressProvider addressProvider;
88-
private static Method getAliveConnections;
89-
private static TarantoolClusterConnectionManager connectionManager;
83+
private static List<TarantoolConnection> connections = new ArrayList<>();
9084

9185
@BeforeAll
9286
public static void setUp() throws Exception {
@@ -152,6 +146,10 @@ public static void initClient() {
152146
.builder(10, thr -> thr instanceof TarantoolNoSuchProcedureException)
153147
.withDelay(100)
154148
.build());
149+
client.getConnectionListeners().add(conn -> {
150+
connections.add(conn);
151+
return CompletableFuture.completedFuture(conn);
152+
});
155153
}
156154

157155
@Test
@@ -706,37 +704,7 @@ public void test_should_reconnect_ifReconnectIsInvoked() throws Exception {
706704
});
707705
}
708706

709-
private static int getAliveConnections() throws IllegalAccessException, NoSuchMethodException {
710-
if (addressProvider == null) {
711-
ClusterTarantoolTupleClient
712-
innerClient = (ClusterTarantoolTupleClient) FieldUtils.readField(
713-
FieldUtils.readField(client, "client", true),
714-
"client", true);
715-
connectionManager =
716-
(TarantoolClusterConnectionManager) FieldUtils.readField(innerClient, "connectionManager", true);
717-
718-
getAliveConnections =
719-
connectionManager.getClass().getSuperclass()
720-
.getDeclaredMethod("getAliveConnections", TarantoolServerAddress.class);
721-
getAliveConnections.setAccessible(true);
722-
723-
addressProvider =
724-
(TestWrappedClusterAddressProvider) FieldUtils.readField(innerClient, "addressProvider", true);
725-
}
726-
727-
return getAliveConnectionsAux();
728-
}
729-
730-
private static int getAliveConnectionsAux() {
731-
AtomicInteger res = new AtomicInteger();
732-
addressProvider.getAddresses().forEach(r -> {
733-
try {
734-
res.addAndGet(((List<?>) getAliveConnections.invoke(connectionManager, r)).size());
735-
} catch (IllegalAccessException | InvocationTargetException e) {
736-
throw new RuntimeException(e);
737-
}
738-
});
739-
740-
return res.get();
707+
private static long getAliveConnections() throws IllegalAccessException, NoSuchMethodException {
708+
return connections.stream().filter(TarantoolConnection::isConnected).count();
741709
}
742710
}

0 commit comments

Comments
 (0)