
Commit bb4729a

Same treatment for saveAsHadoopFiles
1 parent b382ea9 commit bb4729a

File tree: 2 files changed (+35, -9 lines)


streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Lines changed: 5 additions & 3 deletions
@@ -668,11 +668,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -701,7 +703,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
       conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
-    // Wrap this in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
     val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
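
For context, a minimal sketch (not part of this commit) of the serialization pattern the change relies on: Hadoop's JobConf is not java.io.Serializable, so a closure that captures it directly cannot be written into a streaming checkpoint, whereas wrapping it in Spark's SerializableWritable (a public @DeveloperApi class at the time of this commit) serializes the configuration through Hadoop's Writable machinery instead. The helper name makeSaveFunc below is hypothetical.

    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.SerializableWritable

    // Hypothetical helper illustrating the pattern used above: capture the wrapper,
    // not the raw JobConf, so the enclosing closure stays serializable.
    def makeSaveFunc(conf: JobConf): String => Unit = {
      val serializableConf = new SerializableWritable(conf)
      (file: String) => {
        val restoredConf = serializableConf.value  // unwrap only when the function actually runs
        println(s"would write $file using ${restoredConf.size()} configuration entries")
      }
    }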

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 30 additions & 6 deletions
@@ -22,15 +22,18 @@ import java.nio.charset.Charset
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
-import org.apache.hadoop.io.{Text, IntWritable}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -207,20 +210,19 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
-  test("recovery with saveAsNewAPIHadoopFiles") {
+  test("recovery with saveAsHadoopFiles operation") {
     val tempDir = Files.createTempDir()
     try {
       testCheckpointedOperation(
         Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
         (s: DStream[String]) => {
           val output = s.map(x => (x, 1)).reduceByKey(_ + _)
-          output.saveAsNewAPIHadoopFiles(
+          output.saveAsHadoopFiles(
             tempDir.toURI.toString,
             "result",
             classOf[Text],
             classOf[IntWritable],
             classOf[TextOutputFormat[Text, IntWritable]])
-          (tempDir.toString, "result")
           output
         },
         Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
@@ -231,6 +233,28 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }
 
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
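
As a usage illustration (not part of the commit), here is a sketch of how an application with checkpointing enabled would call the patched saveAsHadoopFiles, which is roughly what the new tests exercise. The checkpoint and output paths, host, and port are hypothetical, and the conf argument is omitted because it now defaults to the SparkContext's Hadoop configuration.

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.TextOutputFormat
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "SaveExample", Seconds(1))
    ssc.checkpoint("/tmp/checkpoint")                      // hypothetical checkpoint directory

    val counts = ssc.socketTextStream("localhost", 9999)   // hypothetical input source
      .map(word => (new Text(word), new IntWritable(1)))
      .reduceByKey((a, b) => new IntWritable(a.get + b.get))

    // With this commit, the ForeachDStream created here can be serialized into the checkpoint.
    counts.saveAsHadoopFiles(
      "/tmp/output", "txt",                                // hypothetical prefix/suffix
      classOf[Text], classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]])

    ssc.start()
    ssc.awaitTermination()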
