4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -723,7 +723,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
(safeEnd - safeStart) / step + 1
}
}
- parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
+ parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex { (i, _) =>
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
def getSafeMargin(bi: BigInt): Long =
@@ -762,7 +762,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
ret
}
}
- })
+ }
}

/** Distribute a local Scala collection to form an RDD.
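For readers of the hunk above, a minimal standalone sketch of what the partitionStart/partitionEnd arithmetic computes. The concrete values (start = 0, step = 2, numElements = 10, numSlices = 3) are illustrative, and plain Long is used where the real code uses BigInt to guard against overflow:

    // Sketch of the partition-boundary arithmetic used by SparkContext.range above.
    // Plain Scala, no Spark needed; values are illustrative.
    val start = 0L
    val step = 2L
    val numElements = 10L
    val numSlices = 3
    (0 until numSlices).foreach { i =>
      val partitionStart = (i * numElements) / numSlices * step + start
      val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
      println(s"partition $i covers [$partitionStart, $partitionEnd) stepping by $step")
    }
    // prints [0, 6), [6, 12), [12, 20): elements 0,2,4 | 6,8,10 | 12,14,16,18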
@@ -843,10 +843,10 @@ private[deploy] class Master(
addressToApp -= app.driver.address
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
- completedApps.take(toRemove).foreach( a => {
+ completedApps.take(toRemove).foreach { a =>
Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
- })
+ }
completedApps.trimStart(toRemove)
}
completedApps += app // Remember it in our history
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -35,9 +35,9 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo

override def getPartitions: Array[Partition] = {
assertValid()
- (0 until blockIds.length).map(i => {
+ (0 until blockIds.length).map { i =>
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
- }).toArray
+ }.toArray
}

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -424,7 +424,7 @@ private[spark] object HadoopRDD extends Logging {

private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
val out = ListBuffer[String]()
- infos.foreach { loc => {
+ infos.foreach { loc =>
val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
getLocation.invoke(loc).asInstanceOf[String]
if (locationStr != "localhost") {
@@ -436,7 +436,7 @@ private[spark] object HadoopRDD extends Logging {
out += new HostTaskLocation(locationStr).toString
}
}
- }}
+ }
out.seq
}
}
@@ -121,11 +121,11 @@ private object ParallelCollectionRDD {
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
- (0 until numSlices).iterator.map(i => {
+ (0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
- })
+ }
}
seq match {
case r: Range =>
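To illustrate the comment above about slicing every sequence at the same index positions (so RDD.zip() pairs up elements partition by partition), here is a minimal standalone sketch of positions(); the helper is reproduced only for the example and the length and slice count are illustrative:

    // Standalone sketch of ParallelCollectionRDD.positions; illustrative only.
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }
    // Any length-10 sequence split 3 ways is always cut at the same indices,
    // so two such RDDs can be zipped partition by partition.
    positions(10, 3).foreach(println)   // (0,3), (3,6), (6,10)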
@@ -68,9 +68,9 @@ class PartitionerAwareUnionRDD[T: ClassTag](

override def getPartitions: Array[Partition] = {
val numPartitions = partitioner.get.numPartitions
- (0 until numPartitions).map(index => {
+ (0 until numPartitions).map { index =>
new PartitionerAwareUnionRDDPartition(rdds, index)
- }).toArray
+ }.toArray
}

// Get the location where most of the partitions of parent RDDs are located
@@ -226,15 +226,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
* @return
*/
protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
- offerAttributes.asScala.map(attr => {
+ offerAttributes.asScala.map { attr =>
val attrValue = attr.getType match {
case Value.Type.SCALAR => attr.getScalar
case Value.Type.RANGES => attr.getRanges
case Value.Type.SET => attr.getSet
case Value.Type.TEXT => attr.getText
}
(attr.getName, attrValue)
- }).toMap
+ }.toMap
}


@@ -69,10 +69,10 @@ private[spark] class BlockStoreShuffleReader[K, C](
// Update the context task metrics for each record read.
val readMetrics = context.taskMetrics.registerTempShuffleReadMetrics()
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
- recordIter.map(record => {
+ recordIter.map { record =>
readMetrics.incRecordsRead(1)
record
- }),
+ },
context.taskMetrics().mergeShuffleReadMetrics())

// An interruptible iterator must be used here in order to support task cancellation
@@ -42,13 +42,13 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
var hasShuffleWrite = false
var hasShuffleRead = false
var hasBytesSpilled = false
- stageData.foreach(data => {
+ stageData.foreach { data =>
hasInput = data.hasInput
hasOutput = data.hasOutput
hasShuffleRead = data.hasShuffleRead
hasShuffleWrite = data.hasShuffleWrite
hasBytesSpilled = data.hasBytesSpilled
- })
+ }

<table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}>
<thead>
@@ -116,7 +116,7 @@ object RecoverableNetworkWordCount {
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+ wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
Inline review comment (Member): While we're here, can this just be (rdd, time) =>? And likewise in the following file, maybe others. (A sketch of the type-inferred form follows this file's diff.)

// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
@@ -135,7 +135,7 @@ object RecoverableNetworkWordCount {
println("Dropped " + droppedWordsCounter.value + " word(s) totally")
println("Appending to " + outputFile.getAbsolutePath)
Files.append(output + "\n", outputFile, Charset.defaultCharset())
- })
+ }
ssc
}

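On the inline review comment above: a minimal sketch of the type-inferred form it suggests, assuming the two-argument foreachRDD overload lets the compiler infer the RDD[(String, Int)] and Time parameter types. The object name, host, and port are illustrative, and the checkpointing, blacklist, and file-output logic of the real example are elided:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object InferredClosureSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("InferredClosureSketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))
        val wordCounts = ssc.socketTextStream("localhost", 9999)
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
        // No explicit (rdd: RDD[(String, Int)], time: Time) annotations:
        // the two-argument foreachRDD overload fixes both parameter types.
        wordCounts.foreachRDD { (rdd, time) =>
          println(s"Counts at time $time: ${rdd.take(10).mkString(", ")}")
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }

The braces-with-inferred-parameters shape matches the closure style this PR applies elsewhere.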
@@ -59,7 +59,7 @@ object SqlNetworkWordCount {
val words = lines.flatMap(_.split(" "))

// Convert RDDs of the words DStream to DataFrame and run SQL query
- words.foreachRDD((rdd: RDD[String], time: Time) => {
+ words.foreachRDD { (rdd: RDD[String], time: Time) =>
// Get the singleton instance of SQLContext
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
@@ -75,7 +75,7 @@ object SqlNetworkWordCount {
sqlContext.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
- })
+ }

ssc.start()
ssc.awaitTermination()
@@ -129,9 +129,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
* @param success Whether the batch was successful or not.
*/
private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
- removeAndGetProcessor(sequenceNumber).foreach(processor => {
+ removeAndGetProcessor(sequenceNumber).foreach { processor =>
processor.batchProcessed(success)
- })
+ }
}

/**
@@ -88,23 +88,23 @@ class SparkSink extends AbstractSink with Logging with Configurable {
// dependencies which are being excluded in the build. In practice,
// Netty dependencies are already available on the JVM as Flume would have pulled them in.
serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
- serverOpt.foreach(server => {
+ serverOpt.foreach { server =>
logInfo("Starting Avro server for sink: " + getName)
server.start()
- })
+ }
super.start()
}

override def stop() {
logInfo("Stopping Spark Sink: " + getName)
- handler.foreach(callbackHandler => {
+ handler.foreach { callbackHandler =>
callbackHandler.shutdown()
- })
- serverOpt.foreach(server => {
+ }
+ serverOpt.foreach { server =>
logInfo("Stopping Avro Server for sink: " + getName)
server.close()
server.join()
- })
+ }
blockingLatch.countDown()
super.stop()
}
@@ -110,7 +110,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
eventBatch.setErrorMsg("Something went wrong. Channel was " +
"unable to create a transaction!")
}
- txOpt.foreach(tx => {
+ txOpt.foreach { tx =>
tx.begin()
val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
val loop = new Breaks
@@ -145,7 +145,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
// At this point, the events are available, so fill them into the event batch
eventBatch = new EventBatch("", seqNum, events)
}
- })
+ }
} catch {
case interrupted: InterruptedException =>
// Don't pollute logs if the InterruptedException came from this being stopped
@@ -156,9 +156,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
logWarning("Error while processing transaction.", e)
eventBatch.setErrorMsg(e.getMessage)
try {
- txOpt.foreach(tx => {
+ txOpt.foreach { tx =>
rollbackAndClose(tx, close = true)
- })
+ }
} finally {
txOpt = None
}
@@ -174,7 +174,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
*/
private def processAckOrNack() {
batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
- txOpt.foreach(tx => {
+ txOpt.foreach { tx =>
if (batchSuccess) {
try {
logDebug("Committing transaction")
@@ -197,7 +197,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
// cause issues. This is required to ensure the TransactionProcessor instance is not leaked
parent.removeAndGetProcessor(seqNum)
}
- })
+ }
}

/**
@@ -79,11 +79,11 @@ private[streaming] class FlumePollingReceiver(

override def onStart(): Unit = {
// Create the connections to each Flume agent.
- addresses.foreach(host => {
+ addresses.foreach { host =>
val transceiver = new NettyTransceiver(host, channelFactory)
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
connections.add(new FlumeConnection(transceiver, client))
- })
+ }
for (i <- 0 until parallelism) {
logInfo("Starting Flume Polling Receiver worker threads..")
// Threads that pull data from Flume.
@@ -123,9 +123,9 @@ private[flume] class PollingFlumeTestUtils {
val latch = new CountDownLatch(batchCount * channels.size)
sinks.foreach(_.countdownWhenBatchReceived(latch))

- channels.foreach(channel => {
+ channels.foreach { channel =>
executorCompletion.submit(new TxnSubmitter(channel))
- })
+ }

for (i <- 0 until channels.size) {
executorCompletion.take()
@@ -519,7 +519,7 @@ class CodegenContext {
// Get all the expressions that appear at least twice and set up the state for subexpression
// elimination.
val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1)
- commonExprs.foreach(e => {
+ commonExprs.foreach { e =>
val expr = e.head
val fnName = freshName("evalExpr")
val isNull = s"${fnName}IsNull"
@@ -561,7 +561,7 @@ class CodegenContext {
subexprFunctions += s"$fnName($INPUT_ROW);"
val state = SubExprEliminationState(isNull, value)
e.foreach(subExprEliminationExprs.put(_, state))
- })
+ }
}

/**
@@ -286,10 +286,10 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
assert(children.nonEmpty)
if (projectList.forall(_.deterministic)) {
val newFirstChild = Project(projectList, children.head)
- val newOtherChildren = children.tail.map ( child => {
+ val newOtherChildren = children.tail.map { child =>
val rewrites = buildRewrites(children.head, child)
Project(projectList.map(pushToRight(_, rewrites)), child)
- } )
+ }
Union(newFirstChild +: newOtherChildren)
} else {
p
@@ -400,7 +400,7 @@ case class Range(
sqlContext
.sparkContext
.parallelize(0 until numSlices, numSlices)
- .mapPartitionsWithIndex((i, _) => {
+ .mapPartitionsWithIndex { (i, _) =>
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
def getSafeMargin(bi: BigInt): Long =
@@ -444,7 +444,7 @@ case class Range(
unsafeRow
}
}
- })
+ }
}
}

@@ -251,12 +251,12 @@ object JdbcUtils extends Logging {
def schemaString(df: DataFrame, url: String): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
- df.schema.fields foreach { field => {
+ df.schema.fields foreach { field =>
val name = field.name
val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
val nullable = if (field.nullable) "" else "NOT NULL"
sb.append(s", $name $typ $nullable")
- }}
+ }
if (sb.length < 2) "" else sb.substring(2)
}
