diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
index f62e85d43531..e3efc92c4a54 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
@@ -196,6 +196,7 @@ public synchronized void close() throws IOException {
* when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
* the iterator will eventually be released.
*/
+ @SuppressWarnings("deprecation")
@Override
protected void finalize() throws Throwable {
db.closeIterator(this);
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index 7e4b08217f1b..93a4f67fd23f 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -89,6 +89,11 @@
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ <scope>test</scope>
+ </dependency>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
index 9656951810da..fdb81a06d41c 100644
--- a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
+++ b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.unsafe.types
-import org.apache.commons.lang3.StringUtils
+import org.apache.commons.text.similarity.LevenshteinDistance
import org.scalacheck.{Arbitrary, Gen}
import org.scalatest.prop.GeneratorDrivenPropertyChecks
// scalastyle:off
@@ -232,7 +232,7 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty
test("levenshteinDistance") {
forAll { (one: String, another: String) =>
assert(toUTF8(one).levenshteinDistance(toUTF8(another)) ===
- StringUtils.getLevenshteinDistance(one, another))
+ LevenshteinDistance.getDefaultInstance.apply(one, another))
}
}
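For reference, a minimal sketch (not part of the patch; the object name is illustrative) of the commons-text API that replaces the deprecated StringUtils.getLevenshteinDistance from commons-lang3:
    import org.apache.commons.text.similarity.LevenshteinDistance

    object LevenshteinExample {
      def main(args: Array[String]): Unit = {
        // getDefaultInstance returns an unbounded LevenshteinDistance;
        // apply(left, right) returns the edit distance as an Integer.
        val distance = LevenshteinDistance.getDefaultInstance.apply("kitten", "sitting")
        println(distance) // 3
      }
    }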
diff --git a/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java b/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java
index f6d1288cb263..92bf0ecc1b5c 100644
--- a/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java
+++ b/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java
@@ -27,7 +27,7 @@
* to read a file to avoid extra copy of data between Java and
* native memory which happens when using {@link java.io.BufferedInputStream}.
* Unfortunately, this is not something already available in JDK,
- * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * {@code sun.nio.ch.ChannelInputStream} supports reading a file using nio,
* but does not support buffering.
*/
public final class NioBufferedFileInputStream extends InputStream {
@@ -130,6 +130,7 @@ public synchronized void close() throws IOException {
StorageUtils.dispose(byteBuffer);
}
+ @SuppressWarnings("deprecation")
@Override
protected void finalize() throws IOException {
close();
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cb91717dfa12..845a3d5f6d6f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -502,7 +502,9 @@ class SparkContext(config: SparkConf) extends Logging {
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// create and start the heartbeater for collecting memory metrics
- _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
+ _heartbeater = new Heartbeater(env.memoryManager,
+ () => SparkContext.this.reportHeartBeat(),
+ "driver-heartbeater",
conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
_heartbeater.start()
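The explicit `() => SparkContext.this.reportHeartBeat()` closure above avoids Scala 2.12's deprecation warning for eta-expanding a zero-argument method. A standalone sketch of the pattern, with hypothetical class and method names:
    class Ticker(report: () => Unit) {
      def tick(): Unit = report()
    }

    class Driver {
      private def reportHeartBeat(): Unit = println("heartbeat")
      // Passing `reportHeartBeat` directly triggers "Eta-expansion of zero-argument
      // methods is deprecated"; an explicit closure keeps the call site warning-free.
      private val ticker = new Ticker(() => Driver.this.reportHeartBeat())
    }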
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
index 7ce258155501..50c8fdf5316d 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -17,7 +17,7 @@
package org.apache.spark.api.r
-import java.io.{DataInputStream, DataOutputStream, File, FileOutputStream, IOException}
+import java.io.{DataOutputStream, File, FileOutputStream, IOException}
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket}
import java.util.concurrent.TimeUnit
@@ -32,8 +32,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.util.Utils
/**
* Netty-based backend server that is used to communicate between R and Java.
@@ -99,7 +97,7 @@ private[spark] class RBackend {
if (bootstrap != null && bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully()
}
- if (bootstrap != null && bootstrap.childGroup() != null) {
+ if (bootstrap != null && bootstrap.config().childGroup() != null) {
bootstrap.config().childGroup().shutdownGracefully()
}
bootstrap = null
@@ -147,7 +145,7 @@ private[spark] object RBackend extends Logging {
new Thread("wait for socket to close") {
setDaemon(true)
override def run(): Unit = {
- // any un-catched exception will also shutdown JVM
+ // any uncaught exception will also shut down the JVM
val buf = new Array[Byte](1024)
// shutdown JVM if R does not connect back in 10 seconds
serverSocket.setSoTimeout(10000)
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 10cd8742f2b4..1169b2878e99 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -270,7 +270,9 @@ private[spark] class HadoopDelegationTokenManager(
}
private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
- val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
+ val providers = Seq(
+ new HadoopFSDelegationTokenProvider(
+ () => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 61deb543d874..a30a501e5d4a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -190,8 +190,11 @@ private[spark] class Executor(
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
// Executor for the heartbeat task.
- private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
- "executor-heartbeater", HEARTBEAT_INTERVAL_MS)
+ private val heartbeater = new Heartbeater(
+ env.memoryManager,
+ () => Executor.this.reportHeartBeat(),
+ "executor-heartbeater",
+ HEARTBEAT_INTERVAL_MS)
// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 903e25b7986f..33a68f24bd53 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.RDDInfo
@DeveloperApi
class StageInfo(
val stageId: Int,
- @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int,
+ private val attemptId: Int,
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
@@ -56,6 +56,8 @@ class StageInfo(
completionTime = Some(System.currentTimeMillis)
}
+ // This would just be the second constructor arg, except we need to maintain this method
+ // with parentheses for compatibility
def attemptNumber(): Int = attemptId
private[spark] def getStatusString: String = {
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index 31ce9483cf20..424d9f825c46 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -215,7 +215,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
}
test("exclusive ranges of doubles") {
- val data = 1.0 until 100.0 by 1.0
+ val data = Range.BigDecimal(1, 100, 1)
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).sum === 99)
@@ -223,7 +223,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
}
test("inclusive ranges of doubles") {
- val data = 1.0 to 100.0 by 1.0
+ val data = Range.BigDecimal.inclusive(1, 100, 1)
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).sum === 100)
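Ranges over Double (`1.0 until 100.0 by 1.0`) are deprecated in Scala 2.12.6+ because the fractional step accumulates floating-point error; `Range.BigDecimal` is the suggested replacement used here and in the Kryo suite below. A minimal sketch:
    object BigDecimalRangeExample extends App {
      // Replacements for the deprecated 1.0 until 100.0 by 1.0 and 1.0 to 100.0 by 1.0.
      val exclusive = Range.BigDecimal(1, 100, 1)           // 1, 2, ..., 99
      val inclusive = Range.BigDecimal.inclusive(1, 100, 1) // 1, 2, ..., 100
      assert(exclusive.size == 99 && inclusive.size == 100)
    }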
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 84af73b08d3e..e413fe3b774d 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -202,7 +202,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
- assert(ser.serialize(t).limit() < 100)
+ assert(ser.serialize(t).limit() < 200)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
@@ -212,10 +212,10 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
check(1L to 1000000L by 2L)
check(1L until 1000000L)
check(1L until 1000000L by 2L)
- check(1.0 to 1000000.0 by 1.0)
- check(1.0 to 1000000.0 by 2.0)
- check(1.0 until 1000000.0 by 1.0)
- check(1.0 until 1000000.0 by 2.0)
+ check(Range.BigDecimal.inclusive(1, 1000000, 1))
+ check(Range.BigDecimal.inclusive(1, 1000000, 2))
+ check(Range.BigDecimal(1, 1000000, 1))
+ check(Range.BigDecimal(1, 1000000, 2))
}
test("asJavaIterable") {
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index bfd73069fbff..5f757b757ac6 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -18,8 +18,7 @@
package org.apache.spark.status
import java.io.File
-import java.lang.{Integer => JInteger, Long => JLong}
-import java.util.{Arrays, Date, Properties}
+import java.util.{Date, Properties}
import scala.collection.JavaConverters._
import scala.collection.immutable.Map
@@ -1171,12 +1170,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Stop task 2 before task 1
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(
- SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null))
+ listener.onTaskEnd(SparkListenerTaskEnd(
+ stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(1), null))
time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(
- SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+ listener.onTaskEnd(SparkListenerTaskEnd(
+ stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(0), null))
// Start task 3 and task 2 should be evicted.
listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, tasks(2)))
@@ -1241,8 +1240,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Task 1 Finished
time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(
- SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+ listener.onTaskEnd(SparkListenerTaskEnd(
+ stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(0), null))
// Stage 1 Completed
stage1.failureReason = Some("Failed")
@@ -1256,7 +1255,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
- SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
+ SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber, "taskType",
TaskKilled(reason = "Killed"), tasks(1), null))
// Ensure killed task metrics are updated
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index cd2526578413..35fba1a3b73c 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util.collection
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
+import scala.language.postfixOps
import scala.ref.WeakReference
import org.scalatest.Matchers
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 8d6cca8e48c3..207c54ce75f4 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -138,7 +138,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("test NULL avro type") {
withTempPath { dir =>
val fields =
- Seq(new Field("null", Schema.create(Type.NULL), "doc", null)).asJava
+ Seq(new Field("null", Schema.create(Type.NULL), "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
val datumWriter = new GenericDatumWriter[GenericRecord](schema)
@@ -161,7 +161,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val avroSchema: Schema = {
val union =
Schema.createUnion(List(Schema.create(Type.INT), Schema.create(Type.LONG)).asJava)
- val fields = Seq(new Field("field1", union, "doc", null)).asJava
+ val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
schema
@@ -189,7 +189,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val avroSchema: Schema = {
val union =
Schema.createUnion(List(Schema.create(Type.FLOAT), Schema.create(Type.DOUBLE)).asJava)
- val fields = Seq(new Field("field1", union, "doc", null)).asJava
+ val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
schema
@@ -221,7 +221,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
Schema.create(Type.NULL)
).asJava
)
- val fields = Seq(new Field("field1", union, "doc", null)).asJava
+ val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
schema
@@ -247,7 +247,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("Union of a single type") {
withTempPath { dir =>
val UnionOfOne = Schema.createUnion(List(Schema.create(Type.INT)).asJava)
- val fields = Seq(new Field("field1", UnionOfOne, "doc", null)).asJava
+ val fields = Seq(new Field("field1", UnionOfOne, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
@@ -274,10 +274,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val complexUnionType = Schema.createUnion(
List(Schema.create(Type.INT), Schema.create(Type.STRING), fixedSchema, enumSchema).asJava)
val fields = Seq(
- new Field("field1", complexUnionType, "doc", null),
- new Field("field2", complexUnionType, "doc", null),
- new Field("field3", complexUnionType, "doc", null),
- new Field("field4", complexUnionType, "doc", null)
+ new Field("field1", complexUnionType, "doc", null.asInstanceOf[AnyVal]),
+ new Field("field2", complexUnionType, "doc", null.asInstanceOf[AnyVal]),
+ new Field("field3", complexUnionType, "doc", null.asInstanceOf[AnyVal]),
+ new Field("field4", complexUnionType, "doc", null.asInstanceOf[AnyVal])
).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
@@ -941,7 +941,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val avroArrayType = resolveNullable(Schema.createArray(avroType), nullable)
val avroMapType = resolveNullable(Schema.createMap(avroType), nullable)
val name = "foo"
- val avroField = new Field(name, avroType, "", null)
+ val avroField = new Field(name, avroType, "", null.asInstanceOf[AnyVal])
val recordSchema = Schema.createRecord("name", "doc", "space", true, Seq(avroField).asJava)
val avroRecordType = resolveNullable(recordSchema, nullable)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index fa6bdc20bd4f..aa21f1271b81 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -56,7 +56,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
}
// Continuous processing tasks end asynchronously, so test that they actually end.
- private val tasksEndedListener = new SparkListener() {
+ private class TasksEndedListener extends SparkListener {
val activeTaskIdCount = new AtomicInteger(0)
override def onTaskStart(start: SparkListenerTaskStart): Unit = {
@@ -68,6 +68,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
}
}
+ private val tasksEndedListener = new TasksEndedListener()
+
override def beforeEach(): Unit = {
super.beforeEach()
spark.sparkContext.addSparkListener(tasksEndedListener)
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index cf283a5c3e11..07960d14b0bf 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -228,7 +228,7 @@ object ConsumerStrategies {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
}
/**
@@ -307,7 +307,7 @@ object ConsumerStrategies {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
}
/**
@@ -391,7 +391,7 @@ object ConsumerStrategies {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
}
/**
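The boxed `new java.lang.Long(l)` constructor is deprecated since Java 9; `java.lang.Long.valueOf` is the recommended factory and can reuse cached boxes. A small standalone sketch of the same conversion, with a hypothetical offsets map:
    import java.{lang => jl, util => ju}
    import scala.collection.JavaConverters._

    object OffsetBoxingExample extends App {
      val offsets: Map[String, Long] = Map("topic-0" -> 42L)
      // jl.Long.valueOf boxes each value without the constructor deprecation warning.
      val boxed = new ju.HashMap[String, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava)
      println(boxed) // {topic-0=42}
    }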
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LabeledPoint.scala
index c5d0ec1a8d35..412954f7b2d5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LabeledPoint.scala
@@ -17,8 +17,6 @@
package org.apache.spark.ml.feature
-import scala.beans.BeanInfo
-
import org.apache.spark.annotation.Since
import org.apache.spark.ml.linalg.Vector
@@ -30,8 +28,12 @@ import org.apache.spark.ml.linalg.Vector
* @param features List of features for this data point.
*/
@Since("2.0.0")
-@BeanInfo
case class LabeledPoint(@Since("2.0.0") label: Double, @Since("2.0.0") features: Vector) {
+
+ def getLabel: Double = label
+
+ def getFeatures: Vector = features
+
override def toString: String = {
s"($label,$features)"
}
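`scala.beans.BeanInfo` is deprecated in Scala 2.12, and removing the annotation drops the generated `*BeanInfo` companion classes, hence the MiMa excludes added later in this patch; hand-written getters keep the Java bean-style accessors. A minimal sketch with a hypothetical case class:
    // Before (deprecated):
    //   @scala.beans.BeanInfo
    //   case class Point(x: Double, y: Double)
    // After: explicit bean-style getters, no generated PointBeanInfo class.
    case class Point(x: Double, y: Double) {
      def getX: Double = x
      def getY: Double = y
    }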
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
index 56e2c543d100..5bfaa3b7f3f5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
@@ -17,10 +17,6 @@
package org.apache.spark.ml.feature
-import org.json4s.JsonDSL._
-import org.json4s.JValue
-import org.json4s.jackson.JsonMethods._
-
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml._
@@ -209,7 +205,7 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui
if (isSet(inputCols)) {
val splitsArray = if (isSet(numBucketsArray)) {
val probArrayPerCol = $(numBucketsArray).map { numOfBuckets =>
- (0.0 to 1.0 by 1.0 / numOfBuckets).toArray
+ (0 to numOfBuckets).map(_.toDouble / numOfBuckets).toArray
}
val probabilityArray = probArrayPerCol.flatten.sorted.distinct
@@ -229,12 +225,12 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui
}
} else {
dataset.stat.approxQuantile($(inputCols),
- (0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
+ (0 to $(numBuckets)).map(_.toDouble / $(numBuckets)).toArray, $(relativeError))
}
bucketizer.setSplitsArray(splitsArray.map(getDistinctSplits))
} else {
val splits = dataset.stat.approxQuantile($(inputCol),
- (0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
+ (0 to $(numBuckets)).map(_.toDouble / $(numBuckets)).toArray, $(relativeError))
bucketizer.setSplits(getDistinctSplits(splits))
}
copyValues(bucketizer.setParent(this))
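Building the probabilities from integer indices avoids both the deprecated fractional `Range` and the error that accumulates when a fractional step is added repeatedly, so the endpoints 0.0 and 1.0 stay exact. A minimal sketch with a hypothetical bucket count:
    object ProbabilityArrayExample extends App {
      val numBuckets = 4
      // Equivalent to the deprecated (0.0 to 1.0 by 1.0 / numBuckets).toArray, but each
      // probability is computed directly from its index rather than by repeated addition.
      val probabilities = (0 to numBuckets).map(_.toDouble / numBuckets).toArray
      println(probabilities.mkString(", ")) // 0.0, 0.25, 0.5, 0.75, 1.0
    }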
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 4381d6ab20cc..b320057b2527 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -17,8 +17,6 @@
package org.apache.spark.mllib.regression
-import scala.beans.BeanInfo
-
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
@@ -32,10 +30,14 @@ import org.apache.spark.mllib.util.NumericParser
* @param features List of features for this data point.
*/
@Since("0.8.0")
-@BeanInfo
case class LabeledPoint @Since("1.0.0") (
@Since("0.8.0") label: Double,
@Since("1.0.0") features: Vector) {
+
+ def getLabel: Double = label
+
+ def getFeatures: Vector = features
+
override def toString: String = {
s"($label,$features)"
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala
index 80c6ef0ea1aa..85ed11d6553d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala
@@ -17,8 +17,6 @@
package org.apache.spark.mllib.stat.test
-import scala.beans.BeanInfo
-
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.api.java.JavaDStream
@@ -32,10 +30,11 @@ import org.apache.spark.util.StatCounter
* @param value numeric value of the observation.
*/
@Since("1.6.0")
-@BeanInfo
case class BinarySample @Since("1.6.0") (
@Since("1.6.0") isExperiment: Boolean,
@Since("1.6.0") value: Double) {
+ def getIsExperiment: Boolean = isExperiment
+ def getValue: Double = value
override def toString: String = {
s"($isExperiment, $value)"
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/DCTSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/DCTSuite.scala
index 6734336aac39..985e396000d0 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/DCTSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/DCTSuite.scala
@@ -17,16 +17,16 @@
package org.apache.spark.ml.feature
-import scala.beans.BeanInfo
-
import edu.emory.mathcs.jtransforms.dct.DoubleDCT_1D
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.Row
-@BeanInfo
-case class DCTTestData(vec: Vector, wantedVec: Vector)
+case class DCTTestData(vec: Vector, wantedVec: Vector) {
+ def getVec: Vector = vec
+ def getWantedVec: Vector = wantedVec
+}
class DCTSuite extends MLTest with DefaultReadWriteTest {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
index 201a335e0d7b..1483d5df4d22 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
@@ -17,14 +17,13 @@
package org.apache.spark.ml.feature
-import scala.beans.BeanInfo
-
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.{DataFrame, Row}
-
-@BeanInfo
-case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String])
+case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String]) {
+ def getInputTokens: Array[String] = inputTokens
+ def getWantedNGrams: Array[String] = wantedNGrams
+}
class NGramSuite extends MLTest with DefaultReadWriteTest {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
index b009038bbd83..82af05039653 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
@@ -31,7 +31,7 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest {
val datasetSize = 100000
val numBuckets = 5
- val df = sc.parallelize(1.0 to datasetSize by 1.0).map(Tuple1.apply).toDF("input")
+ val df = sc.parallelize(1 to datasetSize).map(_.toDouble).map(Tuple1.apply).toDF("input")
val discretizer = new QuantileDiscretizer()
.setInputCol("input")
.setOutputCol("result")
@@ -114,8 +114,8 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest {
val spark = this.spark
import spark.implicits._
- val trainDF = sc.parallelize(1.0 to 100.0 by 1.0).map(Tuple1.apply).toDF("input")
- val testDF = sc.parallelize(-10.0 to 110.0 by 1.0).map(Tuple1.apply).toDF("input")
+ val trainDF = sc.parallelize((1 to 100).map(_.toDouble)).map(Tuple1.apply).toDF("input")
+ val testDF = sc.parallelize((-10 to 110).map(_.toDouble)).map(Tuple1.apply).toDF("input")
val discretizer = new QuantileDiscretizer()
.setInputCol("input")
.setOutputCol("result")
@@ -276,10 +276,10 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest {
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
val data2 = Array.range(1, 40, 2).map(_.toDouble)
val expected2 = Array (0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0,
- 2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 4.0)
+ 2.0, 3.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 4.0)
val data3 = Array.range(1, 60, 3).map(_.toDouble)
- val expected3 = Array (0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 4.0, 4.0, 5.0,
- 5.0, 5.0, 6.0, 6.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0)
+ val expected3 = Array (0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 5.0,
+ 5.0, 6.0, 6.0, 7.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0)
val data = (0 until 20).map { idx =>
(data1(idx), data2(idx), data3(idx), expected1(idx), expected2(idx), expected3(idx))
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
index be59b0af2c78..ba8e79f14de9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
@@ -17,14 +17,14 @@
package org.apache.spark.ml.feature
-import scala.beans.BeanInfo
-
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.{DataFrame, Row}
-@BeanInfo
-case class TokenizerTestData(rawText: String, wantedTokens: Array[String])
+case class TokenizerTestData(rawText: String, wantedTokens: Array[String]) {
+ def getRawText: String = rawText
+ def getWantedTokens: Array[String] = wantedTokens
+}
class TokenizerSuite extends MLTest with DefaultReadWriteTest {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
index fb5789f945de..44b0f8f8ae7d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.ml.feature
-import scala.beans.{BeanInfo, BeanProperty}
-
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.ml.attribute._
@@ -26,7 +24,7 @@ import org.apache.spark.ml.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.DataFrame
class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging {
@@ -339,6 +337,7 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging {
}
private[feature] object VectorIndexerSuite {
- @BeanInfo
- case class FeatureData(@BeanProperty features: Vector)
+ case class FeatureData(features: Vector) {
+ def getFeatures: Vector = features
+ }
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 9a59c41740da..2fc9754ecfe1 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -601,7 +601,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging {
val df = maybeDf.get._2
val expected = estimator.fit(df)
- val actuals = dfs.filter(_ != baseType).map(t => (t, estimator.fit(t._2)))
+ val actuals = dfs.map(t => (t, estimator.fit(t._2)))
actuals.foreach { case (_, actual) => check(expected, actual) }
actuals.foreach { case (t, actual) => check2(expected, actual, t._2, t._1.encoder) }
diff --git a/pom.xml b/pom.xml
index 59e3d0fa772b..1e00edb439c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -407,6 +407,11 @@
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ <version>1.6</version>
+ </dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a8d2b5d1d9cb..e35e74aa3304 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,11 @@ object MimaExcludes {
// Exclude rules for 3.0.x
lazy val v30excludes = v24excludes ++ Seq(
+ // [SPARK-26090] Resolve most miscellaneous deprecation and build warnings for Spark 3
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.stat.test.BinarySampleBeanInfo"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.regression.LabeledPointBeanInfo"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.LabeledPointBeanInfo"),
+
// [SPARK-25959] GBTClassifier picks wrong impurity stats on loading
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.GBTClassificationModel.setImpurity"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.tree.HasVarianceImpurity.org$apache$spark$ml$tree$HasVarianceImpurity$_setter_$impurity_="),
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index be4daec3b1bb..167fb402cd40 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -55,7 +55,7 @@ private[spark] class KubernetesDriverBuilder(
providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> PodTemplateConfigMapStep) =
new PodTemplateConfigMapStep(_),
- provideInitialPod: () => SparkPod = SparkPod.initialPod) {
+ provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 089f84dec277..fc41a4770bce 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -53,7 +53,7 @@ private[spark] class KubernetesExecutorBuilder(
KubernetesConf[KubernetesExecutorSpecificConf]
=> HadoopSparkUserExecutorFeatureStep) =
new HadoopSparkUserExecutorFeatureStep(_),
- provideInitialPod: () => SparkPod = SparkPod.initialPod) {
+ provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 81e3822389f3..08f28758ef48 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
-import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Mockito.{doReturn, verify, when}
import org.scalatest.BeforeAndAfter
@@ -28,7 +28,6 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
-import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
class ClientSuite extends SparkFunSuite with BeforeAndAfter {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index b336774838bc..2f984e5d8980 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -157,7 +157,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
private def kubernetesConfWithCorrectFields(): KubernetesConf[KubernetesExecutorSpecificConf] =
Matchers.argThat(new ArgumentMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
override def matches(argument: scala.Any): Boolean = {
- if (!argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]) {
+ if (!argument.isInstanceOf[KubernetesConf[_]]) {
false
} else {
val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 35299166d981..c3070de3d17c 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -116,8 +116,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
}
def createContainer(host: String, resource: Resource = containerResource): Container = {
- // When YARN 2.6+ is required, avoid deprecation by using version with long second arg
- val containerId = ContainerId.newInstance(appAttemptId, containerNum)
+ val containerId = ContainerId.newContainerId(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala
index 9bacd3b925be..ea619c6a7666 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala
@@ -199,7 +199,7 @@ class HyperLogLogPlusPlusHelper(relativeSD: Double) extends Serializable {
var shift = 0
while (idx < m && i < REGISTERS_PER_WORD) {
val Midx = (word >>> shift) & REGISTER_WORD_MASK
- zInverse += 1.0 / (1 << Midx)
+ zInverse += 1.0 / (1L << Midx)
if (Midx == 0) {
V += 1.0d
}
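An Int shift distance is taken modulo 32, so `1 << Midx` goes negative at 31 and wraps back to 1 at 32; widening the literal to Long keeps `1.0 / (1L << Midx)` correct for large register values. A tiny illustration:
    object ShiftExample extends App {
      println(1 << 32)          // 1  (Int shift distance is taken mod 32)
      println(1L << 32)         // 4294967296
      println(1.0 / (1 << 31))  // negative: the Int shift already overflowed to -2147483648
      println(1.0 / (1L << 31)) // 4.656612873077393E-10
    }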
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 94778840d706..117e96175e92 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.analysis
-import scala.beans.{BeanInfo, BeanProperty}
-
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -30,8 +28,9 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
import org.apache.spark.sql.types._
-@BeanInfo
-private[sql] case class GroupableData(@BeanProperty data: Int)
+private[sql] case class GroupableData(data: Int) {
+ def getData: Int = data
+}
private[sql] class GroupableUDT extends UserDefinedType[GroupableData] {
@@ -50,8 +49,9 @@ private[sql] class GroupableUDT extends UserDefinedType[GroupableData] {
private[spark] override def asNullable: GroupableUDT = this
}
-@BeanInfo
-private[sql] case class UngroupableData(@BeanProperty data: Map[Int, Int])
+private[sql] case class UngroupableData(data: Map[Int, Int]) {
+ def getData: Map[Int, Int] = data
+}
private[sql] class UngroupableUDT extends UserDefinedType[UngroupableData] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index cd52d991d55c..8ee84b02f675 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -311,7 +311,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
outputMode: OutputMode,
useTempCheckpointLocation: Boolean = false,
recoverFromCheckpointLocation: Boolean = true,
- trigger: Trigger = ProcessingTime(0),
+ trigger: Trigger = Trigger.ProcessingTime(0),
triggerClock: Clock = new SystemClock()): StreamingQuery = {
val query = createQuery(
userSpecifiedName,
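The `ProcessingTime` case class is deprecated in favor of the `Trigger.ProcessingTime` factory. A minimal sketch of the replacement at a call site:
    import org.apache.spark.sql.streaming.Trigger

    object TriggerExample {
      // Trigger.ProcessingTime(0) replaces the deprecated ProcessingTime(0) case class.
      val defaultTrigger: Trigger = Trigger.ProcessingTime(0)
    }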
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
index 7f975a647c24..8f35abeb579b 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
@@ -143,11 +143,16 @@ public void setIntervals(List<Interval> intervals) {
this.intervals = intervals;
}
+ @Override
+ public int hashCode() {
+ return id ^ Objects.hashCode(intervals);
+ }
+
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ArrayRecord)) return false;
ArrayRecord other = (ArrayRecord) obj;
- return (other.id == this.id) && other.intervals.equals(this.intervals);
+ return (other.id == this.id) && Objects.equals(other.intervals, this.intervals);
}
@Override
@@ -184,6 +189,11 @@ public void setIntervals(Map<String, Interval> intervals) {
this.intervals = intervals;
}
+ @Override
+ public int hashCode() {
+ return id ^ Objects.hashCode(intervals);
+ }
+
@Override
public boolean equals(Object obj) {
if (!(obj instanceof MapRecord)) return false;
@@ -225,6 +235,11 @@ public void setEndTime(long endTime) {
this.endTime = endTime;
}
+ @Override
+ public int hashCode() {
+ return Long.hashCode(startTime) ^ Long.hashCode(endTime);
+ }
+
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Interval)) return false;
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaRangeInputPartition.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaRangeInputPartition.java
new file mode 100644
index 000000000000..438f489a3eea
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaRangeInputPartition.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+
+class JavaRangeInputPartition implements InputPartition {
+ int start;
+ int end;
+
+ JavaRangeInputPartition(int start, int end) {
+ this.start = start;
+ this.end = end;
+ }
+}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
index 685f9b9747e8..ced51dde6997 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
@@ -88,12 +88,3 @@ public void close() throws IOException {
}
}
-class JavaRangeInputPartition implements InputPartition {
- int start;
- int end;
-
- JavaRangeInputPartition(int start, int end) {
- this.start = start;
- this.end = end;
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index cc8b600efa46..cf956316057e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql
-import scala.beans.{BeanInfo, BeanProperty}
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Cast, ExpressionEvalHelper, GenericInternalRow, Literal}
@@ -28,10 +26,10 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
-@BeanInfo
-private[sql] case class MyLabeledPoint(
- @BeanProperty label: Double,
- @BeanProperty features: UDT.MyDenseVector)
+private[sql] case class MyLabeledPoint(label: Double, features: UDT.MyDenseVector) {
+ def getLabel: Double = label
+ def getFeatures: UDT.MyDenseVector = features
+}
// Wrapped in an object to check Scala compatibility. See SPARK-13929
object UDT {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
index 0d9f1fb0c02c..fb3388452e4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -46,6 +46,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
(input.tail, input.init).zipped.map {
case (x: Int, y: Int) => (x - y).toLong
case (x: Long, y: Long) => x - y
+ case other => fail(s"Unexpected input $other")
}
}
@@ -116,7 +117,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
val row = new GenericInternalRow(1)
val nullRow = new GenericInternalRow(1)
nullRow.setNullAt(0)
- input.map { value =>
+ input.foreach { value =>
if (value == nullValue) {
builder.appendFrom(nullRow, 0)
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index 80c76915e4c2..2d338ab92211 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.mutable
-
-import org.eclipse.jetty.util.ConcurrentHashSet
import org.scalatest.concurrent.{Eventually, Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
@@ -48,7 +45,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
}
test("trigger timing") {
- val triggerTimes = new ConcurrentHashSet[Int]
+ val triggerTimes = ConcurrentHashMap.newKeySet[Int]()
val clock = new StreamManualClock()
@volatile var continueExecuting = true
@volatile var clockIncrementInTrigger = 0L
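Jetty's `ConcurrentHashSet` is deprecated upstream; the JDK's `ConcurrentHashMap.newKeySet()` (Java 8+) provides an equivalent concurrent set without the extra dependency. A minimal sketch:
    import java.util.concurrent.ConcurrentHashMap

    object KeySetExample extends App {
      // A concurrent java.util.Set view backed by a ConcurrentHashMap.
      val triggerTimes = ConcurrentHashMap.newKeySet[Int]()
      triggerTimes.add(10)
      triggerTimes.add(10) // duplicates ignored, like any Set
      println(triggerTimes.size()) // 1
    }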
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 635ea6fca649..7db31f1f8f69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -382,10 +382,9 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
tasks.foreach {
case t: TextSocketContinuousInputPartition =>
val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
- for (i <- 0 until numRecords / 2) {
+ for (_ <- 0 until numRecords / 2) {
r.next()
- assert(r.get().get(0, TextSocketReader.SCHEMA_TIMESTAMP)
- .isInstanceOf[(String, Timestamp)])
+ assert(r.get().get(0, TextSocketReader.SCHEMA_TIMESTAMP).isInstanceOf[(_, _)])
}
case _ => throw new IllegalStateException("Unexpected task type")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index e8710aeb40bd..ddc5dbb148cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -150,7 +150,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
def getPeakExecutionMemory(stageId: Int): Long = {
val peakMemoryAccumulator = sparkListener.getCompletedStageInfos(stageId).accumulables
- .filter(_._2.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
+ .filter(_._2.name == Some(InternalAccumulator.PEAK_EXECUTION_MEMORY))
assert(peakMemoryAccumulator.size == 1)
peakMemoryAccumulator.head._2.value.get.asInstanceOf[Long]
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
index 5f9ea4d26790..035b71a37a69 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.hive.HiveUtils
class HiveCliSessionStateSuite extends SparkFunSuite {
def withSessionClear(f: () => Unit): Unit = {
- try f finally SessionState.detachSession()
+ try f() finally SessionState.detachSession()
}
test("CliSessionState will be reused") {