Skip to content

Commit f67c3df

Browse files
authored
HBASE-24999 Master manages ReplicationServers (#2579)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
1 parent 1a64f1a commit f67c3df

12 files changed

Lines changed: 620 additions & 103 deletions

File tree

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
syntax = "proto2";
19+
20+
package hbase.pb;
21+
22+
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
23+
option java_outer_classname = "ReplicationServerStatusProtos";
24+
option java_generic_services = true;
25+
option java_generate_equals_and_hash = true;
26+
option optimize_for = SPEED;
27+
28+
import "server/master/RegionServerStatus.proto";
29+
30+
service ReplicationServerStatusService {
31+
32+
rpc ReplicationServerReport(RegionServerReportRequest)
33+
returns(RegionServerReportResponse);
34+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,8 @@ public void run() {
359359
// manager of assignment nodes in zookeeper
360360
private AssignmentManager assignmentManager;
361361

362+
// server manager to deal with replication server info
363+
private ReplicationServerManager replicationServerManager;
362364

363365
/**
364366
* Cache for the meta region replica's locations. Also tracks their changes to avoid stale
@@ -963,6 +965,8 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
963965
.collect(Collectors.toList());
964966
this.assignmentManager.setupRIT(ritList);
965967

968+
this.replicationServerManager = new ReplicationServerManager(this);
969+
966970
// Start RegionServerTracker with listing of servers found with exiting SCPs -- these should
967971
// be registered in the deadServers set -- and with the list of servernames out on the
968972
// filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
@@ -1131,6 +1135,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
11311135
this.hbckChore = new HbckChore(this);
11321136
getChoreService().scheduleChore(hbckChore);
11331137
this.serverManager.startChore();
1138+
this.replicationServerManager.startChore();
11341139

11351140
// Only for rolling upgrade, where we need to migrate the data in namespace table to meta table.
11361141
if (!waitForNamespaceOnline()) {
@@ -1389,6 +1394,11 @@ public ServerManager getServerManager() {
13891394
return this.serverManager;
13901395
}
13911396

1397+
@Override
1398+
public ReplicationServerManager getReplicationServerManager() {
1399+
return this.replicationServerManager;
1400+
}
1401+
13921402
@Override
13931403
public MasterFileSystem getMasterFileSystem() {
13941404
return this.fileSystemManager;

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@
401401
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
402402
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
403403
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
404+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerStatusProtos.ReplicationServerStatusService;
404405
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
405406
import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService;
406407

@@ -412,7 +413,7 @@
412413
public class MasterRpcServices extends RSRpcServices implements
413414
MasterService.BlockingInterface, RegionServerStatusService.BlockingInterface,
414415
LockService.BlockingInterface, HbckService.BlockingInterface,
415-
ClientMetaService.BlockingInterface {
416+
ClientMetaService.BlockingInterface, ReplicationServerStatusService.BlockingInterface {
416417

417418
private static final Logger LOG = LoggerFactory.getLogger(MasterRpcServices.class.getName());
418419
private static final Logger AUDITLOG =
@@ -546,7 +547,7 @@ boolean synchronousBalanceSwitch(final boolean b) throws IOException {
546547
*/
547548
@Override
548549
protected List<BlockingServiceAndInterface> getServices() {
549-
List<BlockingServiceAndInterface> bssi = new ArrayList<>(5);
550+
List<BlockingServiceAndInterface> bssi = new ArrayList<>(6);
550551
bssi.add(new BlockingServiceAndInterface(
551552
MasterService.newReflectiveBlockingService(this),
552553
MasterService.BlockingInterface.class));
@@ -559,6 +560,9 @@ protected List<BlockingServiceAndInterface> getServices() {
559560
HbckService.BlockingInterface.class));
560561
bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
561562
ClientMetaService.BlockingInterface.class));
563+
bssi.add(new BlockingServiceAndInterface(
564+
ReplicationServerStatusService.newReflectiveBlockingService(this),
565+
ReplicationServerStatusService.BlockingInterface.class));
562566
bssi.addAll(super.getServices());
563567
return bssi;
564568
}
@@ -3402,4 +3406,33 @@ public ListReplicationSinkServersResponse listReplicationSinkServers(
34023406
}
34033407
return builder.build();
34043408
}
3409+
3410+
@Override
3411+
public RegionServerReportResponse replicationServerReport(RpcController controller,
3412+
RegionServerReportRequest request) throws ServiceException {
3413+
try {
3414+
master.checkServiceStarted();
3415+
int versionNumber = 0;
3416+
String version = "0.0.0";
3417+
VersionInfo versionInfo = VersionInfoUtil.getCurrentClientVersionInfo();
3418+
if (versionInfo != null) {
3419+
version = versionInfo.getVersion();
3420+
versionNumber = VersionInfoUtil.getVersionNumber(versionInfo);
3421+
}
3422+
ClusterStatusProtos.ServerLoad sl = request.getLoad();
3423+
ServerName serverName = ProtobufUtil.toServerName(request.getServer());
3424+
ServerMetrics oldMetrics = master.getReplicationServerManager().getServerMetrics(serverName);
3425+
ServerMetrics newMetrics =
3426+
ServerMetricsBuilder.toServerMetrics(serverName, versionNumber, version, sl);
3427+
master.getReplicationServerManager().serverReport(serverName, newMetrics);
3428+
if (sl != null && master.metricsMaster != null) {
3429+
// Up our metrics.
3430+
master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
3431+
- (oldMetrics != null ? oldMetrics.getRequestCount() : 0));
3432+
}
3433+
} catch (IOException ioe) {
3434+
throw new ServiceException(ioe);
3435+
}
3436+
return RegionServerReportResponse.newBuilder().build();
3437+
}
34053438
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ public interface MasterServices extends Server {
101101
*/
102102
ServerManager getServerManager();
103103

104+
/**
105+
* @return Master's {@link ReplicationServerManager} instance.
106+
*/
107+
ReplicationServerManager getReplicationServerManager();
108+
104109
/**
105110
* @return Master's instance of {@link ExecutorService}
106111
*/
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master;
19+
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.ConcurrentNavigableMap;
25+
import java.util.concurrent.ConcurrentSkipListMap;
26+
27+
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.hadoop.hbase.ScheduledChore;
29+
import org.apache.hadoop.hbase.ServerMetrics;
30+
import org.apache.hadoop.hbase.ServerName;
31+
import org.apache.yetus.audience.InterfaceAudience;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
/**
36+
* The ReplicationServerManager class manages info about replication servers.
37+
* <p>
38+
* Maintains lists of online and dead servers.
39+
* <p>
40+
* Servers are distinguished in two different ways. A given server has a
41+
* location, specified by hostname and port, and of which there can only be one
42+
* online at any given time. A server instance is specified by the location
43+
* (hostname and port) as well as the startcode (timestamp from when the server
44+
* was started). This is used to differentiate a restarted instance of a given
45+
* server from the original instance.
46+
*/
47+
@InterfaceAudience.Private
48+
public class ReplicationServerManager {
49+
50+
private static final Logger LOG = LoggerFactory.getLogger(ReplicationServerManager.class);
51+
52+
public static final String ONLINE_SERVER_REFRESH_INTERVAL =
53+
"hbase.master.replication.server.refresh.interval";
54+
public static final int ONLINE_SERVER_REFRESH_INTERVAL_DEFAULT = 60 * 1000; // 1 mins
55+
56+
private final MasterServices master;
57+
58+
/** Map of registered servers to their current load */
59+
private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
60+
new ConcurrentSkipListMap<>();
61+
62+
private OnlineServerRefresher onlineServerRefresher;
63+
private int refreshPeriod;
64+
65+
/**
66+
* Constructor.
67+
*/
68+
public ReplicationServerManager(final MasterServices master) {
69+
this.master = master;
70+
}
71+
72+
/**
73+
* start chore in ServerManager
74+
*/
75+
public void startChore() {
76+
Configuration conf = master.getConfiguration();
77+
refreshPeriod = conf.getInt(ONLINE_SERVER_REFRESH_INTERVAL,
78+
ONLINE_SERVER_REFRESH_INTERVAL_DEFAULT);
79+
onlineServerRefresher = new OnlineServerRefresher("ReplicationServerRefresher", refreshPeriod);
80+
master.getChoreService().scheduleChore(onlineServerRefresher);
81+
}
82+
83+
/**
84+
* Stop the ServerManager.
85+
*/
86+
public void stop() {
87+
if (onlineServerRefresher != null) {
88+
onlineServerRefresher.cancel();
89+
}
90+
}
91+
92+
public void serverReport(ServerName sn, ServerMetrics sl) {
93+
if (null == this.onlineServers.replace(sn, sl)) {
94+
if (!checkAndRecordNewServer(sn, sl)) {
95+
LOG.info("ReplicationServerReport ignored, could not record the server: {}", sn);
96+
}
97+
}
98+
}
99+
100+
/**
101+
* Check is a server of same host and port already exists,
102+
* if not, or the existed one got a smaller start code, record it.
103+
*
104+
* @param serverName the server to check and record
105+
* @param sl the server load on the server
106+
* @return true if the server is recorded, otherwise, false
107+
*/
108+
private boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
109+
ServerName existingServer = null;
110+
synchronized (this.onlineServers) {
111+
existingServer = findServerWithSameHostnamePort(serverName);
112+
if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
113+
LOG.info("ReplicationServer serverName={} rejected; we already have {} registered with "
114+
+ "same hostname and port", serverName, existingServer);
115+
return false;
116+
}
117+
recordNewServer(serverName, sl);
118+
// Note that we assume that same ts means same server, and don't expire in that case.
119+
if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
120+
LOG.info("Triggering server recovery; existingServer {} looks stale, new server: {}",
121+
existingServer, serverName);
122+
expireServer(existingServer);
123+
}
124+
}
125+
return true;
126+
}
127+
128+
/**
129+
* Assumes onlineServers is locked.
130+
* @return ServerName with matching hostname and port.
131+
*/
132+
private ServerName findServerWithSameHostnamePort(final ServerName serverName) {
133+
ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
134+
Long.MAX_VALUE);
135+
136+
ServerName r = onlineServers.lowerKey(end);
137+
if (r != null && ServerName.isSameAddress(r, serverName)) {
138+
return r;
139+
}
140+
return null;
141+
}
142+
143+
/**
144+
* Assumes onlineServers is locked.
145+
*/
146+
private void recordNewServer(final ServerName serverName, final ServerMetrics sl) {
147+
LOG.info("Registering ReplicationServer={}", serverName);
148+
this.onlineServers.put(serverName, sl);
149+
}
150+
151+
/**
152+
* Assumes onlineServers is locked.
153+
* Expire the passed server. Remove it from list of online servers
154+
*/
155+
public void expireServer(final ServerName serverName) {
156+
LOG.info("Expiring ReplicationServer={}", serverName);
157+
onlineServers.remove(serverName);
158+
}
159+
160+
/**
161+
* @return Read-only map of servers to serverinfo
162+
*/
163+
public Map<ServerName, ServerMetrics> getOnlineServers() {
164+
// Presumption is that iterating the returned Map is OK.
165+
synchronized (this.onlineServers) {
166+
return Collections.unmodifiableMap(this.onlineServers);
167+
}
168+
}
169+
170+
/**
171+
* @return A copy of the internal list of online servers.
172+
*/
173+
public List<ServerName> getOnlineServersList() {
174+
return new ArrayList<>(this.onlineServers.keySet());
175+
}
176+
177+
/**
178+
* @param serverName server name
179+
* @return ServerMetrics if serverName is known else null
180+
*/
181+
public ServerMetrics getServerMetrics(final ServerName serverName) {
182+
return this.onlineServers.get(serverName);
183+
}
184+
185+
private class OnlineServerRefresher extends ScheduledChore {
186+
187+
public OnlineServerRefresher(String name, int p) {
188+
super(name, master, p, 60 * 1000); // delay one minute before first execute
189+
}
190+
191+
@Override
192+
protected void chore() {
193+
synchronized (onlineServers) {
194+
List<ServerName> servers = getOnlineServersList();
195+
servers.forEach(s -> {
196+
ServerMetrics metrics = onlineServers.get(s);
197+
if (metrics.getReportTimestamp() + refreshPeriod < System.currentTimeMillis()) {
198+
expireServer(s);
199+
}
200+
});
201+
}
202+
}
203+
}
204+
}

0 commit comments

Comments
 (0)