Commit dcb2f73

srowen authored and JoshRosen committed
SPARK-2626 [DOCS] Stop SparkContext in all examples
Call SparkContext.stop() in all examples (and touch up minor nearby code style issues while at it)

Author: Sean Owen <[email protected]>

Closes #2575 from srowen/SPARK-2626 and squashes the following commits:

5b2baae [Sean Owen] Call SparkContext.stop() in all examples (and touch up minor nearby code style issues while at it)
1 parent abf588f commit dcb2f73
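
The pattern this commit applies is the same in every changed example: build a SparkContext, run the example's workload, and call stop() before main() returns so the application releases its resources cleanly. Below is a minimal sketch of that pattern; the object name and the workload are illustrative only and are not taken from the changed files.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch of the pattern enforced by this commit: create the context,
// do the work, then stop() the context. StopContextSketch and the sum
// computation are hypothetical placeholders, not code from the patch.
object StopContextSketch {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("StopContextSketch")
    val sc = new SparkContext(sparkConf)

    // Any example workload would go here.
    val sum = sc.parallelize(1 to 100).reduce(_ + _)
    println("Sum is " + sum)

    sc.stop()  // release the context's resources once the example is done
  }
}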

11 files changed: +34 additions, -11 deletions

examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java

Lines changed: 2 additions & 1 deletion
@@ -31,7 +31,6 @@
  * Usage: JavaSparkPi [slices]
  */
 public final class JavaSparkPi {
-
 
   public static void main(String[] args) throws Exception {
     SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
@@ -61,5 +60,7 @@ public Integer call(Integer integer, Integer integer2) {
     });
 
     System.out.println("Pi is roughly " + 4.0 * count / n);
+
+    jsc.stop();
   }
 }

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java

Lines changed: 8 additions & 1 deletion
@@ -61,7 +61,8 @@ public static void main(String[] args) throws Exception {
     // Load a text file and convert each line to a Java Bean.
     JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
       new Function<String, Person>() {
-        public Person call(String line) throws Exception {
+        @Override
+        public Person call(String line) {
           String[] parts = line.split(",");
 
           Person person = new Person();
@@ -82,6 +83,7 @@ public Person call(String line) throws Exception {
     // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
     List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
+      @Override
       public String call(Row row) {
         return "Name: " + row.getString(0);
       }
@@ -104,6 +106,7 @@ public String call(Row row) {
     JavaSchemaRDD teenagers2 =
       sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
     teenagerNames = teenagers2.map(new Function<Row, String>() {
+      @Override
       public String call(Row row) {
         return "Name: " + row.getString(0);
       }
@@ -136,6 +139,7 @@ public String call(Row row) {
     // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
     teenagerNames = teenagers3.map(new Function<Row, String>() {
+      @Override
       public String call(Row row) { return "Name: " + row.getString(0); }
     }).collect();
     for (String name: teenagerNames) {
@@ -162,12 +166,15 @@ public String call(Row row) {
 
     JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
     List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
+      @Override
       public String call(Row row) {
         return "Name: " + row.getString(0) + ", City: " + row.getString(1);
       }
     }).collect();
     for (String name: nameAndCity) {
       System.out.println(name);
     }
+
+    ctx.stop();
   }
 }

examples/src/main/python/avro_inputformat.py

Lines changed: 2 additions & 0 deletions
@@ -78,3 +78,5 @@
     output = avro_rdd.map(lambda x: x[0]).collect()
     for k in output:
         print k
+
+    sc.stop()

examples/src/main/python/parquet_inputformat.py

Lines changed: 2 additions & 0 deletions
@@ -57,3 +57,5 @@
     output = parquet_rdd.map(lambda x: x[1]).collect()
     for k in output:
         print k
+
+    sc.stop()

examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala

Lines changed: 2 additions & 0 deletions
@@ -136,5 +136,7 @@ object CassandraCQLTest {
       classOf[CqlOutputFormat],
       job.getConfiguration()
     )
+
+    sc.stop()
   }
 }

examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala

Lines changed: 2 additions & 0 deletions
@@ -126,6 +126,8 @@ object CassandraTest {
       }
     }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
       classOf[ColumnFamilyOutputFormat], job.getConfiguration)
+
+    sc.stop()
   }
 }

examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala

Lines changed: 3 additions & 3 deletions
@@ -44,11 +44,11 @@ object GroupByTest {
         arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
       }
       arr1
-    }.cache
+    }.cache()
     // Enforce that everything has been calculated and in cache
-    pairs1.count
+    pairs1.count()
 
-    println(pairs1.groupByKey(numReducers).count)
+    println(pairs1.groupByKey(numReducers).count())
 
     sc.stop()
   }
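
The GroupByTest change above also adds empty parentheses to cache, count, and groupByKey(numReducers).count, which is one of the "minor nearby code style" touch-ups the commit message mentions. As a hedged aside, here is a small self-contained sketch of the underlying Scala convention; the Counter class is hypothetical and not part of Spark.

// Illustrative sketch (not from the patch): Scala style calls side-effecting
// methods with empty parentheses and leaves pure accessors without them.
class Counter {
  private var n = 0
  def increment(): Unit = { n += 1 }  // side-effecting: declared and called with ()
  def value: Int = n                  // pure accessor: no ()
}

object ParenStyleSketch {
  def main(args: Array[String]) {
    val c = new Counter
    c.increment()      // call with parentheses, mirroring pairs1.count()
    println(c.value)   // read without parentheses
  }
}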

examples/src/main/scala/org/apache/spark/examples/LogQuery.scala

Lines changed: 2 additions & 0 deletions
@@ -79,5 +79,7 @@ object LogQuery {
       .reduceByKey((a, b) => a.merge(b))
       .collect().foreach{
         case (user, query) => println("%s\t%s".format(user, query))}
+
+    sc.stop()
   }
 }

examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala

Lines changed: 5 additions & 4 deletions
@@ -21,7 +21,6 @@ import org.apache.spark._
 import org.apache.spark.SparkContext._
 
 import org.apache.spark.bagel._
-import org.apache.spark.bagel.Bagel._
 
 import scala.xml.{XML,NodeSeq}
 
@@ -78,9 +77,9 @@ object WikipediaPageRank {
       (id, new PRVertex(1.0 / numVertices, outEdges))
     })
     if (usePartitioner) {
-      vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache
+      vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache()
     } else {
-      vertices = vertices.cache
+      vertices = vertices.cache()
     }
     println("Done parsing input file.")
 
@@ -100,7 +99,9 @@ object WikipediaPageRank {
     (result
       .filter { case (id, vertex) => vertex.value >= threshold }
      .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) }
-      .collect.mkString)
+      .collect().mkString)
     println(top)
+
+    sc.stop()
   }
 }

examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala

Lines changed: 3 additions & 1 deletion
@@ -51,7 +51,7 @@ object RDDRelation {
     val rddFromSql = sql("SELECT key, value FROM records WHERE key < 10")
 
     println("Result of RDD.map:")
-    rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect.foreach(println)
+    rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)
 
     // Queries can also be written using a LINQ-like Scala DSL.
     rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println)
@@ -68,5 +68,7 @@ object RDDRelation {
     // These files can also be registered as tables.
     parquetFile.registerTempTable("parquetFile")
     sql("SELECT * FROM parquetFile").collect().foreach(println)
+
+    sc.stop()
   }
 }
