8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
@@ -19,7 +19,7 @@ package org.apache.spark

import java.util.{Timer, TimerTask}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Consumer, Function}
import java.util.function.Consumer

import scala.collection.mutable.ArrayBuffer

@@ -202,10 +202,8 @@ private[spark] class BarrierCoordinator(
case request @ RequestToSync(numTasks, stageId, stageAttemptId, _, _) =>
// Get or init the ContextBarrierState correspond to the stage attempt.
val barrierId = ContextBarrierId(stageId, stageAttemptId)
states.computeIfAbsent(barrierId, new Function[ContextBarrierId, ContextBarrierState] {
override def apply(key: ContextBarrierId): ContextBarrierState =
new ContextBarrierState(key, numTasks)
})
states.computeIfAbsent(barrierId,

Member Author: 90% of the changes are like this, converting an anonymous inner class to a SAM expression. (A minimal sketch of the pattern follows this file's diff.)

(key: ContextBarrierId) => new ContextBarrierState(key, numTasks))
val barrierState = states.get(barrierId)

barrierState.handleRequest(context, request)
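The sketch below shows the before/after shape of the SAM conversion the author describes, as a minimal standalone example; `BarrierId` and `BarrierState` are simplified stand-ins for `ContextBarrierId` and `ContextBarrierState`, not the actual Spark classes.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function

object SamConversionSketch {
  // Simplified stand-ins for the real barrier classes.
  final case class BarrierId(stageId: Int, stageAttemptId: Int)
  final class BarrierState(val id: BarrierId, val numTasks: Int)

  private val states = new ConcurrentHashMap[BarrierId, BarrierState]()

  // Before (pre-2.12 style): spell out the Java functional interface as an anonymous inner class.
  def getOrCreateOld(id: BarrierId, numTasks: Int): BarrierState =
    states.computeIfAbsent(id, new Function[BarrierId, BarrierState] {
      override def apply(key: BarrierId): BarrierState = new BarrierState(key, numTasks)
    })

  // After (Scala 2.12+): a lambda is adapted to the expected single-abstract-method type.
  def getOrCreateNew(id: BarrierId, numTasks: Int): BarrierState =
    states.computeIfAbsent(id, (key: BarrierId) => new BarrierState(key, numTasks))
}
```

Both calls behave identically at runtime; the compiler generates the adapter for the Java functional interface, so only the source gets shorter.
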
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -123,9 +123,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
periodicGCService.scheduleAtFixedRate(new Runnable {
override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
periodicGCService.scheduleAtFixedRate(() => System.gc(),
periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}

/**
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -98,11 +98,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")

override def onStart(): Unit = {
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.ask[Boolean](ExpireDeadHosts))
}
}, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { Option(self).foreach(_.ask[Boolean](ExpireDeadHosts)) },
0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -62,9 +62,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria

@transient private lazy val reader: ConfigReader = {
val _reader = new ConfigReader(new SparkConfigProvider(settings))
_reader.bindEnv(new ConfigProvider {
override def get(key: String): Option[String] = Option(getenv(key))
})
_reader.bindEnv((key: String) => Option(getenv(key)))
_reader
}

@@ -392,7 +390,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria

/** Get an optional value, applying variable substitution. */
private[spark] def getWithSubstitution(key: String): Option[String] = {
getOption(key).map(reader.substitute(_))
getOption(key).map(reader.substitute)

Member Author: In the files I did change, I cleaned up a few other things in nearby code, like this. Other examples are using .nonEmpty and removing redundant braces, etc. (A short sketch of these cleanups follows this file's diff.)

}

/** Get all parameters as a list of pairs */
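A small standalone sketch of the nearby cleanups the author mentions; the values and the `substitute` helper here are illustrative, not taken from the Spark sources.

```scala
object NearbyCleanupsSketch {
  def substitute(s: String): String = s.trim // illustrative stand-in for reader.substitute

  val key: Option[String] = Some(" spark.app.name ")
  val port: Option[Int] = Some(4040)
  val secret: Option[String] = Some("token")
  val masters: Seq[String] = Seq("spark://a:7077", "spark://b:7077")

  // Before: redundant lambda wrapper, comparisons against None, exists(_ == x), size > 0
  val v1   = key.map(substitute(_))
  val ok1  = port != None && secret != None
  val hit1 = masters.exists(_ == "spark://a:7077")
  val any1 = masters.size > 0

  // After: eta-expansion, isDefined, contains, nonEmpty
  val v2   = key.map(substitute)
  val ok2  = port.isDefined && secret.isDefined
  val hit2 = masters.contains("spark://a:7077")
  val any2 = masters.nonEmpty
}
```

The `getWithSubstitution` change above is the eta-expansion case: `.map(reader.substitute(_))` becomes `.map(reader.substitute)`.
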
@@ -60,11 +60,7 @@ object PythonRunner {
.javaAddress(localhost)
.callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
.build()
val thread = new Thread(new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions {
gatewayServer.start()
}
})
val thread = new Thread(() => Utils.logUncaughtExceptions { gatewayServer.start() })
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
import java.util.{Arrays, Date, Locale}

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
@@ -270,11 +270,8 @@ private[spark] class SparkHadoopUtil extends Logging {
name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
}
})
Arrays.sort(fileStatuses, new Comparator[FileStatus] {
override def compare(o1: FileStatus, o2: FileStatus): Int = {
Longs.compare(o1.getModificationTime, o2.getModificationTime)
}
})
Arrays.sort(fileStatuses, (o1: FileStatus, o2: FileStatus) =>
Longs.compare(o1.getModificationTime, o2.getModificationTime))
fileStatuses
} catch {
case NonFatal(e) =>
@@ -465,7 +462,7 @@ private[spark] object SparkHadoopUtil {
// scalastyle:on line.size.limit
def createNonECFile(fs: FileSystem, path: Path): FSDataOutputStream = {
try {
// Use reflection as this uses apis only avialable in hadoop 3
// Use reflection as this uses APIs only available in Hadoop 3
val builderMethod = fs.getClass().getMethod("createFile", classOf[Path])
// the builder api does not resolve relative paths, nor does it create parent dirs, while
// the old api does.
@@ -186,13 +186,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
*/
private def getRunner(operateFun: () => Unit): Runnable = {
new Runnable() {
override def run(): Unit = Utils.tryOrExit {
operateFun()
}
}
}
private def getRunner(operateFun: () => Unit): Runnable =
() => Utils.tryOrExit { operateFun() }

/**
* Fixed size thread pool to fetch and parse log files.
@@ -221,29 +216,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
// for the FS to leave safe mode before enabling polling. This allows the main history server
// UI to be shown (so that the user can see the HDFS status).
val initThread = new Thread(new Runnable() {
override def run(): Unit = {
try {
while (isFsInSafeMode()) {
logInfo("HDFS is still in safe mode. Waiting...")
val deadline = clock.getTimeMillis() +
TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
clock.waitTillTime(deadline)
}
startPolling()
} catch {
case _: InterruptedException =>
val initThread = new Thread(() => {
try {
while (isFsInSafeMode()) {
logInfo("HDFS is still in safe mode. Waiting...")
val deadline = clock.getTimeMillis() +
TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
clock.waitTillTime(deadline)
}
startPolling()
} catch {
case _: InterruptedException =>
}
})
initThread.setDaemon(true)
initThread.setName(s"${getClass().getSimpleName()}-init")
initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
new Thread.UncaughtExceptionHandler() {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
logError("Error initializing FsHistoryProvider.", e)
System.exit(1)
}
(_: Thread, e: Throwable) => {
logError("Error initializing FsHistoryProvider.", e)
System.exit(1)
}))
initThread.start()
initThread
@@ -517,9 +508,8 @@

val tasks = updated.flatMap { entry =>
try {
val task: Future[Unit] = replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
}, Unit)
val task: Future[Unit] = replayExecutor.submit(
() => mergeApplicationListing(entry, newLastScanTime, true))
Some(task -> entry.getPath)
} catch {
// let the iteration over the updated entries break, since an exception on
@@ -150,11 +150,9 @@ private[deploy] class Master(
logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
s"Applications UIs are available at $masterWebUiUrl")
}
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, workerTimeoutMs, TimeUnit.MILLISECONDS)
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
0, workerTimeoutMs, TimeUnit.MILLISECONDS)

if (restServerEnabled) {
val port = conf.get(MASTER_REST_SERVER_PORT)
38 changes: 14 additions & 24 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -325,11 +325,9 @@ private[deploy] class Worker(
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel(true))
registrationRetryTimer = Some(
forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(ReregisterWithMaster)
}
}, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
forwardMessageScheduler.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) },
PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
}
@@ -341,7 +339,7 @@
}

/**
* Cancel last registeration retry, or do nothing if no retry
* Cancel last registration retry, or do nothing if no retry
*/
private def cancelLastRegistrationRetry(): Unit = {
if (registerMasterFutures != null) {
@@ -361,11 +359,7 @@
registerMasterFutures = tryRegisterAllMasters()
connectionAttemptCount = 0
registrationRetryTimer = Some(forwardMessageScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReregisterWithMaster))
}
},
() => Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReregisterWithMaster)) },
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
@@ -407,19 +401,15 @@
}
registered = true
changeMaster(masterRef, masterWebUiUrl, masterAddress)
forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(SendHeartbeat)
}
}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
forwardMessageScheduler.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
if (CLEANUP_ENABLED) {
logInfo(
s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(WorkDirCleanup)
}
}, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
forwardMessageScheduler.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}

val execs = executors.values.map { e =>
@@ -568,7 +558,7 @@
}
}

case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
case executorStateChanged: ExecutorStateChanged =>
handleExecutorStateChanged(executorStateChanged)

case KillExecutor(masterUrl, appId, execId) =>
Expand Down Expand Up @@ -632,7 +622,7 @@ private[deploy] class Worker(

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (master.exists(_.address == remoteAddress) ||
masterAddressToConnect.exists(_ == remoteAddress)) {
masterAddressToConnect.contains(remoteAddress)) {
logInfo(s"$remoteAddress Disassociated !")
masterDisconnected()
}
@@ -815,7 +805,7 @@ private[deploy] object Worker extends Logging {
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
11 changes: 4 additions & 7 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -89,17 +89,14 @@ private[spark] class Executor(
}

// Start worker thread pool
// Use UninterruptibleThread to run tasks so that we can allow running codes without being
// interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
// will hang forever if some methods are interrupted.
private val threadPool = {
val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Executor task launch worker-%d")
.setThreadFactory(new ThreadFactory {
override def newThread(r: Runnable): Thread =
// Use UninterruptibleThread to run tasks so that we can allow running codes without being
// interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
// will hang forever if some methods are interrupted.
new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
})
.setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
.build()
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
@@ -44,7 +44,7 @@ private[spark] abstract class LauncherBackend {
.map(_.toInt)
val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
.orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
if (port != None && secret != None) {
if (port.isDefined && secret.isDefined) {
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
connection.send(new Hello(secret.get, SPARK_VERSION))
@@ -94,11 +94,8 @@ private[spark] abstract class LauncherBackend {
protected def onDisconnected() : Unit = { }

private def fireStopRequest(): Unit = {
val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
override def run(): Unit = Utils.tryLogNonFatalError {
onStopRequest()
}
})
val thread = LauncherBackend.threadFactory.newThread(
() => Utils.tryLogNonFatalError { onStopRequest() })
thread.start()
}

@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit

import scala.collection.mutable

import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import com.codahale.metrics.{Metric, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf}
@@ -168,9 +168,7 @@ private[spark] class MetricsSystem private (
def removeSource(source: Source) {
sources -= source
val regName = buildRegistryName(source)
registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
})
registry.removeMatching((name: String, _: Metric) => name.startsWith(regName))
}

private def registerSources() {