Skip to content

Commit 1c04205

Browse files
wchevreuilGuanghao Zhang
authored andcommitted
HBASE-22380 break circle replication when doing bulkload (apache#494)
Signed-off-by: stack <stack@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Norbert Kalmar <nkalmar@cloudera.com>
1 parent 964cde1 commit 1c04205

13 files changed

Lines changed: 402 additions & 33 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client
115115
final List<Pair<byte[], String>> familyPaths,
116116
final byte[] regionName, boolean assignSeqNum,
117117
final Token<?> userToken, final String bulkToken) throws IOException {
118-
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
119-
false);
118+
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
119+
bulkToken, false, null);
120120
}
121121

122122
/**
@@ -132,13 +132,23 @@ public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client
132132
* @return true if all are loaded
133133
* @throws IOException
134134
*/
135+
public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
136+
final List<Pair<byte[], String>> familyPaths,
137+
final byte[] regionName, boolean assignSeqNum,
138+
final Token<?> userToken, final String bulkToken,
139+
boolean copyFiles) throws IOException {
140+
return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
141+
bulkToken, false, null);
142+
}
143+
135144
public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
136145
final List<Pair<byte[], String>> familyPaths,
137146
final byte[] regionName, boolean assignSeqNum,
138-
final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
147+
final Token<?> userToken, final String bulkToken,
148+
boolean copyFiles, List<String> clusterIds) throws IOException {
139149
BulkLoadHFileRequest request =
140150
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
141-
userToken, bulkToken, copyFiles);
151+
userToken, bulkToken, copyFiles, clusterIds);
142152

143153
try {
144154
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2581,13 +2581,23 @@ public static QuotaProtos.SpaceQuota toProtoSpaceQuota(
25812581
* name
25822582
* @return The WAL log marker for bulk loads.
25832583
*/
2584+
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
2585+
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
2586+
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
2587+
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
2588+
storeFilesSize, bulkloadSeqId, null);
2589+
}
2590+
25842591
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
25852592
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
2586-
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
2593+
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
25872594
BulkLoadDescriptor.Builder desc =
25882595
BulkLoadDescriptor.newBuilder()
25892596
.setTableName(ProtobufUtil.toProtoTableName(tableName))
25902597
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
2598+
if(clusterIds != null) {
2599+
desc.addAllClusterIds(clusterIds);
2600+
}
25912601

25922602
for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
25932603
WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
569569
final byte[] regionName, boolean assignSeqNum,
570570
final Token<?> userToken, final String bulkToken) {
571571
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
572-
false);
572+
false, null);
573573
}
574574

575575
/**
@@ -584,9 +584,9 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
584584
* @return a bulk load request
585585
*/
586586
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
587-
final List<Pair<byte[], String>> familyPaths,
588-
final byte[] regionName, boolean assignSeqNum,
589-
final Token<?> userToken, final String bulkToken, boolean copyFiles) {
587+
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
588+
final Token<?> userToken, final String bulkToken, boolean copyFiles,
589+
List<String> clusterIds) {
590590
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
591591
RegionSpecifierType.REGION_NAME, regionName);
592592

@@ -624,6 +624,9 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
624624
request.setBulkToken(bulkToken);
625625
}
626626
request.setCopyFile(copyFiles);
627+
if (clusterIds != null) {
628+
request.addAllClusterIds(clusterIds);
629+
}
627630
return request.build();
628631
}
629632

hbase-protocol-shaded/src/main/protobuf/Client.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ message BulkLoadHFileRequest {
378378
optional DelegationToken fs_token = 4;
379379
optional string bulk_token = 5;
380380
optional bool copy_file = 6 [default = false];
381+
repeated string cluster_ids = 7;
381382

382383
message FamilyPath {
383384
required bytes family = 1;

hbase-protocol-shaded/src/main/protobuf/WAL.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ message BulkLoadDescriptor {
150150
required bytes encoded_region_name = 2;
151151
repeated StoreDescriptor stores = 3;
152152
required int64 bulkload_seq_num = 4;
153+
repeated string cluster_ids = 5;
153154
}
154155

155156
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6252,7 +6252,7 @@ private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>
62526252
*/
62536253
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
62546254
BulkLoadListener bulkLoadListener) throws IOException {
6255-
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
6255+
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
62566256
}
62576257

62586258
/**
@@ -6297,11 +6297,13 @@ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
62976297
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
62986298
* file about to be bulk loaded
62996299
* @param copyFile always copy hfiles if true
6300+
* @param clusterIds ids from clusters that had already handled the given bulkload event.
63006301
* @return Map from family to List of store file paths if successful, null if failed recoverably
63016302
* @throws IOException if failed unrecoverably.
63026303
*/
63036304
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
6304-
boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
6305+
boolean assignSeqId, BulkLoadListener bulkLoadListener,
6306+
boolean copyFile, List<String> clusterIds) throws IOException {
63056307
long seqId = -1;
63066308
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
63076309
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6476,8 +6478,7 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
64766478
WALProtos.BulkLoadDescriptor loadDescriptor =
64776479
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
64786480
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
6479-
storeFiles,
6480-
storeFilesSizes, seqId);
6481+
storeFiles, storeFilesSizes, seqId, clusterIds);
64816482
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
64826483
loadDescriptor, mvcc);
64836484
} catch (IOException ioe) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2415,6 +2415,12 @@ public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
24152415
public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
24162416
final BulkLoadHFileRequest request) throws ServiceException {
24172417
long start = EnvironmentEdgeManager.currentTime();
2418+
List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
2419+
if(clusterIds.contains(this.regionServer.clusterId)){
2420+
return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
2421+
} else {
2422+
clusterIds.add(this.regionServer.clusterId);
2423+
}
24182424
try {
24192425
checkOpen();
24202426
requestCount.increment();
@@ -2447,15 +2453,15 @@ public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
24472453
}
24482454
try {
24492455
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
2450-
request.getCopyFile());
2456+
request.getCopyFile(), clusterIds);
24512457
} finally {
24522458
if (region.getCoprocessorHost() != null) {
24532459
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
24542460
}
24552461
}
24562462
} else {
24572463
// secure bulk load
2458-
map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
2464+
map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
24592465
}
24602466
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
24612467
builder.setLoaded(map != null);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,12 @@ private boolean isUserReferenced(UserGroupInformation ugi) {
213213
}
214214

