Skip to content

Commit 071de47

Browse files
committed
Revert "Porting ALTER TABLE on parmap"
This reverts commit 6a5f2ae.
1 parent 80f9e7d commit 071de47

File tree

1 file changed

+17
-17
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution/command

1 file changed

+17
-17
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
2020
import java.util.Locale
2121

2222
import scala.collection.{GenMap, GenSeq}
23-
import scala.concurrent.ExecutionContext
23+
import scala.collection.parallel.ForkJoinTaskSupport
2424
import scala.util.control.NonFatal
2525

2626
import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
2929

3030
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
3131
import org.apache.spark.sql.catalyst.TableIdentifier
32-
import org.apache.spark.sql.catalyst.analysis.Resolver
32+
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
3333
import org.apache.spark.sql.catalyst.catalog._
3434
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
3535
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
4040
import org.apache.spark.sql.internal.HiveSerDe
4141
import org.apache.spark.sql.types._
4242
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
43-
import org.apache.spark.util.ThreadUtils.parmap
4443

4544
// Note: The definition of these commands are based on the ones described in
4645
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -622,9 +621,8 @@ case class AlterTableRecoverPartitionsCommand(
622621
val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
623622
val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
624623
try {
625-
implicit val ec = ExecutionContext.fromExecutor(evalPool)
626624
scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
627-
spark.sessionState.conf.resolver)
625+
spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
628626
} finally {
629627
evalPool.shutdown()
630628
}
@@ -656,13 +654,23 @@ case class AlterTableRecoverPartitionsCommand(
656654
spec: TablePartitionSpec,
657655
partitionNames: Seq[String],
658656
threshold: Int,
659-
resolver: Resolver)(implicit ec: ExecutionContext): Seq[(TablePartitionSpec, Path)] = {
657+
resolver: Resolver,
658+
evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
660659
if (partitionNames.isEmpty) {
661660
return Seq(spec -> path)
662661
}
663662

664-
val statuses = fs.listStatus(path, filter).toSeq
665-
def handleStatus(st: FileStatus): Seq[(TablePartitionSpec, Path)] = {
663+
val statuses = fs.listStatus(path, filter)
664+
val statusPar: GenSeq[FileStatus] =
665+
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
666+
// parallelize the list of partitions here, then we can have better parallelism later.
667+
val parArray = statuses.par
668+
parArray.tasksupport = evalTaskSupport
669+
parArray
670+
} else {
671+
statuses
672+
}
673+
statusPar.flatMap { st =>
666674
val name = st.getPath.getName
667675
if (st.isDirectory && name.contains("=")) {
668676
val ps = name.split("=", 2)
@@ -671,7 +679,7 @@ case class AlterTableRecoverPartitionsCommand(
671679
val value = ExternalCatalogUtils.unescapePathName(ps(1))
672680
if (resolver(columnName, partitionNames.head)) {
673681
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
674-
partitionNames.drop(1), threshold, resolver)
682+
partitionNames.drop(1), threshold, resolver, evalTaskSupport)
675683
} else {
676684
logWarning(
677685
s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
@@ -682,14 +690,6 @@ case class AlterTableRecoverPartitionsCommand(
682690
Seq.empty
683691
}
684692
}
685-
val result = if (partitionNames.length > 1 &&
686-
statuses.length > threshold || partitionNames.length > 2) {
687-
parmap(statuses)(handleStatus _)
688-
} else {
689-
statuses.map(handleStatus)
690-
}
691-
692-
result.flatten
693693
}
694694

695695
private def gatherPartitionStats(

0 commit comments

Comments
 (0)