33 changes: 0 additions & 33 deletions docs/sql-programming-guide.md
@@ -1466,37 +1466,6 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
support.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.output.committer.class</code></td>
<td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
<td>
<p>
The output committer class used by Parquet. The specified class needs to be a subclass of
<code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
</p>
<p>
<b>Note:</b>
<ul>
<li>
This option is automatically ignored if <code>spark.speculation</code> is turned on.
</li>
<li>
This option must be set via Hadoop <code>Configuration</code> rather than Spark
<code>SQLConf</code>.
</li>
<li>
This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
</li>
</ul>
</p>
<p>
Spark SQL comes with a builtin
<code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
efficient than the default Parquet output committer when writing data to S3.
</p>
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.mergeSchema</code></td>
<td><code>false</code></td>
@@ -2165,8 +2134,6 @@ options.
- In the `sql` dialect, floating point numbers are now parsed as decimal. HiveQL parsing remains
unchanged.
- The canonical names of SQL/DataFrame functions are now lower case (e.g. sum vs SUM).
- It has been determined that using the DirectOutputCommitter when speculation is enabled is unsafe
and thus this output committer will not be used when speculation is on, independent of configuration.
- JSON data source will not automatically load new files that are created by other applications
(i.e. files that are not inserted to the dataset through Spark SQL).
For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore),
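For reference, the table entry removed above documents that `spark.sql.parquet.output.committer.class` has to be set on the Hadoop `Configuration` rather than through Spark's `SQLConf`. A minimal sketch of how a user would have configured it before this removal (the application name, output path, and use of the default `ParquetOutputCommitter` are illustrative assumptions):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("committer-example").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Must go on the Hadoop Configuration, not SQLConf; the value must name a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.parquet.hadoop.ParquetOutputCommitter")

// Subsequent Parquet writes pick the committer up from the Hadoop Configuration.
sqlContext.range(10).write.parquet("/tmp/committer-example")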
@@ -129,16 +129,17 @@ private[sql] abstract class BaseWriterContainer(
outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext)
} catch {
case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
// Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
if (outputCommitter.getClass.getName.contains("Direct")) {
Contributor
This is pretty brittle/ugly. Is there any other way, such as having an interface covering commit semantics? Then the code could go .isInstanceOf[NonAtomicCommitter].
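A rough sketch of the interface idea suggested above, for illustration only (NonAtomicCommitter is a hypothetical trait, not an existing Spark or Hadoop type, and not part of the change in this PR):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical marker trait for committers whose commit step is not atomic,
// so a retried task may find output left behind by an earlier attempt.
trait NonAtomicCommitter

// A direct-style committer would mix the trait in ...
class ExampleDirectCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) with NonAtomicCommitter

// ... and the catch block here could then test the semantics rather than a
// class-name substring:
//   if (outputCommitter.isInstanceOf[NonAtomicCommitter]) { /* augment the error */ }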

// SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
// attempts, the task will fail because the output file is created from a prior attempt.
// This often means the most visible error to the user is misleading. Augment the error
// to tell the user to look for the actual error.
throw new SparkException("The output file already exists but this could be due to a " +
"failure from an earlier attempt. Look through the earlier logs or stage page for " +
"the first error.\n File exists error: " + e)
"the first error.\n File exists error: " + e.getLocalizedMessage, e)
Contributor
1. Need to check for that message being null, and fall back to e.toString.
2. Maybe also list the path in question, in case the inner one doesn't.
3. And, perhaps, the class name of the committer in question.

In #12004 I'm proposing a module to add optional cloud tests and documentation, and to make sure the relevant Hadoop artifacts for S3, OpenStack and Azure get pulled into releases. It has a doc page too. Assuming that gets in, the exception text could include a link to it (or better, the ASF link redirector). I'd add a section on commit problems there.

Contributor Author
1. Oops - it should be getMessage; I'm not sure how it ended up being getLocalizedMessage. I will fix it.
2. The inner exception already has the error, typically.
3. This is mostly a hack in case people are using their own direct output committer, and isn't really meant to be bulletproof.
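A possible shape for the augmented error that folds in the points above (getMessage with a null fallback, plus the path and committer class). This is only a sketch of the discussion, not the committed change; it assumes the enclosing method's `path`, `outputCommitter`, and caught exception `e` are in scope, as in the catch block above:

// Sketch: build a null-safe detail string and name the committer and path explicitly.
val detail = Option(e.getMessage).getOrElse(e.toString)
throw new SparkException(
  "The output file already exists but this could be due to a failure from an earlier " +
    "attempt. Look through the earlier logs or stage page for the first error.\n" +
    s"Committer: ${outputCommitter.getClass.getName}, path: $path\n" +
    s"File exists error: $detail", e)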

} else {
throw e
}
throw e
}
}

@@ -156,15 +157,6 @@ private[sql] abstract class BaseWriterContainer(
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
} else if (speculationEnabled) {
// When speculation is enabled, it's not safe to use customized output committer classes,
// especially direct output committers (e.g. `DirectParquetOutputCommitter`).
//
// See SPARK-9899 for more details.
logInfo(
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"because spark.speculation is configured to be true.")
defaultOutputCommitter
} else {
val configuration = context.getConfiguration
val committerClass = configuration.getClass(

This file was deleted.

@@ -76,13 +76,6 @@ private[sql] class DefaultSource

val conf = ContextUtil.getConfiguration(job)

// SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
classOf[DirectParquetOutputCommitter].getCanonicalName)
}

val committerClass =
conf.getClass(
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
@@ -445,55 +445,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

testQuietly("SPARK-6352 DirectParquetOutputCommitter") {
val clonedConf = new Configuration(hadoopConfiguration)

// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
classOf[DirectParquetOutputCommitter].getCanonicalName)
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
}
val path = new Path(dir.getCanonicalPath, "_temporary")
val fs = path.getFileSystem(hadoopConfiguration)
assert(!fs.exists(path))
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}

testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatibility") {
val clonedConf = new Configuration(hadoopConfiguration)

// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
"org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
}
val path = new Path(dir.getCanonicalPath, "_temporary")
val fs = path.getFileSystem(hadoopConfiguration)
assert(!fs.exists(path))
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}


test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
withTempPath { dir =>
val clonedConf = new Configuration(hadoopConfiguration)
@@ -668,40 +668,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
}
}

test("SPARK-9899 Disable customized output committer when speculation is on") {
val clonedConf = new Configuration(hadoopConfiguration)
val speculationEnabled =
sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)

try {
withTempPath { dir =>
// Enables task speculation
sqlContext.sparkContext.conf.set("spark.speculation", "true")

// Uses a customized output committer which always fails
hadoopConfiguration.set(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
classOf[AlwaysFailOutputCommitter].getName)

// Code below shouldn't throw since customized output committer should be disabled.
val df = sqlContext.range(10).toDF().coalesce(1)
df.write.format(dataSourceName).save(dir.getCanonicalPath)
checkAnswer(
sqlContext
.read
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.load(dir.getCanonicalPath),
df)
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
}

// This class is used to test SPARK-8578. We should not use any custom output committer when