Commit 51be443

Replaces FSBasedRelation.outputCommitterClass with FSBasedRelation.prepareForWrite
1 parent c4ed4fe commit 51be443
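
In short, a data source no longer advertises a committer class; it mutates the write-side Hadoop Job, and Spark SQL derives the committer from whatever OutputFormat the job ends up with. Below is a minimal sketch of how a source might plug in a custom committer through the new hook; the LoggingCommitter and LoggingOutputFormat names are hypothetical illustrations, not part of this commit, and assume the Hadoop 2 mapreduce API:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat, TextOutputFormat}

// Hypothetical committer: identical to FileOutputCommitter except that it logs commits.
class LoggingCommitter(output: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(output, context) {

  override def commitTask(context: TaskAttemptContext): Unit = {
    println(s"Committing ${context.getTaskAttemptID}")
    super.commitTask(context)
  }
}

// Hypothetical OutputFormat whose only purpose is to hand out the committer above.
class LoggingOutputFormat extends TextOutputFormat[Void, String] {
  override def getOutputCommitter(context: TaskAttemptContext): FileOutputCommitter =
    new LoggingCommitter(FileOutputFormat.getOutputPath(context), context)
}

// An FSBasedRelation subclass would then install it via the new hook:
//   override def prepareForWrite(job: Job): Unit =
//     job.setOutputFormatClass(classOf[LoggingOutputFormat])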

2 files changed, 20 additions and 20 deletions


sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 12 additions & 16 deletions
@@ -229,39 +229,35 @@ private[sql] abstract class BaseWriterContainer(
 
   protected val dataSchema = relation.dataSchema
 
-  protected val outputCommitterClass: Class[_ <: FileOutputCommitter] =
-    relation.outputCommitterClass
-
   protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass
 
+  private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
+
   def driverSideSetup(): Unit = {
     setupIDs(0, 0, 0)
     setupConf()
     taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    outputCommitter = newOutputCommitter(outputCommitterClass, outputPath, taskAttemptContext)
+    relation.prepareForWrite(job)
+    outputFormatClass = job.getOutputFormatClass
+    outputCommitter = newOutputCommitter(taskAttemptContext)
     outputCommitter.setupJob(jobContext)
   }
 
   def executorSideSetup(taskContext: TaskContext): Unit = {
     setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
     setupConf()
     taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    outputCommitter = newOutputCommitter(outputCommitterClass, outputPath, taskAttemptContext)
+    outputCommitter = newOutputCommitter(taskAttemptContext)
     outputCommitter.setupTask(taskAttemptContext)
     initWriters()
   }
 
-  private def newOutputCommitter(
-      clazz: Class[_ <: FileOutputCommitter],
-      path: String,
-      context: TaskAttemptContext): FileOutputCommitter = {
-    val ctor = outputCommitterClass.getConstructor(classOf[Path], classOf[TaskAttemptContext])
-    ctor.setAccessible(true)
-
-    val hadoopPath = new Path(path)
-    val fs = hadoopPath.getFileSystem(serializableConf.value)
-    val qualified = fs.makeQualified(hadoopPath)
-    ctor.newInstance(qualified, context)
+  private def newOutputCommitter(context: TaskAttemptContext): FileOutputCommitter = {
+    outputFormatClass.newInstance().getOutputCommitter(context) match {
+      case f: FileOutputCommitter => f
+      case f => sys.error(
+        s"FileOutputCommitter or its subclass is expected, but got a ${f.getClass.getName}.")
+    }
   }
 
   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
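
A note on the committer resolution above: if prepareForWrite leaves the job untouched, Hadoop's Job.getOutputFormatClass falls back to TextOutputFormat, whose committer is a plain FileOutputCommitter, so the sys.error branch can only be reached by a custom OutputFormat that returns some other committer type. A quick sanity check of that default, assuming the Hadoop 2 mapreduce API:

import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// With no explicit setOutputFormatClass call, Hadoop defaults to TextOutputFormat,
// so newOutputCommitter() still obtains a FileOutputCommitter.
val job = Job.getInstance()
assert(job.getOutputFormatClass == classOf[TextOutputFormat[_, _]])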

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 8 additions & 4 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -432,10 +432,14 @@ abstract class FSBasedRelation private[sql](
   }
 
   /**
-   * The output committer class to use. Default to
-   * [[org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter]].
+   * Client-side preparation for data writing can be done here. For example, a user-defined
+   * output committer can be configured here.
+   *
+   * Note that the only side effect expected here is mutating `job` via its setters. In
+   * particular, since Spark SQL caches [[BaseRelation]] instances for performance, mutating
+   * relation-internal state may cause unexpected behavior.
    */
-  def outputCommitterClass: Class[_ <: FileOutputCommitter] = classOf[FileOutputCommitter]
+  def prepareForWrite(job: Job): Unit = ()
 
   /**
    * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
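
To make the scaladoc's contract concrete, here is a hedged sketch of a well-behaved override; the surrounding relation subclass is elided, and the config key is a standard Hadoop 2 setting rather than anything introduced by this commit:

import org.apache.hadoop.mapreduce.Job

// Every change goes through the Job's Configuration; nothing on the (possibly
// cached and reused) relation instance itself is mutated.
override def prepareForWrite(job: Job): Unit = {
  // Standard Hadoop switch: skip writing the _SUCCESS marker on job commit.
  job.getConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
}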
