Skip to content

Commit 5d18437

Browse files
author
Prathyusha Garre
committed
HBASE-27826 Add FSFT implementations for Virtual links and enable them as part of SplitProcedure
1 parent aa14037 commit 5d18437

17 files changed

Lines changed: 253 additions & 70 deletions

File tree

hbase-protocol-shaded/src/main/protobuf/server/region/StoreFileTracker.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ option java_generic_services = true;
2525
option java_generate_equals_and_hash = true;
2626
option optimize_for = SPEED;
2727

28+
import "server/io/FS.proto";
2829
message StoreFileEntry {
2930
required string name = 1;
3031
required uint64 size = 2;
32+
optional Reference reference = 3;
3133
}
3234

3335
message StoreFileList {

hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class Reference {
5656
* For split HStoreFiles, it specifies if the file covers the lower half or the upper half of the
5757
* key range
5858
*/
59-
static enum Range {
59+
public static enum Range {
6060
/** HStoreFile contains upper half of key range */
6161
top,
6262
/** HStoreFile contains lower half of key range */

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -644,20 +644,46 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti
644644
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
645645
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
646646
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
647+
Pair<List<StoreFileInfo>, List<StoreFileInfo>> expectedReferences =
648+
splitStoreFiles(env, regionFs);
649+
final ExecutorService threadPool = Executors.newFixedThreadPool(2,
650+
new ThreadFactoryBuilder().setNameFormat("RegionCommitter-pool-%d").setDaemon(true)
651+
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
652+
threadPool.submit(new Callable<Path>() {
653+
@Override
654+
public Path call() throws IOException {
655+
return regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
656+
}
657+
});
658+
threadPool.submit(new Callable<Path>() {
659+
@Override
660+
public Path call() throws IOException {
661+
return regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
662+
}
663+
});
664+
// Shutdown the pool
665+
threadPool.shutdown();
647666

648-
Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);
649-
650-
assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
651-
regionFs.getSplitsDir(daughterOneRI));
652-
regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
653-
assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
654-
new Path(tabledir, daughterOneRI.getEncodedName()));
655-
656-
assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
657-
regionFs.getSplitsDir(daughterTwoRI));
658-
regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
659-
assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
660-
new Path(tabledir, daughterTwoRI.getEncodedName()));
667+
Configuration conf = env.getMasterConfiguration();
668+
// Wait for all the tasks to finish.
669+
// When splits ran on the RegionServer, how-long-to-wait-configuration was named
670+
// hbase.regionserver.fileSplitTimeout. If set, use its value.
671+
long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
672+
conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
673+
try {
674+
boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
675+
if (stillRunning) {
676+
threadPool.shutdownNow();
677+
// wait for the thread to shutdown completely.
678+
while (!threadPool.isTerminated()) {
679+
Thread.sleep(50);
680+
}
681+
throw new IOException(
682+
"Took too long to split the" + " files and create the references, aborting split");
683+
}
684+
} catch (InterruptedException e) {
685+
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
686+
}
661687
}
662688

