HadoopMapReduceCommitProtocol.scala:

@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 
@@ -118,7 +119,7 @@ class HadoopMapReduceCommitProtocol(
 
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    val filename = getFilename(taskContext, ext)
+    val filename = getFilename(ext)
 
     val stagingDir: Path = committer match {
       // For FileOutputCommitter it has its own staging path called "work path".
@@ -141,7 +142,7 @@
 
   override def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
-    val filename = getFilename(taskContext, ext)
+    val filename = getFilename(ext)
     val absOutputPath = new Path(absoluteDir, filename).toString
 
     // Include a UUID here to prevent file collisions for one task writing to different dirs.
@@ -152,12 +153,21 @@
     tmpOutputPath
   }
 
-  protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+  protected def getFilename(ext: String): String = {
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    // NOTE: this is not necessary for certain Hadoop output committers, as they will create a
+    // unique staging directory for each task attempt, so we don't need to worry about file name
+    // collision between different task attempts, and using Hadoop task ID/Spark partition ID is
+    // also fine. For extra safety and consistency with the streaming side, we always use the
+    // Spark task attempt ID here.
+    val taskId = TaskContext.get.taskAttemptId()
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,

Member: taskId -> taskAttemptId?

     // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    f"part-$split%05d-$jobId$ext"
+    f"part-$taskId%05d-$jobId$ext"

Contributor Author: A more aggressive way is to simply use a fresh UUID here, but I'm not sure if that's better. cc @zsxwing

Member: Previously it used the task id after part-; now this is the taskAttemptId. Is it still the same format as before, e.g. part-xxxxx-?

Contributor Author:
The value will be very different. For one query, the partition id always starts from 0, but the task attempt id is unique within a Spark application and won't be reset for a new query.

If we do want to keep the part-00000 prefix for some reason, we can also apply the naming rule from streaming to batch. I don't know who will care about the final naming; the commit protocols I'm aware of only care about file listing.

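To make the difference concrete, here is a small illustration using the format strings from this diff; all ID values below are made up.

```scala
// Hypothetical values, only to illustrate the naming difference.
val jobId = "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb"  // example UUID from the comment above
val ext = ".gz.parquet"

val partitionId = 3        // restarts from 0 for every query
val taskAttemptId = 1047L  // keeps growing for the lifetime of the Spark application

val oldName = f"part-$partitionId%05d-$jobId$ext"    // part-00003-2dd664f9-...gz.parquet
val newName = f"part-$taskAttemptId%05d-$jobId$ext"  // part-01047-2dd664f9-...gz.parquet
```
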
Contributor:
The part # is handy for a bit of blame assignment. FWIW, the name "part" can be configured via "mapreduce.output.basename"; I don't know if anyone does.

Now, the v2 committer, whose lack of task commit idempotency is well known, is only going to be able to recover from a failure partway through a task attempt commit if the second attempt creates files with the same name. This should not be a barrier to having better names as, well, it's still broken.

> But task attempt id is unique within a spark application and won't be reset for a new query.

Is this true? I think we really need to understand the differences between Spark job, task and attempt IDs and the YARN ones, which, as we know, had duplicate job IDs until SPARK-33402.

Contributor Author:
Spark has: job id -> stage id -> partition ID

- job id is simply a UUID
- stage id is an integer starting from 0, globally unique within the Spark application
- partition ID is an integer starting from 0, unique within a stage

Each task not only has a partition ID but also an attempt ID, which is an integer starting from 0 and globally unique within the Spark application. There is also an attempt number, which starts from 0 and increases by one for each attempt of this task.

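For reference, all of these identifiers are exposed on TaskContext; a minimal sketch of reading them inside a running task (the helper name is made up):

```scala
import org.apache.spark.TaskContext

// Minimal sketch (hypothetical helper): read the IDs discussed above from the
// running task. stageId and partitionId restart per stage/query, while
// taskAttemptId is unique for the lifetime of the Spark application and
// attemptNumber counts the retries of this particular task.
def describeCurrentTask(): String = {
  val tc = TaskContext.get()
  s"stage=${tc.stageId()} partition=${tc.partitionId()} " +
    s"taskAttemptId=${tc.taskAttemptId()} attemptNumber=${tc.attemptNumber()}"
}
```
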
   }
 
   override def setupJob(jobContext: JobContext): Unit = {

PathOutputCommitProtocol.scala:

@@ -134,7 +134,7 @@ class PathOutputCommitProtocol(
     val parent = dir.map {
       d => new Path(workDir, d)
     }.getOrElse(workDir)
-    val file = new Path(parent, getFilename(taskContext, ext))
+    val file = new Path(parent, getFilename(ext))

Contributor Author (@cloud-fan, Jun 21, 2021):
I'm not sure if this change affects this new committer, but I think it should be a positive change. The file name now uses the task attempt id instead of the partition id, which is "more unique".

@steveloughran

Contributor:
Commit protocols MUST NOT contain any assumptions about filenames. It would be silly.

Well, almost. Try creating a file with .pending or .pendingset in the magic committer and it'd be very confused. (Maybe we should change that to something really obscure...)

logTrace(s"Creating task file $file for dir $dir and ext $ext")
file.toString
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,13 @@ class FileStreamSink(
if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
logInfo(s"Skipping already committed batch $batchId")
} else {
// To avoid file name collision, we should generate a new job ID for every write job, instead
// of using batchId, as we may use the same batchId to write files again, if the streaming job
// fails and we restore from the checkpoint.
val jobId = java.util.UUID.randomUUID().toString

Contributor:
One change in SPARK-33402 was including some timestamp/version info. That's potentially quite handy later, just to see when things were created and in what order.

Contributor Author:
This is the job id for the Spark file commit protocol. In the Hadoop JobID we do append the timestamp info: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L266

But that's a different story.

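For context, a rough sketch of what a timestamp-derived Hadoop JobID looks like; this only illustrates the idea and is not the exact code behind the link above.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import org.apache.hadoop.mapreduce.JobID

// Illustration only: build a Hadoop JobID whose "job tracker" identifier embeds
// the wall-clock time of the write, so the ID itself tells you when the job ran.
// The exact format Spark uses lives in FileFormatWriter / its writer utilities.
def timestampedJobId(jobNumber: Int): JobID = {
  val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
  new JobID(jobTrackerId, jobNumber)
}
```
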
       val committer = FileCommitProtocol.instantiate(
         className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
-        jobId = batchId.toString,
+        jobId = jobId,
         outputPath = path)
 
       committer match {

ManifestFileCommitProtocol.scala:

@@ -18,13 +18,13 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.IOException
-import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
@@ -113,12 +113,15 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+    // Use the Spark task attempt ID which is unique within the write job, so that file writes never
+    // collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
+    // partitionId, which is not unique within the write job, for cases like task retry or
+    // speculative tasks.
+    val taskId = TaskContext.get.taskAttemptId()

Contributor:
Specifically: the Hadoop Task ID MUST be the same for all task attempts, so that committers can commit the output of more than one task attempt by renaming the Task Attempt dir to output/_temporary/jobAttempt/taskID; as only one task commit can do this (assuming the fs has atomic rename; Google GCS doesn't), you get unique output. My WiP manifest committer creates a JSON manifest with the task ID in the filename for the same reason: only one file can be committed by file rename (atomic on GCS as well as Azure).

Contributor Author (@cloud-fan, Jun 24, 2021):
> Hadoop Task ID MUST be the same for all task attempts

This doesn't change: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L268

This PR only unifies the file names generated by the built-in Spark file commit protocols, and doesn't change anything in the Hadoop Job/Task setup.

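A simplified sketch of the relationship being referenced (not the exact FileFormatWriter code): the Hadoop TaskID is still keyed by the Spark partition ID, so every attempt of a partition shares it and only the attempt number changes.

```scala
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

// Simplified sketch: derive a Hadoop TaskAttemptID from Spark-side IDs. All
// attempts of the same partition share the TaskID; only the attempt number
// differs on retry or speculation. Parameter names here are illustrative.
def hadoopTaskAttemptId(
    jobTrackerId: String,
    sparkStageId: Int,
    sparkPartitionId: Int,
    sparkAttemptNumber: Int): TaskAttemptID = {
  val jobId = new JobID(jobTrackerId, sparkStageId)
  val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
  new TaskAttemptID(taskId, sparkAttemptNumber)
}
```
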
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,

Member: Same, taskId -> taskAttemptId?

     // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    val uuid = UUID.randomUUID.toString
-    val filename = f"part-$split%05d-$uuid$ext"

Member:
> 2. The file output committer for streaming does not use staging directories. It writes files to the final path directly and uses a manifest file to track the committed files. Thus, partition ID is not sufficient to avoid file name collision. That's why we add a fresh UUID to the file name.

Could you explain this? Currently ManifestFileCommitProtocol should always pick up a new UUID for each file.

Contributor Author:
Currently it does so to avoid file name collisions, but I think it's overkill and we can use "task attempt id + job id" to avoid name collisions as well, which is more consistent with the batch side.

It may also be useful to include the job id in the file name like the batch side does, so that people can see which files were written by the same job.

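For readers less familiar with the streaming side, a minimal sketch of the manifest-style idea under discussion (a made-up class, not the real ManifestFileCommitProtocol): tasks write to the final path directly, remember what they wrote, and report it in the task commit message, so every file name handed out must already be collision-free.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Hypothetical illustration of manifest-style tracking: there is no staging
// directory, so uniqueness of the file name is the only collision protection.
class TrackedTaskFiles {
  private val written = ArrayBuffer.empty[String]

  // Hand out a final (non-staged) path and remember it for the commit message.
  def newFile(finalPath: String): String = {
    written += finalPath
    finalPath
  }

  // The driver collects these messages and appends the paths to the manifest log.
  def commitTask(): TaskCommitMessage = new TaskCommitMessage(written.toList)
}
```
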
val filename = f"part-$taskId%05d-$jobId$ext"

val file = dir.map { d =>
new Path(new Path(path, d), filename).toString
Expand Down