@@ -120,8 +120,8 @@ statement
partitionSpec+ #addTablePartition
| ALTER TABLE tableIdentifier
from=partitionSpec RENAME TO to=partitionSpec #renameTablePartition
| ALTER TABLE tableIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions
| ALTER TABLE tableIdentifier DROP (IF EXISTS)?
dropPartitionSpec (',' dropPartitionSpec)* PURGE? #dropTablePartitions
| ALTER VIEW tableIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
@@ -267,6 +267,22 @@ partitionVal
: identifier (EQ constant)?
;

dropPartitionSpec
: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
;

dropPartitionVal
: left=identifier dropPartitionOperator right=constant #dropPartLExpr
| left=constant dropPartitionOperator right=identifier #dropPartRExpr
;

/**
* The null-safe equality operator NSEQ ('<=>') is intentionally not supported here.
*/
dropPartitionOperator
: EQ | NEQ | NEQJ | LT | LTE | GT | GTE
;
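For illustration only (not part of the diff), a minimal Scala sketch of statements the extended rule accepts. The table `logs` and its `dt` partition column are hypothetical, and non-equality filters additionally require the spark.sql.batch.drop.partitions.enable flag introduced later in this patch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Equality predicates keep working exactly as before.
spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (dt = '2019-01-01')")

// The new dropPartitionVal rule also admits range comparisons, so a whole
// range of partitions can be dropped with a single statement.
spark.sql("SET spark.sql.batch.drop.partitions.enable=true")
spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (dt < '2019-01-01')")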

describeFuncName
: qualifiedName
| STRING
@@ -293,6 +293,36 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}

/**
* Create a drop partition specification: one comparison expression per partition filter.
*/
override def visitDropPartitionSpec(ctx: DropPartitionSpecContext): Seq[Expression] = {
ctx.dropPartitionVal().asScala.map {
case pLVal: DropPartLExprContext =>
val left = AttributeReference(pLVal.identifier().getText, StringType)()
val right = Literal(Option(pLVal.constant()).map(visitStringConstant).get)
val operator = pLVal.dropPartitionOperator().getChild(0).asInstanceOf[TerminalNode]
val expression = buildComparison(left, right, operator)
if (expression.isInstanceOf[EqualNullSafe]) {
throw new ParseException(
"'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx)
}
expression
case pRVal: DropPartRExprContext =>
val left = Literal(Option(pRVal.constant()).map(visitStringConstant).get)
val right = AttributeReference(pRVal.identifier().getText, StringType)()
val operator = pRVal.dropPartitionOperator().getChild(0).asInstanceOf[TerminalNode]
val expression = buildComparison(left, right, operator)
if (expression.isInstanceOf[EqualNullSafe]) {
throw new ParseException(
"'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx)
}
expression
case _ => throw new ParseException(
"Expression is not supported in ALTER TABLE ... DROP PARTITION.", ctx)
}
}
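
As a hypothetical illustration (column name and value invented), the visitor above turns the clause PARTITION (dt < '2019-01-01') into roughly the following sequence:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, LessThan, Literal}
import org.apache.spark.sql.types.StringType

// One comparison expression per dropPartitionVal: the identifier becomes a
// string-typed attribute and the constant becomes a string literal.
val produced: Seq[Expression] =
  Seq(LessThan(AttributeReference("dt", StringType)(), Literal("2019-01-01")))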

/**
* Convert a constant of any type into a string. This is typically used in DDL commands, and its
* main purpose is to prevent slight differences due to back to back conversions i.e.:
@@ -1068,6 +1098,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
val left = expression(ctx.left)
val right = expression(ctx.right)
val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
buildComparison(left, right, operator)
}

/**
* Creates a comparison expression. The following comparison operators are supported:
* - Equal: '=' or '=='
* - Null-safe Equal: '<=>'
* - Not Equal: '<>' or '!='
* - Less than: '<'
* - Less than or Equal: '<='
* - Greater than: '>'
* - Greater than or Equal: '>='
*/
private def buildComparison(
left: Expression,
right: Expression,
operator: TerminalNode): Expression = {
operator.getSymbol.getType match {
case SqlBaseParser.EQ =>
EqualTo(left, right)
@@ -1591,6 +1591,21 @@ object SQLConf {
.doc("When true, use legacy MySqlServer SMALLINT and REAL type mapping.")
.booleanConf
.createWithDefault(false)

val BATCH_DROP_PARTITIONS_ENABLE =
buildConf("spark.sql.batch.drop.partitions.enable")
.internal()
.doc("Whether support partition filter in ALTER TABLE DROP PARTITION.")
.booleanConf
.createWithDefault(false)

val BATCH_DROP_PARTITIONS_LIMIT =
buildConf("spark.sql.batch.drop.partitions.limit")
.internal()
.doc("The max partition's number which is allowed to be deleted.")
.intConf
.checkValue(bit => bit >= -1 && bit <= 1000, "The bit value must be in [-1, 1000].")
.createWithDefault(1000)
}

/**
@@ -2004,6 +2019,10 @@ class SQLConf extends Serializable with Logging {
def legacyMsSqlServerNumericMappingEnabled: Boolean =
getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)

def batchDropPartitionsEnable: Boolean = getConf(SQLConf.BATCH_DROP_PARTITIONS_ENABLE)

def batchDropPartitionsLimit: Int = getConf(SQLConf.BATCH_DROP_PARTITIONS_LIMIT)
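
A minimal sketch of toggling the new settings at runtime; `spark` is assumed to be an existing SparkSession, and both keys are internal, so they are mainly intended for tests and advanced users:

// `spark` is an existing SparkSession.
// Allow non-equality partition filters in ALTER TABLE ... DROP PARTITION.
spark.conf.set("spark.sql.batch.drop.partitions.enable", "true")
// Tighten the cap on how many partitions one statement may drop.
spark.conf.set("spark.sql.batch.drop.partitions.limit", "500")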

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
@@ -927,7 +927,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}
AlterTableDropPartitionCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec),
Seq(),
ifExists = ctx.EXISTS != null,
purge = ctx.PURGE != null,
retainData = false)
@@ -22,17 +22,15 @@ import java.util.Locale
import scala.collection.{GenMap, GenSeq}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -523,6 +521,7 @@ case class AlterTableRenamePartitionCommand(
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
partExpr: Seq[Seq[Expression]],
specs: Seq[TablePartitionSpec],
ifExists: Boolean,
purge: Boolean,
@@ -535,7 +534,12 @@ case class AlterTableDropPartitionCommand(
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")

val normalizedSpecs = specs.map { spec =>
val tableSpecs = if (partExpr.nonEmpty) {
parsePartExpressionToTablePartitionSpec(partExpr, sparkSession, catalog, table)
} else {
specs
}

val normalizedSpecs = tableSpecs.map { spec =>
PartitioningUtils.normalizePartitionSpec(
spec,
table.partitionColumnNames,
@@ -552,6 +556,56 @@
Seq.empty[Row]
}

/**
* Resolves partition filter expressions into concrete TablePartitionSpecs: equality-only
* filters are converted directly, while other comparison operators are pushed to the
* catalog via listPartitionsByFilter (guarded by spark.sql.batch.drop.partitions.enable).
*/
def parsePartExpressionToTablePartitionSpec(
partExpr: Seq[Seq[Expression]],
sparkSession: SparkSession,
catalog: SessionCatalog,
table: CatalogTable): Seq[TablePartitionSpec] = {
if (!hasComplexFilters(partExpr)) {
return partExpr.map { filters =>
filters.collect {
case EqualTo(left: AttributeReference, right: Literal) => left.name -> right.value.toString
case EqualTo(left: Literal, right: AttributeReference) => right.name -> left.value.toString
}.toMap
}
}

if (!sparkSession.sessionState.conf.batchDropPartitionsEnable) {
throw new Exception("Don't support partition filters in ALTER TABLE DROP PARTITION, " +
"you can \'set spark.sql.batch.drop.partitions.enable = true\'")
}

var parts: Seq[CatalogTablePartition] = Seq()
partExpr.foreach { filters =>
parts ++= catalog.listPartitionsByFilter(tableName, filters)
if (checkDropPartitionsSize(parts, sparkSession)) {
throw new AnalysisException("Cannot drop more than " +
s"${sparkSession.sessionState.conf.batchDropPartitionsLimit} partitions " +
"in a single ALTER TABLE ... DROP PARTITION command.")
}
}
parts.map(_.spec)
}

def hasComplexFilters(specs: Seq[Seq[Expression]]): Boolean = {
specs.exists(f => f.exists(!_.isInstanceOf[EqualTo]))
}

/**
* Returns true if the number of partitions to drop exceeds the limit configured by
* spark.sql.batch.drop.partitions.limit (1000 by default).
*/
def checkDropPartitionsSize(parts: Seq[CatalogTablePartition],
sparkSession: SparkSession): Boolean = {
parts.size > sparkSession.sessionState.conf.batchDropPartitionsLimit
}
}
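
For reference, a hypothetical sketch of the command the parser now builds for ALTER TABLE logs DROP IF EXISTS PARTITION (dt < '2019-01-01'); the table, column and value are invented for the example. Note that the filter expressions travel in partExpr while specs stays empty:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, LessThan, Literal}
import org.apache.spark.sql.types.StringType

val command = AlterTableDropPartitionCommand(
  tableName = TableIdentifier("logs"),
  partExpr = Seq(Seq(LessThan(AttributeReference("dt", StringType)(), Literal("2019-01-01")))),
  specs = Seq.empty,
  ifExists = true,
  purge = false,
  retainData = false)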


@@ -147,7 +147,7 @@ case class InsertIntoHadoopFsRelationCommand(
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
if (deletedPartitions.nonEmpty) {
AlterTableDropPartitionCommand(
catalogTable.get.identifier, deletedPartitions.toSeq,
catalogTable.get.identifier, Seq(), deletedPartitions.toSeq,
ifExists = true, purge = false,
retainData = true /* already deleted */).run(sparkSession)
}
@@ -863,6 +863,7 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
val tableIdent = TableIdentifier("table_name", None)
val expected1_table = AlterTableDropPartitionCommand(
tableIdent,
Seq(),
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),