From 14f48797cd9f83b9e271ab7801aa9e28604f737c Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 11 Jun 2021 00:35:27 -0700 Subject: [PATCH 01/10] Decouple file naming from FileCommitProtocol --- .../internal/io/DefaultNamingProtocol.scala | 52 ++++++++ .../internal/io/FileCommitProtocol.scala | 21 ++++ .../internal/io/FileNamingProtocol.scala | 112 ++++++++++++++++++ .../io/HadoopMapReduceCommitProtocol.scala | 58 ++++----- .../io/HadoopMapReduceNamingProtocol.scala | 78 ++++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 19 +++ .../datasources/FileFormatDataWriter.scala | 38 +++--- .../datasources/FileFormatWriter.scala | 14 ++- .../InsertIntoHadoopFsRelationCommand.scala | 12 +- .../datasources/v2/FileBatchWrite.scala | 7 +- .../execution/datasources/v2/FileWrite.scala | 14 ++- .../datasources/v2/FileWriterFactory.scala | 10 +- .../execution/streaming/FileStreamSink.scala | 12 +- .../sql/hive/execution/SaveAsHiveFile.scala | 16 ++- 14 files changed, 391 insertions(+), 72 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/internal/io/DefaultNamingProtocol.scala create mode 100644 core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala create mode 100644 core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala diff --git a/core/src/main/scala/org/apache/spark/internal/io/DefaultNamingProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/DefaultNamingProtocol.scala new file mode 100644 index 000000000000..b150a03ef2d0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/DefaultNamingProtocol.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * An [[FileNamingProtocol]] implementation by default for custom [[FileCommitProtocol]] + * implementations. + * + * This delegates to call [[FileCommitProtocol.newTaskTempFile]] and + * [[FileCommitProtocol.newTaskTempFileAbsPath()]] to be backward compatible with + * custom [[FileCommitProtocol]] implementation before Spark 3.2.0. Newer implementation of + * [[FileCommitProtocol]] after Spark 3.2.0 should create own implementation of + * [[FileNamingProtocol]], instead of using this. 
+ */ +class DefaultNamingProtocol( + jobId: String, + path: String, + commitProtocol: FileCommitProtocol) + extends FileNamingProtocol with Serializable { + + override def getTaskStagingPath( + taskContext: TaskAttemptContext, fileContext: FileContext): String = { + fileContext.absoluteDir match { + case Some(dir) => commitProtocol.newTaskTempFileAbsPath( + taskContext, dir, fileContext.ext) + case _ => commitProtocol.newTaskTempFile( + taskContext, fileContext.relativeDir, fileContext.ext) + } + } + + override def getTaskFinalPath( + taskContext: TaskAttemptContext, fileContext: FileContext): String = { + "" + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index d9d7b06cdb8c..cbc8b22407f7 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -90,6 +90,7 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. */ + @deprecated("use newTaskFile", "3.2.0") def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String /** @@ -100,9 +101,29 @@ abstract class FileCommitProtocol extends Logging { * if a task is going to write out multiple files to the same dir. The file commit protocol only * guarantees that files written by different tasks will not conflict. */ + @deprecated("use newTaskFile", "3.2.0") def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String + /** + * Notifies the commit protocol that a new file is added. Must be called on the executors when + * running tasks. + * + * The "stagingPath" parameter is the current path of new file. The "finalPath" parameter if + * specified, is the final path of file. The "finalPath" parameter is optional here because + * caller can leave up to file commit protocol to decide the final path. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * `stagingPath` and `finalPath`. The file commit protocol only guarantees that files written by + * different tasks will not conflict. This API should be preferred to use instead of deprecated + * [[newTaskTempFile]] and [[newTaskTempFileAbsPath]]. + */ + def newTaskFile( + taskContext: TaskAttemptContext, stagingPath: String, finalPath: Option[String]): Unit = { + // No-op as default implementation to be backward compatible with custom [[FileCommitProtocol]] + // implementations before Spark 3.2.0. + } + /** * Commits a task after the writes succeed. Must be called on the executors when running tasks. */ diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala new file mode 100644 index 000000000000..d1607bb3a902 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * An interface that defines how a single Spark job names its output files. Three notes:
+ *
+ * 1. Implementations must be serializable, as the instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. Implementations should have a constructor with 3 arguments:
+ *    (jobId: String, path: String, commitProtocol: [[FileCommitProtocol]])
+ * 3. An instance should not be reused across multiple Spark jobs.
+ *
+ * The proper call sequence is:
+ *
+ * As part of each task's execution, whenever a new output file needs to be created, the executor
+ * calls [[getTaskStagingPath]] to get a valid file path before commit (i.e. "staging").
+ * Optionally, the executor can also call [[getTaskFinalPath]] to get the file path after commit
+ * (i.e. "final").
+ *
+ * Important: the executor is expected to call [[FileCommitProtocol.newTaskFile]] afterwards to
+ * notify the commit protocol that a new file has been added.
+ */
+abstract class FileNamingProtocol {
+
+  /**
+   * Gets the full path that should be used for the output file before commit (i.e. "staging").
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskStagingPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+
+  /**
+   * Gets the full path that should be used for the output file after commit (i.e. "final").
+   *
+   * Important: it is the caller's responsibility to add uniquely identifying content to
+   * "fileContext" if a task is going to write out multiple files to the same directory. The file
+   * naming protocol only guarantees that files written by different tasks will not conflict.
+   */
+  def getTaskFinalPath(taskContext: TaskAttemptContext, fileContext: FileContext): String
+}
+
+object FileNamingProtocol extends Logging {
+
+  /**
+   * Instantiates a [[FileNamingProtocol]] using the given className.
+   */
+  def instantiate(
+      className: String,
+      jobId: String,
+      outputPath: String,
+      commitProtocol: FileCommitProtocol): FileNamingProtocol = {
+
+    logDebug(s"Creating file naming protocol $className; job $jobId; output=$outputPath;" +
+      s" commitProtocol=$commitProtocol")
+    val clazz = Utils.classForName[FileNamingProtocol](className)
+    // Try the constructor with arguments (jobId: String, outputPath: String,
+    // commitProtocol: [[FileCommitProtocol]]).
+    val ctor = clazz.getDeclaredConstructor(
+      classOf[String], classOf[String], classOf[FileCommitProtocol])
+    ctor.newInstance(jobId, outputPath, commitProtocol)
+  }
+
+  /**
+   * Gets the mapped [[FileNamingProtocol]] class name for a given [[FileCommitProtocol]].
+ * This is used to get a [[FileNamingProtocol]] to use anyway when caller does not specify + * [[FileNamingProtocol]] in configuration. + */ + def getMappedProtocolClassName(commitProtocol: FileCommitProtocol): String = { + commitProtocol match { + case _: HadoopMapReduceCommitProtocol => classOf[HadoopMapReduceNamingProtocol].getName + case _ => classOf[DefaultNamingProtocol].getName + } + } +} + +/** + * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. + * + * @param ext Extension of file. + * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. + * E.g., "a=1/b=2" is directory for partition (a=1, b=2). + * @param absoluteDir Absolute directory of file. Can be used for writing to custom location in + * file system. + * @param prefix Prefix of file. + */ +final case class FileContext( + ext: String, + relativeDir: Option[String], + absoluteDir: Option[String], + prefix: Option[String]) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index c061d617fce4..31cfde34b65a 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -18,7 +18,7 @@ package org.apache.spark.internal.io import java.io.IOException -import java.util.{Date, UUID} +import java.util.Date import scala.collection.mutable import scala.util.Try @@ -104,7 +104,7 @@ class HadoopMapReduceCommitProtocol( * The staging directory of this write job. Spark uses it to deal with files with absolute output * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. */ - protected def stagingDir = getStagingDir(path, jobId) + def stagingDir: Path = getStagingDir(path, jobId) protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { val format = context.getOutputFormatClass.getConstructor().newInstance() @@ -116,48 +116,30 @@ class HadoopMapReduceCommitProtocol( format.getOutputCommitter(context) } + @deprecated("use newTaskFile", "3.2.0") override def newTaskTempFile( taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - val filename = getFilename(taskContext, ext) - - val stagingDir: Path = committer match { - // For FileOutputCommitter it has its own staging path called "work path". - case f: FileOutputCommitter => - if (dynamicPartitionOverwrite) { - assert(dir.isDefined, - "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") - partitionPaths += dir.get - } - new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) - case _ => new Path(path) - } - - dir.map { d => - new Path(new Path(stagingDir, d), filename).toString - }.getOrElse { - new Path(stagingDir, filename).toString - } + throw new UnsupportedOperationException } + @deprecated("use newTaskFile", "3.2.0") override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { - val filename = getFilename(taskContext, ext) - val absOutputPath = new Path(absoluteDir, filename).toString - - // Include a UUID here to prevent file collisions for one task writing to different dirs. - // In principle we could include hash(absoluteDir) instead but this is simpler. 
- val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString - - addedAbsPathFiles(tmpOutputPath) = absOutputPath - tmpOutputPath + throw new UnsupportedOperationException } - protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = { - // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet - // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, - // the file name is fine and won't overflow. - val split = taskContext.getTaskAttemptID.getTaskID.getId - f"part-$split%05d-$jobId$ext" + override def newTaskFile( + taskContext: TaskAttemptContext, stagingPath: String, finalPath: Option[String]): Unit = { + finalPath match { + case Some(path) => addedAbsPathFiles(stagingPath) = path + case None => + committer match { + case _: FileOutputCommitter if dynamicPartitionOverwrite => + val dir = new Path(stagingPath).getParent.getName + partitionPaths += dir + case _ => + } + } } override def setupJob(jobContext: JobContext): Unit = { @@ -295,4 +277,8 @@ class HadoopMapReduceCommitProtocol( logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e) } } + + def getCommitter: OutputCommitter = { + committer + } } diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala new file mode 100644 index 000000000000..97f8ad104cb6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import java.util.UUID + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter + +/** + * An [[FileNamingProtocol]] implementation for [[HadoopMapReduceCommitProtocol]]. 
+ */ +class HadoopMapReduceNamingProtocol( + jobId: String, + path: String, + commitProtocol: FileCommitProtocol) + extends FileNamingProtocol with Serializable { + + require(commitProtocol.isInstanceOf[HadoopMapReduceCommitProtocol]) + + private val hadoopMRCommitProtocol = commitProtocol.asInstanceOf[HadoopMapReduceCommitProtocol] + + override def getTaskStagingPath( + taskContext: TaskAttemptContext, fileContext: FileContext): String = { + val filename = getFilename(taskContext, fileContext) + fileContext.absoluteDir match { + case Some(_) => + new Path(hadoopMRCommitProtocol.stagingDir, UUID.randomUUID().toString + "-" + filename) + .toString + case _ => + val stagingDir: Path = hadoopMRCommitProtocol.getCommitter match { + // For FileOutputCommitter it has its own staging path called "work path". + case f: FileOutputCommitter => + new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) + case _ => new Path(path) + } + + fileContext.relativeDir.map { d => + new Path(new Path(stagingDir, d), filename).toString + }.getOrElse { + new Path(stagingDir, filename).toString + } + } + } + + override def getTaskFinalPath( + taskContext: TaskAttemptContext, fileContext: FileContext): String = { + require(fileContext.absoluteDir.isDefined) + val filename = getFilename(taskContext, fileContext) + new Path(fileContext.absoluteDir.get, filename).toString + } + + private def getFilename(taskContext: TaskAttemptContext, fileContext: FileContext): String = { + // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + val prefix = fileContext.prefix.getOrElse("") + val ext = fileContext.ext + f"${prefix}part-$split%05d-$jobId$ext" + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9e4a4dde4f8b..3fb9f54f8fcf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1192,6 +1192,13 @@ object SQLConf { .createWithDefault( "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") + val FILE_NAMING_PROTOCOL_CLASS = + buildConf("spark.sql.sources.namingProtocolClass") + .version("3.2.0") + .internal() + .stringConf + .createOptional + val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold") .doc("The maximum number of paths allowed for listing files at driver side. 
If the number " + @@ -1702,6 +1709,13 @@ object SQLConf { .stringConf .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol") + val STREAMING_FILE_NAMING_PROTOCOL_CLASS = + buildConf("spark.sql.streaming.namingProtocolClass") + .version("3.2.0") + .internal() + .stringConf + .createOptional + val STREAMING_MULTIPLE_WATERMARK_POLICY = buildConf("spark.sql.streaming.multipleWatermarkPolicy") .doc("Policy to calculate the global watermark value when there are multiple watermark " + @@ -3425,6 +3439,9 @@ class SQLConf extends Serializable with Logging { def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS) + def streamingFileNamingProtocolClass: Option[String] = + getConf(SQLConf.STREAMING_FILE_NAMING_PROTOCOL_CLASS) + def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL) @@ -3685,6 +3702,8 @@ class SQLConf extends Serializable with Logging { def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS) + def fileNamingProtocolClass: Option[String] = getConf(SQLConf.FILE_NAMING_PROTOCOL_CLASS) + def parallelPartitionDiscoveryThreshold: Int = getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index 7e5a8cce2783..3d9a4a6235da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, FileContext, FileNamingProtocol} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -122,7 +122,8 @@ class EmptyDirectoryDataWriter( class SingleDirectoryDataWriter( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol) + committer: FileCommitProtocol, + namingProtocol: FileNamingProtocol) extends FileFormatDataWriter(description, taskAttemptContext, committer) { private var fileCounter: Int = _ private var recordsInFile: Long = _ @@ -133,11 +134,11 @@ class SingleDirectoryDataWriter( recordsInFile = 0 releaseResources() - val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) - val currentPath = committer.newTaskTempFile( - taskAttemptContext, - None, - f"-c$fileCounter%03d" + ext) + val ext = f"-c$fileCounter%03d" + + description.outputWriterFactory.getFileExtension(taskAttemptContext) + val fileContext = FileContext(ext, None, None, None) + val currentPath = namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext) + committer.newTaskFile(taskAttemptContext, currentPath, None) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, @@ -169,7 +170,8 @@ class SingleDirectoryDataWriter( abstract class BaseDynamicPartitionDataWriter( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol) + committer: FileCommitProtocol, + 
namingProtocol: FileNamingProtocol) extends FileFormatDataWriter(description, taskAttemptContext, committer) { /** Flag saying whether or not the data to be written out is partitioned. */ @@ -261,11 +263,15 @@ abstract class BaseDynamicPartitionDataWriter( val customPath = partDir.flatMap { dir => description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir)) } - val currentPath = if (customPath.isDefined) { - committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) + val (currentPath, finalPath) = if (customPath.isDefined) { + val fileContext = FileContext(ext, None, Some(customPath.get), None) + (namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext), + Some(namingProtocol.getTaskFinalPath(taskAttemptContext, fileContext))) } else { - committer.newTaskTempFile(taskAttemptContext, partDir, ext) + val fileContext = FileContext(ext, partDir, None, None) + (namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext), None) } + committer.newTaskFile(taskAttemptContext, currentPath, finalPath) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, @@ -314,8 +320,10 @@ abstract class BaseDynamicPartitionDataWriter( class DynamicPartitionDataSingleWriter( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, - committer: FileCommitProtocol) - extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer) { + committer: FileCommitProtocol, + namingProtocol: FileNamingProtocol) + extends BaseDynamicPartitionDataWriter( + description, taskAttemptContext, committer, namingProtocol) { private var currentPartitionValues: Option[UnsafeRow] = None private var currentBucketId: Option[Int] = None @@ -361,8 +369,10 @@ class DynamicPartitionDataConcurrentWriter( description: WriteJobDescription, taskAttemptContext: TaskAttemptContext, committer: FileCommitProtocol, + namingProtocol: FileNamingProtocol, concurrentOutputWriterSpec: ConcurrentOutputWriterSpec) - extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer) + extends BaseDynamicPartitionDataWriter( + description, taskAttemptContext, committer, namingProtocol) with Logging { /** Wrapper class to index a unique concurrent output writer. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 6839a4db0bc2..b89eef4cb3d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} +import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol, SparkHadoopWriterUtils} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -96,7 +96,7 @@ object FileFormatWriter extends Logging { sparkSession: SparkSession, plan: SparkPlan, fileFormat: FileFormat, - committer: FileCommitProtocol, + protocols: (FileCommitProtocol, FileNamingProtocol), outputSpec: OutputSpec, hadoopConf: Configuration, partitionColumns: Seq[Attribute], @@ -105,6 +105,7 @@ object FileFormatWriter extends Logging { options: Map[String, String]) : Set[String] = { + val committer = protocols._1 val job = Job.getInstance(hadoopConf) job.setOutputKeyClass(classOf[Void]) job.setOutputValueClass(classOf[InternalRow]) @@ -225,6 +226,7 @@ object FileFormatWriter extends Logging { sparkPartitionId = taskContext.partitionId(), sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE, committer, + protocols._2, iterator = iter, concurrentOutputWriterSpec = concurrentOutputWriterSpec) }, @@ -260,6 +262,7 @@ object FileFormatWriter extends Logging { sparkPartitionId: Int, sparkAttemptNumber: Int, committer: FileCommitProtocol, + namingProtocol: FileNamingProtocol, iterator: Iterator[InternalRow], concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = { @@ -287,14 +290,15 @@ object FileFormatWriter extends Logging { // In case of empty job, leave first partition to save meta for file format like parquet. 
new EmptyDirectoryDataWriter(description, taskAttemptContext, committer) } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) { - new SingleDirectoryDataWriter(description, taskAttemptContext, committer) + new SingleDirectoryDataWriter(description, taskAttemptContext, committer, namingProtocol) } else { concurrentOutputWriterSpec match { case Some(spec) => new DynamicPartitionDataConcurrentWriter( - description, taskAttemptContext, committer, spec) + description, taskAttemptContext, committer, namingProtocol, spec) case _ => - new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + new DynamicPartitionDataSingleWriter( + description, taskAttemptContext, committer, namingProtocol) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 267b360b474c..191c66ce8198 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -111,6 +111,14 @@ case class InsertIntoHadoopFsRelationCommand( outputPath = outputPath.toString, dynamicPartitionOverwrite = dynamicPartitionOverwrite) + val namingProtocolClass = sparkSession.sessionState.conf.fileNamingProtocolClass + .getOrElse(FileNamingProtocol.getMappedProtocolClassName(committer)) + val namingProtocol = FileNamingProtocol.instantiate( + namingProtocolClass, + jobId = jobId, + outputPath = outputPath.toString, + commitProtocol = committer) + val doInsertion = if (mode == SaveMode.Append) { true } else { @@ -176,7 +184,7 @@ case class InsertIntoHadoopFsRelationCommand( sparkSession = sparkSession, plan = child, fileFormat = fileFormat, - committer = committer, + protocols = (committer, namingProtocol), outputSpec = FileFormatWriter.OutputSpec( committerOutputPath.toString, customPartitionLocations, outputColumns), hadoopConf = hadoopConf, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala index 7227e48bc9a1..847dc2b58374 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileBatchWrite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.hadoop.mapreduce.Job import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol} import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult} import 
org.apache.spark.sql.execution.datasources.FileFormatWriter.processStats @@ -28,7 +28,8 @@ import org.apache.spark.util.Utils class FileBatchWrite( job: Job, description: WriteJobDescription, - committer: FileCommitProtocol) + committer: FileCommitProtocol, + namingProtocol: FileNamingProtocol) extends BatchWrite with Logging { override def commit(messages: Array[WriterCommitMessage]): Unit = { val results = messages.map(_.asInstanceOf[WriteTaskResult]) @@ -47,7 +48,7 @@ class FileBatchWrite( } override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { - FileWriterFactory(description, committer) + FileWriterFactory(description, committer, namingProtocol) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index 4f736cbd8970..a0bcd7c9ce7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol} import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} @@ -57,15 +57,23 @@ trait FileWrite extends Write { // Hadoop Configurations are case sensitive. val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) val job = getJobInstance(hadoopConf, path) + val jobId = java.util.UUID.randomUUID().toString val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, - jobId = java.util.UUID.randomUUID().toString, + jobId = jobId, outputPath = paths.head) + val namingProtocolClass = sparkSession.sessionState.conf.fileNamingProtocolClass + .getOrElse(FileNamingProtocol.getMappedProtocolClassName(committer)) + val namingProtocol = FileNamingProtocol.instantiate( + namingProtocolClass, + jobId = jobId, + outputPath = paths.head, + commitProtocol = committer) lazy val description = createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap) committer.setupJob(job) - new FileBatchWrite(job, description, committer) + new FileBatchWrite(job, description, committer, namingProtocol) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala index d827e8362357..8cbbae894a70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala @@ -21,21 +21,23 @@ import java.util.Date import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} +import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol, SparkHadoopWriterUtils} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory} import 
org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWriter, SingleDirectoryDataWriter, WriteJobDescription} case class FileWriterFactory ( description: WriteJobDescription, - committer: FileCommitProtocol) extends DataWriterFactory { + committer: FileCommitProtocol, + namingProtocol: FileNamingProtocol) extends DataWriterFactory { override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = { val taskAttemptContext = createTaskAttemptContext(partitionId) committer.setupTask(taskAttemptContext) if (description.partitionColumns.isEmpty) { - new SingleDirectoryDataWriter(description, taskAttemptContext, committer) + new SingleDirectoryDataWriter(description, taskAttemptContext, committer, namingProtocol) } else { - new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer) + new DynamicPartitionDataSingleWriter( + description, taskAttemptContext, committer, namingProtocol) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 0066678a5d96..37622463295b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.errors.QueryExecutionErrors @@ -153,6 +153,14 @@ class FileStreamSink( case _ => // Do nothing } + val namingProtocolClass = sparkSession.sessionState.conf.streamingFileNamingProtocolClass + .getOrElse(FileNamingProtocol.getMappedProtocolClassName(committer)) + val namingProtocol = FileNamingProtocol.instantiate( + namingProtocolClass, + jobId = batchId.toString, + outputPath = path, + commitProtocol = committer) + // Get the actual partition columns as attributes after matching them by name with // the given columns names. 
val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col => @@ -167,7 +175,7 @@ class FileStreamSink( sparkSession = sparkSession, plan = qe.executedPlan, fileFormat = fileFormat, - committer = committer, + protocols = (committer, namingProtocol), outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, qe.analyzed.output), hadoopConf = hadoopConf, partitionColumns = partitionColumns, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index ec189344f4fa..cc91fc2bede3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner -import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute @@ -79,16 +79,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { .foreach { case (compression, codec) => hadoopConf.set(compression, codec) } } + val jobId = java.util.UUID.randomUUID().toString + val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, - jobId = java.util.UUID.randomUUID().toString, + jobId = jobId, outputPath = outputLocation) + val namingProtocolClass = sparkSession.sessionState.conf.fileNamingProtocolClass + .getOrElse(FileNamingProtocol.getMappedProtocolClassName(committer)) + val namingProtocol = FileNamingProtocol.instantiate( + namingProtocolClass, + jobId = jobId, + outputPath = outputLocation, + commitProtocol = committer) + FileFormatWriter.write( sparkSession = sparkSession, plan = plan, fileFormat = new HiveFileFormat(fileSinkConf), - committer = committer, + protocols = (committer, namingProtocol), outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns), hadoopConf = hadoopConf, From 93fe5bdea2dcaf554138815c6966ac2f7b2a9b55 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Mon, 14 Jun 2021 12:19:29 -0700 Subject: [PATCH 02/10] Add back HadoopMapReduceCommitProtocol.getFilename and mark it deprecated as PathOutputCommitProtocol depends on it --- .../internal/io/HadoopMapReduceCommitProtocol.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 31cfde34b65a..b5dfcec3c362 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -128,6 +128,15 @@ class HadoopMapReduceCommitProtocol( throw new UnsupportedOperationException } + @deprecated("use newTaskFile", "3.2.0") + protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = { + // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. 
+ val split = taskContext.getTaskAttemptID.getTaskID.getId + f"part-$split%05d-$jobId$ext" + } + override def newTaskFile( taskContext: TaskAttemptContext, stagingPath: String, finalPath: Option[String]): Unit = { finalPath match { From fef7946929c211a469ebbe92375cbb06d4ad7a7a Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 15 Jun 2021 01:49:49 -0700 Subject: [PATCH 03/10] Change interface of newTaskFile to try to fix test failure of dynamic partitions --- .../apache/spark/internal/io/FileCommitProtocol.scala | 9 +++++++-- .../internal/io/HadoopMapReduceCommitProtocol.scala | 11 +++++++---- .../execution/datasources/FileFormatDataWriter.scala | 11 ++++++----- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index cbc8b22407f7..830381e4aad4 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -111,7 +111,9 @@ abstract class FileCommitProtocol extends Logging { * * The "stagingPath" parameter is the current path of new file. The "finalPath" parameter if * specified, is the final path of file. The "finalPath" parameter is optional here because - * caller can leave up to file commit protocol to decide the final path. + * caller can leave up to file commit protocol to decide the final path. The "stagingDir" + * parameter if specified, is the sub-directory used to specify dynamic partitioning. The + * "stagingDir" parameter is optional here for non-dynamic partitioning. * * Important: it is the caller's responsibility to add uniquely identifying content to * `stagingPath` and `finalPath`. The file commit protocol only guarantees that files written by @@ -119,7 +121,10 @@ abstract class FileCommitProtocol extends Logging { * [[newTaskTempFile]] and [[newTaskTempFileAbsPath]]. */ def newTaskFile( - taskContext: TaskAttemptContext, stagingPath: String, finalPath: Option[String]): Unit = { + taskContext: TaskAttemptContext, + stagingPath: String, + finalPath: Option[String], + stagingDir: Option[String]): Unit = { // No-op as default implementation to be backward compatible with custom [[FileCommitProtocol]] // implementations before Spark 3.2.0. 
} diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index b5dfcec3c362..fca326818f3a 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -138,13 +138,16 @@ class HadoopMapReduceCommitProtocol( } override def newTaskFile( - taskContext: TaskAttemptContext, stagingPath: String, finalPath: Option[String]): Unit = { + taskContext: + TaskAttemptContext, + stagingPath: String, + finalPath: Option[String], + stagingDir: Option[String]): Unit = { finalPath match { case Some(path) => addedAbsPathFiles(stagingPath) = path case None => - committer match { - case _: FileOutputCommitter if dynamicPartitionOverwrite => - val dir = new Path(stagingPath).getParent.getName + (committer, stagingDir) match { + case (_: FileOutputCommitter, Some(dir)) if dynamicPartitionOverwrite => partitionPaths += dir case _ => } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index 3d9a4a6235da..0cef55ac19b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -138,7 +138,7 @@ class SingleDirectoryDataWriter( description.outputWriterFactory.getFileExtension(taskAttemptContext) val fileContext = FileContext(ext, None, None, None) val currentPath = namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext) - committer.newTaskFile(taskAttemptContext, currentPath, None) + committer.newTaskFile(taskAttemptContext, currentPath, None, None) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, @@ -263,15 +263,16 @@ abstract class BaseDynamicPartitionDataWriter( val customPath = partDir.flatMap { dir => description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir)) } - val (currentPath, finalPath) = if (customPath.isDefined) { + val (currentPath, finalPath, currentDir) = if (customPath.isDefined) { val fileContext = FileContext(ext, None, Some(customPath.get), None) (namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext), - Some(namingProtocol.getTaskFinalPath(taskAttemptContext, fileContext))) + Some(namingProtocol.getTaskFinalPath(taskAttemptContext, fileContext)), + None) } else { val fileContext = FileContext(ext, partDir, None, None) - (namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext), None) + (namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext), None, partDir) } - committer.newTaskFile(taskAttemptContext, currentPath, finalPath) + committer.newTaskFile(taskAttemptContext, currentPath, finalPath, currentDir) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, From e221b8572693c471eb497334ae923e041ccdc2b3 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 17 Jun 2021 20:10:36 -0700 Subject: [PATCH 04/10] Address comment for introducing new API and removing old API --- .../internal/io/FileCommitProtocol.scala | 45 +-------------- .../internal/io/FileNamingProtocol.scala | 16 +----- .../io/HadoopMapReduceCommitProtocol.scala | 26 +-------- .../io/HadoopMapReduceNamingProtocol.scala | 2 +- 
.../io/cloud/PathOutputCommitProtocol.scala | 23 +------- .../io/cloud/PathOutputNamingProtocol.scala | 57 +++++++++++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 15 +++-- .../InsertIntoHadoopFsRelationCommand.scala | 4 +- .../execution/datasources/v2/FileWrite.scala | 4 +- .../execution/streaming/FileStreamSink.scala | 4 +- .../ManifestFileCommitProtocol.scala | 31 +++------- .../ManifestFileNamingProtocol.scala | 40 +++++++------ .../sql/sources/PartitionedWriteSuite.scala | 12 +++- .../sql/hive/execution/SaveAsHiveFile.scala | 4 +- 14 files changed, 122 insertions(+), 161 deletions(-) create mode 100644 hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala rename core/src/main/scala/org/apache/spark/internal/io/DefaultNamingProtocol.scala => sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileNamingProtocol.scala (52%) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 830381e4aad4..280de2c7de6a 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -69,42 +69,6 @@ abstract class FileCommitProtocol extends Logging { */ def setupTask(taskContext: TaskAttemptContext): Unit - /** - * Notifies the commit protocol to add a new file, and gets back the full path that should be - * used. Must be called on the executors when running tasks. - * - * Note that the returned temp file may have an arbitrary path. The commit protocol only - * promises that the file will be at the location specified by the arguments after job commit. - * - * A full file path consists of the following parts: - * 1. the base path - * 2. some sub-directory within the base path, used to specify partitioning - * 3. file prefix, usually some unique job id with the task id - * 4. bucket id - * 5. source specific file extension, e.g. ".snappy.parquet" - * - * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest - * are left to the commit protocol implementation to decide. - * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. The file commit protocol only - * guarantees that files written by different tasks will not conflict. - */ - @deprecated("use newTaskFile", "3.2.0") - def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String - - /** - * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. - * Depending on the implementation, there may be weaker guarantees around adding files this way. - * - * Important: it is the caller's responsibility to add uniquely identifying content to "ext" - * if a task is going to write out multiple files to the same dir. The file commit protocol only - * guarantees that files written by different tasks will not conflict. - */ - @deprecated("use newTaskFile", "3.2.0") - def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String - /** * Notifies the commit protocol that a new file is added. Must be called on the executors when * running tasks. 
@@ -117,17 +81,14 @@ abstract class FileCommitProtocol extends Logging { * * Important: it is the caller's responsibility to add uniquely identifying content to * `stagingPath` and `finalPath`. The file commit protocol only guarantees that files written by - * different tasks will not conflict. This API should be preferred to use instead of deprecated - * [[newTaskTempFile]] and [[newTaskTempFileAbsPath]]. + * different tasks will not conflict. This API should be used instead of deprecated + * `newTaskTempFile` and `newTaskTempFileAbsPath`. */ def newTaskFile( taskContext: TaskAttemptContext, stagingPath: String, finalPath: Option[String], - stagingDir: Option[String]): Unit = { - // No-op as default implementation to be backward compatible with custom [[FileCommitProtocol]] - // implementations before Spark 3.2.0. - } + stagingDir: Option[String]): Unit /** * Commits a task after the writes succeed. Must be called on the executors when running tasks. diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala index d1607bb3a902..70cba083667f 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala @@ -81,29 +81,17 @@ object FileNamingProtocol extends Logging { classOf[String], classOf[String], classOf[FileCommitProtocol]) ctor.newInstance(jobId, outputPath, commitProtocol) } - - /** - * Gets the mapped [[FileNamingProtocol]] class name for a given [[FileCommitProtocol]]. - * This is used to get a [[FileNamingProtocol]] to use anyway when caller does not specify - * [[FileNamingProtocol]] in configuration. - */ - def getMappedProtocolClassName(commitProtocol: FileCommitProtocol): String = { - commitProtocol match { - case _: HadoopMapReduceCommitProtocol => classOf[HadoopMapReduceNamingProtocol].getName - case _ => classOf[DefaultNamingProtocol].getName - } - } } /** * The context for Spark output file. This is used by [[FileNamingProtocol]] to create file path. * - * @param ext Extension of file. + * @param ext Source specific file extension, e.g. ".snappy.parquet". * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. * E.g., "a=1/b=2" is directory for partition (a=1, b=2). * @param absoluteDir Absolute directory of file. Can be used for writing to custom location in * file system. - * @param prefix Prefix of file. + * @param prefix file prefix. 
*/ final case class FileContext( ext: String, diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index fca326818f3a..c751413e33fa 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -116,36 +116,14 @@ class HadoopMapReduceCommitProtocol( format.getOutputCommitter(context) } - @deprecated("use newTaskFile", "3.2.0") - override def newTaskTempFile( - taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - throw new UnsupportedOperationException - } - - @deprecated("use newTaskFile", "3.2.0") - override def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { - throw new UnsupportedOperationException - } - - @deprecated("use newTaskFile", "3.2.0") - protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = { - // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet - // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, - // the file name is fine and won't overflow. - val split = taskContext.getTaskAttemptID.getTaskID.getId - f"part-$split%05d-$jobId$ext" - } - override def newTaskFile( - taskContext: - TaskAttemptContext, + taskContext: TaskAttemptContext, stagingPath: String, finalPath: Option[String], stagingDir: Option[String]): Unit = { finalPath match { case Some(path) => addedAbsPathFiles(stagingPath) = path - case None => + case _ => (committer, stagingDir) match { case (_: FileOutputCommitter, Some(dir)) if dynamicPartitionOverwrite => partitionPaths += dir diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala index 97f8ad104cb6..1ebc38a2c19b 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala @@ -66,7 +66,7 @@ class HadoopMapReduceNamingProtocol( new Path(fileContext.absoluteDir.get, filename).toString } - private def getFilename(taskContext: TaskAttemptContext, fileContext: FileContext): String = { + protected def getFilename(taskContext: TaskAttemptContext, fileContext: FileContext): String = { // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, // the file name is fine and won't overflow. diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 2ca50878485c..40422fb80166 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -117,28 +117,9 @@ class PathOutputCommitProtocol( committer } - /** - * Create a temporary file for a task. 
- * - * @param taskContext task context - * @param dir optional subdirectory - * @param ext file extension - * @return a path as a string - */ - override def newTaskTempFile( - taskContext: TaskAttemptContext, - dir: Option[String], - ext: String): String = { - - val workDir = committer.getWorkPath - val parent = dir.map { - d => new Path(workDir, d) - }.getOrElse(workDir) - val file = new Path(parent, getFilename(taskContext, ext)) - logTrace(s"Creating task file $file for dir $dir and ext $ext") - file.toString + def getCommitter: PathOutputCommitter = { + committer } - } object PathOutputCommitProtocol { diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala new file mode 100644 index 000000000000..f208371c1cc1 --- /dev/null +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.util.UUID + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.TaskAttemptContext + +import org.apache.spark.internal.io.{FileCommitProtocol, FileContext, HadoopMapReduceNamingProtocol} + +/** + * An [[FileNamingProtocol]] implementation for [[PathOutputCommitProtocol]]. 
+ */ +class PathOutputNamingProtocol( + jobId: String, + dest: String, + commitProtocol: FileCommitProtocol) + extends HadoopMapReduceNamingProtocol(jobId, dest, commitProtocol) { + + require(commitProtocol.isInstanceOf[PathOutputCommitProtocol]) + + private val pathOutputCommitProtocol = commitProtocol.asInstanceOf[PathOutputCommitProtocol] + + override def getTaskStagingPath( + taskContext: TaskAttemptContext, fileContext: FileContext): String = { + val filename = getFilename(taskContext, fileContext) + fileContext.absoluteDir match { + case Some(_) => + new Path(pathOutputCommitProtocol.stagingDir, UUID.randomUUID().toString + "-" + filename) + .toString + case _ => + val workDir = pathOutputCommitProtocol.getCommitter.getWorkPath + val parent = fileContext.relativeDir.map { + d => new Path(workDir, d) + }.getOrElse(workDir) + val file = new Path(parent, filename) + logTrace(s"Creating task file $file for dir $dir and ext $ext") + file.toString + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3fb9f54f8fcf..c211da5ca6e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1194,10 +1194,14 @@ object SQLConf { val FILE_NAMING_PROTOCOL_CLASS = buildConf("spark.sql.sources.namingProtocolClass") + .doc("The class name for output file naming protocol. This is used together with " + + s"${FILE_COMMIT_PROTOCOL_CLASS.key} for output file commit. The class should " + + "implement org.apache.spark.internal.io.FileNamingProtocol.") .version("3.2.0") .internal() .stringConf - .createOptional + .createWithDefault( + "org.apache.spark.internal.io.HadoopMapReduceNamingProtocol") val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold") @@ -1711,10 +1715,13 @@ object SQLConf { val STREAMING_FILE_NAMING_PROTOCOL_CLASS = buildConf("spark.sql.streaming.namingProtocolClass") + .doc("The class name for streaming output file naming protocol. This is used together " + + s"with ${STREAMING_FILE_COMMIT_PROTOCOL_CLASS.key} for output file commit. 
The class " + + "should implement org.apache.spark.internal.io.FileNamingProtocol.") .version("3.2.0") .internal() .stringConf - .createOptional + .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileNamingProtocol") val STREAMING_MULTIPLE_WATERMARK_POLICY = buildConf("spark.sql.streaming.multipleWatermarkPolicy") @@ -3439,7 +3446,7 @@ class SQLConf extends Serializable with Logging { def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS) - def streamingFileNamingProtocolClass: Option[String] = + def streamingFileNamingProtocolClass: String = getConf(SQLConf.STREAMING_FILE_NAMING_PROTOCOL_CLASS) def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) @@ -3702,7 +3709,7 @@ class SQLConf extends Serializable with Logging { def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS) - def fileNamingProtocolClass: Option[String] = getConf(SQLConf.FILE_NAMING_PROTOCOL_CLASS) + def fileNamingProtocolClass: String = getConf(SQLConf.FILE_NAMING_PROTOCOL_CLASS) def parallelPartitionDiscoveryThreshold: Int = getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 191c66ce8198..99413c51ccad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -111,10 +111,8 @@ case class InsertIntoHadoopFsRelationCommand( outputPath = outputPath.toString, dynamicPartitionOverwrite = dynamicPartitionOverwrite) - val namingProtocolClass = sparkSession.sessionState.conf.fileNamingProtocolClass - .getOrElse(FileNamingProtocol.getMappedProtocolClassName(committer)) val namingProtocol = FileNamingProtocol.instantiate( - namingProtocolClass, + sparkSession.sessionState.conf.fileNamingProtocolClass, jobId = jobId, outputPath = outputPath.toString, commitProtocol = committer) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index a0bcd7c9ce7b..9e25987ef78f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -62,10 +62,8 @@ trait FileWrite extends Write { sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = jobId, outputPath = paths.head) - val namingProtocolClass = sparkSession.sessionState.conf.fileNamingProtocolClass - .getOrElse(FileNamingProtocol.getMappedProtocolClassName(committer)) val namingProtocol = FileNamingProtocol.instantiate( - namingProtocolClass, + sparkSession.sessionState.conf.fileNamingProtocolClass, jobId = jobId, outputPath = paths.head, commitProtocol = committer) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 37622463295b..9273071ef018 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -153,10 +153,8 @@ class FileStreamSink( case _ => 
// Do nothing } - val namingProtocolClass = sparkSession.sessionState.conf.streamingFileNamingProtocolClass - .getOrElse(FileNamingProtocol.getMappedProtocolClassName(committer)) val namingProtocol = FileNamingProtocol.instantiate( - namingProtocolClass, + sparkSession.sessionState.conf.streamingFileNamingProtocolClass, jobId = batchId.toString, outputPath = path, commitProtocol = committer) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index 46ce33687890..9932ce08d204 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.streaming import java.io.IOException -import java.util.UUID import scala.collection.mutable.ArrayBuffer @@ -28,7 +27,6 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage -import org.apache.spark.sql.errors.QueryExecutionErrors /** * A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in @@ -111,28 +109,13 @@ class ManifestFileCommitProtocol(jobId: String, path: String) addedFiles = new ArrayBuffer[String] } - override def newTaskTempFile( - taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet - // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, - // the file name is fine and won't overflow. - val split = taskContext.getTaskAttemptID.getTaskID.getId - val uuid = UUID.randomUUID.toString - val filename = f"part-$split%05d-$uuid$ext" - - val file = dir.map { d => - new Path(new Path(path, d), filename).toString - }.getOrElse { - new Path(path, filename).toString - } - - addedFiles += file - file - } - - override def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { - throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString) + def newTaskFile( + taskContext: TaskAttemptContext, + stagingPath: String, + finalPath: Option[String], + stagingDir: Option[String]): Unit = { + require(finalPath.isEmpty, s"$this does not support adding files with an absolute path") + addedFiles += stagingPath } override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { diff --git a/core/src/main/scala/org/apache/spark/internal/io/DefaultNamingProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileNamingProtocol.scala similarity index 52% rename from core/src/main/scala/org/apache/spark/internal/io/DefaultNamingProtocol.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileNamingProtocol.scala index b150a03ef2d0..2ece76b74e68 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/DefaultNamingProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileNamingProtocol.scala @@ -15,38 +15,46 @@ * limitations under the License. 
*/ -package org.apache.spark.internal.io +package org.apache.spark.sql.execution.streaming +import java.util.UUID + +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.spark.internal.io.{FileCommitProtocol, FileContext, FileNamingProtocol} +import org.apache.spark.sql.errors.QueryExecutionErrors + /** - * An [[FileNamingProtocol]] implementation by default for custom [[FileCommitProtocol]] - * implementations. - * - * This delegates to call [[FileCommitProtocol.newTaskTempFile]] and - * [[FileCommitProtocol.newTaskTempFileAbsPath()]] to be backward compatible with - * custom [[FileCommitProtocol]] implementation before Spark 3.2.0. Newer implementation of - * [[FileCommitProtocol]] after Spark 3.2.0 should create own implementation of - * [[FileNamingProtocol]], instead of using this. + * An [[FileNamingProtocol]] implementation for [[ManifestFileCommitProtocol]]. */ -class DefaultNamingProtocol( +class ManifestFileNamingProtocol( jobId: String, path: String, commitProtocol: FileCommitProtocol) extends FileNamingProtocol with Serializable { + require(commitProtocol.isInstanceOf[ManifestFileCommitProtocol]) + override def getTaskStagingPath( taskContext: TaskAttemptContext, fileContext: FileContext): String = { - fileContext.absoluteDir match { - case Some(dir) => commitProtocol.newTaskTempFileAbsPath( - taskContext, dir, fileContext.ext) - case _ => commitProtocol.newTaskTempFile( - taskContext, fileContext.relativeDir, fileContext.ext) + // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + val uuid = UUID.randomUUID.toString + val ext = fileContext.ext + val filename = f"part-$split%05d-$uuid$ext" + + fileContext.relativeDir.map { d => + new Path(new Path(path, d), filename).toString + }.getOrElse { + new Path(path, filename).toString } } override def getTaskFinalPath( taskContext: TaskAttemptContext, fileContext: FileContext): String = { - "" + throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index b9266429f81a..42762fd031cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -38,9 +38,15 @@ private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String extends SQLHadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging { - override def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { - throw new Exception("there should be no custom partition path") + override def newTaskFile( + taskContext: TaskAttemptContext, + stagingPath: String, + finalPath: Option[String], + stagingDir: Option[String]): Unit = { + finalPath match { + case Some(_) => throw new Exception("there should be no custom partition path") + case _ => super.newTaskFile(taskContext, stagingPath, finalPath, stagingDir) + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala 
index cc91fc2bede3..31dff6969a2c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -86,10 +86,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { jobId = jobId, outputPath = outputLocation) - val namingProtocolClass = sparkSession.sessionState.conf.fileNamingProtocolClass - .getOrElse(FileNamingProtocol.getMappedProtocolClassName(committer)) val namingProtocol = FileNamingProtocol.instantiate( - namingProtocolClass, + sparkSession.sessionState.conf.fileNamingProtocolClass, jobId = jobId, outputPath = outputLocation, commitProtocol = committer) From 9878f2e4008b732f156f98f1adff6aab870c58fd Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 17 Jun 2021 20:32:53 -0700 Subject: [PATCH 05/10] Try to fix build --- .../spark/internal/io/cloud/PathOutputNamingProtocol.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala index f208371c1cc1..f41e4c1b2eb3 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala @@ -50,7 +50,8 @@ class PathOutputNamingProtocol( d => new Path(workDir, d) }.getOrElse(workDir) val file = new Path(parent, filename) - logTrace(s"Creating task file $file for dir $dir and ext $ext") + logTrace(s"Creating task file $file for dir ${fileContext.relativeDir} and ext " + + s"${fileContext.ext}") file.toString } } From ddc695597c7d71e85b2069dda6ad265ad625f076 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 17 Jun 2021 21:39:24 -0700 Subject: [PATCH 06/10] Try to fix build again --- .../spark/internal/io/cloud/PathOutputNamingProtocol.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala index f41e4c1b2eb3..523be1d52153 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala @@ -22,6 +22,7 @@ import java.util.UUID import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.spark.internal.Logging import org.apache.spark.internal.io.{FileCommitProtocol, FileContext, HadoopMapReduceNamingProtocol} /** @@ -31,7 +32,7 @@ class PathOutputNamingProtocol( jobId: String, dest: String, commitProtocol: FileCommitProtocol) - extends HadoopMapReduceNamingProtocol(jobId, dest, commitProtocol) { + extends HadoopMapReduceNamingProtocol(jobId, dest, commitProtocol) with Logging { require(commitProtocol.isInstanceOf[PathOutputCommitProtocol]) From f716efef86e82405a8ab1d515730f5b9f53517b9 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 17 Jun 2021 22:40:02 -0700 Subject: [PATCH 07/10] Try to fix build again --- .../spark/internal/io/cloud/PathOutputCommitProtocol.scala | 2 +- .../spark/internal/io/cloud/PathOutputNamingProtocol.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git 
a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 40422fb80166..63b4672c0a7c 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -117,7 +117,7 @@ class PathOutputCommitProtocol( committer } - def getCommitter: PathOutputCommitter = { + def getPathOutputCommitter: PathOutputCommitter = { committer } } diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala index 523be1d52153..c699ec5561d9 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala @@ -46,7 +46,7 @@ class PathOutputNamingProtocol( new Path(pathOutputCommitProtocol.stagingDir, UUID.randomUUID().toString + "-" + filename) .toString case _ => - val workDir = pathOutputCommitProtocol.getCommitter.getWorkPath + val workDir = pathOutputCommitProtocol.getPathOutputCommitter.getWorkPath val parent = fileContext.relativeDir.map { d => new Path(workDir, d) }.getOrElse(workDir) From 8341ebc9e10931b7c6fa2deb4ab0cf4043692bac Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 18 Jun 2021 02:42:52 -0700 Subject: [PATCH 08/10] Change the approach to pass relative path to commit protocol Co-authored-by: Wenchen Fan --- .../internal/io/BatchFileNamingProtocol.scala | 42 ++++++++++ .../internal/io/FileCommitProtocol.scala | 39 ++++++---- .../internal/io/FileNamingProtocol.scala | 53 ++----------- .../io/HadoopMapReduceCommitProtocol.scala | 38 +++++---- .../io/HadoopMapReduceNamingProtocol.scala | 78 ------------------- .../io/cloud/PathOutputCommitProtocol.scala | 13 +++- .../io/cloud/PathOutputNamingProtocol.scala | 59 -------------- .../sql/errors/QueryExecutionErrors.scala | 4 +- .../apache/spark/sql/internal/SQLConf.scala | 26 ------- .../datasources/FileFormatDataWriter.scala | 26 ++++--- .../InsertIntoHadoopFsRelationCommand.scala | 9 +-- .../execution/datasources/v2/FileWrite.scala | 8 +- .../execution/streaming/FileStreamSink.scala | 9 +-- .../ManifestFileCommitProtocol.scala | 17 ++-- ...cala => StreamingFileNamingProtocol.scala} | 26 ++----- .../sql/sources/PartitionedWriteSuite.scala | 12 +-- .../sql/hive/execution/SaveAsHiveFile.scala | 9 +-- 17 files changed, 147 insertions(+), 321 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/internal/io/BatchFileNamingProtocol.scala delete mode 100644 core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala delete mode 100644 hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/{ManifestFileNamingProtocol.scala => StreamingFileNamingProtocol.scala} (65%) diff --git a/core/src/main/scala/org/apache/spark/internal/io/BatchFileNamingProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/BatchFileNamingProtocol.scala new file mode 100644 index 000000000000..f90e4315bcf9 --- /dev/null +++ 
b/core/src/main/scala/org/apache/spark/internal/io/BatchFileNamingProtocol.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * A [[FileNamingProtocol]] implementation to write output data in batch processing. + */ +class BatchFileNamingProtocol(jobId: String) extends FileNamingProtocol with Serializable { + + override def getTaskTempPath( + taskContext: TaskAttemptContext, fileContext: FileContext): String = { + // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + val prefix = fileContext.prefix.getOrElse("") + val ext = fileContext.ext + val filename = f"${prefix}part-$split%05d-$jobId$ext" + + fileContext.relativeDir.map { + d => new Path(d, filename).toString + }.getOrElse(filename) + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 280de2c7de6a..45a72e48e39d 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -70,25 +70,34 @@ abstract class FileCommitProtocol extends Logging { def setupTask(taskContext: TaskAttemptContext): Unit /** - * Notifies the commit protocol that a new file is added. Must be called on the executors when - * running tasks. + * Notifies the commit protocol to add a new file, and gets back the full path that should be + * used. Must be called on the executors when running tasks. * - * The "stagingPath" parameter is the current path of new file. The "finalPath" parameter if - * specified, is the final path of file. The "finalPath" parameter is optional here because - * caller can leave up to file commit protocol to decide the final path. The "stagingDir" - * parameter if specified, is the sub-directory used to specify dynamic partitioning. The - * "stagingDir" parameter is optional here for non-dynamic partitioning. + * Note that "relativePath" parameter specifies the relative path of returned temp file. The full + * path is left to the commit protocol to decide. The commit protocol only promises that the file + * will be at the location specified by the relative path after job commits. * * Important: it is the caller's responsibility to add uniquely identifying content to - * `stagingPath` and `finalPath`. 
The file commit protocol only guarantees that files written by - different tasks will not conflict. This API should be used instead of deprecated - `newTaskTempFile` and `newTaskTempFileAbsPath`. + "relativePath" if a task is going to write out multiple files to the same directory. The file + commit protocol only guarantees that files written by different tasks will not conflict. */ - def newTaskFile( - taskContext: TaskAttemptContext, - stagingPath: String, - finalPath: Option[String], - stagingDir: Option[String]): Unit + def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String + + /** + * Similar to newTaskTempFile(), but allows files to be committed to an absolute output location. + * Depending on the implementation, there may be weaker guarantees around adding files this way. + * + * "relativePath" parameter specifies the relative path of returned temp file, and "finalPath" + * parameter specifies the full path of file after job commit. The commit protocol promises that + * the file will be at the location specified by the "finalPath" after job commits. + * + * Important: it is the caller's responsibility to add uniquely identifying content to + * "relativePath" and "finalPath" if a task is going to write out multiple files to the same + * directory. The file commit protocol only guarantees that files written by different tasks will + * not conflict. + */ + def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String /** * Commits a task after the writes succeed. Must be called on the executors when running tasks. diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala index 70cba083667f..404e44ce7eb4 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileNamingProtocol.scala @@ -19,68 +19,28 @@ package org.apache.spark.internal.io import org.apache.hadoop.mapreduce.TaskAttemptContext -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils - /** - * An interface to define how a single Spark job names its outputs. Three notes: + * An interface to define how a single Spark job names its outputs. Two notes: * * 1. Implementations must be serializable, as the instance instantiated on the driver * will be used for tasks on executors. - * 2. Implementations should have a constructor with 3 arguments: - * (jobId: String, path: String, commitProtocol: [[FileCommitProtocol]]) - * 3. An instance should not be reused across multiple Spark jobs. + * 2. An instance should not be reused across multiple Spark jobs. * * The proper way to call is: * * As part of each task's execution, whenever a new output file needs be created, executor calls - [[getTaskStagingPath]] to get a valid file path before commit (i.e. "staging"). Optionally, - * executor can also call [[getTaskFinalPath]] to get a file path after commit (i.e. "final"). - * - * Important: Executor is expected to call [[FileCommitProtocol.newTaskFile]] afterwards to notify - * commit protocol a new file is added. + * [[getTaskTempPath]] to get a valid relative file path before commit. */ abstract class FileNamingProtocol { /** - * Gets the full path should be used for the output file before commit (i.e. "staging"). + * Gets the relative path that should be used for the output file.
* * Important: it is the caller's responsibility to add uniquely identifying content to * "fileContext" if a task is going to write out multiple files to the same directory. The file * naming protocol only guarantees that files written by different tasks will not conflict. */ - def getTaskStagingPath(taskContext: TaskAttemptContext, fileContext: FileContext): String - - /** - * Gets the full path should be used for the output file after commit (i.e. "final"). - * - * Important: it is the caller's responsibility to add uniquely identifying content to - * "fileContext" if a task is going to write out multiple files to the same directory. The file - * naming protocol only guarantees that files written by different tasks will not conflict. - */ - def getTaskFinalPath(taskContext: TaskAttemptContext, fileContext: FileContext): String -} - -object FileNamingProtocol extends Logging { - - /** - * Instantiates a [[FileNamingProtocol]] using the given className. - */ - def instantiate( - className: String, - jobId: String, - outputPath: String, - commitProtocol: FileCommitProtocol): FileNamingProtocol = { - - logDebug(s"Creating file naming protocol $className; job $jobId; output=$outputPath;" + - s" commitProtocol=$commitProtocol") - val clazz = Utils.classForName[FileNamingProtocol](className) - // Try the constructor with arguments (jobId: String, outputPath: String, - // commitProtocol: [[FileCommitProtocol]]). - val ctor = clazz.getDeclaredConstructor( - classOf[String], classOf[String], classOf[FileCommitProtocol]) - ctor.newInstance(jobId, outputPath, commitProtocol) - } + def getTaskTempPath(taskContext: TaskAttemptContext, fileContext: FileContext): String } /** @@ -89,12 +49,9 @@ object FileNamingProtocol extends Logging { * @param ext Source specific file extension, e.g. ".snappy.parquet". * @param relativeDir Relative directory of file. Can be used for writing dynamic partitions. * E.g., "a=1/b=2" is directory for partition (a=1, b=2). - * @param absoluteDir Absolute directory of file. Can be used for writing to custom location in - * file system. * @param prefix file prefix. */ final case class FileContext( ext: String, relativeDir: Option[String], - absoluteDir: Option[String], prefix: Option[String]) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index c751413e33fa..ea2342a3fb4c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -104,7 +104,7 @@ class HadoopMapReduceCommitProtocol( * The staging directory of this write job. Spark uses it to deal with files with absolute output * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. 
*/ - def stagingDir: Path = getStagingDir(path, jobId) + protected def stagingDir: Path = getStagingDir(path, jobId) protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { val format = context.getOutputFormatClass.getConstructor().newInstance() @@ -116,20 +116,28 @@ class HadoopMapReduceCommitProtocol( format.getOutputCommitter(context) } - override def newTaskFile( - taskContext: TaskAttemptContext, - stagingPath: String, - finalPath: Option[String], - stagingDir: Option[String]): Unit = { - finalPath match { - case Some(path) => addedAbsPathFiles(stagingPath) = path - case _ => - (committer, stagingDir) match { - case (_: FileOutputCommitter, Some(dir)) if dynamicPartitionOverwrite => - partitionPaths += dir - case _ => + override def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String = { + val stagingDir: Path = committer match { + // For FileOutputCommitter it has its own staging path called "work path". + case f: FileOutputCommitter => + if (dynamicPartitionOverwrite) { + val dir = new Path(relativePath).getParent.toString + assert(dir.nonEmpty, + "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") + partitionPaths += dir } + new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) + case _ => new Path(path) } + + new Path(stagingDir, relativePath).toString + } + + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String = { + val tmpOutputPath = new Path(stagingDir, relativePath).toString + addedAbsPathFiles(tmpOutputPath) = finalPath + tmpOutputPath } override def setupJob(jobContext: JobContext): Unit = { @@ -267,8 +275,4 @@ class HadoopMapReduceCommitProtocol( logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e) } } - - def getCommitter: OutputCommitter = { - committer - } } diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala deleted file mode 100644 index 1ebc38a2c19b..000000000000 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceNamingProtocol.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.internal.io - -import java.util.UUID - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.TaskAttemptContext -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - -/** - * An [[FileNamingProtocol]] implementation for [[HadoopMapReduceCommitProtocol]]. 
- */ -class HadoopMapReduceNamingProtocol( - jobId: String, - path: String, - commitProtocol: FileCommitProtocol) - extends FileNamingProtocol with Serializable { - - require(commitProtocol.isInstanceOf[HadoopMapReduceCommitProtocol]) - - private val hadoopMRCommitProtocol = commitProtocol.asInstanceOf[HadoopMapReduceCommitProtocol] - - override def getTaskStagingPath( - taskContext: TaskAttemptContext, fileContext: FileContext): String = { - val filename = getFilename(taskContext, fileContext) - fileContext.absoluteDir match { - case Some(_) => - new Path(hadoopMRCommitProtocol.stagingDir, UUID.randomUUID().toString + "-" + filename) - .toString - case _ => - val stagingDir: Path = hadoopMRCommitProtocol.getCommitter match { - // For FileOutputCommitter it has its own staging path called "work path". - case f: FileOutputCommitter => - new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) - case _ => new Path(path) - } - - fileContext.relativeDir.map { d => - new Path(new Path(stagingDir, d), filename).toString - }.getOrElse { - new Path(stagingDir, filename).toString - } - } - } - - override def getTaskFinalPath( - taskContext: TaskAttemptContext, fileContext: FileContext): String = { - require(fileContext.absoluteDir.isDefined) - val filename = getFilename(taskContext, fileContext) - new Path(fileContext.absoluteDir.get, filename).toString - } - - protected def getFilename(taskContext: TaskAttemptContext, fileContext: FileContext): String = { - // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet - // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, - // the file name is fine and won't overflow. - val split = taskContext.getTaskAttemptID.getTaskID.getId - val prefix = fileContext.prefix.getOrElse("") - val ext = fileContext.ext - f"${prefix}part-$split%05d-$jobId$ext" - } -} diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 63b4672c0a7c..76747534ddd2 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -117,8 +117,17 @@ class PathOutputCommitProtocol( committer } - def getPathOutputCommitter: PathOutputCommitter = { - committer + /** + * Create a temporary file for a task. + * + * @param taskContext task context + * @param relativePath relative path as a string for file + * @return the full path as a string for file + */ + override def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String = { + val file = new Path(committer.getWorkPath, relativePath) + logTrace(s"Creating task file $file with relative path $dir") + file.toString } } diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala deleted file mode 100644 index c699ec5561d9..000000000000 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputNamingProtocol.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.internal.io.cloud - -import java.util.UUID - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.TaskAttemptContext - -import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{FileCommitProtocol, FileContext, HadoopMapReduceNamingProtocol} - -/** - * An [[FileNamingProtocol]] implementation for [[PathOutputCommitProtocol]]. - */ -class PathOutputNamingProtocol( - jobId: String, - dest: String, - commitProtocol: FileCommitProtocol) - extends HadoopMapReduceNamingProtocol(jobId, dest, commitProtocol) with Logging { - - require(commitProtocol.isInstanceOf[PathOutputCommitProtocol]) - - private val pathOutputCommitProtocol = commitProtocol.asInstanceOf[PathOutputCommitProtocol] - - override def getTaskStagingPath( - taskContext: TaskAttemptContext, fileContext: FileContext): String = { - val filename = getFilename(taskContext, fileContext) - fileContext.absoluteDir match { - case Some(_) => - new Path(pathOutputCommitProtocol.stagingDir, UUID.randomUUID().toString + "-" + filename) - .toString - case _ => - val workDir = pathOutputCommitProtocol.getPathOutputCommitter.getWorkPath - val parent = fileContext.relativeDir.map { - d => new Path(workDir, d) - }.getOrElse(workDir) - val file = new Path(parent, filename) - logTrace(s"Creating task file $file for dir ${fileContext.relativeDir} and ext " + - s"${fileContext.ext}") - file.toString - } - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index c7da75883f80..d2f90ab6e6a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1405,9 +1405,9 @@ object QueryExecutionErrors { s"Multiple streaming queries are concurrently using $path", e) } - def addFilesWithAbsolutePathUnsupportedError(commitProtocol: String): Throwable = { + def addFilesWithAbsolutePathUnsupportedError(protocol: String): Throwable = { new UnsupportedOperationException( - s"$commitProtocol does not support adding files with an absolute path") + s"$protocol does not support adding files with an absolute path") } def microBatchUnsupportedByDataSourceError(srcName: String): Throwable = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c211da5ca6e6..9e4a4dde4f8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1192,17 +1192,6 @@ object SQLConf { .createWithDefault( 
"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") - val FILE_NAMING_PROTOCOL_CLASS = - buildConf("spark.sql.sources.namingProtocolClass") - .doc("The class name for output file naming protocol. This is used together with " + - s"${FILE_COMMIT_PROTOCOL_CLASS.key} for output file commit. The class should " + - "implement org.apache.spark.internal.io.FileNamingProtocol.") - .version("3.2.0") - .internal() - .stringConf - .createWithDefault( - "org.apache.spark.internal.io.HadoopMapReduceNamingProtocol") - val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold") .doc("The maximum number of paths allowed for listing files at driver side. If the number " + @@ -1713,16 +1702,6 @@ object SQLConf { .stringConf .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol") - val STREAMING_FILE_NAMING_PROTOCOL_CLASS = - buildConf("spark.sql.streaming.namingProtocolClass") - .doc("The class name for streaming output file naming protocol. This is used together " + - s"with ${STREAMING_FILE_COMMIT_PROTOCOL_CLASS.key} for output file commit. The class " + - "should implement org.apache.spark.internal.io.FileNamingProtocol.") - .version("3.2.0") - .internal() - .stringConf - .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileNamingProtocol") - val STREAMING_MULTIPLE_WATERMARK_POLICY = buildConf("spark.sql.streaming.multipleWatermarkPolicy") .doc("Policy to calculate the global watermark value when there are multiple watermark " + @@ -3446,9 +3425,6 @@ class SQLConf extends Serializable with Logging { def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS) - def streamingFileNamingProtocolClass: String = - getConf(SQLConf.STREAMING_FILE_NAMING_PROTOCOL_CLASS) - def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL) @@ -3709,8 +3685,6 @@ class SQLConf extends Serializable with Logging { def fileCommitProtocolClass: String = getConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS) - def fileNamingProtocolClass: String = getConf(SQLConf.FILE_NAMING_PROTOCOL_CLASS) - def parallelPartitionDiscoveryThreshold: Int = getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index 0cef55ac19b8..fa6bd3719054 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.datasources +import java.util.UUID + import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -136,9 +138,9 @@ class SingleDirectoryDataWriter( val ext = f"-c$fileCounter%03d" + description.outputWriterFactory.getFileExtension(taskAttemptContext) - val fileContext = FileContext(ext, None, None, None) - val currentPath = namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext) - committer.newTaskFile(taskAttemptContext, currentPath, None, None) + val currentPath = committer.newTaskTempFile( + taskAttemptContext, + namingProtocol.getTaskTempPath(taskAttemptContext, FileContext(ext, None, None))) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, @@ -263,16 
+265,18 @@ abstract class BaseDynamicPartitionDataWriter( val customPath = partDir.flatMap { dir => description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir)) } - val (currentPath, finalPath, currentDir) = if (customPath.isDefined) { - val fileContext = FileContext(ext, None, Some(customPath.get), None) - (namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext), - Some(namingProtocol.getTaskFinalPath(taskAttemptContext, fileContext)), - None) + val currentPath = if (customPath.isDefined) { + committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) + // Include a UUID here to prevent file collisions for one task writing to different dirs. + val relativePath = UUID.randomUUID().toString + "-" + + namingProtocol.getTaskTempPath(taskAttemptContext, FileContext(ext, None, None)) + val finalPath = new Path(customPath.get, relativePath).toString + committer.newTaskTempFileAbsPath(taskAttemptContext, relativePath, finalPath) } else { - val fileContext = FileContext(ext, partDir, None, None) - (namingProtocol.getTaskStagingPath(taskAttemptContext, fileContext), None, partDir) + val relativePath = namingProtocol.getTaskTempPath( + taskAttemptContext, FileContext(ext, partDir, None)) + committer.newTaskTempFile(taskAttemptContext, relativePath) } - committer.newTaskFile(taskAttemptContext, currentPath, finalPath, currentDir) currentWriter = description.outputWriterFactory.newInstance( path = currentPath, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 99413c51ccad..a226cab02738 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol} +import org.apache.spark.internal.io.{BatchFileNamingProtocol, FileCommitProtocol} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -110,12 +110,7 @@ case class InsertIntoHadoopFsRelationCommand( jobId = jobId, outputPath = outputPath.toString, dynamicPartitionOverwrite = dynamicPartitionOverwrite) - - val namingProtocol = FileNamingProtocol.instantiate( - sparkSession.sessionState.conf.fileNamingProtocolClass, - jobId = jobId, - outputPath = outputPath.toString, - commitProtocol = committer) + val namingProtocol = new BatchFileNamingProtocol(jobId) val doInsertion = if (mode == SaveMode.Append) { true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index 9e25987ef78f..2d01643b98a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat -import org.apache.spark.internal.io.{FileCommitProtocol, 
FileNamingProtocol} +import org.apache.spark.internal.io.{BatchFileNamingProtocol, FileCommitProtocol} import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} @@ -62,11 +62,7 @@ trait FileWrite extends Write { sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = jobId, outputPath = paths.head) - val namingProtocol = FileNamingProtocol.instantiate( - sparkSession.sessionState.conf.fileNamingProtocolClass, - jobId = jobId, - outputPath = paths.head, - commitProtocol = committer) + val namingProtocol = new BatchFileNamingProtocol(jobId) lazy val description = createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 9273071ef018..cd4d5e3180b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol} +import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.errors.QueryExecutionErrors @@ -152,12 +152,7 @@ class FileStreamSink( manifestCommitter.setupManifestOptions(fileLog, batchId) case _ => // Do nothing } - - val namingProtocol = FileNamingProtocol.instantiate( - sparkSession.sessionState.conf.streamingFileNamingProtocolClass, - jobId = batchId.toString, - outputPath = path, - commitProtocol = committer) + val namingProtocol = new StreamingFileNamingProtocol(batchId.toString) // Get the actual partition columns as attributes after matching them by name with // the given columns names. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index 9932ce08d204..f79b952db4de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.sql.errors.QueryExecutionErrors /** * A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in @@ -109,13 +110,15 @@ class ManifestFileCommitProtocol(jobId: String, path: String) addedFiles = new ArrayBuffer[String] } - def newTaskFile( - taskContext: TaskAttemptContext, - stagingPath: String, - finalPath: Option[String], - stagingDir: Option[String]): Unit = { - require(finalPath.isEmpty, s"$this does not support adding files with an absolute path") - addedFiles += stagingPath + override def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String = { + val file = new Path(path, relativePath).toString + addedFiles += file + file + } + + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String = { + throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString) } override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileNamingProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingFileNamingProtocol.scala similarity index 65% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileNamingProtocol.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingFileNamingProtocol.scala index 2ece76b74e68..abc8f25b2e19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileNamingProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingFileNamingProtocol.scala @@ -22,21 +22,14 @@ import java.util.UUID import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext -import org.apache.spark.internal.io.{FileCommitProtocol, FileContext, FileNamingProtocol} -import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.internal.io.{FileContext, FileNamingProtocol} /** - * An [[FileNamingProtocol]] implementation for [[ManifestFileCommitProtocol]]. + * A [[FileNamingProtocol]] implementation to write output data in streaming processing. 
*/ -class ManifestFileNamingProtocol( - jobId: String, - path: String, - commitProtocol: FileCommitProtocol) - extends FileNamingProtocol with Serializable { +class StreamingFileNamingProtocol(jobId: String) extends FileNamingProtocol with Serializable { - require(commitProtocol.isInstanceOf[ManifestFileCommitProtocol]) - - override def getTaskStagingPath( + override def getTaskTempPath( taskContext: TaskAttemptContext, fileContext: FileContext): String = { // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, @@ -47,14 +40,7 @@ class ManifestFileNamingProtocol( val filename = f"part-$split%05d-$uuid$ext" fileContext.relativeDir.map { d => - new Path(new Path(path, d), filename).toString - }.getOrElse { - new Path(path, filename).toString - } - } - - override def getTaskFinalPath( - taskContext: TaskAttemptContext, fileContext: FileContext): String = { - throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString) + new Path(d, filename).toString + }.getOrElse(filename) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index 42762fd031cc..0b80f1276030 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -38,15 +38,9 @@ private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String extends SQLHadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging { - override def newTaskFile( - taskContext: TaskAttemptContext, - stagingPath: String, - finalPath: Option[String], - stagingDir: Option[String]): Unit = { - finalPath match { - case Some(_) => throw new Exception("there should be no custom partition path") - case _ => super.newTaskFile(taskContext, stagingPath, finalPath, stagingDir) - } + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, relativePath: String, finalPath: String): String = { + throw new Exception("there should be no custom partition path") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 31dff6969a2c..c063d73b920e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner -import org.apache.spark.internal.io.{FileCommitProtocol, FileNamingProtocol} +import org.apache.spark.internal.io.{BatchFileNamingProtocol, FileCommitProtocol} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute @@ -85,12 +85,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = jobId, outputPath = outputLocation) - - val namingProtocol = FileNamingProtocol.instantiate( - sparkSession.sessionState.conf.fileNamingProtocolClass, - jobId = jobId, - outputPath = outputLocation, - commitProtocol = committer) + val namingProtocol = new 
BatchFileNamingProtocol(jobId) FileFormatWriter.write( sparkSession = sparkSession, From db2594d7f8f05541d23752be7b5a3c20c550c6d1 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 18 Jun 2021 02:52:03 -0700 Subject: [PATCH 09/10] Try to fix unit test failure --- .../spark/sql/execution/datasources/FileFormatDataWriter.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index fa6bd3719054..afa8e56f18eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -266,7 +266,6 @@ abstract class BaseDynamicPartitionDataWriter( description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir)) } val currentPath = if (customPath.isDefined) { - committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) // Include a UUID here to prevent file collisions for one task writing to different dirs. val relativePath = UUID.randomUUID().toString + "-" + namingProtocol.getTaskTempPath(taskAttemptContext, FileContext(ext, None, None)) From 0f3df0f45dc12768c7b9843e84e28a50b279443e Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 18 Jun 2021 18:51:31 -0700 Subject: [PATCH 10/10] Try to fix build --- .../spark/internal/io/cloud/PathOutputCommitProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala index 76747534ddd2..1529d6f9dcd9 100644 --- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala +++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala @@ -126,7 +126,7 @@ class PathOutputCommitProtocol( */ override def newTaskTempFile(taskContext: TaskAttemptContext, relativePath: String): String = { val file = new Path(committer.getWorkPath, relativePath) - logTrace(s"Creating task file $file with relative path $dir") + logTrace(s"Creating task file $file with relative path $relativePath") file.toString } }
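
To make the end state of this patch series concrete, here is a minimal sketch of a custom naming protocol, assuming the final API above (FileNamingProtocol.getTaskTempPath plus FileContext(ext, relativeDir, prefix)); the class name and the UUID-based naming scheme are illustrative only, not from the patches:

import java.util.UUID

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.internal.io.{FileContext, FileNamingProtocol}

// Hypothetical example: name each output file with the task split id, the job id and a UUID,
// and nest it under the relative partition directory when one is given.
class UuidFileNamingProtocol(jobId: String) extends FileNamingProtocol with Serializable {

  override def getTaskTempPath(
      taskContext: TaskAttemptContext, fileContext: FileContext): String = {
    val split = taskContext.getTaskAttemptID.getTaskID.getId
    val prefix = fileContext.prefix.getOrElse("")
    val filename = f"${prefix}part-$split%05d-$jobId-${UUID.randomUUID()}${fileContext.ext}"
    // The commit protocol resolves this relative path to a full staging path, e.g. under the
    // FileOutputCommitter work path in HadoopMapReduceCommitProtocol.newTaskTempFile.
    fileContext.relativeDir.map(d => new Path(d, filename).toString).getOrElse(filename)
  }
}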