diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index 87f35410172d..4e0b54a5cde8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -153,6 +153,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) */ @throws(classOf[NoSuchTableException]) def append(): Unit = { + assertNoTempView("append") val append = loadTable(catalog, identifier) match { case Some(t) => AppendData.byName( @@ -177,6 +178,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) */ @throws(classOf[NoSuchTableException]) def overwrite(condition: Column): Unit = { + assertNoTempView("overwrite") val overwrite = loadTable(catalog, identifier) match { case Some(t) => OverwriteByExpression.byName( @@ -204,6 +206,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) */ @throws(classOf[NoSuchTableException]) def overwritePartitions(): Unit = { + assertNoTempView("overwritePartitions") val dynamicOverwrite = loadTable(catalog, identifier) match { case Some(t) => OverwritePartitionsDynamic.byName( @@ -216,6 +219,12 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) runCommand("overwritePartitions")(dynamicOverwrite) } + private def assertNoTempView(name: String): Unit = { + if (sparkSession.sessionState.catalog.isTempView(tableName)) { + throw new AnalysisException(s"Temporary view $table doesn't support $name") + } + } + /** * Wrap an action to track the QueryExecution and time cost, then report to the user-registered * callback functions. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index 508eefafd075..9d7b109364a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -144,6 +144,26 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(exc.getMessage.contains("table_name")) } + test("Append: fail if it writes to the temp view") { + spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") + spark.table("testcat.table_name").createOrReplaceTempView("temp_view") + + val exc = intercept[AnalysisException] { + spark.table("source").writeTo("temp_view").append() + } + assert(exc.getMessage.contains("temp_view")) + } + + test("Append: fail if it writes to the view") { + spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") + spark.sql("CREATE OR REPLACE VIEW table_view AS SELECT id, data FROM testcat.table_name") + + val exc = intercept[AnalysisException] { + spark.table("source").writeTo("table_view").append() + } + assert(exc.getMessage.contains("table_view")) + } + test("Overwrite: overwrite by expression: true") { spark.sql( "CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)") @@ -208,6 +228,26 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(exc.getMessage.contains("table_name")) } + test("Overwrite: fail if it writes to the temp view") { + spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") + spark.table("testcat.table_name").createOrReplaceTempView("temp_view") + + val exc = intercept[AnalysisException] { + spark.table("source").writeTo("temp_view").overwrite(lit(true)) + } + assert(exc.getMessage.contains("temp_view")) + } + + test("Overwrite: fail if it writes to the view") { + spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") + spark.sql("CREATE OR REPLACE VIEW table_view AS SELECT id, data FROM testcat.table_name") + + val exc = intercept[AnalysisException] { + spark.table("source").writeTo("table_view").overwrite(lit(true)) + } + assert(exc.getMessage.contains("table_view")) + } + test("OverwritePartitions: overwrite conflicting partitions") { spark.sql( "CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)") @@ -272,6 +312,26 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo assert(exc.getMessage.contains("table_name")) } + test("OverwritePartitions: fail if it writes to the temp view") { + spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") + spark.table("testcat.table_name").createOrReplaceTempView("temp_view") + + val exc = intercept[AnalysisException] { + spark.table("source").writeTo("temp_view").overwritePartitions() + } + assert(exc.getMessage.contains("temp_view")) + } + + test("OverwritePartitions: fail if it writes to the view") { + spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo") + spark.sql("CREATE OR REPLACE VIEW table_view AS SELECT id, data FROM testcat.table_name") + + val exc = intercept[AnalysisException] { + spark.table("source").writeTo("table_view").overwritePartitions() + } + assert(exc.getMessage.contains("table_view")) + } + test("Create: basic behavior") { spark.table("source").writeTo("testcat.table_name").create()