
Commit 593214a

Merge remote-tracking branch 'apache-github/branch-1.1' into branch-1.1
2 parents: 2ed927f + 86b1bd0

10 files changed, +222 -50 lines changed


core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 1 deletion

@@ -1528,7 +1528,7 @@ private[spark] object Utils extends Logging {
   def isBindCollision(exception: Throwable): Boolean = {
     exception match {
       case e: BindException =>
-        if (e.getMessage != null && e.getMessage.contains("Address already in use")) {
+        if (e.getMessage != null) {
           return true
         }
         isBindCollision(e.getCause)
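
Note: the change stops matching on the English-only "Address already in use" text and treats any BindException with a non-null message as a port collision. A small illustrative snippet (not part of the commit; port handling is ad hoc) showing why the message text is unreliable to match on:

import java.net.{BindException, ServerSocket}

// Illustration only: bind twice to the same port and look at the message.
// The text is locale- and platform-dependent (e.g. localized JVMs, or
// "Address already in use: bind" on some platforms), so string matching misses collisions.
val first = new ServerSocket(0)                      // grab an ephemeral port
try {
  new ServerSocket(first.getLocalPort)               // second bind fails
} catch {
  case e: BindException => println(e.getMessage)     // "Address already in use" only on an English JVM
} finally {
  first.close()
}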

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 14 additions & 11 deletions

@@ -22,6 +22,8 @@ import scala.util.Random
 import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
 import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
+import java.text.DecimalFormatSymbols
+import java.util.Locale
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
@@ -101,14 +103,16 @@ class UtilsSuite extends FunSuite {
     val hour = minute * 60
     def str = Utils.msDurationToString(_)
 
+    val sep = new DecimalFormatSymbols(Locale.getDefault()).getDecimalSeparator()
+
     assert(str(123) === "123 ms")
-    assert(str(second) === "1.0 s")
-    assert(str(second + 462) === "1.5 s")
-    assert(str(hour) === "1.00 h")
-    assert(str(minute) === "1.0 m")
-    assert(str(minute + 4 * second + 34) === "1.1 m")
-    assert(str(10 * hour + minute + 4 * second) === "10.02 h")
-    assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h")
+    assert(str(second) === "1" + sep + "0 s")
+    assert(str(second + 462) === "1" + sep + "5 s")
+    assert(str(hour) === "1" + sep + "00 h")
+    assert(str(minute) === "1" + sep + "0 m")
+    assert(str(minute + 4 * second + 34) === "1" + sep + "1 m")
+    assert(str(10 * hour + minute + 4 * second) === "10" + sep + "02 h")
+    assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11" + sep + "00 h")
   }
 
   test("reading offset bytes of a file") {
@@ -271,12 +275,11 @@ class UtilsSuite extends FunSuite {
     assert(!Utils.isBindCollision(new Exception))
     assert(!Utils.isBindCollision(new Exception(new Exception)))
     assert(!Utils.isBindCollision(new Exception(new BindException)))
-    assert(!Utils.isBindCollision(new Exception(new BindException("Random message"))))
 
     // Positives
-    val be = new BindException("Address already in use")
-    val be1 = new Exception(new BindException("Address already in use"))
-    val be2 = new Exception(new Exception(new BindException("Address already in use")))
+    val be = new BindException("Random Message")
+    val be1 = new Exception(new BindException("Random Message"))
+    val be2 = new Exception(new Exception(new BindException("Random Message")))
     assert(Utils.isBindCollision(be))
     assert(Utils.isBindCollision(be1))
     assert(Utils.isBindCollision(be2))
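
Note: the duration assertions previously hard-coded '.' as the decimal separator, which breaks on JVMs whose default locale uses another symbol; the new `sep` makes the expected strings locale-aware. A short illustration of the locale dependency (not part of the commit, values shown in comments assume standard JDK locale data):

import java.text.DecimalFormatSymbols
import java.util.Locale

// Illustration only: %-style formatting follows the locale it is given
// (or the JVM default when none is given).
val us     = new DecimalFormatSymbols(Locale.US).getDecimalSeparator       // '.'
val french = new DecimalFormatSymbols(Locale.FRANCE).getDecimalSeparator   // ','
println("%.1f s".formatLocal(Locale.US, 1.0))       // prints "1.0 s"
println("%.1f s".formatLocal(Locale.FRANCE, 1.0))   // prints "1,0 s"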

examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java

Lines changed: 1 addition & 1 deletion

@@ -70,7 +70,7 @@ public static void main(String[] args) {
     // Create a input stream with the custom receiver on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
     JavaReceiverInputDStream<String> lines = ssc.receiverStream(
-      new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
+      new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterable<String> call(String x) {

examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java

Lines changed: 3 additions & 2 deletions

@@ -35,8 +35,9 @@
 
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
  * Usage: JavaNetworkWordCount <hostname> <port>
- * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
  *
  * To run this on your local machine, you need to first run a Netcat server
  * `$ nc -lk 9999`
@@ -56,7 +57,7 @@ public static void main(String[] args) {
 
     // Create the context with a 1 second batch size
     SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
-    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
 
     // Create a JavaReceiverInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java

Lines changed: 154 additions & 0 deletions (new file)

@@ -0,0 +1,154 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.streaming;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;
import com.google.common.collect.Lists;
import com.google.common.io.Files;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

/**
 * Counts words in text encoded with UTF8 received from the network every second.
 *
 * Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
 *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
 *   <output-file> file to which the word counts will be appended
 *
 * <checkpoint-directory> and <output-file> must be absolute paths
 *
 * To run this on your local machine, you need to first run a Netcat server
 *
 *      `$ nc -lk 9999`
 *
 * and run the example as
 *
 *      `$ ./bin/run-example org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount \
 *              localhost 9999 ~/checkpoint/ ~/out`
 *
 * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
 * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
 * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
 * the checkpoint data.
 *
 * Refer to the online documentation for more details.
 */
public final class JavaRecoverableNetworkWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  private static JavaStreamingContext createContext(String ip,
                                                    int port,
                                                    String checkpointDirectory,
                                                    String outputPath) {

    // If you do not see this printed, that means the StreamingContext has been loaded
    // from the new checkpoint
    System.out.println("Creating new context");
    final File outputFile = new File(outputPath);
    if (outputFile.exists()) {
      outputFile.delete();
    }
    SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
    // Create the context with a 1 second batch size
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
    ssc.checkpoint(checkpointDirectory);

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
      @Override
      public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
        String counts = "Counts at time " + time + " " + rdd.collect();
        System.out.println(counts);
        System.out.println("Appending to " + outputFile.getAbsolutePath());
        Files.append(counts + "\n", outputFile, Charset.defaultCharset());
        return null;
      }
    });

    return ssc;
  }

  public static void main(String[] args) {
    if (args.length != 4) {
      System.err.println("You arguments were " + Arrays.asList(args));
      System.err.println(
          "Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
          "     <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
          "     Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
          "     HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
          "     the word counts will be appended\n" +
          "\n" +
          "In local mode, <master> should be 'local[n]' with n > 1\n" +
          "Both <checkpoint-directory> and <output-file> must be absolute paths");
      System.exit(1);
    }

    final String ip = args[0];
    final int port = Integer.parseInt(args[1]);
    final String checkpointDirectory = args[2];
    final String outputPath = args[3];
    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return createContext(ip, port, checkpointDirectory, outputPath);
      }
    };
    JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
    ssc.start();
    ssc.awaitTermination();
  }
}

examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

Lines changed: 4 additions & 16 deletions

@@ -31,15 +31,13 @@ import org.apache.spark.util.IntParam
 /**
  * Counts words in text encoded with UTF8 received from the network every second.
  *
- * Usage: NetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
+ * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
  *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
  *   <output-file> file to which the word counts will be appended
 *
- * In local mode, <master> should be 'local[n]' with n > 1
 * <checkpoint-directory> and <output-file> must be absolute paths
 *
- *
 * To run this on your local machine, you need to first run a Netcat server
 *
 * `$ nc -lk 9999`
@@ -54,22 +52,11 @@ import org.apache.spark.util.IntParam
 * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
 * the checkpoint data.
 *
- * To run this example in a local standalone cluster with automatic driver recovery,
- *
- * `$ bin/spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
- *    <path-to-examples-jar> \
- *    org.apache.spark.examples.streaming.RecoverableNetworkWordCount <cluster-url> \
- *    localhost 9999 ~/checkpoint ~/out`
- *
- * <path-to-examples-jar> would typically be
- * <spark-dir>/examples/target/scala-XX/spark-examples....jar
- *
 * Refer to the online documentation for more details.
 */
-
 object RecoverableNetworkWordCount {
 
-  def createContext(ip: String, port: Int, outputPath: String) = {
+  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {
 
    // If you do not see this printed, that means the StreamingContext has been loaded
    // from the new checkpoint
@@ -79,6 +66,7 @@ object RecoverableNetworkWordCount {
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
+    ssc.checkpoint(checkpointDirectory)
 
    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
@@ -114,7 +102,7 @@ object RecoverableNetworkWordCount {
    val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
-        createContext(ip, port, outputPath)
+        createContext(ip, port, outputPath, checkpointDirectory)
      })
    ssc.start()
    ssc.awaitTermination()
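
Note: the fix threads the checkpoint directory into createContext so that ssc.checkpoint(...) is actually called on a freshly created context; without it, a first run never writes checkpoint data for getOrCreate to recover from. A minimal standalone sketch of the pattern (illustrative names and paths, not the example's own code):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: "checkpointDir" and the app name are illustrative.
def makeContext(checkpointDir: String): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointSketch")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)   // must be set inside the factory, on the new context
  // ... define the DStream graph and its output operations here ...
  ssc
}

// First run: no checkpoint exists, so the factory runs and checkpoints to checkpointDir.
// After a driver restart: the context and its DStream graph are rebuilt from the
// checkpoint and the factory body is skipped.
val checkpointDir = "/tmp/checkpoint-sketch"
val ssc = StreamingContext.getOrCreate(checkpointDir, () => makeContext(checkpointDir))
ssc.start()
ssc.awaitTermination()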

python/pyspark/rdd.py

Lines changed: 2 additions & 0 deletions

@@ -598,6 +598,8 @@ def sortPartition(iterator):
         # the key-space into bins such that the bins have roughly the same
         # number of (key, value) pairs falling into them
         rddSize = self.count()
+        if not rddSize:
+            return self  # empty RDD
         maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
         fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
         samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()

python/pyspark/tests.py

Lines changed: 3 additions & 0 deletions

@@ -470,6 +470,9 @@ def test_histogram(self):
         self.assertEquals(([1, "b"], [5]), rdd.histogram(1))
         self.assertRaises(TypeError, lambda: rdd.histogram(2))
 
+    def test_sort_on_empty_rdd(self):
+        self.assertEqual([], self.sc.parallelize(zip([], [])).sortByKey().collect())
+
     def test_sample(self):
         rdd = self.sc.parallelize(range(0, 100), 4)
         wo = rdd.sample(False, 0.1, 2).collect()

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 21 additions & 17 deletions

@@ -436,10 +436,10 @@ class StreamingContext private[streaming] (
 
   /**
    * Start the execution of the streams.
+   *
+   * @throws SparkException if the context has already been started or stopped.
    */
   def start(): Unit = synchronized {
-    // Throw exception if the context has already been started once
-    // or if a stopped context is being started again
     if (state == Started) {
       throw new SparkException("StreamingContext has already been started")
     }
@@ -472,8 +472,10 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams immediately (does not wait for all received data
    * to be processed).
-   * @param stopSparkContext Stop the associated SparkContext or not
   *
+   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   *                         will be stopped regardless of whether this StreamingContext has been
+   *                         started.
   */
  def stop(stopSparkContext: Boolean = true): Unit = synchronized {
    stop(stopSparkContext, false)
@@ -482,25 +484,27 @@ class StreamingContext private[streaming] (
  /**
   * Stop the execution of the streams, with option of ensuring all received data
   * has been processed.
-   * @param stopSparkContext Stop the associated SparkContext or not
-   * @param stopGracefully Stop gracefully by waiting for the processing of all
+   *
+   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   *                         will be stopped regardless of whether this StreamingContext has been
+   *                         started.
+   * @param stopGracefully if true, stops gracefully by waiting for the processing of all
   *                       received data to be completed
   */
  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
-    // Warn (but not fail) if context is stopped twice,
-    // or context is stopped before starting
-    if (state == Initialized) {
-      logWarning("StreamingContext has not been started yet")
-      return
+    state match {
+      case Initialized => logWarning("StreamingContext has not been started yet")
+      case Stopped => logWarning("StreamingContext has already been stopped")
+      case Started =>
+        scheduler.stop(stopGracefully)
+        logInfo("StreamingContext stopped successfully")
+        waiter.notifyStop()
    }
-    if (state == Stopped) {
-      logWarning("StreamingContext has already been stopped")
-      return
-    } // no need to throw an exception as its okay to stop twice
-    scheduler.stop(stopGracefully)
-    logInfo("StreamingContext stopped successfully")
-    waiter.notifyStop()
+    // Even if the streaming context has not been started, we still need to stop the SparkContext.
+    // Even if we have already stopped, we still need to attempt to stop the SparkContext because
+    // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
    if (stopSparkContext) sc.stop()
+    // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
    state = Stopped
  }
 }
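
Note: the refactored stop() folds the three lifecycle states into one match and, crucially, always attempts to stop the SparkContext when asked, even if the StreamingContext never started or was stopped earlier. A small hypothetical usage sketch of the scenario named in the new comment (app name, master, and batch interval are illustrative; a real job would define a DStream graph and call ssc.start() first):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only, to show the two-call stop sequence the comment describes.
val conf = new SparkConf().setAppName("StopSemanticsSketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// First call: stop the streaming side but deliberately keep the SparkContext alive.
ssc.stop(stopSparkContext = false)

// Second call: the context is already Stopped, so only a warning is logged for the
// streaming side, but with this change sc.stop() is still attempted, so the
// SparkContext is released as requested. Previously this call returned early.
ssc.stop(stopSparkContext = true)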
