Commit bbdfc0a

liancheng authored and yhuai committed
[SPARK-8121] [SQL] Fixes InsertIntoHadoopFsRelation job initialization for Hadoop 1.x
For Hadoop 1.x, the `TaskAttemptContext` constructor clones the `Configuration` argument, so configurations made in `HadoopFsRelation.prepareJobForWrite()` are not populated into the *driver*-side `TaskAttemptContext` (executor-side configurations are populated properly). Currently this should only affect the Parquet output committer class configuration.

Author: Cheng Lian <[email protected]>

Closes #6669 from liancheng/spark-8121 and squashes the following commits:

73819e8 [Cheng Lian] Minor logging fix
fce089c [Cheng Lian] Adds more logging
b6f78a6 [Cheng Lian] Fixes compilation error introduced while rebasing
963a1aa [Cheng Lian] Addresses @yhuai's comment
c3a0b1a [Cheng Lian] Fixes InsertIntoHadoopFsRelation job initialization
1 parent ed5c2dc commit bbdfc0a
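To make the root cause concrete, here is a minimal, self-contained sketch of the Hadoop 1.x behavior the commit message describes. It is an illustration only, not code from this patch; the object name is made up, and it assumes Hadoop 1.x, where org.apache.hadoop.mapreduce.TaskAttemptContext is a concrete class whose constructor copies the Configuration it is given.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID}

object Hadoop1ConfigCloningSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // Hadoop 1.x: the context takes its own copy of `conf` at construction time.
    val context = new TaskAttemptContext(conf, new TaskAttemptID())

    // Anything written to `conf` afterwards (e.g. by prepareJobForWrite) never
    // reaches the already-created driver-side context.
    conf.set("spark.sql.sources.outputCommitterClass",
      "org.apache.parquet.hadoop.ParquetOutputCommitter")

    // Prints null on Hadoop 1.x, which is the bug this commit fixes by calling
    // prepareJobForWrite(job) *before* creating the TaskAttemptContext.
    println(context.getConfiguration.get("spark.sql.sources.outputCommitterClass"))
  }
}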

4 files changed: +65 -13 lines changed


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 1 addition & 0 deletions
@@ -76,6 +76,7 @@ private[spark] object SQLConf {
 
   // The output committer class used by FSBasedRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+  // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
   val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
 
   // Whether to perform eager analysis when constructing a dataframe.
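Per the new NOTE, this property is read from the Hadoop `Configuration` seen by the write job, not from `SQLConf`. A minimal sketch of how a caller would set it, assuming the write job picks up the SparkContext's `hadoopConfiguration`; the committer class named here is only an example:

// Sketch: set the output committer through the Hadoop Configuration,
// not through SQLConf / SET statements.
sparkContext.hadoopConfiguration.set(
  "spark.sql.sources.outputCommitterClass",
  classOf[org.apache.parquet.hadoop.ParquetOutputCommitter].getCanonicalName)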

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 7 additions & 0 deletions
@@ -212,6 +212,13 @@ private[sql] class ParquetRelation2(
       classOf[ParquetOutputCommitter],
       classOf[ParquetOutputCommitter])
 
+    if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+      logInfo("Using default output committer for Parquet: " +
+        classOf[ParquetOutputCommitter].getCanonicalName)
+    } else {
+      logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
+    }
+
     conf.setClass(
       SQLConf.OUTPUT_COMMITTER_CLASS,
       committerClass,
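The new logging above distinguishes the default committer from one supplied via `spark.sql.parquet.output.committer.class`. A user opts into a custom committer roughly as follows; this is a sketch, the class must be a subclass of `ParquetOutputCommitter`, and the fully qualified name of `DirectParquetOutputCommitter` (the committer exercised by the SPARK-6352 test further down) is assumed here:

// Sketch: override the Parquet-specific output committer. The value must name
// a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter.
sparkContext.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")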

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 13 additions & 5 deletions
@@ -297,12 +297,16 @@ private[sql] abstract class BaseWriterContainer(
   def driverSideSetup(): Unit = {
     setupIDs(0, 0, 0)
     setupConf()
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
 
-    // This preparation must happen before initializing output format and output committer, since
-    // their initialization involves the job configuration, which can be potentially decorated in
-    // `relation.prepareJobForWrite`.
+    // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
+    // clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
+    // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
+    //
+    // Also, the `prepareJobForWrite` call must happen before initializing output format and output
+    // committer, since their initialization involve the job configuration, which can be potentially
+    // decorated in `prepareJobForWrite`.
     outputWriterFactory = relation.prepareJobForWrite(job)
+    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
 
     outputFormatClass = job.getOutputFormatClass
     outputCommitter = newOutputCommitter(taskAttemptContext)
@@ -331,6 +335,8 @@ private[sql] abstract class BaseWriterContainer(
       SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
+      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
       // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
       // has an associated output committer. To override this output committer,
       // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
@@ -350,7 +356,9 @@ private[sql] abstract class BaseWriterContainer(
     }.getOrElse {
       // If output committer class is not set, we will use the one associated with the
       // file output format.
-      outputFormatClass.newInstance().getOutputCommitter(context)
+      val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+      logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
+      outputCommitter
     }
   }
 
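The `Option(committerClass)` branch above instantiates a user-supplied committer reflectively and now logs its name. A committer intended to be plugged in through `spark.sql.sources.outputCommitterClass` would look roughly like this hypothetical example, mirroring the `(Path, TaskAttemptContext)` constructor shape that `BogusParquetOutputCommitter` uses in the test below; the class name and marker-file behavior are made up for illustration:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical committer: delegates to the standard file output commit protocol,
// then drops a marker file once the job has committed successfully.
class MarkerFileOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    super.commitJob(jobContext)
    val fs = outputPath.getFileSystem(jobContext.getConfiguration)
    fs.create(new Path(outputPath, "_COMMITTED")).close()
  }
}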

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala

Lines changed: 44 additions & 8 deletions
@@ -23,16 +23,18 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.scalatest.BeforeAndAfterAll
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.parquet.example.data.simple.SimpleGroup
 import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.util.DateUtils
@@ -196,7 +198,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
 
     withParquetDataFrame(allNulls :: Nil) { df =>
       val rows = df.collect()
-      assert(rows.size === 1)
+      assert(rows.length === 1)
       assert(rows.head === Row(Seq.fill(5)(null): _*))
     }
   }
@@ -209,7 +211,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
 
     withParquetDataFrame(allNones :: Nil) { df =>
       val rows = df.collect()
-      assert(rows.size === 1)
+      assert(rows.length === 1)
       assert(rows.head === Row(Seq.fill(3)(null): _*))
     }
   }
@@ -379,6 +381,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
   }
 
   test("SPARK-6352 DirectParquetOutputCommitter") {
+    val clonedConf = new Configuration(configuration)
+
     // Write to a parquet file and let it fail.
     // _temporary should be missing if direct output committer works.
     try {
@@ -393,14 +397,46 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
         val fs = path.getFileSystem(configuration)
         assert(!fs.exists(path))
       }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
     }
-    finally {
-      configuration.set("spark.sql.parquet.output.committer.class",
-        "org.apache.parquet.hadoop.ParquetOutputCommitter")
+  }
+
+  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
+    withTempPath { dir =>
+      val clonedConf = new Configuration(configuration)
+
+      configuration.set(
+        SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)
+
+      configuration.set(
+        "spark.sql.parquet.output.committer.class",
+        classOf[BogusParquetOutputCommitter].getCanonicalName)
+
+      try {
+        val message = intercept[SparkException] {
+          sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(message === "Intentional exception for testing purposes")
+      } finally {
+        // Hadoop 1 doesn't have `Configuration.unset`
+        configuration.clear()
+        clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+      }
     }
   }
 }
 
+class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitJob(jobContext: JobContext): Unit = {
+    sys.error("Intentional exception for testing purposes")
+  }
+}
+
 class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
