Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
import scala.concurrent.duration._

import org.apache.spark._
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -354,7 +355,8 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
extends Thread(s"Worker Monitor for $pythonExec") {

/** How long to wait before killing the python worker if a task cannot be interrupted. */
private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
private val taskKillTimeoutMs = env.conf.getTimeAsSeconds("spark.python.task.killTimeout",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conf is not documented, but I think it's designed to accept values like 1.5s
cc @zsxwing

"2s").seconds.toMillis

setDaemon(true)

Expand All @@ -365,7 +367,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
Thread.sleep(2000)
}
if (!context.isCompleted) {
Thread.sleep(taskKillTimeout)
Thread.sleep(taskKillTimeoutMs)
if (!context.isCompleted) {
try {
// Mimic the task name used in `Executor` to help the user find out the task to blame.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.net.URI
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._
import scala.concurrent.duration._

import com.google.common.io.Files

Expand Down Expand Up @@ -58,7 +59,7 @@ private[deploy] class DriverRunner(

// Timeout to wait for when trying to terminate a driver.
private val DRIVER_TERMINATE_TIMEOUT_MS =
conf.getTimeAsMs("spark.worker.driverTerminateTimeout", "10s")
conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout", "10s").seconds.toMillis
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


// Decoupled for testing
def setClock(_clock: Clock): Unit = {
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.concurrent.duration._
import scala.util.control.NonFatal

import com.google.common.util.concurrent.ThreadFactoryBuilder
Expand Down Expand Up @@ -613,7 +614,7 @@ private[spark] class Executor(
private[this] val taskId: Long = taskRunner.taskId

private[this] val killPollingIntervalMs: Long =
conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s")
conf.getTimeAsSeconds("spark.task.reaper.pollingInterval", "10s").seconds.toMillis
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I believe this is change things from right to wrong. If you want to use a second value then you shall use sth like 1s.


private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "-1")

Expand Down Expand Up @@ -820,7 +821,8 @@ private[spark] class Executor(
* Schedules a task to report heartbeat and partial metrics for active tasks to driver.
*/
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
val intervalMs = conf.getTimeAsSeconds("spark.executor.heartbeatInterval",
"10s").seconds.toMillis

// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong

import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.duration._
import scala.util.Random

import org.apache.spark._
Expand Down Expand Up @@ -80,7 +81,8 @@ private[spark] class TaskSchedulerImpl(
ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
val STARVATION_TIMEOUT_MS = conf.getTimeAsSeconds("spark.starvation.timeout",
"15s").seconds.toMillis

// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -58,7 +59,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
private val maxRegisteredWaitingTimeMs =
conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime",
"30s").seconds.toMillis
private val createTime = System.currentTimeMillis()

// Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
Expand Down Expand Up @@ -108,7 +110,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

override def onStart() {
// Periodically revive offers to allow delay scheduling to work
val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
val reviveIntervalMs = conf.getTimeAsSeconds("spark.scheduler.revive.interval",
"1s").seconds.toMillis

reviveThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
private val CR = '\r'
// Update period of progress bar, in milliseconds
private val updatePeriodMSec =
sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")
sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200ms")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shall document this config.

// Delay to show up a progress bar, in milliseconds
private val firstDelayMSec = 500L

Expand Down
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/util/RpcUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.util

import scala.concurrent.duration._

import org.apache.spark.SparkConf
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}

Expand All @@ -39,7 +41,13 @@ private[spark] object RpcUtils {

/** Returns the configured number of milliseconds to wait on each retry */
def retryWaitMs(conf: SparkConf): Long = {
conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
if (conf.contains("spark.rpc.retry.wait") && !conf.contains("spark.akka.retry.wait")) {
conf.getTimeAsSeconds("spark.rpc.retry.wait", "3s").seconds.toMillis
} else {
// compatible with deprecated alternative `spark.akka.retry.wait` which has default
// unit as millisecond
conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
}
}

/** Returns the default Spark timeout to use for RPC ask operations. */
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,30 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
assert(thrown.getMessage.contains(key))
}
}

test("SPARK-24560") {
val conf = new SparkConf()
conf.set("spark.python.task.killTimeout", "3")
conf.set("spark.worker.driverTerminateTimeout", "12")
conf.set("spark.task.reaper.pollingInterval", "12")
conf.set("spark.executor.heartbeatInterval", "12")
conf.set("spark.starvation.timeout", "16")
conf.set("spark.scheduler.maxRegisteredResourcesWaitingTime", "32")
conf.set("spark.scheduler.revive.interval", "2")
conf.set("spark.rpc.retry.wait", "4")
conf.set("spark.mesos.coarse.shutdownTimeout", "12")
assert(conf.getTimeAsSeconds("spark.python.task.killTimeout").seconds.toMillis === 3000)
assert(conf.getTimeAsSeconds("spark.worker.driverTerminateTimeout").seconds.toMillis === 12000)
assert(conf.getTimeAsSeconds("spark.task.reaper.pollingInterval").seconds.toMillis === 12000)
assert(conf.getTimeAsSeconds("spark.executor.heartbeatInterval").seconds.toMillis === 12000)
assert(conf.getTimeAsSeconds("spark.starvation.timeout").seconds.toMillis === 16000)
assert(conf.getTimeAsSeconds("spark.scheduler.maxRegisteredResourcesWaitingTime")
.seconds.toMillis === 32000)
assert(conf.getTimeAsSeconds("spark.scheduler.revive.interval").seconds.toMillis === 2000)
assert(conf.getTimeAsSeconds("spark.rpc.retry.wait").seconds.toMillis === 4000)
assert(conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout").seconds.toMillis === 12000)
}

}

class Class1 {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
Expand Down Expand Up @@ -85,7 +86,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val taskLabels = conf.get("spark.mesos.task.labels", "")

private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
conf.getTimeAsSeconds("spark.mesos.coarse.shutdownTimeout", "10s").seconds.toMillis
.ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")

// Synchronization protected by stateLock
Expand Down Expand Up @@ -634,8 +635,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
slave.hostname,
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s").seconds.toMillis}ms"),
sc.conf.getTimeAsSeconds("spark.executor.heartbeatInterval", "10s").seconds.toMillis)
slave.shuffleRegistered = true
}

Expand Down