215215
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
216-
final BulkLoadHFileRequest request) throws IOException {
216+
final BulkLoadHFileRequest request) throws IOException {
217+
return secureBulkLoadHFiles(region, request, null);
218+
}
219+
220+
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
221+
final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
217222
final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
218223
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
219224
familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -289,7 +294,8 @@ public Map<byte[], List<Path>> run() {
289294
//We call bulkLoadHFiles as requesting user
290295
//To enable access prior to staging
291296
return region.bulkLoadHFiles(familyPaths, true,
292-
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
297+
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
298+
clusterIds);
293299
} catch (Exception e) {
294300
LOG.error("Failed to complete bulk load", e);
295301
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,19 @@ public class HFileReplicator {
8787
private ThreadPoolExecutor exec;
8888
private int maxCopyThreads;
8989
private int copiesPerThread;
90+
private List<String> sourceClusterIds;
9091

9192
public HFileReplicator(Configuration sourceClusterConf,
9293
String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
9394
Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
94-
Connection connection) throws IOException {
95+
Connection connection, List<String> sourceClusterIds) throws IOException {
9596
this.sourceClusterConf = sourceClusterConf;
9697
this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
9798
this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
9899
this.bulkLoadHFileMap = tableQueueMap;
99100
this.conf = conf;
100101
this.connection = connection;
102+
this.sourceClusterIds = sourceClusterIds;
101103

102104
userProvider = UserProvider.instantiate(conf);
103105
fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -128,6 +130,7 @@ public Void replicate() throws IOException {
128130
LoadIncrementalHFiles loadHFiles = null;
129131
try {
130132
loadHFiles = new LoadIncrementalHFiles(conf);
133+
loadHFiles.setClusterIds(sourceClusterIds);
131134
} catch (Exception e) {
132135
LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
133136
+ " data.", e);

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
175175
// invocation of this method per table and cluster id.
176176
Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
177177

178-
// Map of table name Vs list of pair of family and list of hfile paths from its namespace
179-
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
180-
178+
Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
181179
for (WALEntry entry : entries) {
182180
TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
183181
if (this.walEntrySinkFilter != null) {
@@ -204,10 +202,19 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
204202
Cell cell = cells.current();
205203
// Handle bulk load hfiles replication
206204
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
205+
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
206+
if(bulkLoadsPerClusters == null) {
207+
bulkLoadsPerClusters = new HashMap<>();
208+
}
209+
// Map of table name Vs list of pair of family and list of
210+
// hfile paths from its namespace
211+
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
212+
bulkLoadsPerClusters.get(bld.getClusterIdsList());
207213
if (bulkLoadHFileMap == null) {
208214
bulkLoadHFileMap = new HashMap<>();
215+
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
209216
}
210-
buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
217+
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
211218
} else {
212219
// Handle wal replication
213220
if (isNewRowOrType(previousCell, cell)) {
@@ -245,14 +252,26 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
245252
LOG.debug("Finished replicating mutations.");
246253
}
247254

248-
if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
249-
LOG.debug("Started replicating bulk loaded data.");
250-
HFileReplicator hFileReplicator =
251-
new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
255+
if(bulkLoadsPerClusters != null) {
256+
for (Entry<List<String>, Map<String, List<Pair<byte[],
257+
List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
258+
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
259+
if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
260+
if(LOG.isDebugEnabled()) {
261+
LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
262+
entry.getKey().toString());
263+
}
264+
HFileReplicator hFileReplicator =
265+
new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
252266
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
253-
getConnection());
254-
hFileReplicator.replicate();
255-
LOG.debug("Finished replicating bulk loaded data.");
267+
getConnection(), entry.getKey());
268+
hFileReplicator.replicate();
269+
if(LOG.isDebugEnabled()) {
270+
LOG.debug("Finished replicating bulk loaded data from cluster id: {}",
271+
entry.getKey().toString());
272+
}
273+
}
274+
}
256275
}
257276

258277
int size = entries.size();
@@ -267,8 +286,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
267286

268287
private void buildBulkLoadHFileMap(
269288
final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
270-
Cell cell) throws IOException {
271-
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
289+
BulkLoadDescriptor bld) throws IOException {
272290
List<StoreDescriptor> storesList = bld.getStoresList();
273291
int storesSize = storesList.size();
274292
for (int j = 0; j < storesSize; j++) {

0 commit comments

Comments
 (0)