diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index 24c6e21df93a..8a9486ea45dd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName import org.apache.hudi.common.model.DefaultHoodieRecordPayload import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.util.ValidationUtils @@ -198,14 +199,14 @@ object HoodieOptionConfig { .map(_.split(",").filter(_.length > 0)) ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.") primaryKeys.get.foreach { primaryKey => - ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, primaryKey)), + ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))), s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.") } // validate preCombine key val preCombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName) if (preCombineKey.isDefined && preCombineKey.get.nonEmpty) { - ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, preCombineKey.get)), + ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(preCombineKey.get))), s"Can't find preCombineKey `${preCombineKey.get}` in ${schema.treeString}.") } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index ab75ef563f22..760d1269c2e4 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -663,4 +663,37 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } } + + test("Test nested field as primaryKey and preCombineField") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | name string, + | price double, + | ts long, + | nestedcol struct<a1:string, a2:struct<b1:string, b2:struct<c1:string, c2:int>>> + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | options ( + | type = '$tableType', + | primaryKey = 'nestedcol.a1', + | preCombineField = 'nestedcol.a2.b2.c2' + | ) + """.stripMargin) + // insert data to table + spark.sql( + s"""insert into $tableName values + |('name_1', 10, 1000, struct('a', struct('b', struct('c', 999)))), + |('name_2', 20, 2000, struct('a', struct('b', struct('c', 333)))) + |""".stripMargin) + checkAnswer(s"select name, price, ts, nestedcol.a1, nestedcol.a2.b2.c2 from $tableName")( + Seq("name_1", 10.0, 1000, "a", 999) + ) + } + } + } }