
Commit a8583ee

Create a saveAsNewAPIHadoopDataset method
1 parent e7423d4 commit a8583ee

2 files changed: +69 lines, −45 lines

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 9 additions & 1 deletion
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}
 
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
@@ -558,6 +558,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
+  /**
+   * Output the RDD to any Hadoop-supported storage system, using
+   * a org.apache.hadoop.mapreduce.Job object for that storage system.
+   */
+  def saveAsNewAPIHadoopDataset(job: Job) {
+    rdd.saveAsNewAPIHadoopDataset(job)
+  }
+
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
       path: String,
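
For orientation, a minimal usage sketch of the new Job-based entry point (not part of this commit; the RDD name `counts`, the output path, and the TextOutputFormat choice are invented for illustration). The JavaPairRDD method above simply delegates to the same-named method on the underlying RDD, so the sketch uses the Scala RDD API directly:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
    import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

    // Hypothetical pair RDD; assume counts: RDD[(Text, IntWritable)] already exists.
    val job = new Job(new Configuration())
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
    FileOutputFormat.setOutputPath(job, new Path("/tmp/counts-output"))  // illustrative path

    // The Job carries the OutputFormat and all output configuration; no separate path argument.
    counts.saveAsNewAPIHadoopDataset(job)

With a file-based OutputFormat this is equivalent to saveAsNewAPIHadoopFile, which the PairRDDFunctions change below reimplements in essentially this way.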

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 60 additions & 44 deletions
@@ -29,13 +29,21 @@ import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
+<<<<<<< HEAD
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.{FileSystem, Path}
+=======
+import org.apache.hadoop.conf.Configuration
+>>>>>>> Create a saveAsNewAPIHadoopDataset method
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
+<<<<<<< HEAD
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+=======
+
+>>>>>>> Create a saveAsNewAPIHadoopDataset method
 
 // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
 import org.apache.hadoop.mapred.SparkHadoopWriter
@@ -603,50 +611,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
-
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
-    val outpath = new Path(path)
-    NewFileOutputFormat.setOutputPath(job, outpath)
-    val jobFormat = outputFormatClass.newInstance
-    jobFormat.checkOutputSpecs(job)
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
-    val stageId = self.id
-    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-      /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
-        attemptNumber)
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = outputFormatClass.newInstance
-      format match {
-        case c: Configurable => c.setConf(wrappedConf.value)
-        case _ => ()
-      }
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
-      while (iter.hasNext) {
-        val (k, v) = iter.next()
-        writer.write(k, v)
-      }
-      writer.close(hadoopContext)
-      committer.commitTask(hadoopContext)
-      return 1
-    }
-
-    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
-     * however we're only going to use this local OutputCommitter for
-     * setupJob/commitJob, so we just use a dummy "map" task.
-     */
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-    jobCommitter.setupJob(jobTaskContext)
-    self.context.runJob(self, writeShard _)
-    jobCommitter.commitJob(jobTaskContext)
+    job.setOutputFormatClass(outputFormatClass)
+    job.getConfiguration.set("mapred.output.dir", path)
+    saveAsNewAPIHadoopDataset(job)
   }
 
   /**
@@ -692,6 +659,55 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     saveAsHadoopDataset(conf)
   }
 
+  /**
+   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
+   * Job object for that storage system. The Job should set an OutputFormat and any output paths
+   * required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
+   * MapReduce job.
+   */
+  def saveAsNewAPIHadoopDataset(job: NewAPIHadoopJob) {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val outfmt = job.getOutputFormatClass
+    val outputFormatInstance = outfmt.newInstance()
+
+    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+      // FileOutputFormat ignores the filesystem parameter
+      val conf = job.getConfiguration
+      outputFormatInstance.checkOutputSpecs(job)
+    }
+
+    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      /* "reduce task" <split #> <attempt # = spark task #> */
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+        attemptNumber)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val format = outfmt.newInstance
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      while (iter.hasNext) {
+        val (k, v) = iter.next()
+        writer.write(k, v)
+      }
+      writer.close(hadoopContext)
+      committer.commitTask(hadoopContext)
+      return 1
+    }
+    val jobFormat = outfmt.newInstance
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+    self.context.runJob(self, writeShard _).sum
+    jobCommitter.commitJob(jobTaskContext)
+  }
+
   /**
    * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
    * that storage system. The JobConf should set an OutputFormat and any output paths required
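
The point of the new method is the non-file case the docstring mentions ("a table name to write to"). A hypothetical sketch of that case follows; HBase is not referenced anywhere in this commit, and the table name and RDD are invented, so treat the class names as assumptions about HBase's new-API TableOutputFormat rather than part of the change:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

    // Hypothetical: assume puts: RDD[(ImmutableBytesWritable, Put)] already exists.
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, "example_table")  // the "output path" is a table name

    val job = new Job(conf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // No filesystem path is set; the OutputFormat's committer handles setup and commit,
    // as in the writeShard/commitJob code above.
    puts.saveAsNewAPIHadoopDataset(job)

Before this change, the new-API save path in PairRDDFunctions required a filesystem path and hard-coded NewFileOutputFormat; the Job-based method removes that assumption.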

0 commit comments