Skip to content

Commit 530feba

Browse files
HBASE-29255: Integrate backup WAL cleanup logic with the delete command (#7007)
* Store bulkload files in daywise bucket as well
* Integrate backup WAL cleanup logic with the delete command
* address the review comments
* address the review comments
* address the review comments
* add more unit tests to cover all cases
* address the review comments
1 parent 5fc58af commit 530feba

7 files changed

Lines changed: 575 additions & 8 deletions

File tree

hbase-backup/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,11 @@
163163
<artifactId>junit</artifactId>
164164
<scope>test</scope>
165165
</dependency>
166+
<dependency>
167+
<groupId>org.mockito</groupId>
168+
<artifactId>mockito-inline</artifactId>
169+
<scope>test</scope>
170+
</dependency>
166171
</dependencies>
167172
<build>
168173
<plugins>

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.hadoop.hbase.backup.impl;
1919

2020
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
21+
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
22+
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
2123
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
2224
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
2325
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
@@ -47,18 +49,26 @@
4749
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC;
4850
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
4951
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC;
52+
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
53+
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
5054

5155
import java.io.IOException;
5256
import java.net.URI;
57+
import java.text.ParseException;
58+
import java.text.SimpleDateFormat;
5359
import java.util.ArrayList;
60+
import java.util.Collections;
61+
import java.util.HashSet;
5462
import java.util.List;
5563
import java.util.Map;
5664
import java.util.Optional;
5765
import java.util.Set;
66+
import java.util.TimeZone;
5867
import java.util.concurrent.TimeUnit;
5968
import org.apache.commons.lang3.StringUtils;
6069
import org.apache.hadoop.conf.Configuration;
6170
import org.apache.hadoop.conf.Configured;
71+
import org.apache.hadoop.fs.FileStatus;
6272
import org.apache.hadoop.fs.FileSystem;
6373
import org.apache.hadoop.fs.Path;
6474
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -71,6 +81,7 @@
7181
import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
7282
import org.apache.hadoop.hbase.backup.BackupType;
7383
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
84+
import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
7485
import org.apache.hadoop.hbase.backup.util.BackupSet;
7586
import org.apache.hadoop.hbase.backup.util.BackupUtils;
7687
import org.apache.hadoop.hbase.client.Connection;
@@ -80,6 +91,7 @@
8091
import org.apache.yetus.audience.InterfaceAudience;
8192

8293
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
94+
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
8395
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
8496
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
8597
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -649,6 +661,8 @@ public void execute() throws IOException {
649661
} else if (cmdline.hasOption(OPTION_LIST)) {
650662
executeDeleteListOfBackups(cmdline, isForceDelete);
651663
}
664+
665+
cleanUpUnusedBackupWALs();
652666
}
653667

654668
private void executeDeleteOlderThan(CommandLine cmdline, boolean isForceDelete)
@@ -876,6 +890,139 @@ private boolean canAnyOtherBackupCover(List<BackupInfo> allBackups, BackupInfo c
876890
return false;
877891
}
878892

