From 2a87bc0df2790674563c2a621cdbc674202bc63a Mon Sep 17 00:00:00 2001 From: duripeng Date: Fri, 27 Mar 2020 10:28:37 +0800 Subject: [PATCH 1/6] [SPARK-27194][SPARK-29302][SQL] Fix commit collision in dynamic partition overwrite mode --- .../internal/io/FileCommitProtocol.scala | 4 ++ .../io/HadoopMapReduceCommitProtocol.scala | 30 +++++++----- .../InsertIntoHadoopFsRelationCommand.scala | 12 ++++- .../SQLHadoopMapReduceCommitProtocol.scala | 3 +- .../sql/sources/PartitionedWriteSuite.scala | 47 ++++++++++++++++++- 5 files changed, 79 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index 0746e43babf9..d9d7b06cdb8c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -169,4 +169,8 @@ object FileCommitProtocol extends Logging { ctor.newInstance(jobId, outputPath) } } + + def getStagingDir(path: String, jobId: String): Path = { + new Path(path, ".spark-staging-" + jobId) + } } diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 11ce608f52ee..43a6a29b3f98 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -41,13 +41,17 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * @param jobId the job's or stage's id * @param path the job's output path, or null if committer acts as a noop * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime - * dynamically, i.e., we first write files under a staging - * directory with partition path, e.g. - * /path/to/staging/a=1/b=1/xxx.parquet. When committing the job, - * we first clean up the corresponding partition directories at - * destination path, e.g. /path/to/destination/a=1/b=1, and move - * files from staging directory to the corresponding partition - * directories under destination path. + * dynamically, i.e., for speculative tasks, we first write files + * to task attempt paths under a staging directory, e.g. + * /path/to/staging/.spark-staging-{jobId}/_temporary/ + * {appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet. + * When committing the job, we first move files from task attempt + * paths to corresponding partition directories under the staging + * directory, e.g. + * /path/to/staging/.spark-staging-{jobId}/a=1/b=1. + * Secondly, move the partition directories under staging + * directory to partition directories under destination path, + * e.g. /path/to/destination/a=1/b=1 */ class HadoopMapReduceCommitProtocol( jobId: String, @@ -89,7 +93,7 @@ class HadoopMapReduceCommitProtocol( * The staging directory of this write job. Spark uses it to deal with files with absolute output * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true. 
*/ - private def stagingDir = new Path(path, ".spark-staging-" + jobId) + protected def stagingDir = getStagingDir(path, jobId) protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { val format = context.getOutputFormatClass.getConstructor().newInstance() @@ -106,13 +110,13 @@ class HadoopMapReduceCommitProtocol( val filename = getFilename(taskContext, ext) val stagingDir: Path = committer match { - case _ if dynamicPartitionOverwrite => - assert(dir.isDefined, - "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") - partitionPaths += dir.get - this.stagingDir // For FileOutputCommitter it has its own staging path called "work path". case f: FileOutputCommitter => + if (dynamicPartitionOverwrite) { + assert(dir.isDefined, + "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.") + partitionPaths += dir.get + } new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path)) case _ => new Path(path) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index fe733f4238e1..522888a8a494 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -106,9 +106,10 @@ case class InsertIntoHadoopFsRelationCommand( fs, catalogTable.get, qualifiedOutputPath, matchingPartitions) } + val jobId = java.util.UUID.randomUUID().toString val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, - jobId = java.util.UUID.randomUUID().toString, + jobId = jobId, outputPath = outputPath.toString, dynamicPartitionOverwrite = dynamicPartitionOverwrite) @@ -163,6 +164,13 @@ case class InsertIntoHadoopFsRelationCommand( } } + // For dynamic partition overwrite, FileOutputCommitter's output path is staging path, files + // will be renamed from staging path to final output path during commit job + val committerOutputPath = if (dynamicPartitionOverwrite) { + FileCommitProtocol.getStagingDir(outputPath.toString, jobId) + .makeQualified(fs.getUri, fs.getWorkingDirectory) + } else qualifiedOutputPath + val updatedPartitionPaths = FileFormatWriter.write( sparkSession = sparkSession, @@ -170,7 +178,7 @@ case class InsertIntoHadoopFsRelationCommand( fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec( - qualifiedOutputPath.toString, customPartitionLocations, outputColumns), + committerOutputPath.toString, customPartitionLocations, outputColumns), hadoopConf = hadoopConf, partitionColumns = partitionColumns, bucketSpec = bucketSpec, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala index 39c594a9bc61..144be2316f09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala @@ -55,7 +55,8 @@ class SQLHadoopMapReduceCommitProtocol( // The specified output committer is a FileOutputCommitter. // So, we will use the FileOutputCommitter-specified constructor. 
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) - committer = ctor.newInstance(new Path(path), context) + val committerOutputPath = if (dynamicPartitionOverwrite) stagingDir else new Path(path) + committer = ctor.newInstance(committerOutputPath, context) } else { // The specified output committer is just an OutputCommitter. // So, we will use the no-argument constructor. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index 6df1c5db14c2..f89837f09fe7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.sources import java.io.File import java.sql.Timestamp -import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.spark.TestUtils import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -164,4 +165,48 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { assert(e.getMessage.contains("Found duplicate column(s) b, b: `b`;")) } } + + test("SPARK-27194 SPARK-29302: Fix commit collision in dynamic partition overwrite mode") { + withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> + SQLConf.PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[PartitionFileExistCommitProtocol].getName) { + withTempDir { d => + withTable("t") { + sql( + s""" + | create table t(c1 int, p1 int) using parquet partitioned by (p1) + | location '${d.getAbsolutePath}' + """.stripMargin) + + val df = Seq((1, 2)).toDF("c1", "p1") + df.write + .partitionBy("p1") + .mode("overwrite") + .saveAsTable("t") + checkAnswer(sql("select * from t"), df) + } + } + } + } +} + +/** + * A file commit protocol with pre-created partition file. 
when try to overwrite partition dir + * in dynamic partition mode, FileAlreadyExist exception would raise without SPARK-31968 + */ +private class PartitionFileExistCommitProtocol( + jobId: String, + path: String, + dynamicPartitionOverwrite: Boolean) + extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) { + override def setupJob(jobContext: JobContext): Unit = { + super.setupJob(jobContext) + val stagingDir = new File(path, s".spark-staging-$jobId") + stagingDir.mkdirs() + val stagingPartDir = new File(stagingDir, "p1=2") + stagingPartDir.mkdirs() + val conflictTaskFile = new File(stagingPartDir, s"part-00000-$jobId.c000.snappy.parquet") + conflictTaskFile.createNewFile() + } } From 269f09ba0348adfd5177ccdd9fe6487ee004c783 Mon Sep 17 00:00:00 2001 From: duripeng Date: Mon, 3 Aug 2020 02:11:12 +0800 Subject: [PATCH 2/6] correct annotation and code clean --- .../io/HadoopMapReduceCommitProtocol.scala | 22 ++++++++++++------- .../InsertIntoHadoopFsRelationCommand.scala | 4 +++- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 43a6a29b3f98..d3d2fcbe4d37 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -41,17 +41,23 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * @param jobId the job's or stage's id * @param path the job's output path, or null if committer acts as a noop * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime - * dynamically, i.e., for speculative tasks, we first write files - * to task attempt paths under a staging directory, e.g. - * /path/to/staging/.spark-staging-{jobId}/_temporary/ + * dynamically, i.e., we first write files to task attempt paths + * under a staging directory, e.g. + * /path/to/outputPath/.spark-staging-{jobId}/_temporary/ * {appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet. - * When committing the job, we first move files from task attempt + * 1. When [[FileOutputCommitter]] algorithm version set to 1, + * we firstly move files from task attempt * paths to corresponding partition directories under the staging - * directory, e.g. - * /path/to/staging/.spark-staging-{jobId}/a=1/b=1. + * directory during committing job, e.g. + * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. * Secondly, move the partition directories under staging - * directory to partition directories under destination path, - * e.g. /path/to/destination/a=1/b=1 + * directory to destination path, e.g. /path/to/outputPath/a=1/b=1 + * 2. When [[FileOutputCommitter]] algorithm version set to 2, + * committing tasks directly move files to staging directory, + * e.g. /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. + * Then move this partition directories under staging directory + * to destination path during job committing, e.g. 
+ * /path/to/outputPath/a=1/b=1 */ class HadoopMapReduceCommitProtocol( jobId: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 522888a8a494..db7264d0c6ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -169,7 +169,9 @@ case class InsertIntoHadoopFsRelationCommand( val committerOutputPath = if (dynamicPartitionOverwrite) { FileCommitProtocol.getStagingDir(outputPath.toString, jobId) .makeQualified(fs.getUri, fs.getWorkingDirectory) - } else qualifiedOutputPath + } else { + qualifiedOutputPath + } val updatedPartitionPaths = FileFormatWriter.write( From 16e219ac76495156b83033987e238b9d3f1282d7 Mon Sep 17 00:00:00 2001 From: duripeng Date: Thu, 20 Aug 2020 16:35:35 +0800 Subject: [PATCH 3/6] fix ut create issue --- .../io/HadoopMapReduceCommitProtocol.scala | 18 +++++++----------- .../sql/sources/PartitionedWriteSuite.scala | 5 +++-- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index d3d2fcbe4d37..407d603af4a5 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -46,18 +46,14 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * /path/to/outputPath/.spark-staging-{jobId}/_temporary/ * {appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet. * 1. When [[FileOutputCommitter]] algorithm version set to 1, - * we firstly move files from task attempt - * paths to corresponding partition directories under the staging - * directory during committing job, e.g. - * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. - * Secondly, move the partition directories under staging - * directory to destination path, e.g. /path/to/outputPath/a=1/b=1 + * we firstly move files from task attempt paths to + * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1, + * then move them to /path/to/outputPath/a=1/b=1 * 2. When [[FileOutputCommitter]] algorithm version set to 2, - * committing tasks directly move files to staging directory, - * e.g. /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. - * Then move this partition directories under staging directory - * to destination path during job committing, e.g. - * /path/to/outputPath/a=1/b=1 + * committing tasks directly move files to + * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1, + * then move them to /path/to/outputPath/a=1/b=1 at the end of job + * committing. 
*/ class HadoopMapReduceCommitProtocol( jobId: String, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index f89837f09fe7..efcfde9db044 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.sources import java.io.File import java.sql.Timestamp +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.spark.TestUtils @@ -193,7 +194,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { /** * A file commit protocol with pre-created partition file. when try to overwrite partition dir - * in dynamic partition mode, FileAlreadyExist exception would raise without SPARK-31968 + * in dynamic partition mode, FileAlreadyExist exception would raise without SPARK-27194 */ private class PartitionFileExistCommitProtocol( jobId: String, @@ -202,7 +203,7 @@ private class PartitionFileExistCommitProtocol( extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) { override def setupJob(jobContext: JobContext): Unit = { super.setupJob(jobContext) - val stagingDir = new File(path, s".spark-staging-$jobId") + val stagingDir = new File(new Path(path).toUri.getPath, s".spark-staging-$jobId") stagingDir.mkdirs() val stagingPartDir = new File(stagingDir, "p1=2") stagingPartDir.mkdirs() From 8b131faaaa33b260bc03ae09b9c624e2e8ab0529 Mon Sep 17 00:00:00 2001 From: duripeng Date: Thu, 20 Aug 2020 19:24:09 +0800 Subject: [PATCH 4/6] trigger ut --- .../org/apache/spark/sql/sources/PartitionedWriteSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index efcfde9db044..4b28359ae378 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -181,6 +181,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { """.stripMargin) val df = Seq((1, 2)).toDF("c1", "p1") + // trigger ut df.write .partitionBy("p1") .mode("overwrite") From d306538a143451693c9be3aa7e1386e32d99abb0 Mon Sep 17 00:00:00 2001 From: duripeng Date: Thu, 20 Aug 2020 19:24:34 +0800 Subject: [PATCH 5/6] Revert "trigger ut" This reverts commit 8b131faaaa33b260bc03ae09b9c624e2e8ab0529. 
--- .../org/apache/spark/sql/sources/PartitionedWriteSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index 4b28359ae378..efcfde9db044 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -181,7 +181,6 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession { """.stripMargin) val df = Seq((1, 2)).toDF("c1", "p1") - // trigger ut df.write .partitionBy("p1") .mode("overwrite") From 85aa12a618ceadfe510a4f9fc3718a746a1bc357 Mon Sep 17 00:00:00 2001 From: duripeng Date: Wed, 18 Nov 2020 22:28:52 +0800 Subject: [PATCH 6/6] revise comment & change ut extends --- .../io/HadoopMapReduceCommitProtocol.scala | 27 ++++++++++++------- .../sql/sources/PartitionedWriteSuite.scala | 3 +-- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 407d603af4a5..30f9a650a69c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -41,19 +41,28 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil * @param jobId the job's or stage's id * @param path the job's output path, or null if committer acts as a noop * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime - * dynamically, i.e., we first write files to task attempt paths - * under a staging directory, e.g. + * dynamically. Suppose final path is /path/to/outputPath, output + * path of [[FileOutputCommitter]] is an intermediate path, e.g. + * /path/to/outputPath/.spark-staging-{jobId}, which is a staging + * directory. Task attempts firstly write files under the + * intermediate path, e.g. * /path/to/outputPath/.spark-staging-{jobId}/_temporary/ * {appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet. + * * 1. When [[FileOutputCommitter]] algorithm version set to 1, - * we firstly move files from task attempt paths to - * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1, - * then move them to /path/to/outputPath/a=1/b=1 + * we firstly move task attempt output files to + * /path/to/outputPath/.spark-staging-{jobId}/_temporary/ + * {appAttemptId}/{taskId}/a=1/b=1, + * then move them to + * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. * 2. When [[FileOutputCommitter]] algorithm version set to 2, - * committing tasks directly move files to - * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1, - * then move them to /path/to/outputPath/a=1/b=1 at the end of job - * committing. + * committing tasks directly move task attempt output files to + * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1. 
+ * + * At the end of committing job, we move output files from + * intermediate path to final path, e.g., move files from + * /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1 + * to /path/to/outputPath/a=1/b=1 */ class HadoopMapReduceCommitProtocol( jobId: String, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index efcfde9db044..52825a155e46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.spark.TestUtils import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -200,7 +199,7 @@ private class PartitionFileExistCommitProtocol( jobId: String, path: String, dynamicPartitionOverwrite: Boolean) - extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) { + extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) { override def setupJob(jobContext: JobContext): Unit = { super.setupJob(jobContext) val stagingDir = new File(new Path(path).toUri.getPath, s".spark-staging-$jobId")
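
The dynamic-partition-overwrite path that this series fixes, and that the new PartitionedWriteSuite test exercises, can also be driven from a small standalone job. The sketch below is illustrative rather than part of the patches: the table name t, the columns c1/p1, the app name and the local master are assumptions made for the example; only the spark.sql.sources.partitionOverwriteMode setting and the DataFrameWriter API correspond to what the series touches. Where the test writes with saveAsTable, the sketch uses insertInto to reach the same overwrite-by-partition commit path.

import org.apache.spark.sql.SparkSession

object DynamicPartitionOverwriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dynamic-partition-overwrite-sketch") // illustrative name
      .master("local[2]")                            // assumption: local run
      // Same knob the new test sets via SQLConf.PARTITION_OVERWRITE_MODE.
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .getOrCreate()
    import spark.implicits._

    // Partitioned target table; name and schema mirror the test but are arbitrary.
    spark.sql("CREATE TABLE IF NOT EXISTS t (c1 INT, p1 INT) USING parquet PARTITIONED BY (p1)")

    // With dynamic overwrite enabled, the patched commit protocol stages task output
    // under <tableLocation>/.spark-staging-<jobId>/ and, at job commit, replaces only
    // the partitions present in the incoming data (here p1=2), leaving others untouched.
    Seq((1, 2)).toDF("c1", "p1")
      .write
      .mode("overwrite")
      .insertInto("t")

    spark.sql("SELECT * FROM t").show()
    spark.stop()
  }
}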