core/src/main/scala/org/apache/spark/SparkException.scala (6 additions, 0 deletions)
@@ -43,3 +43,9 @@ private[spark] case class SparkUserAppException(exitCode: Int)
*/
private[spark] case class ExecutorDeadException(message: String)
extends SparkException(message)

/**
* Exception thrown when concurrent InsertDataSource operations conflict.
*/
private[spark] case class InsertDataSourceConflictException(message: String)
extends SparkException(message)
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal.io

import java.io.IOException
import java.util.{Date, UUID}
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.util.Try
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.InsertDataSourceConflictException
import org.apache.spark.internal.Logging
import org.apache.spark.mapred.SparkHadoopMapRedUtil

@@ -91,6 +93,31 @@ class HadoopMapReduceCommitProtocol(
*/
private def stagingDir = new Path(path, ".spark-staging-" + jobId)

// The job attempt path when dynamicPartitionOverwrite is false;
// keep it consistent with `FileOutputCommitter.getJobAttemptPath`.
private def staticJobAttemptPath = new Path(path, "_temporary")

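// Fail fast when a static (non-dynamic-partition-overwrite) insert finds a pending
// `_temporary` dir under the table path, left either by a concurrent insert or by a
// killed application that was not cleaned up.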
private def checkStaticInsertConflict(jobContext: JobContext): Unit = {
val fs = new Path(path).getFileSystem(jobContext.getConfiguration)
if (fs.exists(staticJobAttemptPath)) {
val fileStatus = fs.getFileStatus(staticJobAttemptPath)
val accessTime = new Date(fileStatus.getAccessTime)
val modificationTime = new Date(fileStatus.getModificationTime)
val elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() -
fileStatus.getModificationTime)
throw InsertDataSourceConflictException(
s"""
| The staging dir for a non-dynamic-partition-overwrite insert already exists. Its
| access time is $accessTime and its modification time is $modificationTime,
| which was $elapsedSeconds seconds ago. There are two possibilities:
| 1. Another InsertDataSource operation is running; wait for it to complete.
| 2. The staging dir belongs to a killed application and was not cleaned up
|    gracefully; check its modification time and clean it up manually.
|""".stripMargin)
}
}

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.getConstructor().newInstance()
// If OutputFormat is Configurable, we should set conf to it.
@@ -160,11 +187,18 @@ class HadoopMapReduceCommitProtocol(

val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
committer = setupCommitter(taskAttemptContext)
committer.setupJob(jobContext)
// For dynamic partition overwrite, a job-specific staging path is used, so we don't need
// committer.setupJob here. The same applies to the commitJob and abortJob calls below.
if (!dynamicPartitionOverwrite) {
Contributor:

We need to add comments to explain this. It looks to me that the Hadoop output committer doesn't support concurrent writing to the same directory by design, so there is nothing we can do on the Spark side.

The fix here is to avoid using the Hadoop output committer when dynamicPartitionOverwrite=true. I'm fine with this fix.

BTW, when writing a partitioned table with dynamicPartitionOverwrite=false, can we support that as well?
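A minimal sketch (not part of this PR) illustrating that conflict: two independent jobs writing to the same output path resolve their pending output to the same `${output}/_temporary/<appAttemptId>` directory, so a commit or abort by one job can move or delete the other job's pending task files. The paths and job ids below are illustrative only.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.JobContextImpl

val out = new Path("/warehouse/db/tbl")

// Two unrelated jobs that write to the same table path...
val jobA = new JobContextImpl(new Configuration(), new JobID("application_A", 1))
val jobB = new JobContextImpl(new Configuration(), new JobID("application_B", 1))

// ...share the same pending-output root, e.g. /warehouse/db/tbl/_temporary/0
// (the application attempt id defaults to 0), so commitJob/abortJob of one job
// touches the other job's pending files.
val pendingA = FileOutputCommitter.getJobAttemptPath(jobA, out)
val pendingB = FileOutputCommitter.getJobAttemptPath(jobB, out)
assert(pendingA == pendingB)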

Contributor:

Also cc @advancedxy.

Contributor:

And for a non-partitioned table, can we clean up the staging dir when the job is killed?

Member Author (@turboFei), Sep 17, 2019:

@advancedxy discussed writing a partitioned table with dynamicPartitionOverwrite=false with me offline.
He suggested that we can add a JobAttemptPath (_temporary/0) existence check when dynamicPartitionOverwrite=false.

Member Author:

For a non-partitioned table, dynamicPartitionOverwrite is false and the staging dir is under the JobAttemptPath (_temporary/0), so I think the staging dir will be cleaned up by FileOutputCommitter.abortJob().
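A standalone sketch (not part of this PR) of that claim against the local filesystem, assuming Hadoop's FileOutputCommitter: setupJob creates the pending directory under `_temporary`, and abortJob removes it. The /tmp path and job id are illustrative.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobID, JobStatus}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.JobContextImpl

val conf = new Configuration()
val out = new Path("file:///tmp/spark-29037-demo")
val jobContext = new JobContextImpl(conf, new JobID("demo", 0))
val committer = new FileOutputCommitter(out, jobContext)

committer.setupJob(jobContext) // creates .../spark-29037-demo/_temporary/0
val fs = out.getFileSystem(conf)
assert(fs.exists(new Path(out, "_temporary")))

committer.abortJob(jobContext, JobStatus.State.KILLED) // cleans up _temporary
assert(!fs.exists(new Path(out, "_temporary")))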

Contributor:

> I think the staging dir will be cleaned up by FileOutputCommitter.abortJob().

Why can't it be cleaned up when dynamicPartitionOverwrite=true?

Member Author:

When a job is killed, its staging dir can be cleaned up by the abortJob method.
But when an application is killed, its job's staging dir is not cleaned up gracefully.

Member Author:

> I think the staging dir will be cleaned up by FileOutputCommitter.abortJob().
>
> Why can't it be cleaned up when dynamicPartitionOverwrite=true?

For the case in the PR description, it happens when appA (static partition overwrite) is killed and its staging dir is not cleaned up gracefully; then appB commits part of appA's results.

Contributor:

OK, so we can't rely on the job cleanup, and ideally we should use a different staging dir for each job.

That said, it seems we can't fix the problem for a non-partitioned table if we continue to use the Hadoop output committer.

Member Author (@turboFei), Sep 17, 2019:

Yes, the solution proposed by @advancedxy is to add a job attempt path existence check for non-partitioned tables and static partition overwrite operations.

And the implementation of InsertIntoHiveTable uses a different staging dir for each job; a minimal sketch of that idea follows below.
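A minimal sketch (not part of this PR) of the per-job staging directory idea: if each insert derives its own staging dir from the job id instead of sharing `_temporary`, concurrent jobs cannot see or commit each other's pending output. The helper name `perJobStagingDir` and the paths below are hypothetical, not existing Spark API.

import java.util.UUID
import org.apache.hadoop.fs.Path

// Hypothetical helper: a staging dir unique to each job, similar in spirit to the
// existing `.spark-staging-<jobId>` dir used for dynamic partition overwrite and to
// the per-query staging dirs used by InsertIntoHiveTable.
def perJobStagingDir(tablePath: String, jobId: String): Path =
  new Path(tablePath, s".spark-staging-$jobId-${UUID.randomUUID()}")

// Two concurrent inserts into the same table get disjoint staging dirs, so neither
// can pick up the other's (possibly orphaned) task files.
val dirA = perJobStagingDir("/warehouse/db/tbl", "job_0001")
val dirB = perJobStagingDir("/warehouse/db/tbl", "job_0002")
assert(dirA != dirB)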

checkStaticInsertConflict(jobContext)
committer.setupJob(jobContext)
}
}

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
committer.commitJob(jobContext)
if (!dynamicPartitionOverwrite) {
committer.commitJob(jobContext)
}

if (hasValidPath) {
val (allAbsPathFiles, allPartitionPaths) =
@@ -215,7 +249,9 @@ class HadoopMapReduceCommitProtocol(
*/
override def abortJob(jobContext: JobContext): Unit = {
try {
committer.abortJob(jobContext, JobStatus.State.FAILED)
if (!dynamicPartitionOverwrite) {
committer.abortJob(jobContext, JobStatus.State.FAILED)
}
} catch {
case e: IOException =>
logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (32 additions, 0 deletions)
@@ -22,6 +22,8 @@ import java.net.{MalformedURLException, URL}
import java.sql.{Date, Timestamp}
import java.util.concurrent.atomic.AtomicBoolean

import com.google.common.io.Files

import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
@@ -33,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
import org.apache.spark.sql.test.{SharedSparkSession, TestSQLContext}
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._
@@ -3192,6 +3195,35 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
checkAnswer(df3, Array(Row(new java.math.BigDecimal("0.100000000000000000000000100"))))
}
}

test("SPARK-29037: Spark gives duplicate result when an application was killed") {
withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
withTable("test") {
sql("create table test(id int, p1 int, p2 int) using parquet partitioned by (p1, p2)")
sql("insert overwrite table test partition(p1=1,p2) select 1,3")
val df = sql("select id from test where p1=1 and p2=3")
checkAnswer(df, Array(Row(1)))

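// Simulate the leftovers of a killed application: recreate a task attempt dir under
// _temporary/0 and copy an already-committed data file into it.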
val warehouse = SQLConf.get.warehousePath.split(":").last
val tblPath = Array(warehouse, "org.apache.spark.sql.SQLQuerySuite", "test")
.mkString(File.separator)
val taskAttemptPath = Array(tblPath, "_temporary", "0", "task_20190914232019_0000_m_000000",
"p1=1", "p2=3").mkString(File.separator)
new File(taskAttemptPath).mkdirs()

val tblResult = new File(Array(tblPath, "p1=1", "p2=3").mkString(File.separator))
val tFile = tblResult.list((_: File, name: String) => !name.startsWith(".")).apply(0)
val from = new File(tblResult.getAbsolutePath + File.separator + tFile)
val to = new File(taskAttemptPath + File.separator + tFile)
Files.copy(from, to)

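// The second overwrite must ignore the stale task output created above and keep a
// single result file.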
sql("insert overwrite table test partition(p1=1,p2) select 2, 3")
assert(tblResult.list((_: File, name: String) => !name.startsWith(".")).size === 1)
val df2 = sql("select id from test where p1=1 and p2=3")
checkAnswer(df2, Array(Row(2)))
}
}
}
}

case class Foo(bar: Option[String])