Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
*/
public class RetryingBlockFetcherSuite {

ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, this is stuff I just cleaned up while touching this file.

private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));

@Test
public void testNoFailures() throws IOException, InterruptedException {
Expand Down Expand Up @@ -291,7 +291,7 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
}

assertNotNull(stub);
stub.when(fetchStarter).createAndStart(any(), anyObject());
stub.when(fetchStarter).createAndStart(any(), any());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an avoidable deprecated method

String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark
import java.util.{Properties, Timer, TimerTask}

import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.executor.TaskMetrics
Expand Down Expand Up @@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] (
barrierEpoch),
// Set a fixed timeout for RPC here, so users shall get a SparkException thrown by
// BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
timeout = new RpcTimeout(365.days, "barrierTimeout"))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good example of simplifying expressions while also removing postfixOps

barrierEpoch += 1
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
"global sync successfully, waited for " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.sys.process._

import org.json4s._
Expand Down Expand Up @@ -112,7 +111,7 @@ private object FaultToleranceTest extends App with Logging {
assertValidClusterState()

killLeader()
delay(30 seconds)
delay(30.seconds)
assertValidClusterState()
createClient()
assertValidClusterState()
Expand All @@ -126,12 +125,12 @@ private object FaultToleranceTest extends App with Logging {

killLeader()
addMasters(1)
delay(30 seconds)
delay(30.seconds)
assertValidClusterState()

killLeader()
addMasters(1)
delay(30 seconds)
delay(30.seconds)
assertValidClusterState()
}

Expand All @@ -156,7 +155,7 @@ private object FaultToleranceTest extends App with Logging {
killLeader()
workers.foreach(_.kill())
workers.clear()
delay(30 seconds)
delay(30.seconds)
addWorkers(2)
assertValidClusterState()
}
Expand All @@ -174,7 +173,7 @@ private object FaultToleranceTest extends App with Logging {

(1 to 3).foreach { _ =>
killLeader()
delay(30 seconds)
delay(30.seconds)
assertValidClusterState()
assertTrue(getLeader == masters.head)
addMasters(1)
Expand Down Expand Up @@ -264,7 +263,7 @@ private object FaultToleranceTest extends App with Logging {
}

// Avoid waiting indefinitely (e.g., we could register but get no executors).
assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
}

/**
Expand Down Expand Up @@ -317,7 +316,7 @@ private object FaultToleranceTest extends App with Logging {
}

try {
assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
} catch {
case e: TimeoutException =>
logError("Master states: " + masters.map(_.state))
Expand Down Expand Up @@ -421,7 +420,7 @@ private object SparkDocker {
}

dockerCmd.run(ProcessLogger(findIpAndLog _))
val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds)
val dockerId = Docker.getLastProcessId
(ip, dockerId, outFile)
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
*
* @note This can be used in the recover callback of a Future to add to a TimeoutException
* Example:
* val timeout = new RpcTimeout(5 millis, "short timeout")
* val timeout = new RpcTimeout(5.milliseconds, "short timeout")
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
*/
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import scala.collection.Map
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.existentials
import scala.language.postfixOps
import scala.util.control.NonFatal

import org.apache.commons.lang3.SerializationUtils
Expand Down Expand Up @@ -270,7 +269,7 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
Some(executorUpdates)))
blockManagerMaster.driverEndpoint.askSync[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
}

