diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index 177de90f335dc..11cc4959442c8 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -28,12 +28,15 @@ import org.apache.avro.Schema.Type._ import org.apache.avro.generic.GenericData.{Fixed, Record} import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord} import org.apache.avro.{LogicalTypes, Schema} + import org.apache.spark.sql.Row -import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters} +import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ + import org.apache.hudi.AvroConversionUtils._ +import org.apache.hudi.exception.HoodieIncompatibleSchemaException import scala.collection.JavaConverters._ @@ -131,7 +134,7 @@ object AvroConversionHelper { case null => new Timestamp(item.asInstanceOf[Long]) case other => - throw new IncompatibleSchemaException( + throw new HoodieIncompatibleSchemaException( s"Cannot convert Avro logical type $other to Catalyst Timestamp type.") } } @@ -149,7 +152,7 @@ object AvroConversionHelper { converters(i) = converter avroFieldIndexes(i) = avroField.pos() } else if (!sqlField.nullable) { - throw new IncompatibleSchemaException( + throw new HoodieIncompatibleSchemaException( s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " + "in Avro schema\n" + s"Source Avro schema: $sourceAvroSchema.\n" + @@ -254,7 +257,7 @@ object AvroConversionHelper { converted(i) = fieldConverters(i)(item) new GenericRow(converted) } - case _ => throw new IncompatibleSchemaException( + case _ => throw new 
HoodieIncompatibleSchemaException( s"Cannot convert Avro schema to catalyst type because schema at path " + s"${path.mkString(".")} is not compatible " + s"(avroType = $other, sqlType = $sqlType). \n" + @@ -263,7 +266,7 @@ object AvroConversionHelper { } } case (left, right) => - throw new IncompatibleSchemaException( + throw new HoodieIncompatibleSchemaException( s"Cannot convert Avro schema to catalyst type because schema at path " + s"${path.mkString(".")} is not compatible (avroType = $left, sqlType = $right). \n" + s"Source Avro schema: $sourceAvroSchema.\n" + diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 2c59495b0d0d2..a93331ee20d48 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -87,4 +87,9 @@ trait SparkAdapter extends Serializable { * Create Like expression. */ def createLike(left: Expression, right: Expression): Expression + + /** + * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called. + */ + def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] } diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java new file mode 100644 index 0000000000000..579ae21d3ed99 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieIncompatibleSchemaException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +/** + * Exception for incompatible schema. + */ +public class HoodieIncompatibleSchemaException extends Exception { + + public HoodieIncompatibleSchemaException(String msg, Throwable e) { + super(msg, e); + } + + public HoodieIncompatibleSchemaException(String msg) { + super(msg); + } +} diff --git a/hudi-spark-datasource/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml index e7f5211567145..11ebeca39e963 100644 --- a/hudi-spark-datasource/hudi-spark/pom.xml +++ b/hudi-spark-datasource/hudi-spark/pom.xml @@ -503,6 +503,13 @@ <scope>test</scope> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> @@ -524,6 +531,5 @@ - diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 398377dc64479..44d39facdea67 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -28,7 +28,7 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS import org.apache.spark.rdd.RDD -import
org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer} +import org.apache.spark.sql.avro.{HoodieAvroSerializer, HooodieAvroDeserializer} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -119,7 +119,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, tableState.requiredStructSchema .map(f => tableAvroSchema.getField(f.name).pos()).toList private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) + private val deserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala @@ -135,7 +135,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } else { val requiredAvroRecord = AvroConversionUtils .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder) - recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + recordToLoad = unsafeProjection(deserializer.deserializeData(requiredAvroRecord).asInstanceOf[InternalRow]) true } } else { @@ -158,7 +158,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, tableState.requiredStructSchema .map(f => tableAvroSchema.getField(f.name).pos()).toList private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) + private val deserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val unsafeProjection = 
UnsafeProjection.create(tableState.requiredStructSchema) private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala @@ -180,7 +180,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } else { val requiredAvroRecord = AvroConversionUtils .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder) - recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + recordToLoad = unsafeProjection(deserializer.deserializeData(requiredAvroRecord).asInstanceOf[InternalRow]) true } } else { @@ -203,8 +203,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val requiredFieldPosition = tableState.requiredStructSchema .map(f => tableAvroSchema.getField(f.name).pos()).toList - private val serializer = new AvroSerializer(tableState.tableStructSchema, tableAvroSchema, false) - private val requiredDeserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) + private val serializer = HoodieAvroSerializer(tableState.tableStructSchema, tableAvroSchema, false) + private val requiredDeserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords @@ -236,7 +236,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, recordBuilder ) recordToLoad = unsafeProjection(requiredDeserializer - .deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + .deserializeData(requiredAvroRecord).asInstanceOf[InternalRow]) true } } else { @@ -264,7 +264,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, recordBuilder ) recordToLoad = 
unsafeProjection(requiredDeserializer - .deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + .deserializeData(requiredAvroRecord).asInstanceOf[InternalRow]) true } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala new file mode 100644 index 0000000000000..b464c2dc5d611 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.avro + +import org.apache.avro.Schema +import org.apache.spark.sql.types.DataType + +/** + * As AvroSerializer cannot be accessed outside of the spark.sql.avro package since spark 3.1, we define + * this class to be accessed by other classes.
+ */ +case class HoodieAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) + extends AvroSerializer(rootCatalystType, rootAvroType, nullable) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HooodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HooodieAvroDeserializer.scala new file mode 100644 index 0000000000000..ba911a7b3075c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HooodieAvroDeserializer.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.avro + +import org.apache.avro.Schema +import org.apache.spark.sql.types.DataType + +/** + * This is to be compatible with the type returned by Spark 3.1 + * and other spark versions for AvroDeserializer + */ +case class HooodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) + extends AvroDeserializer(rootAvroType, rootCatalystType) { + + def deserializeData(data: Any): Any = { + super.deserialize(data) match { + case Some(r) => r // spark 3.1 return type is Option, we fetch the data. 
+ case o => o // for other spark version, return the data directly. + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index ea55127d4416a..b025cf3efa443 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -31,7 +31,7 @@ import org.apache.hudi.common.util.{ValidationUtils, Option => HOption} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.io.HoodieWriteHandle import org.apache.hudi.sql.IExpressionEvaluator -import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} +import org.apache.spark.sql.avro.{AvroSerializer, HoodieAvroSerializer, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.hudi.SerDeUtils import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator @@ -310,7 +310,7 @@ object ExpressionPayload { val conditionEvaluator = ExpressionCodeGen.doCodeGen(Seq(condition), conditionSerializer) val assignSqlType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType] - val assignSerializer = new AvroSerializer(assignSqlType, writeSchema, false) + val assignSerializer = new HoodieAvroSerializer(assignSqlType, writeSchema, false) val assignmentEvaluator = ExpressionCodeGen.doCodeGen(assignments, assignSerializer) conditionEvaluator -> assignmentEvaluator } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala index 2a12e9227a15a..3fb48f430221a 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.command.payload import org.apache.avro.generic.IndexedRecord import org.apache.avro.Schema -import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} +import org.apache.spark.sql.avro.{HooodieAvroDeserializer, SchemaConverters} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ @@ -29,8 +29,8 @@ import org.apache.spark.sql.types._ class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord { private lazy val sqlType = SchemaConverters.toSqlType(getSchema).dataType.asInstanceOf[StructType] - private lazy val avroDeserializer = new AvroDeserializer(record.getSchema, sqlType) - private lazy val sqlRow = avroDeserializer.deserialize(record).asInstanceOf[InternalRow] + private lazy val avroDeserializer = HooodieAvroDeserializer(record.getSchema, sqlType) + private lazy val sqlRow = avroDeserializer.deserializeData(record).asInstanceOf[InternalRow] override def put(i: Int, v: Any): Unit = { record.put(i, v) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieCommonSqlParser.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieCommonSqlParser.scala index 4f410c7d67fb8..f830c515be782 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieCommonSqlParser.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieCommonSqlParser.scala @@ -62,7 +62,7 @@ class HoodieCommonSqlParser(session: SparkSession, delegate: ParserInterface) } def parseMultipartIdentifier(sqlText: String): Seq[String] = { - throw new UnsupportedOperationException(s"Unsupported parseMultipartIdentifier method") + 
sparkAdapter.parseMultipartIdentifier(delegate, sqlText) } protected def parse[T](command: String)(toResult: HoodieSqlCommonParser => T): T = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala index 3d35c03e48dbc..8fc6e7f13a8f6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala @@ -137,7 +137,8 @@ class TestDataSourceForBootstrap { verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false) } - @Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = { + @Test + def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = { val timestamp = Instant.now.toEpochMilli val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index 1315857ae5a37..b2ada77c21941 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -46,8 +46,8 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { // insert data spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""") - checkException(s"alter table $tableName drop partition (dt='2021-10-01')")( - s"dt is not a valid partition column in table `default`.`${tableName}`.;") + checkExceptionContain(s"alter table $tableName drop partition 
(dt='2021-10-01')")( + s"dt is not a valid partition column in table `default`.`$tableName`.") } Seq(false, true).foreach { urlencode => @@ -115,12 +115,8 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""") // specify duplicate partition columns - try { - spark.sql(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')") - } catch { - case NonFatal(e) => - assert(e.getMessage.contains("Found duplicate keys 'dt'")) - } + checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")( + "Found duplicate keys 'dt'") // drop 2021-10-01 partition spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')") @@ -164,8 +160,8 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { |""".stripMargin) // not specified all partition column - checkException(s"alter table $tableName drop partition (year='2021', month='10')")( - "All partition columns need to be specified for Hoodie's dropping partition;" + checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")( + "All partition columns need to be specified for Hoodie's dropping partition" ) // drop 2021-10-01 partition spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala index 1f9b1ea597bd5..5413bf4044892 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala @@ -18,13 +18,15 @@ package org.apache.spark.sql.hudi import java.io.File - import org.apache.log4j.Level +import 
org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.util.Utils import org.scalactic.source import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag} +import java.util.TimeZone + class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { org.apache.log4j.Logger.getRootLogger.setLevel(Level.WARN) @@ -34,6 +36,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { dir } + TimeZone.setDefault(DateTimeUtils.getTimeZone("CTT")) protected lazy val spark: SparkSession = SparkSession.builder() .master("local[1]") .appName("hoodie sql test") @@ -43,6 +46,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { .config("hoodie.upsert.shuffle.parallelism", "4") .config("hoodie.delete.shuffle.parallelism", "4") .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath) + .config("spark.sql.session.timeZone", "CTT") .getOrCreate() private var tableId = 0 @@ -92,6 +96,19 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { assertResult(true)(hasException) } + protected def checkExceptionContain(sql: String)(errorMsg: String): Unit = { + var hasException = false + try { + spark.sql(sql) + } catch { + case e: Throwable => + assertResult(true)(e.getMessage.contains(errorMsg)) + hasException = true + } + assertResult(true)(hasException) + } + + protected def removeQuotes(value: Any): Any = { value match { case s: String => s.stripPrefix("'").stripSuffix("'") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index 30a2448f0a5e4..153eacfe1a46e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -353,19 +353,7 @@ class 
TestMergeIntoTable2 extends TestHoodieSqlBase { |""".stripMargin if (HoodieSqlUtils.isSpark3) { - checkException(mergeSql)( - "\nColumns aliases are not allowed in MERGE.(line 5, pos 5)\n\n" + - "== SQL ==\n\r\n" + - s" merge into $tableName\r\n" + - " using (\r\n" + - " select 1, 'a1', 10, 1000, '1'\r\n" + - " ) s0(id,name,price,ts,flag)\r\n" + - "-----^^^\n" + - s" on s0.id = $tableName.id\r\n" + - " when matched and flag = '1' then update set\r\n" + - " id = s0.id, name = s0.name, price = s0.price, ts = s0.ts\r\n" + - " when not matched and flag = '1' then insert *\r\n" - ) + checkExceptionContain(mergeSql)("Columns aliases are not allowed in MERGE") } else { spark.sql(mergeSql) checkAnswer(s"select id, name, price, ts from $tableName")( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala index 0dbb07466d4af..357954ebb1d57 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala @@ -98,7 +98,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { | preCombineField = '_ts' |)""".stripMargin) - checkException( + checkExceptionContain( s""" |merge into $tableName t0 |using ( select 1 as id, 'a1' as name, 12 as price) s0 @@ -106,7 +106,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { |when matched then update set price = s0.price """.stripMargin)( "Missing specify value for the preCombineField: _ts in merge-into update action. " + - "You should add '... update set _ts = xx....' to the when-matched clause.;") + "You should add '... update set _ts = xx....' 
to the when-matched clause.") val tableName2 = generateTableName spark.sql( @@ -123,7 +123,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { | preCombineField = '_ts' |)""".stripMargin) - checkException( + checkExceptionContain( s""" |merge into $tableName2 t0 |using ( select 1 as id, 'a1' as name, 12 as price, 1000 as ts) s0 @@ -132,6 +132,6 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { """.stripMargin)( "Missing specify the value for target field: 'id' in merge into update action for MOR table. " + "Currently we cannot support partial update for MOR, please complete all the target fields " + - "just like '...update set id = s0.id, name = s0.name ....';") + "just like '...update set id = s0.id, name = s0.name ....'") } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 9a3e8e3024311..d47e7fbb497b0 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -82,4 +82,8 @@ class Spark2Adapter extends SparkAdapter { override def createLike(left: Expression, right: Expression): Expression = { Like(left, right) } + + override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { + throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2") + } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java new file mode 100644 index 0000000000000..c7a70438fc3cd --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java @@ -0,0 +1,46 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.spark.sql.catalyst.plans.logical.InsertIntoStatement; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import scala.Option; +import scala.collection.Seq; +import scala.collection.immutable.Map; + +import java.lang.reflect.Constructor; + +public class ReflectUtil { + + public static InsertIntoStatement createInsertInto(boolean isSpark30, LogicalPlan table, Map> partition, Seq userSpecifiedCols, + LogicalPlan query, boolean overwrite, boolean ifPartitionNotExists) { + try { + if (isSpark30) { + Constructor constructor = InsertIntoStatement.class.getConstructor( + LogicalPlan.class, Map.class, LogicalPlan.class, boolean.class, boolean.class); + return constructor.newInstance(table, partition, query, overwrite, ifPartitionNotExists); + } else { + Constructor constructor = InsertIntoStatement.class.getConstructor( + LogicalPlan.class, Map.class, Seq.class, LogicalPlan.class, boolean.class, boolean.class); + return constructor.newInstance(table, partition, userSpecifiedCols, query, overwrite, ifPartitionNotExists); + } + } catch (Exception e) { + throw new RuntimeException("Error in create 
InsertIntoStatement", e); + } + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala index 4c9a06b3cf209..87d80d0b42bf0 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala @@ -19,10 +19,13 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark3RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe +import org.apache.hudi.spark3.internal.ReflectUtil +import org.apache.spark.SPARK_VERSION import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} @@ -67,15 +70,16 @@ class Spark3Adapter extends SparkAdapter { override def getInsertIntoChildren(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { plan match { - case InsertIntoStatement(table, partitionSpec, query, overwrite, ifPartitionNotExists) => - Some((table, partitionSpec, query, overwrite, ifPartitionNotExists)) - case _=> None + case insert: InsertIntoStatement => + Some((insert.table, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists)) + case _ => + None } } override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = { - 
InsertIntoStatement(table, partition, query, overwrite, ifPartitionNotExists) + ReflectUtil.createInsertInto(SPARK_VERSION.startsWith("3.0"), table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists) } override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = { @@ -85,4 +89,8 @@ class Spark3Adapter extends SparkAdapter { override def createLike(left: Expression, right: Expression): Expression = { new Like(left, right) } + + override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { + parser.parseMultipartIdentifier(sqlText) + } } diff --git a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java new file mode 100644 index 0000000000000..284b2aaf1f81d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.spark3.internal; + +import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; +import org.apache.spark.sql.catalyst.plans.logical.InsertIntoStatement; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Unit tests {@link ReflectUtil}. + */ +public class TestReflectUtil extends HoodieClientTestBase { + + @Test + public void testDataSourceWriterExtraCommitMetadata() throws Exception { + SparkSession spark = sqlContext.sparkSession(); + + String insertIntoSql = "insert into test_reflect_util values (1, 'z3', 1, '2021')"; + InsertIntoStatement statement = (InsertIntoStatement) spark.sessionState().sqlParser().parsePlan(insertIntoSql); + + InsertIntoStatement newStatment = ReflectUtil.createInsertInto( + spark.version().startsWith("3.0"), + statement.table(), + statement.partitionSpec(), + scala.collection.immutable.List.empty(), + statement.query(), + statement.overwrite(), + statement.ifPartitionNotExists()); + + Assertions.assertTrue( + ((UnresolvedRelation)newStatment.table()).multipartIdentifier().contains("test_reflect_util")); + + if (!spark.version().startsWith("3.0")) { + Assertions.assertTrue(newStatment.userSpecifiedCols().isEmpty()); + } + } +} diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml index a094b5d130482..23ba1f96971ea 100644 --- a/hudi-sync/hudi-hive-sync/pom.xml +++ b/hudi-sync/hudi-hive-sync/pom.xml @@ -153,6 +153,12 @@ test + + org.apache.spark + spark-core_${scala.binary.version} + test + + org.eclipse.jetty.aggregate diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java index e4debe181c70e..3ca31b04395a1 100644 --- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java @@ -36,7 +36,19 @@ public class TestParquet2SparkSchemaUtils { private final SparkToParquetSchemaConverter spark2ParquetConverter = new SparkToParquetSchemaConverter(new SQLConf()); - private final SparkSqlParser parser = new SparkSqlParser(new SQLConf()); + private final SparkSqlParser parser = createSqlParser(); + + private static SparkSqlParser createSqlParser() { + try { + return SparkSqlParser.class.getDeclaredConstructor(SQLConf.class).newInstance(new SQLConf()); + } catch (Exception ne) { + try { // For spark 3.1, there is no constructor with SQLConf, use the default constructor + return SparkSqlParser.class.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } @Test public void testConvertPrimitiveType() { diff --git a/pom.xml b/pom.xml index e403973b417c2..f3a9097921c2a 100644 --- a/pom.xml +++ b/pom.xml @@ -99,7 +99,7 @@ 1.7.0-M1 3.3.3 1.2.17 - 1.7.15 + 1.7.30 2.9.9 2.7.3 org.apache.hive @@ -114,7 +114,7 @@ ${spark2bundle.version} 1.13.1 2.4.4 - 3.0.0 + 3.1.2 3 hudi-spark2 @@ -1476,6 +1476,7 @@ ${scala12.version} 2.12 hudi-spark3 + 3.1.0 2.4.1 ${fasterxml.spark3.version} ${fasterxml.spark3.version} @@ -1491,6 +1492,16 @@ + + spark3.0.x + + + 3.0.0 + ${spark3.version} + 3.0.1 + + + skipShadeSources