Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Partitioner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, partitions)
RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
val iter = endpoints.keySet().iterator()
while (iter.hasNext) {
val name = iter.next
postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}"))
}
postMessage(name, message, (e) => { e match {
case e: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${e.getMessage}")
case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
}}
)}
}

/** Posts a message sent by a remote endpoint. */
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,12 @@ private[netty] class Inbox(
try action catch {
case NonFatal(e) =>
try endpoint.onError(e) catch {
case NonFatal(ee) => logError(s"Ignoring error", ee)
case NonFatal(ee) =>
if (stopped) {
logDebug("Ignoring error", ee)
} else {
logError("Ignoring error", ee)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private[netty] class NettyRpcEnv(
try {
dispatcher.postOneWayMessage(message)
} catch {
case e: RpcEnvStoppedException => logWarning(e.getMessage)
case e: RpcEnvStoppedException => logDebug(e.getMessage)
}
} else {
// Message to a remote RPC endpoint.
Expand All @@ -203,7 +203,10 @@ private[netty] class NettyRpcEnv(

// Failure callback: attempts to fail `promise` with `e`; when `tryFailure` returns false
// the error is not propagated and is only logged.
// NOTE(review): this listing contains both an unconditional `logWarning` line and a `match`
// that logs RpcEnvStoppedException at debug level and any other error at warning level.
// Presumably these are the before/after lines of a diff and only the `match` is meant to
// remain — confirm against the merged file.
def onFailure(e: Throwable): Unit = {
if (!promise.tryFailure(e)) {
logWarning(s"Ignored failure: $e")
e match {
case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
case _ => logWarning(s"Ignored failure: $e")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo

// Failure callback: logs the error and returns Unit; nothing is rethrown here.
// Any Throwable other than RpcEnvStoppedException is logged at warning level together
// with the exception itself.
// NOTE(review): two cases match RpcEnvStoppedException (one `logWarning`, one `logDebug`).
// As written the second is unreachable; presumably these are the before/after lines of a
// diff and the `logDebug` case is the intended one — confirm against the merged file.
override def onFailure(e: Throwable): Unit = {
e match {
case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
case e1: RpcEnvStoppedException => logDebug(e1.getMessage)
case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
def post(event: SparkListenerEvent): Unit = {
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
logError(s"$name has already stopped! Dropping event $event")
logDebug(s"$name has already stopped! Dropping event $event")
return
}
metrics.numEventsPosted.inc()
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva

// Add other tests here for classes that should be able to handle empty partitions correctly
}

// Builds a RangePartitioner over a three-element pair RDD while requesting 22 partitions,
// and expects the partitioner to report exactly three partitions.
test("Number of elements in RDD is less than number of partitions") {
  val requestedPartitions = 22
  val pairs = sc.parallelize(1 to 3).map { i => (i, i) }
  val rangePartitioner = new RangePartitioner(requestedPartitions, pairs)
  assert(rangePartitioner.numPartitions === 3)
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,26 @@ private[ml] object ValidatorParams {
extraMetadata: Option[JObject] = None): Unit = {
import org.json4s.JsonDSL._

var numParamsNotJson = 0
val estimatorParamMapsJson = compact(render(
instance.getEstimatorParamMaps.map { case paramMap =>
paramMap.toSeq.map { case ParamPair(p, v) =>
Map("parent" -> p.parent, "name" -> p.name, "value" -> p.jsonEncode(v))
v match {
case writeableObj: DefaultParamsWritable =>
val relativePath = "epm_" + p.name + numParamsNotJson
val paramPath = new Path(path, relativePath).toString
numParamsNotJson += 1
writeableObj.save(paramPath)
Map("parent" -> p.parent, "name" -> p.name,
"value" -> compact(render(JString(relativePath))),
"isJson" -> compact(render(JBool(false))))
case _: MLWritable =>
throw new NotImplementedError("ValidatorParams.saveImpl does not handle parameters " +
"of type: MLWritable that are not DefaultParamsWritable")
case _ =>
Map("parent" -> p.parent, "name" -> p.name, "value" -> p.jsonEncode(v),
"isJson" -> compact(render(JBool(true))))
}
}
}.toSeq
))
Expand Down Expand Up @@ -183,8 +199,17 @@ private[ml] object ValidatorParams {
val paramPairs = pMap.map { case pInfo: Map[String, String] =>
val est = uidToParams(pInfo("parent"))
val param = est.getParam(pInfo("name"))
val value = param.jsonDecode(pInfo("value"))
param -> value
// [SPARK-21221] introduced the isJson field
if (!pInfo.contains("isJson") ||
(pInfo.contains("isJson") && pInfo("isJson").toBoolean.booleanValue())) {
val value = param.jsonDecode(pInfo("value"))
param -> value
} else {
val relativePath = param.jsonDecode(pInfo("value")).toString
val value = DefaultParamsReader
.loadParamsInstance[MLWritable](new Path(path, relativePath).toString, sc)
param -> value
}
}
ParamMap(paramPairs: _*)
}.toArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package org.apache.spark.ml.tuning

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.{Estimator, Model, Pipeline}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest}
import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, MulticlassClassificationEvaluator, RegressionEvaluator}
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.ml.linalg.{DenseMatrix, Vectors}
import org.apache.spark.ml.param.{ParamMap, ParamPair}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.HasInputCol
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
Expand Down Expand Up @@ -153,7 +153,76 @@ class CrossValidatorSuite
s" LogisticRegression but found ${other.getClass.getName}")
}

CrossValidatorSuite.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps)
ValidatorParamsSuiteHelpers
.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps)
}

// Round-trips a CrossValidator whose param grid varies the classifier nested inside a
// OneVsRest estimator, then checks the reloaded instance field by field.
test("read/write: CrossValidator with nested estimator") {
  // Keep a handle on the base classifier so it can be compared later without a downcast.
  val baseClassifier = new LogisticRegression
  val ova = new OneVsRest().setClassifier(baseClassifier)
  val evaluator = new MulticlassClassificationEvaluator()
    .setMetricName("accuracy")
  val classifier1 = new LogisticRegression().setRegParam(2.0)
  val classifier2 = new LogisticRegression().setRegParam(3.0)
  // params that are not JSON serializable must inherit from Params
  val paramMaps = new ParamGridBuilder()
    .addGrid(ova.classifier, Array(classifier1, classifier2))
    .build()
  val cv = new CrossValidator()
    .setEstimator(ova)
    .setEvaluator(evaluator)
    .setNumFolds(20)
    .setEstimatorParamMaps(paramMaps)

  // The generic per-param check is switched off; the relevant fields are compared below.
  val cv2 = testDefaultReadWrite(cv, testParams = false)

  assert(cv.uid === cv2.uid)
  assert(cv.getNumFolds === cv2.getNumFolds)
  assert(cv.getSeed === cv2.getSeed)

  assert(cv2.getEvaluator.isInstanceOf[MulticlassClassificationEvaluator])
  val evaluator2 = cv2.getEvaluator.asInstanceOf[MulticlassClassificationEvaluator]
  assert(evaluator.uid === evaluator2.uid)
  assert(evaluator.getMetricName === evaluator2.getMetricName)

  cv2.getEstimator match {
    case ova2: OneVsRest =>
      assert(ova.uid === ova2.uid)
      val classifier = ova2.getClassifier
      classifier match {
        case lr: LogisticRegression =>
          assert(baseClassifier.getMaxIter === lr.getMaxIter)
        case _ =>
          throw new AssertionError(s"Loaded CrossValidator expected estimator of type" +
            s" LogisticRegression but found ${classifier.getClass.getName}")
      }

    case other =>
      throw new AssertionError(s"Loaded CrossValidator expected estimator of type" +
        s" OneVsRest but found ${other.getClass.getName}")
  }

  ValidatorParamsSuiteHelpers
    .compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps)
}

// Assembles a CrossValidator over a OneVsRest estimator whose grid swaps the nested
// classifier, then hands it to ValidatorParamsSuiteHelpers.testFileMove.
test("read/write: Persistence of nested estimator works if parent directory changes") {
  val oneVsRest = new OneVsRest().setClassifier(new LogisticRegression)
  val accuracyEvaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy")
  val lowRegClassifier = new LogisticRegression().setRegParam(2.0)
  val highRegClassifier = new LogisticRegression().setRegParam(3.0)
  // Grid values that cannot be JSON serialized must inherit from Params.
  val gridBuilder = new ParamGridBuilder()
  gridBuilder.addGrid(oneVsRest.classifier, Array(lowRegClassifier, highRegClassifier))
  val nestedGrid = gridBuilder.build()

  val crossValidator = new CrossValidator()
  crossValidator.setEstimator(oneVsRest)
  crossValidator.setEvaluator(accuracyEvaluator)
  crossValidator.setNumFolds(20)
  crossValidator.setEstimatorParamMaps(nestedGrid)

  ValidatorParamsSuiteHelpers.testFileMove(crossValidator)
}

test("read/write: CrossValidator with complex estimator") {
Expand Down Expand Up @@ -193,7 +262,8 @@ class CrossValidatorSuite
assert(cv2.getEvaluator.isInstanceOf[BinaryClassificationEvaluator])
assert(cv.getEvaluator.uid === cv2.getEvaluator.uid)

CrossValidatorSuite.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps)
ValidatorParamsSuiteHelpers
.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps)

cv2.getEstimator match {
case pipeline2: Pipeline =>
Expand All @@ -212,7 +282,8 @@ class CrossValidatorSuite
assert(lrcv.uid === lrcv2.uid)
assert(lrcv2.getEvaluator.isInstanceOf[BinaryClassificationEvaluator])
assert(lrEvaluator.uid === lrcv2.getEvaluator.uid)
CrossValidatorSuite.compareParamMaps(lrParamMaps, lrcv2.getEstimatorParamMaps)
ValidatorParamsSuiteHelpers
.compareParamMaps(lrParamMaps, lrcv2.getEstimatorParamMaps)
case other =>
throw new AssertionError("Loaded Pipeline expected stages (HashingTF, CrossValidator)" +
" but found: " + other.map(_.getClass.getName).mkString(", "))
Expand Down Expand Up @@ -278,7 +349,8 @@ class CrossValidatorSuite
s" LogisticRegression but found ${other.getClass.getName}")
}

CrossValidatorSuite.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps)
ValidatorParamsSuiteHelpers
.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps)

