@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, NamespaceChange, TableChange}
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

@@ -219,5 +219,7 @@ object CatalogV2Util {
Option(catalog.asTableCatalog.loadTable(ident))
} catch {
case _: NoSuchTableException => None
case _: NoSuchDatabaseException => None
case _: NoSuchNamespaceException => None
}
}
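With this change, loadTable treats a missing database or namespace the same way as a missing table: the lookup returns None instead of propagating the exception. A minimal standalone sketch of that pattern, using stand-in exception and table types rather than Spark's classes:

```scala
// Minimal sketch of the "lookup failures become None" pattern used above; the
// exception and table types here are simplified stand-ins, not Spark's classes.
object LoadTableSketch {
  class NoSuchTableException(name: String) extends Exception(name)
  class NoSuchNamespaceException(name: String) extends Exception(name)

  final case class Table(name: String)

  def loadTable(lookup: String => Table, ident: String): Option[Table] =
    try {
      Option(lookup(ident))
    } catch {
      // A missing table, database, or namespace all become None, so callers can
      // fall back (e.g. keep the relation unresolved) instead of failing outright.
      case _: NoSuchTableException     => None
      case _: NoSuchNamespaceException => None
    }

  def main(args: Array[String]): Unit = {
    val tables = Map("db.t" -> Table("db.t"))
    def lookup(ident: String): Table =
      tables.getOrElse(ident, throw new NoSuchTableException(ident))

    println(loadTable(lookup, "db.t"))       // Some(Table(db.t))
    println(loadTable(lookup, "db.missing")) // None
  }
}
```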
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.internal

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.catalog.v2.expressions.{LogicalExpressions, Transform}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

/**
* An implementation of catalog v2 `Table` to expose v1 table metadata.
*/
case class UnresolvedTable(v1Table: CatalogTable) extends Table {
implicit class IdentifierHelper(identifier: TableIdentifier) {
def quoted: String = {
identifier.database match {
case Some(db) =>
Seq(db, identifier.table).map(quote).mkString(".")
case _ =>
quote(identifier.table)

}
}

private def quote(part: String): String = {
if (part.contains(".") || part.contains("`")) {
s"`${part.replace("`", "``")}`"
} else {
part
}
}
}

def catalogTable: CatalogTable = v1Table

lazy val options: Map[String, String] = {
v1Table.storage.locationUri match {
case Some(uri) =>
v1Table.storage.properties + ("path" -> uri.toString)
case _ =>
v1Table.storage.properties
}
}

override lazy val properties: util.Map[String, String] = v1Table.properties.asJava

override lazy val schema: StructType = v1Table.schema

override lazy val partitioning: Array[Transform] = {
val partitions = new mutable.ArrayBuffer[Transform]()

v1Table.partitionColumnNames.foreach { col =>
partitions += LogicalExpressions.identity(col)
}

v1Table.bucketSpec.foreach { spec =>
partitions += LogicalExpressions.bucket(spec.numBuckets, spec.bucketColumnNames: _*)
}

partitions.toArray
}

override def name: String = v1Table.identifier.quoted

override def capabilities: util.Set[TableCapability] = new util.HashSet[TableCapability]()

override def toString: String = s"UnresolvedTable($name)"
}
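The IdentifierHelper.quoted helper backtick-quotes any name part that contains a dot or a backtick and doubles any embedded backticks. A standalone sketch of just that quoting rule, with Option[String] and String standing in for TableIdentifier:

```scala
// Standalone sketch of the quoting rule in IdentifierHelper.quoted; the types are
// simplified stand-ins for TableIdentifier.
object QuotedIdentifierSketch {
  private def quote(part: String): String =
    if (part.contains(".") || part.contains("`")) s"`${part.replace("`", "``")}`"
    else part

  def quoted(database: Option[String], table: String): String =
    (database.toSeq :+ table).map(quote).mkString(".")

  def main(args: Array[String]): Unit = {
    println(quoted(Some("db"), "t"))      // db.t
    println(quoted(Some("my.db"), "t`1")) // `my.db`.`t``1`
    println(quoted(None, "t"))            // t
  }
}
```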
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.sources.v2.internal.UnresolvedTable
import org.apache.spark.sql.types._

/**
@@ -650,8 +651,14 @@ class Analyzer(
if catalog.isTemporaryTable(ident) =>
u // temporary views take precedence over catalog table names

case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
case u @ UnresolvedRelation(CatalogObjectIdentifier(maybeCatalog, ident)) =>
maybeCatalog.orElse(sessionCatalog)
.flatMap(loadTable(_, ident))
.map {
case unresolved: UnresolvedTable => u
case resolved => DataSourceV2Relation.create(resolved)
}
.getOrElse(u)
Review comment: A +1 on this.

}
}
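The reworked rule tries the explicitly referenced catalog first, otherwise falls back to the v2 session catalog, and keeps the relation unresolved when the lookup fails or only v1 metadata (an UnresolvedTable) comes back. A hedged sketch of that Option-chaining shape, with stand-in types rather than Spark's plans and catalogs:

```scala
// Sketch of the resolution chain above with stand-in types: try the referenced
// catalog, else the session catalog; a v1-only table keeps the relation unresolved.
object ResolveRelationSketch {
  sealed trait LoadedTable
  final case class V1Fallback(name: String) extends LoadedTable // plays the role of UnresolvedTable
  final case class V2Table(name: String) extends LoadedTable

  sealed trait Plan
  final case class Unresolved(ident: String) extends Plan
  final case class V2Relation(table: LoadedTable) extends Plan

  type Catalog = String => Option[LoadedTable]

  def resolve(u: Unresolved, maybeCatalog: Option[Catalog], sessionCatalog: Option[Catalog]): Plan =
    maybeCatalog.orElse(sessionCatalog)     // explicitly referenced catalog wins, else session catalog
      .flatMap(catalog => catalog(u.ident)) // a failed lookup becomes None
      .map {
        case _: V1Fallback => u             // only v1 metadata: leave it for the v1 resolution path
        case t             => V2Relation(t) // a real v2 table resolves to a v2 relation
      }
      .getOrElse(u)                         // no catalog or no table: keep the relation unresolved
}
```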

35 changes: 25 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.internal.UnresolvedTable
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -251,19 +252,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotBucketed("save")

val session = df.sparkSession
val useV1Sources =
session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
val shouldUseV1Source = cls.newInstance() match {
case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true
case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT))
}
val canUseV2 = canUseV2Source(session, cls) && partitioningColumns.isEmpty

// In Data Source V2 project, partitioning is still under development.
// Here we fallback to V1 if partitioning columns are specified.
// TODO(SPARK-26778): use V2 implementations when partitioning feature is supported.
if (!shouldUseV1Source && classOf[TableProvider].isAssignableFrom(cls) &&
partitioningColumns.isEmpty) {
if (canUseV2) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
provider, session.sessionState.conf)
@@ -493,13 +488,20 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
val session = df.sparkSession
val provider = DataSource.lookupDataSource(source, session.sessionState.conf)
Review comment (Contributor): The provider here may not be the actual table provider, as saveAsTable can write to an existing table. Maybe we should always use the v2 session catalog?

Reply (Contributor Author, @brkyvz, Aug 14, 2019): That works for me, since the V2 code path will fall back to the V1 code path if it sees an UnresolvedTable.

Reply (Contributor Author): Hmm, actually that causes issues if the table doesn't exist. Maybe we should use the statements instead of the logical plans?

Reply (Contributor): +1 on using statements.

Reply (Contributor): Let's do it in a follow-up.
val canUseV2 = canUseV2Source(session, provider)
val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog

session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)
// TODO(SPARK-28666): This should go through V2SessionCatalog

case CatalogObjectIdentifier(None, ident)
if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 =>
// We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility
// for now.
saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1)

case AsTableIdentifier(tableIdentifier) =>
saveAsTable(tableIdentifier)
@@ -525,6 +527,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}

val command = (mode, tableOpt) match {
case (_, Some(table: UnresolvedTable)) =>
return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption))

case (SaveMode.Append, Some(table)) =>
AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan)

@@ -830,6 +835,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

private def modeForDSV2 = mode.getOrElse(SaveMode.Append)

private def canUseV2Source(session: SparkSession, providerClass: Class[_]): Boolean = {
val useV1Sources =
session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
val shouldUseV1Source = providerClass.newInstance() match {
case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true
case _ => useV1Sources.contains(providerClass.getCanonicalName.toLowerCase(Locale.ROOT))
}
!shouldUseV1Source && classOf[TableProvider].isAssignableFrom(providerClass)
}
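canUseV2Source folds the earlier inline check from save() into one helper: a source stays on the v1 path if its registered short name or its fully qualified class name appears in the configured v1 writer list, and it must implement TableProvider to go through v2. A standalone sketch of the same check with simplified inputs, without the SparkSession or reflection plumbing:

```scala
import java.util.Locale

// Standalone sketch of the "fall back to v1 if listed" check; the conf string and
// short-name lookup are simplified stand-ins for Spark's conf and DataSourceRegister.
object CanUseV2Sketch {
  def canUseV2(
      useV1SourceList: String,          // e.g. a comma-separated useV1SourceWriterList value
      shortName: Option[String],        // the source's registered short name, if it has one
      className: String,                // fully qualified provider class name
      implementsTableProvider: Boolean): Boolean = {
    val useV1Sources = useV1SourceList.toLowerCase(Locale.ROOT).split(",").map(_.trim)
    val shouldUseV1 = shortName.exists(n => useV1Sources.contains(n)) ||
      useV1Sources.contains(className.toLowerCase(Locale.ROOT))
    !shouldUseV1 && implementsTableProvider
  }

  def main(args: Array[String]): Unit = {
    println(canUseV2("csv,json", Some("parquet"), "org.example.Parquet", implementsTableProvider = true)) // true
    println(canUseV2("csv,json", Some("csv"), "org.example.Csv", implementsTableProvider = true))         // false
  }
}
```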

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
@@ -31,11 +31,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand}
import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils

case class DataSourceResolution(
conf: SQLConf,
@@ -183,8 +181,6 @@ case class DataSourceResolution(
val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
DeleteFromTable(aliased, delete.condition)

case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) =>
UnresolvedCatalogRelation(catalogTable)
}

object V1WriteProvider {
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchT
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.sources.v2.internal.UnresolvedTable
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -70,7 +71,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
throw new NoSuchTableException(ident)
}

CatalogTableAsV2(catalogTable)
UnresolvedTable(catalogTable)
}

override def invalidateTable(ident: Identifier): Unit = {
Expand Down Expand Up @@ -179,66 +180,6 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
override def toString: String = s"V2SessionCatalog($name)"
}

/**
* An implementation of catalog v2 [[Table]] to expose v1 table metadata.
*/
case class CatalogTableAsV2(v1Table: CatalogTable) extends Table {
implicit class IdentifierHelper(identifier: TableIdentifier) {
def quoted: String = {
identifier.database match {
case Some(db) =>
Seq(db, identifier.table).map(quote).mkString(".")
case _ =>
quote(identifier.table)

}
}

private def quote(part: String): String = {
if (part.contains(".") || part.contains("`")) {
s"`${part.replace("`", "``")}`"
} else {
part
}
}
}

def catalogTable: CatalogTable = v1Table

lazy val options: Map[String, String] = {
v1Table.storage.locationUri match {
case Some(uri) =>
v1Table.storage.properties + ("path" -> uri.toString)
case _ =>
v1Table.storage.properties
}
}

override lazy val properties: util.Map[String, String] = v1Table.properties.asJava

override lazy val schema: StructType = v1Table.schema

override lazy val partitioning: Array[Transform] = {
val partitions = new mutable.ArrayBuffer[Transform]()

v1Table.partitionColumnNames.foreach { col =>
partitions += LogicalExpressions.identity(col)
}

v1Table.bucketSpec.foreach { spec =>
partitions += LogicalExpressions.bucket(spec.numBuckets, spec.bucketColumnNames: _*)
}

partitions.toArray
}

override def name: String = v1Table.identifier.quoted

override def capabilities: util.Set[TableCapability] = new util.HashSet[TableCapability]()

override def toString: String = s"CatalogTableAsV2($name)"
}

private[sql] object V2SessionCatalog {
/**
* Convert v2 Transforms to v1 partition columns and an optional bucket spec.