From dc5221213a001cd414ac123ffc001c71c40955fc Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Sun, 20 Oct 2019 07:53:39 -0700
Subject: [PATCH 1/3] initial checkin

---
 .../spark/sql/catalyst/parser/SqlBase.g4      |  4 +--
 .../sql/catalyst/parser/AstBuilder.scala      | 12 +++++++
 .../catalyst/plans/logical/statements.scala   |  5 +++
 .../sql/catalyst/parser/DDLParserSuite.scala  |  6 ++++
 .../analysis/ResolveSessionCatalog.scala      |  6 +++-
 .../spark/sql/execution/SparkSqlParser.scala  |  7 -----
 .../sql/connector/DataSourceV2SQLSuite.scala  | 31 ++++++++++---------
 7 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 1839203e3b235..b2171893beba8 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -201,9 +201,9 @@ statement
     | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName                #describeFunction
     | (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier     #describeDatabase
     | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
-        multipartIdentifier partitionSpec? describeColName?                #describeTable
+        multipartIdentifier partitionSpec? describeColName?               #describeTable
     | (DESC | DESCRIBE) QUERY? query                                       #describeQuery
-    | REFRESH TABLE tableIdentifier                                        #refreshTable
+    | REFRESH TABLE multipartIdentifier                                   #refreshTable
     | REFRESH (STRING | .*?)                                               #refreshResource
     | CACHE LAZY? TABLE tableIdentifier
         (OPTIONS options=tablePropertyList)? (AS? query)?                  #cacheTable
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8af7cf9ad8008..eaace2a48c3dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2728,4 +2728,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
     RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
   }
+
+  /**
+   * Create a [[RefreshTableStatement]].
+   *
+   * For example:
+   * {{{
+   *   REFRESH TABLE multi_part_name
+   * }}}
+   */
+  override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) {
+    RefreshTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 72d5cbb7d9045..600401e29f229 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -316,3 +316,8 @@ case class AnalyzeColumnStatement(
  * A REPAIR TABLE statement, as parsed from SQL
  */
 case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement
+
+/**
+ * A REFRESH TABLE statement, as parsed from SQL
+ */
+case class RefreshTableStatement(tableName: Seq[String]) extends ParsedStatement
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 0eaf74f655065..a43596f0baef6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -961,6 +961,12 @@ class DDLParserSuite extends AnalysisTest {
       RepairTableStatement(Seq("a", "b", "c")))
   }
 
+  test("REFRESH TABLE table") {
+    comparePlans(
+      parsePlan("REFRESH TABLE a.b.c"),
+      RefreshTableStatement(Seq("a", "b", "c")))
+  }
+
   private case class TableSpec(
       name: Seq[String],
       schema: Option[StructType],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 72f539f72008d..f60841fdaa990 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
@@ -282,6 +282,10 @@ class ResolveSessionCatalog(
       AlterTableRecoverPartitionsCommand(
         v1TableName.asTableIdentifier,
         "MSCK REPAIR TABLE")
+
+    case RefreshTableStatement(tableName) =>
+      val v1TableName = parseV1Table(tableName, "REFRESH TABLE")
+      RefreshTable(v1TableName.asTableIdentifier)
   }
 
   private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 3e7a54877cae8..a0866b3ae5381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -160,13 +160,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     ShowCreateTableCommand(table)
   }
 
-  /**
-   * Create a [[RefreshTable]] logical plan.
-   */
-  override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) {
-    RefreshTable(visitTableIdentifier(ctx.tableIdentifier))
-  }
-
   /**
    * Create a [[RefreshResource]] logical plan.
    */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index d253e6078ddc0..47dc0ad7d78cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1185,16 +1185,8 @@ class DataSourceV2SQLSuite
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
-
-      val e = intercept[AnalysisException] {
-        sql(s"ANALYZE TABLE $t COMPUTE STATISTICS")
-      }
-      assert(e.message.contains("ANALYZE TABLE is only supported with v1 tables"))
-
-      val e2 = intercept[AnalysisException] {
-        sql(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
-      }
-      assert(e2.message.contains("ANALYZE TABLE is only supported with v1 tables"))
+      testV1Command("ANALYZE TABLE", s"$t COMPUTE STATISTICS")
+      testV1Command("ANALYZE TABLE", s"$t COMPUTE STATISTICS FOR ALL COLUMNS")
     }
   }
 
@@ -1202,12 +1194,23 @@ class DataSourceV2SQLSuite
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+      testV1Command("MSCK REPAIR TABLE", t)
+    }
+  }
 
-      val e = intercept[AnalysisException] {
-        sql(s"MSCK REPAIR TABLE $t")
-      }
-      assert(e.message.contains("MSCK REPAIR TABLE is only supported with v1 tables"))
+  test("REFRESH TABLE") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+      testV1Command("REFRESH TABLE", t)
+    }
+  }
+
+  private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
+    val e = intercept[AnalysisException] {
+      sql(s"$sqlCommand $sqlParams")
     }
+    assert(e.message.contains(s"$sqlCommand is only supported with v1 tables"))
   }
 
   private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {

From fcc31f2b16cee381c3e41a57f436f1c4de6e4634 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 21 Oct 2019 21:54:16 -0700
Subject: [PATCH 2/3] implement v2 command

---
 .../catalyst/analysis/ResolveCatalogs.scala   |  3 ++
 .../catalyst/plans/logical/v2Commands.scala   |  7 ++++
 .../analysis/ResolveSessionCatalog.scala      |  7 ++--
 .../datasources/v2/DataSourceV2Strategy.scala |  5 ++-
 .../datasources/v2/RefreshTableExec.scala     | 32 +++++++++++++++++++
 .../sql/connector/DataSourceV2SQLSuite.scala  | 16 +++++-----
 6 files changed, 57 insertions(+), 13 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 13a79a82a3858..f494870e66d0d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -137,6 +137,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         writeOptions = c.options.filterKeys(_ != "path"),
         ignoreIfExists = c.ifNotExists)
 
+    case RefreshTableStatement(NonSessionCatalog(catalog, tableName)) =>
+      RefreshTable(catalog.asTableCatalog, tableName.asIdentifier)
+
     case c @ ReplaceTableStatement(
          NonSessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) =>
       ReplaceTable(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index f89dfb1ec47d8..2209e95da01d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -340,3 +340,10 @@ case class SetCatalogAndNamespace(
     catalogManager: CatalogManager,
     catalogName: Option[String],
     namespace: Option[Seq[String]]) extends Command
+
+/**
+ * The logical plan of the REFRESH TABLE command that works for v2 catalogs.
+ */
+case class RefreshTable(
+    catalog: TableCatalog,
+    ident: Identifier) extends Command
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index f60841fdaa990..bd83e583500c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -216,6 +216,9 @@ class ResolveSessionCatalog(
           ignoreIfExists = c.ifNotExists)
       }
 
+    case RefreshTableStatement(SessionCatalog(_, tableName)) =>
+      RefreshTable(tableName.asTableIdentifier)
+
     // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
     // session catalog and the table provider is not v2.
     case c @ ReplaceTableStatement(
@@ -282,10 +285,6 @@ class ResolveSessionCatalog(
       AlterTableRecoverPartitionsCommand(
         v1TableName.asTableIdentifier,
         "MSCK REPAIR TABLE")
-
-    case RefreshTableStatement(tableName) =>
-      val v1TableName = parseV1Table(tableName, "REFRESH TABLE")
-      RefreshTable(v1TableName.asTableIdentifier)
   }
 
   private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index c8d29520bcfce..2c10a67115167 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
 import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
@@ -193,6 +193,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
             catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
       }
 
+    case RefreshTable(catalog, ident) =>
+      RefreshTableExec(catalog, ident) :: Nil
+
     case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
       catalog match {
         case staging: StagingTableCatalog =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
new file mode 100644
index 0000000000000..45adf3a692986
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+case class RefreshTableExec(
+    catalog: TableCatalog,
+    ident: Identifier) extends V2CommandExec {
+  override protected def run(): Seq[InternalRow] = {
+    catalog.invalidateTable(ident)
+    Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 47dc0ad7d78cf..ac7940b760dcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1086,6 +1086,14 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("REFRESH TABLE: v2 table") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+      sql(s"REFRESH TABLE $t")
+    }
+  }
+
   test("REPLACE TABLE: v1 table") {
     val e = intercept[AnalysisException] {
       sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
@@ -1198,14 +1206,6 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("REFRESH TABLE") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
-      testV1Command("REFRESH TABLE", t)
-    }
-  }
-
   private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")

From fe577f5aa2de25aa947018fa51a3a6b6c94614ee Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Tue, 22 Oct 2019 09:42:55 -0700
Subject: [PATCH 3/3] Address PR comments

---
 .../spark/sql/connector/InMemoryTableCatalog.scala   | 10 ++++++++++
 .../execution/datasources/v2/RefreshTableExec.scala  |  1 +
 .../spark/sql/connector/DataSourceV2SQLSuite.scala   |  6 ++++++
 3 files changed, 17 insertions(+)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala
index 8724a38d08d1f..ece903a4c2838 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala
@@ -34,6 +34,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
   protected val tables: util.Map[Identifier, InMemoryTable] =
     new ConcurrentHashMap[Identifier, InMemoryTable]()
 
+  private val invalidatedTables: util.Set[Identifier] = ConcurrentHashMap.newKeySet()
+
   private var _name: Option[String] = None
 
   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
@@ -55,6 +57,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     }
   }
 
+  override def invalidateTable(ident: Identifier): Unit = {
+    invalidatedTables.add(ident)
+  }
+
   override def createTable(
       ident: Identifier,
       schema: StructType,
@@ -104,6 +110,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     }
   }
 
+  def isTableInvalidated(ident: Identifier): Boolean = {
+    invalidatedTables.contains(ident)
+  }
+
   def clearTables(): Unit = {
     tables.clear()
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
index 45adf3a692986..2a19ff304a9e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+
 case class RefreshTableExec(
     catalog: TableCatalog,
     ident: Identifier) extends V2CommandExec {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 4a451b6df3c92..fd7f64ee072b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1090,7 +1090,13 @@ class DataSourceV2SQLSuite
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      val testCatalog = catalog("testcat").asTableCatalog.asInstanceOf[InMemoryTableCatalog]
+      val identifier = Identifier.of(Array("ns1", "ns2"), "tbl")
+
+      assert(!testCatalog.isTableInvalidated(identifier))
       sql(s"REFRESH TABLE $t")
+      assert(testCatalog.isTableInvalidated(identifier))
     }
   }
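
Usage sketch: a minimal spark-shell example of the v2 code path this series adds,
assuming a session where "testcat" is registered as a v2 catalog (e.g.
spark.sql.catalog.testcat pointing at an InMemoryTableCatalog implementation, as
DataSourceV2SQLSuite configures) and "foo" is a provider that catalog accepts:

  // A multi-part REFRESH TABLE now parses into
  // RefreshTableStatement(Seq("testcat", "ns1", "ns2", "tbl")); ResolveCatalogs
  // resolves it to the v2 RefreshTable(catalog, ident) plan, which
  // DataSourceV2Strategy plans as RefreshTableExec, whose run() calls
  // catalog.invalidateTable(ident). Names in the session catalog still fall
  // through ResolveSessionCatalog to the v1 RefreshTable command.
  spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (id bigint, data string) USING foo")
  spark.sql("REFRESH TABLE testcat.ns1.ns2.tbl")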