
Conversation

@cloud-fan (Contributor)

What changes were proposed in this pull request?

Currently, the batch and streaming file writers generate the file name a bit differently:

  1. The batch file writer generates the file name with the partition ID and the job ID (which is a unique UUID).
  2. The streaming file writer generates the file name with the partition ID and a fresh UUID.

The reason for the difference is:

  1. The file output committer for batch creates a unique staging directory for each task attempt, so the partition ID is good enough to avoid file name collisions, as concurrent task attempts (task retries, speculative tasks) are well handled by the staging directory.
  2. The file output committer for streaming does not use staging directories. It writes files to the final path directly and uses a manifest file to track the committed files. Thus, the partition ID is not sufficient to avoid file name collisions. That's why we add a fresh UUID to the file name.

This PR proposes to unify the file name by putting the task attempt ID (which is unique within the job) and the job ID in the file name.
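A minimal sketch of the proposed unified name (assuming `jobId` is the write job's UUID and `ext` the file extension; the helper name is illustrative, not the exact patch):

```scala
import org.apache.spark.TaskContext

// Unified name: Spark task attempt ID (unique within the application) plus the
// job ID (a UUID), so neither a staging directory nor a fresh per-file UUID is
// needed to avoid collisions. Must be called on an executor, inside a task.
def unifiedFilename(jobId: String, ext: String): String = {
  val taskAttemptId = TaskContext.get.taskAttemptId()
  f"part-$taskAttemptId%05d-$jobId$ext"
}
```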

Why are the changes needed?

It removes confusion when people try to understand how Spark generates file names. We can further refactor the code later to move file name generation out of the output committer.

Does this PR introduce any user-facing change?

No. The read side doesn't care about file names at all; it only cares about how to list files. There are no backward compatibility issues.

How was this patch tested?

Existing tests.

@cloud-fan (Contributor, Author)

cc @viirya @squito @HeartSaVioR

d => new Path(workDir, d)
}.getOrElse(workDir)
- val file = new Path(parent, getFilename(taskContext, ext))
+ val file = new Path(parent, getFilename(ext))
@cloud-fan (Contributor, Author) commented on Jun 21, 2021

I'm not sure if this change affects this new committer, but I think it should be a positive change. The file name now uses the task attempt ID instead of the partition ID, which is "more unique".

@steveloughran (Contributor)

Commit protocols MUST NOT contain any assumptions about filenames. It would be silly.

Well, almost none. Try creating a file with .pending or .pendingset in the magic committer and it'd be very confused. (Maybe we should change that to something really obscure...)

// the file name is fine and won't overflow.
val split = taskContext.getTaskAttemptID.getTaskID.getId
f"part-$split%05d-$jobId$ext"
f"part-$taskId%05d-$jobId$ext"
@cloud-fan (Contributor, Author)

A more aggressive way is to simply use a fresh UUID here, but I'm not sure if that's better. cc @zsxwing

Member:

Previously it used the task ID after part-; now this is the taskAttemptId. Is it still the same format as before, e.g. part-xxxxx-?

@cloud-fan (Contributor, Author)

The value will be very different. For one query, the partition ID always starts from 0, but the task attempt ID is unique within a Spark application and won't be reset for a new query.

If we do want to keep the part-00000 prefix for some reason, we can also apply the naming rule from streaming to batch. I don't know who cares about the final naming; the commit protocols I'm aware of only care about file listing.

@steveloughran (Contributor)

The part # is handy for a bit of blame assignment. FWIW, the name "part" can be configured via "mapreduce.output.basename". Don't know if anyone does.

Now, the v2 committer, whose lack of task commit idempotency is well known, is only going to be able to recover from a failure partway through task attempt commit if the second attempt creates files with the same name. This should not be a barrier to having better names as, well, it's still broken.

> But task attempt id is unique within a spark application and won't be reset for a new query.

Is this true? I think we really need to understand the differences between the Spark job, task, and attempt IDs and the YARN ones, which, as we know, had duplicate job IDs until SPARK-33402.

@cloud-fan (Contributor, Author)

Spark has: job ID -> stage ID -> partition ID

  • job ID is simply a UUID
  • stage ID is an integer starting from 0, globally unique within the Spark application
  • partition ID is an integer starting from 0, unique within a stage

Each task not only has a partition ID but also an attempt ID, which is an integer starting from 0 and globally unique within the Spark application. There is also an attempt number, which starts from 0 and increases by one for each attempt of the task.
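For reference, all of these are reachable from `TaskContext` on the executor side (a minimal sketch using the public Spark API; the comments restate the properties above):

```scala
import org.apache.spark.TaskContext

val tc = TaskContext.get       // non-null only inside a running task
tc.stageId()        // Int: starts from 0, globally unique within the application
tc.partitionId()    // Int: starts from 0, unique within a stage
tc.taskAttemptId()  // Long: globally unique within the application, never reset
tc.attemptNumber()  // Int: 0 for the first attempt, +1 for each retry of this task
```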

@SparkQA

SparkQA commented Jun 21, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44612/

// the file name is fine and won't overflow.
val split = taskContext.getTaskAttemptID.getTaskID.getId
val uuid = UUID.randomUUID.toString
val filename = f"part-$split%05d-$uuid$ext"
Member:

> 2. The file output committer for streaming does not use staging directories. It writes files to the final path directly and uses a manifest file to track the committed files. Thus, the partition ID is not sufficient to avoid file name collisions. That's why we add a fresh UUID to the file name.

Could you explain this? Currently, ManifestFileCommitProtocol should always pick a new UUID for each file.

@cloud-fan (Contributor, Author)

Currently it does so to avoid file name collisions, but I think it's overkill; we can use "task attempt ID + job ID" to avoid name collisions as well, which is more consistent with the batch side.

It may also be useful to include the job id in the file name like the batch side does, so that people can see which files were written by the same job.
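A minimal sketch of the contrast (assuming `jobId` is the per-write-job UUID and `ext` the extension; the helper names are hypothetical):

```scala
import java.util.UUID
import org.apache.spark.TaskContext

// Today's streaming name: partition ID plus a fresh per-file UUID, because the
// partition ID alone can collide across task retries/speculation.
def streamingNameToday(partitionId: Int, ext: String): String =
  f"part-$partitionId%05d-${UUID.randomUUID}$ext"

// Proposed name: Spark task attempt ID plus the write job's UUID, matching batch.
def proposedName(jobId: String, ext: String): String =
  f"part-${TaskContext.get.taskAttemptId()}%05d-$jobId$ext"
```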

@SparkQA

SparkQA commented Jun 21, 2021

Test build #140084 has finished for PR 33002 at commit 715ce3c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) left a comment

Looks fine. If we'd like to have consistent file names, how about using a common method to assign/generate them?

@steveloughran (Contributor)

ok, this gets into a world of fun; thanks for mentioning me.

@steveloughran (Contributor) left a comment

I don't see any problems with this. We have had problems with the S3A staging committer (and apparently occasionally with the classic FileOutputCommitter) from more than one job starting in the same second and the generated YARN job IDs being duplicates. Having #30319 in is a prerequisite here.

Now, the S3A committers (and I should do the same for the IntermediateManifestCommitter of apache/hadoop#2971) will use the value of "spark.sql.sources.writeJobUUID" as their unique ID in preference to anything else. That is, if it's set, they trust Spark. This is what Spark used to do, stopped doing for a bit, then had restored. Please can you keep this option and set it to the job UUID? Thanks.
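A hedged sketch of keeping that contract on the Spark side (the config key is the one named above; exactly where Spark sets it is an assumption here):

```scala
import org.apache.hadoop.conf.Configuration

// Propagate the Spark write-job UUID so committers such as the S3A ones can
// use it as their unique ID instead of a possibly duplicated YARN job ID.
def propagateWriteJobUUID(hadoopConf: Configuration, jobUUID: String): Unit =
  hadoopConf.set("spark.sql.sources.writeJobUUID", jobUUID)
```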



// To avoid file name collision, we should generate a new job ID for every write job, instead
// of using batchId, as we may use the same batchId to write files again, if the streaming job
// fails and we restore from the checkpoint.
val jobId = java.util.UUID.randomUUID().toString
@steveloughran (Contributor)

One change in SPARK-33402 was including some timestamp/version info. That's potentially quite handy later, just to see when things were created and in what order.

@cloud-fan (Contributor, Author)

This is the job ID for the Spark file commit protocol. For the Hadoop JobId, we do append the timestamp info: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L266

But that's a different story.
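For illustration, a sketch of how timestamp info ends up inside a Hadoop JobID (assumption: the yyyyMMddHHmmss job tracker ID mirrors what the linked Spark code does; treat the details as illustrative):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import org.apache.hadoop.mapreduce.JobID

// The job tracker ID is derived from the current time, so the timestamp is
// visible in every path and file name built from this JobID.
val jobTrackerID = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
val hadoopJobId = new JobID(jobTrackerID, 0)
```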

// collide if the file name also includes job ID. The Hadoop task id is equivalent to Spark's
// partitionId, which is not unique within the write job, for cases like task retry or
// speculative tasks.
val taskId = TaskContext.get.taskAttemptId()
@steveloughran (Contributor)

Specifically: the Hadoop task ID MUST be the same for all task attempts, so that committers can commit the output of more than one task attempt by renaming the task attempt dir to output/_temporary/jobAttempt/taskID; as only one task commit can do this (assuming the FS has atomic rename; Google GCS doesn't), you get unique output. My WiP manifest committer creates a JSON manifest with the task ID in the filename for the same reason: only one file can be committed by file rename (atomic on GCS as well as Azure).
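A rough sketch of the rename-based task commit being described (paths follow the classic FileOutputCommitter v1 layout; the helper is hypothetical):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// All attempts of a task write under their own attempt dir, but every attempt
// commits to the SAME task-ID path. With atomic rename, only one attempt's
// output can win, which is why the task ID must not vary across attempts.
def commitTaskAttempt(fs: FileSystem, out: Path, jobAttempt: Int,
    taskId: String, taskAttemptId: String): Boolean = {
  val attemptDir = new Path(out, s"_temporary/$jobAttempt/_temporary/$taskAttemptId")
  val committedDir = new Path(out, s"_temporary/$jobAttempt/$taskId")
  fs.rename(attemptDir, committedDir) // the rename IS the commit
}
```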

@cloud-fan (Contributor, Author) commented on Jun 24, 2021

> Hadoop Task ID MUST be the same for all task attempts

This doesn't change: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L268

This PR only unifies the file name generated by the built-in Spark file commit protocol; it doesn't change anything in the Hadoop job/task setup.

// Spark task attempt ID here.
val taskId = TaskContext.get.taskAttemptId()
// The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
// Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
Member:

taskId -> taskAttemptId?

// speculative tasks.
val taskId = TaskContext.get.taskAttemptId()
// The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
// Note that %05d does not truncate the taskId, so if we have more than 100000 tasks,
Member:

same, taskId -> taskAttemptId?

@github-actions

github-actions bot commented Oct 3, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Oct 3, 2021
@github-actions github-actions bot closed this Oct 4, 2021