893+
/**
894+
* Cleans up Write-Ahead Logs (WALs) that are no longer required for PITR after a successful
895+
* backup deletion.
896+
*/
897+
private void cleanUpUnusedBackupWALs() throws IOException {
898+
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
899+
String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
900+
901+
if (Strings.isNullOrEmpty(backupWalDir)) {
902+
System.out.println("No WAL directory specified for continuous backup. Skipping cleanup.");
903+
return;
904+
}
905+
906+
try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
907+
// Get list of tables under continuous backup
908+
Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();
909+
if (continuousBackupTables.isEmpty()) {
910+
System.out.println("No continuous backups configured. Skipping WAL cleanup.");
911+
return;
912+
}
913+
914+
// Find the earliest timestamp after which WALs are still needed
915+
long cutoffTimestamp = determineWALCleanupCutoffTime(sysTable);
916+
if (cutoffTimestamp == 0) {
917+
System.err.println("ERROR: No valid full backup found. Skipping WAL cleanup.");
918+
return;
919+
}
920+
921+
// Update metadata before actual cleanup to avoid inconsistencies
922+
updateBackupTableStartTimes(sysTable, cutoffTimestamp);
923+
924+
// Delete WAL files older than cutoff timestamp
925+
deleteOldWALFiles(conf, backupWalDir, cutoffTimestamp);
926+
927+
}
928+
}
929+
930+
/**
931+
* Determines the cutoff time for cleaning WAL files.
932+
* @param sysTable Backup system table
933+
* @return cutoff timestamp or 0 if not found
934+
*/
935+
long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException {
936+
List<BackupInfo> backupInfos = sysTable.getBackupInfos(BackupState.COMPLETE);
937+
Collections.reverse(backupInfos); // Start from oldest
938+
939+
for (BackupInfo backupInfo : backupInfos) {
940+
if (BackupType.FULL.equals(backupInfo.getType())) {
941+
return backupInfo.getStartTs();
942+
}
943+
}
944+
return 0;
945+
}
946+
947+
/**
948+
* Updates the start time for continuous backups if older than cutoff timestamp.
949+
* @param sysTable Backup system table
950+
* @param cutoffTimestamp Timestamp before which WALs are no longer needed
951+
*/
952+
void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestamp)
953+
throws IOException {
954+
955+
Map<TableName, Long> backupTables = sysTable.getContinuousBackupTableSet();
956+
Set<TableName> tablesToUpdate = new HashSet<>();
957+
958+
for (Map.Entry<TableName, Long> entry : backupTables.entrySet()) {
959+
if (entry.getValue() < cutoffTimestamp) {
960+
tablesToUpdate.add(entry.getKey());
961+
}
962+
}
963+
964+
if (!tablesToUpdate.isEmpty()) {
965+
sysTable.updateContinuousBackupTableSet(tablesToUpdate, cutoffTimestamp);
966+
}
967+
}
968+
969+
/**
970+
* Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
971+
*/
972+
void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
973+
throws IOException {
974+
System.out.println("Starting WAL cleanup in backup directory: " + backupWalDir
975+
+ " with cutoff time: " + cutoffTime);
976+
977+
BackupFileSystemManager manager =
978+
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
979+
FileSystem fs = manager.getBackupFs();
980+
Path walDir = manager.getWalsDir();
981+
Path bulkloadDir = manager.getBulkLoadFilesDir();
982+
983+
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
984+
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
985+
986+
System.out.println("Listing directories under: " + walDir);
987+
988+
FileStatus[] directories = fs.listStatus(walDir);
989+
990+
for (FileStatus dirStatus : directories) {
991+
if (!dirStatus.isDirectory()) {
992+
continue; // Skip files, we only want directories
993+
}
994+
995+
Path dirPath = dirStatus.getPath();
996+
String dirName = dirPath.getName();
997+
998+
try {
999+
long dayStart = parseDayDirectory(dirName, dateFormat);
1000+
System.out
1001+
.println("Checking WAL directory: " + dirName + " (Start Time: " + dayStart + ")");
1002+
1003+
// If WAL files of that day are older than cutoff time, delete them
1004+
if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
1005+
System.out.println("Deleting outdated WAL directory: " + dirPath);
1006+
fs.delete(dirPath, true);
1007+
fs.delete(new Path(bulkloadDir, dirName), true);
1008+
}
1009+
} catch (ParseException e) {
1010+
System.out.println("WARNING: Failed to parse directory name '" + dirName
1011+
+ "'. Skipping. Error: " + e.getMessage());
1012+
} catch (IOException e) {
1013+
System.out.println("WARNING: Failed to delete directory '" + dirPath
1014+
+ "'. Skipping. Error: " + e.getMessage());
1015+
}
1016+
}
1017+
1018+
System.out.println("Completed WAL cleanup for backup directory: " + backupWalDir);
1019+
}
1020+
1021+
private long parseDayDirectory(String dayDir, SimpleDateFormat dateFormat)
1022+
throws ParseException {
1023+
return dateFormat.parse(dayDir).getTime();
1024+
}
1025+
8791026
@Override
8801027
protected void printUsage() {
8811028
System.out.println(DELETE_CMD_USAGE);

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,6 +1056,32 @@ public void addContinuousBackupTableSet(Set<TableName> tables, long startTimesta
10561056
}
10571057
}
10581058

