Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,13 @@ class Analyzer(
if catalog.isTemporaryTable(ident) =>
u // temporary views take precedence over catalog table names

case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) =>
// First try loading the table with a loadable catalog, then fallback to the session
// catalog if that exists
maybeCatalog.flatMap(loadTable(_, ident))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about?

        maybeCatalog.orElse(sessionCatalog)
          .flatMap(loadTable(_, ident))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the tricky thing. We need to see if we can load using the defined catalog first. If the catalog is defined, but it doesn't return a table, then we miss out on using the session catalog.

@jzhuge jzhuge Aug 2, 2019

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a user tries to use table cat1.ns1.ns2.tbl, where

  1. catalog cat1 exists, but without table ns1.ns2.tbl
  2. v2 session catalog is configured and it has a table named cat1.ns1.ns2.tbl

IMHO, Spark should throw no table found exception. This is consistent with the follow statement in #24768:

A session catalog should be used when the v1 catalog is responsible for tables with no catalog in the table identifier.

Maybe I missed some discussions about fallback?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the tricky thing. We need to see if we can load using the defined catalog first. If the catalog is defined, but it doesn't return a table, then we miss out on using the session catalog.

I disagree.

Determining which catalog is responsible for an identifier is independent of the catalog results. If maybeCatalog is None, then the session catalog is responsible for the identifier. Whether to use v1 or v2 after that point depends on the provider.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue Imagine the following use case. Let's say as the data science team, I have a database on the Hive MetaStore called prod. I have tables named like: prod.tbl1, prod.tbl2. Now, some other team (maybe data engineering team), creates a v2 catalog called prod and adds it to my environment.

Won't the prod catalog hijack all my requests, and start failing to resolve the tables I had declared before, which I was expecting the V2SessionCatalog to resolve? I, as an unaware user, suddenly I have all my jobs failing and chaos ensues.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. A conflict could break queries in both directions when there is a conflict between namespace prod and catalog prod:

  • If a namespace takes precedence and a new one is created, it will break queries that use the catalog
  • If a catalog takes precedence and a new one is created, it will break queries that use the namespace

When we wrote the SPIP, the choice was to make catalog take precedence because:

  1. Any user can create a namespace globally in the catalog, possibly breaking other users
  2. Users can't globally create catalogs -- that's done by administrators -- so the impact is limited to their own jobs where they can add Spark configs
  3. Global catalogs are created by administrators and this happens more rarely

In short, we expect fewer problems when catalogs take precedence.

.orElse(sessionCatalog.flatMap(loadTable(_, ident)))
.map(DataSourceV2Relation.create)
.getOrElse(u)
}
}

Expand Down
84 changes: 79 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import java.util.{Locale, Properties, UUID}
import scala.collection.JavaConverters._

import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
Expand All @@ -37,7 +38,7 @@ import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
Expand Down Expand Up @@ -485,7 +486,80 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def saveAsTable(tableName: String): Unit = {
saveAsTable(df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName))
import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

val session = df.sparkSession

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the local variable is used only once.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll use it in the follow up :)

val useV1Sources =
session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
val shouldUseV1Source = cls.newInstance() match {
case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true
case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT))
}

val canUseV2 = !shouldUseV1Source && classOf[TableProvider].isAssignableFrom(cls)
val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog

session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)

case CatalogObjectIdentifier(None, ident) if canUseV2 && sessionCatalogOpt.isDefined =>
// We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility
// for now.
saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1)

case AsTableIdentifier(tableIdentifier) =>
Comment thread
brkyvz marked this conversation as resolved.
saveAsTable(tableIdentifier)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about cases where AsTableIdentifier won't match? For example, saveAsTable("ns1.ns2.table"). For those cases, we need a case _ => that throws an exception because the table name is not supported by the session catalog.

}
}


