diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index fc9de60c67374..3f9066d0847f5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
@@ -696,4 +697,47 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Insert Into With Catalog Identifier for spark >= 3.2.0") {
+    Seq("hudi", "parquet").foreach { format =>
+      withTempDir { tmp =>
+        val tableName = s"spark_catalog.default.$generateTableName"
+        // Create a partitioned table
+        if (HoodieSparkUtils.gteqSpark3_2) {
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string
+               |) using $format
+               | tblproperties (primaryKey = 'id')
+               | partitioned by (dt)
+               | location '${tmp.getCanonicalPath}'
+             """.stripMargin)
+          // Insert into dynamic partition
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-01-05' as dt
+             """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05")
+          )
+          // Insert into static partition
+          spark.sql(
+            s"""
+               | insert into $tableName partition(dt = '2021-01-05')
+               | select 2 as id, 'a2' as name, 10 as price, 1000 as ts
+             """.stripMargin)
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 10.0, 1000, "2021-01-05")
+          )
+        }
+      }
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 48d323c8f286c..115913c230c62 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.adapter
 
 import org.apache.hudi.Spark3RowSerDe
 import org.apache.hudi.client.utils.SparkRowSerDe
-import org.apache.spark.SPARK_VERSION
 import org.apache.hudi.spark3.internal.ReflectUtil
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -28,21 +29,21 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredica
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.{Row, SparkSession}
 
+import scala.util.control.NonFatal
+
 /**
  * Base implementation of [[SparkAdapter]] for Spark 3.x branch
  */
-abstract class BaseSpark3Adapter extends SparkAdapter {
+abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
 
   override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
     new Spark3RowSerDe(encoder)
@@ -115,7 +116,13 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
     unfoldSubqueryAliases(table) match {
       case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
       case relation: UnresolvedRelation =>
-        isHoodieTable(toTableIdentifier(relation), spark)
+        try {
+          isHoodieTable(toTableIdentifier(relation), spark)
+        } catch {
+          case NonFatal(e) =>
+            logWarning("Failed to determine whether the table is a hoodie table", e)
+            false
+        }
       case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
       case _=> false
     }