Commit 0bc6ad1

Switches to the new Hadoop mapreduce API; FSBasedRelation can now customize its output format class
1 parent f320766 commit 0bc6ad1

File tree: 3 files changed (+59, -44 lines)
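For context, a minimal sketch of the API switch this commit makes, assuming a Hadoop 2.x-style environment. The `Job.getInstance` and `FileOutputFormat` calls mirror the diff below; the `String` value class and the output path are illustrative stand-ins (the patch itself writes `Row` values), and `NewApiSketch` is not part of the patch.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

object NewApiSketch extends App {
  // New-API (org.apache.hadoop.mapreduce) job setup: configuration hangs off
  // a Job instance instead of an old-API mapred JobConf.
  val job = Job.getInstance(new Configuration())
  job.setOutputKeyClass(classOf[Void])
  job.setOutputValueClass(classOf[String])
  FileOutputFormat.setOutputPath(job, new Path("/tmp/example-output"))

  // The committer is no longer pulled from JobConf.getOutputCommitter; it
  // comes from an OutputFormat instance, given a TaskAttemptContext (see
  // BaseWriterContainer in the diff below, which builds that context via
  // SparkHadoopMapReduceUtil).
  val outputFormat = new TextOutputFormat[Void, String]()
  // val committer = outputFormat.getOutputCommitter(taskAttemptContext)
}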

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 47 additions & 39 deletions
@@ -14,23 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.sources
 
-import java.util
 import java.util.Date
 
 import scala.collection.mutable
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred._
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.util.Shell
+import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -79,21 +83,23 @@ private[sql] case class InsertIntoFSBasedRelation(
     }
 
     if (doInsertion) {
-      val jobConf = new JobConf(hadoopConf)
-      jobConf.setOutputKeyClass(classOf[Void])
-      jobConf.setOutputValueClass(classOf[Row])
-      FileOutputFormat.setOutputPath(jobConf, outputPath)
+      val job = Job.getInstance(hadoopConf)
+      job.setOutputKeyClass(classOf[Void])
+      job.setOutputValueClass(classOf[Row])
+      FileOutputFormat.setOutputPath(job, outputPath)
+
+      val jobConf: Configuration = ContextUtil.getConfiguration(job)
 
       val df = sqlContext.createDataFrame(
         DataFrame(sqlContext, query).queryExecution.toRdd,
         relation.schema,
         needsConversion = false)
 
       if (partitionColumns.isEmpty) {
-        insert(new DefaultWriterContainer(relation, jobConf), df)
+        insert(new DefaultWriterContainer(relation, job), df)
       } else {
         val writerContainer = new DynamicPartitionWriterContainer(
-          relation, jobConf, partitionColumns, "__HIVE_DEFAULT_PARTITION__")
+          relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__")
         insertWithDynamicPartitions(writerContainer, df, partitionColumns)
       }
     }
@@ -169,6 +175,7 @@ private[sql] case class InsertIntoFSBasedRelation(
       writerContainer.commitJob()
       relation.refreshPartitions()
     } catch { case cause: Throwable =>
+      logError("Aborting job.", cause)
      writerContainer.abortJob()
      throw new SparkException("Job aborted.", cause)
    }
@@ -193,24 +200,22 @@ private[sql] case class InsertIntoFSBasedRelation(
 
 private[sql] abstract class BaseWriterContainer(
     @transient val relation: FSBasedRelation,
-    @transient jobConf: JobConf)
-  extends SparkHadoopMapRedUtil
+    @transient job: Job)
+  extends SparkHadoopMapReduceUtil
   with Logging
   with Serializable {
 
-  protected val serializableJobConf = new SerializableWritable(jobConf)
+  protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job))
 
   // This is only used on driver side.
-  @transient private var jobContext: JobContext = _
-
-  // This is only used on executor side.
-  @transient private var taskAttemptContext: TaskAttemptContext = _
+  @transient private var jobContext: JobContext = job
 
   // The following fields are initialized and used on both driver and executor side.
   @transient private var outputCommitter: OutputCommitter = _
   @transient private var jobId: JobID = _
   @transient private var taskId: TaskID = _
   @transient private var taskAttemptId: TaskAttemptID = _
+  @transient private var taskAttemptContext: TaskAttemptContext = _
 
   protected val outputPath = {
     assert(
@@ -221,22 +226,25 @@ private[sql] abstract class BaseWriterContainer(
 
   protected val dataSchema = relation.dataSchema
 
+  protected val outputFormatClass: Class[_ <: OutputFormat[Void, Row]] = relation.outputFormatClass
+
   protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass
 
   def driverSideSetup(): Unit = {
     setupIDs(0, 0, 0)
-    relation.prepareForWrite(serializableJobConf.value)
-    setupJobConf()
-    jobContext = newJobContext(jobConf, jobId)
-    outputCommitter = jobConf.getOutputCommitter
+    setupConf()
+    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+    val outputFormat = relation.outputFormatClass.newInstance()
+    outputCommitter = outputFormat.getOutputCommitter(taskAttemptContext)
     outputCommitter.setupJob(jobContext)
   }
 
   def executorSideSetup(taskContext: TaskContext): Unit = {
     setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
-    setupJobConf()
-    taskAttemptContext = newTaskAttemptContext(serializableJobConf.value, taskAttemptId)
-    outputCommitter = serializableJobConf.value.getOutputCommitter
+    setupConf()
+    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+    val outputFormat = outputFormatClass.newInstance()
+    outputCommitter = outputFormat.getOutputCommitter(taskAttemptContext)
     outputCommitter.setupTask(taskAttemptContext)
     initWriters()
   }
@@ -247,20 +255,20 @@ private[sql] abstract class BaseWriterContainer(
     this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
   }
 
-  private def setupJobConf(): Unit = {
-    serializableJobConf.value.set("mapred.job.id", jobId.toString)
-    serializableJobConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
-    serializableJobConf.value.set("mapred.task.id", taskAttemptId.toString)
-    serializableJobConf.value.setBoolean("mapred.task.is.map", true)
-    serializableJobConf.value.setInt("mapred.task.partition", 0)
+  private def setupConf(): Unit = {
+    serializableConf.value.set("mapred.job.id", jobId.toString)
+    serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+    serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
+    serializableConf.value.setBoolean("mapred.task.is.map", true)
+    serializableConf.value.setInt("mapred.task.partition", 0)
   }
 
   // Called on executor side when writing rows
   def outputWriterForRow(row: Row): OutputWriter
 
   protected def initWriters(): Unit = {
     val writer = outputWriterClass.newInstance()
-    writer.init(outputPath, dataSchema, serializableJobConf.value)
+    writer.init(outputPath, dataSchema, serializableConf.value)
     mutable.Map(outputPath -> writer)
   }
 
@@ -280,21 +288,21 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   def abortJob(): Unit = {
-    outputCommitter.abortJob(jobContext, JobStatus.FAILED)
+    outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
     logError(s"Job $jobId aborted.")
   }
 }
 
 private[sql] class DefaultWriterContainer(
     @transient relation: FSBasedRelation,
-    @transient conf: JobConf)
-  extends BaseWriterContainer(relation, conf) {
+    @transient job: Job)
+  extends BaseWriterContainer(relation, job) {
 
   @transient private var writer: OutputWriter = _
 
   override protected def initWriters(): Unit = {
     writer = relation.outputWriterClass.newInstance()
-    writer.init(outputPath, dataSchema, serializableJobConf.value)
+    writer.init(outputPath, dataSchema, serializableConf.value)
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -312,10 +320,10 @@ private[sql] class DefaultWriterContainer(
 
 private[sql] class DynamicPartitionWriterContainer(
     @transient relation: FSBasedRelation,
-    @transient conf: JobConf,
+    @transient job: Job,
     partitionColumns: Array[String],
     defaultPartitionName: String)
-  extends BaseWriterContainer(relation, conf) {
+  extends BaseWriterContainer(relation, job) {
 
   // All output writers are created on executor side.
   @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
@@ -338,7 +346,7 @@ private[sql] class DynamicPartitionWriterContainer(
     outputWriters.getOrElseUpdate(partitionPath, {
       val path = new Path(outputPath, partitionPath.stripPrefix(Path.SEPARATOR))
       val writer = outputWriterClass.newInstance()
-      writer.init(path.toString, dataSchema, serializableJobConf.value)
+      writer.init(path.toString, dataSchema, serializableConf.value)
       writer
     })
   }
@@ -356,7 +364,7 @@
 
 private[sql] object DynamicPartitionWriterContainer {
   val charToEscape = {
-    val bitSet = new util.BitSet(128)
+    val bitSet = new java.util.BitSet(128)
 
     /**
      * ASCII 01-1F are HTTP control characters that need to be escaped.
@@ -379,7 +387,7 @@ private[sql] object DynamicPartitionWriterContainer {
   }
 
   def needsEscaping(c: Char): Boolean = {
-    c >= 0 && c < charToEscape.size() && charToEscape.get(c);
+    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
   }
 
   def escapePathName(path: String): String = {
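To make the rewritten container lifecycle concrete, here is a hedged sketch of the commit protocol `BaseWriterContainer` now drives through Hadoop's new-API `OutputCommitter`. The two helper methods and `CommitProtocolSketch` are illustrations, not part of the patch; in Spark the executor half runs inside tasks rather than in a local loop.

import org.apache.hadoop.mapreduce.{Job, JobStatus, OutputCommitter, TaskAttemptContext}

object CommitProtocolSketch {
  // Driver side (driverSideSetup / commitJob / abortJob above): set up once,
  // commit once after all tasks succeed, abort on any failure. Job implements
  // JobContext, so it can be passed to the committer directly.
  def driverSide(job: Job, committer: OutputCommitter)(runTasks: () => Unit): Unit = {
    committer.setupJob(job)
    try {
      runTasks()
      committer.commitJob(job)
    } catch { case cause: Throwable =>
      committer.abortJob(job, JobStatus.State.FAILED)
      throw cause
    }
  }

  // Executor side (executorSideSetup / commitTask above): one setup/commit
  // pair per task attempt; the relation's OutputWriter writes rows in between.
  def executorSide(committer: OutputCommitter, ctx: TaskAttemptContext)(writeRows: () => Unit): Unit = {
    committer.setupTask(ctx)
    writeRows()
    if (committer.needsTaskCommit(ctx)) {
      committer.commitTask(ctx)
    }
  }
}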

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 6 additions & 4 deletions
@@ -19,6 +19,8 @@ package org.apache.spark.sql.sources
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{OutputFormat, OutputCommitter}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -363,7 +365,7 @@ abstract class FSBasedRelation private[sql](
   /**
    * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
    * this relation. For partitioned relations, this method is called for each selected partition,
-   * and builds an `RDD[Row]` containg all rows within that single partition.
+   * and builds an `RDD[Row]` containing all rows within that single partition.
    *
    * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
    *        relation. For a partitioned relation, it contains paths of all data files in a single
@@ -377,7 +379,7 @@ abstract class FSBasedRelation private[sql](
   /**
    * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
    * this relation. For partitioned relations, this method is called for each selected partition,
-   * and builds an `RDD[Row]` containg all rows within that single partition.
+   * and builds an `RDD[Row]` containing all rows within that single partition.
    *
    * @param requiredColumns Required columns.
    * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
@@ -391,7 +393,7 @@ abstract class FSBasedRelation private[sql](
   /**
    * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within
    * this relation. For partitioned relations, this method is called for each selected partition,
-   * and builds an `RDD[Row]` containg all rows within that single partition.
+   * and builds an `RDD[Row]` containing all rows within that single partition.
    *
    * @param requiredColumns Required columns.
    * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
@@ -409,7 +411,7 @@ abstract class FSBasedRelation private[sql](
     buildScan(requiredColumns, inputPaths)
   }
 
-  def prepareForWrite(conf: Configuration): Unit
+  def outputFormatClass: Class[_ <: OutputFormat[Void, Row]]
 
   /**
    * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
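The contract change above swaps a side-effecting `prepareForWrite(conf)` hook for a declarative `outputFormatClass` member, letting the write path derive its committer from the format. A hedged sketch of what a concrete relation now supplies; `WriteSideContract`, `SequenceFileBackedRelation`, and the choice of `SequenceFileOutputFormat` are illustrative stand-ins (the test suite below does the same with `TextOutputFormat`):

import org.apache.hadoop.mapreduce.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.spark.sql.Row

// Mirrors the abstract member added to FSBasedRelation above; any new-API
// OutputFormat whose type parameters are [Void, Row] satisfies the contract.
trait WriteSideContract {
  def outputFormatClass: Class[_ <: OutputFormat[Void, Row]]
}

class SequenceFileBackedRelation extends WriteSideContract {
  override def outputFormatClass: Class[_ <: OutputFormat[Void, Row]] =
    classOf[SequenceFileOutputFormat[Void, Row]]
}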

sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala

Lines changed: 6 additions & 1 deletion
@@ -22,6 +22,8 @@ import scala.collection.mutable
 import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.OutputFormat
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.rdd.RDD
@@ -108,7 +110,10 @@
 
   override def outputWriterClass: Class[_ <: OutputWriter] = classOf[SimpleOutputWriter]
 
-  override def prepareForWrite(conf: Configuration): Unit = ()
+  override def outputFormatClass: Class[_ <: OutputFormat[Void, Row]] = {
+    // This is just a mock, not used within this test suite.
+    classOf[TextOutputFormat[Void, Row]]
+  }
 }
 
 object TestResult {
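A note on the mock above: any new-API format with compatible type parameters works, since only the class object is consulted. A sketch of an alternative placeholder, assuming the same suite context; `MockFormats` is hypothetical and not part of the patch:

import org.apache.hadoop.mapreduce.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
import org.apache.spark.sql.Row

object MockFormats {
  // NullOutputFormat discards all records and supplies a no-op committer,
  // which states the "mock, not used" intent even more directly.
  def mockOutputFormatClass: Class[_ <: OutputFormat[Void, Row]] =
    classOf[NullOutputFormat[Void, Row]]
}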
