From 740454335997aeb1e628104b56e0c5865bd310fc Mon Sep 17 00:00:00 2001
From: xiepengjie
Date: Fri, 11 Dec 2020 17:05:21 +0800
Subject: [PATCH] SPARK-33691: Support partition filters in ALTER TABLE DROP
 PARTITION

---
 .../spark/sql/catalyst/parser/SqlBase.g4      |  20 +-
 .../sql/catalyst/parser/AstBuilder.scala      |  47 +++++
 .../apache/spark/sql/internal/SQLConf.scala   |  19 ++
 .../spark/sql/execution/SparkSqlParser.scala  |   3 +-
 .../spark/sql/execution/command/ddl.scala     |  62 +++++-
 .../InsertIntoHadoopFsRelationCommand.scala   |   2 +-
 .../execution/command/DDLParserSuite.scala    |   1 +
 .../HiveDropPartsWithFilterSuite.scala        | 196 ++++++++++++++++++
 8 files changed, 342 insertions(+), 8 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDropPartsWithFilterSuite.scala

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 0792a7b7eff5..c3968d258605 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -120,8 +120,8 @@ statement
         partitionSpec+                                                 #addTablePartition
     | ALTER TABLE tableIdentifier
         from=partitionSpec RENAME TO to=partitionSpec                  #renameTablePartition
-    | ALTER TABLE tableIdentifier
-        DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE?    #dropTablePartitions
+    | ALTER TABLE tableIdentifier DROP (IF EXISTS)?
+        dropPartitionSpec (',' dropPartitionSpec)* PURGE?              #dropTablePartitions
     | ALTER VIEW tableIdentifier
         DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*           #dropTablePartitions
     | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec      #setTableLocation
@@ -267,6 +267,22 @@ partitionVal
    : identifier (EQ constant)?
    ;
 
+dropPartitionSpec
+    : PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
+    ;
+
+dropPartitionVal
+    : left=identifier dropPartitionOperator right=constant     #dropPartLExpr
+    | left=constant dropPartitionOperator right=identifier     #dropPartRExpr
+    ;
+
+/**
+ * Note: the null-safe equality operator NSEQ ('<=>') is deliberately not supported here.
+ */
+dropPartitionOperator
+    : EQ | NEQ | NEQJ | LT | LTE | GT | GTE
+    ;
+
 describeFuncName
     : qualifiedName
     | STRING
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index e2e8a45976d7..8aace0ed5137 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -293,6 +293,36 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a drop partition specification: a sequence of partition filter expressions.
+   */
+  override def visitDropPartitionSpec(ctx: DropPartitionSpecContext): Seq[Expression] = {
+    ctx.dropPartitionVal().asScala.map {
+      case pLVal: DropPartLExprContext =>
+        val left = AttributeReference(pLVal.identifier().getText, StringType)()
+        val right = Literal(Option(pLVal.constant()).map(visitStringConstant).get)
+        val operator = pLVal.dropPartitionOperator().getChild(0).asInstanceOf[TerminalNode]
+        val expression = buildComparison(left, right, operator)
+        if (expression.isInstanceOf[EqualNullSafe]) {
+          throw new ParseException(
+            "'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx)
DROP PARTITION.", ctx) + } + expression + case pRVal: DropPartRExprContext => + val left = Literal(Option(pRVal.constant()).map(visitStringConstant).get) + val right = AttributeReference(pRVal.identifier().getText, StringType)() + val operator = pRVal.dropPartitionOperator().getChild(0).asInstanceOf[TerminalNode] + val expression = buildComparison(left, right, operator) + if (expression.isInstanceOf[EqualNullSafe]) { + throw new ParseException( + "'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx) + } + expression + case _ => throw new ParseException( + "Expression is not supported in ALTER TABLE ... DROP PARTITION.", ctx) + } + } + /** * Convert a constant of any type into a string. This is typically used in DDL commands, and its * main purpose is to prevent slight differences due to back to back conversions i.e.: @@ -1068,6 +1098,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val left = expression(ctx.left) val right = expression(ctx.right) val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] + buildComparison(left, right, operator) + } + + /** + * Creates a comparison expression. The following comparison operators are supported: + * - Equal: '=' or '==' + * - Null-safe Equal: '<=>' + * - Not Equal: '<>' or '!=' + * - Less than: '<' + * - Less then or Equal: '<=' + * - Greater than: '>' + * - Greater then or Equal: '>=' + */ + private def buildComparison( + left: Expression, + right: Expression, + operator: TerminalNode): Expression = { operator.getSymbol.getType match { case SqlBaseParser.EQ => EqualTo(left, right) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 760a9db8bead..827e2e6083a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1591,6 +1591,21 @@ object SQLConf { .doc("When true, use legacy MySqlServer SMALLINT and REAL type mapping.") .booleanConf .createWithDefault(false) + + val BATCH_DROP_PARTITIONS_ENABLE = + buildConf("spark.sql.batch.drop.partitions.enable") + .internal() + .doc("Whether support partition filter in ALTER TABLE DROP PARTITION.") + .booleanConf + .createWithDefault(false) + + val BATCH_DROP_PARTITIONS_LIMIT = + buildConf("spark.sql.batch.drop.partitions.limit") + .internal() + .doc("The max partition's number which is allowed to be deleted.") + .intConf + .checkValue(bit => bit >= -1 && bit <= 1000, "The bit value must be in [-1, 1000].") + .createWithDefault(1000) } /** @@ -2004,6 +2019,10 @@ class SQLConf extends Serializable with Logging { def legacyMsSqlServerNumericMappingEnabled: Boolean = getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED) + def batchDropPartitionsEnable: Boolean = getConf(SQLConf.BATCH_DROP_PARTITIONS_ENABLE) + + def batchDropPartitionsLimit: Int = getConf(SQLConf.BATCH_DROP_PARTITIONS_LIMIT) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 95206e8ef182..61331bb234ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -927,7 +927,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec),
+      Seq(),
       ifExists = ctx.EXISTS != null,
       purge = ctx.PURGE != null,
       retainData = false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index e1faecedd20e..f8ddf8598aff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -22,17 +22,15 @@ import java.util.Locale
 import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.util.control.NonFatal
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
-
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EqualTo, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -523,6 +521,7 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
+    partExpr: Seq[Seq[Expression]],
     specs: Seq[TablePartitionSpec],
     ifExists: Boolean,
     purge: Boolean,
@@ -535,7 +534,12 @@ case class AlterTableDropPartitionCommand(
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
+    var tableSpecs = specs
+    if (partExpr.nonEmpty) {
+      tableSpecs = parsePartExpressionToTablePartitionSpec(partExpr, sparkSession, catalog, table)
+    }
+
+    val normalizedSpecs = tableSpecs.map { spec =>
       PartitioningUtils.normalizePartitionSpec(
         spec,
         table.partitionColumnNames,
@@ -552,6 +556,56 @@ case class AlterTableDropPartitionCommand(
     Seq.empty[Row]
   }
 
+  def parsePartExpressionToTablePartitionSpec(
+      partExpr: Seq[Seq[Expression]],
+      sparkSession: SparkSession,
+      catalog: SessionCatalog,
+      table: CatalogTable): Seq[TablePartitionSpec] = {
+    if (!hasComplexFilters(partExpr)) {
+      // All filters are plain equalities; build the specs directly, no metastore lookup needed.
+      return partExpr.map { filters =>
+        var spec: TablePartitionSpec = Map()
+        filters.foreach {
+          case EqualTo(left: AttributeReference, right: Literal) =>
+            spec += (left.name -> right.value.toString)
+          case EqualTo(left: Literal, right: AttributeReference) =>
+            spec += (right.name -> left.value.toString)
+        }
+        spec
+      }
+    }
+
+    if (!sparkSession.sessionState.conf.batchDropPartitionsEnable) {
+      throw new AnalysisException("Partition filters in ALTER TABLE ... DROP PARTITION " +
+        "are disabled; 'set spark.sql.batch.drop.partitions.enable = true' to enable them.")
+    }
+
+    var parts: Seq[CatalogTablePartition] = Seq()
+    partExpr.foreach { filters =>
+      parts ++= catalog.listPartitionsByFilter(tableName, filters)
+      if (checkDropPartitionsSize(parts, sparkSession)) {
+        throw new AnalysisException("Cannot drop more than " +
+          s"${sparkSession.sessionState.conf.batchDropPartitionsLimit} partitions at once.")
+      }
+    }
+    parts.map(_.spec)
+  }
+
+  def hasComplexFilters(partExpr: Seq[Seq[Expression]]): Boolean = {
+    partExpr.exists(_.exists(!_.isInstanceOf[EqualTo]))
+  }
+
+  /**
+   * Returns true when the number of partitions selected for dropping exceeds
+   * spark.sql.batch.drop.partitions.limit (1000 by default; -1 means no limit).
+   */
+  def checkDropPartitionsSize(
+      parts: Seq[CatalogTablePartition],
+      sparkSession: SparkSession): Boolean = {
+    val limit = sparkSession.sessionState.conf.batchDropPartitionsLimit
+    limit >= 0 && parts.size > limit
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 484942d35c85..77826c7e86df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -147,7 +147,7 @@ case class InsertIntoHadoopFsRelationCommand(
         val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
         if (deletedPartitions.nonEmpty) {
           AlterTableDropPartitionCommand(
-            catalogTable.get.identifier, deletedPartitions.toSeq,
+            catalogTable.get.identifier, Seq(), deletedPartitions.toSeq,
             ifExists = true, purge = false,
             retainData = true /* already deleted */).run(sparkSession)
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index e0ccae15f1d0..08131553d239 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -863,6 +863,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
     val tableIdent = TableIdentifier("table_name", None)
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
+      Seq(),
       Seq(
         Map("dt" -> "2008-08-08", "country" -> "us"),
         Map("dt" -> "2009-09-09", "country" -> "uk")),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDropPartsWithFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDropPartsWithFilterSuite.scala
new file mode 100644
index 000000000000..c8fe84597219
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDropPartsWithFilterSuite.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class HiveDropPartsWithFilterSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    sql("create table test_drop_part(c1 int) " +
+      "partitioned by(year string, month string, day string)")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      sql("DROP TABLE IF EXISTS test_drop_part")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  protected def addParts(): Unit = {
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='01')")
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='02')")
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='03')")
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='04')")
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='05')")
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='06')")
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='07')")
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='08')")
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='09')")
+    sql("alter table test_drop_part add if not exists partition(year='2020', month='08', day='10')")
+  }
+
+  test("Case 1: partition column on the left, value on the right, e.g. day > '08'") {
+    addParts()
+    sql("set spark.sql.batch.drop.partitions.enable = true")
+
+    // delete year=2020/month=08/day=10
+    sql("alter table test_drop_part drop if exists partition(year='2020', month='08', day == '10')")
+    checkAnswer(
+      sql("SHOW PARTITIONS test_drop_part"),
+      Row("year=2020/month=08/day=01") ::
+      Row("year=2020/month=08/day=02") ::
+      Row("year=2020/month=08/day=03") ::
+      Row("year=2020/month=08/day=04") ::
+      Row("year=2020/month=08/day=05") ::
+      Row("year=2020/month=08/day=06") ::
+      Row("year=2020/month=08/day=07") ::
+      Row("year=2020/month=08/day=08") ::
+      Row("year=2020/month=08/day=09") :: Nil)
+
+    // delete year=2020/month=08/day=01
+    sql("alter table test_drop_part drop if exists partition(year='2020', month='08', day < '02')")
+    checkAnswer(
+      sql("SHOW PARTITIONS test_drop_part"),
+      Row("year=2020/month=08/day=02") ::
+      Row("year=2020/month=08/day=03") ::
+      Row("year=2020/month=08/day=04") ::
+      Row("year=2020/month=08/day=05") ::
+      Row("year=2020/month=08/day=06") ::
+      Row("year=2020/month=08/day=07") ::
+      Row("year=2020/month=08/day=08") ::
+      Row("year=2020/month=08/day=09") :: Nil)
+
+    // delete year=2020/month=08/day=02 and day=03
+    sql("alter table test_drop_part drop if exists partition(year='2020', month='08', day <= '03')")
+    checkAnswer(
+      sql("SHOW PARTITIONS test_drop_part"),
test_drop_part"), + Row("year=2020/month=08/day=04") :: + Row("year=2020/month=08/day=05") :: + Row("year=2020/month=08/day=06") :: + Row("year=2020/month=08/day=07") :: + Row("year=2020/month=08/day=08") :: + Row("year=2020/month=08/day=09") :: Nil) + + // delete year=2020/month=08/day=04 year=2020/month=08/day=05 + sql("alter table test_drop_part drop if exists partition(year='2020', month='08', day !>'05')") + checkAnswer( + sql("SHOW PARTITIONS test_drop_part"), + Row("year=2020/month=08/day=06") :: + Row("year=2020/month=08/day=07") :: + Row("year=2020/month=08/day=08") :: + Row("year=2020/month=08/day=09") :: Nil) + + // delete year=2020/month=08/day=09 + sql("alter table test_drop_part drop if exists partition(year='2020', month='08', day > '08')") + checkAnswer( + sql("SHOW PARTITIONS test_drop_part"), + Row("year=2020/month=08/day=06") :: + Row("year=2020/month=08/day=07") :: + Row("year=2020/month=08/day=08") :: Nil) + + // delete year=2020/month=08/day=07 year=2020/month=08/day=08 + sql("alter table test_drop_part drop if exists partition(year='2020', month='08', day >= '07')") + checkAnswer( + sql("SHOW PARTITIONS test_drop_part"), + Row("year=2020/month=08/day=06") :: Nil) + + // delete year=2020/month=08/day=06 + sql("alter table test_drop_part drop if exists partition(year='2020', month='08', day !< '06')") + checkAnswer( + sql("SHOW PARTITIONS test_drop_part"), Nil) + } + + test("Case 2: The part column on the right and the value on the left like 2020 > year") { + addParts() + sql("set spark.sql.batch.drop.partitions.enable = true;") + + // delete year=2020/month=08/day=10 + sql("alter table test_drop_part drop if exists partition (year='2020', month='08', '10'==day)") + checkAnswer( + sql("SHOW PARTITIONS test_drop_part"), + Row("year=2020/month=08/day=01") :: + Row("year=2020/month=08/day=02") :: + Row("year=2020/month=08/day=03") :: + Row("year=2020/month=08/day=04") :: + Row("year=2020/month=08/day=05") :: + Row("year=2020/month=08/day=06") :: + Row("year=2020/month=08/day=07") :: + Row("year=2020/month=08/day=08") :: + Row("year=2020/month=08/day=09") :: Nil) + + // delete year=2020/month=08/day=01 + sql("alter table test_drop_part drop if exists partition(year='2020', month='08', '02' > day)") + checkAnswer( + sql("SHOW PARTITIONS test_drop_part"), + Row("year=2020/month=08/day=02") :: + Row("year=2020/month=08/day=03") :: + Row("year=2020/month=08/day=04") :: + Row("year=2020/month=08/day=05") :: + Row("year=2020/month=08/day=06") :: + Row("year=2020/month=08/day=07") :: + Row("year=2020/month=08/day=08") :: + Row("year=2020/month=08/day=09") :: Nil) + + // delete year=2020/month=08/day=02 year=2020/month=08/day=03 + sql("alter table test_drop_part drop if exists partition(year='2020', month='08', '03' >= day)") + checkAnswer( + sql("SHOW PARTITIONS test_drop_part"), + Row("year=2020/month=08/day=04") :: + Row("year=2020/month=08/day=05") :: + Row("year=2020/month=08/day=06") :: + Row("year=2020/month=08/day=07") :: + Row("year=2020/month=08/day=08") :: + Row("year=2020/month=08/day=09") :: Nil) + + // delete year=2020/month=08/day=04 year=2020/month=08/day=05 + sql("alter table test_drop_part drop if exists partition(year='2020', month='08', '05' !< day)") + checkAnswer( + sql("SHOW PARTITIONS test_drop_part"), + Row("year=2020/month=08/day=06") :: + Row("year=2020/month=08/day=07") :: + Row("year=2020/month=08/day=08") :: + Row("year=2020/month=08/day=09") :: Nil) + + // delete year=2020/month=08/day=09 + sql("alter table test_drop_part drop if exists 
+    checkAnswer(
+      sql("SHOW PARTITIONS test_drop_part"),
+      Row("year=2020/month=08/day=06") ::
+      Row("year=2020/month=08/day=07") ::
+      Row("year=2020/month=08/day=08") :: Nil)
+
+    // delete year=2020/month=08/day=07 and day=08
+    sql("alter table test_drop_part drop if exists partition(year='2020', month='08', '07' <= day)")
+    checkAnswer(
+      sql("SHOW PARTITIONS test_drop_part"),
+      Row("year=2020/month=08/day=06") :: Nil)
+
+    // delete year=2020/month=08/day=06
+    sql("alter table test_drop_part drop if exists partition(year='2020', month='08', '06' !> day)")
+    checkAnswer(
+      sql("SHOW PARTITIONS test_drop_part"), Nil)
+  }
+}
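--
A minimal usage sketch of the feature this patch adds (hypothetical spark-shell
session; the table name `logs` and its partition column `dt` are illustrative
assumptions, not part of the patch):

    // Non-equality partition filters are gated behind an internal conf.
    spark.sql("set spark.sql.batch.drop.partitions.enable = true")

    // Plain equality specs keep working as before and skip the metastore filter lookup.
    spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (dt = '2020-12-01')")

    // A comparison filter drops every matching partition in one statement,
    // capped by spark.sql.batch.drop.partitions.limit (1000 by default).
    spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (dt < '2020-12-01')")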