[HUDI-1526] Translate the api partitionBy to hoodie.datasource.write.partitionpath.field #2431
DataSourceWriteOptions.scala

@@ -23,9 +23,11 @@ import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hive.HiveSyncTool
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
-import org.apache.hudi.keygen.SimpleKeyGenerator
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
+import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.log4j.LogManager
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}

 /**
  * List of options that can be passed to the Hoodie datasource,
|
@@ -192,6 +194,42 @@ object DataSourceWriteOptions
     }
   }

+  /**
+   * Translate spark parameters to hudi parameters
+   *
+   * @param optParams Parameters to be translated
+   * @return Parameters after translation
+   */
+  def translateSqlOptions(optParams: Map[String, String]): Map[String, String] = {
+    var translatedOptParams = optParams
+    // translate the api partitionBy of spark DataFrameWriter to PARTITIONPATH_FIELD_OPT_KEY
+    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)) {
+      val partitionColumns = optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+        .map(SparkDataSourceUtils.decodePartitioningColumns)
+        .getOrElse(Nil)
+      val keyGeneratorClass = optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
+        DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+
+      val partitionPathField =
+        keyGeneratorClass match {
+          // Only CustomKeyGenerator needs special treatment, because it needs to be specified in a way
+          // such as "field1:PartitionKeyType1,field2:PartitionKeyType2".
+          // partitionBy can specify the partition like this: partitionBy("p1", "p2:SIMPLE", "p3:TIMESTAMP")
+          case c if c == classOf[CustomKeyGenerator].getName =>
+            partitionColumns.map(e => {
+              if (e.contains(":")) {
+                e
+              } else {
+                s"$e:SIMPLE"
+              }
+            }).mkString(",")
+          case _ =>
+            partitionColumns.mkString(",")
+        }
+      translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD_OPT_KEY -> partitionPathField)
+    }
+    translatedOptParams
+  }
+
   /**
    * Hive table name, to register the table into.
|
TestCOWDataSource.scala

@@ -25,12 +25,16 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen._
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
-import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.sql.types.{DataTypes, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.apache.spark.sql.functions.{col, concat, lit, udf}
+import org.apache.spark.sql.types._
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
|
@@ -348,4 +352,151 @@ class TestCOWDataSource extends HoodieClientTestBase
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }

+  private def getDataFrameWriter(keyGenerator: String): DataFrameWriter[Row] = {
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    inputDF.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator)
+      .mode(SaveMode.Overwrite)
+  }
+
+  @Test def testSparkPartitonByWithCustomKeyGenerator(): Unit = {
+    // Without fieldType, the default is SIMPLE
+    var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
+    writer.partitionBy("current_ts")
+      .save(basePath)
+
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("current_ts").cast("string")).count() == 0)
+
+    // Specify fieldType as TIMESTAMP
+    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
+    writer.partitionBy("current_ts:TIMESTAMP")
+      .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
+      .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd")))
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
+
+    // Mixed fieldType
+    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
+    writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP")
+      .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
+      .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*")
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
+      concat(col("driver"), lit("/"), col("rider"), lit("/"), udf_date_format(col("current_ts")))).count() == 0)
+
+    // Test invalid partitionKeyType
+    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
+    writer = writer.partitionBy("current_ts:DUMMY")
+      .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
+      .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
+    try {
+      writer.save(basePath)
+      fail("should fail when invalid PartitionKeyType is provided!")
+    } catch {
+      case e: Exception =>
+        assertTrue(e.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"))
+    }
+  }
+
+  @Test def testSparkPartitonByWithSimpleKeyGenerator() {
+    // Use the `driver` field as the partition key
+    var writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName)
+    writer.partitionBy("driver")
+      .save(basePath)
+
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0)
+
+    // Use the `driver,rider` field as the partition key, If no such field exists, the default value `default` is used
+    writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName)
+    writer.partitionBy("driver", "rider")
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("default")).count() == 0)
+  }
+
+  @Test def testSparkPartitonByWithComplexKeyGenerator() {
+    // Use the `driver` field as the partition key
+    var writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName)
+    writer.partitionBy("driver")
+      .save(basePath)
+
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0)
+
+    // Use the `driver`,`rider` field as the partition key
+    writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName)
+    writer.partitionBy("driver", "rider")
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= concat(col("driver"), lit("/"), col("rider"))).count() == 0)
+  }
+
+  @Test def testSparkPartitonByWithTimestampBasedKeyGenerator() {
+    val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName)
+
+    writer.partitionBy("current_ts")
+      .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
+      .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
+      .save(basePath)
+
+    val recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+    val udf_date_format = udf((data: Long) => new DateTime(data).toString(DateTimeFormat.forPattern("yyyyMMdd")))
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
+  }
+
+  @Test def testSparkPartitonByWithGlobalDeleteKeyGenerator() {
+    val writer = getDataFrameWriter(classOf[GlobalDeleteKeyGenerator].getName)
+    writer.partitionBy("driver")
+      .save(basePath)
+
+    val recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*")
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
+  }
+
+  @Test def testSparkPartitonByWithNonpartitionedKeyGenerator() {
+    // Empty string column
+    var writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName)
+    writer.partitionBy("")
+      .save(basePath)
+
+    var recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*")
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
+
+    // Non-existent column
+    writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName)
+    writer.partitionBy("abc")
+      .save(basePath)
+
+    recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*")
+    assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
+  }
 }