@@ -31,7 +31,6 @@
* Usage: JavaSparkPi [slices]
*/
public final class JavaSparkPi {


Review comment: it's just whitespace

public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
@@ -61,5 +60,7 @@ public Integer call(Integer integer, Integer integer2) {
});

System.out.println("Pi is roughly " + 4.0 * count / n);

jsc.stop();
Review comment: how about using the Closeable feature?

Member Author: You don't mean call close() but use try-with-resources? That would require Java 7.

Review comment: thanks for the sad reminder that java6 is still an anchor for spark
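For reference, a minimal sketch of what the try-with-resources suggestion would look like. It assumes a Java 7+ build and a JavaSparkContext that implements java.io.Closeable (only added in a later Spark release), and the class name is hypothetical; it illustrates the idea rather than anything this PR can adopt:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: assumes Java 7+ and a Closeable JavaSparkContext.
public final class JavaSparkPiCloseable {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
    try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
      // ... build the RDD and estimate Pi exactly as in the example above ...
    } // jsc.close() runs automatically here and stops the context
  }
}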

}
}
@@ -61,7 +61,8 @@ public static void main(String[] args) throws Exception {
// Load a text file and convert each line to a Java Bean.
JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
public Person call(String line) throws Exception {
@Override
Review comment: are these additions of "@Override" intentional?

public Person call(String line) {
String[] parts = line.split(",");

Person person = new Person();
@@ -82,6 +83,7 @@ public Person call(String line) throws Exception {
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0);
}
@@ -104,6 +106,7 @@ public String call(Row row) {
JavaSchemaRDD teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0);
}
@@ -136,6 +139,7 @@ public String call(Row row) {
// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagerNames = teenagers3.map(new Function<Row, String>() {
@Override
public String call(Row row) { return "Name: " + row.getString(0); }
}).collect();
for (String name: teenagerNames) {
@@ -162,12 +166,15 @@ public String call(Row row) {

JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0) + ", City: " + row.getString(1);
}
}).collect();
for (String name: nameAndCity) {
System.out.println(name);
}

ctx.stop();
}
}
2 changes: 2 additions & 0 deletions examples/src/main/python/avro_inputformat.py
@@ -78,3 +78,5 @@
output = avro_rdd.map(lambda x: x[0]).collect()
for k in output:
print k

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/parquet_inputformat.py
@@ -57,3 +57,5 @@
output = parquet_rdd.map(lambda x: x[1]).collect()
for k in output:
print k

sc.stop()
@@ -136,5 +136,7 @@ object CassandraCQLTest {
classOf[CqlOutputFormat],
job.getConfiguration()
)

sc.stop()
}
}
@@ -126,6 +126,8 @@ object CassandraTest {
}
}.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
classOf[ColumnFamilyOutputFormat], job.getConfiguration)

sc.stop()
}
}

@@ -44,11 +44,11 @@ object GroupByTest {
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
}.cache
}.cache()
Review comment: it's scala, was it intentional to add the parens?

Member Author: Yes, all intentional. Parens should be used in Scala when methods have side effects, and @Override should be used in Java where an override is intended. These are just small matters of style, but I think they're worth standardizing to match other source and common practice while changing nearby code.

Review comment: thanks, i learned something new and it's early on a monday - for others interested: http://docs.scala-lang.org/style/method-invocation.html
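As a companion sketch for the @Override half of that convention (hypothetical class name, not part of this PR): the annotation asks the compiler to confirm that the anonymous class's call() really matches Function.call(), and it documents the intent to override rather than declare a new method.

import org.apache.spark.api.java.function.Function;

// Sketch only: illustrates the @Override style discussed above on an
// anonymous org.apache.spark.api.java.function.Function implementation.
public final class OverrideExample {
  public static void main(String[] args) throws Exception {
    Function<String, Integer> parse = new Function<String, Integer>() {
      @Override
      public Integer call(String s) {
        return Integer.parseInt(s);
      }
    };
    System.out.println(parse.call("42")); // prints 42
  }
}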

// Enforce that everything has been calculated and in cache
pairs1.count
pairs1.count()

println(pairs1.groupByKey(numReducers).count)
println(pairs1.groupByKey(numReducers).count())

sc.stop()
}
@@ -79,5 +79,7 @@ object LogQuery {
.reduceByKey((a, b) => a.merge(b))
.collect().foreach{
case (user, query) => println("%s\t%s".format(user, query))}

sc.stop()
}
}
@@ -21,7 +21,6 @@ import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

import scala.xml.{XML,NodeSeq}

@@ -78,9 +77,9 @@ object WikipediaPageRank {
(id, new PRVertex(1.0 / numVertices, outEdges))
})
if (usePartitioner) {
vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache
vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache()
} else {
vertices = vertices.cache
vertices = vertices.cache()
}
println("Done parsing input file.")

@@ -100,7 +99,9 @@ object WikipediaPageRank {
(result
.filter { case (id, vertex) => vertex.value >= threshold }
.map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) }
.collect.mkString)
.collect().mkString)
println(top)

sc.stop()
}
}
@@ -51,7 +51,7 @@ object RDDRelation {
val rddFromSql = sql("SELECT key, value FROM records WHERE key < 10")

println("Result of RDD.map:")
rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect.foreach(println)
rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)

// Queries can also be written using a LINQ-like Scala DSL.
rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println)
@@ -68,5 +68,7 @@ object RDDRelation {
// These files can also be registered as tables.
parquetFile.registerTempTable("parquetFile")
sql("SELECT * FROM parquetFile").collect().foreach(println)

sc.stop()
}
}
@@ -39,7 +39,7 @@ object HiveFromSpark {

// Queries are expressed in HiveQL
println("Result of 'SELECT *': ")
sql("SELECT * FROM src").collect.foreach(println)
sql("SELECT * FROM src").collect().foreach(println)

// Aggregation queries are also supported.
val count = sql("SELECT COUNT(*) FROM src").collect().head.getLong(0)
@@ -61,5 +61,7 @@ object HiveFromSpark {
// Queries can then join RDD data with data stored in Hive.
println("Result of SELECT *:")
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)

sc.stop()
}
}