diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index d9d7b06cdb8c..6465cc7df6dd 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -92,6 +92,35 @@ abstract class FileCommitProtocol extends Logging { */ def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + /** + * Notifies the commit protocol to add a new file, and gets back the full path that should be + * used. Must be called on the executors when running tasks. + * + * Note that the returned temp file may have an arbitrary path. The commit protocol only + * promises that the file will be at the location specified by the arguments after job commit. + * + * The "dir" parameter specifies the sub-directory within the base path, used to specify + * partitioning. The "spec" parameter specifies the file name. The rest are left to the commit + * protocol implementation to decide. + * + * Important: it is the caller's responsibility to add uniquely identifying content to "spec" + * if a task is going to write out multiple files to the same dir. The file commit protocol only + * guarantees that files written by different tasks will not conflict. + * + * This API should be implemented and called, instead of + * [[newTaskTempFile(taskContest, dir, ext)]]. Provide a default implementation here to be + * backward compatible with custom [[FileCommitProtocol]] implementations before Spark 3.2.0. + */ + def newTaskTempFile( + taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = { + if (spec.prefix.isEmpty) { + newTaskTempFile(taskContext, dir, spec.suffix) + } else { + throw new UnsupportedOperationException(s"${getClass.getSimpleName}.newTaskTempFile does " + + s"not support file name prefix: ${spec.prefix}") + } + } + /** * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. * Depending on the implementation, there may be weaker guarantees around adding files this way. @@ -103,6 +132,34 @@ abstract class FileCommitProtocol extends Logging { def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String + /** + * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. + * Depending on the implementation, there may be weaker guarantees around adding files this way. + * + * The "absoluteDir" parameter specifies the final absolute directory of file. The "spec" + * parameter specifies the file name. The rest are left to the commit protocol implementation to + * decide. + * + * Important: it is the caller's responsibility to add uniquely identifying content to "spec" + * if a task is going to write out multiple files to the same dir. The file commit protocol only + * guarantees that files written by different tasks will not conflict. + * + * This API should be implemented and called, instead of + * [[newTaskTempFileAbsPath(taskContest, absoluteDir, ext)]]. Provide a default implementation + * here to be backward compatible with custom [[FileCommitProtocol]] implementations before + * Spark 3.2.0. + */ + def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = { + if (spec.prefix.isEmpty) { + newTaskTempFileAbsPath(taskContext, absoluteDir, spec.suffix) + } else { + throw new UnsupportedOperationException( + s"${getClass.getSimpleName}.newTaskTempFileAbsPath does not support file name prefix: " + + s"${spec.prefix}") + } + } + /** * Commits a task after the writes succeed. Must be called on the executors when running tasks. */ @@ -174,3 +231,12 @@ object FileCommitProtocol extends Logging { new Path(path, ".spark-staging-" + jobId) } } + +/** + * The specification for Spark output file name. + * This is used by [[FileCommitProtocol]] to create full path of file. + * + * @param prefix Prefix of file. + * @param suffix Suffix of file. + */ +final case class FileNameSpec(prefix: String, suffix: String)