1059+
/**
1060+
* Updates the system table with the new start timestamps for continuous backup tables.
1061+
* @param tablesToUpdate The set of tables that need their start timestamps updated.
1062+
* @param newStartTimestamp The new start timestamp to be set.
1063+
*/
1064+
public void updateContinuousBackupTableSet(Set<TableName> tablesToUpdate, long newStartTimestamp)
1065+
throws IOException {
1066+
if (tablesToUpdate == null || tablesToUpdate.isEmpty()) {
1067+
LOG.warn("No tables provided for updating start timestamps.");
1068+
return;
1069+
}
1070+
1071+
try (Table table = connection.getTable(tableName)) {
1072+
Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));
1073+
1074+
for (TableName tableName : tablesToUpdate) {
1075+
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(tableName.getNameAsString()),
1076+
Bytes.toBytes(newStartTimestamp));
1077+
}
1078+
1079+
table.put(put);
1080+
LOG.info("Successfully updated start timestamps for {} tables in the backup system table.",
1081+
tablesToUpdate.size());
1082+
}
1083+
}
1084+
10591085
/**
10601086
* Removes tables from the global continuous backup set. Only removes entries that currently exist
10611087
* in the backup system table.

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Date;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.TimeZone;
2627
import java.util.UUID;
2728
import java.util.concurrent.ConcurrentHashMap;
2829
import java.util.concurrent.Executors;
@@ -304,7 +305,7 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc
304305
walWriter.append(entry);
305306
}
306307
walWriter.sync(true);
307-
uploadBulkLoadFiles(bulkLoadFiles);
308+
uploadBulkLoadFiles(day, bulkLoadFiles);
308309
} catch (UncheckedIOException e) {
309310
String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
310311
LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
@@ -314,9 +315,7 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc
314315
}
315316

316317
private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
317-
// Convert dayInMillis to "yyyy-MM-dd" format
318-
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
319-
String dayDirectoryName = dateFormat.format(new Date(dayInMillis));
318+
String dayDirectoryName = formatToDateString(dayInMillis);
320319

321320
FileSystem fs = backupFileSystemManager.getBackupFs();
322321
Path walsDir = backupFileSystemManager.getWalsDir();
@@ -376,17 +375,21 @@ private void close() {
376375
}
377376
}
378377

379-
private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
378+
private void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles) throws IOException {
380379
LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
381380
bulkLoadFiles.size());
382381

383382
if (LOG.isTraceEnabled()) {
384383
LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
385384
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
386385
}
386+
String dayDirectoryName = formatToDateString(dayInMillis);
387+
Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
388+
backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
389+
387390
for (Path file : bulkLoadFiles) {
388391
Path sourcePath = getBulkLoadFileStagingPath(file);
389-
Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), file);
392+
Path destPath = new Path(bulkloadDir, file);
390393

391394
try {
392395
LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
@@ -407,6 +410,15 @@ private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
407410
LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
408411
}
409412

413+
/**
414+
* Convert dayInMillis to "yyyy-MM-dd" format
415+
*/
416+
private String formatToDateString(long dayInMillis) {
417+
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
418+
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
419+
return dateFormat.format(new Date(dayInMillis));
420+
}
421+
410422
private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
411423
FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
412424
Path rootDir = CommonFSUtils.getRootDir(conf);

0 commit comments

Comments
 (0)