diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a47136ea3673..e17f08a8b236 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2779,9 +2779,6 @@ object SparkContext extends Logging {
       deployMode: String): (SchedulerBackend, TaskScheduler) = {
     import SparkMasterRegex._
 
-    // When running locally, don't try to re-execute tasks on failure.
-    val MAX_LOCAL_TASK_FAILURES = 1
-
     // Ensure that default executor's resources satisfies one or more tasks requirement.
     // This function is for cluster managers that don't set the executor cores config, for
     // others its checked in ResourceProfile.
@@ -2810,7 +2807,8 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         checkResourcesPerTask(1)
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val scheduler =
+          new TaskSchedulerImpl(sc, sc.conf.get(MAX_LOCAL_TASK_FAILURES), isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
         (backend, scheduler)
@@ -2823,7 +2821,8 @@ object SparkContext extends Logging {
           throw new SparkException(s"Asked to run locally with $threadCount threads")
         }
         checkResourcesPerTask(threadCount)
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val scheduler =
+          new TaskSchedulerImpl(sc, sc.conf.get(MAX_LOCAL_TASK_FAILURES), isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 3f36e61fe112..0827781344ed 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1542,4 +1542,10 @@ package object config {
     .bytesConf(ByteUnit.BYTE)
     .createOptional
 
+  private[spark] val MAX_LOCAL_TASK_FAILURES = ConfigBuilder("spark.task.local.maxFailures")
+    .doc("The maximum number of failures allowed for a task when SparkContext runs in " +
+      "local mode. Sometimes we need to tolerate nonfatal task failures, or to test " +
+      "behavior with speculation or task re-execution enabled.")
+    .intConf
+    .createWithDefault(1)
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 11ce608f52ee..65041b13ae4b 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
+import scala.language.implicitConversions
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configurable
@@ -110,7 +111,9 @@ class HadoopMapReduceCommitProtocol(
         assert(dir.isDefined,
           "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
         partitionPaths += dir.get
-        this.stagingDir
+        // For speculative or re-executed tasks, we need to assign a different
+        // staging dir to each task attempt.
+        new Path(this.stagingDir, taskContext.getTaskAttemptID.toString)
       // For FileOutputCommitter it has its own staging path called "work path".
       case f: FileOutputCommitter =>
         new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
@@ -165,10 +168,11 @@ class HadoopMapReduceCommitProtocol(
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
-
     if (hasValidPath) {
-      val (allAbsPathFiles, allPartitionPaths) =
-        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+      implicit def asPair(x: (Map[String, String], Set[String], String))
+        : (Map[String, String], (Set[String], String)) = (x._1, (x._2, x._3))
+      val (allAbsPathFiles, partitionPathsAttemptIDPair) =
+        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String], String)]).unzip
       val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
 
       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
@@ -183,22 +187,30 @@ class HadoopMapReduceCommitProtocol(
       }
 
       if (dynamicPartitionOverwrite) {
-        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
-        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
-        for (part <- partitionPaths) {
-          val finalPartPath = new Path(path, part)
-          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
-            // According to the official hadoop FileSystem API spec, delete op should assume
-            // the destination is no longer present regardless of return value, thus we do not
-            // need to double check if finalPartPath exists before rename.
-            // Also in our case, based on the spec, delete returns false only when finalPartPath
-            // does not exist. When this happens, we need to take action if parent of finalPartPath
-            // also does not exist(e.g. the scenario described on SPARK-23815), because
-            // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
-            // a parent that exists, otherwise we may get unexpected result on the rename.
-            fs.mkdirs(finalPartPath.getParent)
-          }
-          fs.rename(new Path(stagingDir, part), finalPartPath)
+        val allPartitionPaths = partitionPathsAttemptIDPair.map {
+          case (allPartitionPath, successAttemptID) =>
+            allPartitionPath.foreach(part => {
+              val finalPartPath = new Path(path, part)
+              if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+                // According to the official hadoop FileSystem API spec, delete op should assume
+                // the destination is no longer present regardless of return value, thus we do not
+                // need to double check if finalPartPath exists before rename.
+                // Also in our case, based on the spec, delete returns false only when finalPartPath
+                // does not exist. When this happens, we need to take action if parent of
+                // finalPartPath also does not exist(e.g. the scenario described on SPARK-23815),
+                // because FileSystem API spec on rename op says the rename dest(finalPartPath)
+                // must have a parent that exists, otherwise we may get unexpected result
+                // on the rename.
+                fs.mkdirs(finalPartPath.getParent)
+              }
+              fs.rename(new Path(s"$stagingDir/$successAttemptID", part), finalPartPath)
+            })
+            allPartitionPath
+          case _ => Set.empty
+        }
+        if (log.isDebugEnabled) {
+          val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
+          logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
         }
       }
@@ -243,7 +255,7 @@ class HadoopMapReduceCommitProtocol(
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
+    new TaskCommitMessage((addedAbsPathFiles.toMap, partitionPaths.toSet, attemptId.toString))
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertWithMultipleTaskAttemptSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertWithMultipleTaskAttemptSuite.scala
new file mode 100644
index 000000000000..8734636c061a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertWithMultipleTaskAttemptSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
+import org.apache.spark.sql.test.SharedSparkSession
+
+class InsertWithMultipleTaskAttemptSuite extends QueryTest with SharedSparkSession {
+  override def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf.set(config.MAX_LOCAL_TASK_FAILURES, 3)
+  }
+
+  test("it is allowed to insert into a table for dynamic partition overwrite " +
+    "while speculation is on") {
+    withSQLConf(
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> classOf[InsertExceptionCommitProtocol].getName) {
+      withTable("insertTable") {
+        sql(
+          """
+            |CREATE TABLE insertTable(i int, part1 int, part2 int) USING PARQUET
+            |PARTITIONED BY (part1, part2)
+          """.stripMargin)
+
+        sql(
+          """
+            |INSERT OVERWRITE TABLE insertTable Partition(part1=1, part2)
+            |SELECT 1,2
+          """.stripMargin)
+        checkAnswer(spark.table("insertTable"), Row(1, 1, 2))
+      }
+    }
+  }
+}
+
+class InsertExceptionCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage =
+    if (InsertExceptionCommitProtocol.inited) {
+      throw new Exception("test")
+    } else {
+      super.commitTask(taskContext)
+    }
+}
+
+object InsertExceptionCommitProtocol {
+  private val initedFlag = new AtomicBoolean(false)
+  def inited: Boolean = initedFlag.compareAndSet(false, true)
+}
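
A minimal usage sketch (not part of the patch, and assuming it is applied): with the new spark.task.local.maxFailures config a local-mode job can retry a failed task attempt, and under dynamic partition overwrite each attempt now stages its output in its own attempt-specific sub-directory. The session settings, table name, and object name below are illustrative only.

import org.apache.spark.sql.SparkSession

object LocalRetryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      // Allow up to 3 attempts per task in local mode (previously hard-coded to 1).
      .config("spark.task.local.maxFailures", "3")
      // Only overwrite the partitions the insert actually touches.
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .appName("local-retry-example")
      .getOrCreate()

    spark.sql("CREATE TABLE t(i INT, part INT) USING PARQUET PARTITIONED BY (part)")
    // If a task attempt fails and is retried, the retried attempt writes into its own
    // staging sub-directory, so the committed partition stays consistent.
    spark.sql("INSERT OVERWRITE TABLE t PARTITION(part) SELECT 1, 2")
    spark.stop()
  }
}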