diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 966198dd5e2c..e41088f7c8f6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -723,7 +723,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
(safeEnd - safeStart) / step + 1
}
}
- parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
+ parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex { (i, _) =>
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
def getSafeMargin(bi: BigInt): Long =
@@ -762,7 +762,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
ret
}
}
- })
+ }
}
/** Distribute a local Scala collection to form an RDD.
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9bd3fc1033f5..b443e8f0519f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -843,10 +843,10 @@ private[deploy] class Master(
addressToApp -= app.driver.address
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
- completedApps.take(toRemove).foreach( a => {
+ completedApps.take(toRemove).foreach { a =>
Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
- })
+ }
completedApps.trimStart(toRemove)
}
completedApps += app // Remember it in our history
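
Illustrative sketch, not part of the patch: the change applied in the files above and repeated throughout the rest of this diff swaps a function literal wrapped in parentheses around an explicit block for Scala's brace-block closure syntax. Both forms compile to the same code; the object and value names below are invented for the example.

    object ClosureStyleExample {
      def main(args: Array[String]): Unit = {
        val xs = Seq(1, 2, 3)

        // Before: a function literal in parentheses with an explicit block body.
        val before = xs.map(x => {
          val doubled = x * 2
          doubled
        })

        // After: the same closure passed as a brace block, the form this patch standardizes on.
        val after = xs.map { x =>
          val doubled = x * 2
          doubled
        }

        assert(before == after)
        println(after) // List(2, 4, 6)
      }
    }
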
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 8358244987a6..63d1d1767a8c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -35,9 +35,9 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
override def getPartitions: Array[Partition] = {
assertValid()
- (0 until blockIds.length).map(i => {
+ (0 until blockIds.length).map { i =>
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
- }).toArray
+ }.toArray
}
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 08db96edd69b..daeed89eff8b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -424,7 +424,7 @@ private[spark] object HadoopRDD extends Logging {
private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
val out = ListBuffer[String]()
- infos.foreach { loc => {
+ infos.foreach { loc =>
val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
getLocation.invoke(loc).asInstanceOf[String]
if (locationStr != "localhost") {
@@ -436,7 +436,7 @@ private[spark] object HadoopRDD extends Logging {
out += new HostTaskLocation(locationStr).toString
}
}
- }}
+ }
out.seq
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 462fb39ea20b..bb84e4af15b1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -121,11 +121,11 @@ private object ParallelCollectionRDD {
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
- (0 until numSlices).iterator.map(i => {
+ (0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
- })
+ }
}
seq match {
case r: Range =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index c3579d761d73..0abba15bec9f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -68,9 +68,9 @@ class PartitionerAwareUnionRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
val numPartitions = partitioner.get.numPartitions
- (0 until numPartitions).map(index => {
+ (0 until numPartitions).map { index =>
new PartitionerAwareUnionRDDPartition(rdds, index)
- }).toArray
+ }.toArray
}
// Get the location where most of the partitions of parent RDDs are located
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 7295d506823d..1e322ac67941 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -226,7 +226,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
* @return
*/
protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
- offerAttributes.asScala.map(attr => {
+ offerAttributes.asScala.map { attr =>
val attrValue = attr.getType match {
case Value.Type.SCALAR => attr.getScalar
case Value.Type.RANGES => attr.getRanges
@@ -234,7 +234,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
case Value.Type.TEXT => attr.getText
}
(attr.getName, attrValue)
- }).toMap
+ }.toMap
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 637b2dfc193b..876cdfaa8760 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -69,10 +69,10 @@ private[spark] class BlockStoreShuffleReader[K, C](
// Update the context task metrics for each record read.
val readMetrics = context.taskMetrics.registerTempShuffleReadMetrics()
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
- recordIter.map(record => {
+ recordIter.map { record =>
readMetrics.incRecordsRead(1)
record
- }),
+ },
context.taskMetrics().mergeShuffleReadMetrics())
// An interruptible iterator must be used here in order to support task cancellation
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 1304efd8f2ec..f609fb4cd2e7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -42,13 +42,13 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
var hasShuffleWrite = false
var hasShuffleRead = false
var hasBytesSpilled = false
- stageData.foreach(data => {
+ stageData.foreach { data =>
hasInput = data.hasInput
hasOutput = data.hasOutput
hasShuffleRead = data.hasShuffleRead
hasShuffleWrite = data.hasShuffleWrite
hasBytesSpilled = data.hasBytesSpilled
- })
+ }
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index b6b8bc33f7e1..bb2af9cd72e2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -116,7 +116,7 @@ object RecoverableNetworkWordCount {
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+ wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
@@ -135,7 +135,7 @@ object RecoverableNetworkWordCount {
println("Dropped " + droppedWordsCounter.value + " word(s) totally")
println("Appending to " + outputFile.getAbsolutePath)
Files.append(output + "\n", outputFile, Charset.defaultCharset())
- })
+ }
ssc
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
index 3727f8fe6a21..918e124065e4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
@@ -59,7 +59,7 @@ object SqlNetworkWordCount {
val words = lines.flatMap(_.split(" "))
// Convert RDDs of the words DStream to DataFrame and run SQL query
- words.foreachRDD((rdd: RDD[String], time: Time) => {
+ words.foreachRDD { (rdd: RDD[String], time: Time) =>
// Get the singleton instance of SQLContext
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
@@ -75,7 +75,7 @@ object SqlNetworkWordCount {
sqlContext.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
- })
+ }
ssc.start()
ssc.awaitTermination()
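
Illustrative sketch, not part of the patch: foreachRDD takes a single function argument, so even a two-parameter function literal such as (rdd, time) => ... can be passed as a brace block, which is what the two streaming examples above switch to. The forEachBatch stand-in below is invented for the example.

    object TwoParamClosureExample {
      // Stand-in for an API like DStream.foreachRDD that takes one function of two arguments.
      def forEachBatch(handler: (Seq[String], Long) => Unit): Unit = {
        val batches = Seq((Seq("a", "b"), 1000L), (Seq("c"), 2000L))
        batches.foreach { case (data, time) => handler(data, time) }
      }

      def main(args: Array[String]): Unit = {
        // Before: the two-parameter function literal is parenthesized with a block body.
        forEachBatch((data: Seq[String], time: Long) => {
          println(s"batch at $time: ${data.mkString(",")}")
        })

        // After: the same closure passed as a brace block.
        forEachBatch { (data: Seq[String], time: Long) =>
          println(s"batch at $time: ${data.mkString(",")}")
        }
      }
    }
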
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 719fca0938b3..8050ec357e26 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -129,9 +129,9 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
* @param success Whether the batch was successful or not.
*/
private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
- removeAndGetProcessor(sequenceNumber).foreach(processor => {
+ removeAndGetProcessor(sequenceNumber).foreach { processor =>
processor.batchProcessed(success)
- })
+ }
}
/**
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
index 14dffb15fef9..41f27e937662 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -88,23 +88,23 @@ class SparkSink extends AbstractSink with Logging with Configurable {
// dependencies which are being excluded in the build. In practice,
// Netty dependencies are already available on the JVM as Flume would have pulled them in.
serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
- serverOpt.foreach(server => {
+ serverOpt.foreach { server =>
logInfo("Starting Avro server for sink: " + getName)
server.start()
- })
+ }
super.start()
}
override def stop() {
logInfo("Stopping Spark Sink: " + getName)
- handler.foreach(callbackHandler => {
+ handler.foreach { callbackHandler =>
callbackHandler.shutdown()
- })
- serverOpt.foreach(server => {
+ }
+ serverOpt.foreach { server =>
logInfo("Stopping Avro Server for sink: " + getName)
server.close()
server.join()
- })
+ }
blockingLatch.countDown()
super.stop()
}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
index b15c2097e550..19e736f01697 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -110,7 +110,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
eventBatch.setErrorMsg("Something went wrong. Channel was " +
"unable to create a transaction!")
}
- txOpt.foreach(tx => {
+ txOpt.foreach { tx =>
tx.begin()
val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
val loop = new Breaks
@@ -145,7 +145,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
// At this point, the events are available, so fill them into the event batch
eventBatch = new EventBatch("", seqNum, events)
}
- })
+ }
} catch {
case interrupted: InterruptedException =>
// Don't pollute logs if the InterruptedException came from this being stopped
@@ -156,9 +156,9 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
logWarning("Error while processing transaction.", e)
eventBatch.setErrorMsg(e.getMessage)
try {
- txOpt.foreach(tx => {
+ txOpt.foreach { tx =>
rollbackAndClose(tx, close = true)
- })
+ }
} finally {
txOpt = None
}
@@ -174,7 +174,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
*/
private def processAckOrNack() {
batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
- txOpt.foreach(tx => {
+ txOpt.foreach { tx =>
if (batchSuccess) {
try {
logDebug("Committing transaction")
@@ -197,7 +197,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
// cause issues. This is required to ensure the TransactionProcessor instance is not leaked
parent.removeAndGetProcessor(seqNum)
}
- })
+ }
}
/**
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 250bfc1718db..54565840fa66 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -79,11 +79,11 @@ private[streaming] class FlumePollingReceiver(
override def onStart(): Unit = {
// Create the connections to each Flume agent.
- addresses.foreach(host => {
+ addresses.foreach { host =>
val transceiver = new NettyTransceiver(host, channelFactory)
val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
connections.add(new FlumeConnection(transceiver, client))
- })
+ }
for (i <- 0 until parallelism) {
logInfo("Starting Flume Polling Receiver worker threads..")
// Threads that pull data from Flume.
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
index 1a96df6e94b9..6a4dafb8eddb 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -123,9 +123,9 @@ private[flume] class PollingFlumeTestUtils {
val latch = new CountDownLatch(batchCount * channels.size)
sinks.foreach(_.countdownWhenBatchReceived(latch))
- channels.foreach(channel => {
+ channels.foreach { channel =>
executorCompletion.submit(new TxnSubmitter(channel))
- })
+ }
for (i <- 0 until channels.size) {
executorCompletion.take()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index ee7f4fadca89..f43626ca814a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -519,7 +519,7 @@ class CodegenContext {
// Get all the expressions that appear at least twice and set up the state for subexpression
// elimination.
val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1)
- commonExprs.foreach(e => {
+ commonExprs.foreach { e =>
val expr = e.head
val fnName = freshName("evalExpr")
val isNull = s"${fnName}IsNull"
@@ -561,7 +561,7 @@ class CodegenContext {
subexprFunctions += s"$fnName($INPUT_ROW);"
val state = SubExprEliminationState(isNull, value)
e.foreach(subExprEliminationExprs.put(_, state))
- })
+ }
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 438cbabdbb8a..aeb1842677c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -286,10 +286,10 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
assert(children.nonEmpty)
if (projectList.forall(_.deterministic)) {
val newFirstChild = Project(projectList, children.head)
- val newOtherChildren = children.tail.map ( child => {
+ val newOtherChildren = children.tail.map { child =>
val rewrites = buildRewrites(children.head, child)
Project(projectList.map(pushToRight(_, rewrites)), child)
- } )
+ }
Union(newFirstChild +: newOtherChildren)
} else {
p
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index aba500ad8de2..344aaff348e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -400,7 +400,7 @@ case class Range(
sqlContext
.sparkContext
.parallelize(0 until numSlices, numSlices)
- .mapPartitionsWithIndex((i, _) => {
+ .mapPartitionsWithIndex { (i, _) =>
val partitionStart = (i * numElements) / numSlices * step + start
val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
def getSafeMargin(bi: BigInt): Long =
@@ -444,7 +444,7 @@ case class Range(
unsafeRow
}
}
- })
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b7ff5f72427a..065c8572b06a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -251,12 +251,12 @@ object JdbcUtils extends Logging {
def schemaString(df: DataFrame, url: String): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
- df.schema.fields foreach { field => {
+ df.schema.fields foreach { field =>
val name = field.name
val typ: String = getJdbcType(field.dataType, dialect).databaseTypeDefinition
val nullable = if (field.nullable) "" else "NOT NULL"
sb.append(s", $name $typ $nullable")
- }}
+ }
if (sb.length < 2) "" else sb.substring(2)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 589862c7c02e..585befe37825 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -450,9 +450,7 @@ private[hive] trait HiveInspectors {
if (o != null) {
val array = o.asInstanceOf[ArrayData]
val values = new java.util.ArrayList[Any](array.numElements())
- array.foreach(elementType, (_, e) => {
- values.add(wrapper(e))
- })
+ array.foreach(elementType, (_, e) => values.add(wrapper(e)))
values
} else {
null
@@ -468,9 +466,8 @@ private[hive] trait HiveInspectors {
if (o != null) {
val map = o.asInstanceOf[MapData]
val jmap = new java.util.HashMap[Any, Any](map.numElements())
- map.foreach(mt.keyType, mt.valueType, (k, v) => {
- jmap.put(keyWrapper(k), valueWrapper(v))
- })
+ map.foreach(mt.keyType, mt.valueType, (k, v) =>
+ jmap.put(keyWrapper(k), valueWrapper(v)))
jmap
} else {
null
@@ -587,9 +584,9 @@ private[hive] trait HiveInspectors {
case x: ListObjectInspector =>
val list = new java.util.ArrayList[Object]
val tpe = dataType.asInstanceOf[ArrayType].elementType
- a.asInstanceOf[ArrayData].foreach(tpe, (_, e) => {
+ a.asInstanceOf[ArrayData].foreach(tpe, (_, e) =>
list.add(wrap(e, x.getListElementObjectInspector, tpe))
- })
+ )
list
case x: MapObjectInspector =>
val keyType = dataType.asInstanceOf[MapType].keyType
@@ -599,10 +596,10 @@ private[hive] trait HiveInspectors {
// Some UDFs seem to assume we pass in a HashMap.
val hashMap = new java.util.HashMap[Any, Any](map.numElements())
- map.foreach(keyType, valueType, (k, v) => {
+ map.foreach(keyType, valueType, (k, v) =>
hashMap.put(wrap(k, x.getMapKeyObjectInspector, keyType),
wrap(v, x.getMapValueObjectInspector, valueType))
- })
+ )
hashMap
}
@@ -704,9 +701,8 @@ private[hive] trait HiveInspectors {
ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, null)
} else {
val list = new java.util.ArrayList[Object]()
- value.asInstanceOf[ArrayData].foreach(dt, (_, e) => {
- list.add(wrap(e, listObjectInspector, dt))
- })
+ value.asInstanceOf[ArrayData].foreach(dt, (_, e) =>
+ list.add(wrap(e, listObjectInspector, dt)))
ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, list)
}
case Literal(value, MapType(keyType, valueType, _)) =>
@@ -718,9 +714,8 @@ private[hive] trait HiveInspectors {
val map = value.asInstanceOf[MapData]
val jmap = new java.util.HashMap[Any, Any](map.numElements())
- map.foreach(keyType, valueType, (k, v) => {
- jmap.put(wrap(k, keyOI, keyType), wrap(v, valueOI, valueType))
- })
+ map.foreach(keyType, valueType, (k, v) =>
+ jmap.put(wrap(k, keyOI, keyType), wrap(v, valueOI, valueType)))
ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, jmap)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5cc677d08510..03956009541a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -247,10 +247,10 @@ class CheckpointWriter(
// Delete old checkpoint files
val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs))
if (allCheckpointFiles.size > 10) {
- allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
+ allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach { file =>
logInfo("Deleting " + file)
fs.delete(file, true)
- })
+ }
}
// All done, print success
@@ -345,7 +345,7 @@ object CheckpointReader extends Logging {
// Try to read the checkpoint files in the order
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
var readError: Exception = null
- checkpointFiles.foreach(file => {
+ checkpointFiles.foreach { file =>
logInfo("Attempting to load checkpoint from file " + file)
try {
val fis = fs.open(file)
@@ -358,7 +358,7 @@ object CheckpointReader extends Logging {
readError = e
logWarning("Error reading checkpoint from file " + file, e)
}
- })
+ }
// If none of checkpoint files could be read, then throw exception
if (!ignoreReadError) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 28aed0ca4534..8efb09a8ce98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -48,11 +48,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
- val i = iterator.map(t => {
+ val i = iterator.map { t =>
val itr = t._2._2.iterator
val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
- })
+ }
updateFuncLocal(i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3b33a979df88..9aa2f0bbb995 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -434,11 +434,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
* worker nodes as a parallel collection, and runs them.
*/
private def launchReceivers(): Unit = {
- val receivers = receiverInputStreams.map(nis => {
+ val receivers = receiverInputStreams.map { nis =>
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
- })
+ }
runDummySparkJob()