
Commit 2fa26ec

CodingCat authored and mateiz committed
SPARK-1102: Create a saveAsNewAPIHadoopDataset method
https://spark-project.atlassian.net/browse/SPARK-1102 Create a saveAsNewAPIHadoopDataset method

By @mateiz: "Right now RDDs can only be saved as files using the new Hadoop API, not as "datasets" with no filename and just a JobConf. See http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ for an example of how you have to give a bogus filename. For the old Hadoop API, we have saveAsHadoopDataset."

Author: CodingCat <[email protected]>

Closes apache#12 from CodingCat/SPARK-1102 and squashes the following commits:

6ba0c83 [CodingCat] add test cases for saveAsHadoopDataSet (new&old API)
a8d11ba [CodingCat] style fix.........
95a6929 [CodingCat] code clean
7643c88 [CodingCat] change the parameter type back to Configuration
a8583ee [CodingCat] Create a saveAsNewAPIHadoopDataset method
1 parent e7423d4 commit 2fa26ec
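To illustrate the new call path, here is a minimal usage sketch in Scala, modeled on the "save Hadoop Dataset through new Hadoop API" test added in this commit. The local SparkContext, the example pairs, and the /tmp output directory are placeholders for whatever storage system the configured OutputFormat actually targets (for example a MongoDB collection, as in the linked post):

import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "example")
val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")))

// Build the Configuration the same way a Hadoop MapReduce job would:
// output key/value classes, an OutputFormat, and whatever output target it needs.
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
job.getConfiguration.set("mapred.output.dir", "/tmp/exampleDataset")

// No bogus filename argument needed -- just the Configuration.
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)

Note that the method takes a plain Configuration rather than a Job, the parameter type the squashed commit 7643c88 settled on.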

File tree

3 files changed: +100 −53 lines


core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 9 additions & 1 deletion
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}
 
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
@@ -558,6 +558,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
+  /**
+   * Output the RDD to any Hadoop-supported storage system, using
+   * a Configuration object for that storage system.
+   */
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    rdd.saveAsNewAPIHadoopDataset(conf)
+  }
+
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
       path: String,

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 58 additions & 46 deletions
@@ -30,11 +30,11 @@ import scala.reflect.ClassTag
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
 // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
@@ -603,50 +603,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
-
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
-    val outpath = new Path(path)
-    NewFileOutputFormat.setOutputPath(job, outpath)
-    val jobFormat = outputFormatClass.newInstance
-    jobFormat.checkOutputSpecs(job)
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
-    val stageId = self.id
-    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-      /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
-        attemptNumber)
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = outputFormatClass.newInstance
-      format match {
-        case c: Configurable => c.setConf(wrappedConf.value)
-        case _ => ()
-      }
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
-      while (iter.hasNext) {
-        val (k, v) = iter.next()
-        writer.write(k, v)
-      }
-      writer.close(hadoopContext)
-      committer.commitTask(hadoopContext)
-      return 1
-    }
-
-    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
-     * however we're only going to use this local OutputCommitter for
-     * setupJob/commitJob, so we just use a dummy "map" task.
-     */
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-    jobCommitter.setupJob(jobTaskContext)
-    self.context.runJob(self, writeShard _)
-    jobCommitter.commitJob(jobTaskContext)
+    job.setOutputFormatClass(outputFormatClass)
+    job.getConfiguration.set("mapred.output.dir", path)
+    saveAsNewAPIHadoopDataset(job.getConfiguration)
   }
 
   /**
@@ -692,6 +651,59 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     saveAsHadoopDataset(conf)
   }
 
+  /**
+   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
+   * Configuration object for that storage system. The Conf should set an OutputFormat and any
+   * output paths required (e.g. a table name to write to) in the same way as it would be
+   * configured for a Hadoop MapReduce job.
+   */
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    val job = new NewAPIHadoopJob(conf)
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val outfmt = job.getOutputFormatClass
+    val jobFormat = outfmt.newInstance
+
+    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+      // FileOutputFormat ignores the filesystem parameter
+      jobFormat.checkOutputSpecs(job)
+    }
+
+    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      /* "reduce task" <split #> <attempt # = spark task #> */
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+        attemptNumber)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val format = outfmt.newInstance
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      while (iter.hasNext) {
+        val (k, v) = iter.next()
+        writer.write(k, v)
+      }
+      writer.close(hadoopContext)
+      committer.commitTask(hadoopContext)
+      return 1
+    }
+
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+    self.context.runJob(self, writeShard _)
+    jobCommitter.commitJob(jobTaskContext)
+  }
+
   /**
    * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
    * that storage system. The JobConf should set an OutputFormat and any output paths required

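For comparison with the old-API counterpart that the commit description mentions, the pre-existing saveAsHadoopDataset takes a JobConf instead of a Configuration. Here is a minimal sketch mirroring the "save Hadoop Dataset through old Hadoop API" test added below; the local SparkContext, the example pairs, and the /tmp output directory are again only illustrative:

import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "example")
val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")))

// The old-API JobConf carries the output format class and output target directly.
val jobConf = new JobConf()
jobConf.setOutputKeyClass(classOf[String])
jobConf.setOutputValueClass(classOf[String])
jobConf.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
jobConf.set("mapred.output.dir", "/tmp/exampleDataset_old")

pairs.saveAsHadoopDataset(jobConf)

Both paths end up driving an OutputCommitter-based write per partition; the new-API version does so through the writeShard closure shown in the diff above.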
core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 33 additions & 6 deletions
@@ -24,11 +24,12 @@ import scala.io.Source
 import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
-import org.apache.hadoop.mapred.FileAlreadyExistsException
+import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.Job
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 class FileSuite extends FunSuite with LocalSparkContext {
 
@@ -236,18 +237,44 @@ class FileSuite extends FunSuite with LocalSparkContext {
     val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
     }
   }
 
   test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    randomRDD.saveAsTextFile(tempdir.getPath + "/output")
-    assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
+    assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
     }
   }
+
+  test ("save Hadoop Dataset through old Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val job = new JobConf()
+    job.setOutputKeyClass(classOf[String])
+    job.setOutputValueClass(classOf[String])
+    job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
+    job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
+    randomRDD.saveAsHadoopDataset(job)
+    assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
+  }
+
+  test ("save Hadoop Dataset through new Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val job = new Job(sc.hadoopConfiguration)
+    job.setOutputKeyClass(classOf[String])
+    job.setOutputValueClass(classOf[String])
+    job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
+    job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
+    randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+    assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
+  }
 }
