Skip to content

Commit 69b7d65

Browse files
committed
log warning in hive part
1 parent d7c6f1d commit 69b7d65

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,10 +1052,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
10521052
// When speculation is on and output committer class name contains "Direct", we should warn
10531053
// users that they may lose data if they are using a direct output committer.
10541054
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
1055-
if (speculationEnabled &&
1056-
hadoopConf.get("mapred.output.committer.class", "").contains("Direct")) {
1057-
logWarning("We may loss data when use direct output committer with speculation enabled, " +
1058-
"please make sure your output committer doesn't write data directly.")
1055+
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
1056+
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
1057+
val warningMessage =
1058+
s"$outputCommitterClass may be an output committer that writes data directly to " +
1059+
"the final location. Because speculation is enabled, this output committer may " +
1060+
"cause data loss (see the case in SPARK-10063). If possible, please use a output " +
1061+
"committer that does not have this behavior (e.g. FileOutputCommitter)."
1062+
logWarning(warningMessage)
10591063
}
10601064

10611065
FileOutputFormat.setOutputPath(hadoopConf,
@@ -1135,9 +1139,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
11351139
// When speculation is on and output committer class name contains "Direct", we should warn
11361140
// users that they may lose data if they are using a direct output committer.
11371141
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
1138-
if (speculationEnabled && jobCommitter.getClass.getSimpleName.contains("Direct")) {
1139-
logWarning("We may loss data when use direct output committer with speculation enabled, " +
1140-
"please make sure your output committer doesn't write data directly.")
1142+
val outputCommitterClass = jobCommitter.getClass.getSimpleName
1143+
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
1144+
val warningMessage =
1145+
s"$outputCommitterClass may be an output committer that writes data directly to " +
1146+
"the final location. Because speculation is enabled, this output committer may " +
1147+
"cause data loss (see the case in SPARK-10063). If possible, please use a output " +
1148+
"committer that does not have this behavior (e.g. FileOutputCommitter)."
1149+
logWarning(warningMessage)
11411150
}
11421151

11431152
jobCommitter.setupJob(jobTaskContext)

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,17 @@ case class InsertIntoHiveTable(
178178
val jobConf = new JobConf(sc.hiveconf)
179179
val jobConfSer = new SerializableJobConf(jobConf)
180180

181-
// When speculation is enabled, it's not safe to use customized output committer classes,
182-
// especially direct output committees (e.g. `DirectParquetOutputCommitter`).
181+
// When speculation is on and output committer class name contains "Direct", we should warn
182+
// users that they may lose data if they are using a direct output committer.
183183
val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
184-
if (speculationEnabled) {
185-
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
184+
val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
185+
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
186+
val warningMessage =
187+
s"$outputCommitterClass may be an output committer that writes data directly to " +
188+
"the final location. Because speculation is enabled, this output committer may " +
189+
"cause data loss (see the case in SPARK-10063). If possible, please use a output " +
190+
"committer that does not have this behavior (e.g. FileOutputCommitter)."
191+
logWarning(warningMessage)
186192
}
187193

188194
val writerContainer = if (numDynamicPartitions > 0) {

0 commit comments

Comments
 (0)