core/src/main/scala/org/apache/spark/status/LiveEntity.scala (1 addition, 3 deletions)
@@ -33,7 +33,6 @@ import org.apache.spark.storage.RDDInfo
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.collection.OpenHashSet
- import org.apache.spark.util.kvstore.KVStore

/**
* A mutable representation of a live entity in Spark (jobs, stages, tasks, et al). Every live
@@ -584,8 +583,7 @@ private object LiveEntityHelpers {
.filter { acc =>
// We don't need to store internal or SQL accumulables as their values will be shown in
// other places, so drop them to reduce the memory usage.
- !acc.internal && (!acc.metadata.isDefined ||
-   acc.metadata.get != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
+ !acc.internal && acc.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
Member Author: CC @vanzin on this one for a double-check

}
.map { acc =>
new v1.AccumulableInfo(
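The rewritten predicate above fixes a comparison that could never be false: the old code compared the unwrapped metadata String against a Some, so the second clause held for every accumulator and the SQL-accumulator filter excluded nothing. A minimal, self-contained sketch of the difference (plain Scala, not Spark code; "sql" stands in for AccumulatorContext.SQL_ACCUM_IDENTIFIER):

object OptionEqualityDemo extends App {
  val metadata: Option[String] = Some("sql")  // stand-in for the SQL accumulator identifier

  // Old shape: a bare String compared against Some(String); they are never equal,
  // so this whole expression is true for every accumulator and the filter does nothing.
  val oldCheck = !metadata.isDefined || metadata.get != Some("sql")
  println(oldCheck)  // always true

  // New shape: Option compared to Option, which matches the intent of the filter.
  val newCheck = metadata != Some("sql")
  println(newCheck)  // false here; true only when metadata is not Some("sql")
}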
@@ -175,7 +175,7 @@ private[spark] object ClosureCleaner extends Logging {
closure.getClass.isSynthetic &&
closure
.getClass
- .getInterfaces.exists(_.getName.equals("scala.Serializable"))
+ .getInterfaces.exists(_.getName == "scala.Serializable")
Member Author: See JIRA for explanation of this one


if (isClosureCandidate) {
try {
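For background on the .equals-to-== change above (the linked JIRA carries the specific rationale for this line): in Scala, == on references checks for null and then delegates to equals, so the rewrite keeps the same behavior for non-null names while being slightly more defensive. An illustrative sketch, not code from the PR:

val interfaceName: String = null  // hypothetical value, for illustration only

// interfaceName.equals("scala.Serializable")  // would throw a NullPointerException
println(interfaceName == "scala.Serializable") // false: == is null-safe, then delegates to equals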
@@ -17,8 +17,6 @@

package org.apache.spark.util.collection

- import java.util.Objects

import scala.collection.mutable.ArrayBuffer
import scala.ref.WeakReference

@@ -509,7 +507,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
.sorted

assert(it.isEmpty)
- assert(keys == (0 until 100))
+ assert(keys == (0 until 100).toList)

assert(map.numSpills == 0)
// these asserts try to show that we're no longer holding references to the underlying map.
@@ -202,7 +202,7 @@ private[spark] class MesosClusterScheduler(
} else if (removeFromPendingRetryDrivers(submissionId)) {
k.success = true
k.message = "Removed driver while it's being retried"
- } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+ } else if (finishedDrivers.exists(_.driverDescription.submissionId == submissionId)) {
k.success = false
k.message = "Driver already terminated"
} else {
@@ -222,21 +222,21 @@
}
s.submissionId = submissionId
stateLock.synchronized {
- if (queuedDrivers.exists(_.submissionId.equals(submissionId))) {
+ if (queuedDrivers.exists(_.submissionId == submissionId)) {
s.success = true
s.driverState = "QUEUED"
} else if (launchedDrivers.contains(submissionId)) {
s.success = true
s.driverState = "RUNNING"
launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString)
- } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) {
+ } else if (finishedDrivers.exists(_.driverDescription.submissionId == submissionId)) {
s.success = true
s.driverState = "FINISHED"
finishedDrivers
.find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus
.foreach(state => s.message = state.toString)
- } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) {
- val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+ } else if (pendingRetryDrivers.exists(_.submissionId == submissionId)) {
+ val status = pendingRetryDrivers.find(_.submissionId == submissionId)
.get.retryState.get.lastFailureStatus
s.success = true
s.driverState = "RETRYING"
@@ -254,13 +254,13 @@
*/
def getDriverState(submissionId: String): Option[MesosDriverState] = {
stateLock.synchronized {
- queuedDrivers.find(_.submissionId.equals(submissionId))
+ queuedDrivers.find(_.submissionId == submissionId)
.map(d => new MesosDriverState("QUEUED", d))
.orElse(launchedDrivers.get(submissionId)
.map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d))))
- .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId))
+ .orElse(finishedDrivers.find(_.driverDescription.submissionId == submissionId)
.map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d))))
- .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId))
+ .orElse(pendingRetryDrivers.find(_.submissionId == submissionId)
.map(d => new MesosDriverState("RETRYING", d)))
}
}
@@ -814,7 +814,7 @@ private[spark] class MesosClusterScheduler(
status: Int): Unit = {}

private def removeFromQueuedDrivers(subId: String): Boolean = {
- val index = queuedDrivers.indexWhere(_.submissionId.equals(subId))
+ val index = queuedDrivers.indexWhere(_.submissionId == subId)
if (index != -1) {
queuedDrivers.remove(index)
queuedDriversState.expunge(subId)
}

private def removeFromPendingRetryDrivers(subId: String): Boolean = {
- val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(subId))
+ val index = pendingRetryDrivers.indexWhere(_.submissionId == subId)
if (index != -1) {
pendingRetryDrivers.remove(index)
pendingRetryDriversState.expunge(subId)
@@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date}

import scala.collection.JavaConverters._

- import org.apache.mesos.Protos.{Environment, Secret, TaskState => MesosTaskState, _}
+ import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Value.{Scalar, Type}
import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
@@ -146,14 +146,14 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(scheduler.getResource(resources, "cpus") == 1.5)
assert(scheduler.getResource(resources, "mem") == 1200)
val resourcesSeq: Seq[Resource] = resources.asScala
- val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
+ val cpus = resourcesSeq.filter(_.getName == "cpus").toList
assert(cpus.size == 2)
- assert(cpus.exists(_.getRole().equals("role2")))
- assert(cpus.exists(_.getRole().equals("*")))
- val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
+ assert(cpus.exists(_.getRole() == "role2"))
+ assert(cpus.exists(_.getRole() == "*"))
+ val mem = resourcesSeq.filter(_.getName == "mem").toList
assert(mem.size == 2)
- assert(mem.exists(_.getRole().equals("role2")))
- assert(mem.exists(_.getRole().equals("*")))
+ assert(mem.exists(_.getRole() == "role2"))
+ assert(mem.exists(_.getRole() == "*"))

verify(driver, times(1)).launchTasks(
Matchers.eq(Collections.singleton(offer.getId)),
@@ -106,7 +106,7 @@ class MesosFineGrainedSchedulerBackendSuite
// uri is null.
val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
val executorResources = executorInfo.getResourcesList
- val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
+ val cpus = executorResources.asScala.find(_.getName == "cpus").get.getScalar.getValue

assert(cpus === mesosExecutorCores)
}
@@ -191,7 +191,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
appContext.getQueue should be ("staging-queue")
appContext.getAMContainerSpec should be (containerLaunchContext)
appContext.getApplicationType should be ("SPARK")
- appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method =>
+ appContext.getClass.getMethods.filter(_.getName == "getApplicationTags").foreach { method =>
val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]]
tags should contain allOf ("tag1", "dup", "tag2", "multi word")
tags.asScala.count(_.nonEmpty) should be (4)
@@ -147,7 +147,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
.where(false)
.select('a)
.where('a > 1)
- .where('a != 200)
+ .where('a =!= 200)
Member Author: (!= isn't actually the Column operator for historical reasons having to do with precedence)

Member: Oh, thank you for fixing this.

.orderBy('a.asc)

val optimized = Optimize.execute(query.analyze)
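As the author's comment notes, != is not the Column/Expression operator: == and != are final on Any in Scala, so 'a != 200 evaluates to a plain Boolean instead of building a comparison expression, and the intended predicate silently vanishes from the test. A rough sketch of the same pitfall against the public Column API (assumes a local SparkSession; this is not code from the PR):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object NotEqualDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("neq-demo").getOrCreate()
  import spark.implicits._

  val df = Seq(1, 200, 3).toDF("a")

  // =!= builds a Column expression, so this actually filters out the a = 200 row.
  df.filter(col("a") =!= 200).show()

  // != falls back to universal inequality: it compares a Column object to an Int and
  // returns a Boolean (true), which is useless as a predicate.
  val notAPredicate: Boolean = col("a") != 200
  println(notAPredicate)

  spark.stop()
}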
@@ -114,15 +114,15 @@ class UnsafeArraySuite extends SparkFunSuite {
assert(unsafeDate.isInstanceOf[UnsafeArrayData])
assert(unsafeDate.numElements == dateArray.length)
dateArray.zipWithIndex.map { case (e, i) =>
- assert(unsafeDate.get(i, DateType) == e)
+ assert(unsafeDate.get(i, DateType).asInstanceOf[Int] == e)
Member Author: Issue here is that you can't really compare AnyRef to a primitive, not via boxing or unboxing; this may just be making some Scala coercion explicit but seemed worthwhile

}

val unsafeTimestamp = ExpressionEncoder[Array[Long]].resolveAndBind().
toRow(timestampArray).getArray(0)
assert(unsafeTimestamp.isInstanceOf[UnsafeArrayData])
assert(unsafeTimestamp.numElements == timestampArray.length)
timestampArray.zipWithIndex.map { case (e, i) =>
- assert(unsafeTimestamp.get(i, TimestampType) == e)
+ assert(unsafeTimestamp.get(i, TimestampType).asInstanceOf[Long] == e)
}

Seq(decimalArray4_1, decimalArray20_20).map { decimalArray =>
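A note on the casts above: get(i, DateType) hands back the value statically typed as AnyRef, and while Scala's boxed-number equality usually does the right thing at runtime, an AnyRef-to-primitive comparison is exactly what static analysis flags; the asInstanceOf just spells out the unboxing. Illustrative sketch only, not PR code:

val boxed: AnyRef = java.lang.Integer.valueOf(5)  // roughly what such a getter returns

println(boxed == 5)                    // true via Scala's boxed-number equality,
                                       // but flagged as an AnyRef-vs-primitive comparison
println(boxed.asInstanceOf[Int] == 5)  // true, with the unboxing made explicit as an Int compare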
@@ -611,7 +611,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
).toDF("id", "stringData")
val sampleDF = df.sample(false, 0.7, 50)
// After sampling, sampleDF doesn't contain id=1.
- assert(!sampleDF.select("id").collect.contains(1))
+ assert(!sampleDF.select("id").as[Int].collect.contains(1))
// simpleUdf should not encounter id=1.
checkAnswer(sampleDF.select(simpleUdf($"id")), List.fill(sampleDF.count.toInt)(Row(1)))
}
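The added as[Int] is what gives the assertion teeth: select("id").collect on a DataFrame yields an Array[Row], which can never contain the Int 1, so the old check passed vacuously; collecting a Dataset[Int] compares the actual values. A hedged sketch, assuming a SparkSession named spark with its implicits in scope:

import spark.implicits._

val ids = Seq(1, 2, 3).toDF("id")

println(ids.collect().contains(1))          // false: an Array[Row] never contains a bare Int
println(ids.as[Int].collect().contains(1))  // true: Array[Int] compares the real column values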
@@ -427,7 +427,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
assert(errMsg.startsWith("Parquet column cannot be converted in file"))
val file = errMsg.substring("Parquet column cannot be converted in file ".length,
errMsg.indexOf(". "))
- val col = spark.read.parquet(file).schema.fields.filter(_.name.equals("a"))
+ val col = spark.read.parquet(file).schema.fields.filter(_.name == "a")
assert(col.length == 1)
if (col(0).dataType == StringType) {
assert(errMsg.contains("Column: [a], Expected: int, Found: BINARY"))