663689
private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOException {
@@ -673,8 +699,8 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOExcept
673699
* Create Split directory
674700
* @param env MasterProcedureEnv
675701
*/
676-
private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
677-
final HRegionFileSystem regionFs) throws IOException {
702+
private Pair<List<StoreFileInfo>, List<StoreFileInfo>> splitStoreFiles(
703+
final MasterProcedureEnv env, final HRegionFileSystem regionFs) throws IOException {
678704
final Configuration conf = env.getMasterConfiguration();
679705
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
680706
// The following code sets up a thread pool executor with as many slots as
@@ -729,7 +755,8 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
729755
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
730756
new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d").setDaemon(true)
731757
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
732-
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
758+
final List<Future<Pair<StoreFileInfo, StoreFileInfo>>> futures =
759+
new ArrayList<Future<Pair<StoreFileInfo, StoreFileInfo>>>(nbFiles);
733760

734761
// Split each store file.
735762
for (Map.Entry<String, Collection<StoreFileInfo>> e : files.entrySet()) {
@@ -776,12 +803,12 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
776803
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
777804
}
778805

779-
List<Path> daughterA = new ArrayList<>();
780-
List<Path> daughterB = new ArrayList<>();
806+
List<StoreFileInfo> daughterA = new ArrayList<>();
807+
List<StoreFileInfo> daughterB = new ArrayList<>();
781808
// Look for any exception
782-
for (Future<Pair<Path, Path>> future : futures) {
809+
for (Future<Pair<StoreFileInfo, StoreFileInfo>> future : futures) {
783810
try {
784-
Pair<Path, Path> p = future.get();
811+
Pair<StoreFileInfo, StoreFileInfo> p = future.get();
785812
if (p.getFirst() != null) {
786813
daughterA.add(p.getFirst());
787814
}
@@ -803,6 +830,7 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
803830
return new Pair<>(daughterA, daughterB);
804831
}
805832

833+
// TODO: update assert to do SFT.load instead of FileSystem listing
806834
private void assertSplitResultFilesCount(final FileSystem fs,
807835
final int expectedSplitResultFileCount, Path dir) throws IOException {
808836
if (expectedSplitResultFileCount != 0) {
@@ -814,8 +842,8 @@ private void assertSplitResultFilesCount(final FileSystem fs,
814842
}
815843
}
816844

817-
private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, TableDescriptor htd,
818-
ColumnFamilyDescriptor hcd, HStoreFile sf) throws IOException {
845+
private Pair<StoreFileInfo, StoreFileInfo> splitStoreFile(HRegionFileSystem regionFs,
846+
TableDescriptor htd, ColumnFamilyDescriptor hcd, HStoreFile sf) throws IOException {
819847
if (LOG.isDebugEnabled()) {
820848
LOG.debug("pid=" + getProcId() + " splitting started for store file: " + sf.getPath()
821849
+ " for region: " + getParentRegion().getShortNameToLog());
@@ -831,22 +859,22 @@ private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, TableDescrip
831859
StoreFileTrackerFactory.create(regionFs.getFileSystem().getConf(), htd, hcd,
832860
HRegionFileSystem.create(regionFs.getFileSystem().getConf(), regionFs.getFileSystem(),
833861
regionFs.getTableDir(), daughterTwoRI));
834-
final Path path_first = regionFs.splitStoreFile(this.daughterOneRI, familyName, sf, splitRow,
835-
false, splitPolicy, daughterOneSft);
836-
final Path path_second = regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf, splitRow,
837-
true, splitPolicy, daughterTwoSft);
862+
final StoreFileInfo sfiFirst = regionFs.splitStoreFile(this.daughterOneRI, familyName, sf,
863+
splitRow, false, splitPolicy, daughterOneSft);
864+
final StoreFileInfo sfiSecond = regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf,
865+
splitRow, true, splitPolicy, daughterTwoSft);
838866
if (LOG.isDebugEnabled()) {
839867
LOG.debug("pid=" + getProcId() + " splitting complete for store file: " + sf.getPath()
840868
+ " for region: " + getParentRegion().getShortNameToLog());
841869
}
842-
return new Pair<Path, Path>(path_first, path_second);
870+
return new Pair<StoreFileInfo, StoreFileInfo>(sfiFirst, sfiSecond);
843871
}
844872

845873
/**
846874
* Utility class used to do the file splitting / reference writing in parallel instead of
847875
* sequentially.
848876
*/
849-
private class StoreFileSplitter implements Callable<Pair<Path, Path>> {
877+
private class StoreFileSplitter implements Callable<Pair<StoreFileInfo, StoreFileInfo>> {
850878
private final HRegionFileSystem regionFs;
851879
private final ColumnFamilyDescriptor hcd;
852880
private final HStoreFile sf;
@@ -867,7 +895,7 @@ public StoreFileSplitter(HRegionFileSystem regionFs, TableDescriptor htd,
867895
}
868896

869897
@Override
870-
public Pair<Path, Path> call() throws IOException {
898+
public Pair<StoreFileInfo, StoreFileInfo> call() throws IOException {
871899
return splitStoreFile(regionFs, htd, hcd, sf);
872900
}
873901
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ void cleanupDaughterRegion(final RegionInfo regionInfo) throws IOException {
519519
* in the filesystem.
520520
* @param regionInfo daughter {@link org.apache.hadoop.hbase.client.RegionInfo}
521521
*/
522-
public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegionFiles,
522+
public Path commitDaughterRegion(final RegionInfo regionInfo, List<StoreFileInfo> allRegionFiles,
523523
MasterProcedureEnv env) throws IOException {
524524
Path regionDir = this.getSplitsDir(regionInfo);
525525
if (fs.exists(regionDir)) {
@@ -534,7 +534,29 @@ public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegi
534534
return regionDir;
535535
}
536536

537-
private void insertRegionFilesIntoStoreTracker(List<Path> allFiles, MasterProcedureEnv env,
537+
private void insertRegionFilesIntoStoreTracker(List<StoreFileInfo> allFiles,
538+
MasterProcedureEnv env, HRegionFileSystem regionFs) throws IOException {
539+
TableDescriptor tblDesc =
540+
env.getMasterServices().getTableDescriptors().get(regionInfo.getTable());
541+
// we need to map trackers per store
542+
Map<String, StoreFileTracker> trackerMap = new HashMap<>();
543+
// we need to map store files per store
544+
Map<String, List<StoreFileInfo>> fileInfoMap = new HashMap<>();
545+
for (StoreFileInfo sfi : allFiles) {
546+
Path file = sfi.getPath();
547+
String familyName = file.getParent().getName();
548+
trackerMap.computeIfAbsent(familyName, t -> StoreFileTrackerFactory.create(conf, tblDesc,
549+
tblDesc.getColumnFamily(Bytes.toBytes(familyName)), regionFs));
550+
fileInfoMap.computeIfAbsent(familyName, l -> new ArrayList<>());
551+
List<StoreFileInfo> infos = fileInfoMap.get(familyName);
552+
infos.add(sfi);
553+
}
554+
for (Map.Entry<String, StoreFileTracker> entry : trackerMap.entrySet()) {
555+
entry.getValue().add(fileInfoMap.get(entry.getKey()));
556+
}
557+
}
558+
559+
private void insertRegionfilePathsIntoStoreTracker(List<Path> allFiles, MasterProcedureEnv env,
538560
HRegionFileSystem regionFs) throws IOException {
539561
TableDescriptor tblDesc =
540562
env.getMasterServices().getTableDescriptors().get(regionInfo.getTable());
@@ -591,8 +613,9 @@ public void createSplitsDir(RegionInfo daughterA, RegionInfo daughterB) throws I
591613
* have a reference to a Region.
592614
* @return Path to created reference.
593615
*/
594-
public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte[] splitRow,
595-
boolean top, RegionSplitPolicy splitPolicy, StoreFileTracker tracker) throws IOException {
616+
public StoreFileInfo splitStoreFile(RegionInfo hri, String familyName, HStoreFile f,
617+
byte[] splitRow, boolean top, RegionSplitPolicy splitPolicy, StoreFileTracker tracker)
618+
throws IOException {
596619
Path splitDir = new Path(getSplitsDir(hri), familyName);
597620
// Add the referred-to regions name as a dot separated suffix.
598621
// See REF_NAME_REGEX regex above. The referred-to regions name is
@@ -604,7 +627,7 @@ public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte
604627
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
605628
if (fs.exists(p)) {
606629
LOG.warn("Found an already existing split file for {}. Assuming this is a recovery.", p);
607-
return p;
630+
return tracker.getStoreFileInfo(fs.getFileStatus(p), p, true);
608631
}
609632
boolean createLinkFile = false;
610633
if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
@@ -662,12 +685,12 @@ public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte
662685
hfileName = m.group(4);
663686
}
664687
// must create back reference here
665-
tracker.createHFileLink(linkedTable, linkedRegion, hfileName, true);
688+
HFileLink hFileLink = tracker.createHFileLink(linkedTable, linkedRegion, hfileName, true);
666689
Path path =
667690
new Path(splitDir, HFileLink.createHFileLinkName(linkedTable, linkedRegion, hfileName));
668691
LOG.info("Created linkFile:" + path.toString() + " for child: " + hri.getEncodedName()
669692
+ ", parent: " + regionInfoForFs.getEncodedName());
670-
return path;
693+
return new StoreFileInfo(conf, fs, path, hFileLink);
671694
} catch (IOException e) {
672695
// if create HFileLink file failed, then just skip the error and create Reference file
673696
LOG.error("Create link file for " + hfileName + " for child " + hri.getEncodedName()
@@ -678,7 +701,7 @@ public Path splitStoreFile(RegionInfo hri, String familyName, HStoreFile f, byte
678701
Reference r =
679702
top ? Reference.createTopReference(splitRow) : Reference.createBottomReference(splitRow);
680703
tracker.createReference(r, p);
681-
return p;
704+
return new StoreFileInfo(conf, fs, p, r);
682705
}
683706

684707
// ===========================================================================
@@ -747,7 +770,8 @@ public void commitMergedRegion(List<Path> allMergedFiles, MasterProcedureEnv env
747770
Path regionInfoFile = new Path(regionDir, REGION_INFO_FILE);
748771
byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
749772
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
750-
insertRegionFilesIntoStoreTracker(allMergedFiles, env, this);
773+
List<StoreFileInfo> infos = new ArrayList<StoreFileInfo>();
774+
insertRegionfilePathsIntoStoreTracker(allMergedFiles, env, this);
751775
}
752776
}
753777

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,17 @@ public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileSt
171171
this(conf, fs, fileStatus, null, link);
172172
}
173173

174+
/**
175+
* Create a Store File Info from an HFileLink
176+
* @param conf The {@link Configuration} to use
177+
* @param fs The current file system to use
178+
* @param fileStatus The {@link FileStatus} of the file
179+
*/
180+
public StoreFileInfo(final Configuration conf, final FileSystem fs, final Path initiaPath,
181+
final HFileLink link) {
182+
this(conf, fs, initiaPath, null, link);
183+
}
184+
174185
/**
175186
* Create a Store File Info from an HFileLink
176187
* @param conf The {@link Configuration} to use
@@ -183,6 +194,18 @@ public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileSt
183194
this(conf, fs, fileStatus, reference, null);
184195
}
185196

197+
/**
198+
* Create a Store File Info from an HFileLink
199+
* @param conf The {@link Configuration} to use
200+
* @param fs The current file system to use
201+
* @param fileStatus The {@link FileStatus} of the file
202+
* @param reference The reference instance
203+
*/
204+
public StoreFileInfo(final Configuration conf, final FileSystem fs, final Path initialPath,
205+
final Reference reference) {
206+
this(conf, fs, initialPath, reference, null);
207+
}
208+
186209
/**
187210
* Create a Store File Info from an HFileLink and a Reference
188211
* @param conf The {@link Configuration} to use
@@ -204,6 +227,26 @@ public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileSt
204227
this.conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
205228
}
206229

230+
/**
231+
* Create a Store File Info from an HFileLink and a Reference
232+
* @param conf The {@link Configuration} to use
233+
* @param fs The current file system to use
234+
* @param fileStatus The {@link FileStatus} of the file
235+
* @param reference The reference instance
236+
* @param link The link instance
237+
*/
238+
public StoreFileInfo(final Configuration conf, final FileSystem fs, final Path path,
239+
final Reference reference, final HFileLink link) {
240+
this.fs = fs;
241+
this.conf = conf;
242+
this.primaryReplica = false;
243+
this.initialPath = path;
244+
this.reference = reference;
245+
this.link = link;
246+
this.noReadahead =
247+
this.conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD);
248+
}
249+
207250
/**
208251
* Create a Store File Info from an HFileLink and a Reference
209252
* @param conf The {@link Configuration} to use

0 commit comments

Comments
 (0)