@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import java.util.Locale

import scala.collection.{GenMap, GenSeq}
import scala.concurrent.ExecutionContext
import scala.collection.parallel.ForkJoinTaskSupport
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
import org.apache.spark.util.ThreadUtils.parmap

// Note: The definition of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -622,9 +621,8 @@ case class AlterTableRecoverPartitionsCommand(
val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
try {
implicit val ec = ExecutionContext.fromExecutor(evalPool)
scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
spark.sessionState.conf.resolver)
spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
} finally {
evalPool.shutdown()
}
@@ -656,13 +654,23 @@
spec: TablePartitionSpec,
partitionNames: Seq[String],
threshold: Int,
resolver: Resolver)(implicit ec: ExecutionContext): Seq[(TablePartitionSpec, Path)] = {
resolver: Resolver,
evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
if (partitionNames.isEmpty) {
return Seq(spec -> path)
}

val statuses = fs.listStatus(path, filter).toSeq
def handleStatus(st: FileStatus): Seq[(TablePartitionSpec, Path)] = {
val statuses = fs.listStatus(path, filter)
val statusPar: GenSeq[FileStatus] =
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
// parallelize the list of partitions here, then we can have better parallelism later.
val parArray = statuses.par
parArray.tasksupport = evalTaskSupport
parArray
} else {
statuses
}
statusPar.flatMap { st =>
val name = st.getPath.getName
if (st.isDirectory && name.contains("=")) {
val ps = name.split("=", 2)
@@ -671,7 +679,7 @@
val value = ExternalCatalogUtils.unescapePathName(ps(1))
if (resolver(columnName, partitionNames.head)) {
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
partitionNames.drop(1), threshold, resolver)
partitionNames.drop(1), threshold, resolver, evalTaskSupport)
} else {
logWarning(
s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
@@ -682,14 +690,6 @@
Seq.empty
}
}
val result = if (partitionNames.length > 1 &&
statuses.length > threshold || partitionNames.length > 2) {
parmap(statuses)(handleStatus _)
} else {
statuses.map(handleStatus)
}

result.flatten
}

private def gatherPartitionStats(
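For readers unfamiliar with the approach above, here is a minimal, self-contained sketch (not taken from the PR) of the technique the command now relies on: attaching a ForkJoinTaskSupport to a parallel collection so that the listing work runs on a bounded fork-join pool. It assumes Scala 2.12, where ForkJoinTaskSupport wraps java.util.concurrent.ForkJoinPool; the object name and the stand-in data are hypothetical.

```scala
import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport

object ParallelListingSketch extends App {
  // Same width as the evalPool created by AlterTableRecoverPartitionsCommand.
  val pool = new ForkJoinPool(8)
  try {
    val taskSupport = new ForkJoinTaskSupport(pool)
    // Stand-in for fs.listStatus: pretend each element is one child directory.
    val statuses = (1 to 100).map(a => s"a=$a")
    val statusPar = statuses.par
    // All tasks spawned by this parallel collection (and by nested .par
    // collections that reuse the same task support) run on `pool`.
    statusPar.tasksupport = taskSupport
    val specs = statusPar.map(dir => Map("a" -> dir.stripPrefix("a=")) -> dir).seq
    println(specs.size) // 100
  } finally {
    pool.shutdown()
  }
}
```

The trailing `.seq` mirrors the call added in the command above, converting the parallel result back to an ordinary sequence before it is used further.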
@@ -52,23 +52,24 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
protected override def generateTable(
catalog: SessionCatalog,
name: TableIdentifier,
isDataSource: Boolean = true): CatalogTable = {
isDataSource: Boolean = true,
partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
Member commented:
A question about the changes in this file. Are they related to the work of this PR?

Member Author replied:
@gatorsmile Yes, the changes are related to an existing test which was modified to reproduce the issue. In particular, this line is related to support of any number of partition columns.
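For illustration only (this snippet is not part of the diff), the new `partitionCols` parameter means the generated schema is the base columns plus one IntegerType field per partition column, roughly like this:

```scala
import org.apache.spark.sql.types._

val base = new StructType()
  .add("col1", "int")
  .add("col2", "string")
val partitionCols = Seq("a", "b", "c")
// Append the partition columns, as generateTable now does.
val full = base.copy(fields = base.fields ++ partitionCols.map(StructField(_, IntegerType)))
// full.fieldNames: Array(col1, col2, a, b, c)
```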

val storage =
CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
val metadata = new MetadataBuilder()
.putString("key", "value")
.build()
val schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
CatalogTable(
identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
schema = schema.copy(
fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
provider = Some("parquet"),
partitionColumnNames = Seq("a", "b"),
partitionColumnNames = partitionCols,
createTime = 0L,
createVersion = org.apache.spark.SPARK_VERSION,
tracksPartitionsInCatalog = true)
@@ -176,7 +177,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
protected def generateTable(
catalog: SessionCatalog,
name: TableIdentifier,
isDataSource: Boolean = true): CatalogTable
isDataSource: Boolean = true,
partitionCols: Seq[String] = Seq("a", "b")): CatalogTable

private val escapedIdentifier = "`(.+)`".r

@@ -228,8 +230,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
private def createTable(
catalog: SessionCatalog,
name: TableIdentifier,
isDataSource: Boolean = true): Unit = {
catalog.createTable(generateTable(catalog, name, isDataSource), ignoreIfExists = false)
isDataSource: Boolean = true,
partitionCols: Seq[String] = Seq("a", "b")): Unit = {
catalog.createTable(
generateTable(catalog, name, isDataSource, partitionCols), ignoreIfExists = false)
}

private def createTablePartition(
@@ -1131,7 +1135,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}

test("alter table: recover partition (parallel)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "0") {
Member commented:
@MaxGekk, out of curiosity, why does this have to be 0?

Member Author (@MaxGekk, Aug 25, 2018) replied:
On the recursive calls this condition

val result = if (partitionNames.length > 1 &&
  statuses.length > threshold || partitionNames.length > 2) {

is false, because statuses.length is 1 and threshold is 1, so it leads to sequential listing of files. I just enforce parallel scanning even for a single file/folder.
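To make that concrete, here is a small worked check of the condition with illustrative values (variable names are ours, not the command's):

```scala
val partitionNames = Seq("b", "c") // one level already consumed, two remain
val statusesLength = 1             // a single child directory was listed
val threshold = 1                  // spark.rdd.parallelListingThreshold
val parallel =
  partitionNames.length > 1 && statusesLength > threshold || partitionNames.length > 2
// parallel == false, so listing stays sequential; with threshold = 0 it becomes true
```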

testRecoverPartitions()
}
}
@@ -1144,23 +1148,32 @@
}

val tableIdent = TableIdentifier("tab1")
createTable(catalog, tableIdent)
val part1 = Map("a" -> "1", "b" -> "5")
createTable(catalog, tableIdent, partitionCols = Seq("a", "b", "c"))
val part1 = Map("a" -> "1", "b" -> "5", "c" -> "19")
createTablePartition(catalog, part1, tableIdent)
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))

val part2 = Map("a" -> "2", "b" -> "6")
val part2 = Map("a" -> "2", "b" -> "6", "c" -> "31")
val root = new Path(catalog.getTableMetadata(tableIdent).location)
val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
// valid
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
fs.mkdirs(new Path(new Path(new Path(root, "a=1"), "b=5"), "c=19"))
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "a.csv")) // file
fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "_SUCCESS")) // file

fs.mkdirs(new Path(new Path(new Path(root, "A=2"), "B=6"), "C=31"))
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "b.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "c.csv")) // file
fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), ".hiddenFile")) // file
fs.mkdirs(new Path(new Path(root, "A=2/B=6/C=31"), "_temporary"))

val parts = (10 to 100).map { a =>
val part = Map("a" -> a.toString, "b" -> "5", "c" -> "42")
fs.mkdirs(new Path(new Path(new Path(root, s"a=$a"), "b=5"), "c=42"))
fs.createNewFile(new Path(new Path(root, s"a=$a/b=5/c=42"), "a.csv")) // file
createTablePartition(catalog, part, tableIdent)
part
}

// invalid
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
@@ -1174,7 +1187,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
try {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
Set(part1, part2) ++ parts)
if (!isUsingHiveMetastore) {
assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
@@ -60,7 +60,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
protected override def generateTable(
catalog: SessionCatalog,
name: TableIdentifier,
isDataSource: Boolean): CatalogTable = {
isDataSource: Boolean,
partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
Member commented:
The interface of this function looks strange. The original one is also hacky. We should refine them later.

Another Member commented:
Yeah, please don't override a method that has a default parameter. It's very easy to end up with different default values, and then the value that gets picked up will depend on the type you are using...
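A hypothetical sketch of the pitfall being described (class and method names are made up): when an override changes a default value, the default that actually applies at a call site is easy to misjudge, so keeping the defaults identical across the suites avoids the question entirely.

```scala
class BaseSuite {
  def generate(partitionCols: Seq[String] = Seq("a", "b")): Seq[String] = partitionCols
}
class HiveSuite extends BaseSuite {
  // Same signature, but a different default value.
  override def generate(partitionCols: Seq[String] = Seq("x")): Seq[String] = partitionCols
}

val suite: BaseSuite = new HiveSuite
// Reading this call, it is not obvious whether Seq("a", "b") or Seq("x") applies,
// which is exactly why the review asks for identical defaults in every override.
suite.generate()
```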

val storage =
if (isDataSource) {
val serde = HiveSerDe.sourceToSerDe("parquet")
@@ -84,17 +85,17 @@
val metadata = new MetadataBuilder()
.putString("key", "value")
.build()
val schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
CatalogTable(
identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
schema = new StructType()
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
schema = schema.copy(
fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
provider = if (isDataSource) Some("parquet") else Some("hive"),
partitionColumnNames = Seq("a", "b"),
partitionColumnNames = partitionCols,
createTime = 0L,
createVersion = org.apache.spark.SPARK_VERSION,
tracksPartitionsInCatalog = true)