
Commit b7da6cb

[HUDI-2307] When using delete_partition with ds should not rely on the primary key (apache#3469)
Co-authored-by: Sivabalan Narayanan <[email protected]>
1 parent 8eed440 commit b7da6cb

File tree: 4 files changed (+75 -58 lines)


hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala (+5)

@@ -325,6 +325,11 @@ object DataSourceWriteOptions {
   @Deprecated
   val INSERT_DROP_DUPS_OPT_KEY = INSERT_DROP_DUPS.key()
 
+  val PARTITIONS_TO_DELETE: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.write.partitions.to.delete")
+    .noDefaultValue()
+    .withDocumentation("Comma separated list of partitions to delete")
+
   val STREAMING_RETRY_CNT: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.streaming.retry.count")
    .defaultValue("3")

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala (+8 -3)

@@ -29,8 +29,8 @@ import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH, BOOTSTRAP_INDEX_CLASS}
@@ -192,7 +192,12 @@ object HoodieSparkSqlWriter {
       }
 
       // Get list of partitions to delete
-      val partitionsToDelete = genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
+      val partitionsToDelete = if (parameters.containsKey(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
+        val partitionColsToDelete = parameters.get(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).get.split(",")
+        java.util.Arrays.asList(partitionColsToDelete: _*)
+      } else {
+        genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
+      }
       // Create a HoodieWriteClient & issue the delete.
       val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
         null, path.get, tblName,
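
The writer now prefers the explicit option and only falls back to collecting distinct partition paths from the key generator, which is what previously tied delete_partition to the primary key. A standalone sketch of that selection logic under assumed names; resolvePartitionsToDelete and fromRecordKeys are illustrative, not the writer's internals:

import java.util.{Arrays => JArrays, List => JList}

// An explicit comma-separated list wins; otherwise partition paths are
// derived from the records' keys, as before this commit.
def resolvePartitionsToDelete(params: Map[String, String],
                              fromRecordKeys: => Seq[String]): JList[String] =
  params.get("hoodie.datasource.write.partitions.to.delete") match {
    case Some(csv) => JArrays.asList(csv.split(","): _*) // config-driven path
    case None => JArrays.asList(fromRecordKeys: _*)      // legacy key-derived path
  }

For example, resolvePartitionsToDelete(Map("hoodie.datasource.write.partitions.to.delete" -> "p1,p2"), Nil) returns [p1, p2] without forcing the by-name fallback, so no records need to be scanned.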

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala (+61 -54)

@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -679,61 +679,68 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
-  test("test delete partitions") {
-    initSparkContext("test_delete_partitions")
-    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
-    try {
-      val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
-      val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
-      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-      val schema = DataSourceTestUtils.getStructTypeExampleSchema
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      val records = DataSourceTestUtils.generateRandomRows(10)
-      val recordsSeq = convertRowListToSeq(records)
-      val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-      // write to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+  List(true, false)
+    .foreach(usePartitionsToDeleteConfig => {
+      test("test delete partitions for " + usePartitionsToDeleteConfig) {
+        initSparkContext("test_delete_partitions_" + usePartitionsToDeleteConfig)
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
+        try {
+          val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
+          val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+          var fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+          val schema = DataSourceTestUtils.getStructTypeExampleSchema
+          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val records = DataSourceTestUtils.generateRandomRows(10)
+          val recordsSeq = convertRowListToSeq(records)
+          val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
 
-      val snapshotDF1 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(10, snapshotDF1.count())
-      // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf1 = dropMetaFields(snapshotDF1)
-      assert(df1.except(trimmedDf1).count() == 0)
-
-      // issue updates so that log files are created for MOR table
-      var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
-      var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
-      // write updates to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
-      val snapshotDF2 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(10, snapshotDF2.count())
+          val snapshotDF1 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF1.count())
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf1 = dropMetaFields(snapshotDF1)
+          assert(df1.except(trimmedDf1).count() == 0)
 
-      // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf2 = dropMetaFields(snapshotDF2)
-      // ensure 2nd batch of updates matches.
-      assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
-
-      // delete partitions
-      val recordsToDelete = df1.filter(entry => {
-        val partitionPath : String = entry.getString(1)
-        partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) || partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
-      })
-      val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
-
-      val snapshotDF3 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(0, snapshotDF3.filter(entry => {
-        val partitionPath = entry.getString(3)
-        !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
-      }).count())
-    } finally {
-      spark.stop()
-      FileUtils.deleteDirectory(path.toFile)
-    }
-  }
+          // issue updates so that log files are created for MOR table
+          var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
+          var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
+          // write updates to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+          val snapshotDF2 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF2.count())
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf2 = dropMetaFields(snapshotDF2)
+          // ensure 2nd batch of updates matches.
+          assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
+
+          if (usePartitionsToDeleteConfig) {
+            fooTableParams = fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "," + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+          }
+          // records covering the partitions to delete; these rows still carry the primary key
+          val recordsToDelete = df1.filter(entry => {
+            val partitionPath: String = entry.getString(1)
+            partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) ||
+              partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+          })
+          val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
+
+          val snapshotDF3 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(0, snapshotDF3.filter(entry => {
+            val partitionPath = entry.getString(3)
+            !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+          }).count())
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
 
   def dropMetaFields(df: Dataset[Row]) : Dataset[Row] = {
     df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
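
The List(true, false).foreach wrapper registers the test twice, once per flag value, so both the new config-driven path and the legacy key-derived path stay covered; the extra lines are presumably also what pushes this suite past scalastyle's 800-line file limit, hence the raised maxFileLength below.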

style/scalastyle.xml (+1 -1)

@@ -27,7 +27,7 @@
   <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"/>
   <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="true">
     <parameters>
-      <parameter name="maxFileLength"><![CDATA[800]]></parameter>
+      <parameter name="maxFileLength"><![CDATA[900]]></parameter>
     </parameters>
   </check>
   <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>
