6969public final class JavaRecoverableNetworkWordCount {
7070 private static final Pattern SPACE = Pattern .compile (" " );
7171
72- private static JavaStreamingContext createContext (String ip , int port , String outputPath ) {
72+ private static JavaStreamingContext createContext (String ip ,
73+ int port ,
74+ String checkpointDirectory ,
75+ String outputPath ) {
7376
7477 // If you do not see this printed, that means the StreamingContext has been loaded
7578 // from the new checkpoint
@@ -81,6 +84,7 @@ private static JavaStreamingContext createContext(String ip, int port, String ou
8184 SparkConf sparkConf = new SparkConf ().setAppName ("JavaRecoverableNetworkWordCount" );
8285 // Create the context with a 1 second batch size
8386 JavaStreamingContext ssc = new JavaStreamingContext (sparkConf , Durations .seconds (1 ));
87+ ssc .checkpoint (checkpointDirectory );
8488
8589 // Create a socket stream on target ip:port and count the
8690 // words in input stream of \n delimited text (eg. generated by 'nc')
@@ -135,12 +139,12 @@ public static void main(String[] args) {
135139
136140 final String ip = args [0 ];
137141 final int port = Integer .parseInt (args [1 ]);
138- String checkpointDirectory = args [2 ];
142+ final String checkpointDirectory = args [2 ];
139143 final String outputPath = args [3 ];
140144 JavaStreamingContextFactory factory = new JavaStreamingContextFactory () {
141145 @ Override
142146 public JavaStreamingContext create () {
143- return createContext (ip , port , outputPath );
147+ return createContext (ip , port , checkpointDirectory , outputPath );
144148 }
145149 };
146150 JavaStreamingContext ssc = JavaStreamingContext .getOrCreate (checkpointDirectory , factory );
0 commit comments