cv2.bestModel match {
case lrModel2: LogisticRegressionModel =>
Expand All @@ -296,21 +368,6 @@ class CrossValidatorSuite

object CrossValidatorSuite extends SparkFunSuite {

/**
 * Checks that two arrays of estimatorParamMaps match position by position.
 * Both arrays must have the same length, each pair of maps the same size, and every
 * param of the first map must be present in the second with a value equal under `===`,
 * so param values need to be simple types that `===` can compare.
 */
def compareParamMaps(pMaps: Array[ParamMap], pMaps2: Array[ParamMap]): Unit = {
  assert(pMaps.length === pMaps2.length)
  for ((expected, actual) <- pMaps.zip(pMaps2)) {
    assert(expected.size === actual.size)
    expected.toSeq.foreach { case ParamPair(param, value) =>
      assert(actual.contains(param))
      assert(actual(param) === value)
    }
  }
}

abstract class MyModel extends Model[MyModel]

class MyEstimator(override val uid: String) extends Estimator[MyModel] with HasInputCol {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package org.apache.spark.ml.tuning

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest}
import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.{ParamMap}
import org.apache.spark.ml.param.shared.HasInputCol
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
Expand Down Expand Up @@ -95,7 +95,7 @@ class TrainValidationSplitSuite
}

test("transformSchema should check estimatorParamMaps") {
import TrainValidationSplitSuite._
import TrainValidationSplitSuite.{MyEstimator, MyEvaluator}

val est = new MyEstimator("est")
val eval = new MyEvaluator
Expand Down Expand Up @@ -134,6 +134,82 @@ class TrainValidationSplitSuite

assert(tvs.getTrainRatio === tvs2.getTrainRatio)
assert(tvs.getSeed === tvs2.getSeed)

ValidatorParamsSuiteHelpers
.compareParamMaps(tvs.getEstimatorParamMaps, tvs2.getEstimatorParamMaps)

tvs2.getEstimator match {
case lr2: LogisticRegression =>
assert(lr.uid === lr2.uid)
assert(lr.getMaxIter === lr2.getMaxIter)
case other =>
throw new AssertionError(s"Loaded TrainValidationSplit expected estimator of type" +
s" LogisticRegression but found ${other.getClass.getName}")
}
}

// Round-trips a TrainValidationSplit whose param grid varies the classifier nested inside
// a OneVsRest estimator, then checks the reloaded instance field by field.
test("read/write: TrainValidationSplit with nested estimator") {
  // Keep a handle on the base classifier so it can be compared later without a downcast.
  val baseClassifier = new LogisticRegression
  val ova = new OneVsRest()
    .setClassifier(baseClassifier)
  val evaluator = new BinaryClassificationEvaluator()
    .setMetricName("areaUnderPR") // not default metric
  val classifier1 = new LogisticRegression().setRegParam(2.0)
  val classifier2 = new LogisticRegression().setRegParam(3.0)
  val paramMaps = new ParamGridBuilder()
    .addGrid(ova.classifier, Array(classifier1, classifier2))
    .build()
  val tvs = new TrainValidationSplit()
    .setEstimator(ova)
    .setEvaluator(evaluator)
    .setTrainRatio(0.5)
    .setEstimatorParamMaps(paramMaps)
    .setSeed(42L)

  // The generic per-param check is switched off; the relevant fields are compared below.
  val tvs2 = testDefaultReadWrite(tvs, testParams = false)

  assert(tvs.getTrainRatio === tvs2.getTrainRatio)
  assert(tvs.getSeed === tvs2.getSeed)

  tvs2.getEstimator match {
    case ova2: OneVsRest =>
      assert(ova.uid === ova2.uid)
      val classifier = ova2.getClassifier
      classifier match {
        case lr: LogisticRegression =>
          assert(baseClassifier.getMaxIter === lr.getMaxIter)
        case _ =>
          throw new AssertionError(s"Loaded TrainValidationSplit expected estimator of type" +
            s" LogisticRegression but found ${classifier.getClass.getName}")
      }

    case other =>
      throw new AssertionError(s"Loaded TrainValidationSplit expected estimator of type" +
        s" OneVsRest but found ${other.getClass.getName}")
  }

  ValidatorParamsSuiteHelpers
    .compareParamMaps(tvs.getEstimatorParamMaps, tvs2.getEstimatorParamMaps)
}

// Assembles a TrainValidationSplit over a OneVsRest estimator whose grid swaps the nested
// classifier, then hands it to ValidatorParamsSuiteHelpers.testFileMove.
test("read/write: Persistence of nested estimator works if parent directory changes") {
  val oneVsRest = new OneVsRest().setClassifier(new LogisticRegression)
  // "areaUnderPR" is chosen because it is not the default metric.
  val prEvaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderPR")
  val lowRegClassifier = new LogisticRegression().setRegParam(2.0)
  val highRegClassifier = new LogisticRegression().setRegParam(3.0)
  val gridBuilder = new ParamGridBuilder()
  gridBuilder.addGrid(oneVsRest.classifier, Array(lowRegClassifier, highRegClassifier))
  val nestedGrid = gridBuilder.build()

  val trainValidationSplit = new TrainValidationSplit()
  trainValidationSplit.setEstimator(oneVsRest)
  trainValidationSplit.setEvaluator(prEvaluator)
  trainValidationSplit.setTrainRatio(0.5)
  trainValidationSplit.setEstimatorParamMaps(nestedGrid)
  trainValidationSplit.setSeed(42L)

  ValidatorParamsSuiteHelpers.testFileMove(trainValidationSplit)
}

test("read/write: TrainValidationSplitModel") {
Expand All @@ -160,7 +236,7 @@ class TrainValidationSplitSuite
}
}

object TrainValidationSplitSuite {
object TrainValidationSplitSuite extends SparkFunSuite{

abstract class MyModel extends Model[MyModel]

Expand Down
Loading