Merged
Changes from all commits (20 commits)
6f41c59
[SPARK-22690][ML] Imputer inherit HasOutputCols
zhengruifeng Dec 6, 2017
813c0f9
[SPARK-22704][SQL] Least and Greatest use less global variables
kiszk Dec 6, 2017
e98f964
[SPARK-22695][SQL] ScalaUDF should not use global variables
mgaido91 Dec 6, 2017
4286cba
[SPARK-22710] ConfigBuilder.fallbackConf should trigger onCreate func…
rxin Dec 6, 2017
51066b4
[SPARK-14228][CORE][YARN] Lost executor of RPC disassociated, and occ…
Dec 6, 2017
effca98
[SPARK-22720][SS] Make EventTimeWatermark Extend UnaryNode
gatorsmile Dec 6, 2017
9948b86
[SPARK-22516][SQL] Bump up Univocity version to 2.5.9
smurakozi Dec 6, 2017
f110a7f
[SPARK-22693][SQL] CreateNamedStruct and InSet should not use global …
mgaido91 Dec 6, 2017
8ae004b
[SPARK-22688][SQL] Upgrade Janino version to 3.0.8
kiszk Dec 7, 2017
d32337b
[SPARK-22721] BytesToBytesMap peak memory usage not accurate after re…
juliuszsompolski Dec 7, 2017
c1e5688
[SPARK-22672][SQL][TEST] Refactor ORC Tests
dongjoon-hyun Dec 7, 2017
e103adf
[SPARK-22703][SQL] make ColumnarRow an immutable view
cloud-fan Dec 7, 2017
ea2fbf4
[SPARK-22705][SQL] Case, Coalesce, and In use less global variables
kiszk Dec 7, 2017
2be4482
[SPARK-22452][SQL] Add getInt, getLong, getBoolean to DataSourceV2Opt…
skambha Dec 7, 2017
beb717f
[SPARK-22618][CORE] Catch exception in removeRDD to stop jobs from dying
Dec 7, 2017
dd59a4b
[SPARK-22712][SQL] Use `buildReaderWithPartitionValues` in native Orc…
dongjoon-hyun Dec 7, 2017
fc29446
[SPARK-22699][SQL] GenerateSafeProjection should not use global varia…
mgaido91 Dec 7, 2017
b790719
[SPARK-22696][SQL] objects functions should not use unneeded global v…
mgaido91 Dec 7, 2017
f41c0a9
[SPARK-22660][BUILD] Use position() and limit() to fix ambiguity issu…
kellyzly Dec 7, 2017
18b75d4
[SPARK-22719][SQL] Refactor ConstantPropagation
gengliangwang Dec 7, 2017
@@ -879,6 +879,7 @@ public LongArray getArray() {
* Reset this map to initialized state.
*/
public void reset() {
+ updatePeakMemoryUsed();
numKeys = 0;
numValues = 0;
freeArray(longArray);
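Note on the hunk above (SPARK-22721, BytesToBytesMap.reset()): reset() frees the map's long array, so if peak memory is only sampled lazily, anything allocated since the last sample vanishes from the recorded peak once reset() runs; calling updatePeakMemoryUsed() first captures it. A minimal Scala sketch of the capture-before-release pattern, with illustrative names rather than Spark's API:

    class TrackedBuffer {
      private var used: Long = 0L
      private var peak: Long = 0L

      private def updatePeak(): Unit = { peak = math.max(peak, used) }

      def allocate(bytes: Long): Unit = { used += bytes }

      def reset(): Unit = {
        updatePeak() // sample BEFORE releasing, as the fix above does
        used = 0L    // without the sample, this usage would never reach `peak`
      }

      def peakBytes: Long = { updatePeak(); peak }
    }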
@@ -99,7 +99,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
private def calcChecksum(block: ByteBuffer): Int = {
val adler = new Adler32()
if (block.hasArray) {
- adler.update(block.array, block.arrayOffset + block.position, block.limit - block.position)
+ adler.update(block.array, block.arrayOffset + block.position(), block.limit()
+   - block.position())
} else {
val bytes = new Array[Byte](block.remaining())
block.duplicate.get(bytes)
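Note on the .position()/.limit() parenthesization here and in the later hunks (SPARK-22660): java.nio.Buffer pairs a zero-arg getter position() with an overloaded setter position(int), and under Scala 2.12 the paren-less form can become ambiguous or mis-resolve; writing the empty parens pins the zero-arg getter. A sketch assuming only the JDK:

    import java.nio.ByteBuffer

    val buf: ByteBuffer = ByteBuffer.allocate(16)
    // position and limit each name both a getter and a setter overload on Buffer;
    // the explicit empty parens unambiguously select the zero-arg getters.
    val pos: Int = buf.position()
    val lim: Int = buf.limit()
    val remaining = lim - pos // same arithmetic as the checksum fix above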
@@ -165,11 +165,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}

if (notifyDriver && driver.nonEmpty) {
- driver.get.ask[Boolean](
-   RemoveExecutor(executorId, new ExecutorLossReason(reason))
- ).failed.foreach(e =>
-   logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
- )(ThreadUtils.sameThread)
+ driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
}

System.exit(code)
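Note on the ask -> send switch (SPARK-14228): the executor is about to System.exit, so the driver's Boolean reply is useless; a one-way send drops the Future handling entirely. One-way messages are delivered to an endpoint's receive handler rather than receiveAndReply, which is why the scheduler-backend hunk below moves the RemoveExecutor case between the two. A hedged sketch of the two RPC shapes, using a hypothetical trait rather than Spark's RpcEndpointRef:

    import scala.concurrent.{ExecutionContext, Future}

    final case class RemoveExecutor(executorId: String, reason: String)

    trait EndpointRef {
      def ask[T](msg: Any)(implicit ec: ExecutionContext): Future[T] // request/reply
      def send(msg: Any): Unit                                       // fire-and-forget
    }

    def notifyDriver(driver: EndpointRef, executorId: String): Unit = {
      // Before: ask[Boolean](...) plus logging for a failed Future nobody waits on.
      // After: no reply is needed, so a one-way send is sufficient.
      driver.send(RemoveExecutor(executorId, "executor exiting"))
    }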
@@ -452,7 +452,7 @@ private[spark] class Executor(
// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
- val resultSize = serializedDirectResult.limit
+ val resultSize = serializedDirectResult.limit()

// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
@@ -235,7 +235,9 @@ private[spark] case class ConfigBuilder(key: String) {
}

def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
- new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
+ val entry = new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
+ _onCreate.foreach(_(entry))
+ entry
}

def regexConf: TypedConfigBuilder[Regex] = {
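Note on the hunk above (SPARK-22710): fallbackConf constructed its entry directly and was the only create path that skipped the builder's _onCreate callback, so onCreate listeners never saw fallback entries; the fix routes the new entry through the callback the way createWithDefault and friends do, and the new test at the bottom of ConfigEntrySuite covers all four paths. A simplified sketch of the invariant, with illustrative types:

    // Every construction path must funnel the finished entry through onCreate.
    class Builder[Entry](key: String) {
      private var onCreateFn: Option[Entry => Unit] = None

      def onCreate(f: Entry => Unit): Builder[Entry] = { onCreateFn = Some(f); this }

      // Shared exit point: the bug was one path returning an entry without this call.
      protected def finish(entry: Entry): Entry = { onCreateFn.foreach(_(entry)); entry }

      def fallbackConf(entry: Entry): Entry = finish(entry) // now consistent
    }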
@@ -95,6 +95,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0

+ private val reviveThread =
+   ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {

@@ -103,9 +106,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

protected val addressToExecutorId = new HashMap[RpcAddress, String]

- private val reviveThread =
-   ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

override def onStart() {
// Periodically revive offers to allow delay scheduling to work
val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
@@ -154,6 +154,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap.values.foreach { ed =>
ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
}

+ case RemoveExecutor(executorId, reason) =>
+   // We will remove the executor's state and cannot restore it. However, the connection
+   // between the driver and the executor may be still alive so that the executor won't exit
+   // automatically, so try to tell the executor to stop itself. See SPARK-13519.
+   executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
+   removeExecutor(executorId, reason)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -215,14 +222,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
context.reply(true)

- case RemoveExecutor(executorId, reason) =>
-   // We will remove the executor's state and cannot restore it. However, the connection
-   // between the driver and the executor may be still alive so that the executor won't exit
-   // automatically, so try to tell the executor to stop itself. See SPARK-13519.
-   executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
-   removeExecutor(executorId, reason)
-   context.reply(true)

case RemoveWorker(workerId, host, message) =>
removeWorker(workerId, host, message)
context.reply(true)
@@ -288,13 +287,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
- if (serializedTask.limit >= maxRpcMessageSize) {
+ if (serializedTask.limit() >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
- msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
+ msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
@@ -373,10 +372,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

shouldDisable
}

- override def onStop() {
-   reviveThread.shutdownNow()
- }
}

var driverEndpoint: RpcEndpointRef = null
@@ -417,6 +412,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

override def stop() {
+ reviveThread.shutdownNow()
stopExecutors()
try {
if (driverEndpoint != null) {
@@ -465,9 +461,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* at once.
*/
protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
- // Only log the failure since we don't care about the result.
- driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t =>
-   logError(t.getMessage, t))(ThreadUtils.sameThread)
+ driverEndpoint.send(RemoveExecutor(executorId, reason))
}

protected def removeWorker(workerId: String, host: String, message: String): Unit = {
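Note on the scheduler-backend hunks above: besides moving RemoveExecutor handling from receiveAndReply to receive (matching the senders' switch to send), the PR lifts reviveThread out of the inner DriverEndpoint and shuts it down in the backend's own stop() rather than the endpoint's onStop(), so the scheduled executor's lifetime is tied to the component that creates it. A small sketch of that ownership pattern, assuming only java.util.concurrent:

    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

    class Backend {
      // Created by the backend, so the backend is responsible for shutdown.
      private val reviveThread: ScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor()

      def start(intervalMs: Long)(revive: () => Unit): Unit = {
        reviveThread.scheduleAtFixedRate(
          new Runnable { override def run(): Unit = revive() },
          intervalMs, intervalMs, TimeUnit.MILLISECONDS)
      }

      def stop(): Unit = reviveThread.shutdownNow() // mirrors the stop() change above
    }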
@@ -17,6 +17,7 @@

package org.apache.spark.storage

+ import java.io.IOException
import java.util.{HashMap => JHashMap}

import scala.collection.JavaConverters._
@@ -159,11 +160,16 @@ class BlockManagerMasterEndpoint(
// Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
// The dispatcher is used as an implicit argument into the Future sequence construction.
val removeMsg = RemoveRdd(rddId)
- Future.sequence(
-   blockManagerInfo.values.map { bm =>
-     bm.slaveEndpoint.ask[Int](removeMsg)
-   }.toSeq
- )

+ val futures = blockManagerInfo.values.map { bm =>
+   bm.slaveEndpoint.ask[Int](removeMsg).recover {
+     case e: IOException =>
+       logWarning(s"Error trying to remove RDD $rddId", e)
+       0 // zero blocks were removed
+   }
+ }.toSeq
+
+ Future.sequence(futures)
}

private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
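Note on the hunk above (SPARK-22618): Future.sequence is fail-fast, so a single slave endpoint throwing (for example an IOException from a dead block manager) used to fail the whole removeRdd future and could kill the calling job; recovering each per-endpoint future to 0 removed blocks keeps unpersist best-effort. The pattern in isolation:

    import java.io.IOException
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future

    // Each element stands in for one slave's "blocks removed" reply.
    def removeAll(asks: Seq[Future[Int]]): Future[Seq[Int]] =
      Future.sequence(asks.map(_.recover { case _: IOException => 0 }))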
@@ -54,7 +54,7 @@ class ByteBufferInputStream(private var buffer: ByteBuffer)
override def skip(bytes: Long): Long = {
if (buffer != null) {
val amountToSkip = math.min(bytes, buffer.remaining).toInt
- buffer.position(buffer.position + amountToSkip)
+ buffer.position(buffer.position() + amountToSkip)
if (buffer.remaining() == 0) {
cleanUp()
}
@@ -65,7 +65,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
for (bytes <- getChunks()) {
while (bytes.remaining() > 0) {
val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
- bytes.limit(bytes.position + ioSize)
+ bytes.limit(bytes.position() + ioSize)
channel.write(bytes)
}
}
@@ -206,7 +206,7 @@ private[spark] class ChunkedByteBufferInputStream(
override def skip(bytes: Long): Long = {
if (currentChunk != null) {
val amountToSkip = math.min(bytes, currentChunk.remaining).toInt
- currentChunk.position(currentChunk.position + amountToSkip)
+ currentChunk.position(currentChunk.position() + amountToSkip)
if (currentChunk.remaining() == 0) {
if (chunks.hasNext) {
currentChunk = chunks.next()
@@ -288,4 +288,24 @@ class ConfigEntrySuite extends SparkFunSuite {
conf.remove(testKey("b"))
assert(conf.get(iConf) === 3)
}

test("onCreate") {
var onCreateCalled = false
ConfigBuilder(testKey("oc1")).onCreate(_ => onCreateCalled = true).intConf.createWithDefault(1)
assert(onCreateCalled)

onCreateCalled = false
ConfigBuilder(testKey("oc2")).onCreate(_ => onCreateCalled = true).intConf.createOptional
assert(onCreateCalled)

onCreateCalled = false
ConfigBuilder(testKey("oc3")).onCreate(_ => onCreateCalled = true).intConf
.createWithDefaultString("1.0")
assert(onCreateCalled)

val fallback = ConfigBuilder(testKey("oc4")).intConf.createWithDefault(1)
onCreateCalled = false
ConfigBuilder(testKey("oc5")).onCreate(_ => onCreateCalled = true).fallbackConf(fallback)
assert(onCreateCalled)
}
}
@@ -199,7 +199,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
- assert(ser.serialize(t).limit < 100)
+ assert(ser.serialize(t).limit() < 100)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
@@ -118,7 +118,7 @@ class DiskStoreSuite extends SparkFunSuite {
val chunks = chunkedByteBuffer.chunks
assert(chunks.size === 2)
for (chunk <- chunks) {
- assert(chunk.limit === 10 * 1024)
+ assert(chunk.limit() === 10 * 1024)
}

val e = intercept[IllegalArgumentException]{
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.6
@@ -35,7 +35,7 @@ commons-beanutils-core-1.8.0.jar
commons-cli-1.2.jar
commons-codec-1.10.jar
commons-collections-3.2.2.jar
- commons-compiler-3.0.7.jar
+ commons-compiler-3.0.8.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
commons-crypto-1.0.0.jar
@@ -96,7 +96,7 @@ jackson-mapper-asl-1.9.13.jar
jackson-module-paranamer-2.7.9.jar
jackson-module-scala_2.11-2.6.7.1.jar
jackson-xc-1.9.13.jar
- janino-3.0.7.jar
+ janino-3.0.8.jar
java-xmlbuilder-1.1.jar
javassist-3.18.1-GA.jar
javax.annotation-api-1.2.jar
@@ -180,7 +180,7 @@ stax-api-1.0.1.jar
stream-2.7.0.jar
stringtemplate-3.2.1.jar
super-csv-2.2.0.jar
- univocity-parsers-2.5.4.jar
+ univocity-parsers-2.5.9.jar
validation-api-1.1.0.Final.jar
xbean-asm5-shaded-4.4.jar
xercesImpl-2.9.1.jar
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.7
@@ -35,7 +35,7 @@ commons-beanutils-core-1.8.0.jar
commons-cli-1.2.jar
commons-codec-1.10.jar
commons-collections-3.2.2.jar
- commons-compiler-3.0.7.jar
+ commons-compiler-3.0.8.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
commons-crypto-1.0.0.jar
@@ -96,7 +96,7 @@ jackson-mapper-asl-1.9.13.jar
jackson-module-paranamer-2.7.9.jar
jackson-module-scala_2.11-2.6.7.1.jar
jackson-xc-1.9.13.jar
- janino-3.0.7.jar
+ janino-3.0.8.jar
java-xmlbuilder-1.1.jar
javassist-3.18.1-GA.jar
javax.annotation-api-1.2.jar
@@ -181,7 +181,7 @@ stax-api-1.0.1.jar
stream-2.7.0.jar
stringtemplate-3.2.1.jar
super-csv-2.2.0.jar
- univocity-parsers-2.5.4.jar
+ univocity-parsers-2.5.9.jar
validation-api-1.1.0.Final.jar
xbean-asm5-shaded-4.4.jar
xercesImpl-2.9.1.jar
@@ -296,7 +296,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
props.put("replica.socket.timeout.ms", "1500")
props.put("delete.topic.enable", "true")
props.put("offsets.topic.num.partitions", "1")
- props.putAll(withBrokerProps.asJava)
+ // Can not use properties.putAll(propsMap.asJava) in scala-2.12
+ // See https://github.com/scala/bug/issues/10418
+ withBrokerProps.foreach { case (k, v) => props.put(k, v) }
props
}

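Note on the hunk above: per the linked scala/bug#10418, scalac 2.12 reports putAll on java.util.Properties as an ambiguous overload on newer JDKs (the Hashtable and Map flavors of putAll collide as the compiler sees them), so the code copies entries one by one instead. The workaround in isolation:

    import java.util.Properties

    // Copy a Scala map into Properties without calling putAll
    // (avoids the overload ambiguity described in scala/bug#10418).
    def toProperties(m: Map[String, Object]): Properties = {
      val props = new Properties()
      m.foreach { case (k, v) => props.put(k, v) }
      props
    }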
14 changes: 2 additions & 12 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param._
- import org.apache.spark.ml.param.shared.HasInputCols
+ import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCols}
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types._
/**
* Params for [[Imputer]] and [[ImputerModel]].
*/
- private[feature] trait ImputerParams extends Params with HasInputCols {
+ private[feature] trait ImputerParams extends Params with HasInputCols with HasOutputCols {

/**
* The imputation strategy. Currently only "mean" and "median" are supported.
@@ -63,16 +63,6 @@ private[feature] trait ImputerParams extends Params with HasInputCols {
/** @group getParam */
def getMissingValue: Double = $(missingValue)

- /**
-  * Param for output column names.
-  * @group param
-  */
- final val outputCols: StringArrayParam = new StringArrayParam(this, "outputCols",
-   "output column names")
-
- /** @group getParam */
- final def getOutputCols: Array[String] = $(outputCols)

/** Validates and transforms the input schema. */
protected def validateAndTransformSchema(schema: StructType): StructType = {
require($(inputCols).length == $(inputCols).distinct.length, s"inputCols contains" +
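Note on the Imputer hunks above (SPARK-22690): outputCols and its getter had been declared by hand inside ImputerParams; mixing in the shared ml.param.shared.HasOutputCols trait supplies the same param and getter, so the local duplicate is deleted and any other multi-output-column stage shares one definition. A simplified sketch of the shared-param idea (Spark's real traits are generated on top of its Params machinery):

    // Simplified stand-ins for Spark's Params machinery.
    trait Params

    trait HasOutputCols extends Params {
      // One canonical declaration of the param's storage and getter...
      protected var outputColsValue: Array[String] = Array.empty
      final def getOutputCols: Array[String] = outputColsValue
    }

    // ...so concrete param traits mix it in instead of redeclaring it.
    trait ImputerLikeParams extends Params with HasOutputCols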
2 changes: 1 addition & 1 deletion pom.xml
@@ -170,7 +170,7 @@
<!-- org.apache.commons/commons-lang3/-->
<commons-lang3.version>3.5</commons-lang3.version>
<datanucleus-core.version>3.2.10</datanucleus-core.version>
- <janino.version>3.0.7</janino.version>
+ <janino.version>3.0.8</janino.version>
<jersey.version>2.22.2</jersey.version>
<joda.version>2.9.3</joda.version>
<jodd.version>3.5.2</jodd.version>
@@ -282,7 +282,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
// No more deletion attempts of the executors.
// This is graceful termination and should not be detected as a failure.
verify(podOperations, times(1)).delete(resolvedPod)
- verify(driverEndpointRef, times(1)).ask[Boolean](
+ verify(driverEndpointRef, times(1)).send(
RemoveExecutor("1", ExecutorExited(
0,
exitCausedByApp = false,
@@ -318,7 +318,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
requestExecutorRunnable.getValue.run()
allocatorRunnable.getAllValues.asScala.last.run()
verify(podOperations, never()).delete(firstResolvedPod)
- verify(driverEndpointRef).ask[Boolean](
+ verify(driverEndpointRef).send(
RemoveExecutor("1", ExecutorExited(
1,
exitCausedByApp = true,
@@ -356,7 +356,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
allocatorRunnable.getValue.run()
verify(podOperations).delete(firstResolvedPod)
- verify(driverEndpointRef).ask[Boolean](
+ verify(driverEndpointRef).send(
RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
}

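Note on the test changes above: since removeExecutor now uses a fire-and-forget send, the Kubernetes suite's expectations flip from verify(...).ask[Boolean](...) to verify(...).send(...), with identical message payloads. A minimal Mockito-style sketch of verifying a one-way message, using a hypothetical trait mirroring the endpoint-ref shape:

    import org.mockito.Mockito.{mock, times, verify}

    object SendVerificationSketch {
      trait EndpointRef { def send(msg: Any): Unit }
      final case class RemoveExecutor(executorId: String, reason: String)

      def main(args: Array[String]): Unit = {
        val ref = mock(classOf[EndpointRef])
        ref.send(RemoveExecutor("1", "Executor lost for unknown reasons."))
        // A one-way message leaves no Future to assert on; the mock interaction
        // itself is the observable effect.
        verify(ref, times(1)).send(RemoveExecutor("1", "Executor lost for unknown reasons."))
      }
    }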