HadoopMapReduceCommitProtocol.scala

@@ -26,7 +26,7 @@ import scala.util.Try
import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.internal.Logging
@@ -91,7 +91,31 @@ class HadoopMapReduceCommitProtocol(
*/
private def stagingDir = new Path(path, ".spark-staging-" + jobId)

/**
* Get the desired output path for the job. The output will be [[path]] when

cloud-fan (Contributor) commented on Sep 19, 2019:

Regarding "The output will be [[path]]": what does path mean here?

PR author (Contributor) replied:

the path is defined as a class parameter, and its doc comment reads:

 * @param jobId the job's or stage's id
 * @param path the job's output path, or null if committer acts as a noop
 * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
 *                                  dynamically, i.e., we first write files under a staging
 *                                  directory with partition path, e.g.
 *                                  /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
 *                                  we first clean up the corresponding partition directories at
 *                                  destination path, e.g. /path/to/destination/a=1/b=1, and move
 *                                  files from staging directory to the corresponding partition
 *                                  directories under destination path.
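To make the "clean up and move" step from that doc comment concrete, here is a rough sketch of the commit logic for dynamic partition overwrite. This is an illustration only, not the actual Spark implementation; the object name and the example paths are made up:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DynamicOverwriteCommitSketch {
  // Moves each freshly written partition directory from the per-job staging dir
  // to its final location, replacing whatever was there before.
  def commitPartitions(
      stagingDir: Path,            // e.g. /path/to/destination/.spark-staging-<jobId>
      destinationDir: Path,        // e.g. /path/to/destination
      partitionPaths: Set[String], // e.g. Set("a=1/b=1", "a=1/b=2"), collected while writing
      conf: Configuration): Unit = {
    val fs: FileSystem = stagingDir.getFileSystem(conf)
    partitionPaths.foreach { part =>
      val finalPartitionDir = new Path(destinationDir, part)
      // 1. clean up the partition directory that is being overwritten, if it exists
      fs.delete(finalPartitionDir, true)
      // 2. make sure the parent exists, then move the staged partition into place
      if (!fs.exists(finalPartitionDir.getParent)) fs.mkdirs(finalPartitionDir.getParent)
      fs.rename(new Path(stagingDir, part), finalPartitionDir)
    }
    // 3. finally drop the per-job staging directory
    fs.delete(stagingDir, true)
  }
}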

* dynamicPartitionOverwrite is disabled; otherwise, it will be [[stagingDir]]. We choose
* [[stagingDir]] over [[path]] to avoid potential collisions between concurrent write jobs,
* which would otherwise all share the same output path when dynamically writing to the same table.
*
* @return the desired output path.
*/
protected def getOutputPath(context: TaskAttemptContext): Path = {
if (dynamicPartitionOverwrite) {
val conf = context.getConfiguration
val outputPath = stagingDir.getFileSystem(conf).makeQualified(stagingDir)
outputPath
} else {
new Path(path)
}
}

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
// set output path to stagingDir to avoid potential collision of multiple concurrent write tasks

cloud-fan (Contributor) commented on Sep 16, 2019:

when dynamicPartitionOverwrite=true, we already write files to the staging dir, see newTaskTempFile.

In fact, I don't see how the committer is related to the staging dir. If you look at commitTask and commitJob, we kind of manually commit the files in the staging dir, by moving them to the table dir.
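
For readers who have not looked at newTaskTempFile: a heavily simplified sketch of the behavior being referred to, with names and signatures simplified for illustration (not the real Spark source):

import org.apache.hadoop.fs.Path

object NewTaskTempFileSketch {
  // Simplified sketch: where a task writes its output file, depending on
  // dynamicPartitionOverwrite. When enabled, files go under the per-job staging dir,
  // keyed by the partition path; otherwise they go under the normal output path.
  def newTaskTempFile(
      stagingDir: Path,                  // e.g. /path/to/table/.spark-staging-<jobId>
      path: String,                      // the job's output path, e.g. /path/to/table
      dynamicPartitionOverwrite: Boolean,
      partitionDir: Option[String],      // e.g. Some("b=1")
      filename: String): String = {
    val parentDir =
      if (dynamicPartitionOverwrite) {
        partitionDir.map(d => new Path(stagingDir, d)).getOrElse(stagingDir)
      } else {
        partitionDir.map(d => new Path(new Path(path), d)).getOrElse(new Path(path))
      }
    new Path(parentDir, filename).toString
  }
}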

PR author (Contributor) replied:

> In fact, I don't see how the committer is related to the staging dir. If you look at commitTask and commitJob, we kind of manually commit the files in the staging dir, by moving them to the table dir.

Yes, we manually commit files in the staging dir. The problem is in HadoopMapReduceCommitProtocol's commitJob: it first calls committer.commitJob(jobContext), which depends on the output path passed to the JobContext.

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
committer.commitJob(jobContext)
if (hasValidPath) {
val (allAbsPathFiles, allPartitionPaths) =
taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)

The OutputCommitter cannot work correctly if multiple OutputCommitters work on the same output path (e.g., concurrent writes to different partitions of the same table, where every job would get the same output path: the table's output location). After changing the output path to the staging dir, concurrent jobs get different output dirs.
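
A rough illustration of the collision (a sketch based on FileOutputCommitter's conventional layout, where pending output lives under a _temporary directory inside the configured output path; the paths and job ids below are made up):

import org.apache.hadoop.fs.Path

object OutputPathCollisionSketch {
  // Both concurrent jobs write to the same table, so before this change they were
  // both handed the table location as the committer's output path. FileOutputCommitter
  // keeps its pending data under "<outputPath>/_temporary", so the two jobs end up
  // sharing that directory, and whichever job commits or cleans up first can remove
  // temporary files that still belong to the other job.
  val tableLocation = new Path("/path/to/table")
  val sharedTempDir = new Path(tableLocation, "_temporary")   // the collision point

  // With the output path switched to the per-job staging dir, each committer gets
  // its own _temporary directory and the jobs no longer interfere.
  val jobAStagingTempDir = new Path(new Path(tableLocation, ".spark-staging-jobA"), "_temporary")
  val jobBStagingTempDir = new Path(new Path(tableLocation, ".spark-staging-jobB"), "_temporary")
}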

if (dynamicPartitionOverwrite) {
val newOutputPath = getOutputPath(context)
context.getConfiguration.set(FileOutputFormat.OUTDIR, newOutputPath.toString)
}

val format = context.getOutputFormatClass.getConstructor().newInstance()
// If OutputFormat is Configurable, we should set conf to it.
format match {
SQLHadoopMapReduceCommitProtocol.scala

@@ -55,7 +55,8 @@ class SQLHadoopMapReduceCommitProtocol(
// The specified output committer is a FileOutputCommitter.
// So, we will use the FileOutputCommitter-specified constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
committer = ctor.newInstance(new Path(path), context)
val outputPath = getOutputPath(context)
committer = ctor.newInstance(outputPath, context)
} else {
// The specified output committer is just an OutputCommitter.
// So, we will use the no-argument constructor.
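
As a side note on the reflection above: a user-specified committer class that extends FileOutputCommitter is expected to expose the (Path, TaskAttemptContext) constructor, roughly like the hypothetical example below. With this change, the Path it receives is getOutputPath(context), i.e. the staging dir when dynamicPartitionOverwrite is enabled:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical committer, only for illustrating the constructor signature that
// SQLHadoopMapReduceCommitProtocol instantiates via reflection.
class LoggingFileOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def setupJob(jobContext: JobContext): Unit = {
    // Log where this committer will stage and commit its output, then defer to
    // the standard FileOutputCommitter behavior.
    println(s"Setting up job committer with output path: $outputPath")
    super.setupJob(jobContext)
  }
}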
PartitionedWriteSuite.scala

@@ -19,18 +19,22 @@ package org.apache.spark.sql.sources

import java.io.File
import java.sql.Timestamp
import java.util.concurrent.Semaphore

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

import org.apache.spark.TestUtils
import org.apache.spark.{SparkContext, TestUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
import org.apache.spark.util.Utils

private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String)
@@ -43,9 +47,34 @@ private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String
}
}

private class DetectCorrectOutputPathFileCommitProtocol(
jobId: String, path: String, dynamicPartitionOverwrite: Boolean)
extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
with Serializable with Logging {

override def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val committer = super.setupCommitter(context)

val newOutputPath = context.getConfiguration.get(FileOutputFormat.OUTDIR, "")
if (dynamicPartitionOverwrite) {
assert(new Path(newOutputPath).getName.startsWith(".spark-staging"))
} else {
assert(newOutputPath == path)
}
committer
}
}

class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
import testImplicits._

// Create a SparkSession with 4 cores to support concurrent writes.
override protected def createSparkSession = new TestSparkSession(
new SparkContext(
"local[4]",
"test-partitioned-write-context",
sparkConf.set("spark.sql.testkey", "true")))

test("write many partitions") {
val path = Utils.createTempDir()
path.delete()
@@ -156,4 +185,65 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
}
}
}

test("Output path should be staging dir when dynamicPartitionOverwrite is enabled") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
withTable("t") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[DetectCorrectOutputPathFileCommitProtocol].getName) {
Seq((1, 2)).toDF("a", "b")
.write
.partitionBy("b")
.mode("overwrite")
.saveAsTable("t")
}
}
}
}

test("Concurrent write to the same table with different partitions should be possible") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
withTable("t") {
val sem = new Semaphore(0)
Seq((1, 2)).toDF("a", "b")
.write
.partitionBy("b")
.mode("overwrite")
.saveAsTable("t")

val df1 = spark.range(0, 10).map(x => (x, 1)).toDF("a", "b")
val df2 = spark.range(0, 10).map(x => (x, 2)).toDF("a", "b")
val dfs = Seq(df1, df2)

var throwable: Option[Throwable] = None
for (i <- 0 until 2) {
new Thread {
override def run(): Unit = {
try {
dfs(i)
.write
.mode("overwrite")
.insertInto("t")
} catch {
case t: Throwable =>
throwable = Some(t)
} finally {
sem.release()
}
}
}.start()
}
// make sure both writer threads have finished.
sem.acquire(2)
throwable.foreach { t => throw improveStackTrace(t) }
checkAnswer(spark.sql("select a, b from t where b = 1"), df1)
checkAnswer(spark.sql("select a, b from t where b = 2"), df2)
}
}
}

private def improveStackTrace(t: Throwable): Throwable = {
t.setStackTrace(t.getStackTrace ++ Thread.currentThread.getStackTrace)
t
}
}