Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -67,6 +68,7 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -456,8 +458,15 @@ public static int renamePartition(JavaSparkContext jsc, String basePath, String
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
Map<String, String> propsMap = getPropsForRewrite(metaClient);
rewriteRecordsToNewPartition(basePath, newPartition, recordsToRewrite, metaClient, propsMap);
// after re-writing, we can safely delete older data.
// after re-writing, we can safely delete older partition.
deleteOlderPartition(basePath, oldPartition, recordsToRewrite, propsMap);
// also, we can physically delete the old partition.
FileSystem fs = FSUtils.getFs(new Path(basePath), metaClient.getHadoopConf());
try {
fs.delete(new Path(basePath, oldPartition), true);
} catch (IOException e) {
LOG.warn("Failed to delete older partition " + basePath);
Comment on lines +465 to +468
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this fails, we leave a stale directory in the table. It would be better to return 1 and make the operation fail so the user notices; the warn log can easily go unnoticed. Nit: the message should contain basePath + oldPartition.

}
}
return 0;
}
Expand All @@ -473,21 +482,23 @@ private static void deleteOlderPartition(String basePath, String oldPartition, D
}

private static void rewriteRecordsToNewPartition(String basePath, String newPartition, Dataset<Row> recordsToRewrite, HoodieTableMetaClient metaClient, Map<String, String> propsMap) {
recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(newPartition))
String partitionFieldProp = metaClient.getTableConfig().getPartitionFieldProp();
StructType structType = recordsToRewrite.schema();
int partitionIndex = structType.fieldIndex(partitionFieldProp);

recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(null).cast(structType.apply(partitionIndex).dataType()))
.write()
.options(propsMap)
.option("hoodie.datasource.write.operation", "insert")
.option("hoodie.datasource.write.operation", WriteOperationType.BULK_INSERT.value())
.format("hudi")
.mode("Append")
.save(basePath);
}

private static Dataset<Row> getRecordsToRewrite(String basePath, String oldPartition, SQLContext sqlContext) {
return sqlContext.read()
.option("hoodie.datasource.read.extract.partition.values.from.path", "false")
.format("hudi")
.load(basePath)
.filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + oldPartition + "'")
.load(basePath + "/" + oldPartition)
.drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)
Expand Down