
Commit b382ea9

Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles.
1 parent 0fe54cf commit b382ea9
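
Why this change matters: saveAsNewAPIHadoopFiles registers its save function through foreachRDD, which creates a ForeachDStream whose closure is Java-serialized whenever the DStream graph is checkpointed. A raw org.apache.hadoop.conf.Configuration does not implement java.io.Serializable, so a closure capturing it made checkpointing fail with a NotSerializableException. The commit wraps the conf in SerializableWritable and defaults it to the SparkContext's hadoopConfiguration instead of a fresh Configuration. A minimal sketch of the wrapping pattern, outside the commit itself (variable names are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SerializableWritable

    val conf = new Configuration()
    // SerializableWritable is Serializable and writes the wrapped Writable via Hadoop's own I/O,
    // so a closure can safely capture `wrapped` even though it could not capture `conf` directly.
    val wrapped = new SerializableWritable(conf)
    val work = () => println(wrapped.value.get("fs.defaultFS"))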

File tree: 2 files changed, +41 -13 lines

streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Lines changed: 12 additions & 12 deletions
@@ -17,20 +17,17 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.StreamingContext._
-
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.{Time, Duration}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+
+import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -702,11 +699,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
+      conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
+    // Wrap this in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsNewAPIHadoopFile(
+        file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
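
For context, a hypothetical end-to-end use of the patched method in a checkpointed streaming job (the host, port, paths, and batch interval below are made up; the key/value classes mirror the new test in CheckpointSuite). Checkpointing is what forces the DStream graph, including the ForeachDStream created here, to be serialized:

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val sparkConf = new SparkConf().setAppName("SaveAsNewAPIHadoopFilesExample")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Checkpointing serializes the DStream graph, which previously failed because the
    // save closure captured a non-serializable Hadoop Configuration.
    ssc.checkpoint("/tmp/streaming-checkpoint")

    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // With this commit, the conf argument defaults to ssc.sparkContext.hadoopConfiguration
    // and is captured through a SerializableWritable, so checkpointing no longer fails.
    counts.saveAsNewAPIHadoopFiles(
      "hdfs:///tmp/output/counts", "txt",
      classOf[Text], classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]])

    ssc.start()
    ssc.awaitTermination()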

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 29 additions & 1 deletion
@@ -29,6 +29,8 @@ import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
+import org.apache.hadoop.io.{Text, IntWritable}
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -205,6 +207,30 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+  test("recovery with saveAsNewAPIHadoopFiles") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[TextOutputFormat[Text, IntWritable]])
+          (tempDir.toString, "result")
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
 
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
@@ -391,7 +417,9 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock after advancing = " + clock.time)
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+      dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+    }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)
   }
 }
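
The last hunk adjusts the test helper because the graph now contains the ForeachDStream registered by saveAsNewAPIHadoopFiles in addition to the TestOutputStreamWithPartitions, so taking the first output stream blindly would pick the wrong one. The serialization behavior the new test depends on can also be seen in isolation; a standalone sketch (class and method names here are hypothetical, not part of the commit): Java-serializing a raw Configuration fails, while the SerializableWritable wrapper succeeds.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SerializableWritable

    object ConfSerializationDemo {
      private def trySerialize(label: String, obj: AnyRef): Unit = {
        val oos = new ObjectOutputStream(new ByteArrayOutputStream())
        try {
          oos.writeObject(obj)
          println(s"$label: serialized OK")
        } catch {
          case e: java.io.NotSerializableException => println(s"$label: failed with $e")
        } finally {
          oos.close()
        }
      }

      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        trySerialize("raw Configuration", conf)                              // throws NotSerializableException
        trySerialize("SerializableWritable", new SerializableWritable(conf)) // round-trips fine
      }
    }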
