Skip to content

Commit 0340c56

Browse files
comcmipitdas
authored andcommitted
Update RecoverableNetworkWordCount.scala
Trying this example, I missed the moment when the checkpoint was iniciated Author: comcmipi <[email protected]> Closes #2735 from comcmipi/patch-1 and squashes the following commits: b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala 96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala
1 parent 3a02d41 commit 0340c56

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ import org.apache.spark.util.IntParam
5656
*/
5757
object RecoverableNetworkWordCount {
5858

59-
def createContext(ip: String, port: Int, outputPath: String) = {
59+
def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {
6060

6161
// If you do not see this printed, that means the StreamingContext has been loaded
6262
// from the new checkpoint
@@ -66,6 +66,7 @@ object RecoverableNetworkWordCount {
6666
val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
6767
// Create the context with a 1 second batch size
6868
val ssc = new StreamingContext(sparkConf, Seconds(1))
69+
ssc.checkpoint(checkpointDirectory)
6970

7071
// Create a socket stream on target ip:port and count the
7172
// words in input stream of \n delimited text (eg. generated by 'nc')
@@ -101,7 +102,7 @@ object RecoverableNetworkWordCount {
101102
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
102103
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
103104
() => {
104-
createContext(ip, port, outputPath)
105+
createContext(ip, port, outputPath, checkpointDirectory)
105106
})
106107
ssc.start()
107108
ssc.awaitTermination()

0 commit comments

Comments
 (0)