Skip to content
81 changes: 75 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@ import java.util.{Locale, Properties, UUID}
import scala.collection.JavaConverters._

import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
Expand Down Expand Up @@ -360,6 +360,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*/
def insertInto(tableName: String): Unit = {
import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

assertNotBucketed("insertInto")

Expand All @@ -376,6 +377,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
insertInto(catalog, ident)
case AsTableIdentifier(tableIdentifier) =>
insertInto(tableIdentifier)
case other =>
// TODO(SPARK-28667): This should go through V2SessionCatalog
throw new UnsupportedOperationException(
s"Couldn't find a catalog to handle the identifier ${other.quoted}.")
}
}

Expand Down Expand Up @@ -485,7 +490,71 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def saveAsTable(tableName: String): Unit = {
saveAsTable(df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName))
import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
val session = df.sparkSession

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the local variable is used only once.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll use it in the follow up :)


session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)

case AsTableIdentifier(tableIdentifier) =>
Comment thread
brkyvz marked this conversation as resolved.
saveAsTable(tableIdentifier)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about cases where AsTableIdentifier won't match? For example, saveAsTable("ns1.ns2.table"). For those cases, we need a case _ => that throws an exception because the table name is not supported by the session catalog.


case other =>
// TODO(SPARK-28666): This should go through V2SessionCatalog
throw new UnsupportedOperationException(
s"Couldn't find a catalog to handle the identifier ${other.quoted}.")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an analysis error. The catalog was None, so it belongs to the session catalog. But the session catalog doesn't support namespaces with more than one part, so it is an invalid identifier for that catalog.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah gotcha, that's the 4th case... I'd like to make that change along with the V2SessionCatalog support. Right now we just couldn't find a catalog for it, so... (Maybe they forgot to specify a default catalog).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. saveAsTable("ns1.ns2.table") means we want to save the table to the builtin catalog, but ns1.ns2 is not a valid namespace to the builtin catalog. (or we can report namespace not found?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, but that could also mean that they forgot to configure their default catalog too, right?
I can throw a better error when the SessionCatalog code path is implemented, since we need that before we can even say that ns1.ns2.table cannot be handled by the session catalog.

}
}


private def saveAsTable(catalog: TableCatalog, ident: Identifier, mode: SaveMode): Unit = {
val partitioning = partitioningColumns.map { colNames =>
colNames.map(name => IdentityTransform(FieldReference(name)))
}.getOrElse(Seq.empty[Transform])
val bucketing = bucketColumnNames.map { cols =>
Seq(BucketTransform(LiteralValue(numBuckets.get, IntegerType), cols.map(FieldReference(_))))
}.getOrElse(Seq.empty[Transform])
val partitionTransforms = partitioning ++ bucketing

val tableOpt = try Option(catalog.loadTable(ident)) catch {
case _: NoSuchTableException => None
}

val command = (mode, tableOpt) match {
case (SaveMode.Append, Some(table)) =>
AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan)
Comment thread
brkyvz marked this conversation as resolved.

case (SaveMode.Overwrite, _) =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the behavior here is to truncate and write, not replace.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may want to change the table properties though, such as partitioning and schema, wouldn't we?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to match the behavior of v1 file sources. Do we know what that behavior is?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in DSV2 sync, the old behavior was to drop the old table and create a new one, therefore the Replace behavior here works.

ReplaceTableAsSelect(
catalog,
ident,
partitionTransforms,
df.queryExecution.analyzed,
Map.empty, // properties can't be specified through this API
extraOptions.toMap,
orCreate = true) // Create the table if it doesn't exist

case (other, _) =>
// We have a potential race condition here in AppendMode, if the table suddenly gets
// created between our existence check and physical execution, but this can't be helped
// in any case.

@cloud-fan cloud-fan Aug 7, 2019

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we treat saveAsTable as one atomic operation, I think AppendMode means "create or append table". Since "create or append table" is not a standard SQL commannd, maybe it's fine to ignore this race condition.

CreateTableAsSelect(
catalog,
ident,
partitionTransforms,
df.queryExecution.analyzed,
Map.empty,
extraOptions.toMap,
ignoreIfExists = other == SaveMode.Ignore)
}

runCommand(df.sparkSession, "saveAsTable") {
command
}
}

private def saveAsTable(tableIdent: TableIdentifier): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.sources.v2

import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
import org.apache.spark.sql.test.SharedSQLContext

Expand Down Expand Up @@ -141,4 +141,66 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be
}
}
}

testQuietly("saveAsTable: table doesn't exist => create table") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for curiosity, why do we need testQuietly here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was before merging Gengliang's "Catalogs.load should work for inbuilt catalogs" PR. It may not be needed now, but I don't think it's a big deal

val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
}

testQuietly("saveAsTable: table exists => append by name") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
// Default saveMode is append, therefore this doesn't throw a table already exists exception
df.write.saveAsTable(t1)
checkAnswer(spark.table(t1), df)

// also appends are by name not by position
df.select('data, 'id).write.saveAsTable(t1)
checkAnswer(spark.table(t1), df.union(df))
}
}

testQuietly("saveAsTable: table overwrite and table doesn't exist => create table") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.mode("overwrite").saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
}

testQuietly("saveAsTable: table overwrite and table exists => replace table") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'")
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.mode("overwrite").saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
}

testQuietly("saveAsTable: ignore mode and table doesn't exist => create table") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.mode("ignore").saveAsTable(t1)
checkAnswer(spark.table(t1), df)
}
}

testQuietly("saveAsTable: ignore mode and table exists => do nothing") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'")
df.write.mode("ignore").saveAsTable(t1)
checkAnswer(spark.table(t1), Seq(Row("c", "d")))
}
}
}