/**
Expand Down
19 changes: 8 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ private[spark] object UIUtils extends Logging {
}
}
// if time is more than a year
return s"$yearString $weekString $dayString"
s"$yearString $weekString $dayString"
} catch {
case e: Exception =>
logError("Error converting time to string", e)
// if there is some error, return blank string
return ""
""
}
}

Expand Down Expand Up @@ -336,7 +336,7 @@ private[spark] object UIUtils extends Logging {
def getHeaderContent(header: String): Seq[Node] = {
if (newlinesInHeader) {
<ul class="unstyled">
{ header.split("\n").map { case t => <li> {t} </li> } }
{ header.split("\n").map(t => <li> {t} </li>) }
</ul>
} else {
Text(header)
Expand Down Expand Up @@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
* the whole string will rendered as a simple escaped text.
*
* Note: In terms of security, only anchor tags with root relative links are supported. So any
* attempts to embed links outside Spark UI, or other tags like {@code <script>} will cause in
* attempts to embed links outside Spark UI, or other tags like &lt;script&gt; will cause in
* the whole description to be treated as plain text.
*
* @param desc the original job or stage description string, which may contain html tags.
Expand All @@ -458,7 +458,6 @@ private[spark] object UIUtils extends Logging {
* is true, and an Elem otherwise.
*/
def makeDescription(desc: String, basePathUri: String, plainText: Boolean = false): NodeSeq = {
import scala.language.postfixOps

// If the description can be parsed as HTML and has only relative links, then render
// as HTML, otherwise render as escaped string
Expand All @@ -468,9 +467,7 @@ private[spark] object UIUtils extends Logging {

// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span", "br")
val illegalNodes = xml \\ "_" filterNot { case node: Node =>
allowedNodeLabels.contains(node.label)
}
val illegalNodes = (xml \\ "_").filterNot(node => allowedNodeLabels.contains(node.label))
if (illegalNodes.nonEmpty) {
throw new IllegalArgumentException(
"Only HTML anchors allowed in job descriptions\n" +
Expand All @@ -491,8 +488,8 @@ private[spark] object UIUtils extends Logging {
new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
case e: Elem if e.child isEmpty => Text(e.text)
case e: Elem if e.child nonEmpty => Text(e.child.flatMap(transform).text)
case e: Elem if e.child.isEmpty => Text(e.text)
case e: Elem => Text(e.child.flatMap(transform).text)
case _ => n
}
}
Expand All @@ -503,7 +500,7 @@ private[spark] object UIUtils extends Logging {
new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
case e: Elem if e \ "@href" nonEmpty =>
case e: Elem if (e \ "@href").nonEmpty =>
val relativePath = e.attribute("href").get.toString
val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
e % Attribute(null, "href", fullUri, Null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark

import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.spark.internal.config._
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
Expand Down Expand Up @@ -52,7 +51,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
)

val error = intercept[SparkException] {
ThreadUtils.awaitResult(futureAction, 5 seconds)
ThreadUtils.awaitResult(futureAction, 5.seconds)
}.getCause.getMessage
assert(error.contains(message))
}
Expand Down
16 changes: 8 additions & 8 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.spark.storage._
abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
{
implicit val defaultTimeout = timeout(10000 millis)
implicit val defaultTimeout = timeout(10.seconds)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
Expand Down Expand Up @@ -159,7 +159,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}

// Test that GC causes RDD cleanup after dereferencing the RDD
Expand All @@ -178,7 +178,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}
rdd.count() // Defeat early collection by the JVM

Expand All @@ -196,7 +196,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}

// Test that GC causes broadcast cleanup after dereferencing the broadcast variable
Expand Down Expand Up @@ -272,7 +272,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}

// Test that RDD going out of scope does cause the checkpoint blocks to be cleaned up
Expand All @@ -294,7 +294,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}

// Test that GC triggers the cleanup of all variables after the dereferencing them
Expand Down Expand Up @@ -334,7 +334,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1.second))
}

// Test that GC triggers the cleanup of all variables after the dereferencing them
Expand Down Expand Up @@ -408,7 +408,7 @@ class CleanerTester(
/** Assert that all the stuff has been cleaned up */
def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
try {
eventually(waitTimeout, interval(100 millis)) {
eventually(waitTimeout, interval(100.milliseconds)) {
assert(isAllCleanedUp,
"The following resources were not cleaned up:\n" + uncleanedResourcesToString)
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/DriverSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ class DriverSuite extends SparkFunSuite with TimeLimits {
ignore("driver should exit after finishing without cleanup (SPARK-530)") {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val masters = Table("master", "local", "local-cluster[2,1,1024]")
forAll(masters) { (master: String) =>
forAll(masters) { master =>
val process = Utils.executeCommand(
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
failAfter(60 seconds) { process.waitFor() }
failAfter(1.minute) { process.waitFor() }
// Ensure we still kill the process in case it timed out
process.destroy()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(e.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
}

test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
Expand Down Expand Up @@ -290,7 +290,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(e.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
}

test("two jobs sharing the same stage") {
Expand Down
5 changes: 2 additions & 3 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Random, Try}

import com.esotericsoftware.kryo.Kryo
Expand Down Expand Up @@ -279,10 +278,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
assert(RpcUtils.retryWaitMs(conf) === 2L)

conf.set("spark.akka.askTimeout", "3")
assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))
assert(RpcUtils.askRpcTimeout(conf).duration === 3.seconds)

conf.set("spark.akka.lookupTimeout", "4")
assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
assert(RpcUtils.lookupRpcTimeout(conf).duration === 4.seconds)
}

test("SPARK-13727") {
Expand Down
Loading