1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -397,6 +397,7 @@ Below is a list of all the keywords in Spark SQL.
|STRUCT|non-reserved|non-reserved|non-reserved|
|SUBSTR|non-reserved|non-reserved|non-reserved|
|SUBSTRING|non-reserved|non-reserved|non-reserved|
|SYNC|non-reserved|non-reserved|non-reserved|
|TABLE|reserved|non-reserved|reserved|
|TABLES|non-reserved|non-reserved|non-reserved|
|TABLESAMPLE|non-reserved|non-reserved|reserved|
9 changes: 8 additions & 1 deletion docs/sql-ref-syntax-ddl-repair-table.md
@@ -28,7 +28,7 @@ If the table is cached, the command clears cached data of the table and all its
### Syntax

```sql
MSCK REPAIR TABLE table_identifier
MSCK REPAIR TABLE table_identifier [{ADD|DROP|SYNC} PARTITIONS]
```

### Parameters
@@ -39,6 +39,13 @@ MSCK REPAIR TABLE table_identifier

**Syntax:** `[ database_name. ] table_name`

* **`{ADD|DROP|SYNC} PARTITIONS`**

* If specified, `MSCK REPAIR TABLE` only adds partitions to the session catalog.
**Contributor:** Typo: should be "If not specified".

**Contributor:** I think it's better to put it at the end, and say "If not specified, ADD is the default."

**Author (Member):** Here is the PR #31633

    * **ADD**, the command adds new partitions to the session catalog for all sub-folders in the base table folder that don't belong to any table partitions.
    * **DROP**, the command drops all partitions from the session catalog whose locations do not exist in the file system.
    * **SYNC** is the combination of **DROP** and **ADD**.

### Examples

```sql
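-- The PR's example block is truncated in this diff view. The statements below
-- are an illustrative sketch of the new syntax, not the file's actual examples
-- (table name `t1` is hypothetical).
MSCK REPAIR TABLE t1;
MSCK REPAIR TABLE t1 ADD PARTITIONS;
MSCK REPAIR TABLE t1 DROP PARTITIONS;
MSCK REPAIR TABLE t1 SYNC PARTITIONS;
```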
@@ -229,7 +229,8 @@ statement
    | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
        multipartIdentifier partitionSpec?                             #loadData
    | TRUNCATE TABLE multipartIdentifier partitionSpec?                #truncateTable
    | MSCK REPAIR TABLE multipartIdentifier                            #repairTable
    | MSCK REPAIR TABLE multipartIdentifier
        (option=(ADD|DROP|SYNC) PARTITIONS)?                           #repairTable
    | op=(ADD | LIST) identifier (STRING | .*?)                        #manageResource
    | SET ROLE .*?                                                     #failNativeCommand
    | SET TIME ZONE interval                                           #setTimeZone
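For a quick sanity check of the grammar change above, a spark-shell sketch like the following (assuming a build that includes this change) should show each variant parsing into a `RepairTable` plan:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Each statement should parse; the ADD/DROP/SYNC suffix toggles the
// enableAddPartitions/enableDropPartitions flags of the resulting plan.
Seq(
  "MSCK REPAIR TABLE t",
  "MSCK REPAIR TABLE t ADD PARTITIONS",
  "MSCK REPAIR TABLE t DROP PARTITIONS",
  "MSCK REPAIR TABLE t SYNC PARTITIONS"
).foreach(stmt => println(CatalystSqlParser.parsePlan(stmt)))
```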
@@ -1173,6 +1174,7 @@ ansiNonReserved
| STRUCT
| SUBSTR
| SUBSTRING
| SYNC
| TABLES
| TABLESAMPLE
| TBLPROPERTIES
@@ -1429,6 +1431,7 @@ nonReserved
| STRUCT
| SUBSTR
| SUBSTRING
| SYNC
| TABLE
| TABLES
| TABLESAMPLE
@@ -1687,6 +1690,7 @@ STRATIFY: 'STRATIFY';
STRUCT: 'STRUCT';
SUBSTR: 'SUBSTR';
SUBSTRING: 'SUBSTRING';
SYNC: 'SYNC';
TABLE: 'TABLE';
TABLES: 'TABLES';
TABLESAMPLE: 'TABLESAMPLE';
@@ -3659,11 +3659,24 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*
* For example:
* {{{
* MSCK REPAIR TABLE multi_part_name
* MSCK REPAIR TABLE multi_part_name [{ADD|DROP|SYNC} PARTITIONS]
* }}}
*/
  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
    RepairTable(createUnresolvedTable(ctx.multipartIdentifier, "MSCK REPAIR TABLE"))
    val (enableAddPartitions, enableDropPartitions, option) =
      if (ctx.SYNC() != null) {
        (true, true, " ... SYNC PARTITIONS")
      } else if (ctx.DROP() != null) {
        (false, true, " ... DROP PARTITIONS")
      } else if (ctx.ADD() != null) {
        (true, false, " ... ADD PARTITIONS")
      } else {
        (true, false, "")
      }
    RepairTable(
      createUnresolvedTable(ctx.multipartIdentifier, s"MSCK REPAIR TABLE$option"),
      enableAddPartitions,
      enableDropPartitions)
  }

/**
@@ -802,7 +802,10 @@ case class DropView(
/**
* The logical plan of the MSCK REPAIR TABLE command.
*/
case class RepairTable(child: LogicalPlan) extends Command {
case class RepairTable(
    child: LogicalPlan,
    enableAddPartitions: Boolean,
    enableDropPartitions: Boolean) extends Command {
  override def children: Seq[LogicalPlan] = child :: Nil
}

@@ -1913,12 +1913,6 @@ class DDLParserSuite extends AnalysisTest {
"missing 'COLUMNS' at '<EOF>'")
}

test("MSCK REPAIR TABLE") {
Copy link
Member

@dongjoon-hyun dongjoon-hyun Feb 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to remove this instead of revising?

comparePlans(
parsePlan("MSCK REPAIR TABLE a.b.c"),
RepairTable(UnresolvedTable(Seq("a", "b", "c"), "MSCK REPAIR TABLE", None)))
}

test("LOAD DATA INTO table") {
comparePlans(
parsePlan("LOAD DATA INPATH 'filepath' INTO TABLE a.b.c"),
@@ -376,8 +376,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
    case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
      AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)

    case RepairTable(ResolvedV1TableIdentifier(ident)) =>
      AlterTableRecoverPartitionsCommand(ident.asTableIdentifier, "MSCK REPAIR TABLE")
    case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
      AlterTableRecoverPartitionsCommand(
        ident.asTableIdentifier,
        addPartitions,
        dropPartitions,
        "MSCK REPAIR TABLE")
**Member:** Just a question: can we propagate the original commandName instead of having "MSCK REPAIR TABLE"?

**Author (Member):** Unfortunately, we lose that info in the `UnresolvedTable` -> `ResolvedTable` step, but we can reconstruct the command name from the flags `addPartitions` and `dropPartitions`. Though not always the original one: `MSCK REPAIR TABLE table` has `addPartitions = true, dropPartitions = false`, so it could be re-constructed as `MSCK REPAIR TABLE table ADD PARTITIONS`. Are you ok with that?
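The reconstruction described above would look roughly like this sketch (helper name and placement are illustrative, not the PR's code):

```scala
// Illustrative only: rebuilding a command name from the two flags.
def commandName(addPartitions: Boolean, dropPartitions: Boolean): String =
  (addPartitions, dropPartitions) match {
    case (true, true)  => "MSCK REPAIR TABLE ... SYNC PARTITIONS"
    case (false, true) => "MSCK REPAIR TABLE ... DROP PARTITIONS"
    // Plain MSCK REPAIR TABLE and ... ADD PARTITIONS both carry (true, false),
    // so the original spelling cannot be recovered exactly.
    case _             => "MSCK REPAIR TABLE"
  }
```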


case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
LoadDataCommand(
@@ -420,6 +424,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
    case AlterTableRecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
      AlterTableRecoverPartitionsCommand(
        ident.asTableIdentifier,
        enableAddPartitions = true,
        enableDropPartitions = false,
        "ALTER TABLE RECOVER PARTITIONS")

    case AlterTableAddPartition(ResolvedV1TableIdentifier(ident), partSpecsAndLocs, ifNotExists) =>
@@ -189,7 +189,10 @@ case class CreateDataSourceTableAsSelectCommand(
      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
          sparkSession.sqlContext.conf.manageFilesourcePartitions =>
        // Need to recover partitions into the metastore so our saved data is visible.
        sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
        sessionState.executePlan(AlterTableRecoverPartitionsCommand(
          table.identifier,
          enableAddPartitions = true,
          enableDropPartitions = false)).toRdd
      case _ =>
    }
  }
@@ -597,11 +597,13 @@ case class PartitionStatistics(numFiles: Int, totalSize: Long)
* The syntax of this command is:
* {{{
* ALTER TABLE table RECOVER PARTITIONS;
* MSCK REPAIR TABLE table;
* MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS];
* }}}
*/
case class AlterTableRecoverPartitionsCommand(
    tableName: TableIdentifier,
    enableAddPartitions: Boolean,
    enableDropPartitions: Boolean,
    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {

// These are list of statistics that can be collected quickly without requiring a scan of the data
@@ -645,34 +647,40 @@ case class AlterTableRecoverPartitionsCommand(
    val hadoopConf = spark.sessionState.newHadoopConf()
    val fs = root.getFileSystem(hadoopConf)

    val threshold = spark.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
    val pathFilter = getPathFilter(hadoopConf)
    val droppedAmount = if (enableDropPartitions) {
      dropPartitions(catalog, fs)
    } else 0
    val addedAmount = if (enableAddPartitions) {
      val threshold = spark.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
      val pathFilter = getPathFilter(hadoopConf)

      val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
      val partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)] =
        try {
          scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
            spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
        } finally {
          evalPool.shutdown()
        }
      val total = partitionSpecsAndLocs.length
      logInfo(s"Found $total partitions in $root")

      val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
        gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
      } else {
        GenMap.empty[String, PartitionStatistics]
      }
      logInfo(s"Finished to gather the fast stats for all $total partitions.")

      addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
      total
    } else 0
    // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
    // This is always the case for Hive format tables, but is not true for Datasource tables created
    // before Spark 2.1 unless they are converted via `msck repair table`.
    spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
    spark.catalog.refreshTable(tableIdentWithDB)
    logInfo(s"Recovered all partitions ($total).")
    logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
    Seq.empty[Row]
  }
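Behaviorally, the two flags let one statement both prune stale partitions and discover new ones. A hedged end-to-end sketch (table name and layout are illustrative, not from the PR):

```scala
// Illustrative spark-shell session (assumes a build with this change).
spark.sql("CREATE TABLE t (id INT, part INT) USING parquet PARTITIONED BY (part)")
spark.sql("INSERT INTO t PARTITION (part = 0) SELECT 0")
// Suppose the part=0 directory is deleted and a part=1 directory is created
// under the table location by an external process. Then:
spark.sql("MSCK REPAIR TABLE t SYNC PARTITIONS")
// SHOW PARTITIONS should now list part=1 and no longer list part=0.
spark.sql("SHOW PARTITIONS t").show()
```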

@@ -791,8 +799,28 @@ case class AlterTableRecoverPartitionsCommand(
logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
}
}
}

  // Drops the partitions that do not exist in the file system
  private def dropPartitions(catalog: SessionCatalog, fs: FileSystem): Int = {
    val dropPartSpecs = ThreadUtils.parmap(
      catalog.listPartitions(tableName),
      "AlterTableRecoverPartitionsCommand: non-existing partitions",
      maxThreads = 8) { partition =>
      partition.storage.locationUri.flatMap { uri =>
        if (fs.exists(new Path(uri))) None else Some(partition.spec)
      }
    }.flatten
    catalog.dropPartitions(
      tableName,
      dropPartSpecs,
      ignoreIfNotExists = true,
      purge = false,
      // Since we have already checked that partition directories do not exist, we can avoid
      // additional calls to the file system at the catalog side by setting this flag.
      retainData = true)
**Member:** Could you add a comment about the reason why we use `retainData = true`? I guess the reason is that `fs.exists(..)` is already false and we don't want additional file system calls. Did I understand correctly?

**Author (Member):**

> ... we don't want additional file system calls. Did I understand correctly?

Yep. If we set `retainData` to true, the `deleteData` flag will be false at https://github.com/apache/hive/blob/release-3.1.3-rc0/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java#L4360-L4378, so the Hive MetaStore will not try to delete the partition folders.

**Author (Member):** The same holds for the in-memory catalog:

    if (existingParts.contains(p) && shouldRemovePartitionLocation) {
      val partitionPath = new Path(existingParts(p).location)
      try {
        val fs = partitionPath.getFileSystem(hadoopConfig)
        fs.delete(partitionPath, true)
      } catch {
        case e: IOException =>
          throw QueryExecutionErrors.unableToDeletePartitionPathError(partitionPath, e)
      }
    }

    dropPartSpecs.length
  }
}

/**
* A command that sets the location of a table or a partition.
@@ -409,7 +409,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
          table,
          pattern.map(_.asInstanceOf[ResolvedPartitionSpec])) :: Nil

    case RepairTable(_: ResolvedTable) =>
    case RepairTable(_: ResolvedTable, _, _) =>
      throw new AnalysisException("MSCK REPAIR TABLE is not supported for v2 tables.")

    case r: CacheTable =>
@@ -17,6 +17,9 @@

package org.apache.spark.sql.execution.command

import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.scalactic.source.Position
import org.scalatest.Tag
@@ -144,4 +147,26 @@ trait DDLCommandTestUtils extends SQLTestUtils {
    val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
    f(fs, root)
  }

  // Returns the file-system location of the given partition, parsed from the
  // "Location:" field in the output of SHOW TABLE EXTENDED.
  def getPartitionLocation(tableName: String, part: String): String = {
    val idents = tableName.split('.')
    val table = idents.last
    val catalogAndNs = idents.init
    val in = if (catalogAndNs.isEmpty) "" else s"IN ${catalogAndNs.mkString(".")}"
    val information = sql(s"SHOW TABLE EXTENDED $in LIKE '$table' PARTITION ($part)")
      .select("information")
      .first().getString(0)
    information
      .split("\\r?\\n")
      .filter(_.startsWith("Location:"))
      .head
      .replace("Location: file:", "")
  }

  // Copies the directory of partition `from` to the directory for partition `to`
  // and returns the new location. The catalog is not updated here.
  def copyPartition(tableName: String, from: String, to: String): String = {
    val part0Loc = getPartitionLocation(tableName, from)
    val part1Loc = part0Loc.replace(from, to)
    FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
    part1Loc
  }
}
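A hedged usage sketch of these helpers from a suite mixing in `DDLCommandTestUtils` (table name and partition values are illustrative, not from the PR's tests):

```scala
// Illustrative only: clone the directory of part=0 as part=1 behind the
// catalog's back, then let the new syntax discover it.
val clonedLoc = copyPartition("ns.tbl", "part=0", "part=1")
sql("MSCK REPAIR TABLE ns.tbl ADD PARTITIONS")
// SHOW PARTITIONS ns.tbl should now include part=1.
```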
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTable}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.plans.logical.RepairTable

class MsckRepairTableParserSuite extends AnalysisTest {
  test("repair a table") {
    comparePlans(
      parsePlan("MSCK REPAIR TABLE a.b.c"),
      RepairTable(
        UnresolvedTable(Seq("a", "b", "c"), "MSCK REPAIR TABLE", None),
        enableAddPartitions = true,
        enableDropPartitions = false))
  }

  test("add partitions") {
    comparePlans(
      parsePlan("msck repair table ns.tbl add partitions"),
      RepairTable(
        UnresolvedTable(
          Seq("ns", "tbl"),
          "MSCK REPAIR TABLE ... ADD PARTITIONS",
          None),
        enableAddPartitions = true,
        enableDropPartitions = false))
  }

  test("drop partitions") {
    comparePlans(
      parsePlan("MSCK repair table TBL Drop Partitions"),
      RepairTable(
        UnresolvedTable(
          Seq("TBL"),
          "MSCK REPAIR TABLE ... DROP PARTITIONS",
          None),
        enableAddPartitions = false,
        enableDropPartitions = true))
  }

  test("sync partitions") {
    comparePlans(
      parsePlan("MSCK REPAIR TABLE spark_catalog.ns.tbl SYNC PARTITIONS"),
      RepairTable(
        UnresolvedTable(
          Seq("spark_catalog", "ns", "tbl"),
          "MSCK REPAIR TABLE ... SYNC PARTITIONS",
          None),
        enableAddPartitions = true,
        enableDropPartitions = true))
  }
}