private def saveAsTable(catalog: TableCatalog, ident: Identifier, mode: SaveMode): Unit = {
val partitioning = partitioningColumns.map { colNames =>
colNames.map(name => IdentityTransform(FieldReference(name)))
}.getOrElse(Seq.empty[Transform])
val bucketing = bucketColumnNames.map { cols =>
Seq(BucketTransform(LiteralValue(numBuckets.get, IntegerType), cols.map(FieldReference(_))))
}.getOrElse(Seq.empty[Transform])
val partitionTransforms = partitioning ++ bucketing

val tableOpt = try Option(catalog.loadTable(ident)) catch {
case _: NoSuchTableException => None
}

val command = (mode, tableOpt) match {
case (SaveMode.Append, Some(table)) =>
AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan)
Comment thread
brkyvz marked this conversation as resolved.

case (SaveMode.Overwrite, _) =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the behavior here is to truncate and write, not replace.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may want to change the table properties though, such as partitioning and schema, wouldn't we?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to match the behavior of v1 file sources. Do we know what that behavior is?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in DSV2 sync, the old behavior was to drop the old table and create a new one, therefore the Replace behavior here works.

ReplaceTableAsSelect(
catalog,
ident,
partitionTransforms,
df.queryExecution.analyzed,
Map.empty, // properties can't be specified through this API
extraOptions.toMap,
orCreate = true) // Create the table if it doesn't exist

case (other, _) =>
// We have a potential race condition here in AppendMode, if the table suddenly gets
// created between our existence check and physical execution, but this can't be helped
// in any case.

@cloud-fan cloud-fan Aug 7, 2019

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we treat saveAsTable as one atomic operation, I think AppendMode means "create or append table". Since "create or append table" is not a standard SQL commannd, maybe it's fine to ignore this race condition.

CreateTableAsSelect(
catalog,
ident,
partitionTransforms,
df.queryExecution.analyzed,
Map.empty,
extraOptions.toMap,
ignoreIfExists = other == SaveMode.Ignore)
}

runCommand(df.sparkSession, "saveAsTable") {
command
}
}

private def saveAsTable(tableIdent: TableIdentifier): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,41 @@

package org.apache.spark.sql.sources.v2

import org.scalatest.BeforeAndAfter
import java.util
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.QueryTest
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.execution.datasources.v2.parquet.{ParquetDataSourceV2, ParquetTable}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, QueryExecutionListener}

class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
import testImplicits._

private val v2Format = classOf[InMemoryTableProvider].getName
private val dfData = Seq((1L, "a"), (2L, "b"), (3L, "c"))
private val catalogName = "testcat"

before {
spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
spark.conf.set(s"spark.sql.catalog.$catalogName", classOf[TestInMemoryTableCatalog].getName)
spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName)

val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
df.createOrReplaceTempView("source")
spark.createDataFrame(dfData).toDF("id", "data").createOrReplaceTempView("source")
val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data")
df2.createOrReplaceTempView("source2")
}
Expand All @@ -42,6 +62,19 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be
spark.sql("DROP VIEW source2")
}

private def sessionCatalogTest(testName: String)(f: SparkSession => Unit): Unit = {
test("using session catalog: " + testName) {
val catalogConf = SQLConf.V2_SESSION_CATALOG
val newSession = spark.newSession()
newSession.createDataFrame(dfData).toDF("id", "data").createOrReplaceTempView("source")
newSession.sessionState.conf.setConf(catalogConf, classOf[TestV2SessionCatalog].getName)
try f(newSession) finally {
newSession.catalog("session").asInstanceOf[TestV2SessionCatalog].clearTables()
newSession.sql("DROP VIEW source")
}
}
}

test("insertInto: append") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
Expand Down Expand Up @@ -104,4 +137,169 @@ class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with Be
}
}
}

testQuietly("saveAsTable: with defined catalog and table doesn't exist") {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and many other tests don't need with defined catalog in test name any more since v2 session tests are split into a separate file.

val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
spark.table("source").write.saveAsTable(t1)
checkAnswer(spark.table(t1), spark.table("source"))
}
}

testQuietly("saveAsTable: with defined catalog and table exists") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
// Default saveMode is append, therefore this doesn't throw a table already exists eception
spark.table("source").write.saveAsTable(t1)
checkAnswer(spark.table(t1), spark.table("source"))
}
}

testQuietly("saveAsTable: with defined catalog + table overwrite and table doesn't exist") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
spark.table("source").write.mode("overwrite").saveAsTable(t1)
checkAnswer(spark.table(t1), spark.table("source"))
}
}

testQuietly("saveAsTable: with defined catalog + table overwrite and table exists") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'")
spark.table("source").write.mode("overwrite").saveAsTable(t1)
checkAnswer(spark.table(t1), spark.table("source"))
}
}

testQuietly("saveAsTable: with defined catalog + ignore mode and table doesn't exist") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
spark.table("source").write.mode("ignore").saveAsTable(t1)
checkAnswer(spark.table(t1), spark.table("source"))
}
}

