From 503f96d8568fadb5bac15cfeb7653f7c2f1db466 Mon Sep 17 00:00:00 2001
From: voonhous
Date: Fri, 24 Mar 2023 14:29:46 +0800
Subject: [PATCH 1/6] [HUDI-5977] Fix Date to String column schema evolution
 causing table to become unreadable when non-vectorized readers are used

---
 .../apache/spark/sql/hudi/TestSpark3DDL.scala | 38 +++++++++++++++++++
 .../Spark24HoodieParquetFileFormat.scala      |  6 ++-
 .../Spark31HoodieParquetFileFormat.scala      |  6 ++-
 .../Spark32PlusHoodieParquetFileFormat.scala  |  6 ++-
 4 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index de77b07fd83fe..866dee1688a52 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -734,5 +734,43 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+    test("Test DATE to STRING conversions when vectorized reading is not enabled") {
+      val tableName = generateTableName
+      // adding a struct column to force reads to use non-vectorized readers
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  struct_col struct<f0: int, f1: string>,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           |tblproperties (
+           |  primaryKey = 'id'
+          )
+        """.stripMargin)
+      spark.sql(
+        s"""
+           | insert into $tableName
+           | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
+        """.stripMargin)
+      spark.sql(s"select * from $tableName")
+
+      spark.sql("set hoodie.schema.on.read.enable=true")
+      spark.sql(s"alter table $tableName add column (`date_to_string_col` date)")
+      spark.sql(
+        s"""
+           | insert into $tableName
+           | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
+        """.stripMargin)
+      spark.sql(s"alter table $tableName alter column `date_to_string_col` type string")
+
+      // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
+      // not checking results as we just need to ensure that the table can be read without any errors thrown
+      spark.sql(s"select * from $tableName")
+    }
   }
 }

diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
index 1a8585b38aa90..3aec4ef111b1f 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -107,6 +107,7 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -238,7 +239,10 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
           }).toAttributes ++ partitionSchema.toAttributes
         val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
           if (implicitTypeChangeInfos.containsKey(i)) {
-            Cast(attr, implicitTypeChangeInfos.get(i).getLeft)
+            val srcType = typeChangeInfos.get(i).getRight
+            val dstType = typeChangeInfos.get(i).getLeft
+            val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+            Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
           } else attr
         }
         GenerateUnsafeProjection.generate(castSchema, newFullSchema)

diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 9edd1321b1242..a90d36a02de77 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -130,6 +130,7 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
     val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -319,7 +320,10 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
           }).toAttributes ++ partitionSchema.toAttributes
         val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
           if (typeChangeInfos.containsKey(i)) {
-            Cast(attr, typeChangeInfos.get(i).getLeft)
+            val srcType = typeChangeInfos.get(i).getRight
+            val dstType = typeChangeInfos.get(i).getLeft
+            val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+            Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
           } else attr
         }
         GenerateUnsafeProjection.generate(castSchema, newFullSchema)

diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
index ae686d33a31b9..112f55cfc16a7 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
@@ -132,6 +132,7 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+    val timeZoneId = Option(sqlConf.sessionLocalTimeZone)
 
     (file: PartitionedFile) => {
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
@@ -374,7 +375,10 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
           }).toAttributes ++ partitionSchema.toAttributes
         val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
           if (typeChangeInfos.containsKey(i)) {
-            Cast(attr, typeChangeInfos.get(i).getLeft)
+            val srcType = typeChangeInfos.get(i).getRight
+            val dstType = typeChangeInfos.get(i).getLeft
+            val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+            Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
           } else attr
         }
         GenerateUnsafeProjection.generate(castSchema, newFullSchema)
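Context for the fix above: `Cast` is a time-zone-aware expression, so a zone-sensitive cast built without a `timeZoneId` stays unresolved, and `GenerateUnsafeProjection` cannot generate code for it — which is how an evolved Date-to-String column made the whole table unreadable on the non-vectorized path. Below is a minimal sketch of the behavior the patch relies on; the literal value and the `UTC` zone are illustrative choices, not taken from the patch:

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{DateType, StringType}

object CastZoneSketch extends App {
  // Dates are stored internally as days since the Unix epoch;
  // 19438 corresponds to 2023-03-22, the date used by the new test.
  val dateLit = Literal(19438, DateType)

  // On Spark versions where DATE -> STRING is zone-sensitive, a Cast built
  // without a timeZoneId reports itself unresolved, so codegen over it fails.
  val noZone = Cast(dateLit, StringType)
  println(s"resolved without zone: ${noZone.resolved}")

  // The patch mirrors this: attach the session zone only when the cast needs one.
  val zoneId = if (Cast.needsTimeZone(DateType, StringType)) Some("UTC") else None
  val withZone = Cast(dateLit, StringType, zoneId)
  println(withZone.eval()) // UTF8String "2023-03-22" (a plain date does not shift with zone)
}
```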
From 54fd6e37af699de1add3d92b57b6a3437623feb3 Mon Sep 17 00:00:00 2001
From: voonhous
Date: Fri, 24 Mar 2023 17:11:43 +0800
Subject: [PATCH 2/6] Fix tests

---
 .../datasources/parquet/Spark24HoodieParquetFileFormat.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
index 3aec4ef111b1f..c168911302eef 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24HoodieParquetFileFormat.scala
@@ -239,8 +239,8 @@ class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
           }).toAttributes ++ partitionSchema.toAttributes
         val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
           if (implicitTypeChangeInfos.containsKey(i)) {
-            val srcType = typeChangeInfos.get(i).getRight
-            val dstType = typeChangeInfos.get(i).getLeft
+            val srcType = implicitTypeChangeInfos.get(i).getRight
+            val dstType = implicitTypeChangeInfos.get(i).getLeft
             val needTimeZone = Cast.needsTimeZone(srcType, dstType)
             Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
           } else attr
From 023778950aac15d0a5bcd57f5da2a5d7ffa2971f Mon Sep 17 00:00:00 2001
From: voonhous
Date: Fri, 24 Mar 2023 20:17:13 +0800
Subject: [PATCH 3/6] Fix tests 2

---
 .../apache/spark/sql/hudi/TestSpark3DDL.scala | 74 +++++++++----------
 1 file changed, 37 insertions(+), 37 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 866dee1688a52..e8f5e415cafb1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -734,43 +734,43 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+  }
 
-    test("Test DATE to STRING conversions when vectorized reading is not enabled") {
-      val tableName = generateTableName
-      // adding a struct column to force reads to use non-vectorized readers
-      spark.sql(
-        s"""
-           | create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  struct_col struct<f0: int, f1: string>,
-           |  ts long
-           |) using hudi
-           | partitioned by (ts)
-           |tblproperties (
-           |  primaryKey = 'id'
-          )
-        """.stripMargin)
-      spark.sql(
-        s"""
-           | insert into $tableName
-           | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
-        """.stripMargin)
-      spark.sql(s"select * from $tableName")
-
-      spark.sql("set hoodie.schema.on.read.enable=true")
-      spark.sql(s"alter table $tableName add column (`date_to_string_col` date)")
-      spark.sql(
-        s"""
-           | insert into $tableName
-           | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
-        """.stripMargin)
-      spark.sql(s"alter table $tableName alter column `date_to_string_col` type string")
-
-      // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
-      // not checking results as we just need to ensure that the table can be read without any errors thrown
-      spark.sql(s"select * from $tableName")
-    }
-  }
+  test("Test DATE to STRING conversions when vectorized reading is not enabled") {
+    val tableName = generateTableName
+    // adding a struct column to force reads to use non-vectorized readers
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  struct_col struct<f0: int, f1: string>,
+         |  ts long
+         |) using hudi
+         | partitioned by (ts)
+         |tblproperties (
+         |  primaryKey = 'id'
+        )
+      """.stripMargin)
+    spark.sql(
+      s"""
+         | insert into $tableName
+         | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
+      """.stripMargin)
+    spark.sql(s"select * from $tableName")
+
+    spark.sql("set hoodie.schema.on.read.enable=true")
+    spark.sql(s"alter table $tableName add column (`date_to_string_col` date)")
+    spark.sql(
+      s"""
+         | insert into $tableName
+         | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
+      """.stripMargin)
+    spark.sql(s"alter table $tableName alter column `date_to_string_col` type string")
+
+    // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
+    // not checking results as we just need to ensure that the table can be read without any errors thrown
+    spark.sql(s"select * from $tableName")
+  }
 }
From 2b273f906891d2e4e9fea23c148eb524ae1c667e Mon Sep 17 00:00:00 2001
From: voonhous
Date: Sat, 25 Mar 2023 00:58:21 +0800
Subject: [PATCH 4/6] Fix tests 3

---
 .../test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index e8f5e415cafb1..1964f7f42673e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -761,7 +761,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     spark.sql(s"select * from $tableName")
 
     spark.sql("set hoodie.schema.on.read.enable=true")
-    spark.sql(s"alter table $tableName add column (`date_to_string_col` date)")
+    spark.sql(s"alter table $tableName add columns(`date_to_string_col` date)")
     spark.sql(
       s"""
         | insert into $tableName
From 7dfa9a09b36fe2f9af728365843cd89877994cb9 Mon Sep 17 00:00:00 2001
From: voonhous
Date: Sat, 25 Mar 2023 09:52:42 +0800
Subject: [PATCH 5/6] Fix tests 4

---
 .../apache/spark/sql/hudi/TestSpark3DDL.scala | 76 ++++++++++---------
 1 file changed, 42 insertions(+), 34 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 1964f7f42673e..96d6a839450a9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -737,40 +737,48 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   }
 
   test("Test DATE to STRING conversions when vectorized reading is not enabled") {
-    val tableName = generateTableName
-    // adding a struct column to force reads to use non-vectorized readers
-    spark.sql(
-      s"""
-         | create table $tableName (
-         |  id int,
-         |  name string,
-         |  price double,
-         |  struct_col struct<f0: int, f1: string>,
-         |  ts long
-         |) using hudi
-         | partitioned by (ts)
-         |tblproperties (
-         |  primaryKey = 'id'
-        )
-      """.stripMargin)
-    spark.sql(
-      s"""
-         | insert into $tableName
-         | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
-      """.stripMargin)
-    spark.sql(s"select * from $tableName")
+    withTempDir { tmp =>
+      Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        // adding a struct column to force reads to use non-vectorized readers
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  struct_col struct<f0: int, f1: string>,
+             |  ts long
+             |) using hudi
+             | location '$tablePath'
+             | options (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by (ts)
+          """.stripMargin)
+        spark.sql(
+          s"""
+             | insert into $tableName
+             | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
+          """.stripMargin)
+        spark.sql(s"select * from $tableName")
 
-    spark.sql("set hoodie.schema.on.read.enable=true")
-    spark.sql(s"alter table $tableName add columns(`date_to_string_col` date)")
-    spark.sql(
-      s"""
-         | insert into $tableName
-         | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
-      """.stripMargin)
-    spark.sql(s"alter table $tableName alter column `date_to_string_col` type string")
-
-    // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
-    // not checking results as we just need to ensure that the table can be read without any errors thrown
-    spark.sql(s"select * from $tableName")
+        spark.sql("set hoodie.schema.on.read.enable=true")
+        spark.sql(s"alter table $tableName add columns(date_to_string_col date)")
+        spark.sql(
+          s"""
+             | insert into $tableName
+             | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
+          """.stripMargin)
+        spark.sql(s"alter table $tableName alter column date_to_string_col type string")
+
+        // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
+        // not checking results as we just need to ensure that the table can be read without any errors thrown
+        spark.sql(s"select * from $tableName")
+      }
+    }
   }
 }
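A note on the test comment "adding a struct column to force reads to use non-vectorized readers": Spark's vectorized Parquet reader only handles flat schemas of simple types, so a nested column pushes the scan onto the row-based path that the three `*HoodieParquetFileFormat` changes patch. The sketch below loosely mirrors that eligibility rule; the helper name is made up here, while Spark's actual check lives in `ParquetFileFormat.supportBatch` (which also consults config such as `spark.sql.parquet.enableVectorizedReader`):

```scala
import org.apache.spark.sql.types._

object VectorizedEligibilitySketch extends App {
  // Roughly mirrors the schema rule behind ParquetFileFormat.supportBatch:
  // every top-level field must be a simple (non-nested) type for batch reading.
  def flatEnough(schema: StructType): Boolean = schema.fields.forall { f =>
    f.dataType match {
      case _: StructType | _: ArrayType | _: MapType => false
      case _ => true
    }
  }

  val flat = StructType(Seq(
    StructField("id", IntegerType),
    StructField("date_to_string_col", DateType)))
  val withStruct = flat.add(StructField("struct_col",
    StructType(Seq(StructField("f0", IntegerType), StructField("f1", StringType)))))

  println(flatEnough(flat))       // true  -> eligible for the vectorized reader
  println(flatEnough(withStruct)) // false -> falls back to the row-based reader
}
```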
From 90db3447a020728c0fc12b3714cd018482b89d1d Mon Sep 17 00:00:00 2001
From: voonhous
Date: Sat, 25 Mar 2023 10:32:36 +0800
Subject: [PATCH 6/6] Fix tests 5

---
 .../apache/spark/sql/hudi/TestSpark3DDL.scala | 74 ++++++++++---------
 1 file changed, 38 insertions(+), 36 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 96d6a839450a9..7b1cf43b7396c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -741,43 +741,45 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
         val tableName = generateTableName
         val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
-        // adding a struct column to force reads to use non-vectorized readers
-        spark.sql(
-          s"""
-             | create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  struct_col struct<f0: int, f1: string>,
-             |  ts long
-             |) using hudi
-             | location '$tablePath'
-             | options (
-             |  type = '$tableType',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
-             | partitioned by (ts)
-          """.stripMargin)
-        spark.sql(
-          s"""
-             | insert into $tableName
-             | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
-          """.stripMargin)
-        spark.sql(s"select * from $tableName")
-
-        spark.sql("set hoodie.schema.on.read.enable=true")
-        spark.sql(s"alter table $tableName add columns(date_to_string_col date)")
-        spark.sql(
-          s"""
-             | insert into $tableName
-             | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
-          """.stripMargin)
-        spark.sql(s"alter table $tableName alter column date_to_string_col type string")
-
-        // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
-        // not checking results as we just need to ensure that the table can be read without any errors thrown
-        spark.sql(s"select * from $tableName")
+        if (HoodieSparkUtils.gteqSpark3_1) {
+          // adding a struct column to force reads to use non-vectorized readers
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  struct_col struct<f0: int, f1: string>,
+               |  ts long
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+               | partitioned by (ts)
+            """.stripMargin)
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 10, struct(1, 'f_1'), 1000)
+            """.stripMargin)
+          spark.sql(s"select * from $tableName")
+
+          spark.sql("set hoodie.schema.on.read.enable=true")
+          spark.sql(s"alter table $tableName add columns(date_to_string_col date)")
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (2, 'a2', 20, struct(2, 'f_2'), date '2023-03-22', 1001)
+            """.stripMargin)
+          spark.sql(s"alter table $tableName alter column date_to_string_col type string")
+
+          // struct and string (converted from date) column must be read to ensure that non-vectorized reader is used
+          // not checking results as we just need to ensure that the table can be read without any errors thrown
+          spark.sql(s"select * from $tableName")
+        }
       }
     }
   }
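The final commit gates the test body on `HoodieSparkUtils.gteqSpark3_1`, since the schema-on-read DDL path exercised here is only wired up for Spark 3.1 and later. A hypothetical sketch of that gating idiom follows; the object and method names are stand-ins, not Hudi's implementation, which presumably performs a comparable version comparison:

```scala
import org.apache.spark.SPARK_VERSION

object SparkVersionGateSketch extends App {
  // Parse "major.minor" out of the running Spark version, e.g. "3.2.1" -> (3, 2).
  private val Array(major, minor) =
    SPARK_VERSION.split("\\.").take(2).map(_.takeWhile(_.isDigit).toInt)

  def gteq(reqMajor: Int, reqMinor: Int): Boolean =
    major > reqMajor || (major == reqMajor && minor >= reqMinor)

  // Mirrors the intent of the guard in the test: only run the DDL-evolution
  // assertions on Spark versions that support them.
  if (gteq(3, 1)) println("running Spark 3.1+ assertions")
  else println("skipping: requires Spark 3.1+")
}
```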