diff --git a/hbase-backup/pom.xml b/hbase-backup/pom.xml index dec8afc02eb1..5c496154d633 100644 --- a/hbase-backup/pom.xml +++ b/hbase-backup/pom.xml @@ -182,6 +182,11 @@ mockito-inline test + + org.mockito + mockito-inline + test + diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java index 19159eeba921..abdf52f14302 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java @@ -27,7 +27,6 @@ import java.net.URI; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; @@ -102,15 +101,9 @@ protected int executeRestore(boolean check, TableName[] fromTables, TableName[] return -5; } - // TODO: Currently hardcoding keepOriginalSplits=false and restoreRootDir via tmp dir. - // These should come from user input (same issue exists in normal restore). - // Expose them as configurable options in future. 
- PointInTimeRestoreRequest pointInTimeRestoreRequest = - new PointInTimeRestoreRequest.Builder().withBackupRootDir(backupRootDir).withCheck(check) - .withFromTables(fromTables).withToTables(toTables).withOverwrite(isOverwrite) - .withToDateTime(endTime).withKeepOriginalSplits(false).withRestoreRootDir( - BackupUtils.getTmpRestoreOutputDir(FileSystem.get(conf), conf).toString()) - .build(); + PointInTimeRestoreRequest pointInTimeRestoreRequest = new PointInTimeRestoreRequest.Builder() + .withBackupRootDir(backupRootDir).withCheck(check).withFromTables(fromTables) + .withToTables(toTables).withOverwrite(isOverwrite).withToDateTime(endTime).build(); client.pointInTimeRestore(pointInTimeRestoreRequest); } catch (Exception e) { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java index d7f69c05b683..f2462a1cfd18 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java @@ -27,33 +27,25 @@ public final class PointInTimeRestoreRequest { private final String backupRootDir; - private final String restoreRootDir; private final boolean check; private final TableName[] fromTables; private final TableName[] toTables; private final boolean overwrite; private final long toDateTime; - private final boolean isKeepOriginalSplits; private PointInTimeRestoreRequest(Builder builder) { this.backupRootDir = builder.backupRootDir; - this.restoreRootDir = builder.restoreRootDir; this.check = builder.check; this.fromTables = builder.fromTables; this.toTables = builder.toTables; this.overwrite = builder.overwrite; this.toDateTime = builder.toDateTime; - this.isKeepOriginalSplits = builder.isKeepOriginalSplits; } public String getBackupRootDir() { return backupRootDir; } - public String getRestoreRootDir() { - return 
restoreRootDir; - } - public boolean isCheck() { return check; } @@ -74,30 +66,19 @@ public long getToDateTime() { return toDateTime; } - public boolean isKeepOriginalSplits() { - return isKeepOriginalSplits; - } - public static class Builder { private String backupRootDir; - private String restoreRootDir; private boolean check = false; private TableName[] fromTables; private TableName[] toTables; private boolean overwrite = false; private long toDateTime; - private boolean isKeepOriginalSplits; public Builder withBackupRootDir(String backupRootDir) { this.backupRootDir = backupRootDir; return this; } - public Builder withRestoreRootDir(String restoreRootDir) { - this.restoreRootDir = restoreRootDir; - return this; - } - public Builder withCheck(boolean check) { this.check = check; return this; @@ -123,11 +104,6 @@ public Builder withToDateTime(long dateTime) { return this; } - public Builder withKeepOriginalSplits(boolean isKeepOriginalSplits) { - this.isKeepOriginalSplits = isKeepOriginalSplits; - return this; - } - public PointInTimeRestoreRequest build() { return new PointInTimeRestoreRequest(this); } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java index 3f31255d60f6..4526882ca6f4 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; +import static 
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; import java.io.IOException; @@ -33,9 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest; -import org.apache.hadoop.hbase.backup.RestoreJob; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Connection; @@ -411,6 +410,9 @@ private Tool initializeWalPlayer(long startTime, long endTime) { conf.setLong(WALInputFormat.START_TIME_KEY, startTime); conf.setLong(WALInputFormat.END_TIME_KEY, endTime); conf.setBoolean(IGNORE_EMPTY_FILES, true); + // HFile output format defaults to false in HFileOutputFormat2, but we are explicitly setting + // it here just in case + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); Tool walPlayer = new WALPlayer(); walPlayer.setConf(conf); return walPlayer; diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index ad57ada42be3..91192d2ccbe0 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.hbase.backup.impl; -import static org.apache.hadoop.hbase.backup.BackupInfo.withRoot; -import static org.apache.hadoop.hbase.backup.BackupInfo.withState; -import static org.apache.hadoop.hbase.backup.BackupInfo.withType; -import static org.apache.hadoop.hbase.backup.impl.BackupSystemTable.Order.NEW_TO_OLD; - import com.google.errorprone.annotations.RestrictedApi; 
import java.io.IOException; import java.util.ArrayList; diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java index 23b978ab8222..33afb5f99672 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.backup.impl; -import static org.apache.hadoop.hbase.backup.BackupInfo.withState; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; @@ -50,7 +49,6 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC; -import static org.apache.hadoop.hbase.backup.impl.BackupSystemTable.Order.NEW_TO_OLD; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; @@ -83,7 +81,7 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager; +import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager; import org.apache.hadoop.hbase.backup.util.BackupSet; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; @@ -803,8 +801,7 @@ 
private List getTablesDependentOnBackupForPITR(String backupId) throw } // Check if there is any other valid backup that can cover the PITR window - List allBackups = - backupSystemTable.getBackupHistory(withState(BackupInfo.BackupState.COMPLETE)); + List allBackups = backupSystemTable.getBackupInfos(BackupState.COMPLETE); boolean hasAnotherValidBackup = canAnyOtherBackupCover(allBackups, targetBackup, table, coveredPitrWindow.get(), continuousBackupStartTimes.get(table), maxAllowedPITRTime, currentTime); @@ -896,8 +893,7 @@ private boolean canAnyOtherBackupCover(List allBackups, BackupInfo c /** * Cleans up Write-Ahead Logs (WALs) that are no longer required for PITR after a successful - * backup deletion. If no full backups are present, all WALs are deleted, tables are removed - * from continuous backup metadata, and the associated replication peer is disabled. + * backup deletion. */ private void cleanUpUnusedBackupWALs() throws IOException { Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); @@ -908,8 +904,7 @@ private void cleanUpUnusedBackupWALs() throws IOException { return; } - try (Admin admin = conn.getAdmin(); - BackupSystemTable sysTable = new BackupSystemTable(conn)) { + try (BackupSystemTable sysTable = new BackupSystemTable(conn)) { // Get list of tables under continuous backup Map continuousBackupTables = sysTable.getContinuousBackupTableSet(); if (continuousBackupTables.isEmpty()) { @@ -920,15 +915,7 @@ private void cleanUpUnusedBackupWALs() throws IOException { // Find the earliest timestamp after which WALs are still needed long cutoffTimestamp = determineWALCleanupCutoffTime(sysTable); if (cutoffTimestamp == 0) { - // No full backup exists. PITR cannot function without a base full backup. - // Clean up all WALs, remove tables from backup metadata, and disable the replication - // peer. - System.out - .println("No full backups found. 
Cleaning up all WALs and disabling replication peer."); - - disableContinuousBackupReplicationPeer(admin); - removeAllTablesFromContinuousBackup(sysTable); - deleteAllBackupWALFiles(conf, backupWalDir); + System.err.println("ERROR: No valid full backup found. Skipping WAL cleanup."); return; } @@ -947,8 +934,7 @@ private void cleanUpUnusedBackupWALs() throws IOException { * @return cutoff timestamp or 0 if not found */ long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException { - List backupInfos = - sysTable.getBackupHistory(withState(BackupInfo.BackupState.COMPLETE)); + List backupInfos = sysTable.getBackupInfos(BackupState.COMPLETE); Collections.reverse(backupInfos); // Start from oldest for (BackupInfo backupInfo : backupInfos) { @@ -959,16 +945,6 @@ long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOExceptio return 0; } - private void disableContinuousBackupReplicationPeer(Admin admin) throws IOException { - for (ReplicationPeerDescription peer : admin.listReplicationPeers()) { - if (peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER) && peer.isEnabled()) { - admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER); - System.out.println("Disabled replication peer: " + CONTINUOUS_BACKUP_REPLICATION_PEER); - break; - } - } - } - /** * Updates the start time for continuous backups if older than cutoff timestamp. 
* @param sysTable Backup system table @@ -991,49 +967,6 @@ void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestam } } - private void removeAllTablesFromContinuousBackup(BackupSystemTable sysTable) - throws IOException { - Map allTables = sysTable.getContinuousBackupTableSet(); - if (!allTables.isEmpty()) { - sysTable.removeContinuousBackupTableSet(allTables.keySet()); - System.out.println("Removed all tables from continuous backup metadata."); - } - } - - private void deleteAllBackupWALFiles(Configuration conf, String backupWalDir) - throws IOException { - try { - BackupFileSystemManager manager = - new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir); - FileSystem fs = manager.getBackupFs(); - Path walDir = manager.getWalsDir(); - Path bulkloadDir = manager.getBulkLoadFilesDir(); - - // Delete contents under WAL directory - if (fs.exists(walDir)) { - FileStatus[] walContents = fs.listStatus(walDir); - for (FileStatus item : walContents) { - fs.delete(item.getPath(), true); // recursive delete of each child - } - System.out.println("Deleted all contents under WAL directory: " + walDir); - } - - // Delete contents under bulk load directory - if (fs.exists(bulkloadDir)) { - FileStatus[] bulkContents = fs.listStatus(bulkloadDir); - for (FileStatus item : bulkContents) { - fs.delete(item.getPath(), true); // recursive delete of each child - } - System.out.println("Deleted all contents under Bulk Load directory: " + bulkloadDir); - } - - } catch (IOException e) { - System.out.println("WARNING: Failed to delete contents under backup directories: " - + backupWalDir + ". Error: " + e.getMessage()); - throw e; - } - } - /** * Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp. 
*/ @@ -1072,15 +1005,13 @@ void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime) if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) { System.out.println("Deleting outdated WAL directory: " + dirPath); fs.delete(dirPath, true); - Path bulkloadPath = new Path(bulkloadDir, dirName); - System.out.println("Deleting corresponding bulk-load directory: " + bulkloadPath); - fs.delete(bulkloadPath, true); + fs.delete(new Path(bulkloadDir, dirName), true); } } catch (ParseException e) { System.out.println("WARNING: Failed to parse directory name '" + dirName + "'. Skipping. Error: " + e.getMessage()); } catch (IOException e) { - System.err.println("WARNING: Failed to delete directory '" + dirPath + System.out.println("WARNING: Failed to delete directory '" + dirPath + "'. Skipping. Error: " + e.getMessage()); } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 73ed588b265c..cdc396276947 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -1372,7 +1372,7 @@ private Delete createDeleteForIncrBackupTableSet(String backupRoot) { private Delete createDeleteForContinuousBackupTableSet(Set tables) { Delete delete = new Delete(rowkey(CONTINUOUS_BACKUP_SET)); for (TableName tableName : tables) { - delete.addColumn(META_FAMILY, Bytes.toBytes(tableName.getNameAsString())); + delete.addColumns(META_FAMILY, Bytes.toBytes(tableName.getNameAsString())); } return delete; } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java index a8b15a222e8e..c6844ba96bd3 100644 --- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/DefaultPitrRestoreHandler.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.backup.impl; -import static org.apache.hadoop.hbase.backup.BackupInfo.withState; - import java.io.IOException; import java.util.List; import java.util.stream.Collectors; @@ -50,7 +48,7 @@ public DefaultPitrRestoreHandler(Connection conn, PointInTimeRestoreRequest requ protected List getBackupMetadata(PointInTimeRestoreRequest request) throws IOException { try (BackupSystemTable table = new BackupSystemTable(conn)) { - return table.getBackupHistory(withState(BackupInfo.BackupState.COMPLETE)).stream() + return table.getBackupInfos(BackupInfo.BackupState.COMPLETE).stream() .map(BackupInfoAdapter::new).collect(Collectors.toList()); } } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index 3acc21593b25..dafecefabb02 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.backup.impl; -import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY; @@ -191,17 +190,17 @@ private void handleContinuousBackup(Admin admin) throws IOException { // After this checkpoint, even if entering cancel process, will let the backup finished backupInfo.setState(BackupState.COMPLETE); - if 
(!conf.getBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, false)) { - System.out.println("WARNING: Bulkload replication is not enabled. " - + "Since continuous backup is using HBase replication, bulk loaded files won't be backed up as part of continuous backup. " - + "To ensure bulk-loaded files are backed up, enable bulkload replication " - + "(hbase.replication.bulkload.enabled=true) and configure a unique cluster ID using " - + "hbase.replication.cluster.id. This cluster ID is required by the replication framework " - + "to uniquely identify clusters, even if continuous backup itself does not directly rely on it."); + if (!conf.getBoolean("hbase.replication.bulkload.enabled", false)) { + System.out.println("NOTE: Bulkload replication is not enabled. " + + "Bulk loaded files will not be backed up as part of continuous backup. " + + "To ensure bulk loaded files are included in the backup, please enable bulkload replication " + + "(hbase.replication.bulkload.enabled=true) and configure other necessary settings " + + "to properly enable bulkload replication."); } } private void handleNonContinuousBackup(Admin admin) throws IOException { + initializeBackupStartCode(backupManager); performLogRoll(); performBackupSnapshots(admin); backupManager.addIncrementalBackupTableSet(backupInfo.getTables()); @@ -213,6 +212,19 @@ private void handleNonContinuousBackup(Admin admin) throws IOException { updateBackupMetadata(); } + private void initializeBackupStartCode(BackupManager backupManager) throws IOException { + String savedStartCode; + boolean firstBackup; + // do snapshot for full table backup + savedStartCode = backupManager.readBackupStartCode(); + firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L; + if (firstBackup) { + // This is our first backup. Let's put some marker to system table so that we can hold the + // logs while we do the backup. 
+ backupManager.writeBackupStartCode(0L); + } + } + private void performLogRoll() throws IOException { // We roll log here before we do the snapshot. It is possible there is duplicate data // in the log that is already in the snapshot. But if we do it after the snapshot, we @@ -248,6 +260,8 @@ private void updateBackupMetadata() throws IOException { backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); Map> timestampMap = backupManager.readLogTimestampMap(); backupInfo.setTableSetTimestampMap(timestampMap); + Long newStartCode = BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(timestampMap)); + backupManager.writeBackupStartCode(newStartCode); } private long startContinuousWALBackup(Admin admin) throws IOException { @@ -286,9 +300,6 @@ private void updateContinuousBackupReplicationPeer(Admin admin) throws IOExcepti .collect(Collectors.toMap(tableName -> tableName, tableName -> new ArrayList<>())); try { - if (!admin.isReplicationPeerEnabled(CONTINUOUS_BACKUP_REPLICATION_PEER)) { - admin.enableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER); - } admin.appendReplicationPeerTableCFs(CONTINUOUS_BACKUP_REPLICATION_PEER, tableMap); LOG.info("Updated replication peer {} with table and column family map.", CONTINUOUS_BACKUP_REPLICATION_PEER); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 8339786f9733..190becad36b6 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.hbase.backup.impl; -import static org.apache.hadoop.hbase.backup.BackupInfo.withState; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import 
static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; import java.io.IOException; import java.net.URI; @@ -33,6 +33,7 @@ import java.util.stream.Collectors; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mapreduce.WALInputFormat; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.WALInputFormat; import org.apache.hadoop.hbase.mapreduce.WALPlayer; @@ -251,8 +253,12 @@ private void mergeSplitAndCopyBulkloadedHFiles(List activeFiles, private void mergeSplitAndCopyBulkloadedHFiles(List files, TableName tn, FileSystem tgtFs) throws IOException { MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob(); + Configuration conf = new Configuration(this.conf); conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY, getBulkOutputDirForTable(tn).toString()); + if (backupInfo.isContinuousBackupEnabled()) { + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); + } player.setConf(conf); String inputDirs = StringUtils.join(files, ","); @@ -361,10 +367,26 @@ public void execute() throws IOException, ColumnFamilyMismatchException { setupRegionLocator(); // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT convertWALsToHFiles(tablesToWALFileList, tablesToPrevBackupTs); - incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() }, - backupInfo.getBackupRootDir()); + + String[] bulkOutputFiles; + String backupDest = backupInfo.getBackupRootDir(); + if 
(backupInfo.isContinuousBackupEnabled()) { + // For the continuous backup case, the WALs have been converted to HFiles in a separate + // map-reduce job for each table. In order to prevent MR job failures due to HBASE-29891, + // these HFiles were sent to a different output directory for each table. This means + // continuous backups require a list of source directories and a different destination + // directory when copying HFiles to the incremental backup directory. + List uniqueNamespaces = tablesToWALFileList.keySet().stream() + .map(TableName::getNamespaceAsString).distinct().toList(); + bulkOutputFiles = uniqueNamespaces.stream() + .map(ns -> new Path(getBulkOutputDir(), ns).toString()).toArray(String[]::new); + backupDest = backupDest + Path.SEPARATOR + backupId; + } else { + bulkOutputFiles = new String[] { getBulkOutputDir().toString() }; + } + incrementalCopyHFiles(bulkOutputFiles, backupDest); } catch (Exception e) { - String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId; + String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId + " "; // fail the overall backup and return failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf); throw new IOException(e); @@ -382,10 +404,15 @@ public void execute() throws IOException, ColumnFamilyMismatchException { // The table list in backupInfo is good for both full backup and incremental backup. // For incremental backup, it contains the incremental backup table set. 
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); - } - Map> newTableSetTimestampMap = - backupManager.readLogTimestampMap(); + Map> newTableSetTimestampMap = + backupManager.readLogTimestampMap(); + + backupInfo.setTableSetTimestampMap(newTableSetTimestampMap); + Long newStartCode = + BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); + backupManager.writeBackupStartCode(newStartCode); + } List bulkLoads = handleBulkLoad(backupInfo.getTableNames(), tablesToWALFileList, tablesToPrevBackupTs); @@ -418,7 +445,8 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I System.arraycopy(files, 0, strArr, 0, files.length); strArr[strArr.length - 1] = backupDest; - String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId(); + String jobname = "Incremental_Backup-HFileCopy-" + backupInfo.getBackupId() + "-" + + System.currentTimeMillis(); if (LOG.isDebugEnabled()) { LOG.debug("Setting incremental copy HFiles job name to : " + jobname); } @@ -464,8 +492,7 @@ protected void convertWALsToHFiles(Map> tablesToWALFileL Path walBackupPath = new Path(walBackupDir); Set tableSet = backupInfo.getTables(); currentBackupTs = backupInfo.getIncrCommittedWalTs(); - List backupInfos = - backupManager.getBackupHistory(withState(BackupInfo.BackupState.COMPLETE)); + List backupInfos = backupManager.getBackupHistory(true); for (TableName table : tableSet) { for (BackupInfo backup : backupInfos) { // find previous backup for this table @@ -517,23 +544,25 @@ protected boolean tableExists(TableName table, Connection conn) throws IOExcepti protected void walToHFiles(List dirPaths, List tableList, long previousBackupTs) throws IOException { Tool player = new WALPlayer(); + Configuration conf = new Configuration(this.conf); // Player reads all files in arbitrary directory structure and creates // a Map task for each file. 
We use ';' as separator // because WAL file names contains ',' String dirs = StringUtils.join(dirPaths, ';'); - String jobname = "Incremental_Backup-" + backupId; + String jobname = "Incremental_Backup-" + backupId + "-" + System.currentTimeMillis(); - Path bulkOutputPath = getBulkOutputDir(); - conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + setBulkOutputPath(conf, tableList); conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); conf.set(JOB_NAME_CONF_KEY, jobname); - boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf); conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true); if (backupInfo.isContinuousBackupEnabled()) { conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs)); conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs())); + // We do not want a multi-table HFile format here because continuous backups run the WALPlayer + // individually on each table in the backup set. + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); } String[] playerArgs = { dirs, StringUtils.join(tableList, ",") }; @@ -548,43 +577,70 @@ protected void walToHFiles(List dirPaths, List tableList, long p } catch (Exception ee) { throw new IOException("Can not convert from directory " + dirs + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee); - } finally { - conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, - diskBasedSortingEnabledOriginalValue); - conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY); - conf.unset(JOB_NAME_CONF_KEY); } } + private void setBulkOutputPath(Configuration conf, List tableList) { + Path bulkOutputPath = getBulkOutputDir(); + if (backupInfo.isContinuousBackupEnabled()) { + if (tableList.size() != 1) { + // Continuous backups run the WALPlayer job on one table at a time, so the list of tables + // should have only one element. 
+ throw new RuntimeException( + "Expected table list to have only one element, but got: " + tableList); + } + bulkOutputPath = getTmpBackupDirForTable(TableName.valueOf(tableList.get(0))); + } + conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + } + private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException { Path bulkOutDir = getBulkOutputDirForTable(tn); + Configuration conf = new Configuration(this.conf); if (tgtFs.exists(bulkOutDir)) { conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2); Path tgtPath = getTargetDirForTable(tn); - try { - RemoteIterator locatedFiles = tgtFs.listFiles(bulkOutDir, true); - List files = new ArrayList<>(); - while (locatedFiles.hasNext()) { - LocatedFileStatus file = locatedFiles.next(); - if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) { - files.add(file.getPath().toString()); - } + RemoteIterator locatedFiles = tgtFs.listFiles(bulkOutDir, true); + List files = new ArrayList<>(); + while (locatedFiles.hasNext()) { + LocatedFileStatus file = locatedFiles.next(); + if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) { + files.add(file.getPath().toString()); } - incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), tgtPath.toString()); - } finally { - conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY); } + incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), tgtPath.toString()); } } + /** + * Creates a path to the bulk load output directory for a table. This directory will look like: + * .../backupRoot/.tmp/backupId/namespace/table/data + * @param table The table whose HFiles are being bulk loaded + * @return A Path object representing the directory + */ protected Path getBulkOutputDirForTable(TableName table) { + Path tablePath = getTmpBackupDirForTable(table); + return new Path(tablePath, "data"); + } + + /** + * Creates a path to a table's directory within the temporary directory. 
This directory will look + * like: .../backupRoot/.tmp/backupId/namespace/table + * @param table The table whose HFiles are being bulk loaded + * @return A Path object representing the directory + */ + protected Path getTmpBackupDirForTable(TableName table) { Path tablePath = getBulkOutputDir(); tablePath = new Path(tablePath, table.getNamespaceAsString()); - tablePath = new Path(tablePath, table.getQualifierAsString()); - return new Path(tablePath, "data"); + return new Path(tablePath, table.getQualifierAsString()); } + /** + * Creates a path to a temporary backup directory. This directory will look like: + * .../backupRoot/.tmp/backupId + * @return A Path object representing the directory + */ protected Path getBulkOutputDir() { String backupId = backupInfo.getBackupId(); Path path = new Path(backupInfo.getBackupRootDir()); @@ -593,6 +649,12 @@ protected Path getBulkOutputDir() { return path; } + /** + * Creates a path to a destination directory for bulk loaded HFiles. This directory will look + * like: .../backupRoot/backupID/namespace/table + * @param table The table whose HFiles are being bulk loaded + * @return A Path object representing the directory + */ private Path getTargetDirForTable(TableName table) { Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId()); path = new Path(path, table.getNamespaceAsString()); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java new file mode 100644 index 000000000000..225d32172766 --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.replication; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Initializes and organizes backup directories for continuous Write-Ahead Logs (WALs) and + * bulk-loaded files within the specified backup root directory. 
+ */ +@InterfaceAudience.Private +public class BackupFileSystemManager { + private static final Logger LOG = LoggerFactory.getLogger(BackupFileSystemManager.class); + + public static final String WALS_DIR = "WALs"; + public static final String BULKLOAD_FILES_DIR = "bulk-load-files"; + private final String peerId; + private final FileSystem backupFs; + private final Path backupRootDir; + private final Path walsDir; + private final Path bulkLoadFilesDir; + + public BackupFileSystemManager(String peerId, Configuration conf, String backupRootDirStr) + throws IOException { + this.peerId = peerId; + this.backupRootDir = new Path(backupRootDirStr); + this.backupFs = FileSystem.get(backupRootDir.toUri(), conf); + this.walsDir = createDirectory(WALS_DIR); + this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR); + } + + private Path createDirectory(String dirName) throws IOException { + Path dirPath = new Path(backupRootDir, dirName); + backupFs.mkdirs(dirPath); + LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath); + return dirPath; + } + + public Path getWalsDir() { + return walsDir; + } + + public Path getBulkLoadFilesDir() { + return bulkLoadFilesDir; + } + + public FileSystem getBackupFs() { + return backupFs; + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java new file mode 100644 index 000000000000..6e1271313bcd --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +/** + * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase replication. + *

+ * This utility class extracts and constructs the file paths of bulk-loaded files based on WAL + * entries. It processes bulk load descriptors and their associated store descriptors to generate + * the paths for each bulk-loaded file. + *

+ * The class is designed for scenarios where replicable bulk load operations need to be parsed and + * their file paths need to be determined programmatically. + *

+ */ +@InterfaceAudience.Private +public final class BulkLoadProcessor { + private BulkLoadProcessor() { + } + + public static List processBulkLoadFiles(List walEntries) throws IOException { + List bulkLoadFilePaths = new ArrayList<>(); + + for (WAL.Entry entry : walEntries) { + WALEdit edit = entry.getEdit(); + for (Cell cell : edit.getCells()) { + if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { + TableName tableName = entry.getKey().getTableName(); + String namespace = tableName.getNamespaceAsString(); + String table = tableName.getQualifierAsString(); + bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table)); + } + } + } + return bulkLoadFilePaths; + } + + private static List processBulkLoadDescriptor(Cell cell, String namespace, String table) + throws IOException { + List bulkLoadFilePaths = new ArrayList<>(); + WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); + + if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == null) { + return bulkLoadFilePaths; // Skip if not replicable + } + + String regionName = bld.getEncodedRegionName().toStringUtf8(); + for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) { + bulkLoadFilePaths + .addAll(processStoreDescriptor(storeDescriptor, namespace, table, regionName)); + } + + return bulkLoadFilePaths; + } + + private static List processStoreDescriptor(WALProtos.StoreDescriptor storeDescriptor, + String namespace, String table, String regionName) { + List paths = new ArrayList<>(); + String columnFamily = storeDescriptor.getFamilyName().toStringUtf8(); + + for (String storeFile : storeDescriptor.getStoreFileList()) { + paths.add(new Path(namespace, + new Path(table, new Path(regionName, new Path(columnFamily, storeFile))))); + } + + return paths; + } +} diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java index 19cd2733af7b..afd6d69d8b14 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.backup.replication; -import com.google.errorprone.annotations.RestrictedApi; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; @@ -31,22 +29,18 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; -import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; -import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy; import org.apache.hadoop.hbase.replication.ReplicationResult; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -211,14 +205,6 @@ protected void doStart() { notifyStarted(); } - @Override - public EmptyEntriesPolicy getEmptyEntriesPolicy() { - // Since 
this endpoint writes to S3 asynchronously, an empty entry batch - // does not guarantee that all previously submitted entries were persisted. - // Hence, avoid committing the WAL position. - return EmptyEntriesPolicy.SUBMIT; - } - @Override public ReplicationResult replicate(ReplicateContext replicateContext) { final List entries = replicateContext.getEntries(); @@ -303,29 +289,26 @@ private void backupWalEntries(long day, List walEntries) throws IOExc try { FSHLogProvider.Writer walWriter = walWriters.computeIfAbsent(day, this::createWalWriter); + List bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries); + + if (LOG.isTraceEnabled()) { + LOG.trace("{} Processed {} bulk load files for WAL entries", Utils.logPeerId(peerId), + bulkLoadFiles.size()); + LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId), + bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", "))); + } for (WAL.Entry entry : walEntries) { walWriter.append(entry); } - walWriter.sync(true); + uploadBulkLoadFiles(day, bulkLoadFiles); } catch (UncheckedIOException e) { String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day; LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day, e.getMessage(), e); throw new IOException(errorMsg, e); } - - List bulkLoadFiles = BulkLoadProcessor.processBulkLoadFiles(walEntries); - - if (LOG.isTraceEnabled()) { - LOG.trace("{} Processed {} bulk load files for WAL entries", Utils.logPeerId(peerId), - bulkLoadFiles.size()); - LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId), - bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", "))); - } - - uploadBulkLoadFiles(day, bulkLoadFiles); } private FSHLogProvider.Writer createWalWriter(long dayInMillis) { @@ -389,17 +372,7 @@ private void close() { } } - @RestrictedApi( - explanation = "Package-private for test visibility only. 
Do not use outside tests.", - link = "", - allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java)") - void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) - throws BulkLoadUploadException { - if (bulkLoadFiles.isEmpty()) { - LOG.debug("{} No bulk load files to upload for {}", Utils.logPeerId(peerId), dayInMillis); - return; - } - + private void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) throws IOException { LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId), bulkLoadFiles.size()); @@ -409,36 +382,18 @@ void uploadBulkLoadFiles(long dayInMillis, List bulkLoadFiles) } String dayDirectoryName = BackupUtils.formatToDateString(dayInMillis); Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName); - try { - backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir); - } catch (IOException e) { - throw new BulkLoadUploadException( - String.format("%s Failed to create bulkload directory in backupFS: %s", - Utils.logPeerId(peerId), bulkloadDir), - e); - } + backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir); for (Path file : bulkLoadFiles) { - Path sourcePath; - try { - sourcePath = getBulkLoadFileStagingPath(file); - } catch (FileNotFoundException fnfe) { - throw new BulkLoadUploadException( - String.format("%s Bulk load file not found: %s", Utils.logPeerId(peerId), file), fnfe); - } catch (IOException ioe) { - throw new BulkLoadUploadException( - String.format("%s Failed to resolve source path for: %s", Utils.logPeerId(peerId), file), - ioe); - } - + Path sourcePath = getBulkLoadFileStagingPath(file); Path destPath = new Path(bulkloadDir, file); try { LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath, destPath); - copyWithCleanup(CommonFSUtils.getRootDirFileSystem(conf), sourcePath, - backupFileSystemManager.getBackupFs(), destPath, conf); + 
FileUtil.copy(CommonFSUtils.getRootDirFileSystem(conf), sourcePath, + backupFileSystemManager.getBackupFs(), destPath, false, conf); LOG.info("{} Bulk load file {} successfully backed up to {}", Utils.logPeerId(peerId), file, destPath); @@ -507,6 +462,14 @@ private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws I Path result = findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, relativePathFromNamespace); + + if (result == null) { + LOG.error("{} No bulk loaded file found in relative path: {}", Utils.logPeerId(peerId), + relativePathFromNamespace); + throw new IOException( + "No Bulk loaded file found in relative path: " + relativePathFromNamespace); + } + LOG.debug("{} Bulk load file found at {}", Utils.logPeerId(peerId), result); return result; } @@ -521,12 +484,11 @@ private static Path findExistingPath(FileSystem rootFs, Path baseNamespaceDir, for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath), new Path(hFileArchiveDir, filePath) }) { if (rootFs.exists(candidate)) { + LOG.debug("Found bulk load file at: {}", candidate); return candidate; } } - - throw new FileNotFoundException("Bulk load file not found at either: " - + new Path(baseNamespaceDir, filePath) + " or " + new Path(hFileArchiveDir, filePath)); + return null; } private void shutdownFlushExecutor() { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index c765fd504742..f26b562e5fe1 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.TimeZone; +import java.util.TreeMap; import java.util.TreeSet; import java.util.function.Predicate; import java.util.stream.Collectors; diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java index 50b47565d743..93cd4b6b7321 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.backup.util; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; + import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -33,9 +35,11 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceNotFoundException; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreJob; +import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Connection; @@ -158,11 +162,17 @@ void modifyTableSync(Connection conn, TableDescriptor desc) throws IOException { public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs, TableName[] tableNames, TableName[] newTableNames, String incrBackupId, boolean keepOriginalSplits) throws IOException { - try (Admin admin = conn.getAdmin()) { + try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) { if (tableNames.length != newTableNames.length) { throw new IOException("Number of source tables and target tables does not match!"); } - FileSystem fileSys = tableBackupPath.getFileSystem(this.conf); + Configuration conf = new Configuration(this.conf); + FileSystem fileSys = 
tableBackupPath.getFileSystem(conf); + + BackupInfo backupInfo = backupAdmin.getBackupInfo(incrBackupId); + if (backupInfo.isContinuousBackupEnabled()) { + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); + } // for incremental backup image, expect the table already created either by user or previous // full backup. Here, check that all new tables exists diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java index 24f5237866db..ae26cf960501 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java @@ -28,17 +28,9 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.tool.BulkLoadHFiles; -import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,32 +104,4 @@ public static int getRowCount(HBaseTestingUtil testUtil, TableName tableName) th return HBaseTestingUtil.countRows(table); } } - - public static void generateHFiles(Path outputDir, Configuration conf, String cfName) - throws IOException { - String hFileName = "MyHFile"; - int numRows = 1000; - - FileSystem fs = FileSystem.get(conf); - outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); - - byte[] from = Bytes.toBytes(cfName + "begin"); - byte[] to = Bytes.toBytes(cfName + "end"); 
- - Path familyDir = new Path(outputDir, cfName); - HFileTestUtil.createHFile(conf, fs, new Path(familyDir, hFileName), Bytes.toBytes(cfName), - Bytes.toBytes("qualifier"), from, to, numRows); - } - - public static void bulkLoadHFiles(TableName tableName, Path inputDir, Connection conn, - Configuration conf) throws IOException { - conf.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true); - - try (Table table = conn.getTable(tableName)) { - BulkLoadHFiles loader = new BulkLoadHFilesTool(conf); - loader.bulkLoad(table.getName(), inputDir); - } finally { - conf.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false); - } - } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 16381325dbbb..35a489954793 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -195,6 +195,10 @@ public void execute() throws IOException { Map> newTableSetTimestampMap = backupManager.readLogTimestampMap(); + Long newStartCode = + BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); + backupManager.writeBackupStartCode(newStartCode); + handleBulkLoad(backupInfo.getTableNames(), new HashMap<>(), new HashMap<>()); failStageIf(Stage.stage_4); @@ -404,12 +408,13 @@ protected BackupRequest createBackupRequest(BackupType type, List tab return request; } - protected BackupRequest createBackupRequest(BackupType type, List tables, - String rootDir, boolean noChecksumVerify, boolean isContinuousBackupEnabled) { + protected BackupRequest createBackupRequest(BackupType type, List tables, String path, + boolean noChecksumVerify, boolean continuousBackupEnabled) { BackupRequest.Builder builder = new BackupRequest.Builder(); - return builder.withBackupType(type).withTableList(tables).withTargetRootDir(rootDir) 
- .withNoChecksumVerify(noChecksumVerify).withContinuousBackupEnabled(isContinuousBackupEnabled) - .build(); + BackupRequest request = builder.withBackupType(type).withTableList(tables) + .withTargetRootDir(path).withNoChecksumVerify(noChecksumVerify) + .withContinuousBackupEnabled(continuousBackupEnabled).build(); + return request; } protected String backupTables(BackupType type, List tables, String path) diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java index 40b179833a34..daa6b378cf2a 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.hbase.backup; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.PrintStream; @@ -32,8 +32,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.ToolRunner; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java index c6c6f5e9799e..f429ae4df68b 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java +++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java @@ -18,10 +18,9 @@ package org.apache.hadoop.hbase.backup; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; -import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR; -import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -29,25 +28,18 @@ import java.io.IOException; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Date; -import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; -import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.ToolRunner; -import org.junit.After; -import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -63,55 +55,38 @@ 
public class TestBackupDeleteWithCleanup extends TestBackupBase { String backupWalDirName = "TestBackupDeleteWithCleanup"; - private FileSystem fs; - private Path backupWalDir; - private BackupSystemTable backupSystemTable; - - @Before - public void setUpTest() throws Exception { + @Test + public void testBackupDeleteWithCleanupLogic() throws Exception { Path root = TEST_UTIL.getDataTestDirOnTestFS(); - backupWalDir = new Path(root, backupWalDirName); + Path backupWalDir = new Path(root, backupWalDirName); conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString()); - fs = FileSystem.get(conf1); + FileSystem fs = FileSystem.get(conf1); fs.mkdirs(backupWalDir); - backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection()); - } - - @After - public void tearDownTest() throws Exception { - if (backupSystemTable != null) { - backupSystemTable.close(); - } - if (fs != null && backupWalDir != null) { - fs.delete(backupWalDir, true); - } - - EnvironmentEdgeManager.reset(); - } - @Test - public void testBackupDeleteWithCleanupLogic() throws Exception { // Step 1: Setup Backup Folders long currentTime = EnvironmentEdgeManager.getDelegate().currentTime(); - setupBackupFolders(currentTime); + setupBackupFolders(fs, backupWalDir, currentTime); // Log the directory structure before cleanup logDirectoryStructure(fs, backupWalDir, "Directory structure BEFORE cleanup:"); // Step 2: Simulate Backup Creation + BackupSystemTable backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection()); backupSystemTable.addContinuousBackupTableSet(Set.of(table1), currentTime - (2 * ONE_DAY_IN_MILLISECONDS)); EnvironmentEdgeManager .injectEdge(() -> System.currentTimeMillis() - (2 * ONE_DAY_IN_MILLISECONDS)); - String backupId = fullTableBackup(Lists.newArrayList(table1)); assertTrue(checkSucceeded(backupId)); + String anotherBackupId = fullTableBackup(Lists.newArrayList(table1)); assertTrue(checkSucceeded(anotherBackupId)); // Step 3: Run Delete Command - 
deleteBackup(backupId); + int ret = + ToolRunner.run(conf1, new BackupDriver(), new String[] { "delete", "-l", backupId, "-fd" }); + assertEquals(0, ret); // Log the directory structure after cleanup logDirectoryStructure(fs, backupWalDir, "Directory structure AFTER cleanup:"); @@ -121,70 +96,6 @@ public void testBackupDeleteWithCleanupLogic() throws Exception { // Step 5: Verify System Table Update verifySystemTableUpdate(backupSystemTable, currentTime); - - // Cleanup - deleteBackup(anotherBackupId); - } - - @Test - public void testSingleBackupForceDelete() throws Exception { - // Step 1: Setup Backup Folders - long currentTime = EnvironmentEdgeManager.getDelegate().currentTime(); - setupBackupFolders(currentTime); - - // Log the directory structure before cleanup - logDirectoryStructure(fs, backupWalDir, "Directory structure BEFORE cleanup:"); - - // Step 2: Simulate Backup Creation - backupSystemTable.addContinuousBackupTableSet(Set.of(table1), - currentTime - (2 * ONE_DAY_IN_MILLISECONDS)); - - EnvironmentEdgeManager - .injectEdge(() -> System.currentTimeMillis() - (2 * ONE_DAY_IN_MILLISECONDS)); - - String backupId = fullTableBackupWithContinuous(Lists.newArrayList(table1)); - assertTrue(checkSucceeded(backupId)); - - assertTrue("Backup replication peer should be enabled after the backup", - continuousBackupReplicationPeerExistsAndEnabled()); - - // Step 3: Run Delete Command - deleteBackup(backupId); - - // Log the directory structure after cleanup - logDirectoryStructure(fs, backupWalDir, "Directory structure AFTER cleanup:"); - - // Step 4: Verify CONTINUOUS_BACKUP_REPLICATION_PEER is disabled - assertFalse("Backup replication peer should be disabled or removed", - continuousBackupReplicationPeerExistsAndEnabled()); - - // Step 5: Verify that system table is updated to remove all the tables - Set remainingTables = backupSystemTable.getContinuousBackupTableSet().keySet(); - assertTrue("System table should have no tables after all full backups are clear", - 
remainingTables.isEmpty()); - - // Step 6: Verify that the backup WAL directory is empty - assertTrue("WAL backup directory should be empty after force delete", - areWalAndBulkloadDirsEmpty(conf1, backupWalDir.toString())); - - // Step 7: Take new full backup with continuous backup enabled - String backupIdContinuous = fullTableBackupWithContinuous(Lists.newArrayList(table1)); - - // Step 8: Verify CONTINUOUS_BACKUP_REPLICATION_PEER is enabled again - assertTrue("Backup replication peer should be re-enabled after new backup", - continuousBackupReplicationPeerExistsAndEnabled()); - - // And system table has new entry - Set newTables = backupSystemTable.getContinuousBackupTableSet().keySet(); - assertTrue("System table should contain the table after new backup", - newTables.contains(table1)); - - // Cleanup - deleteBackup(backupIdContinuous); - } - - private void setupBackupFolders(long currentTime) throws IOException { - setupBackupFolders(fs, backupWalDir, currentTime); } public static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime) @@ -270,45 +181,4 @@ public static void listDirectory(FileSystem fs, Path dir, String indent) throws } } } - - private boolean continuousBackupReplicationPeerExistsAndEnabled() throws IOException { - return TEST_UTIL.getAdmin().listReplicationPeers().stream().anyMatch( - peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER) && peer.isEnabled()); - } - - private static boolean areWalAndBulkloadDirsEmpty(Configuration conf, String backupWalDir) - throws IOException { - BackupFileSystemManager manager = - new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir); - - FileSystem fs = manager.getBackupFs(); - Path walDir = manager.getWalsDir(); - Path bulkloadDir = manager.getBulkLoadFilesDir(); - - return isDirectoryEmpty(fs, walDir) && isDirectoryEmpty(fs, bulkloadDir); - } - - private static boolean isDirectoryEmpty(FileSystem fs, Path dirPath) throws IOException { - if 
(!fs.exists(dirPath)) { - // Directory doesn't exist — treat as empty - return true; - } - FileStatus[] entries = fs.listStatus(dirPath); - return entries == null || entries.length == 0; - } - - private static void deleteBackup(String backupId) throws Exception { - int ret = - ToolRunner.run(conf1, new BackupDriver(), new String[] { "delete", "-l", backupId, "-fd" }); - assertEquals(0, ret); - } - - private String fullTableBackupWithContinuous(List tables) throws IOException { - try (BackupAdmin admin = new BackupAdminImpl(TEST_UTIL.getConnection())) { - BackupRequest request = - createBackupRequest(BackupType.FULL, new ArrayList<>(tables), BACKUP_ROOT_DIR, false, true); - return admin.backupTables(request); - } - } - } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithContinuousBackupAndPITR.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithContinuousBackupAndPITR.java index 928dff1801c2..248e8e7b757c 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithContinuousBackupAndPITR.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithContinuousBackupAndPITR.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.backup; -import static org.apache.hadoop.hbase.backup.BackupInfo.withState; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; import static org.junit.Assert.assertEquals; @@ -268,7 +267,7 @@ private String[] buildBackupDeleteArgs(String backupId, boolean isForceDelete) { } private BackupInfo getBackupInfoById(String backupId) throws IOException { - return backupSystemTable.getBackupHistory(withState(BackupInfo.BackupState.COMPLETE)).stream() + return backupSystemTable.getBackupInfos(BackupInfo.BackupState.COMPLETE).stream() .filter(b -> 
b.getBackupId().equals(backupId)).findFirst() .orElseThrow(() -> new IllegalStateException("Backup should exist: " + backupId)); } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java index c6453cca5cd3..d983f9441fd6 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java @@ -21,15 +21,16 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; import org.apache.hadoop.hbase.backup.impl.BackupCommands; @@ -128,7 +129,7 @@ public void testBackupDescribeCommandForContinuousBackup() throws Exception { String[] backupArgs = new String[] { "create", BackupType.FULL.name(), BACKUP_ROOT_DIR, "-t", table1.getNameAsString(), "-" + OPTION_ENABLE_CONTINUOUS_BACKUP }; int ret = ToolRunner.run(conf1, new BackupDriver(), backupArgs); - assertEquals(0, ret, "Backup should succeed"); + assertEquals("Backup should 
succeed", 0, ret); List backups = table.getBackupHistory(); String backupId = backups.get(0).getBackupId(); assertTrue(checkSucceeded(backupId)); @@ -164,7 +165,7 @@ public void testBackupDescribeCommandForContinuousBackup() throws Exception { backupArgs = new String[] { "create", BackupType.INCREMENTAL.name(), BACKUP_ROOT_DIR, "-t", table1.getNameAsString() }; ret = ToolRunner.run(conf1, new BackupDriver(), backupArgs); - assertEquals(0, ret, "Incremental Backup should succeed"); + assertEquals("Incremental Backup should succeed", 0, ret); backups = table.getBackupHistory(); String incrBackupId = backups.get(0).getBackupId(); assertTrue(checkSucceeded(incrBackupId)); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupUtils.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupUtils.java index 86df4e316a47..02f092216dc3 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupUtils.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupUtils.java @@ -17,10 +17,9 @@ */ package org.apache.hadoop.hbase.backup; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.security.PrivilegedAction; @@ -42,9 +41,11 @@ import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.security.UserGroupInformation; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +60,7 @@ public class TestBackupUtils { private static FileSystem dummyFs; private static Path backupRootDir; - @BeforeAll + @BeforeClass public static void setUp() throws IOException { dummyFs = TEST_UTIL.getTestFileSystem(); backupRootDir = TEST_UTIL.getDataTestDirOnTestFS("backupUT"); @@ -198,22 +199,21 @@ protected void testGetValidWalDirs(long startTime, long endTime, Path walDir, backupRootDir, startTime, endTime); // Verify the correct number of valid WAL dirs was found - assertEquals(numExpectedValidWalDirs, validWalDirs.size(), - "The number of valid WAL dirs should be " + numExpectedValidWalDirs + " for time zone " - + timeZone); + assertEquals("The number of valid WAL dirs should be " + numExpectedValidWalDirs + + " for time zone " + timeZone, numExpectedValidWalDirs, validWalDirs.size()); // Verify the list of valid WAL dirs is as expected for (String dirName : expectedValidWalDirs) { - assertTrue(validWalDirs.stream().anyMatch(path -> path.endsWith("/" + dirName)), - "Expected " + dirName + " to be a valid WAL dir"); + assertTrue("Expected " + dirName + " to be a valid WAL dir", + validWalDirs.stream().anyMatch(path -> path.endsWith("/" + dirName))); } // Verify the list of valid WAL dirs does not contain anything expected to be invalid List expectedInvalidWalDirs = new ArrayList<>(availableWalDateDirs); expectedInvalidWalDirs.removeAll(expectedValidWalDirs); for (String dirName : expectedInvalidWalDirs) { - assertFalse(validWalDirs.contains(dirName), - "Expected " + dirName + " to NOT be a valid WAL dir"); + assertFalse("Expected " + dirName + " to NOT be a valid WAL dir", + validWalDirs.contains(dirName)); } } } finally { diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java index 
c911f5dbce07..49b77fcdd909 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java @@ -22,11 +22,14 @@ import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -34,19 +37,26 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.impl.BulkLoad; import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ 
-67,24 +77,25 @@ public class TestIncrementalBackupWithContinuous extends TestBackupBase { private static final Logger LOG = LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class); + private byte[] ROW = Bytes.toBytes("row1"); + private final byte[] FAMILY = Bytes.toBytes("family"); + private final byte[] COLUMN = Bytes.toBytes("col"); private static final int ROWS_IN_BULK_LOAD = 100; private static final String backupWalDirName = "TestContinuousBackupWalDir"; - private FileSystem fs; - @Before public void beforeTest() throws IOException { Path root = TEST_UTIL.getDataTestDirOnTestFS(); Path backupWalDir = new Path(root, backupWalDirName); conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString()); conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); - fs = FileSystem.get(conf1); } @After public void afterTest() throws IOException { Path root = TEST_UTIL.getDataTestDirOnTestFS(); Path backupWalDir = new Path(root, backupWalDirName); + FileSystem fs = FileSystem.get(conf1); if (fs.exists(backupWalDir)) { fs.delete(backupWalDir, true); } @@ -97,18 +108,20 @@ public void afterTest() throws IOException { public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception { String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); TableName tableName = TableName.valueOf("table_" + methodName); - Table t1 = TEST_UTIL.createTable(tableName, famName); + Table t1 = TEST_UTIL.createTable(tableName, FAMILY); - try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { - int before = table.getBackupHistory().size(); + try (BackupSystemTable backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { + int before = backupSystemTable.getBackupHistory().size(); // Run continuous backup + LOG.info("Running full backup with continuous backup enabled on table: {}", tableName); String backup1 = backupTables(BackupType.FULL, List.of(tableName), BACKUP_ROOT_DIR, true); + LOG.info("Full backup 
complete with ID {} for table: {}", backup1, tableName); assertTrue(checkSucceeded(backup1)); // Verify backup history increased and all the backups are succeeded LOG.info("Verify backup history increased and all the backups are succeeded"); - List backups = table.getBackupHistory(); + List backups = backupSystemTable.getBackupHistory(); assertEquals("Backup history should increase", before + 1, backups.size()); // Verify backup manifest contains the correct tables @@ -117,36 +130,121 @@ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception assertEquals("Backup should contain the expected tables", Sets.newHashSet(tableName), new HashSet<>(manifest.getTableList())); - loadTable(t1); - Thread.sleep(10000); + Put p = new Put(ROW); + p.addColumn(FAMILY, COLUMN, COLUMN); + t1.put(p); + Thread.sleep(5000); // Run incremental backup - LOG.info("Run incremental backup now"); - before = table.getBackupHistory().size(); + LOG.info("Run incremental backup now on table: {}", tableName); + before = backupSystemTable.getBackupHistory().size(); String backup2 = backupTables(BackupType.INCREMENTAL, List.of(tableName), BACKUP_ROOT_DIR, true); assertTrue(checkSucceeded(backup2)); - LOG.info("Incremental backup completed"); - - // Verify the temporary backup directory was deleted - Path backupTmpDir = new Path(BACKUP_ROOT_DIR, ".tmp"); - Path bulkLoadOutputDir = new Path(backupTmpDir, backup2); - assertFalse("Bulk load output directory " + bulkLoadOutputDir + " should have been deleted", - fs.exists(bulkLoadOutputDir)); + LOG.info("Incremental backup completed for table: {}", tableName); // Verify backup history increased and all the backups are succeeded - backups = table.getBackupHistory(); + backups = backupSystemTable.getBackupHistory(); assertEquals("Backup history should increase", before + 1, backups.size()); + String originalTableChecksum = TEST_UTIL.checksumRows(t1); + + LOG.info("Truncating table: {}", tableName); 
TEST_UTIL.truncateTable(tableName); // Restore incremental backup TableName[] tables = new TableName[] { tableName }; BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection()); + LOG.info("Restoring table: {}", tableName); + // In the restore request, the original table is both the "from table" and the "to table". + // This means the table is being restored "into itself". It is not being restored into + // separate table. client.restore( BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true)); - assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName)); + LOG.info("Verifying data integrity for restored table: {}", tableName); + verifyRestoredTableDataIntegrity(tables[0], originalTableChecksum, NB_ROWS_IN_BATCH); + } + } + + @Test + public void testMultiTableContinuousBackupWithIncrementalBackupSuccess() throws Exception { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + List tables = new ArrayList<>(); + List tableNames = new ArrayList<>(); + tableNames.add(TableName.valueOf("table_" + methodName + "_0")); + tableNames.add(TableName.valueOf("table_" + methodName + "_1")); + tableNames.add(TableName.valueOf("ns1", "ns1_table_" + methodName + "_0")); + tableNames.add(TableName.valueOf("ns1", "ns1_table_" + methodName + "_1")); + tableNames.add(TableName.valueOf("sameTableNameDifferentNamespace")); + tableNames.add(TableName.valueOf("ns3", "sameTableNameDifferentNamespace")); + + for (TableName table : tableNames) { + LOG.info("Creating table: {}", table); + tables.add(TEST_UTIL.createTable(table, famName)); + } + + try (BackupSystemTable backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { + int before = backupSystemTable.getBackupHistory().size(); + + // Run continuous backup on multiple tables + LOG.info("Running full backup with continuous backup enabled on tables: {}", tableNames); + String backup1 = backupTables(BackupType.FULL, tableNames, BACKUP_ROOT_DIR, 
true); + LOG.info("Full backup complete with ID {} for tables: {}", backup1, tableNames); + assertTrue(checkSucceeded(backup1)); + + // Verify backup history increased and all backups have succeeded + LOG.info("Verify backup history increased and all backups have succeeded"); + List backups = backupSystemTable.getBackupHistory(); + assertEquals("Backup history should increase", before + 1, backups.size()); + + // Verify backup manifest contains the correct tables + LOG.info("Verify backup manifest contains the correct tables"); + BackupManifest manifest = getLatestBackupManifest(backups); + assertEquals("Backup should contain the expected tables", Sets.newHashSet(tableNames), + new HashSet<>(manifest.getTableList())); + + loadTables(tables); + Thread.sleep(10000); + + // Run incremental backup + LOG.info("Running incremental backup on tables: {}", tableNames); + before = backupSystemTable.getBackupHistory().size(); + String backup2 = backupTables(BackupType.INCREMENTAL, tableNames, BACKUP_ROOT_DIR, true); + assertTrue(checkSucceeded(backup2)); + LOG.info("Incremental backup completed with ID {} for tables: {}", backup2, tableNames); + + // Verify backup history increased and all the backups are succeeded + backups = backupSystemTable.getBackupHistory(); + assertEquals("Backup history should increase", before + 1, backups.size()); + + // We need to get each table's original row checksum before truncating each table + LinkedHashMap originalTableChecksums = new LinkedHashMap<>(); + for (Table table : tables) { + LOG.info("Getting row checksum for table: {}", table); + originalTableChecksums.put(table.getName(), TEST_UTIL.checksumRows(table)); + } + + for (TableName tableName : tableNames) { + LOG.info("Truncating table: {}", tableName); + TEST_UTIL.truncateTable(tableName); + } + + // Restore incremental backup + TableName[] tableNamesArray = tableNames.toArray(new TableName[0]); + BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection()); + 
LOG.info("Restoring tables: {}", tableNames); + // In the restore request, the original tables are both the list of "from tables" and the + // list of "to tables". This means the tables are being restored "into themselves". They are + // not being restored into separate tables. + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, + tableNamesArray, tableNamesArray, true)); + + for (TableName tableName : originalTableChecksums.keySet()) { + LOG.info("Verifying data integrity for restored table: {}", tableName); + verifyRestoredTableDataIntegrity(tableName, originalTableChecksums.get(tableName), + NB_ROWS_IN_BATCH); + } } } @@ -162,13 +260,13 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty()); - // Create continuous backup, bulk loads are now being tracked - String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true); + // Create a backup, bulk loads are now being tracked + String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR, true); assertTrue(checkSucceeded(backup1)); - loadTable(TEST_UTIL.getConnection().getTable(tableName1)); - expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH; - performBulkLoad("bulkPreIncr", methodName, tableName1); + loadTable(TEST_UTIL.getConnection().getTable(table1)); + assertEquals(expectedRowCount, TEST_UTIL.countRows(table1)); + performBulkLoad("bulk2", methodName); expectedRowCount += ROWS_IN_BULK_LOAD; assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty()); @@ -194,8 +292,55 @@ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws } } - private void performBulkLoad(String keyPrefix, String testDir, TableName tableName) - throws IOException { + @Test + public void 
testPitrFailureDueToMissingBackupPostBulkload() throws Exception { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + TableName tableName1 = TableName.valueOf("table_" + methodName); + TEST_UTIL.createTable(tableName1, famName); + try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) { + + // The test starts with no data, and no bulk loaded rows. + int expectedRowCount = 0; + assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); + assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty()); + + // Create continuous backup, bulk loads are now being tracked + String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true); + assertTrue(checkSucceeded(backup1)); + + loadTable(TEST_UTIL.getConnection().getTable(tableName1)); + expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH; + performBulkLoad("bulkPreIncr", methodName, tableName1); + expectedRowCount += ROWS_IN_BULK_LOAD; + assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1)); + assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); + + loadTable(TEST_UTIL.getConnection().getTable(tableName1)); + Thread.sleep(5000); + + // Incremental backup + String backup2 = + backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true); + assertTrue(checkSucceeded(backup2)); + assertEquals(0, systemTable.readBulkloadRows(List.of(tableName1)).size()); + + performBulkLoad("bulkPostIncr", methodName, tableName1); + assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size()); + + loadTable(TEST_UTIL.getConnection().getTable(tableName1)); + Thread.sleep(10000); + long restoreTs = BackupUtils.getReplicationCheckpoint(TEST_UTIL.getConnection()); + + // expect restore failure due to no backup post bulkPostIncr bulkload + TableName restoredTable = TableName.valueOf("restoredTable"); + String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { tableName1 }, 
+ new TableName[] { restoredTable }, restoreTs, null); + int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args); + assertNotEquals("Restore should fail since there is one bulkload without any backup", 0, ret); + } + } + + private void performBulkLoad(String keyPrefix, String testDir) throws IOException { FileSystem fs = TEST_UTIL.getTestFileSystem(); Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(testDir); Path hfilePath = @@ -207,7 +352,7 @@ private void performBulkLoad(String keyPrefix, String testDir, TableName tableNa listFiles(fs, baseDirectory, baseDirectory); Map result = - BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tableName, baseDirectory); + BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory); assertFalse(result.isEmpty()); } @@ -233,9 +378,50 @@ private static Set listFiles(final FileSystem fs, final Path root, final protected static void loadTable(Table table) throws Exception { Put p; // 100 + 1 row to t1_syncup for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { - p = new Put(Bytes.toBytes("rowLoad" + i)); + p = new Put(Bytes.toBytes("row" + i)); p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); table.put(p); } } + + protected static void loadTables(List
tables) throws Exception { + for (Table table : tables) { + LOG.info("Loading data into table: {}", table); + loadTable(table); + } + } + + private void verifyRestoredTableDataIntegrity(TableName restoredTableName, + String originalTableChecksum, int expectedRowCount) throws Exception { + try (Table restoredTable = TEST_UTIL.getConnection().getTable(restoredTableName); + ResultScanner scanner = restoredTable.getScanner(new Scan())) { + + // Verify the checksum for the original table (before it was truncated) matches the checksum + // of the restored table. + String restoredTableChecksum = TEST_UTIL.checksumRows(restoredTable); + assertEquals("The restored table's row checksum did not match the original table's checksum", + originalTableChecksum, restoredTableChecksum); + + // Verify the data in the restored table is the same as when it was originally loaded + // into the table. + int count = 0; + for (Result result : scanner) { + // The data has a numerical match between its row key and value (such as rowLoad1 and + // value1) + // We can use this to ensure a row key has the expected value. 
+ String rowKey = Bytes.toString(result.getRow()); + int index = Integer.parseInt(rowKey.replace("rowLoad", "")); + + // Verify the Value + byte[] actualValue = result.getValue(famName, qualName); + assertNotNull("Value missing for row key: " + rowKey, actualValue); + String expectedValue = "val" + index; + assertEquals("Value mismatch for row key: " + rowKey, expectedValue, + Bytes.toString(actualValue)); + + count++; + } + assertEquals(expectedRowCount, count); + } + } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java index 54667752f01b..a1ce9c97a687 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.ToolRunner; import org.junit.AfterClass; @@ -37,21 +36,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; -/** - * Integration-style tests for Point-in-Time Restore (PITR). - *
<p>
- * These tests exercise the full backup / continuous backup / restore flow: - create backups at - * multiple historical points in time (via {@code BackupDriver}) - exercise WAL-based - * replication/continuous backup - validate Point-in-Time Restore behavior (successful restores, - * failure cases) - *
<p>
- *
<p>
- * NOTE: Some tests also create HFiles and perform HBase bulk-loads (HFile -> table) so the restore - * flow is validated when bulk-loaded storefiles are present in WALs. This ensures the - * BulkLoadCollector/BulkFilesCollector logic (discovering bulk-loaded store files referenced from - * WAL bulk-load descriptors) is exercised by the test suite. - *

- */ @Category(LargeTests.class) public class TestPointInTimeRestore extends TestBackupBase { @ClassRule @@ -83,8 +67,8 @@ private static void setUpBackups() throws Exception { // Simulate a backup taken 20 days ago EnvironmentEdgeManager .injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS); - // Insert initial data into table1 - PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); + PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into + // table1 // Perform a full backup for table1 with continuous backup enabled String[] args = @@ -96,12 +80,6 @@ private static void setUpBackups() throws Exception { EnvironmentEdgeManager .injectEdge(() -> System.currentTimeMillis() - 15 * ONE_DAY_IN_MILLISECONDS); PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Add more data to table1 - - Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily"); - PITRTestUtil.generateHFiles(dir, TEST_UTIL.getConfiguration(), Bytes.toString(famName)); - PITRTestUtil.bulkLoadHFiles(table1, dir, TEST_UTIL.getConnection(), - TEST_UTIL.getConfiguration()); - PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Insert data into table2 PITRTestUtil.waitForReplication(); // Ensure replication is complete diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupAdminImpl.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupAdminImpl.java index 3d0e3c008583..b78d44c144b5 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupAdminImpl.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupAdminImpl.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.backup.impl; -import static org.apache.hadoop.hbase.backup.BackupInfo.withRoot; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -267,8 
+266,7 @@ public void testGetAffectedBackupSessions() throws IOException { BackupInfo b3 = createBackupInfo("backup_003", 3000L, BackupType.INCREMENTAL, table); BackupInfo b4 = createBackupInfo("backup_004", 4000L, BackupType.INCREMENTAL, table); - when(mockTable.getBackupHistory(withRoot("/backup/root"))) - .thenReturn(List.of(b4, b3, b2, b1, b0)); + when(mockTable.getBackupHistory("/backup/root")).thenReturn(List.of(b4, b3, b2, b1, b0)); List result = backupAdminImpl.getAffectedBackupSessions(current, table, mockTable); @@ -295,7 +293,7 @@ public void testGetAffectedBackupSessions_resetsOnFullBackup() throws IOExceptio BackupInfo b2 = createBackupInfo("backup_002", 2000L, BackupType.FULL, table); BackupInfo b3 = createBackupInfo("backup_003", 3000L, BackupType.INCREMENTAL, table); - when(mockTable.getBackupHistory(withRoot("/backup/root"))).thenReturn(List.of(b3, b2, b1, b0)); + when(mockTable.getBackupHistory("/backup/root")).thenReturn(List.of(b3, b2, b1, b0)); List result = backupAdminImpl.getAffectedBackupSessions(current, table, mockTable); @@ -322,8 +320,7 @@ public void testGetAffectedBackupSessions_skipsNonMatchingTable() throws IOExcep TableName.valueOf("other_table")); BackupInfo b4 = createBackupInfo("backup_004", 4000L, BackupType.INCREMENTAL, table); - when(mockTable.getBackupHistory(withRoot("/backup/root"))) - .thenReturn(List.of(b4, b3, b2, b1, b0)); + when(mockTable.getBackupHistory("/backup/root")).thenReturn(List.of(b4, b3, b2, b1, b0)); List result = backupAdminImpl.getAffectedBackupSessions(current, table, mockTable); @@ -352,8 +349,7 @@ public void testGetAffectedBackupSessions_ignoresFullBackupOfOtherTable() throws BackupInfo b3 = createBackupInfo("backup_003", 3000L, BackupType.INCREMENTAL, table); BackupInfo b4 = createBackupInfo("backup_004", 4000L, BackupType.INCREMENTAL, table); - when(mockTable.getBackupHistory(withRoot("/backup/root"))) - .thenReturn(List.of(b4, b3, b2, b1, b0)); + 
when(mockTable.getBackupHistory("/backup/root")).thenReturn(List.of(b4, b3, b2, b1, b0)); List result = backupAdminImpl.getAffectedBackupSessions(current, table, mockTable); @@ -637,7 +633,8 @@ public void testCheckIfValidForMerge_validCase() throws IOException { BackupSystemTable table = mock(BackupSystemTable.class); when(table.readBackupInfo("b1")).thenReturn(b1); when(table.readBackupInfo("b2")).thenReturn(b2); - when(table.getBackupHistory(any())).thenReturn(List.of(b1, b2)); + when(table.getBackupHistory(eq(-1), any(), any(), any(), any(), any())) + .thenReturn(List.of(b1, b2)); new BackupAdminImpl(mock(Connection.class)).checkIfValidForMerge(ids, table); } @@ -731,7 +728,8 @@ public void testCheckIfValidForMerge_failsWhenHoleInImages() throws IOException when(table.readBackupInfo("b2")).thenReturn(b2); when(table.readBackupInfo("b3")).thenReturn(b3); - when(table.getBackupHistory(any())).thenReturn(List.of(b1, b2, b3)); + when(table.getBackupHistory(eq(-1), any(), any(), any(), any(), any())) + .thenReturn(List.of(b1, b2, b3)); // Simulate a "hole" by omitting b2 from images String[] idsWithHole = { "b1", "b3" }; diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java index fdcb637dbfe2..e5da7754b00d 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java @@ -17,13 +17,12 @@ */ package org.apache.hadoop.hbase.backup.impl; -import static org.apache.hadoop.hbase.backup.BackupInfo.withState; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure; import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders; +import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; -import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR; -import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -86,7 +85,7 @@ public void testDetermineWALCleanupCutoffTimeOfCleanupCommand() throws IOExcepti // Ordered as newest to oldest, will be reversed in the method List backupInfos = List.of(full2, inc, full1); - when(sysTable.getBackupHistory(withState(BackupInfo.BackupState.COMPLETE))) + when(sysTable.getBackupInfos(BackupInfo.BackupState.COMPLETE)) .thenReturn(new ArrayList<>(backupInfos)); // WHEN @@ -136,7 +135,7 @@ public void testDeleteOldWALFilesOfCleanupCommand() throws IOException { fs.mkdirs(backupWalDir); long currentTime = EnvironmentEdgeManager.getDelegate().currentTime(); - setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulkload-files folder + setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulk folders logDirectoryStructure(fs, backupWalDir, "Before cleanup:"); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java index 78991a463da1..438543e4a805 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java +++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java @@ -19,6 +19,8 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY; import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR; +import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID; @@ -27,20 +29,12 @@ import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX; import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.copyWithCleanup; -import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR; -import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR; import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.io.IOException; import java.text.SimpleDateFormat; @@ -55,17 
+49,14 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -89,7 +80,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.MockedStatic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -554,12 +544,6 @@ private void deleteTable(TableName tableName) throws IOException { private void addReplicationPeer(String peerId, Path backupRootDir, Map> tableMap) throws IOException { - addReplicationPeer(peerId, backupRootDir, tableMap, replicationEndpoint); - } - - private void addReplicationPeer(String peerId, Path backupRootDir, - Map> tableMap, String customReplicationEndpointImpl) - throws IOException { Map additionalArgs = new HashMap<>(); additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString()); additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString()); @@ -568,7 +552,7 @@ private void addReplicationPeer(String peerId, Path backupRootDir, additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() - .setReplicationEndpointImpl(customReplicationEndpointImpl).setReplicateAllUserTables(false) + .setReplicationEndpointImpl(replicationEndpoint).setReplicateAllUserTables(false) 
.setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build(); admin.addReplicationPeer(peerId, peerConfig); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 0e81c95677c3..87241d437cd8 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -161,8 +161,9 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) "hbase.bulkload.locality.sensitive.enabled"; private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name"; - static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = + public static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = "hbase.mapreduce.use.multi.table.hfileoutputformat"; + public static final boolean MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT = true; /** * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. 
We expose this config diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 4c0b12ef7333..cf4397d1052f 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; + import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -378,14 +381,35 @@ public Job createSubmittableJob(String[] args) throws IOException { Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputValueClass(MapReduceExtendedCell.class); - try (Connection conn = ConnectionFactory.createConnection(conf);) { - List tableInfoList = new ArrayList(); - for (TableName tableName : tableNames) { + try (Connection conn = ConnectionFactory.createConnection(conf)) { + if ( + conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT) + ) { + // The HFiles will be output to something like this for each table: + // .../BULK_OUTPUT_CONF_KEY/namespace/table/columnFamily + List tableInfoList = new ArrayList(); + for (TableName tableName : tableNames) { + Table table = conn.getTable(tableName); + RegionLocator regionLocator = getRegionLocator(tableName, conf, conn); + tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator)); + } + MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList); + } else { + // The HFiles will be output to something like: .../BULK_OUTPUT_CONF_KEY/columnFamily + // This is useful for scenarios where we are running 
the WALPlayer consecutively on just + // one table at a time, and BULK_OUTPUT_CONF_KEY is already set to a "namespace/table" + // directory path for each table. + if (tableNames.size() != 1) { + throw new IOException("Expected table names list to have only one table since " + + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. Got the following " + + "list of tables instead: " + tableNames); + } + TableName tableName = tableNames.get(0); Table table = conn.getTable(tableName); RegionLocator regionLocator = getRegionLocator(tableName, conf, conn); - tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator)); + HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); } - MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList); } TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index bbadabab69bf..26a38f5b5367 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; @@ -34,6 +35,7 @@ import java.io.File; import java.io.IOException; import java.io.PrintStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; @@ -46,6 +48,7 @@ import 
org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.SingleProcessHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; @@ -90,13 +93,20 @@ public class TestWALPlayer { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALPlayer.class); + private static final byte[] FAMILY = Bytes.toBytes("family"); + private static final byte[] COLUMN1 = Bytes.toBytes("c1"); + private static final byte[] COLUMN2 = Bytes.toBytes("c2"); + private static final byte[] ROW = Bytes.toBytes("row"); + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); private static SingleProcessHBaseCluster cluster; private static Path rootDir; private static Path walRootDir; - private static FileSystem fs; + private static FileSystem localFs; private static FileSystem logFs; private static Configuration conf; + private static FileSystem hdfs; + private static String bulkLoadOutputDir; @Rule public TestName name = new TestName(); @@ -106,15 +116,18 @@ public static void beforeClass() throws Exception { conf = TEST_UTIL.getConfiguration(); rootDir = TEST_UTIL.createRootDir(); walRootDir = TEST_UTIL.createWALRootDir(); - fs = CommonFSUtils.getRootDirFileSystem(conf); + localFs = CommonFSUtils.getRootDirFileSystem(conf); logFs = CommonFSUtils.getWALFileSystem(conf); cluster = TEST_UTIL.startMiniCluster(); + hdfs = TEST_UTIL.getTestFileSystem(); + bulkLoadOutputDir = new Path(new Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")), + Path.SEPARATOR + "bulkLoadOutput").toString(); } @AfterClass public static void afterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); - fs.delete(rootDir, true); + localFs.delete(rootDir, true); logFs.delete(walRootDir, true); } @@ -235,18 +248,11 @@ public void 
testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception { public void testWALPlayer() throws Exception { final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); - final byte[] FAMILY = Bytes.toBytes("family"); - final byte[] COLUMN1 = Bytes.toBytes("c1"); - final byte[] COLUMN2 = Bytes.toBytes("c2"); - final byte[] ROW = Bytes.toBytes("row"); Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); - // put a row into the first table - Put p = new Put(ROW); - p.addColumn(FAMILY, COLUMN1, COLUMN1); - p.addColumn(FAMILY, COLUMN2, COLUMN2); - t1.put(p); + putRowIntoTable(t1); + // delete one column Delete d = new Delete(ROW); d.addColumns(FAMILY, COLUMN1); @@ -411,6 +417,109 @@ public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception { assertNotEquals("WALPlayer should fail on empty files when not ignored", 0, exitCode); } + /** + * Verifies the HFile output format for WALPlayer has the following directory structure when + * hbase.mapreduce.use.multi.table.hfileoutputformat is set to true: + * .../BULK_OUTPUT_CONF_KEY/namespace/tableName/columnFamily + */ + @Test + public void testWALPlayerMultiTableHFileOutputFormat() throws Exception { + String namespace = "ns_" + name.getMethodName(); + TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build()); + final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + final TableName tableName2 = TableName.valueOf(namespace, name.getMethodName() + "2"); + Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); + Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); + + putRowIntoTable(t1); + putRowIntoTable(t2); + + Configuration multiTableOutputConf = new Configuration(conf); + setConfSimilarToIncrementalBackupWALToHFilesMethod(multiTableOutputConf); + + // We are testing this config variable's effect on HFile output for 
the WALPlayer + multiTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true); + + WALPlayer player = new WALPlayer(multiTableOutputConf); + String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), + HConstants.HREGION_LOGDIR_NAME).toString(); + String tables = tableName1.getNameAsString() + "," + tableName2.getNameAsString(); + + ToolRunner.run(multiTableOutputConf, player, new String[] { walInputDir, tables }); + + assertMultiTableOutputFormatDirStructure(tableName1, "default"); + assertMultiTableOutputFormatDirStructure(tableName2, namespace); + + hdfs.delete(new Path(bulkLoadOutputDir), true); + } + + /** + * Verifies the HFile output format for WALPlayer has the following directory structure when + * hbase.mapreduce.use.multi.table.hfileoutputformat is set to false: + * .../BULK_OUTPUT_CONF_KEY/columnFamily. Also verifies an exception occurs when the WALPlayer is + * run on multiple tables at once while hbase.mapreduce.use.multi.table.hfileoutputformat is set + * to false. 
+ */ + @Test + public void testWALPlayerSingleTableHFileOutputFormat() throws Exception { + String namespace = "ns_" + name.getMethodName(); + TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build()); + final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + final TableName tableName2 = TableName.valueOf(namespace, name.getMethodName() + "2"); + Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); + Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); + + putRowIntoTable(t1); + putRowIntoTable(t2); + + String bulkLoadOutputDir = new Path(new Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")), + Path.SEPARATOR + "bulkLoadOutput").toString(); + + Configuration singleTableOutputConf = new Configuration(conf); + setConfSimilarToIncrementalBackupWALToHFilesMethod(singleTableOutputConf); + + // We are testing this config variable's effect on HFile output for the WALPlayer + singleTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); + + WALPlayer player = new WALPlayer(singleTableOutputConf); + + String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), + HConstants.HREGION_LOGDIR_NAME).toString(); + String tables = tableName1.getNameAsString() + "," + tableName2.getNameAsString(); + + // Expecting a failure here since we are running WALPlayer on multiple tables even though the + // multi-table HFile output format is disabled + try { + ToolRunner.run(singleTableOutputConf, player, new String[] { walInputDir, tables }); + fail("Expected a failure to occur due to using WALPlayer with multiple tables while having " + + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " set to false"); + } catch (IOException e) { + String expectedMsg = "Expected table names list to have only one table since " + + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. 
Got the following " + + "list of tables instead: [testWALPlayerSingleTableHFileOutputFormat1, " + namespace + + ":testWALPlayerSingleTableHFileOutputFormat2]"; + assertTrue(e.getMessage().contains(expectedMsg)); + } + + // Successfully run WALPlayer on just one table while having multi-table HFile output format + // disabled + ToolRunner.run(singleTableOutputConf, player, + new String[] { walInputDir, tableName1.getNameAsString() }); + + Path bulkLoadOutputDirForTable = new Path(bulkLoadOutputDir, "family"); + assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable, + hdfs.exists(bulkLoadOutputDirForTable)); + + hdfs.delete(new Path(bulkLoadOutputDir), true); + } + + private void putRowIntoTable(Table table) throws IOException { + Put p = new Put(ROW); + p.addColumn(FAMILY, COLUMN1, COLUMN1); + p.addColumn(FAMILY, COLUMN2, COLUMN2); + table.put(p); + } + private Path createEmptyWALFile(String walDir) throws IOException { FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem(); Path inputDir = new Path("/" + walDir); @@ -422,4 +531,22 @@ private Path createEmptyWALFile(String walDir) throws IOException { return inputDir; } + + private void setConfSimilarToIncrementalBackupWALToHFilesMethod(Configuration conf) { + conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkLoadOutputDir); + conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); + conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); + conf.set("mapreduce.job.name", name.getMethodName() + "-" + System.currentTimeMillis()); + conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true); + } + + private void assertMultiTableOutputFormatDirStructure(TableName tableName, String namespace) + throws IOException { + Path qualifierAndFamilyDir = + new Path(tableName.getQualifierAsString(), new String(FAMILY, StandardCharsets.UTF_8)); + Path namespaceQualifierFamilyDir = new Path(namespace, qualifierAndFamilyDir); + Path bulkLoadOutputDirForTable = new Path(bulkLoadOutputDir, 
namespaceQualifierFamilyDir); + assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable, + hdfs.exists(bulkLoadOutputDirForTable)); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 75314c48bd28..73421dced454 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -866,10 +866,6 @@ public long getTotalReplicatedEdits() { return totalReplicatedEdits.get(); } - long getSleepForRetries() { - return sleepForRetries; - } - @Override public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) { String walName = entryBatch.getLastWalPath().getName(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 4c695634517b..a6ecc95d8aeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -162,15 +162,7 @@ void shipEdits(WALEntryBatch entryBatch) { List entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; if (entries.isEmpty()) { - /* - * Delegate to the endpoint to decide how to treat empty entry batches. In most replication - * flows, receiving an empty entry batch means that everything so far has been successfully - * replicated and committed — so it's safe to mark the WAL position as committed (COMMIT). - * However, some endpoints (e.g., asynchronous S3 backups) may buffer writes and delay actual - * persistence. 
In such cases, we must avoid committing the WAL position prematurely. - */ - final ReplicationResult result = getReplicationResult(); - updateLogPosition(entryBatch, result); + updateLogPosition(entryBatch, ReplicationResult.COMMITTED); return; } int currentSize = (int) entryBatch.getHeapSize(); @@ -277,11 +269,7 @@ private void cleanUpHFileRefs(WALEdit edit) throws IOException { } } - @RestrictedApi( - explanation = "Package-private for test visibility only. Do not use outside tests.", - link = "", - allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)") - boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { + private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) { boolean updated = false; // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file // record on zk, so let's call it. The last wal position maybe zero if end of file is true and