diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index 177de90f335dc..cb5e0cb136084 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -21,7 +21,6 @@ package org.apache.hudi import java.nio.ByteBuffer import java.sql.{Date, Timestamp} import java.util - import org.apache.avro.Conversions.DecimalConversion import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis} import org.apache.avro.Schema.Type._ @@ -29,11 +28,12 @@ import org.apache.avro.generic.GenericData.{Fixed, Record} import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord} import org.apache.avro.{LogicalTypes, Schema} import org.apache.spark.sql.Row -import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters} +import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.hudi.AvroConversionUtils._ +import org.apache.hudi.exception.HoodieInCompatibleSchemaException import scala.collection.JavaConverters._ @@ -131,7 +131,7 @@ object AvroConversionHelper { case null => new Timestamp(item.asInstanceOf[Long]) case other => - throw new IncompatibleSchemaException( + throw new HoodieInCompatibleSchemaException( s"Cannot convert Avro logical type $other to Catalyst Timestamp type.") } } @@ -149,7 +149,7 @@ object AvroConversionHelper { converters(i) = converter avroFieldIndexes(i) = avroField.pos() } else if (!sqlField.nullable) { - throw new IncompatibleSchemaException( + throw new HoodieInCompatibleSchemaException( s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " + "in Avro schema\n" + s"Source Avro schema: $sourceAvroSchema.\n" + @@ -254,7 +254,7 @@ object AvroConversionHelper { converted(i) = fieldConverters(i)(item) new GenericRow(converted) } - case _ => throw new IncompatibleSchemaException( + case _ => throw new HoodieInCompatibleSchemaException( s"Cannot convert Avro schema to catalyst type because schema at path " + s"${path.mkString(".")} is not compatible " + s"(avroType = $other, sqlType = $sqlType). \n" + @@ -263,7 +263,7 @@ object AvroConversionHelper { } } case (left, right) => - throw new IncompatibleSchemaException( + throw new HoodieInCompatibleSchemaException( s"Cannot convert Avro schema to catalyst type because schema at path " + s"${path.mkString(".")} is not compatible (avroType = $left, sqlType = $right). \n" + s"Source Avro schema: $sourceAvroSchema.\n" + diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 2c59495b0d0d2..a93331ee20d48 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -87,4 +87,9 @@ trait SparkAdapter extends Serializable { * Create Like expression. */ def createLike(left: Expression, right: Expression): Expression + + /** + * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called. + */ + def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] } diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieInCompatibleSchemaException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieInCompatibleSchemaException.java new file mode 100644 index 0000000000000..f462f36d7fb0a --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieInCompatibleSchemaException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +/** + * Exception for in compatible schema. + */ +public class HoodieInCompatibleSchemaException extends Exception { + + public HoodieInCompatibleSchemaException(String msg, Throwable e) { + super(msg, e); + } + + public HoodieInCompatibleSchemaException(String msg) { + super(msg); + } +} diff --git a/hudi-spark-datasource/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml index c4fe1d109f502..60d43927c1a49 100644 --- a/hudi-spark-datasource/hudi-spark/pom.xml +++ b/hudi-spark-datasource/hudi-spark/pom.xml @@ -490,6 +490,12 @@ mockito-junit-jupiter test + + org.slf4j + slf4j-api + ${slf4j.version} + test + org.junit.platform @@ -502,6 +508,11 @@ junit-platform-suite-api test - + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + test + diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 398377dc64479..44d39facdea67 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -28,7 +28,7 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS import org.apache.spark.rdd.RDD -import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer} +import org.apache.spark.sql.avro.{HoodieAvroSerializer, HooodieAvroDeserializer} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -119,7 +119,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, tableState.requiredStructSchema .map(f => tableAvroSchema.getField(f.name).pos()).toList private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) + private val deserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala @@ -135,7 +135,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } else { val requiredAvroRecord = AvroConversionUtils .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder) - recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + recordToLoad = unsafeProjection(deserializer.deserializeData(requiredAvroRecord).asInstanceOf[InternalRow]) true } } else { @@ -158,7 +158,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, tableState.requiredStructSchema .map(f => tableAvroSchema.getField(f.name).pos()).toList private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) - private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) + private val deserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala @@ -180,7 +180,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } else { val requiredAvroRecord = AvroConversionUtils .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder) - recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + recordToLoad = unsafeProjection(deserializer.deserializeData(requiredAvroRecord).asInstanceOf[InternalRow]) true } } else { @@ -203,8 +203,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val requiredFieldPosition = tableState.requiredStructSchema .map(f => tableAvroSchema.getField(f.name).pos()).toList - private val serializer = new AvroSerializer(tableState.tableStructSchema, tableAvroSchema, false) - private val requiredDeserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) + private val serializer = HoodieAvroSerializer(tableState.tableStructSchema, tableAvroSchema, false) + private val requiredDeserializer = HooodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords @@ -236,7 +236,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, recordBuilder ) recordToLoad = unsafeProjection(requiredDeserializer - .deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + .deserializeData(requiredAvroRecord).asInstanceOf[InternalRow]) true } } else { @@ -264,7 +264,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, recordBuilder ) recordToLoad = unsafeProjection(requiredDeserializer - .deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + .deserializeData(requiredAvroRecord).asInstanceOf[InternalRow]) true } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala new file mode 100644 index 0000000000000..b464c2dc5d611 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.avro + +import org.apache.avro.Schema +import org.apache.spark.sql.types.DataType + +/** + * As AvroSerializer cannot be access out of the spark.sql.avro package since spark 3.1, we define + * this class to be accessed by other class. + */ +case class HoodieAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) + extends AvroSerializer(rootCatalystType, rootAvroType, nullable) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HooodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HooodieAvroDeserializer.scala new file mode 100644 index 0000000000000..ba911a7b3075c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HooodieAvroDeserializer.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.avro + +import org.apache.avro.Schema +import org.apache.spark.sql.types.DataType + +/** + * This is to be compatible with the type returned by Spark 3.1 + * and other spark versions for AvroDeserializer + */ +case class HooodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) + extends AvroDeserializer(rootAvroType, rootCatalystType) { + + def deserializeData(data: Any): Any = { + super.deserialize(data) match { + case Some(r) => r // spark 3.1 return type is Option, we fetch the data. + case o => o // for other spark version, return the data directly. + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index ea55127d4416a..b025cf3efa443 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -31,7 +31,7 @@ import org.apache.hudi.common.util.{ValidationUtils, Option => HOption} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.io.HoodieWriteHandle import org.apache.hudi.sql.IExpressionEvaluator -import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} +import org.apache.spark.sql.avro.{AvroSerializer, HoodieAvroSerializer, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.hudi.SerDeUtils import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator @@ -310,7 +310,7 @@ object ExpressionPayload { val conditionEvaluator = ExpressionCodeGen.doCodeGen(Seq(condition), conditionSerializer) val assignSqlType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType] - val assignSerializer = new AvroSerializer(assignSqlType, writeSchema, false) + val assignSerializer = new HoodieAvroSerializer(assignSqlType, writeSchema, false) val assignmentEvaluator = ExpressionCodeGen.doCodeGen(assignments, assignSerializer) conditionEvaluator -> assignmentEvaluator } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala index 2a12e9227a15a..3fb48f430221a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.command.payload import org.apache.avro.generic.IndexedRecord import org.apache.avro.Schema -import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters} +import org.apache.spark.sql.avro.{HooodieAvroDeserializer, SchemaConverters} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ @@ -29,8 +29,8 @@ import org.apache.spark.sql.types._ class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord { private lazy val sqlType = SchemaConverters.toSqlType(getSchema).dataType.asInstanceOf[StructType] - private lazy val avroDeserializer = new AvroDeserializer(record.getSchema, sqlType) - private lazy val sqlRow = avroDeserializer.deserialize(record).asInstanceOf[InternalRow] + private lazy val avroDeserializer = HooodieAvroDeserializer(record.getSchema, sqlType) + private lazy val sqlRow = avroDeserializer.deserializeData(record).asInstanceOf[InternalRow] override def put(i: Int, v: Any): Unit = { record.put(i, v) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieCommonSqlParser.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieCommonSqlParser.scala index 4f410c7d67fb8..f830c515be782 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieCommonSqlParser.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieCommonSqlParser.scala @@ -62,7 +62,7 @@ class HoodieCommonSqlParser(session: SparkSession, delegate: ParserInterface) } def parseMultipartIdentifier(sqlText: String): Seq[String] = { - throw new UnsupportedOperationException(s"Unsupported parseMultipartIdentifier method") + sparkAdapter.parseMultipartIdentifier(delegate, sqlText) } protected def parse[T](command: String)(toResult: HoodieSqlCommonParser => T): T = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala index 3d35c03e48dbc..8fc6e7f13a8f6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala @@ -137,7 +137,8 @@ class TestDataSourceForBootstrap { verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false) } - @Test def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = { + @Test + def testMetadataBootstrapCOWHiveStylePartitioned(): Unit = { val timestamp = Instant.now.toEpochMilli val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala index e35b9b703f4e1..8850bb43dec49 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala @@ -18,13 +18,15 @@ package org.apache.spark.sql.hudi import java.io.File - import org.apache.log4j.Level +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.util.Utils import org.scalactic.source import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag} +import java.util.TimeZone + class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { org.apache.log4j.Logger.getRootLogger.setLevel(Level.WARN) @@ -34,6 +36,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { dir } + TimeZone.setDefault(DateTimeUtils.getTimeZone("CTT")) protected lazy val spark: SparkSession = SparkSession.builder() .master("local[1]") .appName("hoodie sql test") @@ -43,6 +46,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { .config("hoodie.upsert.shuffle.parallelism", "4") .config("hoodie.delete.shuffle.parallelism", "4") .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath) + .config("spark.sql.session.timeZone", "CTT") .getOrCreate() private var tableId = 0 @@ -92,6 +96,19 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { assertResult(true)(hasException) } + protected def checkExceptionContain(sql: String)(errorMsg: String): Unit = { + var hasException = false + try { + spark.sql(sql) + } catch { + case e: Throwable => + assertResult(true)(e.getMessage.contains(errorMsg)) + hasException = true + } + assertResult(true)(hasException) + } + + protected def removeQuotes(value: Any): Any = { value match { case s: String => s.stripPrefix("'").stripSuffix("'") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index 92a2c63ee617f..c93c97a1d10aa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -353,19 +353,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase { |""".stripMargin if (HoodieSqlUtils.isSpark3) { - checkException(mergeSql)( - "\nColumns aliases are not allowed in MERGE.(line 5, pos 5)\n\n" + - "== SQL ==\n\r\n" + - s" merge into $tableName\r\n" + - " using (\r\n" + - " select 1, 'a1', 10, 1000, '1'\r\n" + - " ) s0(id,name,price,ts,flag)\r\n" + - "-----^^^\n" + - s" on s0.id = $tableName.id\r\n" + - " when matched and flag = '1' then update set\r\n" + - " id = s0.id, name = s0.name, price = s0.price, ts = s0.ts\r\n" + - " when not matched and flag = '1' then insert *\r\n" - ) + checkExceptionContain(mergeSql)("Columns aliases are not allowed in MERGE") } else { spark.sql(mergeSql) checkAnswer(s"select id, name, price, ts from $tableName")( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala index 0dbb07466d4af..357954ebb1d57 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala @@ -98,7 +98,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { | preCombineField = '_ts' |)""".stripMargin) - checkException( + checkExceptionContain( s""" |merge into $tableName t0 |using ( select 1 as id, 'a1' as name, 12 as price) s0 @@ -106,7 +106,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { |when matched then update set price = s0.price """.stripMargin)( "Missing specify value for the preCombineField: _ts in merge-into update action. " + - "You should add '... update set _ts = xx....' to the when-matched clause.;") + "You should add '... update set _ts = xx....' to the when-matched clause.") val tableName2 = generateTableName spark.sql( @@ -123,7 +123,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { | preCombineField = '_ts' |)""".stripMargin) - checkException( + checkExceptionContain( s""" |merge into $tableName2 t0 |using ( select 1 as id, 'a1' as name, 12 as price, 1000 as ts) s0 @@ -132,6 +132,6 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { """.stripMargin)( "Missing specify the value for target field: 'id' in merge into update action for MOR table. " + "Currently we cannot support partial update for MOR, please complete all the target fields " + - "just like '...update set id = s0.id, name = s0.name ....';") + "just like '...update set id = s0.id, name = s0.name ....'") } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 9a3e8e3024311..d47e7fbb497b0 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -82,4 +82,8 @@ class Spark2Adapter extends SparkAdapter { override def createLike(left: Expression, right: Expression): Expression = { Like(left, right) } + + override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { + throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2") + } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java new file mode 100644 index 0000000000000..c7a70438fc3cd --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.spark3.internal; + +import org.apache.spark.sql.catalyst.plans.logical.InsertIntoStatement; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import scala.Option; +import scala.collection.Seq; +import scala.collection.immutable.Map; + +import java.lang.reflect.Constructor; + +public class ReflectUtil { + + public static InsertIntoStatement createInsertInto(boolean isSpark30, LogicalPlan table, Map> partition, Seq userSpecifiedCols, + LogicalPlan query, boolean overwrite, boolean ifPartitionNotExists) { + try { + if (isSpark30) { + Constructor constructor = InsertIntoStatement.class.getConstructor( + LogicalPlan.class, Map.class, LogicalPlan.class, boolean.class, boolean.class); + return constructor.newInstance(table, partition, query, overwrite, ifPartitionNotExists); + } else { + Constructor constructor = InsertIntoStatement.class.getConstructor( + LogicalPlan.class, Map.class, Seq.class, LogicalPlan.class, boolean.class, boolean.class); + return constructor.newInstance(table, partition, userSpecifiedCols, query, overwrite, ifPartitionNotExists); + } + } catch (Exception e) { + throw new RuntimeException("Error in create InsertIntoStatement", e); + } + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala index 4c9a06b3cf209..87d80d0b42bf0 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala @@ -19,10 +19,13 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark3RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe +import org.apache.hudi.spark3.internal.ReflectUtil +import org.apache.spark.SPARK_VERSION import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} @@ -67,15 +70,16 @@ class Spark3Adapter extends SparkAdapter { override def getInsertIntoChildren(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { plan match { - case InsertIntoStatement(table, partitionSpec, query, overwrite, ifPartitionNotExists) => - Some((table, partitionSpec, query, overwrite, ifPartitionNotExists)) - case _=> None + case insert: InsertIntoStatement => + Some((insert.table, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists)) + case _ => + None } } override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = { - InsertIntoStatement(table, partition, query, overwrite, ifPartitionNotExists) + ReflectUtil.createInsertInto(SPARK_VERSION.startsWith("3.0"), table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists) } override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = { @@ -85,4 +89,8 @@ class Spark3Adapter extends SparkAdapter { override def createLike(left: Expression, right: Expression): Expression = { new Like(left, right) } + + override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { + parser.parseMultipartIdentifier(sqlText) + } } diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml index 8f4fedaed5e0f..9b990b074cd50 100644 --- a/hudi-sync/hudi-hive-sync/pom.xml +++ b/hudi-sync/hudi-hive-sync/pom.xml @@ -151,6 +151,12 @@ test + + org.apache.spark + spark-core_${scala.binary.version} + test + + org.eclipse.jetty.aggregate diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java index e4debe181c70e..3ca31b04395a1 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestParquet2SparkSchemaUtils.java @@ -36,7 +36,19 @@ public class TestParquet2SparkSchemaUtils { private final SparkToParquetSchemaConverter spark2ParquetConverter = new SparkToParquetSchemaConverter(new SQLConf()); - private final SparkSqlParser parser = new SparkSqlParser(new SQLConf()); + private final SparkSqlParser parser = createSqlParser(); + + private static SparkSqlParser createSqlParser() { + try { + return SparkSqlParser.class.getDeclaredConstructor(SQLConf.class).newInstance(new SQLConf()); + } catch (Exception ne) { + try { // For spark 3.1, there is no constructor with SQLConf, use the default constructor + return SparkSqlParser.class.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } @Test public void testConvertPrimitiveType() { diff --git a/pom.xml b/pom.xml index 65e391ab1a1b5..376cfb8457b8b 100644 --- a/pom.xml +++ b/pom.xml @@ -97,7 +97,7 @@ 1.7.0-M1 3.3.3 1.2.17 - 1.7.15 + 1.7.30 2.9.9 2.7.3 org.apache.hive @@ -112,7 +112,7 @@ ${spark2bundle.version} 1.13.1 2.4.4 - 3.0.0 + 3.1.0 3 hudi-spark2 @@ -1471,6 +1471,7 @@ ${scala12.version} 2.12 hudi-spark3 + 3.1.0 2.4.1 ${fasterxml.spark3.version} ${fasterxml.spark3.version} @@ -1486,6 +1487,16 @@ + + isSpark3.0 + + + 3.0.0 + ${spark3.version} + 3.0.1 + + + skipShadeSources