diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index de77b07fd83fe..7b1cf43b7396c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -735,4 +735,52 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test DATE to STRING conversions when vectorized reading is not enabled") {
+    withTempDir { tmp =>
+      Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        if (HoodieSparkUtils.gteqSpark3_1) {
+          // adding a struct column to force reads to use non-vectorized readers
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  struct_col struct<f0: int, f1: string>,
+               |  ts long
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+               | partitioned by (ts)
+             """.stripMargin)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
+             """.stripMargin)
+          spark.sql(s"select * from $tableName")
+
+          spark.sql("set hoodie.schema.on.read.enable=true")
+          spark.sql(s"alter table $tableName add columns(date_to_string_col date)")
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
+             """.stripMargin)
+          spark.sql(s"alter table $tableName alter column date_to_string_col type string")
+
+          // struct and string (converted from date) columns must be read to ensure that the non-vectorized reader is used
+          // not checking results as we just need to ensure that the table can be read without any errors being thrown
+          spark.sql(s"select * from $tableName")
+        }
+      }
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
index 1a8585b38aa90..c168911302eef 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -107,6 +107,7 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -238,7 +239,10 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
         }).toAttributes ++ partitionSchema.toAttributes
         val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
           if (implicitTypeChangeInfos.containsKey(i)) {
-            Cast(attr, implicitTypeChangeInfos.get(i).getLeft)
+            val srcType = implicitTypeChangeInfos.get(i).getRight
+            val dstType = implicitTypeChangeInfos.get(i).getLeft
+            val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+            Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
           } else attr
         }
         GenerateUnsafeProjection.generate(castSchema, newFullSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 9edd1321b1242..a90d36a02de77 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -130,6 +130,7 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -319,7 +320,10 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
         }).toAttributes ++ partitionSchema.toAttributes
         val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
           if (typeChangeInfos.containsKey(i)) {
-            Cast(attr, typeChangeInfos.get(i).getLeft)
+            val srcType = typeChangeInfos.get(i).getRight
+            val dstType = typeChangeInfos.get(i).getLeft
+            val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+            Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
           } else attr
         }
         GenerateUnsafeProjection.generate(castSchema, newFullSchema)
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
index ae686d33a31b9..112f55cfc16a7 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
@@ -132,6 +132,7 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -374,7 +375,10 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
         }).toAttributes ++ partitionSchema.toAttributes
         val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
           if (typeChangeInfos.containsKey(i)) {
-            Cast(attr, typeChangeInfos.get(i).getLeft)
+            val srcType = typeChangeInfos.get(i).getRight
+            val dstType = typeChangeInfos.get(i).getLeft
+            val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+            Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
           } else attr
         }
         GenerateUnsafeProjection.generate(castSchema, newFullSchema)
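
For context, a minimal standalone sketch of the cast pattern that all three Hoodie Parquet file formats above now share: the session-local time zone is attached to a Cast only when the source/target type pair actually requires one (for example DATE -> STRING after a schema-on-read type change). This sketch is not part of the patch; TimeZoneAwareCastSketch and buildTypeChangeCast are hypothetical names used for illustration, assuming a Spark 3.x catalyst dependency on the classpath.

import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType

// Hypothetical helper, for illustration only; the patch inlines this logic in each file format class.
object TimeZoneAwareCastSketch {

  // Wraps `attr` in a Cast to `dstType`, passing the session time zone only when
  // Cast.needsTimeZone reports the conversion as time-zone dependent (e.g. DateType -> StringType).
  // Cast is a time-zone-aware expression for such conversions, so leaving the zone unset can make
  // the generated projection fail once the non-vectorized reader applies it.
  def buildTypeChangeCast(attr: Attribute, srcType: DataType, dstType: DataType, sqlConf: SQLConf): Expression = {
    if (srcType == dstType) {
      attr
    } else {
      val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
      val needTimeZone = Cast.needsTimeZone(srcType, dstType)
      Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
    }
  }
}

Usage mirrors the hunks above: for every column index recorded in the type-change map, the reader builds its projection with buildTypeChangeCast(attr, typeChangeInfo.getRight, typeChangeInfo.getLeft, sqlConf) rather than a bare Cast(attr, dstType).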