Commit b69f8b2

Merge pull request apache#557 from ScrapCodes/style. Closes apache#557.
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build.

Author: Patrick Wendell <[email protected]>
Author: Prashant Sharma <[email protected]>

== Merge branch commits ==

commit 1a8bd1c059b842cb95cc246aaea74a79fec684f4
Author: Prashant Sharma <[email protected]>
Date:   Sun Feb 9 17:39:07 2014 +0530

    scala style fixes

commit f91709887a8e0b608c5c2b282db19b8a44d53a43
Author: Patrick Wendell <[email protected]>
Date:   Fri Jan 24 11:22:53 2014 -0800

    Adding scalastyle snapshot
1 parent b6dba10 commit b69f8b2
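
Not shown on this page is the build-side half of the change: registering the scalastyle checker with sbt (the "Adding scalastyle snapshot" commit above). As a rough sketch of what that registration usually looks like in an sbt project, where the plugin coordinates and version below are assumptions for illustration and are not read from this commit:

    // project/plugins.sbt -- assumed coordinates and version, shown only to
    // illustrate how a scalastyle check is typically wired into an sbt build.
    addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0")

The rules themselves normally live in a scalastyle-config.xml at the project root; the check most visible in the hunks below is a maximum line length of 100 characters, which is what nearly every re-wrapped line is being brought under.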

119 files changed: +795 additions, -460 deletions


bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala

Lines changed: 32 additions & 23 deletions
@@ -28,21 +28,22 @@ object Bagel extends Logging {
   /**
    * Runs a Bagel program.
    * @param sc [[org.apache.spark.SparkContext]] to use for the program.
-   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
-   *        the vertex id.
-   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
-   *        empty array, i.e. sc.parallelize(Array[K, Message]()).
-   * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
-   *        message before sending (which often involves network I/O).
-   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
-   *        and provides the result to each vertex in the next superstep.
+   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
+   *        Key will be the vertex id.
+   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
+   *        this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
+   * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
+   *        given vertex into one message before sending (which often involves network I/O).
+   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
+   *        after each superstep and provides the result to each vertex in the next
+   *        superstep.
    * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
    * @param numPartitions number of partitions across which to split the graph.
    *        Default is the default parallelism of the SparkContext
-   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
-   *        Defaults to caching in memory.
-   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
-   *        optional Aggregator and the current superstep,
+   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+   *        intermediate RDDs in each superstep. Defaults to caching in memory.
+   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
+   *        the Vertex, optional Aggregator and the current superstep,
    *        and returns a set of (Vertex, outgoing Messages) pairs
    * @tparam K key
    * @tparam V vertex type
@@ -71,7 +72,7 @@ object Bagel extends Logging {
     var msgs = messages
     var noActivity = false
     do {
-      logInfo("Starting superstep "+superstep+".")
+      logInfo("Starting superstep " + superstep + ".")
       val startTime = System.currentTimeMillis

       val aggregated = agg(verts, aggregator)
@@ -97,7 +98,8 @@ object Bagel extends Logging {
     verts
   }

-  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */
+  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default
+    * storage level */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
       sc: SparkContext,
       vertices: RDD[(K, V)],
@@ -106,8 +108,8 @@ object Bagel extends Logging {
       partitioner: Partitioner,
       numPartitions: Int
     )(
-      compute: (V, Option[C], Int) => (V, Array[M])
-    ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+      compute: (V, Option[C], Int) => (V, Array[M])): RDD[(K, V)] = run(sc, vertices, messages,
+        combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)

   /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
@@ -127,8 +129,8 @@ object Bagel extends Logging {
   }

   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
-   * and default storage level
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
+   * [[org.apache.spark.HashPartitioner]] and default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
       sc: SparkContext,
@@ -138,9 +140,13 @@ object Bagel extends Logging {
       numPartitions: Int
     )(
       compute: (V, Option[C], Int) => (V, Array[M])
-    ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+    ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions,
+      DEFAULT_STORAGE_LEVEL)(compute)

-  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
+  /**
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
+   * default [[org.apache.spark.HashPartitioner]]
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
       sc: SparkContext,
       vertices: RDD[(K, V)],
@@ -158,7 +164,8 @@ object Bagel extends Logging {
   }

   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+   * default [[org.apache.spark.HashPartitioner]],
    * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -171,7 +178,8 @@ object Bagel extends Logging {
     ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)

   /**
-   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
+   * the default [[org.apache.spark.HashPartitioner]]
    * and [[org.apache.spark.bagel.DefaultCombiner]]
    */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -227,8 +235,9 @@ object Bagel extends Logging {
       })

       numMsgs += newMsgs.size
-      if (newVert.active)
+      if (newVert.active) {
         numActiveVerts += 1
+      }

       Some((newVert, newMsgs))
     }.persist(storageLevel)
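
For context on the API whose Scaladoc is re-wrapped above, here is a minimal sketch of driving a Bagel program through the overload documented as taking no Aggregator, the default HashPartitioner and DefaultCombiner. The vertex and message classes, the data and the superstep cutoff are hypothetical; only the run signature shown in this file is assumed.

    import org.apache.spark.SparkContext
    import org.apache.spark.bagel.{Bagel, Message, Vertex}

    // Hypothetical types: Bagel only requires the Vertex.active flag and the
    // Message targetId referenced by the code in this file.
    class CountVertex(val value: Int, val active: Boolean) extends Vertex with Serializable
    class CountMessage(val targetId: String, val delta: Int) extends Message[String] with Serializable

    val sc = new SparkContext("local", "bagel-sketch")
    val vertices = sc.parallelize(Seq(("a", new CountVertex(0, active = true))))
    val messages = sc.parallelize(Seq(("a", new CountMessage("a", 1))))

    // With DefaultCombiner the (possibly combined) messages arrive as Option[Array[CountMessage]].
    val result = Bagel.run(sc, vertices, messages, 2) {
      (v: CountVertex, msgs: Option[Array[CountMessage]], superstep: Int) =>
        val sum = v.value + msgs.map(_.map(_.delta).sum).getOrElse(0)
        // Arbitrary cutoff for the sketch: deactivate every vertex after three supersteps.
        (new CountVertex(sum, superstep < 3), Array.empty[CountMessage])
    }
    println(result.collect().toSeq)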

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 2 additions & 2 deletions
@@ -31,8 +31,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   private val loading = new HashSet[RDDBlockId]()

   /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
-  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
-      : Iterator[T] = {
+  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
+      storageLevel: StorageLevel): Iterator[T] = {
     val key = RDDBlockId(rdd.id, split.index)
     logDebug("Looking for partition " + key)
     blockManager.get(key) match {
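
getOrCompute, whose signature is re-wrapped above, is the internal path taken whenever a partition of a persisted RDD is requested. A minimal user-level sketch that exercises it, with a local master, arbitrary storage level and placeholder data:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext("local", "cache-sketch")
    val data = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)

    // The first action computes and caches each partition; the second is served
    // from the cached blocks instead of recomputing them.
    println(data.count())
    println(data.count())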

core/src/main/scala/org/apache/spark/FetchFailedException.scala

Lines changed: 2 additions & 1 deletion
@@ -25,7 +25,8 @@ private[spark] class FetchFailedException(
     cause: Throwable)
   extends Exception {

-  def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, cause: Throwable) =
+  def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
+      cause: Throwable) =
     this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
       "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
       cause)

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ import akka.pattern.ask

 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}

 private[spark] sealed trait MapOutputTrackerMessage
 private[spark] case class GetMapOutputStatuses(shuffleId: Int)

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 15 additions & 11 deletions
@@ -63,9 +63,9 @@ import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, Metada
  */
 class SparkContext(
     config: SparkConf,
-    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
-    // too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
-    // a map from hostname to a list of input format splits on the host.
+    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
+    // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
+    // contains a map from hostname to a list of input format splits on the host.
     val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
   extends Logging {

@@ -552,10 +552,11 @@ class SparkContext(

   /**
    * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
-   * BytesWritable values that contain a serialized partition. This is still an experimental storage
-   * format and may not be supported exactly as is in future Spark releases. It will also be pretty
-   * slow if you use the default serializer (Java serialization), though the nice thing about it is
-   * that there's very little effort required to save arbitrary objects.
+   * BytesWritable values that contain a serialized partition. This is still an experimental
+   * storage format and may not be supported exactly as is in future Spark releases. It will also
+   * be pretty slow if you use the default serializer (Java serialization),
+   * though the nice thing about it is that there's very little effort required to save arbitrary
+   * objects.
    */
   def objectFile[T: ClassTag](
       path: String,
@@ -1043,7 +1044,7 @@ object SparkContext {

   implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
     def addInPlace(t1: Long, t2: Long) = t1 + t2
-    def zero(initialValue: Long) = 0l
+    def zero(initialValue: Long) = 0L
   }

   implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
@@ -1109,7 +1110,8 @@ object SparkContext {

   implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)

-  implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
+  implicit def booleanWritableConverter() =
+    simpleWritableConverter[Boolean, BooleanWritable](_.get)

   implicit def bytesWritableConverter() = {
     simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
@@ -1258,7 +1260,8 @@ object SparkContext {

       case "yarn-client" =>
         val scheduler = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+          val clazz =
+            Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
           val cons = clazz.getConstructor(classOf[SparkContext])
           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

@@ -1269,7 +1272,8 @@ object SparkContext {
         }

         val backend = try {
-          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+          val clazz =
+            Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
           val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
         } catch {
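
Two of the spots touched above are user-facing: the objectFile Scaladoc and the LongAccumulatorParam whose zero literal becomes 0L. A small sketch exercising both, where the path and values are placeholders:

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "sparkcontext-sketch")

    // objectFile round trip: saved as a SequenceFile of serialized objects, read back typed.
    val path = "/tmp/ints-objectfile"  // placeholder path
    sc.parallelize(1 to 100).saveAsObjectFile(path)
    println(sc.objectFile[Int](path).count())

    // Long accumulator backed by the implicit LongAccumulatorParam (zero value 0L).
    val acc = sc.accumulator(0L)
    sc.parallelize(1 to 10).foreach(i => acc += i.toLong)
    println(acc.value)  // 55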

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 3 deletions
@@ -96,7 +96,7 @@ object SparkEnv extends Logging {
   @volatile private var lastSetSparkEnv : SparkEnv = _

   def set(e: SparkEnv) {
-      lastSetSparkEnv = e
+    lastSetSparkEnv = e
     env.set(e)
   }

@@ -112,7 +112,7 @@ object SparkEnv extends Logging {
    * Returns the ThreadLocal SparkEnv.
    */
   def getThreadLocal: SparkEnv = {
-      env.get()
+    env.get()
   }

   private[spark] def create(
@@ -168,7 +168,8 @@ object SparkEnv extends Logging {
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf)), conf)
-    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
+    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
+      serializer, conf)

     val connectionManager = blockManager.connectionManager

core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -148,8 +148,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   def sum(): Double = srdd.sum()

   /**
-   * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
-   * of the RDD's elements in one operation.
+   * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and
+   * count of the RDD's elements in one operation.
    */
   def stats(): StatCounter = srdd.stats()
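
stats() computes count, mean and variance of the elements in a single pass; the equivalent call against the underlying Scala API, with placeholder values:

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "stats-sketch")
    val s = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0)).stats()
    // StatCounter exposes count, mean and variance computed in one pass over the data.
    println(s.count + " " + s.mean + " " + s.variance)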

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 22 additions & 18 deletions
@@ -88,7 +88,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numPartitions))
+  def distinct(numPartitions: Int): JavaPairRDD[K, V] =
+    new JavaPairRDD[K, V](rdd.distinct(numPartitions))

   /**
    * Return a new RDD containing only the elements that satisfy a predicate.
@@ -210,25 +211,25 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
     rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)

   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g ., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
-  def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
-    fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))
+  def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V])
+  : JavaPairRDD[K, V] = fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))

   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g ., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
     fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func))

   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value"
+   * which may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
     fromRDD(rdd.foldByKey(zeroValue)(func))
@@ -375,7 +376,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
    * into `numPartitions` partitions.
    */
-  def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
+  def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+  : JavaPairRDD[K, (V, Optional[W])] = {
     val joinResult = rdd.leftOuterJoin(other, numPartitions)
     fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
   }
@@ -397,7 +399,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
    * RDD into the given number of partitions.
    */
-  def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
+  def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+  : JavaPairRDD[K, (Optional[V], W)] = {
     val joinResult = rdd.rightOuterJoin(other, numPartitions)
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
@@ -439,8 +442,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
-  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner)
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
+      partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))

   /**
@@ -462,8 +465,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
-  = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
+  def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
+  : JavaPairRDD[K, (JList[V], JList[W])] =
+    fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))

   /**
    * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
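
The foldByKey contract described above, an associative function plus a neutral "zero value" that may be applied any number of times, is easiest to see against the Scala API that JavaPairRDD delegates to (placeholder data):

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "foldbykey-sketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // 0 is neutral for addition, so folding it in any number of times leaves the
    // per-key sums unchanged.
    println(pairs.foldByKey(0)(_ + _).collect().toSeq)  // (a,3), (b,3) in some order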

0 commit comments