Skip to content

Commit 4718d24

Browse files
infraiosunxin
authored andcommitted
HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)
Signed-off-by: meiyi <myimeiyi@gmail.com>
1 parent d2588a1 commit 4718d24

13 files changed

Lines changed: 125 additions & 100 deletions

File tree

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,8 @@ public enum OperationStatusCode {
994994
/*
995995
* cluster replication constants.
996996
*/
997+
public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled";
998+
public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;
997999
public static final String
9981000
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
9991001
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,8 @@
258258
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
259259
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
260260
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
261+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos;
262+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
261263
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
262264
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
263265
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -271,7 +273,7 @@
271273
@SuppressWarnings("deprecation")
272274
public class RSRpcServices implements HBaseRPCErrorHandler,
273275
AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
274-
ConfigurationObserver {
276+
ConfigurationObserver, ReplicationServerService.BlockingInterface {
275277
private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
276278

277279
/** RPC scheduler to use for the region server. */

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java renamed to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
/*
2-
*
1+
/**
32
* Licensed to the Apache Software Foundation (ASF) under one
43
* or more contributor license agreements. See the NOTICE file
54
* distributed with this work for additional information
@@ -18,21 +17,32 @@
1817
*/
1918
package org.apache.hadoop.hbase.replication;
2019

21-
import org.apache.hadoop.hbase.ServerName;
20+
import java.util.concurrent.atomic.AtomicLong;
21+
22+
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
23+
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
2224
import org.apache.yetus.audience.InterfaceAudience;
2325

2426
/**
25-
* The replication listener interface can be implemented if a class needs to subscribe to events
26-
* generated by the ReplicationTracker. These events include things like addition/deletion of peer
27-
* clusters or failure of a local region server. To receive events, the class also needs to register
28-
* itself with a Replication Tracker.
27+
* Used to control all replication sources inside one RegionServer or ReplicationServer.
28+
* Used by {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSource} or
29+
* {@link RecoveredReplicationSource}.
2930
*/
3031
@InterfaceAudience.Private
31-
public interface ReplicationListener {
32+
public interface ReplicationSourceController {
33+
34+
/**
35+
* Returns the maximum size in bytes of edits held in memory which are pending replication
36+
* across all sources inside this RegionServer or ReplicationServer.
37+
*/
38+
long getTotalBufferLimit();
39+
40+
AtomicLong getTotalBufferUsed();
41+
42+
MetricsReplicationGlobalSourceSource getGlobalMetrics();
3243

3344
/**
34-
* A region server has been removed from the local cluster
35-
* @param regionServer the removed region server
45+
* Call this when the recovered replication source replicated all WALs.
3646
*/
37-
public void regionServerRemoved(ServerName regionServer);
47+
void finishRecoveredSource(RecoveredReplicationSource src);
3848
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hadoop.hbase.ServerName;
3131
import org.apache.hadoop.hbase.replication.ReplicationPeer;
3232
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
33+
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
3334
import org.apache.hadoop.hbase.util.CommonFSUtils;
3435
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
3536
import org.apache.yetus.audience.InterfaceAudience;
@@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource {
4546

4647
private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
4748

48-
private Path walDir;
49-
5049
private String actualPeerId;
5150

5251
@Override
53-
public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
54-
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
55-
String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
56-
MetricsSource metrics) throws IOException {
57-
super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
58-
clusterId, walFileLengthProvider, metrics);
59-
this.walDir = walDir;
52+
public void init(Configuration conf, FileSystem fs, Path walDir,
53+
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
54+
ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId,
55+
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
56+
super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
57+
peerClusterZnode, clusterId, walFileLengthProvider, metrics);
6058
this.actualPeerId = this.replicationQueueInfo.getPeerId();
6159
}
6260

@@ -149,7 +147,7 @@ private Path getReplSyncUpPath(Path path) throws IOException {
149147
void tryFinish() {
150148
if (workerThreads.isEmpty()) {
151149
this.getSourceMetrics().clear();
152-
manager.finishRecoveredSource(this);
150+
controller.finishRecoveredSource(this);
153151
}
154152
}
155153

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.hadoop.hbase.replication.ReplicationPeer;
5959
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
6060
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
61+
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
6162
import org.apache.hadoop.hbase.replication.ReplicationUtils;
6263
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
6364
import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -96,8 +97,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
9697
protected Configuration conf;
9798
protected ReplicationQueueInfo replicationQueueInfo;
9899

99-
// The manager of all sources to which we ping back our progress
100-
ReplicationSourceManager manager;
100+
protected Path walDir;
101+
102+
protected ReplicationSourceController controller;
101103
// Should we stop everything?
102104
protected Server server;
103105
// How long should we sleep for each retry
@@ -181,23 +183,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
181183
this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
182184
}
183185

184-
/**
185-
* Instantiation method used by region servers
186-
* @param conf configuration to use
187-
* @param fs file system to use
188-
* @param manager replication manager to ping to
189-
* @param server the server for this region server
190-
* @param queueId the id of our replication queue
191-
* @param clusterId unique UUID for the cluster
192-
* @param metrics metrics for replication source
193-
*/
194186
@Override
195-
public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
196-
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
197-
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
198-
MetricsSource metrics) throws IOException {
187+
public void init(Configuration conf, FileSystem fs, Path walDir,
188+
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
189+
ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
190+
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
199191
this.server = server;
200192
this.conf = HBaseConfiguration.create(conf);
193+
this.walDir = walDir;
201194
this.waitOnEndpointSeconds =
202195
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
203196
decorateConf();
@@ -209,7 +202,7 @@ public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSour
209202
this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
210203
this.queueStorage = queueStorage;
211204
this.replicationPeer = replicationPeer;
212-
this.manager = manager;
205+
this.controller = overallController;
213206
this.fs = fs;
214207
this.metrics = metrics;
215208
this.clusterId = clusterId;
@@ -336,9 +329,9 @@ private void tryStartNewShipper(String walGroupId) {
336329
Threads.setDaemonThreadRunning(
337330
walReader, Thread.currentThread().getName()
338331
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
339-
(t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
332+
(t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
340333
worker.setWALReader(walReader);
341-
worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
334+
worker.startup((t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
342335
return worker;
343336
}
344337
});
@@ -766,9 +759,9 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
766759
throttler.addPushSize(batchSize);
767760
}
768761
totalReplicatedEdits.addAndGet(entries.size());
769-
long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
762+
long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize);
770763
// Record the new buffer usage
771-
this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
764+
controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
772765
}
773766

774767
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
3232
import org.apache.hadoop.hbase.replication.ReplicationPeer;
3333
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
34+
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
3435
import org.apache.hadoop.hbase.wal.WAL.Entry;
3536
import org.apache.yetus.audience.InterfaceAudience;
3637

@@ -42,14 +43,22 @@ public interface ReplicationSourceInterface {
4243
/**
4344
* Initializer for the source
4445
*
45-
* @param conf the configuration to use
46-
* @param fs the file system to use
47-
* @param server the server for this region server
48-
*/
49-
void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
50-
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
51-
String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
52-
MetricsSource metrics) throws IOException;
46+
* @param conf configuration to use
47+
* @param fs file system to use
48+
* @param walDir the directory where the WAL is located
49+
* @param overallController the overall controller of all replication sources
50+
* @param queueStorage the replication queue storage
51+
* @param replicationPeer the replication peer
52+
* @param server the server which start and run this replication source
53+
* @param queueId the id of our replication queue
54+
* @param clusterId unique UUID for the cluster
55+
* @param walFileLengthProvider used to get the WAL length
56+
* @param metrics metrics for this replication source
57+
*/
58+
void init(Configuration conf, FileSystem fs, Path walDir,
59+
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
60+
ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
61+
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
5362

5463
/**
5564
* Add a log to the list of logs to replicate

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.hadoop.hbase.replication.ReplicationPeers;
5757
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
5858
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
59+
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
5960
import org.apache.hadoop.hbase.replication.SyncReplicationState;
6061
import org.apache.hadoop.hbase.util.Bytes;
6162
import org.apache.hadoop.hbase.util.Pair;
@@ -93,7 +94,7 @@
9394
* </ul>
9495
*/
9596
@InterfaceAudience.Private
96-
public class ReplicationSourceManager {
97+
public class ReplicationSourceManager implements ReplicationSourceController {
9798
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
9899
// all the sources that read this RS's logs and every peer only has one replication source
99100
private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@@ -134,12 +135,6 @@ public class ReplicationSourceManager {
134135

135136
private AtomicLong totalBufferUsed = new AtomicLong();
136137

137-
// How long should we sleep for each retry when deleting remote wal files for sync replication
138-
// peer.
139-
private final long sleepForRetries;
140-
// Maximum number of retries before taking bold actions when deleting remote wal files for sync
141-
// replication peer.
142-
private final int maxRetriesMultiplier;
143138
// Total buffer size on this RegionServer for holding batched edits to be shipped.
144139
private final long totalBufferLimit;
145140
private final MetricsReplicationGlobalSourceSource globalMetrics;
@@ -154,6 +149,12 @@ public class ReplicationSourceManager {
154149
*/
155150
AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
156151

152+
/**
153+
* When enable replication offload, will not create replication source and only write WAL to
154+
* replication queue storage. The replication source will be started by ReplicationServer.
155+
*/
156+
private final boolean replicationOffload;
157+
157158
/**
158159
* Creates a replication manager and sets the watch on all the other registered region servers
159160
* @param queueStorage the interface for manipulating replication queues
@@ -197,12 +198,11 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
197198
this.latestPaths = new HashMap<>();
198199
this.replicationForBulkLoadDataEnabled = conf.getBoolean(
199200
HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
200-
this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
201-
this.maxRetriesMultiplier =
202-
this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
203201
this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
204202
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
205203
this.globalMetrics = globalMetrics;
204+
this.replicationOffload = conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
205+
HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT);
206206
}
207207

208208
/**
@@ -338,7 +338,9 @@ ReplicationSourceInterface addSource(String peerId) throws IOException {
338338
if (peerConfig.isSyncReplication()) {
339339
syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
340340
}
341-
src.startup();
341+
if (!replicationOffload) {
342+
src.startup();
343+
}
342344
return src;
343345
}
344346

@@ -431,7 +433,9 @@ public void refreshSources(String peerId) throws ReplicationException, IOExcepti
431433
.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
432434
}
433435
LOG.info("Startup replication source for " + src.getPeerId());
434-
src.startup();
436+
if (!replicationOffload) {
437+
src.startup();
438+
}
435439

436440
List<ReplicationSourceInterface> toStartup = new ArrayList<>();
437441
// synchronized on oldsources to avoid race with NodeFailoverWorker
@@ -454,8 +458,10 @@ public void refreshSources(String peerId) throws ReplicationException, IOExcepti
454458
toStartup.add(recoveredReplicationSource);
455459
}
456460
}
457-
for (ReplicationSourceInterface replicationSource : toStartup) {
458-
replicationSource.startup();
461+
if (!replicationOffload) {
462+
for (ReplicationSourceInterface replicationSource : toStartup) {
463+
replicationSource.startup();
464+
}
459465
}
460466
}
461467

@@ -473,7 +479,8 @@ private boolean removeRecoveredSource(ReplicationSourceInterface src) {
473479
return true;
474480
}
475481

476-
void finishRecoveredSource(ReplicationSourceInterface src) {
482+
@Override
483+
public void finishRecoveredSource(RecoveredReplicationSource src) {
477484
synchronized (oldsources) {
478485
if (!removeRecoveredSource(src)) {
479486
return;
@@ -487,8 +494,7 @@ void finishRecoveredSource(ReplicationSourceInterface src) {
487494
* Clear the metrics and related replication queue of the specified old source
488495
* @param src source to clear
489496
*/
490-
void removeSource(ReplicationSourceInterface src) {
491-
LOG.info("Done with the queue " + src.getQueueId());
497+
private void removeSource(ReplicationSourceInterface src) {
492498
this.sources.remove(src.getPeerId());
493499
// Delete queue from storage and memory
494500
deleteQueue(src.getQueueId());
@@ -532,7 +538,7 @@ private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) thro
532538
}
533539
}
534540

535-
// public because of we call it in TestReplicationEmptyWALRecovery
541+
@InterfaceAudience.Private
536542
public void preLogRoll(Path newLog) throws IOException {
537543
String logName = newLog.getName();
538544
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
@@ -550,8 +556,8 @@ public void preLogRoll(Path newLog) throws IOException {
550556
}
551557
}
552558

553-
// public because of we call it in TestReplicationEmptyWALRecovery
554-
public void postLogRoll(Path newLog) throws IOException {
559+
@InterfaceAudience.Private
560+
public void postLogRoll(Path newLog) {
555561
// This only updates the sources we own, not the recovered ones
556562
for (ReplicationSourceInterface source : this.sources.values()) {
557563
source.enqueueLog(newLog);
@@ -758,6 +764,7 @@ Set<Path> getLastestPath() {
758764
}
759765
}
760766

767+
@Override
761768
public AtomicLong getTotalBufferUsed() {
762769
return totalBufferUsed;
763770
}
@@ -766,6 +773,7 @@ public AtomicLong getTotalBufferUsed() {
766773
* Returns the maximum size in bytes of edits held in memory which are pending replication
767774
* across all sources inside this RegionServer.
768775
*/
776+
@Override
769777
public long getTotalBufferLimit() {
770778
return totalBufferLimit;
771779
}
@@ -856,7 +864,8 @@ int activeFailoverTaskCount() {
856864
return executor.getActiveCount();
857865
}
858866

859-
MetricsReplicationGlobalSourceSource getGlobalMetrics() {
867+
@Override
868+
public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
860869
return this.globalMetrics;
861870
}
862871

0 commit comments

Comments
 (0)