@@ -42,7 +42,7 @@ object BroadcastTest {
val arr1 = (0 until num).toArray

for (i <- 0 until 3) {
println("Iteration " + i)
println(s"Iteration $i")
println("===========")
val startTime = System.nanoTime
val barr1 = sc.broadcast(arr1)
@@ -49,12 +49,10 @@ object DFSReadWriteTest {
}

private def printUsage(): Unit = {
val usage: String = "DFS Read-Write Test\n" +
"\n" +
"Usage: localFile dfsDir\n" +
"\n" +
"localFile - (string) local file to use in test\n" +
"dfsDir - (string) DFS directory for read/write tests\n"
val usage = """DFS Read-Write Test
|Usage: localFile dfsDir
|localFile - (string) local file to use in test
|dfsDir - (string) DFS directory for read/write tests""".stripMargin

println(usage)
}
@@ -69,13 +67,13 @@ object DFSReadWriteTest {

localFilePath = new File(args(i))
if (!localFilePath.exists) {
System.err.println("Given path (" + args(i) + ") does not exist.\n")
System.err.println(s"Given path (${args(i)}) does not exist")
printUsage()
System.exit(1)
}

if (!localFilePath.isFile) {
System.err.println("Given path (" + args(i) + ") is not a file.\n")
System.err.println(s"Given path (${args(i)}) is not a file")
printUsage()
System.exit(1)
}
@@ -108,7 +106,7 @@ object DFSReadWriteTest {
.getOrCreate()

println("Writing local file to DFS")
val dfsFilename = dfsDirPath + "/dfs_read_write_test"
val dfsFilename = s"${dfsDirPath}/dfs_read_write_test"
val fileRDD = spark.sparkContext.parallelize(fileContents)
fileRDD.saveAsTextFile(dfsFilename)

@@ -127,11 +125,11 @@ object DFSReadWriteTest {
spark.stop()

if (localWordCount == dfsWordCount) {
println(s"Success! Local Word Count ($localWordCount) " +
s"and DFS Word Count ($dfsWordCount) agree.")
println(s"Success! Local Word Count $localWordCount and " +
s"DFS Word Count $dfsWordCount agree.")
} else {
println(s"Failure! Local Word Count ($localWordCount) " +
s"and DFS Word Count ($dfsWordCount) disagree.")
println(s"Failure! Local Word Count $localWordCount " +
s"and DFS Word Count $dfsWordCount disagree.")
}

}
@@ -39,7 +39,7 @@ object HdfsTest {
val start = System.currentTimeMillis()
for (x <- mapped) { x + 2 }
val end = System.currentTimeMillis()
println("Iteration " + iter + " took " + (end-start) + " ms")
println(s"Iteration ${iter} took ${(end-start)} ms")
Member (@HyukjinKwon):
Let's just write these as $iter and $end-start.

Contributor Author:
@HyukjinKwon $end-start won't work; end and start are two different variables. I made the changes.

}
spark.stop()
}
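A minimal Scala sketch (not part of the PR; the values below are made up for illustration) of the point from the review thread above: interpolation picks up only the identifier immediately after $, so $end-start would render the value of end followed by the literal text "-start", which is why the subtraction has to be written as an expression in ${...}.

```scala
// Minimal sketch, not from the PR: `$end-start` interpolates only `end`;
// an arithmetic expression needs the `${...}` form.
object InterpolationDemo {
  def main(args: Array[String]): Unit = {
    val start = 100L
    val end = 250L
    println(s"Iteration took $end-start ms")      // prints: Iteration took 250-start ms
    println(s"Iteration took ${end - start} ms")  // prints: Iteration took 150 ms
  }
}
```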
@@ -129,8 +129,7 @@ object LocalALS {
println(s"Iteration $iter:")
ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
println("RMSE = " + rmse(R, ms, us))
println()
println(s"RMSE = ${rmse(R, ms, us)}")
}
}

@@ -58,10 +58,10 @@ object LocalFileLR {

// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
println(s"On iteration $i")
val gradient = DenseVector.zeros[Double](D)
for (p <- points) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
@@ -71,7 +71,7 @@ object LocalFileLR {
}

fileSrc.close()
println("Final w: " + w)
println(s"Final w: $w")
}
}
// scalastyle:on println
@@ -88,7 +88,7 @@ object LocalKMeans {
kPoints.put(i, iter.next())
}

println("Initial centers: " + kPoints)
println(s"Initial centers: $kPoints")

while(tempDist > convergeDist) {
val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
@@ -114,7 +114,7 @@ object LocalKMeans {
}
}

println("Final centers: " + kPoints)
println(s"Final centers: $kPoints")
}
}
// scalastyle:on println
@@ -61,10 +61,10 @@ object LocalLR {
val data = generateData
// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
println(s"On iteration $i")
val gradient = DenseVector.zeros[Double](D)
for (p <- data) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
@@ -73,7 +73,7 @@ object LocalLR {
w -= gradient
}

println("Final w: " + w)
println(s"Final w: $w")
}
}
// scalastyle:on println
@@ -28,7 +28,7 @@ object LocalPi {
val y = random * 2 - 1
if (x*x + y*y <= 1) count += 1
}
println("Pi is roughly " + 4 * count / 100000.0)
println(s"Pi is roughly ${4 * count / 100000.0}")
}
}
// scalastyle:on println
@@ -59,7 +59,7 @@ object SimpleSkewedGroupByTest {
// Enforce that everything has been calculated and in cache
pairs1.count

println("RESULT: " + pairs1.groupByKey(numReducers).count)
println(s"RESULT: ${pairs1.groupByKey(numReducers).count}")
// Print how many keys each reducer got (for debugging)
// println("RESULT: " + pairs1.groupByKey(numReducers)
// .map{case (k,v) => (k, v.size)}
@@ -80,7 +80,7 @@ object SparkALS {

def showWarning() {
System.err.println(
"""WARN: This is a naive implementation of ALS and is given as an example!
s"""WARN: This is a naive implementation of ALS and is given as an example!
Member (@HyukjinKwon):
Seems we don't need the s here.

Contributor Author:
@HyukjinKwon Addressed! Kindly review.

|Please use org.apache.spark.ml.recommendation.ALS
|for more conventional use.
""".stripMargin)
@@ -100,7 +100,7 @@ object SparkALS {
ITERATIONS = iters.getOrElse("5").toInt
slices = slices_.getOrElse("2").toInt
case _ =>
System.err.println("Usage: SparkALS [M] [U] [F] [iters] [partitions]")
System.err.println(s"Usage: SparkALS [M] [U] [F] [iters] [partitions]")
Member (@HyukjinKwon):
Ditto for the s.

Contributor Author:
@HyukjinKwon Addressed! Kindly review.

System.exit(1)
}
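A minimal sketch (not part of the PR; the object name is invented) of what dropping the unnecessary s means for the two lines flagged above: the s interpolator is only required when a string contains $ placeholders, so both the WARN banner and the usage message can stay plain string literals.

```scala
// Minimal sketch, not from the PR: neither string has a `$` placeholder,
// so neither needs the `s` interpolator.
object NoInterpolatorNeeded {
  def showWarning(): Unit = {
    System.err.println(
      """WARN: This is a naive implementation of ALS and is given as an example!
        |Please use org.apache.spark.ml.recommendation.ALS
        |for more conventional use.""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    showWarning()
    // A plain literal; prefixing it with s would compile but adds nothing.
    System.err.println("Usage: SparkALS [M] [U] [F] [iters] [partitions]")
  }
}
```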

@@ -135,10 +135,8 @@ object SparkALS {
.map(i => update(i, usb.value(i), msb.value, Rc.value.transpose()))
.collect()
usb = sc.broadcast(us) // Re-broadcast us because it was updated
println("RMSE = " + rmse(R, ms, us))
println()
println(s"RMSE = ${rmse(R, ms, us)}")
}

spark.stop()
}

@@ -79,17 +79,17 @@ object SparkHdfsLR {

// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
println(s"On iteration $i")
val gradient = points.map { p =>
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}

println("Final w: " + w)
println(s"Final w: $w")
spark.stop()
}
}
@@ -95,7 +95,7 @@ object SparkKMeans {
for (newP <- newPoints) {
kPoints(newP._1) = newP._2
}
println("Finished iteration (delta = " + tempDist + ")")
println(s"Finished iteration (delta = $tempDist)")
}

println("Final centers:")
@@ -73,17 +73,17 @@ object SparkLR {

// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println("Initial w: " + w)
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println("On iteration " + i)
println(s"On iteration $i")
val gradient = points.map { p =>
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}

println("Final w: " + w)
println(s"Final w: $w")

spark.stop()
}
@@ -77,7 +77,7 @@ object SparkPageRank {
}

val output = ranks.collect()
output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
output.foreach(tup => println(s"${tup._1} has rank: ${tup._2}."))

spark.stop()
}
@@ -36,7 +36,7 @@ object SparkPi {
val y = random * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / (n - 1))
println(s"Pi is roughly ${4.0 * count / (n - 1)}")
spark.stop()
}
}
@@ -68,7 +68,7 @@ object SparkTC {
nextCount = tc.count()
} while (nextCount != oldCount)

println("TC has " + tc.count() + " edges.")
println(s"TC has ${tc.count()} edges.")
spark.stop()
}
}
@@ -27,19 +27,22 @@ import org.apache.spark.graphx.lib._
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel


/**
* Driver program for running graph algorithms.
*/
object Analytics extends Logging {

def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println(
"Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
System.err.println("Supported 'taskType' as follows:")
System.err.println(" pagerank Compute PageRank")
System.err.println(" cc Compute the connected components of vertices")
System.err.println(" triangles Count the number of triangles")
val usage = """
|Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]
|Supported 'taskType' as follows:
|pagerank Compute PageRank
|cc Compute the connected components of vertices
|triangles Count the number of triangles""".stripMargin

System.err.println(usage)
System.exit(1)
}

@@ -48,7 +51,7 @@ object Analytics extends Logging {
val optionsList = args.drop(2).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
case _ => throw new IllegalArgumentException(s"Invalid argument: $arg")
}
}
val options = mutable.Map(optionsList: _*)
@@ -74,68 +77,68 @@ object Analytics extends Logging {
val numIterOpt = options.remove("numIter").map(_.toInt)

options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
}

println("======================================")
println("| PageRank |")
println("======================================")

val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
val sc = new SparkContext(conf.setAppName(s"PageRank($fname)"))

val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
numEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

println("GRAPHX: Number of vertices " + graph.vertices.count)
println("GRAPHX: Number of edges " + graph.edges.count)
println(s"GRAPHX: Number of vertices ${graph.vertices.count}")
println(s"GRAPHX: Number of edges ${graph.edges.count}")

val pr = (numIterOpt match {
case Some(numIter) => PageRank.run(graph, numIter)
case None => PageRank.runUntilConvergence(graph, tol)
}).vertices.cache()

println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
println(s"GRAPHX: Total rank: ${pr.map(_._2).reduce(_ + _)}")

if (!outFname.isEmpty) {
logWarning("Saving pageranks of pages to " + outFname)
logWarning(s"Saving pageranks of pages to $outFname")
pr.map { case (id, r) => id + "\t" + r }.saveAsTextFile(outFname)
}

sc.stop()

case "cc" =>
options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
}

println("======================================")
println("| Connected Components |")
println("======================================")

val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
val sc = new SparkContext(conf.setAppName(s"ConnectedComponents($fname)"))
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
numEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

val cc = ConnectedComponents.run(graph)
println("Components: " + cc.vertices.map { case (vid, data) => data }.distinct())
println(s"Components: ${cc.vertices.map { case (vid, data) => data }.distinct()}")
sc.stop()

case "triangles" =>
options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
}

println("======================================")
println("| Triangle Count |")
println("======================================")

val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
val sc = new SparkContext(conf.setAppName(s"TriangleCount($fname)"))
val graph = GraphLoader.edgeListFile(sc, fname,
canonicalOrientation = true,
numEdgePartitions = numEPart,