@@ -196,6 +196,7 @@ public synchronized void close() throws IOException {
* when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
* the iterator will eventually be released.
*/
@SuppressWarnings("deprecation")
@Override
protected void finalize() throws Throwable {
db.closeIterator(this);
5 changes: 5 additions & 0 deletions common/unsafe/pom.xml
@@ -89,6 +89,11 @@
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
Comment from the PR author: LevenshteinDistance moved here from commons-lang. (A short usage sketch follows the UTF8String test diff below.)

<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -17,7 +17,7 @@

package org.apache.spark.unsafe.types

import org.apache.commons.lang3.StringUtils
import org.apache.commons.text.similarity.LevenshteinDistance
import org.scalacheck.{Arbitrary, Gen}
import org.scalatest.prop.GeneratorDrivenPropertyChecks
// scalastyle:off
@@ -232,7 +232,7 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty
test("levenshteinDistance") {
forAll { (one: String, another: String) =>
assert(toUTF8(one).levenshteinDistance(toUTF8(another)) ===
StringUtils.getLevenshteinDistance(one, another))
LevenshteinDistance.getDefaultInstance.apply(one, another))
}
}
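A minimal sketch of the swap, assuming commons-text 1.x is on the classpath; the new call should return the same distances as the removed commons-lang3 one (the string pair here is only illustrative).

```scala
import org.apache.commons.lang3.StringUtils                      // old, deprecated API
import org.apache.commons.text.similarity.LevenshteinDistance    // replacement

val before: Int = StringUtils.getLevenshteinDistance("kitten", "sitting")           // deprecated
val after: Int = LevenshteinDistance.getDefaultInstance.apply("kitten", "sitting")  // same result, 3
assert(before == after)
```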

@@ -27,7 +27,7 @@
* to read a file to avoid extra copy of data between Java and
* native memory which happens when using {@link java.io.BufferedInputStream}.
* Unfortunately, this is not something already available in JDK,
* {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
* {@code sun.nio.ch.ChannelInputStream} supports reading a file using nio,
* but does not support buffering.
*/
public final class NioBufferedFileInputStream extends InputStream {
@@ -130,6 +130,7 @@ public synchronized void close() throws IOException {
StorageUtils.dispose(byteBuffer);
}

@SuppressWarnings("deprecation")
@Override
protected void finalize() throws IOException {
close();
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -502,7 +502,9 @@ class SparkContext(config: SparkConf) extends Logging {
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// create and start the heartbeater for collecting memory metrics
_heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
_heartbeater = new Heartbeater(env.memoryManager,
() => SparkContext.this.reportHeartBeat(),
"driver-heartbeater",
conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
_heartbeater.start()
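Not from the PR itself: a minimal sketch, with toy names, of the pattern behind this and the matching Executor and HadoopDelegationTokenManager changes below, assuming the motivation is Scala 2.12's deprecation of zero-argument eta-expansion.

```scala
// Passing a zero-arg method where a Function0 is expected warns
// "Eta-expansion of zero-argument methods is deprecated" under 2.12,
// so the call sites now spell the lambda out explicitly.
class Reporter {
  private def reportHeartBeat(): Unit = println("beat")

  def schedule(task: () => Unit): Unit = task()

  // schedule(reportHeartBeat)                                          // deprecated eta-expansion
  def start(): Unit = schedule(() => Reporter.this.reportHeartBeat())   // explicit, as in the diff
}

new Reporter().start()  // prints "beat"
```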

8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -17,7 +17,7 @@

package org.apache.spark.api.r

import java.io.{DataInputStream, DataOutputStream, File, FileOutputStream, IOException}
import java.io.{DataOutputStream, File, FileOutputStream, IOException}
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket}
import java.util.concurrent.TimeUnit

@@ -32,8 +32,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils

/**
* Netty-based backend server that is used to communicate between R and Java.
@@ -99,7 +97,7 @@ private[spark] class RBackend {
if (bootstrap != null && bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully()
}
if (bootstrap != null && bootstrap.childGroup() != null) {
if (bootstrap != null && bootstrap.config().childGroup() != null) {
bootstrap.config().childGroup().shutdownGracefully()
}
bootstrap = null
@@ -147,7 +145,7 @@ private[spark] object RBackend extends Logging {
new Thread("wait for socket to close") {
setDaemon(true)
override def run(): Unit = {
// any un-catched exception will also shutdown JVM
// any uncaught exception will also shutdown JVM
val buf = new Array[Byte](1024)
// shutdown JVM if R does not connect back in 10 seconds
serverSocket.setSoTimeout(10000)
@@ -270,7 +270,9 @@ private[spark] class HadoopDelegationTokenManager(
}

private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
val providers = Seq(
new HadoopFSDelegationTokenProvider(
() => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider)

7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -190,8 +190,11 @@ private[spark] class Executor(
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)

// Executor for the heartbeat task.
private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
"executor-heartbeater", HEARTBEAT_INTERVAL_MS)
private val heartbeater = new Heartbeater(
env.memoryManager,
() => Executor.this.reportHeartBeat(),
"executor-heartbeater",
HEARTBEAT_INTERVAL_MS)

// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =
@@ -30,7 +30,7 @@ import org.apache.spark.storage.RDDInfo
@DeveloperApi
class StageInfo(
val stageId: Int,
@deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int,
private val attemptId: Int,
Comment from the PR author: funny, MiMa didn't warn about this, but it will go in the release notes. (A caller-migration sketch follows this file's diff.)

val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
@@ -56,6 +56,8 @@ class StageInfo(
completionTime = Some(System.currentTimeMillis)
}

// This would just be the second constructor arg, except we need to maintain this method
// with parentheses for compatibility
def attemptNumber(): Int = attemptId

private[spark] def getStatusString: String = {
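A sketch of the caller-side migration this implies; the listener here is hypothetical and not part of the PR.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class StageAttemptLogger extends SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    // was: event.stageInfo.attemptId (no longer public)
    val attempt = event.stageInfo.attemptNumber()
    println(s"stage ${event.stageInfo.stageId} finished attempt $attempt")
  }
}
```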
@@ -215,15 +215,15 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
}

test("exclusive ranges of doubles") {
val data = 1.0 until 100.0 by 1.0
val data = Range.BigDecimal(1, 100, 1)
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).sum === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}

test("inclusive ranges of doubles") {
val data = 1.0 to 100.0 by 1.0
val data = Range.BigDecimal.inclusive(1, 100, 1)
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).sum === 100)
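Not part of the PR text: a small sketch of the replacement, on the assumption that fractional Double ranges are deprecated in recent Scala 2.12 releases because the floating-point step makes their length ambiguous.

```scala
// Old, deprecated forms: 1.0 until 100.0 by 1.0 and 1.0 to 100.0 by 1.0
val exclusive = Range.BigDecimal(1, 100, 1)            // 1, 2, ..., 99
val inclusive = Range.BigDecimal.inclusive(1, 100, 1)  // 1, 2, ..., 100
assert(exclusive.size == 99 && inclusive.size == 100)
```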
@@ -202,7 +202,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
assert(ser.serialize(t).limit() < 100)
assert(ser.serialize(t).limit() < 200)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
@@ -212,10 +212,10 @@ check(1L to 1000000L by 2L)
check(1L to 1000000L by 2L)
check(1L until 1000000L)
check(1L until 1000000L by 2L)
check(1.0 to 1000000.0 by 1.0)
check(1.0 to 1000000.0 by 2.0)
check(1.0 until 1000000.0 by 1.0)
check(1.0 until 1000000.0 by 2.0)
check(Range.BigDecimal.inclusive(1, 1000000, 1))
check(Range.BigDecimal.inclusive(1, 1000000, 2))
check(Range.BigDecimal(1, 1000000, 1))
check(Range.BigDecimal(1, 1000000, 2))
}

test("asJavaIterable") {
@@ -18,8 +18,7 @@
package org.apache.spark.status

import java.io.File
import java.lang.{Integer => JInteger, Long => JLong}
import java.util.{Arrays, Date, Properties}
import java.util.{Date, Properties}

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
@@ -1171,12 +1170,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Stop task 2 before task 1
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null))
listener.onTaskEnd(SparkListenerTaskEnd(
stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(1), null))
time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
listener.onTaskEnd(SparkListenerTaskEnd(
stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(0), null))

// Start task 3 and task 2 should be evicted.
listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, tasks(2)))
Expand Down Expand Up @@ -1241,8 +1240,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Task 1 Finished
time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
listener.onTaskEnd(SparkListenerTaskEnd(
stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(0), null))

// Stage 1 Completed
stage1.failureReason = Some("Failed")
@@ -1256,7 +1255,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber, "taskType",
TaskKilled(reason = "Killed"), tasks(1), null))

// Ensure killed task metrics are updated
@@ -19,6 +19,7 @@ package org.apache.spark.util.collection

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.ref.WeakReference

import org.scalatest.Matchers
@@ -138,7 +138,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("test NULL avro type") {
withTempPath { dir =>
val fields =
Seq(new Field("null", Schema.create(Type.NULL), "doc", null)).asJava
Seq(new Field("null", Schema.create(Type.NULL), "doc", null.asInstanceOf[AnyVal])).asJava
Comment from the PR author: this should be an exact workalike invocation here; it just works around a deprecation. (A sketch of the overload issue follows this file's diff.)

val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
val datumWriter = new GenericDatumWriter[GenericRecord](schema)
@@ -161,7 +161,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val avroSchema: Schema = {
val union =
Schema.createUnion(List(Schema.create(Type.INT), Schema.create(Type.LONG)).asJava)
val fields = Seq(new Field("field1", union, "doc", null)).asJava
val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
schema
@@ -189,7 +189,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val avroSchema: Schema = {
val union =
Schema.createUnion(List(Schema.create(Type.FLOAT), Schema.create(Type.DOUBLE)).asJava)
val fields = Seq(new Field("field1", union, "doc", null)).asJava
val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
schema
@@ -221,7 +221,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
Schema.create(Type.NULL)
).asJava
)
val fields = Seq(new Field("field1", union, "doc", null)).asJava
val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
schema
@@ -247,7 +247,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("Union of a single type") {
withTempPath { dir =>
val UnionOfOne = Schema.createUnion(List(Schema.create(Type.INT)).asJava)
val fields = Seq(new Field("field1", UnionOfOne, "doc", null)).asJava
val fields = Seq(new Field("field1", UnionOfOne, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)

@@ -274,10 +274,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val complexUnionType = Schema.createUnion(
List(Schema.create(Type.INT), Schema.create(Type.STRING), fixedSchema, enumSchema).asJava)
val fields = Seq(
new Field("field1", complexUnionType, "doc", null),
new Field("field2", complexUnionType, "doc", null),
new Field("field3", complexUnionType, "doc", null),
new Field("field4", complexUnionType, "doc", null)
new Field("field1", complexUnionType, "doc", null.asInstanceOf[AnyVal]),
new Field("field2", complexUnionType, "doc", null.asInstanceOf[AnyVal]),
new Field("field3", complexUnionType, "doc", null.asInstanceOf[AnyVal]),
new Field("field4", complexUnionType, "doc", null.asInstanceOf[AnyVal])
).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
@@ -941,7 +941,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val avroArrayType = resolveNullable(Schema.createArray(avroType), nullable)
val avroMapType = resolveNullable(Schema.createMap(avroType), nullable)
val name = "foo"
val avroField = new Field(name, avroType, "", null)
val avroField = new Field(name, avroType, "", null.asInstanceOf[AnyVal])
val recordSchema = Schema.createRecord("name", "doc", "space", true, Seq(avroField).asJava)
val avroRecordType = resolveNullable(recordSchema, nullable)
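A hedged sketch of what the cast presumably does: Avro 1.8 has both a deprecated JsonNode-typed Field constructor and an Object-typed replacement, and a bare Scala null resolves to the deprecated one; typing it as AnyVal leaves only the Object overload applicable, while the value passed at runtime is still null.

```scala
import org.apache.avro.Schema
import org.apache.avro.Schema.{Field, Type}

val intSchema = Schema.create(Type.INT)
// new Field("f", intSchema, "doc", null)                             // resolves to the deprecated JsonNode overload
val f = new Field("f", intSchema, "doc", null.asInstanceOf[AnyVal])   // Object overload, null default
```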

@@ -56,7 +56,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
}

// Continuous processing tasks end asynchronously, so test that they actually end.
private val tasksEndedListener = new SparkListener() {
private class TasksEndedListener extends SparkListener {
Comment from the PR author: the compiler complains about existential types if you access a method of an anonymous inner class. (A minimal sketch follows this file's diff.)

val activeTaskIdCount = new AtomicInteger(0)

override def onTaskStart(start: SparkListenerTaskStart): Unit = {
@@ -68,6 +68,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
}
}

private val tasksEndedListener = new TasksEndedListener()

override def beforeEach(): Unit = {
super.beforeEach()
spark.sparkContext.addSparkListener(tasksEndedListener)
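A hedged reconstruction of the warning being avoided, with toy names rather than Spark's: when a val's type is inferred from an anonymous subclass that adds a member, accessing that member from outside trips the structural/existential-type warnings, whereas a named class gives the val a plain nominal type.

```scala
import java.util.concurrent.atomic.AtomicInteger

class Listener

// Before: anonymous subclass; the inferred type is a refinement of Listener
// val listener = new Listener { val activeCount = new AtomicInteger(0) }
// listener.activeCount.get()   // warns about the inferred existential/structural type

// After: named class, ordinary nominal type, no warning
class CountingListener extends Listener {
  val activeCount = new AtomicInteger(0)
}
val listener = new CountingListener
listener.activeCount.get()
```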
@@ -228,7 +228,7 @@ object ConsumerStrategies {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
}

/**
@@ -307,7 +307,7 @@ object ConsumerStrategies {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
}

/**
@@ -391,7 +391,7 @@ object ConsumerStrategies {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
}

/**
@@ -17,8 +17,6 @@

package org.apache.spark.ml.feature

import scala.beans.BeanInfo

import org.apache.spark.annotation.Since
import org.apache.spark.ml.linalg.Vector

@@ -30,8 +28,12 @@ import org.apache.spark.ml.linalg.Vector
* @param features List of features for this data point.
*/
@Since("2.0.0")
@BeanInfo
case class LabeledPoint(@Since("2.0.0") label: Double, @Since("2.0.0") features: Vector) {

def getLabel: Double = label
Comment from the PR author: these getters are added to make explicit what the companion BeanInfo class was adding implicitly. (A small access sketch follows this file's diff.)


def getFeatures: Vector = features

override def toString: String = {
s"($label,$features)"
}
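A small sketch (the values are illustrative) of what the explicit getters preserve: bean-style access that the @BeanInfo companion used to generate for Java callers and bean-introspecting libraries.

```scala
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors

val p = LabeledPoint(1.0, Vectors.dense(0.1, 0.2))
// Bean-style accessors still work without @BeanInfo:
assert(p.getLabel == p.label)
assert(p.getFeatures == p.features)
```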