diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index a9aefa2ab4744..d106d8375e7a8 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -25,7 +25,6 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
-import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -34,11 +33,15 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.spark.api.java.JavaSparkContext;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.shell.standard.ShellComponent;
@@ -57,9 +60,6 @@
import java.util.Map;
import java.util.Set;
-import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
-import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT;
-
/**
* CLI commands to operate on the Metadata Table.
*
@@ -115,13 +115,13 @@ public String set(@ShellOption(value = {"--metadataDir"},
@ShellMethod(key = "metadata create", value = "Create the Metadata Table if it does not exist")
public String create(
@ShellOption(value = "--sparkMaster", defaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master
- ) throws IOException {
+ ) throws Exception {
HoodieCLI.getTableMetaClient();
Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
try {
FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
if (statuses.length > 0) {
- throw new RuntimeException("Metadata directory (" + metadataPath.toString() + ") not empty.");
+ throw new RuntimeException("Metadata directory (" + metadataPath + ") not empty.");
}
} catch (FileNotFoundException e) {
// Metadata directory does not exist yet
@@ -131,28 +131,32 @@ public String create(
HoodieTimer timer = HoodieTimer.start();
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext(Option.of(master));
- SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
- return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0);
+ try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) {
+ return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0);
+ }
}
@ShellMethod(key = "metadata delete", value = "Remove the Metadata Table")
- public String delete() throws Exception {
- HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
- Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
- try {
- FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
- if (statuses.length > 0) {
- HoodieCLI.fs.delete(metadataPath, true);
- }
- } catch (FileNotFoundException e) {
- // Metadata directory does not exist
+ public String delete(@ShellOption(value = "--backup", help = "Backup the metadata table before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception {
+ HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient();
+ String backupPath = HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, new HoodieSparkEngineContext(jsc), backup);
+ if (backup) {
+ return "Metadata Table has been deleted and backed up to " + backupPath;
+ } else {
+ return "Metadata Table has been deleted from " + getMetadataTableBasePath(HoodieCLI.basePath);
}
+ }
- LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
- HoodieTableConfig.delete(metaClient.getFs(), new Path(metaClient.getMetaPath()), new HashSet<>(Arrays
- .asList(TABLE_METADATA_PARTITIONS.key(), TABLE_METADATA_PARTITIONS_INFLIGHT.key())));
-
- return String.format("Removed Metadata Table from %s", metadataPath);
+ @ShellMethod(key = "metadata delete-record-index", value = "Delete the record index from Metadata Table")
+ public String deleteRecordIndex(@ShellOption(value = "--backup", help = "Backup the record index before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception {
+ HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient();
+ String backupPath = HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, new HoodieSparkEngineContext(jsc),
+ MetadataPartitionType.RECORD_INDEX, backup);
+ if (backup) {
+ return "Record Index has been deleted from the Metadata Table and backed up to " + backupPath;
+ } else {
+ return "Record Index has been deleted from the Metadata Table";
+ }
}
@ShellMethod(key = "metadata init", value = "Update the metadata table from commits since the creation")
@@ -165,14 +169,16 @@ public String init(@ShellOption(value = "--sparkMaster", defaultValue = SparkUti
HoodieCLI.fs.listStatus(metadataPath);
} catch (FileNotFoundException e) {
// Metadata directory does not exist
- throw new RuntimeException("Metadata directory (" + metadataPath.toString() + ") does not exist.");
+ throw new RuntimeException("Metadata directory (" + metadataPath + ") does not exist.");
}
HoodieTimer timer = HoodieTimer.start();
if (!readOnly) {
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext(Option.of(master));
- SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
+ try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc))) {
+ // Empty
+ }
}
String action = readOnly ? "Opened" : "Initialized";
@@ -183,23 +189,23 @@ public String init(@ShellOption(value = "--sparkMaster", defaultValue = SparkUti
public String stats() throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
- HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf),
- config, HoodieCLI.basePath, "/tmp");
- Map<String, String> stats = metadata.stats();
+ try (HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf),
+ config, HoodieCLI.basePath)) {
+ Map<String, String> stats = metadata.stats();
+
+ final List<Comparable[]> rows = new ArrayList<>();
+ for (Map.Entry<String, String> entry : stats.entrySet()) {
+ Comparable[] row = new Comparable[2];
+ row[0] = entry.getKey();
+ row[1] = entry.getValue();
+ rows.add(row);
+ }
- final List<Comparable[]> rows = new ArrayList<>();
- for (Map.Entry<String, String> entry : stats.entrySet()) {
- Comparable[] row = new Comparable[2];
- row[0] = entry.getKey();
- row[1] = entry.getValue();
- rows.add(row);
+ TableHeader header = new TableHeader()
+ .addTableHeaderField("stat key")
+ .addTableHeaderField("stat value");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
}
-
- TableHeader header = new TableHeader()
- .addTableHeaderField("stat key")
- .addTableHeaderField("stat value");
- return HoodiePrintHelper.print(header, new HashMap<>(), "",
- false, Integer.MAX_VALUE, false, rows);
}
@ShellMethod(key = "metadata list-partitions", value = "List all partitions from metadata")
@@ -209,27 +215,27 @@ public String listPartitions(
HoodieCLI.getTableMetaClient();
initJavaSparkContext(Option.of(master));
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
- HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), config,
- HoodieCLI.basePath, "/tmp");
+ try (HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), config,
+ HoodieCLI.basePath)) {
- if (!metadata.enabled()) {
- return "[ERROR] Metadata Table not enabled/initialized\n\n";
- }
+ if (!metadata.enabled()) {
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
+ }
- HoodieTimer timer = HoodieTimer.start();
- List<String> partitions = metadata.getAllPartitionPaths();
- LOG.debug("Took " + timer.endTimer() + " ms");
+ HoodieTimer timer = HoodieTimer.start();
+ List<String> partitions = metadata.getAllPartitionPaths();
+ LOG.debug("Took " + timer.endTimer() + " ms");
- final List<Comparable[]> rows = new ArrayList<>();
- partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
- Comparable[] row = new Comparable[1];
- row[0] = p;
- rows.add(row);
- });
-
- TableHeader header = new TableHeader().addTableHeaderField("partition");
- return HoodiePrintHelper.print(header, new HashMap<>(), "",
- false, Integer.MAX_VALUE, false, rows);
+ final List<Comparable[]> rows = new ArrayList<>();
+ partitions.stream().sorted(Comparator.reverseOrder()).forEach(p -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = p;
+ rows.add(row);
+ });
+
+ TableHeader header = new TableHeader().addTableHeaderField("partition");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
+ }
}
@ShellMethod(key = "metadata list-files", value = "Print a list of all files in a partition from the metadata")
@@ -237,32 +243,32 @@ public String listFiles(
@ShellOption(value = {"--partition"}, help = "Name of the partition to list files", defaultValue = "") final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
- HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
- new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+ try (HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath)) {
- if (!metaReader.enabled()) {
- return "[ERROR] Metadata Table not enabled/initialized\n\n";
- }
+ if (!metaReader.enabled()) {
+ return "[ERROR] Metadata Table not enabled/initialized\n\n";
+ }
- Path partitionPath = new Path(HoodieCLI.basePath);
- if (!StringUtils.isNullOrEmpty(partition)) {
- partitionPath = new Path(HoodieCLI.basePath, partition);
- }
+ Path partitionPath = new Path(HoodieCLI.basePath);
+ if (!StringUtils.isNullOrEmpty(partition)) {
+ partitionPath = new Path(HoodieCLI.basePath, partition);
+ }
- HoodieTimer timer = HoodieTimer.start();
- FileStatus[] statuses = metaReader.getAllFilesInPartition(partitionPath);
- LOG.debug("Took " + timer.endTimer() + " ms");
+ HoodieTimer timer = HoodieTimer.start();
+ FileStatus[] statuses = metaReader.getAllFilesInPartition(partitionPath);
+ LOG.debug("Took " + timer.endTimer() + " ms");
- final List<Comparable[]> rows = new ArrayList<>();
- Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
- Comparable[] row = new Comparable[1];
- row[0] = f;
- rows.add(row);
- });
-
- TableHeader header = new TableHeader().addTableHeaderField("file path");
- return HoodiePrintHelper.print(header, new HashMap<>(), "",
- false, Integer.MAX_VALUE, false, rows);
+ final List<Comparable[]> rows = new ArrayList<>();
+ Arrays.stream(statuses).sorted((p1, p2) -> p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(f -> {
+ Comparable[] row = new Comparable[1];
+ row[0] = f;
+ rows.add(row);
+ });
+
+ TableHeader header = new TableHeader().addTableHeaderField("file path");
+ return HoodiePrintHelper.print(header, new HashMap<>(), "", false, Integer.MAX_VALUE, false, rows);
+ }
}
@ShellMethod(key = "metadata validate-files", value = "Validate all files in all partitions from the metadata")
@@ -271,7 +277,7 @@ public String validateFiles(
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadataReader = new HoodieBackedTableMetadata(
- new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+ new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath);
if (!metadataReader.enabled()) {
return "[ERROR] Metadata Table not enabled/initialized\n\n";
@@ -279,7 +285,7 @@ public String validateFiles(
HoodieMetadataConfig fsConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
HoodieBackedTableMetadata fsMetaReader = new HoodieBackedTableMetadata(
- new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath, "/tmp");
+ new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath);
HoodieTimer timer = HoodieTimer.start();
List<String> metadataPartitions = metadataReader.getAllPartitionPaths();
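
Taken together, the MetadataCommand changes switch the metadata readers and writers over to try-with-resources and drop the trailing spillable-map path argument (previously hard-coded to "/tmp") from the `HoodieBackedTableMetadata` constructor. A minimal sketch of the resulting read pattern, assuming the three-argument constructor used in the hunks above and that the reader remains `AutoCloseable`:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.common.engine.HoodieLocalEngineContext;
    import org.apache.hudi.metadata.HoodieBackedTableMetadata;

    import java.util.List;

    class MetadataReadExample {
      // Lists partitions via the metadata table; closing the reader releases
      // the underlying base-file and log readers it holds open.
      static List<String> listPartitions(Configuration conf, String basePath) throws Exception {
        HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
        try (HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(
            new HoodieLocalEngineContext(conf), config, basePath)) {
          return metadata.getAllPartitionPaths();
        }
      }
    }

Note that `validateFiles` (the last two hunks above) still constructs its readers outside try-with-resources; only the constructor signature changes there.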
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRestoresCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRestoresCommand.java
index aa75ff29b8b27..d45c63eddbb66 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRestoresCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRestoresCommand.java
@@ -104,7 +104,7 @@ public void init() throws Exception {
metaClient.getHadoopConf(), config, context))
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
- .withBaseFilesInPartitions(partitionAndFileId)
+ .withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addCommit("101");
hoodieTestTable.addCommit("102").withBaseFilesInPartitions(partitionAndFileId);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
index 98e258d0392d9..a1942a62312b4 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
@@ -104,9 +104,9 @@ public void init() throws Exception {
metaClient.getHadoopConf(), config, context))
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
- .withBaseFilesInPartitions(partitionAndFileId)
+ .withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addCommit("101")
- .withBaseFilesInPartitions(partitionAndFileId)
+ .withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addInflightCommit("102")
.withBaseFilesInPartitions(partitionAndFileId);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
index ff983d44ae780..4d1a0ec3fb748 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
@@ -77,13 +77,13 @@ public void init() throws Exception {
HoodieTestTable.of(metaClient)
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
- .withBaseFilesInPartition(DEFAULT_FIRST_PARTITION_PATH, "file-1")
- .withBaseFilesInPartition(DEFAULT_SECOND_PARTITION_PATH, "file-2")
- .withBaseFilesInPartition(DEFAULT_THIRD_PARTITION_PATH, "file-3")
+ .withBaseFilesInPartition(DEFAULT_FIRST_PARTITION_PATH, "file-1").getLeft()
+ .withBaseFilesInPartition(DEFAULT_SECOND_PARTITION_PATH, "file-2").getLeft()
+ .withBaseFilesInPartition(DEFAULT_THIRD_PARTITION_PATH, "file-3").getLeft()
.addInflightCommit("101")
- .withBaseFilesInPartition(DEFAULT_FIRST_PARTITION_PATH, "file-1")
- .withBaseFilesInPartition(DEFAULT_SECOND_PARTITION_PATH, "file-2")
- .withBaseFilesInPartition(DEFAULT_THIRD_PARTITION_PATH, "file-3")
+ .withBaseFilesInPartition(DEFAULT_FIRST_PARTITION_PATH, "file-1").getLeft()
+ .withBaseFilesInPartition(DEFAULT_SECOND_PARTITION_PATH, "file-2").getLeft()
+ .withBaseFilesInPartition(DEFAULT_THIRD_PARTITION_PATH, "file-3").getLeft()
.withMarkerFile(DEFAULT_FIRST_PARTITION_PATH, "file-1", IOType.MERGE)
.withMarkerFile(DEFAULT_SECOND_PARTITION_PATH, "file-2", IOType.MERGE)
.withMarkerFile(DEFAULT_THIRD_PARTITION_PATH, "file-3", IOType.MERGE);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java
index e1fb98fb0a786..d158b096c38c6 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java
@@ -95,9 +95,9 @@ public void testRollbackCommit() throws Exception {
HoodieTestTable.of(metaClient)
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
.addCommit("100")
- .withBaseFilesInPartitions(partitionAndFileId)
+ .withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addCommit("101")
- .withBaseFilesInPartitions(partitionAndFileId)
+ .withBaseFilesInPartitions(partitionAndFileId).getLeft()
.addCommit("102")
.withBaseFilesInPartitions(partitionAndFileId);
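
Every test-table change above follows one pattern: `withBaseFilesInPartition(s)` evidently now returns a `Pair` whose left element is the `HoodieTestTable` itself (the right element, presumably describing the base files just created, is an assumption here), so fluent chains call `.getLeft()` before continuing:

    // Before: each with* call returned the table directly.
    // After (sketch): recover the table from the returned Pair to keep chaining.
    HoodieTestTable table = HoodieTestTable.of(metaClient)
        .withPartitionMetaFiles(DEFAULT_PARTITION_PATHS)
        .addCommit("100")
        .withBaseFilesInPartitions(partitionAndFileId).getLeft()
        .addCommit("101");
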
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 95d868a43e094..897dcae291514 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -29,6 +29,7 @@
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -494,15 +495,16 @@ protected void runAnyPendingClustering(HoodieTable table) {
/**
* Write the HoodieCommitMetadata to metadata table if available.
*
- * @param table {@link HoodieTable} of interest.
- * @param instantTime instant time of the commit.
- * @param actionType action type of the commit.
- * @param metadata instance of {@link HoodieCommitMetadata}.
+ * @param table {@link HoodieTable} of interest.
+ * @param instantTime instant time of the commit.
+ * @param actionType action type of the commit.
+ * @param metadata instance of {@link HoodieCommitMetadata}.
+ * @param writeStatuses Write statuses of the commit.
*/
- protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
+ protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
checkArgument(table.isTableServiceAction(actionType, instantTime), String.format("Unsupported action: %s.%s is not table service.", actionType, instantTime));
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
- table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime));
+ table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, writeStatuses, instantTime));
}
/**
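
With the extra parameter, `writeTableMetadata` threads the commit's write statuses through to the metadata writer, whose `update` now takes them alongside the commit metadata, presumably so the new record index can be maintained per record. A hedged sketch of a table-service call site, assuming the caller already holds the statuses as `HoodieData<WriteStatus>`:

    // writeStatuses: HoodieData<WriteStatus> collected from the compaction/clustering run
    writeTableMetadata(table, instantTime, actionType, commitMetadata, writeStatuses);
    // ...which, per the hunk above, delegates to:
    // table.getMetadataWriter(instantTime)
    //     .ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, writeStatuses, instantTime));
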
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 644c87be4352b..e7f69b6e43958 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -33,6 +33,7 @@
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -206,12 +207,13 @@ public abstract boolean commit(String instantTime, O writeStatuses, Option