Skip to content

Commit 179b3c2

Browse files
committed
Re-port RecoverableNetworkWordCount to Java example, and touch up doc / formatting in related examples
1 parent 0d8cdf0 commit 179b3c2

File tree

3 files changed: +165 additions, −7 deletions

examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.spark.api.java.function.Function2;
2626
import org.apache.spark.api.java.function.PairFunction;
2727
import org.apache.spark.api.java.StorageLevels;
28-
import org.apache.spark.streaming.Duration;
28+
import org.apache.spark.streaming.Durations;
2929
import org.apache.spark.streaming.api.java.JavaDStream;
3030
import org.apache.spark.streaming.api.java.JavaPairDStream;
3131
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -35,8 +35,9 @@
3535

3636
/**
3737
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
38+
*
3839
* Usage: JavaNetworkWordCount <hostname> <port>
39-
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
40+
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
4041
*
4142
* To run this on your local machine, you need to first run a Netcat server
4243
* `$ nc -lk 9999`
@@ -56,7 +57,7 @@ public static void main(String[] args) {
5657

5758
// Create the context with a 1 second batch size
5859
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
59-
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
60+
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
6061

6162
// Create a JavaReceiverInputDStream on target ip:port and count the
6263
// words in input stream of \n delimited text (eg. generated by 'nc')
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.streaming;
19+
20+
import java.io.File;
21+
import java.io.IOException;
22+
import java.nio.charset.Charset;
23+
import java.util.Arrays;
24+
import java.util.regex.Pattern;
25+
26+
import scala.Tuple2;
27+
import com.google.common.collect.Lists;
28+
import com.google.common.io.Files;
29+
30+
import org.apache.spark.SparkConf;
31+
import org.apache.spark.api.java.JavaPairRDD;
32+
import org.apache.spark.api.java.function.FlatMapFunction;
33+
import org.apache.spark.api.java.function.Function2;
34+
import org.apache.spark.api.java.function.PairFunction;
35+
import org.apache.spark.streaming.Durations;
36+
import org.apache.spark.streaming.Time;
37+
import org.apache.spark.streaming.api.java.JavaDStream;
38+
import org.apache.spark.streaming.api.java.JavaPairDStream;
39+
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
40+
import org.apache.spark.streaming.api.java.JavaStreamingContext;
41+
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
42+
43+
/**
44+
* Counts words in text encoded with UTF8 received from the network every second.
45+
*
46+
* Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
47+
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
48+
* data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
49+
* <output-file> file to which the word counts will be appended
50+
*
51+
* <checkpoint-directory> and <output-file> must be absolute paths
52+
*
53+
* To run this on your local machine, you need to first run a Netcat server
54+
*
55+
* `$ nc -lk 9999`
56+
*
57+
* and run the example as
58+
*
59+
* `$ ./bin/run-example org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount \
60+
* localhost 9999 ~/checkpoint/ ~/out`
61+
*
62+
* If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
63+
* a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
64+
* checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
65+
* the checkpoint data.
66+
*
67+
* To run this example in a local standalone cluster with automatic driver recovery,
68+
*
69+
* `$ bin/spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
70+
* <path-to-examples-jar> \
71+
* org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount <cluster-url> \
72+
* localhost 9999 ~/checkpoint ~/out`
73+
*
74+
* <path-to-examples-jar> would typically be
75+
* <spark-dir>/examples/target/scala-XX/spark-examples....jar
76+
*
77+
* Refer to the online documentation for more details.
78+
*/
79+
public final class JavaRecoverableNetworkWordCount {
80+
private static final Pattern SPACE = Pattern.compile(" ");
81+
82+
private static JavaStreamingContext createContext(String ip, int port, String outputPath) {
83+
84+
// If you do not see this printed, that means the StreamingContext has been loaded
85+
// from the new checkpoint
86+
System.out.println("Creating new context");
87+
final File outputFile = new File(outputPath);
88+
if (outputFile.exists()) {
89+
outputFile.delete();
90+
}
91+
SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
92+
// Create the context with a 1 second batch size
93+
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
94+
95+
// Create a socket stream on target ip:port and count the
96+
// words in input stream of \n delimited text (eg. generated by 'nc')
97+
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
98+
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
99+
@Override
100+
public Iterable<String> call(String x) {
101+
return Lists.newArrayList(SPACE.split(x));
102+
}
103+
});
104+
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
105+
new PairFunction<String, String, Integer>() {
106+
@Override
107+
public Tuple2<String, Integer> call(String s) {
108+
return new Tuple2<String, Integer>(s, 1);
109+
}
110+
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
111+
@Override
112+
public Integer call(Integer i1, Integer i2) {
113+
return i1 + i2;
114+
}
115+
});
116+
117+
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
118+
@Override
119+
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
120+
String counts = "Counts at time " + time + " " + rdd.collect();
121+
System.out.println(counts);
122+
System.out.println("Appending to " + outputFile.getAbsolutePath());
123+
Files.append(counts + "\n", outputFile, Charset.defaultCharset());
124+
return null;
125+
}
126+
});
127+
128+
return ssc;
129+
}
130+
131+
public static void main(String[] args) {
132+
if (args.length != 4) {
133+
System.err.println("You arguments were " + Arrays.asList(args));
134+
System.err.println(
135+
"Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
136+
" <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
137+
" Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
138+
" HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
139+
" the word counts will be appended\n" +
140+
"\n" +
141+
"In local mode, <master> should be 'local[n]' with n > 1\n" +
142+
"Both <checkpoint-directory> and <output-file> must be absolute paths");
143+
System.exit(1);
144+
}
145+
146+
final String ip = args[0];
147+
final int port = Integer.parseInt(args[1]);
148+
String checkpointDirectory = args[2];
149+
final String outputPath = args[3];
150+
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
151+
@Override
152+
public JavaStreamingContext create() {
153+
return createContext(ip, port, outputPath);
154+
}
155+
};
156+
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
157+
ssc.start();
158+
ssc.awaitTermination();
159+
}
160+
}

examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,13 @@ import org.apache.spark.util.IntParam
3131
/**
3232
* Counts words in text encoded with UTF8 received from the network every second.
3333
*
34-
* Usage: NetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
34+
* Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
3535
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
3636
* data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
3737
* <output-file> file to which the word counts will be appended
3838
*
39-
* In local mode, <master> should be 'local[n]' with n > 1
4039
* <checkpoint-directory> and <output-file> must be absolute paths
4140
*
42-
*
4341
* To run this on your local machine, you need to first run a Netcat server
4442
*
4543
* `$ nc -lk 9999`
@@ -66,7 +64,6 @@ import org.apache.spark.util.IntParam
6664
*
6765
* Refer to the online documentation for more details.
6866
*/
69-
7067
object RecoverableNetworkWordCount {
7168

7269
def createContext(ip: String, port: Int, outputPath: String) = {

0 commit comments

Comments (0)