Closed
Changes from all commits
Commits (28)
b6932e1  SPARK-27416:UnsafeMapData & UnsafeArrayData Kryo serialization breaks…  (Apr 10, 2019)
3876b8e  refine code by remove useless method getBytes  (Apr 10, 2019)
8bee231  Merge remote-tracking branch 'upstream/master' into SPARK-27416  (Apr 11, 2019)
3a06348  java code style fixing  (Apr 11, 2019)
3a8990e  fix java code style issue  (Apr 11, 2019)
3a8ed0f  remove useless else control  (Apr 11, 2019)
fb566bc  make UnsafeDataUtils final & package private with private constructor…  (Apr 11, 2019)
43da473  [SPARK-27225][SQL] Implement join strategy hints  (maryannxue, Apr 11, 2019)
4ec7f63  [SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0:…  (srowen, Apr 11, 2019)
5d8aee5  [SPARK-27445][SQL][TEST] Update SQLQueryTestSuite to process files en…  (dilipbiswal, Apr 11, 2019)
94adffa  [SPARK-27270][SS] Add Kafka dynamic JAAS authentication debug possibi…  (gaborgsomogyi, Apr 11, 2019)
bbbe54a  [SPARK-27199][SQL][FOLLOWUP] Fix bug in codegen templates in UnixTime…  (rednaxelafx, Apr 12, 2019)
9b7bef0  add unsafe data class to KryoSerializer.newKryo  (Apr 12, 2019)
d437605  SPARK-27416:UnsafeMapData & UnsafeArrayData Kryo serialization breaks…  (Apr 10, 2019)
72b4f92  refine code by remove useless method getBytes  (Apr 10, 2019)
39ef9f9  java code style fixing  (Apr 11, 2019)
0a5ff14  fix java code style issue  (Apr 11, 2019)
e901266  remove useless else control  (Apr 11, 2019)
7b7b01d  make UnsafeDataUtils final & package private with private constructor…  (Apr 11, 2019)
f21d832  add unsafe data class to KryoSerializer.newKryo  (Apr 12, 2019)
a52f335  SPARK-27416:UnsafeMapData & UnsafeArrayData Kryo serialization breaks…  (pengbo, Apr 10, 2019)
cbab6ae  refine code by remove useless method getBytes  (pengbo, Apr 10, 2019)
ac0d7a4  java code style fixing  (pengbo, Apr 11, 2019)
35b0d79  fix java code style issue  (pengbo, Apr 11, 2019)
08e91ab  remove useless else control  (pengbo, Apr 11, 2019)
9a47061  make UnsafeDataUtils final & package private with private constructor…  (pengbo, Apr 11, 2019)
e0f1f26  Merge branch 'SPARK-27416' of https://github.com/pengbo/spark into HEAD  (pengbo, Apr 12, 2019)
40b768f  Merge branch 'SPARK-27416' of https://github.com/pengbo/spark into SP…  (pengbo, Apr 12, 2019)
RetryingBlockFetcherSuite.java
@@ -46,9 +46,9 @@
*/
public class RetryingBlockFetcherSuite {

- ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
- ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
- ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
+ private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
+ private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
+ private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));

@Test
public void testNoFailures() throws IOException, InterruptedException {
@@ -291,7 +291,7 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
}

assertNotNull(stub);
- stub.when(fetchStarter).createAndStart(any(), anyObject());
+ stub.when(fetchStarter).createAndStart(any(), any());
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
}
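The second hunk swaps Mockito's deprecated `anyObject()` matcher for `any()`, which in Mockito 2.x matches any value (including null) of the parameter type. A minimal sketch of the same stubbing pattern, written against a hypothetical `Fetcher` trait rather than the suite's real `BlockFetchStarter`:

```scala
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}

object AnyMatcherSketch extends App {
  // Hypothetical collaborator standing in for BlockFetchStarter.
  trait Fetcher {
    def createAndStart(blockIds: Array[String], listener: AnyRef): Boolean
  }

  val fetcher = mock(classOf[Fetcher])
  // any() replaces the deprecated anyObject(); both match arbitrary arguments.
  when(fetcher.createAndStart(any(), any())).thenReturn(true)

  assert(fetcher.createAndStart(Array("b0", "b1"), new Object))
}
```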
BarrierTaskContext.scala
@@ -20,7 +20,6 @@ package org.apache.spark
import java.util.{Properties, Timer, TimerTask}

import scala.concurrent.duration._
- import scala.language.postfixOps

import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.executor.TaskMetrics
@@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] (
barrierEpoch),
// Set a fixed timeout for RPC here, so users shall get a SparkException thrown by
// BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
- timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
+ timeout = new RpcTimeout(365.days, "barrierTimeout"))
barrierEpoch += 1
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
"global sync successfully, waited for " +
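This change, and the similar ones in FaultToleranceTest, DAGScheduler, and the test suites further down, drops `import scala.language.postfixOps` and rewrites bare postfix durations (`30 seconds`, `1000 millis`) as method calls on the literal, which compile without the language import. A small self-contained sketch of the two idioms, using only the standard library:

```scala
import scala.concurrent.duration._

object DurationSyntaxSketch extends App {
  // The postfix form used before (`31536000 seconds`) needs scala.language.postfixOps
  // and triggers a warning; the dot form below does not.
  val barrierTimeout: FiniteDuration = 365.days
  val heartbeatTimeout: FiniteDuration = 10.minutes

  // Both spellings denote the same value, so the rewrite is behavior-preserving.
  assert(barrierTimeout == 31536000.seconds) // 3600 * 24 * 365
  assert(heartbeatTimeout == 600.seconds)

  println(s"barrier: $barrierTimeout, heartbeat: $heartbeatTimeout")
}
```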
FaultToleranceTest.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
- import scala.language.postfixOps
import scala.sys.process._

import org.json4s._
@@ -112,7 +111,7 @@ private object FaultToleranceTest extends App with Logging {
assertValidClusterState()

killLeader()
- delay(30 seconds)
+ delay(30.seconds)
assertValidClusterState()
createClient()
assertValidClusterState()
@@ -126,12 +125,12 @@

killLeader()
addMasters(1)
- delay(30 seconds)
+ delay(30.seconds)
assertValidClusterState()

killLeader()
addMasters(1)
- delay(30 seconds)
+ delay(30.seconds)
assertValidClusterState()
}

@@ -156,7 +155,7 @@
killLeader()
workers.foreach(_.kill())
workers.clear()
- delay(30 seconds)
+ delay(30.seconds)
addWorkers(2)
assertValidClusterState()
}
@@ -174,7 +173,7 @@

(1 to 3).foreach { _ =>
killLeader()
- delay(30 seconds)
+ delay(30.seconds)
assertValidClusterState()
assertTrue(getLeader == masters.head)
addMasters(1)
@@ -264,7 +263,7 @@ private object FaultToleranceTest extends App with Logging {
}

// Avoid waiting indefinitely (e.g., we could register but get no executors).
- assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
+ assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
}

/**
@@ -317,7 +316,7 @@ private object FaultToleranceTest extends App with Logging {
}

try {
- assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
+ assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
} catch {
case e: TimeoutException =>
logError("Master states: " + masters.map(_.state))
@@ -421,7 +420,7 @@ private object SparkDocker {
}

dockerCmd.run(ProcessLogger(findIpAndLog _))
- val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
+ val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds)
val dockerId = Docker.getLastProcessId
(ip, dockerId, outFile)
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -52,7 +52,7 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
*
* @note This can be used in the recover callback of a Future to add to a TimeoutException
* Example:
- * val timeout = new RpcTimeout(5 millis, "short timeout")
+ * val timeout = new RpcTimeout(5.milliseconds, "short timeout")
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
*/
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
DAGScheduler.scala
@@ -27,7 +27,6 @@ import scala.collection.Map
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.existentials
- import scala.language.postfixOps
import scala.util.control.NonFatal

import org.apache.commons.lang3.SerializationUtils
@@ -270,7 +269,7 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
Some(executorUpdates)))
blockManagerMaster.driverEndpoint.askSync[Boolean](
- BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
+ BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
}

/**
KryoSerializer.scala
@@ -213,6 +213,10 @@ class KryoSerializer(conf: SparkConf)
// We can't load those class directly in order to avoid unnecessary jar dependencies.
// We load them safely, ignore it if the class not found.
Seq(
"org.apache.spark.sql.catalyst.expressions.UnsafeRow",
"org.apache.spark.sql.catalyst.expressions.UnsafeArrayData",
"org.apache.spark.sql.catalyst.expressions.UnsafeMapData",

"org.apache.spark.ml.attribute.Attribute",
"org.apache.spark.ml.attribute.AttributeGroup",
"org.apache.spark.ml.attribute.BinaryAttribute",
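This hunk is the core of SPARK-27416: the three catalyst Unsafe* classes are appended to the list of class names that KryoSerializer.newKryo registers reflectively (core cannot reference catalyst at compile time, hence the string names and the ignore-if-missing loading described in the surrounding comment). A hedged sketch of the round trip the registration is meant to keep safe; the reported corruption involves two JVMs with different Oops sizes, which a single-process run cannot fully reproduce, so treat this as an illustration rather than a regression test:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData

object KryoUnsafeRoundTripSketch extends App {
  // Assumes spark-core and spark-catalyst are both on the classpath.
  val ser = new KryoSerializer(new SparkConf(false)).newInstance()

  val array = UnsafeArrayData.fromPrimitiveArray(Array(1L, 2L, 3L))
  val copy = ser.deserialize[UnsafeArrayData](ser.serialize(array))

  assert(copy.numElements() == 3 && copy.getLong(2) == 3L)
  println("UnsafeArrayData survived the Kryo round trip")
}
```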
19 changes: 8 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -109,12 +109,12 @@ private[spark] object UIUtils extends Logging {
}
}
// if time is more than a year
return s"$yearString $weekString $dayString"
s"$yearString $weekString $dayString"
} catch {
case e: Exception =>
logError("Error converting time to string", e)
// if there is some error, return blank string
return ""
""
}
}

@@ -336,7 +336,7 @@
def getHeaderContent(header: String): Seq[Node] = {
if (newlinesInHeader) {
<ul class="unstyled">
- { header.split("\n").map { case t => <li> {t} </li> } }
+ { header.split("\n").map(t => <li> {t} </li>) }
</ul>
} else {
Text(header)
@@ -446,7 +446,7 @@
* the whole string will rendered as a simple escaped text.
*
* Note: In terms of security, only anchor tags with root relative links are supported. So any
- * attempts to embed links outside Spark UI, or other tags like {@code <script>} will cause in
+ * attempts to embed links outside Spark UI, or other tags like &lt;script&gt; will cause in
* the whole description to be treated as plain text.
*
* @param desc the original job or stage description string, which may contain html tags.
@@ -458,7 +458,6 @@
* is true, and an Elem otherwise.
*/
def makeDescription(desc: String, basePathUri: String, plainText: Boolean = false): NodeSeq = {
- import scala.language.postfixOps

// If the description can be parsed as HTML and has only relative links, then render
// as HTML, otherwise render as escaped string
@@ -468,9 +467,7 @@

// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span", "br")
- val illegalNodes = xml \\ "_" filterNot { case node: Node =>
- allowedNodeLabels.contains(node.label)
- }
+ val illegalNodes = (xml \\ "_").filterNot(node => allowedNodeLabels.contains(node.label))
if (illegalNodes.nonEmpty) {
throw new IllegalArgumentException(
"Only HTML anchors allowed in job descriptions\n" +
@@ -491,8 +488,8 @@
new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
- case e: Elem if e.child isEmpty => Text(e.text)
- case e: Elem if e.child nonEmpty => Text(e.child.flatMap(transform).text)
+ case e: Elem if e.child.isEmpty => Text(e.text)
+ case e: Elem => Text(e.child.flatMap(transform).text)
case _ => n
}
}
@@ -503,7 +500,7 @@
new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
- case e: Elem if e \ "@href" nonEmpty =>
+ case e: Elem if (e \ "@href").nonEmpty =>
val relativePath = e.attribute("href").get.toString
val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
e % Attribute(null, "href", fullUri, Null)
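The UIUtils hunks above only tighten style (dot calls instead of postfix operators, a plain lambda instead of a one-case partial function); the behavior of makeDescription is unchanged. For context, the href-rewriting step it performs can be sketched on its own with scala.xml's RewriteRule and RuleTransformer; the base path and description below are made up for the example:

```scala
import scala.xml._
import scala.xml.transform.{RewriteRule, RuleTransformer}

object HrefRewriteSketch extends App {
  val basePathUri = "/proxy/app-1234" // hypothetical UI base path

  // Prefix every root-relative anchor with the base path, roughly what
  // UIUtils.makeDescription does once the description passes validation.
  val rule = new RewriteRule {
    override def transform(n: Node): Seq[Node] = n match {
      case e: Elem if (e \ "@href").nonEmpty =>
        val relativePath = e.attribute("href").get.toString
        val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
        e % Attribute(null, "href", fullUri, Null)
      case _ => n
    }
  }

  val desc = <span>See <a href="/jobs/job?id=7">job 7</a></span>
  println(new RuleTransformer(rule).transform(desc))
}
```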
BarrierStageOnSubmittedSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark

import scala.concurrent.duration._
- import scala.language.postfixOps

import org.apache.spark.internal.config._
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
@@ -52,7 +51,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
)

val error = intercept[SparkException] {
- ThreadUtils.awaitResult(futureAction, 5 seconds)
+ ThreadUtils.awaitResult(futureAction, 5.seconds)
}.getCause.getMessage
assert(error.contains(message))
}
16 changes: 8 additions & 8 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.storage._
abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
{
- implicit val defaultTimeout = timeout(10000 millis)
+ implicit val defaultTimeout = timeout(10.seconds)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
@@ -159,7 +159,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}

// Test that GC causes RDD cleanup after dereferencing the RDD
@@ -178,7 +178,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}
rdd.count() // Defeat early collection by the JVM

@@ -196,7 +196,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}

// Test that GC causes broadcast cleanup after dereferencing the broadcast variable
@@ -272,7 +272,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}

// Test that RDD going out of scope does cause the checkpoint blocks to be cleaned up
@@ -294,7 +294,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}

// Test that GC triggers the cleanup of all variables after the dereferencing them
@@ -334,7 +334,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}

// Test that GC triggers the cleanup of all variables after the dereferencing them
@@ -408,7 +408,7 @@ class CleanerTester(
/** Assert that all the stuff has been cleaned up */
def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
try {
- eventually(waitTimeout, interval(100 millis)) {
+ eventually(waitTimeout, interval(100.milliseconds)) {
assert(isAllCleanedUp,
"The following resources were not cleaned up:\n" + uncleanedResourcesToString)
}
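The ContextCleanerSuite edits above are again the duration-syntax cleanup, here feeding ScalaTest's timeout(...) and interval(...). The polling pattern itself looks like the sketch below; the suite name and the fake background "cleanup" thread are invented for illustration, and the conversion from scala.concurrent.duration to ScalaTest's Span is assumed to be in scope via the Eventually mixin, as in the real suite:

```scala
import scala.concurrent.duration._

import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually

class CleanupPollingSketch extends FunSuite with Eventually {
  test("polls until the resource is observed as cleaned") {
    @volatile var cleaned = false
    new Thread(() => { Thread.sleep(200); cleaned = true }).start()

    // Re-runs the block every 100 ms until it passes, failing after 10 s.
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      assert(cleaned, "resource was not cleaned up in time")
    }
  }
}
```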
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -33,12 +33,12 @@ class DriverSuite extends SparkFunSuite with TimeLimits {
ignore("driver should exit after finishing without cleanup (SPARK-530)") {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val masters = Table("master", "local", "local-cluster[2,1,1024]")
- forAll(masters) { (master: String) =>
+ forAll(masters) { master =>
val process = Utils.executeCommand(
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
- failAfter(60 seconds) { process.waitFor() }
+ failAfter(1.minute) { process.waitFor() }
// Ensure we still kill the process in case it timed out
process.destroy()
}
JobCancellationSuite.scala
@@ -250,7 +250,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(e.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
- assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
+ assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
}

test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
@@ -290,7 +290,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(e.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
- assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
+ assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
}

test("two jobs sharing the same stage") {
5 changes: 2 additions & 3 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
- import scala.language.postfixOps
import scala.util.{Random, Try}

import com.esotericsoftware.kryo.Kryo
@@ -279,10 +278,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
assert(RpcUtils.retryWaitMs(conf) === 2L)

conf.set("spark.akka.askTimeout", "3")
- assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))
+ assert(RpcUtils.askRpcTimeout(conf).duration === 3.seconds)

conf.set("spark.akka.lookupTimeout", "4")
- assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
+ assert(RpcUtils.lookupRpcTimeout(conf).duration === 4.seconds)
}

test("SPARK-13727") {