[SPARK-20236][SQL] dynamic partition overwrite #18714
@@ -39,8 +39,19 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
  *
  * @param jobId the job's or stage's id
  * @param path the job's output path, or null if committer acts as a noop
+ * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
+ *                                  dynamically, i.e., we first write files under a staging
+ *                                  directory with partition path, e.g.
+ *                                  /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
+ *                                  we first clean up the corresponding partition directories at
+ *                                  destination path, e.g. /path/to/destination/a=1/b=1, and move
+ *                                  files from staging directory to the corresponding partition
+ *                                  directories under destination path.
  */
-class HadoopMapReduceCommitProtocol(jobId: String, path: String)
+class HadoopMapReduceCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
   extends FileCommitProtocol with Serializable with Logging {
 
   import FileCommitProtocol._
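The new constructor flag is what the SQL layer ends up toggling for end users. A minimal usage sketch of the behavior the doc comment describes, assuming the `spark.sql.sources.partitionOverwriteMode` configuration exposed by the released feature (the SQL-side wiring is not part of this file, and the table name/schema below are made up):

```scala
// Hedged sketch of the user-facing behavior this committer enables.
// Assumption: the spark.sql.sources.partitionOverwriteMode conf ("static"/"dynamic").
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dynamic-overwrite-demo").getOrCreate()
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// A partitioned target table for insertInto (hypothetical name and schema).
spark.sql(
  "CREATE TABLE IF NOT EXISTS partitioned_table (id BIGINT, a INT, b INT) " +
    "USING parquet PARTITIONED BY (a, b)")

// With dynamic mode, only the partitions actually written by this DataFrame
// (the a/b values it contains) are cleaned and replaced; other existing
// partitions of the table are left untouched.
spark.range(10)
  .selectExpr("id", "cast(id % 2 as int) as a", "cast(id % 3 as int) as b")
  .write
  .mode("overwrite")
  .insertInto("partitioned_table")
```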
@@ -67,9 +78,17 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) | |
| @transient private var addedAbsPathFiles: mutable.Map[String, String] = null | ||
|
|
||
| /** | ||
| * The staging directory for all files committed with absolute output paths. | ||
| * Tracks partitions with default path that have new files written into them by this task, | ||
| * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to | ||
| * destination directory at the end, if `dynamicPartitionOverwrite` is true. | ||
| */ | ||
| private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId) | ||
| @transient private var partitionPaths: mutable.Set[String] = null | ||
|
||
|
|
||
| /** | ||
| * The staging directory of this write job. Spark uses it to deal with files with absolute output | ||
| * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. | ||
| */ | ||
| private def stagingDir = new Path(path, ".spark-staging-" + jobId) | ||
|
|
||
| protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { | ||
| val format = context.getOutputFormatClass.newInstance() | ||
|
|
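To make the naming concrete, here is a small sketch (the job id and destination path are made up) of where a task's files sit under `stagingDir` before the job-level move:

```scala
// Illustrative layout only; the job id and destination are hypothetical.
import org.apache.hadoop.fs.Path

val jobId = "20170725113147-0001"
val destination = new Path("/path/to/destination")
val stagingDir = new Path(destination, ".spark-staging-" + jobId)

// A task writing partition a=1/b=2 stages its output under the staging dir,
// mirroring the partition path:
val stagedPartition = new Path(stagingDir, "a=1/b=2")
// => /path/to/destination/.spark-staging-20170725113147-0001/a=1/b=2
// commitJob later deletes /path/to/destination/a=1/b=2 and renames the staged
// directory into its place.
```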
@@ -85,11 +104,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
     val filename = getFilename(taskContext, ext)
 
-    val stagingDir: String = committer match {
+    val stagingDir: Path = committer match {
+      case _ if dynamicPartitionOverwrite =>
+        assert(dir.isDefined,
+          "The dataset to be written must be partitioned when runtimeOverwritePartition is true.")
+        partitionPaths += dir.get
+        this.stagingDir
       // For FileOutputCommitter it has its own staging path called "work path".
       case f: FileOutputCommitter =>
-        Option(f.getWorkPath).map(_.toString).getOrElse(path)
-      case _ => path
+        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
+      case _ => new Path(path)
     }
 
     dir.map { d =>
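Read in isolation, the `match` above boils down to a three-way choice of staging directory. A simplified standalone restatement of that selection (not the actual method; `workPath` stands in for `FileOutputCommitter.getWorkPath`, and all names are illustrative):

```scala
// Simplified restatement of the staging-directory choice in newTaskTempFile.
import org.apache.hadoop.fs.Path

def chooseStagingDir(
    dynamicPartitionOverwrite: Boolean,
    workPath: Option[Path],       // FileOutputCommitter's work path, if any
    jobStagingDir: Path,          // .spark-staging-<jobId>
    destination: Path): Path = {
  if (dynamicPartitionOverwrite) {
    // Partitioned data is always staged under the job's own staging dir so the
    // job commit can swap whole partition directories at the end.
    jobStagingDir
  } else {
    // Otherwise defer to the committer's own work path when it has one.
    workPath.getOrElse(destination)
  }
}
```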
@@ -106,8 +130,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
     // Include a UUID here to prevent file collisions for one task writing to different dirs.
     // In principle we could include hash(absoluteDir) instead but this is simpler.
-    val tmpOutputPath = new Path(
-      absPathStagingDir, UUID.randomUUID().toString() + "-" + filename).toString
+    val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString
 
     addedAbsPathFiles(tmpOutputPath) = absOutputPath
     tmpOutputPath
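For files with absolute output paths, the only change is which staging directory hosts the temporary file; the UUID-prefixed name stays the same. A quick sketch of the resulting shape (all values hypothetical):

```scala
// Hypothetical values; shows the shape of the temp path for an absolute-path file.
import java.util.UUID
import org.apache.hadoop.fs.Path

val stagingDir = new Path("/path/to/destination/.spark-staging-20170725113147-0001")
val filename = "part-00000-c000.snappy.parquet"
val tmpOutputPath =
  new Path(stagingDir, UUID.randomUUID().toString + "-" + filename).toString
// e.g. .../.spark-staging-.../3f2c...-part-00000-c000.snappy.parquet, which is
// later renamed to its absolute destination in commitJob.
```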
@@ -141,37 +164,57 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
-    val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
-      .foldLeft(Map[String, String]())(_ ++ _)
-    logDebug(s"Committing files staged for absolute locations $filesToMove")
+
     if (hasValidPath) {
-      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      val (allAbsPathFiles, allPartitionPaths) =
+        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+
+      val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
+      logDebug(s"Committing files staged for absolute locations $filesToMove")
+      if (dynamicPartitionOverwrite) {
+        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
+        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
+        absPartitionPaths.foreach(fs.delete(_, true))
+      }
       for ((src, dst) <- filesToMove) {
         fs.rename(new Path(src), new Path(dst))
       }
-      fs.delete(absPathStagingDir, true)
+
+      if (dynamicPartitionOverwrite) {
+        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
+        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
+        for (part <- partitionPaths) {
+          val finalPartPath = new Path(path, part)
+          fs.delete(finalPartPath, true)
+          fs.rename(new Path(stagingDir, part), finalPartPath)
+        }
+      }
+
+      fs.delete(stagingDir, true)
     }
   }
 
   override def abortJob(jobContext: JobContext): Unit = {
     committer.abortJob(jobContext, JobStatus.State.FAILED)
     if (hasValidPath) {
-      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-      fs.delete(absPathStagingDir, true)
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      fs.delete(stagingDir, true)
     }
   }
 
   override def setupTask(taskContext: TaskAttemptContext): Unit = {
     committer = setupCommitter(taskContext)
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
+    partitionPaths = mutable.Set[String]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    new TaskCommitMessage(addedAbsPathFiles.toMap)
+    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
   }
 
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
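The job-commit sequence for dynamic overwrite is essentially "delete the destination partition, then rename the staged partition into its place". A minimal standalone sketch of that step, not the actual Spark method (no error handling; the function and parameter names are made up for illustration):

```scala
// Standalone sketch of the per-partition publish step performed in commitJob
// when dynamicPartitionOverwrite is true. Names are illustrative, not Spark APIs.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def publishPartitions(
    destination: Path,               // the job's output path
    stagingDir: Path,                // .spark-staging-<jobId>
    partitionPaths: Set[String],     // e.g. Set("a=1/b=1", "a=1/b=2")
    conf: Configuration): Unit = {
  val fs: FileSystem = destination.getFileSystem(conf)
  partitionPaths.foreach { part =>
    val finalPartPath = new Path(destination, part)
    fs.delete(finalPartPath, true)                        // drop existing partition data
    fs.rename(new Path(stagingDir, part), finalPartPath)  // move staged files into place
  }
  fs.delete(stagingDir, true)                             // clean up the staging dir
}
```

This mirrors the two `if (dynamicPartitionOverwrite)` blocks in the hunk above: absolute-path partitions are cleaned before the file-level renames, and default-path partitions are swapped directory-by-directory before the staging directory is finally removed.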
Review comment: Indents.