@@ -118,8 +118,8 @@ statement
locationSpec |
(TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier locationSpec? #createTableLike
| CREATE TABLE (IF NOT EXISTS)? target=multipartIdentifier
LIKE source=multipartIdentifier locationSpec? #createTableLike
| replaceTableHeader ('(' colTypeList ')')? tableProvider
((OPTIONS options=tablePropertyList) |
(PARTITIONED BY partitioning=transformList) |
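Editor's note (not part of the diff): switching target and source from tableIdentifier to multipartIdentifier lets the rule accept catalog-qualified, multi-part names. The illustrative statements below mirror the DDLParserSuite cases added later in this diff.

// Illustrative only; see the parser tests further down for the resulting plans.
val multiPartExamples: Seq[String] = Seq(
  "CREATE TABLE a.b.c LIKE d.e.f",
  "CREATE TABLE IF NOT EXISTS a.b.c LIKE d.e.f LOCATION '/tmp'")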
@@ -141,6 +141,37 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
writeOptions = c.options.filterKeys(_ != "path"),
ignoreIfExists = c.ifNotExists)

case c @ CreateTableLikeStatement(target, source, loc, ifNotExists) =>
def validateLocation(loc: Option[String]) = {
if (loc.isDefined) {
throw new AnalysisException("Location clause not supported for " +
"CREATE TABLE LIKE statement when tables are of V2 type")
Contributor:
we can support the LOCATION clause. See CatalogV2Utils.convertTableProperties; we can store the location in a special table property, location.

Contributor Author:
@cloud-fan Thanks a lot. I was not aware of this. I will check.

}
}
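// Editor's sketch (not part of this PR), following the review comment above:
// the LOCATION clause could instead be folded into the v2 table properties.
// The key "location" is the reserved property the comment alludes to
// (TableCatalog.PROP_LOCATION); this helper is hypothetical and unused here.
def withLocation(props: Map[String, String], loc: Option[String]): Map[String, String] =
  props ++ loc.map(l => "location" -> l)
// e.g. withLocation(Map("prop" -> "propvalue"), Some("/tmp/loc"))
//   == Map("prop" -> "propvalue", "location" -> "/tmp/loc")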
(target, source) match {
case (NonSessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s)) =>
validateLocation(loc)
CreateTableLike(tCatalog.asTableCatalog,
t,
Some(sCatalog.asTableCatalog),
s,
ifNotExists)
case (NonSessionCatalog(tCatalog, t), SessionCatalog(sCatalog, s)) =>
Contributor:
If we need to catch session catalog, we should move the case to ResolveSessionCatalog. It's OK to create v2 command in ResolveSessionCatalog.

Contributor Author:
@cloud-fan NonSessionCatalog is not available in ResolveSessionCatalog, right? This case catches both NonSessionCatalog and SessionCatalog?

Contributor:
ok, let's leave it.

validateLocation(loc)
CreateTableLike(tCatalog.asTableCatalog,
t,
None,
Contributor:
shall we pass sCatalog in?

source,
ifNotExists)
case (SessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s)) =>
throw new AnalysisException("CREATE TABLE LIKE is not allowed when " +
"source table is V2 type and target table is V1 type")

// When target and source are of V1 type, this is handled by the v1 CreateTableLikeCommand. We
// return from here without any transformation; it is handled in ResolveSessionCatalog.
case _ => c
}

case RefreshTableStatement(NonSessionCatalog(catalog, tableName)) =>
RefreshTable(catalog.asTableCatalog, tableName.asIdentifier)

@@ -216,4 +247,12 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case _ => None
}
}

object SessionCatalog {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
case CatalogAndIdentifierParts(catalog, parts) if isSessionCatalog(catalog) =>
Some(catalog -> parts)
case _ => None
}
}
}
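For readers unfamiliar with the extractor objects used in the match above, here is a small, self-contained sketch of the pattern in plain Scala. All names are hypothetical; the real SessionCatalog/NonSessionCatalog extractors additionally resolve the catalog via the CatalogManager rather than just splitting the name.

object NamePartsSketch {
  // Toy extractor: split a multi-part name into (first part, remaining parts).
  object HeadAndRest {
    def unapply(parts: Seq[String]): Option[(String, Seq[String])] = parts match {
      case head +: rest if rest.nonEmpty => Some(head -> rest)
      case _ => None
    }
  }

  def describe(name: Seq[String]): String = name match {
    case HeadAndRest(catalogName, rest) => s"catalog = $catalogName, table = ${rest.mkString(".")}"
    case _ => "single-part name, resolved against the current catalog and namespace"
  }
  // describe(Seq("testcat", "ns", "tbl")) == "catalog = testcat, table = ns.tbl"
}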
@@ -3180,4 +3180,20 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
originalText = source(ctx.query),
query = plan(ctx.query))
}

/**
* Create a [[CreateTableLikeStatement]] logical plan.
*
* For example:
* {{{
* CREATE TABLE [IF NOT EXISTS] multi_part_table_name
* LIKE existing_multi_part_table_name [locationSpec]
* }}}
*/
override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
val targetTable = visitMultipartIdentifier(ctx.target)
val sourceTable = visitMultipartIdentifier(ctx.source)
val location = Option(ctx.locationSpec).map(visitLocationSpec)
CreateTableLikeStatement(targetTable, sourceTable, location, ctx.EXISTS != null)
}
}
@@ -86,6 +86,14 @@ case class CreateTableAsSelectStatement(
override def children: Seq[LogicalPlan] = Seq(asSelect)
}

/**
 * A CREATE TABLE LIKE statement, as parsed from SQL.
*/
case class CreateTableLikeStatement(
targetTable: Seq[String],
sourceTable: Seq[String],
location: Option[String],
ifNotExists: Boolean) extends ParsedStatement

/**
* A REPLACE TABLE command, as parsed from SQL.
*
@@ -187,6 +187,16 @@ case class CreateTableAsSelect(
}
}

/**
 * Create a new table with the same table definition as the source table.
*/
case class CreateTableLike(
targetCatalog: TableCatalog,
targetTableName: Seq[String],
sourceCatalog: Option[TableCatalog],
sourceTableName: Seq[String],
cloud-fan (Contributor), Nov 12, 2019:
for the source table, what we really care is the table itself, not which catalog it comes from. I think it's better to define the plan as

case class CreateTableLike(
    targetCatalog: TableCatalog,
    targetTableName: Seq[String],
    sourceTable: NamedRelation,
    location: Option[String],
    provider: Option[String],
    ifNotExists: Boolean)

In the planner, we match CreateTableLike(..., r: DataSourceV2Relation, ..), and create the physical plan with source table r.table

dilipbiswal (Contributor Author), Nov 12, 2019:
@cloud-fan I am a bit confused. The source can be both V1 or V2, right? So how can we expect a DataSourceV2Relation as source?

cloud-fan (Contributor):
the v1 table has a v2 adapter called V1Table. In ResolveCatalogs, we can lookup table from session catalog, create a DataSourceV2Relation, and pass it into CreateTableLike.
ifNotExists: Boolean) extends Command
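// Editor's sketch (not part of this PR) of the resolution flow described in the
// review thread above. The types below are placeholders standing in for the real
// Spark classes (Table, V1Table, DataSourceV2Relation): resolve the source name
// once during analysis, wrapping a v1 table in an adapter, so later phases see a
// single already-resolved source and never care which catalog it came from.
object CreateTableLikeResolutionSketch {
  sealed trait ResolvedSource
  case class V2Source(name: String) extends ResolvedSource
  case class V1Adapted(name: String) extends ResolvedSource // stands in for a table wrapped in V1Table

  // "Analysis": look the source name up once and wrap v1 tables in the adapter.
  def resolveSource(name: Seq[String], isV2: Seq[String] => Boolean): ResolvedSource = {
    val qualified = name.mkString(".")
    if (isV2(name)) V2Source(qualified) else V1Adapted(qualified)
  }

  // "Planning": downstream code only sees the resolved source.
  def planFor(src: ResolvedSource): String = src match {
    case V2Source(n)  => s"copy the table definition from v2 table $n"
    case V1Adapted(n) => s"copy the table definition from v1 table $n (via the adapter)"
  }
}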

/**
* Replace a table with a v2 catalog.
*
@@ -377,6 +377,24 @@ class DDLParserSuite extends AnalysisTest {
}
}

test("create table like") {
comparePlans(
parsePlan("CREATE TABLE a.b.c LIKE d.e.f"),
CreateTableLikeStatement(Seq("a", "b", "c"), Seq("d", "e", "f"), None, false))

comparePlans(
parsePlan("CREATE TABLE IF NOT EXISTS a.b.c LIKE d.e.f"),
CreateTableLikeStatement(Seq("a", "b", "c"), Seq("d", "e", "f"), None, true))

comparePlans(
parsePlan("CREATE TABLE a.b.c LIKE d.e.f LOCATION '/tmp'"),
CreateTableLikeStatement(Seq("a", "b", "c"), Seq("d", "e", "f"), Some("/tmp"), false))

comparePlans(
parsePlan("CREATE TABLE IF NOT EXISTS a.b.c LIKE d.e.f LOCATION '/tmp'"),
CreateTableLikeStatement(Seq("a", "b", "c"), Seq("d", "e", "f"), Some("/tmp"), true))
}

test("drop table") {
parseCompare("DROP TABLE testcat.ns1.ns2.tbl",
DropTableStatement(Seq("testcat", "ns1", "ns2", "tbl"), ifExists = false, purge = false))
@@ -240,6 +240,11 @@ class ResolveSessionCatalog(
ignoreIfExists = c.ifNotExists)
}

case CreateTableLikeStatement(targetTable, sourceTable, location, ifNotExists) =>
val v1targetTable = parseV1Table(targetTable, "CREATE TABLE LIKE").asTableIdentifier
val v1sourceTable = parseV1Table(sourceTable, "CREATE TABLE LIKE").asTableIdentifier
CreateTableLikeCommand(v1targetTable, v1sourceTable, location, ifNotExists)

case RefreshTableStatement(SessionCatalog(_, tableName)) =>
RefreshTable(tableName.asTableIdentifier)

@@ -626,22 +626,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}
}

/**
* Create a [[CreateTableLikeCommand]] command.
*
* For example:
* {{{
* CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
* LIKE [other_db_name.]existing_table_name [locationSpec]
* }}}
*/
override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
val targetTable = visitTableIdentifier(ctx.target)
val sourceTable = visitTableIdentifier(ctx.source)
val location = Option(ctx.locationSpec).map(visitLocationSpec)
CreateTableLikeCommand(targetTable, sourceTable, location, ctx.EXISTS != null)
}

/**
* Create a [[CatalogStorageFormat]] for creating tables.
*
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange, V1Table}

/**
* Physical plan node for CREATE TABLE LIKE statement.
*/
case class CreateTableLikeExec(
targetCatalog: TableCatalog,
targetTable: Seq[String],
sourceCatalog: Option[TableCatalog],
sourceTable: Seq[String],
ifNotExists: Boolean) extends V2CommandExec {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override def output: Seq[Attribute] = Seq.empty

override protected def run(): Seq[InternalRow] = {
val sessionCatalog = sqlContext.sparkSession.sessionState.catalog
// If the source catalog is not specified, the source is resolved from the session catalog.
val sourceTab = sourceCatalog.map { catalog =>
catalog.loadTable(sourceTable.asIdentifier)
}.getOrElse(
V1Table(sessionCatalog.getTempViewOrPermanentTableMetadata(sourceTable.asTableIdentifier))
)

if (!targetCatalog.tableExists(targetTable.asIdentifier)) {
try {
targetCatalog.createTable(targetTable.asIdentifier,
sourceTab.schema,
sourceTab.partitioning,
sourceTab.properties())
} catch {
case _: TableAlreadyExistsException if ifNotExists =>
logWarning(s"Table ${targetTable.quoted} was created concurrently. Ignoring.")
}
} else if (!ifNotExists) {
throw new TableAlreadyExistsException(targetTable.asIdentifier)
}

Seq.empty
}
}
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateTableLike, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
@@ -96,6 +96,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
}

case CreateTableLike(tCatalog, tTab, sCatalog, sTab, ifNotExists) =>
CreateTableLikeExec(tCatalog, tTab, sCatalog, sTab, ifNotExists) :: Nil

case RefreshTable(catalog, ident) =>
RefreshTableExec(catalog, ident) :: Nil

@@ -24,6 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.expressions.LogicalExpressions
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
@@ -1562,6 +1563,97 @@ class DataSourceV2SQLSuite
assert(e.message.contains("ALTER VIEW QUERY is only supported with v1 tables"))
}

test("CREATE TABLE LIKE with target v2 and source v2") {
val targetTable = "testcat.target_tab"
val sourceTable = "testcat.source_tab"

withTable(targetTable, sourceTable) {
Contributor:
Is withTable needed since tables are not created?

val e1 = intercept[AnalysisException] {
sql(s"CREATE TABLE $targetTable LIKE $sourceTable")
}
assert(e1.message.contains("Table source_tab not found"))

val e2 = intercept[AnalysisException] {
sql(s"CREATE TABLE $targetTable LIKE $sourceTable LOCATION '/tmp'")
}
assert(e2.message.contains("Location clause not supported for CREATE TABLE LIKE" +
" statement when tables are of V2 type"))

sql(
s"""
|CREATE TABLE $sourceTable
|(id bigint, data string, p int) USING foo PARTITIONED BY (id, p)
|TBLPROPERTIES ('prop'='propvalue')
|""".stripMargin)
sql(s"CREATE TABLE $targetTable LIKE $sourceTable")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "target_tab"))
assert(table.name == targetTable)
assert(table.partitioning().size == 2)
assert(table.partitioning()(0) == LogicalExpressions.identity("id"))
assert(table.partitioning()(1) == LogicalExpressions.identity("p"))
assert(table.properties.asScala == Map("prop" -> "propvalue", "provider" -> "foo"))

// A second invocation should result in an error.
val e3 = intercept[AnalysisException] {
sql(s"CREATE TABLE $targetTable LIKE $sourceTable")
}
assert(e3.message.contains("Table target_tab already exists"))

// No error when IF NOT EXISTS is specified.
sql(s"CREATE TABLE IF NOT EXISTS $targetTable LIKE $sourceTable")
}
}

test("CREATE TABLE LIKE with target v2 and source v1") {
val targetTable = "testcat.target_tab"
val sourceTable = "default.source_tab"

withTable(targetTable, sourceTable) {
val e1 = intercept[AnalysisException] {
sql(s"CREATE TABLE $targetTable LIKE $sourceTable")
}
assert(e1.message.contains("Table or view 'source_tab' not found in database 'default'"))

val e2 = intercept[AnalysisException] {
sql(s"CREATE TABLE $targetTable LIKE $sourceTable LOCATION '/tmp'")
}
assert(e2.message.contains("Location clause not supported for CREATE TABLE LIKE" +
" statement when tables are of V2 type"))

sql(
s"""
|CREATE TABLE $sourceTable
|(id bigint, data string, p int) USING parquet PARTITIONED BY (id, p)
|TBLPROPERTIES ('prop'='propvalue')
|""".stripMargin)
sql(s"CREATE TABLE $targetTable LIKE $sourceTable")
val testCatalog = catalog("testcat").asTableCatalog
val table = testCatalog.loadTable(Identifier.of(Array(), "target_tab"))
assert(table.name == targetTable)
assert(table.partitioning().size == 2)
assert(table.partitioning()(0) == LogicalExpressions.identity("id"))
assert(table.partitioning()(1) == LogicalExpressions.identity("p"))
assert(table.properties.asScala == Map("prop" -> "propvalue"))

// A second invocation should result in an error.
val e3 = intercept[AnalysisException] {
sql(s"CREATE TABLE $targetTable LIKE $sourceTable")
}
assert(e3.message.contains("Table target_tab already exists"))

// No error when IF NOT EXISTS is specified.
sql(s"CREATE TABLE IF NOT EXISTS $targetTable LIKE $sourceTable")

// If the target is V1 and the source is V2, it is not allowed.
val e4 = intercept[AnalysisException] {
sql(s"CREATE TABLE $sourceTable LIKE $targetTable")
}
assert(e4.message.contains("CREATE TABLE LIKE is not allowed when source table" +
" is V2 type and target table is V1 type"))
}
}

private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")