diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 7e5e16b8e32b..970d244071e0 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -201,9 +201,9 @@ statement
     | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName                #describeFunction
     | (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier     #describeDatabase
     | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
-        multipartIdentifier partitionSpec? describeColName?                #describeTable
+        multipartIdentifier partitionSpec? describeColName?               #describeTable
     | (DESC | DESCRIBE) QUERY? query                                       #describeQuery
-    | REFRESH TABLE tableIdentifier                                        #refreshTable
+    | REFRESH TABLE multipartIdentifier                                    #refreshTable
     | REFRESH (STRING | .*?)                                               #refreshResource
     | CACHE LAZY? TABLE tableIdentifier
         (OPTIONS options=tablePropertyList)? (AS? query)?                  #cacheTable
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 6553b3d57d7f..9803fda0678f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -137,6 +137,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         writeOptions = c.options.filterKeys(_ != "path"),
         ignoreIfExists = c.ifNotExists)
 
+    case RefreshTableStatement(NonSessionCatalog(catalog, tableName)) =>
+      RefreshTable(catalog.asTableCatalog, tableName.asIdentifier)
+
     case c @ ReplaceTableStatement(
          NonSessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) =>
       ReplaceTable(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7c67952aba40..940dfd0fc333 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2799,4 +2799,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
     ShowPartitionsStatement(table, partitionKeys)
   }
+
+  /**
+   * Create a [[RefreshTableStatement]].
+   *
+   * For example:
+   * {{{
+   *   REFRESH TABLE multi_part_name
+   * }}}
+   */
+  override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) {
+    RefreshTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 3bd16187320f..127d9026f802 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -340,5 +340,11 @@ case class TruncateTableStatement(
 /**
  * A SHOW PARTITIONS statement, as parsed from SQL
  */
-case class ShowPartitionsStatement(tableName: Seq[String],
+case class ShowPartitionsStatement(
+    tableName: Seq[String],
     partitionSpec: Option[TablePartitionSpec]) extends ParsedStatement
+
+/**
+ * A REFRESH TABLE statement, as parsed from SQL
+ */
+case class RefreshTableStatement(tableName: Seq[String]) extends ParsedStatement
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 8f5731a4a7a7..d80c1c034a86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -348,3 +348,10 @@ case class SetCatalogAndNamespace(
     catalogManager: CatalogManager,
     catalogName: Option[String],
     namespace: Option[Seq[String]]) extends Command
+
+/**
+ * The logical plan of the REFRESH TABLE command that works for v2 catalogs.
+ */
+case class RefreshTable(
+    catalog: TableCatalog,
+    ident: Identifier) extends Command
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 38ef357036a0..8e605bd15f69 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1081,6 +1081,12 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(parsed5, expected5)
   }
 
+  test("REFRESH TABLE table") {
+    comparePlans(
+      parsePlan("REFRESH TABLE a.b.c"),
+      RefreshTableStatement(Seq("a", "b", "c")))
+  }
+
   private case class TableSpec(
       name: Seq[String],
       schema: Option[StructType],
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala
index 8724a38d08d1..ece903a4c283 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala
@@ -34,6 +34,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
   protected val tables: util.Map[Identifier, InMemoryTable] =
     new ConcurrentHashMap[Identifier, InMemoryTable]()
 
+  private val invalidatedTables: util.Set[Identifier] = ConcurrentHashMap.newKeySet()
+
   private var _name: Option[String] = None
 
   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
@@ -55,6 +57,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     }
   }
 
+  override def invalidateTable(ident: Identifier): Unit = {
+    invalidatedTables.add(ident)
+  }
+
   override def createTable(
       ident: Identifier,
       schema: StructType,
@@ -104,6 +110,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     }
   }
 
+  def isTableInvalidated(ident: Identifier): Boolean = {
+    invalidatedTables.contains(ident)
+  }
+
   def clearTables(): Unit = {
     tables.clear()
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 4cca9846e996..230b8f3906bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
@@ -216,6 +216,9 @@ class ResolveSessionCatalog(
         ignoreIfExists = c.ifNotExists)
     }
 
+    case RefreshTableStatement(SessionCatalog(_, tableName)) =>
+      RefreshTable(tableName.asTableIdentifier)
+
     // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
     // session catalog and the table provider is not v2.
     case c @ ReplaceTableStatement(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 38f3c6e1b750..2439621f7725 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -143,13 +143,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     ShowCreateTableCommand(table)
   }
 
-  /**
-   * Create a [[RefreshTable]] logical plan.
-   */
-  override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) {
-    RefreshTable(visitTableIdentifier(ctx.tableIdentifier))
-  }
-
   /**
    * Create a [[RefreshResource]] logical plan.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 49035c3cc3da..4a7cb7db45de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
 import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
@@ -193,6 +193,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
         catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
     }
 
+    case RefreshTable(catalog, ident) =>
+      RefreshTableExec(catalog, ident) :: Nil
+
     case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
       catalog match {
         case staging: StagingTableCatalog =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
new file mode 100644
index 000000000000..2a19ff304a9e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+
+case class RefreshTableExec(
+    catalog: TableCatalog,
+    ident: Identifier) extends V2CommandExec {
+  override protected def run(): Seq[InternalRow] = {
+    catalog.invalidateTable(ident)
+    Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 2ea26787dbb1..463147903c92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1112,6 +1112,20 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("REFRESH TABLE: v2 table") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      val testCatalog = catalog("testcat").asTableCatalog.asInstanceOf[InMemoryTableCatalog]
+      val identifier = Identifier.of(Array("ns1", "ns2"), "tbl")
+
+      assert(!testCatalog.isTableInvalidated(identifier))
+      sql(s"REFRESH TABLE $t")
+      assert(testCatalog.isTableInvalidated(identifier))
+    }
+  }
+
   test("REPLACE TABLE: v1 table") {
     val e = intercept[AnalysisException] {
       sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
@@ -1211,16 +1225,8 @@ class DataSourceV2SQLSuite
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
-
-      val e = intercept[AnalysisException] {
-        sql(s"ANALYZE TABLE $t COMPUTE STATISTICS")
-      }
-      assert(e.message.contains("ANALYZE TABLE is only supported with v1 tables"))
-
-      val e2 = intercept[AnalysisException] {
-        sql(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
-      }
-      assert(e2.message.contains("ANALYZE TABLE is only supported with v1 tables"))
+      testV1Command("ANALYZE TABLE", s"$t COMPUTE STATISTICS")
+      testV1Command("ANALYZE TABLE", s"$t COMPUTE STATISTICS FOR ALL COLUMNS")
     }
   }
 
@@ -1228,11 +1234,7 @@ class DataSourceV2SQLSuite
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
-
-      val e = intercept[AnalysisException] {
-        sql(s"MSCK REPAIR TABLE $t")
-      }
-      assert(e.message.contains("MSCK REPAIR TABLE is only supported with v1 tables"))
+      testV1Command("MSCK REPAIR TABLE", t)
     }
   }
 
@@ -1246,15 +1248,8 @@ class DataSourceV2SQLSuite
         |PARTITIONED BY (id)
       """.stripMargin)
 
-      val e1 = intercept[AnalysisException] {
-        sql(s"TRUNCATE TABLE $t")
-      }
-      assert(e1.message.contains("TRUNCATE TABLE is only supported with v1 tables"))
-
-      val e2 = intercept[AnalysisException] {
-        sql(s"TRUNCATE TABLE $t PARTITION(id='1')")
-      }
-      assert(e2.message.contains("TRUNCATE TABLE is only supported with v1 tables"))
+      testV1Command("TRUNCATE TABLE", t)
+      testV1Command("TRUNCATE TABLE", s"$t PARTITION(id='1')")
     }
   }
 
@@ -1268,16 +1263,16 @@ class DataSourceV2SQLSuite
         |PARTITIONED BY (id)
       """.stripMargin)
 
-      val e1 = intercept[AnalysisException] {
-        val partition = sql(s"SHOW PARTITIONS $t")
-      }
-      assert(e1.message.contains("SHOW PARTITIONS is only supported with v1 tables"))
+      testV1Command("SHOW PARTITIONS", t)
+      testV1Command("SHOW PARTITIONS", s"$t PARTITION(id='1')")
+    }
+  }
 
-      val e2 = intercept[AnalysisException] {
-        val partition2 = sql(s"SHOW PARTITIONS $t PARTITION(id='1')")
-      }
-      assert(e2.message.contains("SHOW PARTITIONS is only supported with v1 tables"))
+  private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
+    val e = intercept[AnalysisException] {
+      sql(s"$sqlCommand $sqlParams")
     }
+    assert(e.message.contains(s"$sqlCommand is only supported with v1 tables"))
   }
 
   private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
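
Note on the grammar change: switching the #refreshTable rule from tableIdentifier to multipartIdentifier is what lets a catalog-qualified name reach the analyzer at all, since tableIdentifier only accepts up to two name parts. A minimal sketch of the observable parser-level effect (the demo object is illustrative, not part of the patch):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

object RefreshTableParseDemo {
  def main(args: Array[String]): Unit = {
    // Before this change, a name with more than two parts was a parse error.
    // With multipartIdentifier it parses into a RefreshTableStatement carrying
    // the full name, to be resolved later against the catalog manager.
    val plan = CatalystSqlParser.parsePlan("REFRESH TABLE testcat.ns1.ns2.tbl")
    println(plan) // a RefreshTableStatement over Seq("testcat", "ns1", "ns2", "tbl"); rendering is approximate
  }
}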
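
In the test catalog above, invalidateTable is pure bookkeeping so the suite can assert it was called; a realistic v2 catalog would use the same hook to evict cached metadata. A hedged sketch of that pattern, building on the test-scope InMemoryTableCatalog (CachingTableCatalog itself is hypothetical):

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.connector.InMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.{Identifier, Table}

class CachingTableCatalog extends InMemoryTableCatalog {
  private val cache = new ConcurrentHashMap[Identifier, Table]()

  override def loadTable(ident: Identifier): Table = {
    val cached = cache.get(ident)
    if (cached != null) {
      cached
    } else {
      // Delegate to the parent catalog and remember the result.
      val table = super.loadTable(ident)
      cache.put(ident, table)
      table
    }
  }

  // RefreshTableExec.run() lands here, so REFRESH TABLE drops the stale entry.
  override def invalidateTable(ident: Identifier): Unit = {
    cache.remove(ident)
    super.invalidateTable(ident)
  }
}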
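
End to end, the statement now takes one of two routes: ResolveSessionCatalog rewrites names owned by the session catalog into the existing v1 RefreshTable command, while ResolveCatalogs turns names in other catalogs into the new v2 RefreshTable plan, which DataSourceV2Strategy plans as RefreshTableExec. A usage sketch under assumptions: registering the test-scope InMemoryTableCatalog as "testcat" mirrors the suite's setup and is illustrative only.

import org.apache.spark.sql.SparkSession

object RefreshTableUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // Registers a v2 catalog named "testcat"; the implementation class here
      // lives in Spark's test sources, so this exact config is an assumption.
      .config("spark.sql.catalog.testcat",
        "org.apache.spark.sql.connector.InMemoryTableCatalog")
      .getOrCreate()

    spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (id BIGINT, data STRING) USING foo")
    // Three-part name -> ResolveCatalogs -> RefreshTable -> RefreshTableExec,
    // which calls invalidateTable on the "testcat" catalog.
    spark.sql("REFRESH TABLE testcat.ns1.ns2.tbl")

    spark.stop()
  }
}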