Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.io

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

/**
* A [[FileNamingProtocol]] implementation to write output data in batch processing.
*/
class BatchFileNamingProtocol(jobId: String) extends FileNamingProtocol with Serializable {

override def getTaskTempPath(
taskContext: TaskAttemptContext, fileContext: FileContext): String = {
// The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
// Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
// the file name is fine and won't overflow.
val split = taskContext.getTaskAttemptID.getTaskID.getId
val prefix = fileContext.prefix.getOrElse("")
val ext = fileContext.ext
val filename = f"${prefix}part-$split%05d-$jobId$ext"

fileContext.relativeDir.map {
d => new Path(d, filename).toString
}.getOrElse(filename)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,35 +73,31 @@ abstract class FileCommitProtocol extends Logging {
* Notifies the commit protocol to add a new file, and gets back the full path that should be
* used. Must be called on the executors when running tasks.
*
* Note that the returned temp file may have an arbitrary path. The commit protocol only
* promises that the file will be at the location specified by the arguments after job commit.
* Note that "relativePath" parameter specifies the relative path of returned temp file. The full
* path is left to the commit protocol to decide. The commit protocol only promises that the file
* will be at the location specified by the relative path after job commits.
*
* A full file path consists of the following parts:
* 1. the base path
* 2. some sub-directory within the base path, used to specify partitioning
* 3. file prefix, usually some unique job id with the task id
* 4. bucket id
* 5. source specific file extension, e.g. ".snappy.parquet"
*
* The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
* are left to the commit protocol implementation to decide.
*
* Important: it is the caller's responsibility to add uniquely identifying content to "ext"
* if a task is going to write out multiple files to the same dir. The file commit protocol only
* guarantees that files written by different tasks will not conflict.
* Important: it is the caller's responsibility to add uniquely identifying content to
* "relativePath" if a task is going to write out multiple files to the same directory. The file
* commit protocol only guarantees that files written by different tasks will not conflict.
*/
def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String

/**
* Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
* Depending on the implementation, there may be weaker guarantees around adding files this way.
*
* Important: it is the caller's responsibility to add uniquely identifying content to "ext"
* if a task is going to write out multiple files to the same dir. The file commit protocol only
* guarantees that files written by different tasks will not conflict.
* "relativePath" parameter specifies the relative path of returned temp file, and "finalPath"
* parameter specifies the full path of file after job commit. The commit protocol promises that
* the file will be at the location specified by the "finalPath" after job commits.
*
* Important: it is the caller's responsibility to add uniquely identifying content to
* "relativePath" and "finalPath" if a task is going to write out multiple files to the same
* directory. The file commit protocol only guarantees that files written by different tasks will
* not conflict.
*/
def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String
Copy link
Contributor

@cloud-fan cloud-fan Jun 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: I can see that, this API makes the code simpler, but it makes the semantic a bit more complicated. What if the final path doesn't have the same file name as the relativePath? Maybe it's better to have fileName: String, targetDir: String. Then the semantic is pretty here: the impl should commit the new file to the target dir.


/**
* Commits a task after the writes succeed. Must be called on the executors when running tasks.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.io

import org.apache.hadoop.mapreduce.TaskAttemptContext

/**
* An interface to define how a single Spark job names its outputs. Two notes:
*
* 1. Implementations must be serializable, as the instance instantiated on the driver
* will be used for tasks on executors.
* 2. An instance should not be reused across multiple Spark jobs.
*
* The proper way to call is:
*
* As part of each task's execution, whenever a new output file needs be created, executor calls
* [[getTaskTempPath]] to get a valid relative file path before commit.
*/
abstract class FileNamingProtocol {

/**
* Gets the relative path should be used for the output file.
*
* Important: it is the caller's responsibility to add uniquely identifying content to
* "fileContext" if a task is going to write out multiple files to the same directory. The file
* naming protocol only guarantees that files written by different tasks will not conflict.
*/
def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
}

/**
* The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path.
*
* @param ext Source specific file extension, e.g. ".snappy.parquet".
* @param relativeDir Relative directory of file. Can be used for writing dynamic partitions.
* E.g., "a=1/b=2" is directory for partition (a=1, b=2).
* @param prefix file prefix.
*/
final case class FileContext(
ext: String,
relativeDir: Option[String],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not very sure about it. It seems more clear to me to let FileNamingProtocol only generate filename, and the caller side should construct the proper relative path w.r.t. the partition dir.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same to ext and prefix: can we let the caller side to pretend/append ext/prefix to the generated file name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the information in FileContext is not something that the impl can customize: the generated file name must have ext and the end, prefix at the beginning, and relativeDir as the parent dir. Then it's better to let the caller side to guarantee it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the API can be simply named getTaskFileName.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is only to abstract the naming differences between batch and streaming file writing. If that's not necessary, maybe we can remove this abstraction entirely.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jobId is generated with UUID as well, I don't see why streaming write needs to generate a new UUID per file, instead of using job id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm cleaning this up in #33002

prefix: Option[String])
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.internal.io

import java.io.IOException
import java.util.{Date, UUID}
import java.util.Date

import scala.collection.mutable
import scala.util.Try
Expand Down Expand Up @@ -104,7 +104,7 @@ class HadoopMapReduceCommitProtocol(
* The staging directory of this write job. Spark uses it to deal with files with absolute output
* path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
*/
protected def stagingDir = getStagingDir(path, jobId)
protected def stagingDir: Path = getStagingDir(path, jobId)

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.getConstructor().newInstance()
Expand All @@ -116,50 +116,30 @@ class HadoopMapReduceCommitProtocol(
format.getOutputCommitter(context)
}

override def newTaskTempFile(
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val filename = getFilename(taskContext, ext)

override def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String = {
val stagingDir: Path = committer match {
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
if (dynamicPartitionOverwrite) {
assert(dir.isDefined,
val dir = new Path(relativePath).getParent.toString
assert(dir.nonEmpty,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
partitionPaths += dir
}
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
}

dir.map { d =>
new Path(new Path(stagingDir, d), filename).toString
}.getOrElse {
new Path(stagingDir, filename).toString
}
new Path(stagingDir, relativePath).toString
}

override def newTaskTempFileAbsPath(
taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
val filename = getFilename(taskContext, ext)
val absOutputPath = new Path(absoluteDir, filename).toString

// Include a UUID here to prevent file collisions for one task writing to different dirs.
// In principle we could include hash(absoluteDir) instead but this is simpler.
val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString

addedAbsPathFiles(tmpOutputPath) = absOutputPath
taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String = {
val tmpOutputPath = new Path(stagingDir, relativePath).toString
addedAbsPathFiles(tmpOutputPath) = finalPath
tmpOutputPath
}

protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
// The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
// Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
// the file name is fine and won't overflow.
val split = taskContext.getTaskAttemptID.getTaskID.getId
f"part-$split%05d-$jobId$ext"
}

override def setupJob(jobContext: JobContext): Unit = {
// Setup IDs
val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,24 +121,14 @@ class PathOutputCommitProtocol(
* Create a temporary file for a task.
*
* @param taskContext task context
* @param dir optional subdirectory
* @param ext file extension
* @return a path as a string
* @param relativePath relative path as a string for file
* @return the full path as a string for file
*/
override def newTaskTempFile(
taskContext: TaskAttemptContext,
dir: Option[String],
ext: String): String = {

val workDir = committer.getWorkPath
val parent = dir.map {
d => new Path(workDir, d)
}.getOrElse(workDir)
val file = new Path(parent, getFilename(taskContext, ext))
logTrace(s"Creating task file $file for dir $dir and ext $ext")
override def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String = {
val file = new Path(committer.getWorkPath, relativePath)
logTrace(s"Creating task file $file with relative path $relativePath")
file.toString
}

}

object PathOutputCommitProtocol {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1405,9 +1405,9 @@ object QueryExecutionErrors {
s"Multiple streaming queries are concurrently using $path", e)
}

def addFilesWithAbsolutePathUnsupportedError(commitProtocol: String): Throwable = {
def addFilesWithAbsolutePathUnsupportedError(protocol: String): Throwable = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - sorry, this does not need. Was calling it from naming protocol as well in one iteration. Will change.

new UnsupportedOperationException(
s"$commitProtocol does not support adding files with an absolute path")
s"$protocol does not support adding files with an absolute path")
}

def microBatchUnsupportedByDataSourceError(srcName: String): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package org.apache.spark.sql.execution.datasources

import java.util.UUID

import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.{FileCommitProtocol, FileContext, FileNamingProtocol}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
Expand Down Expand Up @@ -122,7 +124,8 @@ class EmptyDirectoryDataWriter(
class SingleDirectoryDataWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol)
committer: FileCommitProtocol,
namingProtocol: FileNamingProtocol)
extends FileFormatDataWriter(description, taskAttemptContext, committer) {
private var fileCounter: Int = _
private var recordsInFile: Long = _
Expand All @@ -133,11 +136,11 @@ class SingleDirectoryDataWriter(
recordsInFile = 0
releaseResources()

val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
val ext = f"-c$fileCounter%03d" +
description.outputWriterFactory.getFileExtension(taskAttemptContext)
val currentPath = committer.newTaskTempFile(
taskAttemptContext,
None,
f"-c$fileCounter%03d" + ext)
namingProtocol.getTaskTempPath(taskAttemptContext, FileContext(ext, None, None)))

currentWriter = description.outputWriterFactory.newInstance(
path = currentPath,
Expand Down Expand Up @@ -169,7 +172,8 @@ class SingleDirectoryDataWriter(
abstract class BaseDynamicPartitionDataWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol)
committer: FileCommitProtocol,
namingProtocol: FileNamingProtocol)
extends FileFormatDataWriter(description, taskAttemptContext, committer) {

/** Flag saying whether or not the data to be written out is partitioned. */
Expand Down Expand Up @@ -262,9 +266,15 @@ abstract class BaseDynamicPartitionDataWriter(
description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir))
}
val currentPath = if (customPath.isDefined) {
committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
// Include a UUID here to prevent file collisions for one task writing to different dirs.
val relativePath = UUID.randomUUID().toString + "-" +
namingProtocol.getTaskTempPath(taskAttemptContext, FileContext(ext, None, None))
val finalPath = new Path(customPath.get, relativePath).toString
committer.newTaskTempFileAbsPath(taskAttemptContext, relativePath, finalPath)
} else {
committer.newTaskTempFile(taskAttemptContext, partDir, ext)
val relativePath = namingProtocol.getTaskTempPath(
taskAttemptContext, FileContext(ext, partDir, None))
committer.newTaskTempFile(taskAttemptContext, relativePath)
}

currentWriter = description.outputWriterFactory.newInstance(
Expand Down Expand Up @@ -314,8 +324,10 @@ abstract class BaseDynamicPartitionDataWriter(
class DynamicPartitionDataSingleWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol)
extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer) {
committer: FileCommitProtocol,
namingProtocol: FileNamingProtocol)
extends BaseDynamicPartitionDataWriter(
description, taskAttemptContext, committer, namingProtocol) {

private var currentPartitionValues: Option[UnsafeRow] = None
private var currentBucketId: Option[Int] = None
Expand Down Expand Up @@ -361,8 +373,10 @@ class DynamicPartitionDataConcurrentWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol,
namingProtocol: FileNamingProtocol,
concurrentOutputWriterSpec: ConcurrentOutputWriterSpec)
extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer)
extends BaseDynamicPartitionDataWriter(
description, taskAttemptContext, committer, namingProtocol)
with Logging {

/** Wrapper class to index a unique concurrent output writer. */
Expand Down
Loading