@@ -250,7 +250,7 @@ partitionSpecLocation
;

partitionSpec
: PARTITION '(' partitionVal (',' partitionVal)* ')'
: PARTITION '(' (partitionVal | expression) (',' (partitionVal | expression))* ')'
;
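
For illustration, the extended rule now accepts comparison expressions alongside key=value pairs in a PARTITION clause, in the style of the tests later in this diff (hedged examples):

sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")  // one clause: intersection of both conditions
sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')")  // two clauses: union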

partitionVal
@@ -282,6 +282,26 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
parts.toMap
}

/**
* Create a partition filter specification.
*/
def visitPartitionFilterSpec(ctx: PartitionSpecContext): Seq[Expression] = withOrigin(ctx) {
val parts = ctx.expression.asScala.map { pVal =>
expression(pVal) match {
case EqualNullSafe(_, _) =>
throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
Member: Still the same question here: does the constant have to be on the right side?

Contributor: Hive supports literals only on the right side, so it makes sense to have the same here, I think.

Member: If we support only the right side, it would be useful to print an explicit error message like left-side literal not supported ....?

cmp
case BinaryComparison(constant: Literal, _) =>
throw new ParseException(s"Literal $constant is supported only on the right side.", ctx)
case _ =>
throw new ParseException(
s"Invalid partition filter specification (${pVal.getText}).", ctx)
}
}
parts
}
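
For illustration, a few filters and how this visitor classifies them, in the style of the tests later in this diff (hedged examples; table t and partition column p are hypothetical):

sql("ALTER TABLE t DROP PARTITION (p >= 2)")   // accepted: attribute compared to a literal
sql("ALTER TABLE t DROP PARTITION (2 >= p)")   // ParseException: literal supported only on the right side
sql("ALTER TABLE t DROP PARTITION (p <=> 2)")  // ParseException: '<=>' is not allowed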

/**
* Create a partition specification map without optional values.
*/
@@ -293,6 +313,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}

/**
* Create a partition specification map without optional values
* and a partition filter specification.
*/
protected def visitPartition(
Contributor: Can we avoid this method? I find it quite confusing (it is a bit weird to return a tuple with a Map and a Seq of different things...). We could add a new parameter to AlterTableDropPartitionCommand and use the other two methods directly.

Author: I tried to add a new parameter to AlterTableDropPartitionCommand earlier, but it was kind of hard. Consider a SQL statement like the one below:

DROP PARTITION(partitionVal1, expression1), PARTITION(partitionVal2, expression2)

All of the partitions that need to be dropped are:
(partitionVal1 intersect expression1) union (partitionVal2 intersect expression2)

Using one tuple tells us that partitionVal1 and expression1 come from the same partitionSpec, so we should use intersect. Likewise, different tuples mean that (partitionVal1 intersect expression1) and (partitionVal2 intersect expression2) come from different partitionSpecs, so we should use union.

If we don't use a tuple, it would be difficult to tell these cases apart, and difficult to decide between intersect and union when partitionVal1 meets expression1/expression2.

Any ideas to replace this tuple?

Contributor: I see what you mean now. Yes, I have no better idea indeed. Thanks.

ctx: PartitionSpecContext): (Map[String, String], Seq[Expression]) = {
(visitNonOptionalPartitionSpec(ctx), visitPartitionFilterSpec(ctx))
}
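
A self-contained sketch of the intersect/union semantics discussed in the thread above (hypothetical data; the real command resolves specs and filters through the session catalog):

object DropSemanticsSketch {
  type PartSpec = Map[String, String]

  def main(args: Array[String]): Unit = {
    val all: Seq[PartSpec] = for {
      country <- Seq("US", "KR")
      quarter <- Seq("1", "2", "3")
    } yield Map("country" -> country, "quarter" -> quarter)

    // PARTITION (country = 'US', quarter < '3'): conditions within one clause intersect.
    val clause1 = all.filter(_("country") == "US").intersect(all.filter(_("quarter") < "3"))
    // PARTITION (country = 'KR', quarter = '3')
    val clause2 = all.filter(_("country") == "KR").intersect(all.filter(_("quarter") == "3"))

    // Separate PARTITION clauses union: all of these would be dropped.
    val toDrop = (clause1 ++ clause2).distinct
    toDrop.foreach(println)
  }
}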

/**
* Convert a constant of any type into a string. This is typically used in DDL commands, and its
* main purpose is to prevent slight differences due to back to back conversions i.e.:
@@ -916,7 +916,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}
AlterTableDropPartitionCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
ctx.partitionSpec.asScala.map(visitPartition),
ifExists = ctx.EXISTS != null,
purge = ctx.PURGE != null,
retainData = false)
@@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BinaryComparison, Cast, Expression, Literal, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -500,7 +500,8 @@ case class AlterTableRenamePartitionCommand(
}

/**
* Drop Partition in ALTER TABLE: to drop a particular partition for a table.
* Drop Partition in ALTER TABLE: to drop a particular partition, or a set of
* partitions matching the given expressions, for a table.
*
* This removes the data and metadata for this partition.
* The data is actually moved to the .Trash/Current directory if Trash is configured,
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
*
* The syntax of this command is:
* {{{
* ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
* ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
* [, PARTITION (spec2, expr2), ...] [PURGE];
* }}}
*/
case class AlterTableDropPartitionCommand(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
partitions: Seq[(TablePartitionSpec, Seq[Expression])],
ifExists: Boolean,
purge: Boolean,
retainData: Boolean)
extends RunnableCommand {
extends RunnableCommand with PredicateHelper {

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
val resolver = sparkSession.sessionState.conf.resolver
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")

val normalizedSpecs = specs.map { spec =>
PartitioningUtils.normalizePartitionSpec(
spec,
table.partitionColumnNames,
table.identifier.quotedString,
sparkSession.sessionState.conf.resolver)
val toDrop = partitions.flatMap { partition =>
if (partition._1.isEmpty && partition._2.nonEmpty) {
// There are only expressions in this drop condition.
extractFromPartitionFilter(partition._2, catalog, table, resolver)
} else if (partition._1.nonEmpty && partition._2.isEmpty) {
// There are only partitionSpecs in this drop condition.
extractFromPartitionSpec(partition._1, table, resolver)
} else if (partition._1.nonEmpty && partition._2.nonEmpty) {
// This drop condition has both partitionSpecs and expressions.
extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect(
Contributor: I think this may be quite inefficient if we have a lot of partitions. What about converting the partitionSpec to EqualTo expressions and adding them as conditions? It would be great IMO if we could enforce in the syntax that we have either all partitionSpecs or all expressions. So if all are partition = value, we have a partitionSpec, while if at least one is a comparison other than =, we have all expressions (including the =s). What do you think?

Author (@DazhuangSu, Jun 5, 2018): Yeah, I agree. And the hard part may be how to convert a partitionSpec to an EqualTo. I think it's better to let the AstBuilder handle this. If so, we may have to have two AlterTableDropPartitionCommand instances in ddl.scala, one for all partitionSpecs and one for all expressions. But that may be a bit weird.

Contributor: Why? Isn't something like the following enough?

((partitionVal (',' partitionVal)*) | (expression (',' expression)*))

Author: I mean how to better define AlterTableDropPartitionCommand in ddl.scala. We would need to handle both

AlterTableDropPartitionCommand(tableName: TableIdentifier, partitions: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean)

and

AlterTableDropPartitionCommand(tableName: TableIdentifier, partitions: Seq[TablePartitionSpec], ifExists: Boolean, purge: Boolean, retainData: Boolean)

Maybe by telling the cases apart inside the method?

Contributor: I think we can (must) just have a single AlterTableDropPartitionCommand(tableName: TableIdentifier, partitionSpecs: Seq[TablePartitionSpec], partitionExprs: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean). Indeed, we might have something like:

alter table foo drop partition (year=2017, month=12), partition(year=2018, month < 3);

where we have both a partition spec and an expression specification.

Author: Hi @mgaido91, there is one problem after I changed the syntax. When I run the SQL DROP PARTITION (p >= 2), it throws

org.apache.spark.sql.AnalysisException: cannot resolve 'p' given input columns: []

I'm trying to find a way to figure it out.

By the way, is a syntax like ((partitionVal (',' partitionVal)*) | (expression (',' expression)*)) legal? I wrote an ANTLR4 syntax test, but it didn't work as I expected.

Besides, I was wrong that day: I don't think the if conditions will be inefficient if there are a lot of partitions. It may be inefficient if there are a lot of dropPartitionSpecs, which I don't think can happen easily.

Contributor: @DazhuangSu sorry, I missed your last comment somehow.

Why do you say it would not be inefficient if you have a lot of partitions? I think it would be! Imagine that you partition per year and day, and you want to get the first 6 months of this year. The spec would be something like (year = 2018, day < 2018-07-01). Imagine we have a 10-year history. With the current implementation, we would get back basically all the partitions from the filter, i.e. roughly 3,650, and then intersect those. Anyway, my understanding is that such a case would not even work properly, as it would try to drop the intersect of:

Seq(Map("year" -> "2018", "day" -> "2018-01-01"), ...).intersect(Seq(Map("year" -> "2018")))

which would result in an empty Seq, so we would drop nothing. Moreover, I saw no test for this case in the tests. Can we add tests for this use case, and can we add support for it if my understanding that it is not working is right? Thanks
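
A quick, hedged check of the concern above: a full partition spec map never equals a partial one, so the intersect is indeed empty and nothing would be dropped:

val fromFilter = Seq(Map("year" -> "2018", "day" -> "2018-01-01"))
val fromSpec = Seq(Map("year" -> "2018"))
assert(fromFilter.intersect(fromSpec).isEmpty)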

Author: @mgaido91 I understand your point; yes, it would be inefficient. I will work on this soon.

Contributor: Thank you @DazhuangSu.

extractFromPartitionSpec(partition._1, table, resolver))
} else {
Seq.empty[TablePartitionSpec]
}
}

catalog.dropPartitions(
table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
table.identifier, toDrop, ignoreIfNotExists = ifExists, purge = purge,
retainData = retainData)

CommandUtils.updateTableStats(sparkSession, table)

Seq.empty[Row]
}

private def extractFromPartitionSpec(
specs: TablePartitionSpec,
table: CatalogTable,
resolver: Resolver): Seq[Map[String, String]] = {
Seq(PartitioningUtils.normalizePartitionSpec(
specs,
table.partitionColumnNames,
table.identifier.quotedString,
resolver))
}

private def extractFromPartitionFilter(
filters: Seq[Expression],
catalog: SessionCatalog,
table: CatalogTable,
resolver: Resolver): Seq[TablePartitionSpec] = {
val expressions = filters.map { expr =>
val (attrName, constant) = expr match {
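// Only this shape can reach here: visitPartitionFilterSpec rejects any other expression form at parse time.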
case BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
(name, constant)
}
if (!table.partitionColumnNames.exists(resolver(_, attrName))) {
throw new AnalysisException(s"${attrName} is not a valid partition column " +
s"in table ${table.identifier.quotedString}.")
}
val dataType = table.partitionSchema.apply(attrName).dataType
expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(),
Cast(constant, dataType)))
Contributor: nit: can we add the cast only when needed, i.e. when dataType != constant.dataType?
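
A minimal sketch of the suggested change, reusing the names from the surrounding method (hypothetical, not part of this diff):

// Only wrap the literal in a Cast when its type differs from the partition column's.
val castedConstant: Expression =
  if (constant.dataType == dataType) constant else Cast(constant, dataType)
expr.withNewChildren(Seq(AttributeReference(attrName, dataType)(), castedConstant))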

}.reduce(And)
val parts = catalog.listPartitionsByFilter(
table.identifier, Seq(expressions)).map(_.spec)
if (parts.isEmpty && !ifExists) {
throw new AnalysisException(s"There is no partition for ${expressions.sql}")
}
parts
}
}


@@ -25,7 +25,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.util.SchemaUtils
@@ -128,7 +128,7 @@ case class InsertIntoHadoopFsRelationCommand(
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
if (deletedPartitions.nonEmpty) {
AlterTableDropPartitionCommand(
catalogTable.get.identifier, deletedPartitions.toSeq,
catalogTable.get.identifier, deletedPartitions.map(x => (x, Seq.empty)).toSeq,
ifExists = true, purge = false,
retainData = true /* already deleted */).run(sparkSession)
}
@@ -826,8 +826,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
val expected1_table = AlterTableDropPartitionCommand(
tableIdent,
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
(Map("dt" -> "2008-08-08", "country" -> "us"), Seq.empty),
(Map("dt" -> "2009-09-09", "country" -> "uk"), Seq.empty)),
ifExists = true,
purge = false,
retainData = false)
@@ -32,6 +32,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
@@ -495,6 +496,164 @@ class HiveDDLSuite
}
}

def testDropPartition(dataType: DataType, value1: Any, value2: Any): Unit = {
withTable("tbl_x") {
sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})")
sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value1)")
sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value2)")
sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value2)")
checkAnswer(sql("SHOW PARTITIONS tbl_x"),
Row(s"p=$value1") :: Nil)
sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value1)")
checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil)
}
}

test("SPARK-17732: Drop partitions by filter") {
withTable("sales") {
sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)")

for (country <- Seq("AU", "US", "CA", "KR")) {
for (quarter <- 1 to 5) {
sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
}
}

sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
checkAnswer(sql("SHOW PARTITIONS sales"),
Row("country=AU/quarter=1") ::
Row("country=AU/quarter=2") ::
Row("country=CA/quarter=1") ::
Row("country=CA/quarter=2") ::
Row("country=KR/quarter=1") ::
Row("country=KR/quarter=2") ::
Row("country=KR/quarter=3") ::
Row("country=KR/quarter=4") ::
Row("country=KR/quarter=5") ::
Row("country=US/quarter=1") ::
Row("country=US/quarter=2") ::
Row("country=US/quarter=3") ::
Row("country=US/quarter=4") ::
Row("country=US/quarter=5") :: Nil)

sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')")
checkAnswer(sql("SHOW PARTITIONS sales"),
Row("country=CA/quarter=1") ::
Row("country=CA/quarter=2") ::
Row("country=KR/quarter=1") ::
Row("country=KR/quarter=2") ::
Row("country=KR/quarter=3") ::
Row("country=KR/quarter=4") ::
Row("country=US/quarter=1") ::
Row("country=US/quarter=2") ::
Row("country=US/quarter=3") ::
Row("country=US/quarter=4") :: Nil)

sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
checkAnswer(sql("SHOW PARTITIONS sales"),
Row("country=KR/quarter=2") ::
Row("country=KR/quarter=3") ::
Row("country=KR/quarter=4") ::
Row("country=US/quarter=2") ::
Row("country=US/quarter=3") ::
Row("country=US/quarter=4") :: Nil)

sql("ALTER TABLE sales DROP PARTITION (country = 'KR', quarter = '4')")
sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = '3')")
checkAnswer(sql("SHOW PARTITIONS sales"),
Row("country=KR/quarter=2") ::
Row("country=KR/quarter=3") ::
Row("country=US/quarter=2") ::
Row("country=US/quarter=4") :: Nil)

sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION (quarter >= '4')")
checkAnswer(sql("SHOW PARTITIONS sales"),
Row("country=KR/quarter=3") :: Nil)

// Because the partition specs are declarative, this drops the union of the target partitions
// without raising exceptions. Hive raises exceptions because it handles the specs sequentially.
sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')")
checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
}

withTable("tbl_x") {
sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p STRING)")
sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'false')")
sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'true')")
sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 'true')")
checkAnswer(sql("SHOW PARTITIONS tbl_x"),
Row(s"p=false") :: Nil)
sql(s"ALTER TABLE tbl_x DROP PARTITION (p = 'false')")
checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil)
}
testDropPartition(IntegerType, 1, 2)
testDropPartition(BooleanType, false, true)
testDropPartition(LongType, 1L, 2L)
testDropPartition(ShortType, 1.toShort, 2.toShort)
testDropPartition(ByteType, 1.toByte, 2.toByte)
testDropPartition(FloatType, 1.0F, 2.0F)
testDropPartition(DoubleType, 1.0, 2.0)
testDropPartition(DecimalType(2, 1), Decimal(1.5), Decimal(2.5))
}

test("SPARK-14922, SPARK-17732: Error handling for drop partitions by filter") {
withTable("sales") {
sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")

val m = intercept[AnalysisException] {
sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
}.getMessage
assert(m.contains("unknown is not a valid partition column in table"))

val m2 = intercept[AnalysisException] {
sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
}.getMessage
assert(m2.contains("unknown is not a valid partition column in table"))

val m3 = intercept[AnalysisException] {
sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
}.getMessage
assert(m3.contains("'<=>' operator is not allowed in partition specification"))

val m4 = intercept[ParseException] {
sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
}.getMessage
assert(m4.contains("'<=>' operator is not allowed in partition specification"))

val m5 = intercept[ParseException] {
sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
}.getMessage
assert(m5.contains("Found an empty partition key"))

sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
val m6 = intercept[AnalysisException] {
sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
}.getMessage
// Nothing is dropped: the whole command fails because `PARTITION (quarter <= '2')` matches no partition.
checkAnswer(sql("SHOW PARTITIONS sales"),
Row("country=KR/quarter=3") :: Nil)
assert(m6.contains("There is no partition for (`quarter` <= CAST('2' AS STRING))"))

val m7 = intercept[ParseException] {
sql("ALTER TABLE sales DROP PARTITION ( '4' > quarter)")
}.getMessage
checkAnswer(sql("SHOW PARTITIONS sales"),
Row("country=KR/quarter=3") :: Nil)
assert(m7.contains("Literal 4 is supported only on the right side"))
}
}

test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") {
withTable("sales") {
sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")

val m = intercept[AnalysisException] {
sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
}.getMessage
assert(m.contains("Partition spec is invalid"))
}
}

test("drop views") {
withTable("tab1") {
val tabName = "tab1"