Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public enum CnToDnAsyncRequestType {
SUBMIT_TEST_CONNECTION_TASK,
SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
TEST_CONNECTION,
PUSH_TOPOLOGY,

// Region Maintenance
CREATE_DATA_REGION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternAndFilterReq;
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq;
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
Expand Down Expand Up @@ -407,6 +408,11 @@ protected void initActionMapBuilder() {
CnToDnAsyncRequestType.TEST_CONNECTION,
(req, client, handler) ->
client.testConnectionEmptyRPC((DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.PUSH_TOPOLOGY,
(req, client, handler) ->
client.updateClusterTopology(
(TUpdateClusterTopologyReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.INSERT_RECORD,
(req, client, handler) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
dataNodeLocationMap,
(Map<Integer, TExternalServiceListResp>) responseMap,
countDownLatch);
case PUSH_TOPOLOGY:
case SET_TTL:
case CREATE_DATA_REGION:
case CREATE_SCHEMA_REGION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ public class ConfigNodeConfig {
/** Acceptable pause duration for Phi accrual failure detector */
private long failureDetectorPhiAcceptablePauseInMs = 10000;

/** Base interval in ms for topology probing. */
private long topologyProbingBaseIntervalInMs = 5000;

/** Ratio of probing timeout to probing interval (must be less than 1.0). */
private double topologyProbingTimeoutRatio = 0.5;

/** The policy of cluster RegionGroups' leader distribution. */
private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY;

Expand Down Expand Up @@ -1288,4 +1294,20 @@ public long getFailureDetectorPhiAcceptablePauseInMs() {
public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) {
this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs;
}

public long getTopologyProbingBaseIntervalInMs() {
return topologyProbingBaseIntervalInMs;
}

public void setTopologyProbingBaseIntervalInMs(long topologyProbingBaseIntervalInMs) {
this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs;
}

public double getTopologyProbingTimeoutRatio() {
return topologyProbingTimeoutRatio;
}

public void setTopologyProbingTimeoutRatio(double topologyProbingTimeoutRatio) {
this.topologyProbingTimeoutRatio = topologyProbingTimeoutRatio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,18 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio
"failure_detector_phi_acceptable_pause_in_ms",
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));

conf.setTopologyProbingBaseIntervalInMs(
Long.parseLong(
properties.getProperty(
"topology_probing_base_interval_in_ms",
String.valueOf(conf.getTopologyProbingBaseIntervalInMs()))));

conf.setTopologyProbingTimeoutRatio(
Double.parseDouble(
properties.getProperty(
"topology_probing_timeout_ratio",
String.valueOf(conf.getTopologyProbingTimeoutRatio()))));

String leaderDistributionPolicy =
properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy());
if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ public void updateTopology(Map<Integer, Set<Integer>> latestTopology) {
for (int fromId : latestTopology.keySet()) {
for (int toId : latestTopology.keySet()) {
boolean originReachable =
latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId);
topologyGraph.getOrDefault(fromId, Collections.emptySet()).contains(toId);
boolean newReachable =
latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId);
if (originReachable != newReachable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -168,13 +167,6 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() {
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
}

final Map<Integer, Set<Integer>> topologyMap =
configManager.getLoadManager().getLoadCache().getTopology();
if (topologyMap != null) {
heartbeatReq.setTopology(topologyMap);
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
}

// We broadcast region operations list every 100 heartbeat loops
if (heartbeatCounter.get() % 100 == 0) {
heartbeatReq.setCurrentRegionOperations(
Expand Down
Loading
Loading