diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 2fc7fb017ad5e..be1ad8e9b8a5d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -154,12 +154,15 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
       schemaWithoutMetaFields: Seq[StructField],
       conf: SQLConf): Seq[Alias] = {
     queryOutputWithoutMetaFields.zip(schemaWithoutMetaFields).map { case (dataAttr, dataField) =>
-      val targetFieldOption = if (dataAttr.name.startsWith("col")) None else
-        schemaWithoutMetaFields.find(_.name.equals(dataAttr.name))
-      val targetField = if (targetFieldOption.isDefined) targetFieldOption.get else dataField
-      val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
-        targetField.dataType, conf)
-      Alias(castAttr, targetField.name)()
+      val targetAttrOption = if (dataAttr.name.startsWith("col")) {
+        None
+      } else {
+        queryOutputWithoutMetaFields.find(_.name.equals(dataField.name))
+      }
+      val targetAttr = targetAttrOption.getOrElse(dataAttr)
+      val castAttr = castIfNeeded(targetAttr.withNullability(dataField.nullable),
+        dataField.dataType, conf)
+      Alias(castAttr, dataField.name)()
     }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 9aa3c509c3dab..8d21fe32eadba 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -93,6 +93,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
          | insert into $tableName partition(dt = '2021-01-06')
          | select 20 as price, 2000 as ts, 2 as id, 'a2' as name
         """.stripMargin)
+      // The table schema should keep its original field order after writing out-of-order data.
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tmp.getCanonicalPath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
+      assert(schema.getFieldIndex("id").contains(0))
+      assert(schema.getFieldIndex("price").contains(2))
 
       // Note: Do not write the field alias, the partition field must be placed last.
       spark.sql(
@@ -133,6 +141,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
          | insert into $tableName partition(dt)
          | select 1 as id, '2021-01-05' as dt, 'a1' as name, 10 as price, 1000 as ts
         """.stripMargin)
+      // The table schema should keep its original field order after writing out-of-order data.
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tmp.getCanonicalPath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
+      assert(schema.getFieldIndex("id").contains(0))
+      assert(schema.getFieldIndex("price").contains(2))
 
       spark.sql(
         s"""
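
Reviewer note: the change above flips the alignment direction. The old code looked up a *schema field* by the query attribute's name and kept the query's column order; the new code looks up the matching *query attribute* by each schema field's name (falling back to the positional attribute for auto-generated "colN" names), then re-aliases it to the schema field's name, so the written columns always follow the table's original field order. Below is a minimal, self-contained sketch of that idea in plain Scala. `Column`, `Field`, and `alignByName` are hypothetical stand-ins for Spark's `Attribute`/`StructField` and Hudi's mapping code, and the `castIfNeeded` type coercion is omitted; it is an illustration of the technique, not Hudi's actual API.

    // Hypothetical stand-ins for Spark's Attribute and StructField.
    case class Column(name: String, value: Any)
    case class Field(name: String)

    // For each schema field, in schema order, pick the query column with the
    // same name; fall back to the positional column when the query produced
    // auto-generated names (e.g. "col1" from `select 1, 'a'`).
    def alignByName(queryOutput: Seq[Column], schema: Seq[Field]): Seq[Column] =
      queryOutput.zip(schema).map { case (positional, field) =>
        val matched =
          if (positional.name.startsWith("col")) None
          else queryOutput.find(_.name == field.name)
        // Re-alias to the schema field's name, so output order follows the schema.
        matched.getOrElse(positional).copy(name = field.name)
      }

    // The test's out-of-order SELECT (price, ts, id, name) against schema (id, name, price, ts):
    val query  = Seq(Column("price", 20), Column("ts", 2000), Column("id", 2), Column("name", "a2"))
    val schema = Seq(Field("id"), Field("name"), Field("price"), Field("ts"))
    assert(alignByName(query, schema).map(_.name)  == Seq("id", "name", "price", "ts"))
    assert(alignByName(query, schema).map(_.value) == Seq(2, "a2", 20, 2000))

This is exactly what the new test asserts via `HoodieSqlCommonUtils.getTableSqlSchema`: after inserting with a shuffled SELECT list, `id` is still field 0 and `price` is still field 2 in the persisted table schema.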