
Commit 943fc4f

Merge remote-tracking branch 'asf/master' into am-log-link
2 parents: 9e5c04b + 5a3c04b

File tree: 37 files changed, +583 −158 lines


core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Lines changed: 10 additions & 3 deletions

@@ -386,9 +386,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
-   * modify t1 and return it as its result value to avoid object allocation; however, it should not
-   * modify t2.
+   * given associative and commutative function and a neutral "zero value". The function
+   * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
+   * allocation; however, it should not modify t2.
+   *
+   * This behaves somewhat differently from fold operations implemented for non-distributed
+   * collections in functional languages like Scala. This fold operation may be applied to
+   * partitions individually, and then fold those results into the final result, rather than
+   * apply the fold to each element sequentially in some defined ordering. For functions
+   * that are not commutative, the result may differ from that of a fold applied to a
+   * non-distributed collection.
    */
   def fold(zeroValue: T)(f: JFunction2[T, T, T]): T =
     rdd.fold(zeroValue)(f)

core/src/main/scala/org/apache/spark/rdd/RDD.scala
Lines changed: 10 additions & 3 deletions

@@ -1015,9 +1015,16 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
-   * modify t1 and return it as its result value to avoid object allocation; however, it should not
-   * modify t2.
+   * given associative and commutative function and a neutral "zero value". The function
+   * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
+   * allocation; however, it should not modify t2.
+   *
+   * This behaves somewhat differently from fold operations implemented for non-distributed
+   * collections in functional languages like Scala. This fold operation may be applied to
+   * partitions individually, and then fold those results into the final result, rather than
+   * apply the fold to each element sequentially in some defined ordering. For functions
+   * that are not commutative, the result may differ from that of a fold applied to a
+   * non-distributed collection.
    */
   def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
     // Clone the zero value since we will also be serializing it as part of tasks
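
A small sketch of the caveat described in the updated Scaladoc (not part of this commit; it assumes a local SparkContext): because the zero value is folded into every partition and again when partition results are merged, a non-neutral zero gives a different answer than a sequential Scala fold.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: a two-partition RDD to illustrate the doc change above.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("fold-demo"))
val rdd = sc.parallelize(1 to 4, numSlices = 2)   // partitions: [1, 2] and [3, 4]

rdd.fold(0)(_ + _)        // 10, same as (1 to 4).fold(0)(_ + _)

// With a non-identity zero value the results diverge: the distributed fold
// evaluates 1 + (1+1+2) + (1+3+4) = 13, while the local fold gives 11.
rdd.fold(1)(_ + _)        // 13
(1 to 4).fold(1)(_ + _)   // 11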

core/src/main/scala/org/apache/spark/util/Utils.scala
Lines changed: 18 additions & 0 deletions

@@ -2201,6 +2201,24 @@ private[spark] object Utils extends Logging {
     shutdownHooks.remove(ref)
   }
 
+  /**
+   * To avoid calling `Utils.getCallSite` for every single RDD we create in the body,
+   * set a dummy call site that RDDs use instead. This is for performance optimization.
+   */
+  def withDummyCallSite[T](sc: SparkContext)(body: => T): T = {
+    val oldShortCallSite = sc.getLocalProperty(CallSite.SHORT_FORM)
+    val oldLongCallSite = sc.getLocalProperty(CallSite.LONG_FORM)
+    try {
+      sc.setLocalProperty(CallSite.SHORT_FORM, "")
+      sc.setLocalProperty(CallSite.LONG_FORM, "")
+      body
+    } finally {
+      // Restore the old ones here
+      sc.setLocalProperty(CallSite.SHORT_FORM, oldShortCallSite)
+      sc.setLocalProperty(CallSite.LONG_FORM, oldLongCallSite)
+    }
+  }
+
 }
 
 private [util] class SparkShutdownHookManager
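
A hedged usage sketch of the new helper (not from this diff; since `Utils` is private[spark], callers would be Spark-internal code, and an existing SparkContext `sc` is assumed): wrap a block that creates many short-lived RDDs so each one picks up the empty thread-local call site instead of computing its own via `Utils.getCallSite`.

// Sketch only: the RDDs built inside the block skip the call-site lookup.
val combined = Utils.withDummyCallSite(sc) {
  val pieces = (0 until 100).map(i => sc.parallelize(Seq(i)))
  sc.union(pieces)
}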

dev/scalastyle
Lines changed: 4 additions & 4 deletions

@@ -17,11 +17,11 @@
 # limitations under the License.
 #
 
-echo -e "q\n" | build/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt
-echo -e "q\n" | build/sbt -Phive -Phive-thriftserver test:scalastyle >> scalastyle.txt
+echo -e "q\n" | build/sbt -Pkinesis-asl -Phive -Phive-thriftserver scalastyle > scalastyle.txt
+echo -e "q\n" | build/sbt -Pkinesis-asl -Phive -Phive-thriftserver test:scalastyle >> scalastyle.txt
 # Check style with YARN built too
-echo -e "q\n" | build/sbt -Pyarn -Phadoop-2.2 scalastyle >> scalastyle.txt
-echo -e "q\n" | build/sbt -Pyarn -Phadoop-2.2 test:scalastyle >> scalastyle.txt
+echo -e "q\n" | build/sbt -Pkinesis-asl -Pyarn -Phadoop-2.2 scalastyle >> scalastyle.txt
+echo -e "q\n" | build/sbt -Pkinesis-asl -Pyarn -Phadoop-2.2 test:scalastyle >> scalastyle.txt
 
 ERRORS=$(cat scalastyle.txt | awk '{if($1~/error/)print}')
 rm scalastyle.txt

docs/ml-features.md
Lines changed: 83 additions & 0 deletions

@@ -535,5 +535,88 @@ encoded = encoder.transform(indexed)
 </div>
 </div>
 
+## VectorIndexer
+
+`VectorIndexer` helps index categorical features in datasets of `Vector`s.
+It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:
+
+1. Take an input column of type [Vector](api/scala/index.html#org.apache.spark.mllib.linalg.Vector) and a parameter `maxCategories`.
+2. Decide which features should be categorical based on the number of distinct values, where features with at most `maxCategories` are declared categorical.
+3. Compute 0-based category indices for each categorical feature.
+4. Index categorical features and transform original feature values to indices.
+
+Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.
+
+Please refer to the [VectorIndexer API docs](api/scala/index.html#org.apache.spark.ml.feature.VectorIndexer) for more details.
+
+In the example below, we read in a dataset of labeled points and then use `VectorIndexer` to decide which features should be treated as categorical. We transform the categorical feature values to their indices. This transformed data could then be passed to algorithms such as `DecisionTreeRegressor` that handle categorical features.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.spark.ml.feature.VectorIndexer
+import org.apache.spark.mllib.util.MLUtils
+
+val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+val indexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexed")
+  .setMaxCategories(10)
+val indexerModel = indexer.fit(data)
+val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
+println(s"Chose ${categoricalFeatures.size} categorical features: " +
+  categoricalFeatures.mkString(", "))
+
+// Create new column "indexed" with categorical values transformed to indices
+val indexedData = indexerModel.transform(data)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import java.util.Map;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.ml.feature.VectorIndexer;
+import org.apache.spark.ml.feature.VectorIndexerModel;
+import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.mllib.util.MLUtils;
+import org.apache.spark.sql.DataFrame;
+
+JavaRDD<LabeledPoint> rdd = MLUtils.loadLibSVMFile(sc.sc(),
+  "data/mllib/sample_libsvm_data.txt").toJavaRDD();
+DataFrame data = sqlContext.createDataFrame(rdd, LabeledPoint.class);
+VectorIndexer indexer = new VectorIndexer()
+  .setInputCol("features")
+  .setOutputCol("indexed")
+  .setMaxCategories(10);
+VectorIndexerModel indexerModel = indexer.fit(data);
+Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
+System.out.print("Chose " + categoryMaps.size() + " categorical features:");
+for (Integer feature : categoryMaps.keySet()) {
+  System.out.print(" " + feature);
+}
+System.out.println();
+
+// Create new column "indexed" with categorical values transformed to indices
+DataFrame indexedData = indexerModel.transform(data);
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+from pyspark.ml.feature import VectorIndexer
+from pyspark.mllib.util import MLUtils
+
+data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
+indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
+indexerModel = indexer.fit(data)
+
+# Create new column "indexed" with categorical values transformed to indices
+indexedData = indexerModel.transform(data)
+{% endhighlight %}
+</div>
+</div>
+
 # Feature Selectors
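
As a sketch of the "passed to algorithms such as DecisionTreeRegressor" sentence in the added docs (not part of the diff; it assumes spark.ml's Pipeline and DecisionTreeRegressor, and reuses the `indexer` and `data` values from the Scala example above):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.regression.DecisionTreeRegressor

// Sketch only: train a tree on the indexed (categorical-aware) feature column.
val dt = new DecisionTreeRegressor()
  .setFeaturesCol("indexed")
  .setLabelCol("label")
val model = new Pipeline().setStages(Array(indexer, dt)).fit(data)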

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
Lines changed: 4 additions & 1 deletion

@@ -31,7 +31,10 @@ import org.apache.spark.util.Utils
 
 private[kinesis]
 case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
-  extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
+  extends AWSCredentials {
+  override def getAWSAccessKeyId: String = accessKeyId
+  override def getAWSSecretKey: String = secretKey
+}
 
 /**
  * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.

extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
Lines changed: 14 additions & 18 deletions

@@ -20,27 +20,18 @@ import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions.seqAsJavaList
 
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.Milliseconds
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.TestSuiteBase
-import org.apache.spark.util.{ManualClock, Clock}
-
-import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.Matchers
-import org.scalatest.mock.MockitoSugar
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase}
+import org.apache.spark.util.{Clock, ManualClock, Utils}
 
 /**
  * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor

@@ -66,7 +57,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
   var checkpointStateMock: KinesisCheckpointState = _
   var currentClockMock: Clock = _
 
-  override def beforeFunction() = {
+  override def beforeFunction(): Unit = {
     receiverMock = mock[KinesisReceiver]
     checkpointerMock = mock[IRecordProcessorCheckpointer]
     checkpointClockMock = mock[ManualClock]

@@ -99,6 +90,11 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
     ssc.stop()
   }
 
+  test("check serializability of SerializableAWSCredentials") {
+    Utils.deserialize[SerializableAWSCredentials](
+      Utils.serialize(new SerializableAWSCredentials("x", "y")))
+  }
+
   test("process records including store and checkpoint") {
     when(receiverMock.isStopped()).thenReturn(false)
     when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)

mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
Lines changed: 10 additions & 0 deletions

@@ -17,6 +17,11 @@
 
 package org.apache.spark.ml.feature
 
+import java.lang.{Double => JDouble, Integer => JInt}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.attribute._

@@ -248,6 +253,11 @@ class VectorIndexerModel private[ml] (
     val categoryMaps: Map[Int, Map[Double, Int]])
   extends Model[VectorIndexerModel] with VectorIndexerParams {
 
+  /** Java-friendly version of [[categoryMaps]] */
+  def javaCategoryMaps: JMap[JInt, JMap[JDouble, JInt]] = {
+    categoryMaps.mapValues(_.asJava).asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]]
+  }
+
   /**
    * Pre-computed feature attributes, with some missing info.
   * In transform(), set attribute name and other info, if available.

mllib/src/main/scala/org/apache/spark/ml/param/params.scala
Lines changed: 3 additions & 4 deletions

@@ -438,19 +438,18 @@ trait Params extends Identifiable with Serializable {
   * @param value the default value
   */
  protected final def setDefault[T](param: Param[T], value: T): this.type = {
-    defaultParamMap.put(param, value)
+    defaultParamMap.put(param -> value)
    this
  }
 
  /**
   * Sets default values for a list of params.
   *
-   * Note: Java developers should use the single-parameter [[setDefault()]].
-   * Annotating this with varargs causes compilation failures. See SPARK-7498.
   * @param paramPairs a list of param pairs that specify params and their default values to set
   *                   respectively. Make sure that the params are initialized before this method
   *                   gets called.
   */
+  @varargs
  protected final def setDefault(paramPairs: ParamPair[_]*): this.type = {
    paramPairs.foreach { p =>
      setDefault(p.param.asInstanceOf[Param[Any]], p.value)

@@ -559,7 +558,7 @@ final class ParamMap private[ml] (private val map: mutable.Map[Param[Any], Any])
  /**
   * Puts a (param, value) pair (overwrites if the input param exists).
   */
-  def put[T](param: Param[T], value: T): this.type = put(ParamPair(param, value))
+  def put[T](param: Param[T], value: T): this.type = put(param -> value)
 
  /**
   * Puts a list of param pairs (overwrites if the input params exists).

mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
Lines changed: 1 addition & 1 deletion

@@ -89,7 +89,7 @@ class SVMModel (
   override protected def formatVersion: String = "1.0"
 
   override def toString: String = {
-    s"${super.toString}, numClasses = 2, threshold = ${threshold.get}"
+    s"${super.toString}, numClasses = 2, threshold = ${threshold.getOrElse("None")}"
   }
 }
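
A hedged sketch of why this one-line change matters (not from the commit; it assumes an existing SparkContext `sc`): `SVMModel.clearThreshold()` sets the threshold to `None`, so the previous `${threshold.get}` would throw when such a model is printed.

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Sketch only: tiny training set, then clear the threshold and print the model.
val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(0.0)),
  LabeledPoint(1.0, Vectors.dense(1.0))))
val model = SVMWithSGD.train(training, 10)
model.clearThreshold()
println(model)   // "... numClasses = 2, threshold = None" instead of a NoSuchElementException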
