From 940282b922ec18a99f0c84a61e863856fb9a8438 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 22 Oct 2019 17:30:42 -0700 Subject: [PATCH 1/6] [SPARK-29563] CREATE TABLE LIKE should look up catalog/table like v2 commands --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../sql/catalyst/parser/AstBuilder.scala | 16 +++++ .../catalyst/plans/logical/statements.scala | 8 +++ .../sql/catalyst/parser/DDLParserSuite.scala | 18 ++++++ .../analysis/ResolveSessionCatalog.scala | 5 ++ .../spark/sql/execution/SparkSqlParser.scala | 16 ----- .../sql/connector/DataSourceV2SQLSuite.scala | 21 +++++++ .../execution/command/DDLParserSuite.scala | 63 +++++++------------ 8 files changed, 91 insertions(+), 60 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 113f41183e9d..73b69bb59a85 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -118,8 +118,8 @@ statement locationSpec | (TBLPROPERTIES tableProps=tablePropertyList))* (AS? query)? #createHiveTable - | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier - LIKE source=tableIdentifier locationSpec? #createTableLike + | CREATE TABLE (IF NOT EXISTS)? target=multipartIdentifier + LIKE source=multipartIdentifier locationSpec? #createTableLike | replaceTableHeader ('(' colTypeList ')')? tableProvider ((OPTIONS options=tablePropertyList) | (PARTITIONED BY partitioning=transformList) | diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index fca8f50bd174..0b6862672357 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3180,4 +3180,20 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging originalText = source(ctx.query), query = plan(ctx.query)) } + + /** + * Create a [[CreateTableLikeStatement]] logical plan. 
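+   * The target and source table names are parsed as multi-part identifiers, so the
+   * statement can reference tables in any registered catalog.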
+ * + * For example: + * {{{ + * CREATE TABLE [IF NOT EXISTS] multi_part_table_name + * LIKE existing_multi_part_table_name [locationSpec] + * }}} + */ + override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) { + val targetTable = visitMultipartIdentifier(ctx.target) + val sourceTable = visitMultipartIdentifier(ctx.source) + val location = Option(ctx.locationSpec).map(visitLocationSpec) + CreateTableLikeStatement(targetTable, sourceTable, location, ctx.EXISTS != null) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 6707a80213cd..25f06be0d2eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -86,6 +86,14 @@ case class CreateTableAsSelectStatement( override def children: Seq[LogicalPlan] = Seq(asSelect) } +/** + * A CREATE TABLE LIKE statement, as parsed from SQL + */ +case class CreateTableLikeStatement( + targetTable: Seq[String], + sourceTable: Seq[String], + location: Option[String], + ifNotExists: Boolean) extends ParsedStatement /** * A REPLACE TABLE command, as parsed from SQL. * diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 11fc53092889..8faaf3b8c549 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -377,6 +377,24 @@ class DDLParserSuite extends AnalysisTest { } } + test("create table like") { + comparePlans( + parsePlan("CREATE TABLE a.b.c LIKE d.e.f"), + CreateTableLikeStatement(Seq("a", "b", "c"), Seq("d", "e", "f"), None, false)) + + comparePlans( + parsePlan("CREATE TABLE IF NOT EXISTS a.b.c LIKE d.e.f"), + CreateTableLikeStatement(Seq("a", "b", "c"), Seq("d", "e", "f"), None, true)) + + comparePlans( + parsePlan("CREATE TABLE a.b.c LIKE d.e.f LOCATION '/tmp'"), + CreateTableLikeStatement(Seq("a", "b", "c"), Seq("d", "e", "f"), Some("/tmp"), false)) + + comparePlans( + parsePlan("CREATE TABLE IF NOT EXISTS a.b.c LIKE d.e.f LOCATION '/tmp'"), + CreateTableLikeStatement(Seq("a", "b", "c"), Seq("d", "e", "f"), Some("/tmp"), true)) + } + test("drop table") { parseCompare("DROP TABLE testcat.ns1.ns2.tbl", DropTableStatement(Seq("testcat", "ns1", "ns2", "tbl"), ifExists = false, purge = false)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index bcf067ba0b97..241860aba440 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -240,6 +240,11 @@ class ResolveSessionCatalog( ignoreIfExists = c.ifNotExists) } + case CreateTableLikeStatement(targetTable, sourceTable, location, ifNotExists) => + val v1targetTable = parseV1Table(targetTable, "CREATE TABLE LIKE").asTableIdentifier + val v1sourceTable = parseV1Table(sourceTable, "CREATE TABLE LIKE").asTableIdentifier + CreateTableLikeCommand(v1targetTable, v1sourceTable, location, ifNotExists) + case 
RefreshTableStatement(SessionCatalog(_, tableName)) => RefreshTable(tableName.asTableIdentifier) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e58cf2372c7f..473e382e3ed8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -626,22 +626,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } } - /** - * Create a [[CreateTableLikeCommand]] command. - * - * For example: - * {{{ - * CREATE TABLE [IF NOT EXISTS] [db_name.]table_name - * LIKE [other_db_name.]existing_table_name [locationSpec] - * }}} - */ - override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) { - val targetTable = visitTableIdentifier(ctx.target) - val sourceTable = visitTableIdentifier(ctx.source) - val location = Option(ctx.locationSpec).map(visitLocationSpec) - CreateTableLikeCommand(targetTable, sourceTable, location, ctx.EXISTS != null) - } - /** * Create a [[CatalogStorageFormat]] for creating tables. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index ce41847f5b3a..8cbcc4aed2cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1562,6 +1562,27 @@ class DataSourceV2SQLSuite assert(e.message.contains("ALTER VIEW QUERY is only supported with v1 tables")) } + test("CREATE TABLE LIKE") { + val targetTable = "testcat.ns1.ns2.tbl1" + val sourceTable = "testcat.ns1.ns2.tbl2" + + val e1 = intercept[AnalysisException] { + sql(s"CREATE TABLE $targetTable LIKE $sourceTable") + } + assert(e1.message.contains("CREATE TABLE LIKE is only supported with v1 tables")) + + val e2 = intercept[AnalysisException] { + sql(s"CREATE TABLE IF NOT EXISTS $targetTable LIKE $sourceTable") + } + assert(e2.message.contains("CREATE TABLE LIKE is only supported with v1 tables")) + + val e3 = intercept[AnalysisException] { + sql(s"CREATE TABLE IF NOT EXISTS $targetTable LIKE " + + s"$sourceTable LOCATION 'AnyLocation'") + } + assert(e3.message.contains("CREATE TABLE LIKE is only supported with v1 tables")) + } + private def testV1Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand $sqlParams") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 940df21131e0..00ba40fee0c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -1105,49 +1105,28 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { intercept(sql2, "Found duplicate clauses: TBLPROPERTIES") } - test("create table like") { - val v1 = "CREATE TABLE table1 LIKE table2" - val (target, source, location, exists) = parser.parsePlan(v1).collect { - case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) + test("load data") { + val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1" + val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect { + 
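+      // Pattern-match on the parsed plan to pull the command's fields out for the assertions below.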
case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition) }.head - assert(exists == false) - assert(target.database.isEmpty) - assert(target.table == "table1") - assert(source.database.isEmpty) - assert(source.table == "table2") - assert(location.isEmpty) - - val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2" - val (target2, source2, location2, exists2) = parser.parsePlan(v2).collect { - case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) - }.head - assert(exists2) - assert(target2.database.isEmpty) - assert(target2.table == "table1") - assert(source2.database.isEmpty) - assert(source2.table == "table2") - assert(location2.isEmpty) - - val v3 = "CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'" - val (target3, source3, location3, exists3) = parser.parsePlan(v3).collect { - case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) - }.head - assert(!exists3) - assert(target3.database.isEmpty) - assert(target3.table == "table1") - assert(source3.database.isEmpty) - assert(source3.table == "table2") - assert(location3 == Some("/spark/warehouse")) - - val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'" - val (target4, source4, location4, exists4) = parser.parsePlan(v4).collect { - case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) + assert(table.database.isEmpty) + assert(table.table == "table1") + assert(path == "path") + assert(!isLocal) + assert(!isOverwrite) + assert(partition.isEmpty) + + val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')" + val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect { + case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition) }.head - assert(exists4) - assert(target4.database.isEmpty) - assert(target4.table == "table1") - assert(source4.database.isEmpty) - assert(source4.table == "table2") - assert(location4 == Some("/spark/warehouse")) + assert(table2.database.isEmpty) + assert(table2.table == "table1") + assert(path2 == "path") + assert(isLocal2) + assert(isOverwrite2) + assert(partition2.nonEmpty) + assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2") } } From 13ee7833f23c85a5cfb1626c9373fc4c27300fbf Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 4 Nov 2019 07:56:46 -0800 Subject: [PATCH 2/6] Add V2 implementation --- .../catalyst/analysis/ResolveCatalogs.scala | 12 ++++ .../catalyst/plans/logical/v2Commands.scala | 10 ++++ .../datasources/v2/CreateTableLikeExec.scala | 57 +++++++++++++++++++ .../datasources/v2/DataSourceV2Strategy.scala | 5 +- .../sql/connector/DataSourceV2SQLSuite.scala | 48 +++++++++++----- 5 files changed, 118 insertions(+), 14 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index bca07262a6f7..83cb1d37b9a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -141,6 +141,18 @@ class ResolveCatalogs(val catalogManager: CatalogManager) writeOptions = c.options.filterKeys(_ != "path"), ignoreIfExists = c.ifNotExists) + case CreateTableLikeStatement( + 
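+        // Matches only when both names resolve to a non-session (v2) catalog.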
NonSessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s), loc, ifNotExists ) => + if (loc.nonEmpty) { + throw new AnalysisException("Location clause not supported for" + + " CREATE TABLE LIKE statement when tables are of V2 type.") + } + CreateTableLike(tCatalog.asTableCatalog, + t.asIdentifier, + sCatalog.asTableCatalog, + s.asIdentifier, + ifNotExists) + case RefreshTableStatement(NonSessionCatalog(catalog, tableName)) => RefreshTable(catalog.asTableCatalog, tableName.asIdentifier) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index f2e7a0699fd9..c0e53391564a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -187,6 +187,16 @@ case class CreateTableAsSelect( } } +/** + * Create a new table with the same table definition of the source table. + */ +case class CreateTableLike( + targetCatalog: TableCatalog, + targetTableName: Identifier, + sourceCatalog: TableCatalog, + sourceTableName: Identifier, + ifNotExists: Boolean) extends Command + /** * Replace a table with a v2 catalog. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala new file mode 100644 index 000000000000..250e42db28c5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange} + +/** + * Physical plan node for altering a table. 
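+ * It creates the target table using the source table's schema, partitioning and properties.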
+ */ +case class CreateTableLikeExec( + targetCatalog: TableCatalog, + targetTable: Identifier, + sourceCatalog: TableCatalog, + sourceTable: Identifier, + ifNotExists: Boolean) extends V2CommandExec { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override def output: Seq[Attribute] = Seq.empty + + override protected def run(): Seq[InternalRow] = { + val sourceTab = sourceCatalog.loadTable(sourceTable) + if (!targetCatalog.tableExists(targetTable)) { + try { + targetCatalog.createTable(targetTable, + sourceTab.schema, + sourceTab.partitioning, + sourceTab.properties()) + } catch { + case _: TableAlreadyExistsException if ifNotExists => + logWarning(s"Table ${targetTable.quoted} was created concurrently. Ignoring.") + } + } else if (!ifNotExists) { + throw new TableAlreadyExistsException(targetTable) + } + + Seq.empty + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 0a7785b0e088..747d13bb1143 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTables} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateTableLike, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} @@ -96,6 +96,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil } + case CreateTableLike(tCatalog, tTab, sCatalog, sTab, ifNotExists) => + CreateTableLikeExec(tCatalog, tTab, sCatalog, sTab, ifNotExists) :: Nil + case RefreshTable(catalog, ident) => RefreshTableExec(catalog, ident) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 8cbcc4aed2cf..2f6f3f083c30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio import 
org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog +import org.apache.spark.sql.connector.expressions.{Expression, LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.SimpleScanSource @@ -1563,24 +1564,45 @@ class DataSourceV2SQLSuite } test("CREATE TABLE LIKE") { - val targetTable = "testcat.ns1.ns2.tbl1" - val sourceTable = "testcat.ns1.ns2.tbl2" + val targetTable = "testcat.target_tab" + val sourceTable = "testcat.source_tab" - val e1 = intercept[AnalysisException] { + withTable(targetTable, sourceTable) { + val e1 = intercept[AnalysisException] { + sql(s"CREATE TABLE $targetTable LIKE $sourceTable") + } + assert(e1.message.contains("Table source_tab not found")) + + val e2 = intercept[AnalysisException] { + sql(s"CREATE TABLE $targetTable LIKE $sourceTable LOCATION '/tmp'") + } + assert(e2.message.contains("Location clause not supported for CREATE TABLE LIKE" + + " statement when tables are of V2 type")) + + sql( + s""" + |CREATE TABLE $sourceTable + |(id bigint, data string, p int) USING foo PARTITIONED BY (id, p) + |TBLPROPERTIES ('prop'='propvalue') + |""".stripMargin) sql(s"CREATE TABLE $targetTable LIKE $sourceTable") - } - assert(e1.message.contains("CREATE TABLE LIKE is only supported with v1 tables")) + val testCatalog = catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "target_tab")) + assert(table.name == targetTable) + assert(table.partitioning().size == 2) + assert(table.partitioning()(0) == LogicalExpressions.identity("id")) + assert(table.partitioning()(1) == LogicalExpressions.identity("p")) + assert(table.properties.asScala == Map("prop" -> "propvalue", "provider" -> "foo")) + + // 2nd invocation should result in error. + val e3 = intercept[AnalysisException] { + sql(s"CREATE TABLE $targetTable LIKE $sourceTable") + } + assert(e3.message.contains("Table target_tab already exists")) - val e2 = intercept[AnalysisException] { + // No error when IF NOT EXISTS is specified. 
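+      // (the target table already exists, so the statement becomes a silent no-op)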
sql(s"CREATE TABLE IF NOT EXISTS $targetTable LIKE $sourceTable") } - assert(e2.message.contains("CREATE TABLE LIKE is only supported with v1 tables")) - - val e3 = intercept[AnalysisException] { - sql(s"CREATE TABLE IF NOT EXISTS $targetTable LIKE " + - s"$sourceTable LOCATION 'AnyLocation'") - } - assert(e3.message.contains("CREATE TABLE LIKE is only supported with v1 tables")) } private def testV1Command(sqlCommand: String, sqlParams: String): Unit = { From 5e99f3761a0e7627a57c5fa3864d251f84f92be1 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 4 Nov 2019 23:57:01 -0800 Subject: [PATCH 3/6] fix1 --- .../execution/command/DDLParserSuite.scala | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 00ba40fee0c4..8fb5b729e898 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -1104,29 +1104,4 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { intercept(sql1, "Found duplicate clauses: COMMENT") intercept(sql2, "Found duplicate clauses: TBLPROPERTIES") } - - test("load data") { - val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1" - val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect { - case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition) - }.head - assert(table.database.isEmpty) - assert(table.table == "table1") - assert(path == "path") - assert(!isLocal) - assert(!isOverwrite) - assert(partition.isEmpty) - - val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')" - val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect { - case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition) - }.head - assert(table2.database.isEmpty) - assert(table2.table == "table1") - assert(path2 == "path") - assert(isLocal2) - assert(isOverwrite2) - assert(partition2.nonEmpty) - assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2") - } } From 4a1fece89e7013ccae7e69efe4ead8809d454e41 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Fri, 8 Nov 2019 14:46:12 -0800 Subject: [PATCH 4/6] remove me --- .../catalyst/analysis/ResolveCatalogs.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 83cb1d37b9a9..be686cfa2983 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange} +import org.apache.spark.sql.execution.command.CreateTableLikeCommand /** * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements @@ -141,6 +142,24 @@ class ResolveCatalogs(val catalogManager: CatalogManager) writeOptions = c.options.filterKeys(_ != "path"), ignoreIfExists = 
c.ifNotExists) + case CreateTableLikeStatement(target, source, loc, ifNotExists) => + (source, target) match { + case (SessionCatalog(tCatalog, t), SessionCatalog(sCatalog, s)) => + val CatalogAndIdentifierParts(targetCatalog, targetParts) = t + val CatalogAndIdentifierParts(sourceCatalog, sourceParts) = s + CreateTableLikeCommand(v1targetTable, v1sourceTable, location, ifNotExists) + + case (NonSessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s)) => + CreateTableLike(tCatalog.asTableCatalog, + t.asIdentifier, + sCatalog.asTableCatalog, + s.asIdentifier, + ifNotExists) + case (SessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s)) => + case (NonSessionCatalog(tCatalog, t), SessionCatalog(sCatalog, s)) => + } + + case CreateTableLikeStatement( NonSessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s), loc, ifNotExists ) => if (loc.nonEmpty) { @@ -228,4 +247,12 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case _ => None } } + + object SessionCatalog { + def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match { + case CatalogAndIdentifierParts(catalog, parts) if isSessionCatalog(catalog) => + Some(catalog -> parts) + case _ => None + } + } } From 4c772e53129bb14ef87b9268a0dd4de9f7783ec8 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sun, 10 Nov 2019 01:06:30 -0800 Subject: [PATCH 5/6] improve --- .../catalyst/analysis/ResolveCatalogs.scala | 48 ++++++++--------- .../catalyst/plans/logical/v2Commands.scala | 6 +-- .../datasources/v2/CreateTableLikeExec.scala | 25 +++++---- .../sql/connector/DataSourceV2SQLSuite.scala | 51 ++++++++++++++++++- 4 files changed, 93 insertions(+), 37 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index be686cfa2983..70a3d6a4dcd3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange} -import org.apache.spark.sql.execution.command.CreateTableLikeCommand /** * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements @@ -142,35 +141,36 @@ class ResolveCatalogs(val catalogManager: CatalogManager) writeOptions = c.options.filterKeys(_ != "path"), ignoreIfExists = c.ifNotExists) - case CreateTableLikeStatement(target, source, loc, ifNotExists) => - (source, target) match { - case (SessionCatalog(tCatalog, t), SessionCatalog(sCatalog, s)) => - val CatalogAndIdentifierParts(targetCatalog, targetParts) = t - val CatalogAndIdentifierParts(sourceCatalog, sourceParts) = s - CreateTableLikeCommand(v1targetTable, v1sourceTable, location, ifNotExists) - + case c @ CreateTableLikeStatement(target, source, loc, ifNotExists) => + def validateLocation(loc: Option[String]) = { + if (loc.isDefined) { + throw new AnalysisException("Location clause not supported for " + + "CREATE TABLE LIKE statement when tables are of V2 type") + } + } + (target, source) match { case (NonSessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s)) => + validateLocation(loc) CreateTableLike(tCatalog.asTableCatalog, - t.asIdentifier, 
- sCatalog.asTableCatalog, - s.asIdentifier, + t, + Some(sCatalog.asTableCatalog), + s, ifNotExists) - case (SessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s)) => case (NonSessionCatalog(tCatalog, t), SessionCatalog(sCatalog, s)) => - } - + validateLocation(loc) + CreateTableLike(tCatalog.asTableCatalog, + t, + None, + source, + ifNotExists) + case (SessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s)) => + throw new AnalysisException("CREATE TABLE LIKE is not allowed when " + + "source table is V2 type and target table is V1 type") - case CreateTableLikeStatement( - NonSessionCatalog(tCatalog, t), NonSessionCatalog(sCatalog, s), loc, ifNotExists ) => - if (loc.nonEmpty) { - throw new AnalysisException("Location clause not supported for" + - " CREATE TABLE LIKE statement when tables are of V2 type.") + // When target and source are V1 type, its handled in v1 CreateTableLikeCommand. We + // return from here without any transformation and its handled in ResolveSessionCatalog. + case _ => c } - CreateTableLike(tCatalog.asTableCatalog, - t.asIdentifier, - sCatalog.asTableCatalog, - s.asIdentifier, - ifNotExists) case RefreshTableStatement(NonSessionCatalog(catalog, tableName)) => RefreshTable(catalog.asTableCatalog, tableName.asIdentifier) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index c0e53391564a..fcd4225a6401 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -192,9 +192,9 @@ case class CreateTableAsSelect( */ case class CreateTableLike( targetCatalog: TableCatalog, - targetTableName: Identifier, - sourceCatalog: TableCatalog, - sourceTableName: Identifier, + targetTableName: Seq[String], + sourceCatalog: Option[TableCatalog], + sourceTableName: Seq[String], ifNotExists: Boolean) extends Command /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index 250e42db28c5..b212745d6a3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -21,26 +21,33 @@ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange, V1Table} /** - * Physical plan node for altering a table. + * Physical plan node for CREATE TABLE LIKE statement. 
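+ * If `sourceCatalog` is None, the source table is resolved from the session catalog.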
*/ case class CreateTableLikeExec( targetCatalog: TableCatalog, - targetTable: Identifier, - sourceCatalog: TableCatalog, - sourceTable: Identifier, + targetTable: Seq[String], + sourceCatalog: Option[TableCatalog], + sourceTable: Seq[String], ifNotExists: Boolean) extends V2CommandExec { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ override def output: Seq[Attribute] = Seq.empty override protected def run(): Seq[InternalRow] = { - val sourceTab = sourceCatalog.loadTable(sourceTable) - if (!targetCatalog.tableExists(targetTable)) { + val sessionCatalog = sqlContext.sparkSession.sessionState.catalog + // If source catalog is not specified then its resolved from session catalog. + val sourceTab = sourceCatalog.map { catalog => + catalog.loadTable(sourceTable.asIdentifier) + }.getOrElse( + V1Table(sessionCatalog.getTempViewOrPermanentTableMetadata(sourceTable.asTableIdentifier)) + ) + + if (!targetCatalog.tableExists(targetTable.asIdentifier)) { try { - targetCatalog.createTable(targetTable, + targetCatalog.createTable(targetTable.asIdentifier, sourceTab.schema, sourceTab.partitioning, sourceTab.properties()) @@ -49,7 +56,7 @@ case class CreateTableLikeExec( logWarning(s"Table ${targetTable.quoted} was created concurrently. Ignoring.") } } else if (!ifNotExists) { - throw new TableAlreadyExistsException(targetTable) + throw new TableAlreadyExistsException(targetTable.asIdentifier) } Seq.empty diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 2f6f3f083c30..da1914019bb6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1563,7 +1563,7 @@ class DataSourceV2SQLSuite assert(e.message.contains("ALTER VIEW QUERY is only supported with v1 tables")) } - test("CREATE TABLE LIKE") { + test("CREATE TABLE LIKE with target v2 and source v2") { val targetTable = "testcat.target_tab" val sourceTable = "testcat.source_tab" @@ -1605,6 +1605,55 @@ class DataSourceV2SQLSuite } } + test("CREATE TABLE LIKE with target v2 and source v1") { + val targetTable = "testcat.target_tab" + val sourceTable = "default.source_tab" + + withTable(targetTable, sourceTable) { + val e1 = intercept[AnalysisException] { + sql(s"CREATE TABLE $targetTable LIKE $sourceTable") + } + assert(e1.message.contains("Table or view 'source_tab' not found in database 'default'")) + + val e2 = intercept[AnalysisException] { + sql(s"CREATE TABLE $targetTable LIKE $sourceTable LOCATION '/tmp'") + } + assert(e2.message.contains("Location clause not supported for CREATE TABLE LIKE" + + " statement when tables are of V2 type")) + + sql( + s""" + |CREATE TABLE $sourceTable + |(id bigint, data string, p int) USING parquet PARTITIONED BY (id, p) + |TBLPROPERTIES ('prop'='propvalue') + |""".stripMargin) + sql(s"CREATE TABLE $targetTable LIKE $sourceTable") + val testCatalog = catalog("testcat").asTableCatalog + val table = testCatalog.loadTable(Identifier.of(Array(), "target_tab")) + assert(table.name == targetTable) + assert(table.partitioning().size == 2) + assert(table.partitioning()(0) == LogicalExpressions.identity("id")) + assert(table.partitioning()(1) == LogicalExpressions.identity("p")) + assert(table.properties.asScala == Map("prop" -> "propvalue")) + + // 2nd invocation should result in error. 
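+      // (the target now exists and IF NOT EXISTS is not given this time)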
+      val e3 = intercept[AnalysisException] {
+        sql(s"CREATE TABLE $targetTable LIKE $sourceTable")
+      }
+      assert(e3.message.contains("Table target_tab already exists"))
+
+      // No error when IF NOT EXISTS is specified.
+      sql(s"CREATE TABLE IF NOT EXISTS $targetTable LIKE $sourceTable")
+
+      // If the target is V1 and the source is V2, the statement is not allowed.
+      val e4 = intercept[AnalysisException] {
+        sql(s"CREATE TABLE $sourceTable LIKE $targetTable")
+      }
+      assert(e4.message.contains("CREATE TABLE LIKE is not allowed when source table" +
+        " is V2 type and target table is V1 type"))
+    }
+  }
+
   private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")

From 11ea11b80898f006ffe314f626b046a524a15ff6 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Sun, 10 Nov 2019 19:21:08 -0800
Subject: [PATCH 6/6] style

---
 .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index da1914019bb6..5ff2d30a6955 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
+import org.apache.spark.sql.connector.expressions.LogicalExpressions
 import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
-import org.apache.spark.sql.connector.expressions.{Expression, LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.SimpleScanSource
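
Taken together, the series routes CREATE TABLE LIKE through the multi-part
catalog/table lookup: a v1 target with a v1 source still plans to
CreateTableLikeCommand, while a v2 target plans to the new CreateTableLikeExec.
The sketch below illustrates the resulting end-to-end behavior. It assumes a v2
catalog registered under the name `testcat`, the way the test suite wires up
its in-memory catalog; the catalog class and the `foo` provider are
placeholders, not part of this patch:

    // Register a v2 catalog plugin. The `spark.sql.catalog.<name>` key is how
    // the CatalogManager discovers plugins; MyTableCatalog stands in for any
    // TableCatalog implementation.
    spark.conf.set("spark.sql.catalog.testcat", classOf[MyTableCatalog].getName)

    // A partitioned source table in the v2 catalog.
    spark.sql(
      """CREATE TABLE testcat.ns.src (id BIGINT, data STRING, p INT)
        |USING foo PARTITIONED BY (id, p)""".stripMargin)

    // v2 target, v2 source: CreateTableLikeExec copies the source's schema,
    // partitioning and table properties into the new table.
    spark.sql("CREATE TABLE testcat.ns.dst LIKE testcat.ns.src")

    // A second run throws TableAlreadyExistsException; with IF NOT EXISTS the
    // statement is a silent no-op instead.
    spark.sql("CREATE TABLE IF NOT EXISTS testcat.ns.dst LIKE testcat.ns.src")

    // Two cases are rejected with an AnalysisException: a LOCATION clause when
    // the target is v2, and a v1 target combined with a v2 source.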