sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala (1 change: 1 addition & 0 deletions)
@@ -73,6 +73,7 @@ private[spark] object SQLConf {

// The output committer class used by FSBasedRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+ // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"

// Whether to perform eager analysis when constructing a dataframe.
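
As context for the NOTE added above: a custom committer is registered through the Hadoop Configuration carried by the SparkContext, not through SQLContext.setConf. The sketch below is illustrative only and assumes a Spark 1.4-era API; NoOpCommitter and CommitterConfigSketch are hypothetical names, with NoOpCommitter standing in for any concrete subclass of org.apache.hadoop.mapreduce.OutputCommitter.

  import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}
  import org.apache.spark.{SparkConf, SparkContext}

  // Placeholder committer: any concrete subclass of the mapreduce OutputCommitter would do.
  class NoOpCommitter extends OutputCommitter {
    override def setupJob(jobContext: JobContext): Unit = ()
    override def setupTask(taskContext: TaskAttemptContext): Unit = ()
    override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
    override def commitTask(taskContext: TaskAttemptContext): Unit = ()
    override def abortTask(taskContext: TaskAttemptContext): Unit = ()
  }

  object CommitterConfigSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("committer-conf"))

      // The key lives in the Hadoop Configuration so the write path can read it back
      // when it prepares the Hadoop job for an FSBasedRelation write.
      sc.hadoopConfiguration.set(
        "spark.sql.sources.outputCommitterClass",
        classOf[NoOpCommitter].getCanonicalName)

      sc.stop()
    }
  }
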
@@ -212,6 +212,13 @@ private[sql] class ParquetRelation2(
classOf[ParquetOutputCommitter],
classOf[ParquetOutputCommitter])

+ if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+ logInfo("Using default output committer for Parquet: " +
+ classOf[ParquetOutputCommitter].getCanonicalName)
+ } else {
+ logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
+ }

conf.setClass(
SQLConf.OUTPUT_COMMITTER_CLASS,
committerClass,
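
The new branch above only logs which committer will take effect; the committer itself is selected through the same Parquet-specific Hadoop property that the null check reads. As a hedged illustration (it assumes an existing sqlContext and that Spark 1.4's direct committer lives at org.apache.spark.sql.parquet.DirectParquetOutputCommitter; adjust both to your setup), a job could opt into the direct committer like this:

  // Sketch: opting into a non-default Parquet committer before writing.
  sqlContext.sparkContext.hadoopConfiguration.set(
    "spark.sql.parquet.output.committer.class",
    "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
  sqlContext.range(0, 10).write.parquet("/tmp/direct-committer-output")

With the direct committer in place, writes skip the _temporary staging directory, which is exactly what the SPARK-6352 test further down checks.
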
@@ -297,12 +297,16 @@ private[sql] abstract class BaseWriterContainer(
def driverSideSetup(): Unit = {
setupIDs(0, 0, 0)
setupConf()
- taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)

- // This preparation must happen before initializing output format and output committer, since
- // their initialization involves the job configuration, which can be potentially decorated in
- // `relation.prepareJobForWrite`.
+ // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
+ // clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
+ // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
+ //
+ // Also, the `prepareJobForWrite` call must happen before initializing output format and output
+ // committer, since their initialization involves the job configuration, which can be potentially
+ // decorated in `prepareJobForWrite`.
outputWriterFactory = relation.prepareJobForWrite(job)
+ taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)

outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
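
The reordered lines above rely on the point spelled out in the new comment: on Hadoop 1 the TaskAttemptContext constructor clones the Configuration, so keys set afterwards (for example by relation.prepareJobForWrite(job)) never reach the context. A minimal standalone sketch of that snapshot behaviour, using a plain Configuration copy to stand in for the constructor's internal clone; ConfSnapshotSketch is a hypothetical name and com.example.SomeCommitter is just a placeholder value:

  import org.apache.hadoop.conf.Configuration

  object ConfSnapshotSketch {
    def main(args: Array[String]): Unit = {
      val jobConf = new Configuration()

      // Stand-in for what the Hadoop 1 TaskAttemptContext constructor does: copy the conf.
      val snapshot = new Configuration(jobConf)

      // A key set after the copy, e.g. by prepareJobForWrite decorating the job configuration.
      jobConf.set("spark.sql.sources.outputCommitterClass", "com.example.SomeCommitter")

      // The snapshot never observes it, which is why prepareJobForWrite must run first.
      assert(snapshot.get("spark.sql.sources.outputCommitterClass") == null)
    }
  }
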
@@ -331,6 +335,8 @@
SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])

Option(committerClass).map { clazz =>
+ logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")

// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
@@ -350,7 +356,9 @@
}.getOrElse {
// If output committer class is not set, we will use the one associated with the
// file output format.
- outputFormatClass.newInstance().getOutputCommitter(context)
+ val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+ logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
+ outputCommitter
}
}
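
To restate the resolution order this hunk logs: a committer named under spark.sql.sources.outputCommitterClass in the Hadoop Configuration wins, and only when that key is absent does the writer keep the committer supplied by the output format. A condensed sketch of the same Option-based fallback; CommitterFallbackSketch and resolveCommitter are hypothetical names, and the sketch glosses over how the real code constructs the committer instance by assuming a no-arg constructor:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.mapreduce.OutputCommitter

  object CommitterFallbackSketch {
    // Hypothetical helper mirroring the fallback above; assumes a no-arg constructor.
    def resolveCommitter(conf: Configuration, formatDefault: => OutputCommitter): OutputCommitter = {
      val configured =
        conf.getClass("spark.sql.sources.outputCommitterClass", null, classOf[OutputCommitter])
      Option(configured)
        .map(clazz => clazz.newInstance(): OutputCommitter) // user-defined committer takes precedence
        .getOrElse(formatDefault)                           // otherwise keep the format's own committer
    }
  }
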

@@ -23,16 +23,18 @@ import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
- import org.scalatest.BeforeAndAfterAll
+ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
- import org.apache.parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
- import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
+ import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+ import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+ import org.scalatest.BeforeAndAfterAll

+ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.util.DateUtils
@@ -196,7 +198,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {

withParquetDataFrame(allNulls :: Nil) { df =>
val rows = df.collect()
- assert(rows.size === 1)
+ assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(5)(null): _*))
}
}
@@ -209,7 +211,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {

withParquetDataFrame(allNones :: Nil) { df =>
val rows = df.collect()
- assert(rows.size === 1)
+ assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(3)(null): _*))
}
}
@@ -379,6 +381,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}

test("SPARK-6352 DirectParquetOutputCommitter") {
+ val clonedConf = new Configuration(configuration)

// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
@@ -393,14 +397,46 @@
val fs = path.getFileSystem(configuration)
assert(!fs.exists(path))
}
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
}
- finally {
- configuration.set("spark.sql.parquet.output.committer.class",
- "org.apache.parquet.hadoop.ParquetOutputCommitter")
}

+ test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
+ withTempPath { dir =>
+ val clonedConf = new Configuration(configuration)

+ configuration.set(
+ SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)

+ configuration.set(
+ "spark.sql.parquet.output.committer.class",
+ classOf[BogusParquetOutputCommitter].getCanonicalName)

+ try {
+ val message = intercept[SparkException] {
+ sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
+ }.getCause.getMessage
+ assert(message === "Intentional exception for testing purposes")
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ }
}
}
}

+ class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+ extends ParquetOutputCommitter(outputPath, context) {

+ override def commitJob(jobContext: JobContext): Unit = {
+ sys.error("Intentional exception for testing purposes")
+ }
+ }

class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {
private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
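
Both tests above restore the shared Hadoop Configuration by clearing it and replaying a clone, because Hadoop 1 has no Configuration.unset. A hedged sketch of how that save-and-restore could be factored into a loan-style helper; ConfRestoreSketch and withHadoopConfRestored are hypothetical names, not part of this patch, and the sketch assumes scala.collection.JavaConversions is in scope for iterating the Configuration, as the test code itself implies:

  import scala.collection.JavaConversions._

  import org.apache.hadoop.conf.Configuration

  object ConfRestoreSketch {
    // Hypothetical helper: snapshot the conf, run the body, then replay the snapshot.
    def withHadoopConfRestored[T](conf: Configuration)(body: => T): T = {
      val snapshot = new Configuration(conf)
      try {
        body
      } finally {
        // Hadoop 1 doesn't have `Configuration.unset`, so clear and re-set every entry instead.
        conf.clear()
        snapshot.foreach(entry => conf.set(entry.getKey, entry.getValue))
      }
    }
  }

With such a helper, each test body would reduce to withHadoopConfRestored(configuration) { ... }, keeping the clear-and-replay logic in one place.
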
