diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 4ad9a0cc4b10..6c2eb8defc58 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -43,3 +43,9 @@ private[spark] case class SparkUserAppException(exitCode: Int)
  */
 private[spark] case class ExecutorDeadException(message: String)
   extends SparkException(message)
+
+/**
+ * Exception thrown when multiple InsertDataSource operations conflict with each other.
+ */
+private[spark] case class InsertDataSourceConflictException(message: String)
+  extends SparkException(message)
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 11ce608f52ee..f945c16783df 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal.io
 
 import java.io.IOException
 import java.util.{Date, UUID}
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 import scala.util.Try
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
+import org.apache.spark.InsertDataSourceConflictException
 import org.apache.spark.internal.Logging
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 
@@ -91,6 +93,31 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  // The job attempt path used when dynamicPartitionOverwrite is false;
+  // keep it consistent with `FileOutputCommitter.getJobAttemptPath`.
+  private def staticJobAttemptPath = new Path(path, "_temporary")
+
+  private def checkStaticInsertConflict(jobContext: JobContext): Unit = {
+    val fs = new Path(path).getFileSystem(jobContext.getConfiguration)
+    if (fs.exists(staticJobAttemptPath)) {
+      val fileStatus = fs.getFileStatus(staticJobAttemptPath)
+      val accessTime = new Date(fileStatus.getAccessTime)
+      val modificationTime = new Date(fileStatus.getModificationTime)
+      val elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() -
+        fileStatus.getModificationTime)
+      throw InsertDataSourceConflictException(
+        s"""
+          | The staging dir for a non-dynamic-partition-overwrite insert already exists. It was
+          | last accessed at $accessTime and last modified at $modificationTime, $elapsedSeconds
+          | seconds ago. There are two possibilities:
+          | 1. Another InsertDataSource operation is still running; wait for it to complete.
+          | 2. The staging dir belongs to a killed application and was not cleaned up gracefully;
+          |    check its modification time and clean it up manually.
+          | Please handle this staging dir according to the information above.
+          |""".stripMargin)
+    }
+  }
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.getConstructor().newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -160,11 +187,18 @@ class HadoopMapReduceCommitProtocol(
 
     val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
     committer = setupCommitter(taskAttemptContext)
-    committer.setupJob(jobContext)
+    // Dynamic partition overwrite uses its own job attempt path, so committer.setupJob is not
+    // needed here. The same applies to the commitJob and abortJob calls below.
+    if (!dynamicPartitionOverwrite) {
+      checkStaticInsertConflict(jobContext)
+      committer.setupJob(jobContext)
+    }
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
-    committer.commitJob(jobContext)
+    if (!dynamicPartitionOverwrite) {
+      committer.commitJob(jobContext)
+    }
 
     if (hasValidPath) {
       val (allAbsPathFiles, allPartitionPaths) =
@@ -215,7 +249,9 @@ class HadoopMapReduceCommitProtocol(
    */
   override def abortJob(jobContext: JobContext): Unit = {
     try {
-      committer.abortJob(jobContext, JobStatus.State.FAILED)
+      if (!dynamicPartitionOverwrite) {
+        committer.abortJob(jobContext, JobStatus.State.FAILED)
+      }
     } catch {
       case e: IOException =>
         logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 80c1e24bfa56..cf904f6b62ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -22,6 +22,8 @@ import java.net.{MalformedURLException, URL}
 import java.sql.{Date, Timestamp}
 import java.util.concurrent.atomic.AtomicBoolean
 
+import com.google.common.io.Files
+
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
@@ -33,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
 import org.apache.spark.sql.test.{SharedSparkSession, TestSQLContext}
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
@@ -3192,6 +3195,35 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
       checkAnswer(df3, Array(Row(new java.math.BigDecimal("0.100000000000000000000000100"))))
     }
   }
+
+  test("SPARK-29037: Spark gives duplicate results when an application is killed") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTable("test") {
+        sql("create table test(id int, p1 int, p2 int) using parquet partitioned by (p1, p2)")
+        sql("insert overwrite table test partition(p1=1,p2) select 1, 3")
+        val df = sql("select id from test where p1=1 and p2=3")
+        checkAnswer(df, Array(Row(1)))
+
+        val warehouse = SQLConf.get.warehousePath.split(":").last
+        val tblPath = Array(warehouse, "org.apache.spark.sql.SQLQuerySuite", "test")
+          .mkString(File.separator)
+        val taskAttemptPath = Array(tblPath, "_temporary", "0",
+          "task_20190914232019_0000_m_000000", "p1=1", "p2=3").mkString(File.separator)
+        new File(taskAttemptPath).mkdirs()
+
+        val tblResult = new File(Array(tblPath, "p1=1", "p2=3").mkString(File.separator))
+        val tFile = tblResult.list((_: File, name: String) => !name.startsWith(".")).apply(0)
+        val from = new File(tblResult.getAbsolutePath + File.separator + tFile)
+        val to = new File(taskAttemptPath + File.separator + tFile)
+        Files.copy(from, to)
+
+        sql("insert overwrite table test partition(p1=1,p2) select 2, 3")
+        assert(tblResult.list((_: File, name: String) => !name.startsWith(".")).size === 1)
+        val df2 = sql("select id from test where p1=1 and p2=3")
+        checkAnswer(df2, Array(Row(2)))
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
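
Note (not part of the patch): below is a minimal standalone sketch of the staging-dir conflict check that checkStaticInsertConflict performs, handy for inspecting a target directory by hand. It only assumes hadoop-common on the classpath; the object name StagingDirConflictCheck and the default path are hypothetical, and the real patch runs this check inside HadoopMapReduceCommitProtocol.setupJob rather than as a separate tool.

import java.util.Date
import java.util.concurrent.TimeUnit

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object StagingDirConflictCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical output location; pass the table or datasource path to inspect.
    val outputPath = new Path(args.headOption.getOrElse("/tmp/spark-output"))
    val fs = outputPath.getFileSystem(new Configuration())
    // Same layout as FileOutputCommitter's job attempt path for static (non-dynamic) inserts.
    val staticJobAttemptPath = new Path(outputPath, "_temporary")

    if (fs.exists(staticJobAttemptPath)) {
      val status = fs.getFileStatus(staticJobAttemptPath)
      val elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(
        System.currentTimeMillis() - status.getModificationTime)
      println(s"Found $staticJobAttemptPath, last modified at " +
        s"${new Date(status.getModificationTime)} ($elapsedSeconds seconds ago). " +
        "Either another static insert is still running, or a killed application left it behind.")
    } else {
      println(s"No $staticJobAttemptPath found; no static insert conflict detected.")
    }
  }
}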