testQuietly("saveAsTable: with defined catalog + ignore mode and table exists") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT 'c', 'd'")
spark.table("source").write.mode("ignore").saveAsTable(t1)
checkAnswer(spark.table(t1), Seq(Row("c", "d")))
}
}

sessionCatalogTest("saveAsTable and v2 table - table doesn't exist") { session =>

@jzhuge jzhuge Aug 1, 2019

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving all session catalog tests to a new suite file?
This way, the entire suite can have a Spark session with v2 session catalog defined.
We call also add new insertInto test cases for session catalog there.

val t1 = "tbl"
session.table("source").write.format(v2Format).saveAsTable(t1)
checkAnswer(session.table(t1), session.table("source"))
}

sessionCatalogTest("saveAsTable: v2 table - table exists") { session =>
val t1 = "tbl"
session.sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
intercept[TableAlreadyExistsException] {
session.table("source").select("id", "data").write.format(v2Format).saveAsTable(t1)
}
session.table("source").write.format(v2Format).mode("append").saveAsTable(t1)
checkAnswer(session.table(t1), session.table("source"))
}

sessionCatalogTest("saveAsTable: v2 table - table overwrite and table doesn't exist") { session =>
val t1 = "tbl"
session.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1)
checkAnswer(session.table(t1), session.table("source"))
}

sessionCatalogTest("saveAsTable: v2 table - table overwrite and table exists") { session =>
val t1 = "tbl"
session.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'")
session.table("source").write.format(v2Format).mode("overwrite").saveAsTable(t1)
checkAnswer(session.table(t1), session.table("source"))
}

sessionCatalogTest("saveAsTable: v2 table - ignore mode and table doesn't exist") { session =>
val t1 = "tbl"
session.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1)
checkAnswer(session.table(t1), session.table("source"))
}

sessionCatalogTest("saveAsTable: v2 table - ignore mode and table exists") { session =>
val t1 = "tbl"
session.sql(s"CREATE TABLE $t1 USING $v2Format AS SELECT 'c', 'd'")
session.table("source").write.format(v2Format).mode("ignore").saveAsTable(t1)
checkAnswer(session.table(t1), Seq(Row("c", "d")))
}

sessionCatalogTest("saveAsTable: old table defined in a database colliding " +
"with a catalog name") { session =>
// Make sure the database name conflicts with a catalog name
val dbPath = session.sessionState.catalog.getDefaultDBPath(catalogName)
session.sessionState.catalog.createDatabase(
CatalogDatabase(catalogName, "", dbPath, Map.empty), ignoreIfExists = false)
val t1 = "tbl"
withTable(t1) {
// Create the table in the built in catalog, in the given database
session.sessionState.catalog.createTable(
CatalogTable(
identifier = TableIdentifier(t1, Some(catalogName)),
tableType = CatalogTableType.MANAGED,
provider = Some(v2Format),
storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
schema = session.table("source").schema
),
ignoreIfExists = false
)
val tableName = s"$catalogName.$t1"
checkAnswer(session.table(tableName), Nil)
intercept[TableAlreadyExistsException] {
// Make sure default save mode is same as before
session.table("source").write.format(v2Format).saveAsTable(tableName)
}
session.table("source").write.format(v2Format).mode("append").saveAsTable(tableName)
checkAnswer(session.table(tableName), session.table("source"))
}
}
}

class InMemoryTableProvider extends TableProvider {
override def getTable(options: CaseInsensitiveStringMap): Table = {
throw new UnsupportedOperationException("D'oh!")
}
}

/** A SessionCatalog that always loads an in memory Table, so we can test write code paths. */
class TestV2SessionCatalog extends V2SessionCatalog {

protected val tables: util.Map[Identifier, InMemoryTable] =
new ConcurrentHashMap[Identifier, InMemoryTable]()

override def loadTable(ident: Identifier): Table = {
if (tables.containsKey(ident)) {
tables.get(ident)
} else {
// Table was created through the built-in catalog
val t = super.loadTable(ident)
val table = new InMemoryTable(t.name(), t.schema(), t.partitioning(), t.properties())
tables.put(ident, table)
table
}
}

override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
val t = new InMemoryTable(ident.name(), schema, partitions, properties)
tables.put(ident, t)
t
}

def clearTables(): Unit = {
assert(!tables.isEmpty, "Tables were empty, maybe didn't use the session catalog code path?")
tables.keySet().asScala.foreach(super.dropTable)
tables.clear()
}
}