
Commit dfdf3ef

Uses commit coordinator to help committing Hive and Parquet tables
1 parent 9f3273b commit dfdf3ef

File tree: 6 files changed (+94, -73 lines)


core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 2 additions & 50 deletions
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
 
@@ -104,55 +103,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 
   def commit() {
-    val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter()
-
-    // Called after we have decided to commit
-    def performCommit(): Unit = {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (s"$taID: Committed")
-      } catch {
-        case e: IOException =>
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
-      }
-    }
-
-    // First, check whether the task's output has already been committed by some other attempt
-    if (cmtr.needsTaskCommit(taCtxt)) {
-      // The task output needs to be committed, but we don't know whether some other task attempt
-      // might be racing to commit the same output partition. Therefore, coordinate with the driver
-      // in order to determine whether this attempt can commit (see SPARK-4879).
-      val shouldCoordinateWithDriver: Boolean = {
-        val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task
-        // attempts, which should only occur if speculation is enabled
-        val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
-        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
-      }
-      if (shouldCoordinateWithDriver) {
-        val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-        val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
-        if (canCommit) {
-          performCommit()
-        } else {
-          val msg = s"$taID: Not committed because the driver did not authorize commit"
-          logInfo(msg)
-          // We need to abort the task so that the driver can reschedule new attempts, if necessary
-          cmtr.abortTask(taCtxt)
-          throw new CommitDeniedException(msg, jobID, splitID, attemptID)
-        }
-      } else {
-        // Speculation is disabled or a user has chosen to manually bypass the commit coordination
-        performCommit()
-      }
-    } else {
-      // Some other attempt committed the output, so we do nothing and signal success
-      logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
-    }
+    SparkHadoopMapRedUtil.commitTask(
+      getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
   }
 
   def commitJob() {
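The block removed above (re-created almost verbatim in SparkHadoopMapRedUtil below) only asks the driver for permission when concurrent task attempts are possible. As a rough illustration of the two settings that drive that decision, here is a minimal sketch assuming an application-level SparkConf; both keys appear in the removed code, and the second is the undocumented escape hatch it mentions:

    import org.apache.spark.SparkConf

    // Minimal sketch (not part of this commit) of the settings the coordination
    // check reads via SparkEnv.get.conf.
    val conf = new SparkConf()
      // Speculative execution can launch concurrent attempts for the same partition,
      // so it turns driver-side commit coordination on by default.
      .set("spark.speculation", "true")
      // Undocumented escape hatch from the removed code: bypass coordination even
      // when speculation is enabled.
      .set("spark.hadoop.outputCommitCoordination.enabled", "false")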

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 81 additions & 1 deletion
@@ -17,9 +17,15 @@
 
 package org.apache.spark.mapred
 
+import java.io.IOException
 import java.lang.reflect.Modifier
 
-import org.apache.hadoop.mapred.{TaskAttemptID, JobID, JobConf, JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
+import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
+
+import org.apache.spark.executor.CommitDeniedException
+import org.apache.spark.{Logging, SparkEnv, TaskContext}
 
 private[spark]
 trait SparkHadoopMapRedUtil {
@@ -65,3 +71,77 @@ trait SparkHadoopMapRedUtil {
     }
   }
 }
+
+object SparkHadoopMapRedUtil extends Logging {
+  def commitTask(
+      committer: MapReduceOutputCommitter,
+      mrTaskContext: MapReduceTaskAttemptContext,
+      sparkTaskContext: TaskContext): Unit = {
+    commitTask(
+      committer,
+      mrTaskContext,
+      sparkTaskContext.stageId(),
+      sparkTaskContext.partitionId(),
+      sparkTaskContext.attemptNumber())
+  }
+
+  def commitTask(
+      committer: MapReduceOutputCommitter,
+      mrTaskContext: MapReduceTaskAttemptContext,
+      jobId: Int,
+      splitId: Int,
+      attemptId: Int): Unit = {
+
+    val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
+
+    // Called after we have decided to commit
+    def performCommit(): Unit = {
+      try {
+        committer.commitTask(mrTaskContext)
+        logInfo(s"$mrTaskAttemptID: Committed")
+      } catch {
+        case cause: IOException =>
+          logError(s"Error committing the output of task: $mrTaskAttemptID", cause)
+          committer.abortTask(mrTaskContext)
+          throw cause
+      }
+    }
+
+    // First, check whether the task's output has already been committed by some other attempt
+    if (committer.needsTaskCommit(mrTaskContext)) {
+      // The task output needs to be committed, but we don't know whether some other task attempt
+      // might be racing to commit the same output partition. Therefore, coordinate with the driver
+      // in order to determine whether this attempt can commit (see SPARK-4879).
+      val shouldCoordinateWithDriver: Boolean = {
+        val sparkConf = SparkEnv.get.conf
+        // We only need to coordinate with the driver if there are multiple concurrent task
+        // attempts, which should only occur if speculation is enabled
+        val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
+        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
+        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+      }
+
+      if (shouldCoordinateWithDriver) {
+        val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+
+        if (canCommit) {
+          performCommit()
+        } else {
+          val message =
+            s"$mrTaskAttemptID: Not committed because the driver did not authorize commit"
+          logInfo(message)
+          // We need to abort the task so that the driver can reschedule new attempts, if necessary
+          committer.abortTask(mrTaskContext)
+          throw new CommitDeniedException(message, jobId, splitId, attemptId)
+        }
+      } else {
+        // Speculation is disabled or a user has chosen to manually bypass the commit coordination
+        performCommit()
+      }
+    } else {
+      // Some other attempt committed the output, so we do nothing and signal success
+      logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
+    }
+  }
+}
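For reference, a minimal sketch of how an executor-side writer would call the new helper; the surrounding names (commitWrittenOutput, committer, hadoopContext, context) are hypothetical stand-ins for whatever the caller already has, mirroring the Parquet call sites further down, which pass exactly these three arguments:

    import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
    import org.apache.spark.TaskContext
    import org.apache.spark.mapred.SparkHadoopMapRedUtil

    // Hypothetical call site: commit one task's output once writing has finished.
    def commitWrittenOutput(
        committer: OutputCommitter,        // committer used for this write
        hadoopContext: TaskAttemptContext, // Hadoop task attempt context of this attempt
        context: TaskContext): Unit = {    // Spark task context of the running task
      // TaskContext overload: stage ID, partition ID and attempt number are derived
      // from the Spark TaskContext by the first overload in the object above.
      SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
    }

The five-argument overload used by SparkHadoopWriter and the Hive writer container behaves the same way, but takes the job, split, and attempt IDs explicitly.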

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 6 additions & 5 deletions
@@ -19,10 +19,9 @@ package org.apache.spark.sql.parquet
 
 import java.io.IOException
 import java.lang.{Long => JLong}
-import java.text.SimpleDateFormat
-import java.text.NumberFormat
+import java.text.{NumberFormat, SimpleDateFormat}
 import java.util.concurrent.{Callable, TimeUnit}
-import java.util.{ArrayList, Collections, Date, List => JList}
+import java.util.{Date, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -43,12 +42,13 @@ import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
 /**
@@ -356,7 +356,7 @@ private[sql] case class InsertIntoParquetTable(
       } finally {
         writer.close(hadoopContext)
       }
-      committer.commitTask(hadoopContext)
+      SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
       1
     }
     val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
@@ -512,6 +512,7 @@ private[parquet] class FilteringParquetRowInputFormat
 
     import parquet.filter2.compat.FilterCompat.Filter
     import parquet.filter2.compat.RowGroupFilter
+
     import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache
 
     val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 3 additions & 1 deletion
@@ -42,6 +42,7 @@ import parquet.hadoop.{ParquetInputFormat, _}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
 import org.apache.spark.sql.catalyst.expressions
@@ -662,7 +663,8 @@ private[sql] case class ParquetRelation2(
       } finally {
         writer.close(hadoopContext)
       }
-      committer.commitTask(hadoopContext)
+
+      SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
     }
     val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
     /* apparently we need a TaskAttemptID to construct an OutputCommitter;

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 0 additions & 1 deletion
@@ -72,7 +72,6 @@ case class InsertIntoHiveTable(
     val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
     assert(outputFileFormatClassName != null, "Output format class not set")
     conf.value.set("mapred.output.format.class", outputFileFormatClassName)
-    conf.value.setOutputCommitter(classOf[FileOutputCommitter])
 
     FileOutputFormat.setOutputPath(
       conf.value,
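Dropping the unconditional FileOutputCommitter override presumably lets the committer configured on the JobConf (or by the table's output format) take effect, which is what the coordinated commitTask call in hiveWriterContainers.scala below then drives. A minimal, hypothetical sketch of reading that effective committer:

    import org.apache.hadoop.mapred.JobConf

    // FileOutputCommitter is still the default committer for file-based mapred
    // output formats, but it is no longer forced here; the JobConf decides.
    val jobConf = new JobConf()
    val effectiveCommitter = jobConf.getOutputCommitter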

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala

Lines changed: 2 additions & 15 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.IOException
 import java.text.NumberFormat
 import java.util.Date
 
@@ -118,19 +117,7 @@ private[hive] class SparkHiveWriterContainer(
   }
 
   protected def commit() {
-    if (committer.needsTaskCommit(taskContext)) {
-      try {
-        committer.commitTask(taskContext)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException =>
-          logError("Error committing the output of task: " + taID.value, e)
-          committer.abortTask(taskContext)
-          throw e
-      }
-    } else {
-      logInfo("No need to commit output of task: " + taID.value)
-    }
+    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID)
   }
 
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
@@ -213,7 +200,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       .zip(row.toSeq.takeRight(dynamicPartColNames.length))
       .map { case (col, rawVal) =>
         val string = if (rawVal == null) null else String.valueOf(rawVal)
-        val colString = 
+        val colString =
           if (string == null || string.isEmpty) {
             defaultPartName
           } else {
