Commit 064985b

Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
2 parents: ee918e9 + 32096c2

17 files changed, with 393 additions and 247 deletions.

bin/spark-sql

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
 FWDIR="$(cd `dirname $0`/..; pwd)"

 function usage {
-  echo "Usage: ./sbin/spark-sql [options] [cli option]"
+  echo "Usage: ./bin/spark-sql [options] [cli option]"
   pattern="usage"
   pattern+="\|Spark assembly has been built with Hive"
   pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 1 addition & 1 deletion

@@ -1004,7 +1004,7 @@ abstract class RDD[T: ClassTag](
       },
       (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
         h1.addAll(h2)
-        h2
+        h1
       }).cardinality()
   }
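The one-line fix above matters because the combiner must return the sketch that actually received the merged state (h1); returning h2 discarded the merge, so the final estimate could reflect only part of the data. A minimal usage sketch of countApproxDistinct, assuming a SparkContext named sc (the values mirror the updated test below and are illustrative only):

    val rdd = sc.parallelize(1 to 5000, 10).map(_ % 1000)   // ~1000 distinct values spread over 10 partitions
    val approx = rdd.countApproxDistinct(12, 0)              // p = 12, sp = 0, as in the test suite
    val exact = rdd.distinct().count()                       // 1000
    println(s"approx = $approx, exact = $exact")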

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 5 additions & 5 deletions

@@ -81,11 +81,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {

     def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble

-    val size = 100
-    val uniformDistro = for (i <- 1 to 100000) yield i % size
-    val simpleRdd = sc.makeRDD(uniformDistro)
-    assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4)
-    assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1)
+    val size = 1000
+    val uniformDistro = for (i <- 1 to 5000) yield i % size
+    val simpleRdd = sc.makeRDD(uniformDistro, 10)
+    assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.2)
+    assert(error(simpleRdd.countApproxDistinct(12, 0), size) < 0.1)
   }

   test("SparkContext.union") {

dev/create-release/create-release.sh

Lines changed: 8 additions & 1 deletion

@@ -35,6 +35,12 @@ RELEASE_VERSION=${RELEASE_VERSION:-1.0.0}
 RC_NAME=${RC_NAME:-rc2}
 USER_NAME=${USER_NAME:-pwendell}

+if [ -z "$JAVA_HOME" ]; then
+  echo "Error: JAVA_HOME is not set, cannot proceed."
+  exit -1
+fi
+JAVA_7_HOME=${JAVA_7_HOME:-$JAVA_HOME}
+
 set -e

 GIT_TAG=v$RELEASE_VERSION-$RC_NAME

@@ -130,7 +136,8 @@ scp spark-* \
 cd spark
 sbt/sbt clean
 cd docs
-PRODUCTION=1 jekyll build
+# Compile docs with Java 7 to use nicer format
+JAVA_HOME=$JAVA_7_HOME PRODUCTION=1 jekyll build
 echo "Copying release documentation"
 rc_docs_folder=${rc_folder}-docs
 ssh $USER_NAME@people.apache.org \

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 4 additions & 15 deletions

@@ -25,16 +25,14 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.tree.configuration.Algo._
-import org.apache.spark.mllib.tree.configuration.Strategy
+import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
 import org.apache.spark.mllib.tree.DecisionTree
-import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Impurity, Variance}
+import org.apache.spark.mllib.tree.impurity._
 import org.apache.spark.mllib.tree.model.DecisionTreeModel
 import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.mllib.stat.correlation.CorrelationNames

@@ -523,17 +521,8 @@

     val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint)

-    val algo: Algo = algoStr match {
-      case "classification" => Classification
-      case "regression" => Regression
-      case _ => throw new IllegalArgumentException(s"Bad algoStr parameter: $algoStr")
-    }
-    val impurity: Impurity = impurityStr match {
-      case "gini" => Gini
-      case "entropy" => Entropy
-      case "variance" => Variance
-      case _ => throw new IllegalArgumentException(s"Bad impurityStr parameter: $impurityStr")
-    }
+    val algo = Algo.fromString(algoStr)
+    val impurity = Impurities.fromString(impurityStr)

     val strategy = new Strategy(
       algo = algo,
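The inline match blocks are now encapsulated behind Algo.fromString and Impurities.fromString. A rough, hypothetical sketch of the pattern such helpers follow (illustrative only, not the MLlib source):

    object AlgoSketch extends Enumeration {
      val Classification, Regression = Value
      // Centralizing the string-to-value mapping keeps callers such as PythonMLLibAPI
      // free of per-parameter match blocks.
      def fromString(name: String): Value = name match {
        case "classification" => Classification
        case "regression" => Regression
        case other => throw new IllegalArgumentException(s"Did not recognize Algo name: $other")
      }
    }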

mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala

Lines changed: 61 additions & 69 deletions

@@ -36,87 +36,25 @@ class IDF {

   // TODO: Allow different IDF formulations.

-  private var brzIdf: BDV[Double] = _
-
   /**
    * Computes the inverse document frequency.
    * @param dataset an RDD of term frequency vectors
    */
-  def fit(dataset: RDD[Vector]): this.type = {
-    brzIdf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
+  def fit(dataset: RDD[Vector]): IDFModel = {
+    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
       seqOp = (df, v) => df.add(v),
       combOp = (df1, df2) => df1.merge(df2)
     ).idf()
-    this
+    new IDFModel(idf)
   }

   /**
    * Computes the inverse document frequency.
    * @param dataset a JavaRDD of term frequency vectors
    */
-  def fit(dataset: JavaRDD[Vector]): this.type = {
+  def fit(dataset: JavaRDD[Vector]): IDFModel = {
     fit(dataset.rdd)
   }
-
-  /**
-   * Transforms term frequency (TF) vectors to TF-IDF vectors.
-   * @param dataset an RDD of term frequency vectors
-   * @return an RDD of TF-IDF vectors
-   */
-  def transform(dataset: RDD[Vector]): RDD[Vector] = {
-    if (!initialized) {
-      throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
-    }
-    val theIdf = brzIdf
-    val bcIdf = dataset.context.broadcast(theIdf)
-    dataset.mapPartitions { iter =>
-      val thisIdf = bcIdf.value
-      iter.map { v =>
-        val n = v.size
-        v match {
-          case sv: SparseVector =>
-            val nnz = sv.indices.size
-            val newValues = new Array[Double](nnz)
-            var k = 0
-            while (k < nnz) {
-              newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
-              k += 1
-            }
-            Vectors.sparse(n, sv.indices, newValues)
-          case dv: DenseVector =>
-            val newValues = new Array[Double](n)
-            var j = 0
-            while (j < n) {
-              newValues(j) = dv.values(j) * thisIdf(j)
-              j += 1
-            }
-            Vectors.dense(newValues)
-          case other =>
-            throw new UnsupportedOperationException(
-              s"Only sparse and dense vectors are supported but got ${other.getClass}.")
-        }
-      }
-    }
-  }
-
-  /**
-   * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
-   * @param dataset a JavaRDD of term frequency vectors
-   * @return a JavaRDD of TF-IDF vectors
-   */
-  def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
-    transform(dataset.rdd).toJavaRDD()
-  }
-
-  /** Returns the IDF vector. */
-  def idf(): Vector = {
-    if (!initialized) {
-      throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
-    }
-    Vectors.fromBreeze(brzIdf)
-  }
-
-  private def initialized: Boolean = brzIdf != null
 }

 private object IDF {

@@ -177,18 +115,72 @@
     private def isEmpty: Boolean = m == 0L

     /** Returns the current IDF vector. */
-    def idf(): BDV[Double] = {
+    def idf(): Vector = {
       if (isEmpty) {
         throw new IllegalStateException("Haven't seen any document yet.")
       }
       val n = df.length
-      val inv = BDV.zeros[Double](n)
+      val inv = new Array[Double](n)
       var j = 0
       while (j < n) {
         inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
         j += 1
       }
-      inv
+      Vectors.dense(inv)
     }
   }
 }
+
+/**
+ * :: Experimental ::
+ * Represents an IDF model that can transform term frequency vectors.
+ */
+@Experimental
+class IDFModel private[mllib] (val idf: Vector) extends Serializable {
+
+  /**
+   * Transforms term frequency (TF) vectors to TF-IDF vectors.
+   * @param dataset an RDD of term frequency vectors
+   * @return an RDD of TF-IDF vectors
+   */
+  def transform(dataset: RDD[Vector]): RDD[Vector] = {
+    val bcIdf = dataset.context.broadcast(idf)
+    dataset.mapPartitions { iter =>
+      val thisIdf = bcIdf.value
+      iter.map { v =>
+        val n = v.size
+        v match {
+          case sv: SparseVector =>
+            val nnz = sv.indices.size
+            val newValues = new Array[Double](nnz)
+            var k = 0
+            while (k < nnz) {
+              newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
+              k += 1
+            }
+            Vectors.sparse(n, sv.indices, newValues)
+          case dv: DenseVector =>
+            val newValues = new Array[Double](n)
+            var j = 0
+            while (j < n) {
+              newValues(j) = dv.values(j) * thisIdf(j)
+              j += 1
+            }
+            Vectors.dense(newValues)
+          case other =>
+            throw new UnsupportedOperationException(
+              s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+        }
+      }
+    }
+  }
+
+  /**
+   * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
+   * @param dataset a JavaRDD of term frequency vectors
+   * @return a JavaRDD of TF-IDF vectors
+   */
+  def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
+    transform(dataset.rdd).toJavaRDD()
+  }
+}
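This change turns IDF into a stateless estimator: fit returns an immutable IDFModel, replacing the previous mutate-then-transform flow. A minimal usage sketch, assuming a SparkContext named sc and hand-built term-frequency vectors (illustrative values only):

    import org.apache.spark.mllib.feature.IDF
    import org.apache.spark.mllib.linalg.Vectors

    val tf = sc.parallelize(Seq(
      Vectors.dense(1.0, 0.0, 2.0),   // term counts for document 1
      Vectors.dense(0.0, 1.0, 3.0)))  // term counts for document 2
    val idfModel = new IDF().fit(tf)     // fit now returns an IDFModel instead of mutating IDF
    val tfidf = idfModel.transform(tf)   // TF-IDF vectors; the model carries the learned idf vector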

mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala

Lines changed: 35 additions & 23 deletions

@@ -17,8 +17,9 @@

 package org.apache.spark.mllib.feature

-import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}

+import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.rdd.RDDFunctions._

@@ -35,37 +36,55 @@ import org.apache.spark.rdd.RDD
  * @param withStd True by default. Scales the data to unit standard deviation.
  */
 @Experimental
-class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer {
+class StandardScaler(withMean: Boolean, withStd: Boolean) extends Logging {

   def this() = this(false, true)

-  require(withMean || withStd, s"withMean and withStd both equal to false. Doing nothing.")
-
-  private var mean: BV[Double] = _
-  private var factor: BV[Double] = _
+  if (!(withMean || withStd)) {
+    logWarning("Both withMean and withStd are false. The model does nothing.")
+  }

   /**
    * Computes the mean and variance and stores as a model to be used for later scaling.
    *
    * @param data The data used to compute the mean and variance to build the transformation model.
-   * @return This StandardScalar object.
+   * @return a StandardScalarModel
    */
-  def fit(data: RDD[Vector]): this.type = {
+  def fit(data: RDD[Vector]): StandardScalerModel = {
+    // TODO: skip computation if both withMean and withStd are false
     val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
       (aggregator, data) => aggregator.add(data),
       (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
+    new StandardScalerModel(withMean, withStd, summary.mean, summary.variance)
+  }
+}

-    mean = summary.mean.toBreeze
-    factor = summary.variance.toBreeze
-    require(mean.length == factor.length)
+/**
+ * :: Experimental ::
+ * Represents a StandardScaler model that can transform vectors.
+ *
+ * @param withMean whether to center the data before scaling
+ * @param withStd whether to scale the data to have unit standard deviation
+ * @param mean column mean values
+ * @param variance column variance values
+ */
+@Experimental
+class StandardScalerModel private[mllib] (
+    val withMean: Boolean,
+    val withStd: Boolean,
+    val mean: Vector,
+    val variance: Vector) extends VectorTransformer {
+
+  require(mean.size == variance.size)

+  private lazy val factor: BDV[Double] = {
+    val f = BDV.zeros[Double](variance.size)
     var i = 0
-    while (i < factor.length) {
-      factor(i) = if (factor(i) != 0.0) 1.0 / math.sqrt(factor(i)) else 0.0
+    while (i < f.size) {
+      f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0
       i += 1
     }
-
-    this
+    f
   }

   /**

@@ -76,13 +95,7 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransfor
    * for the column with zero variance.
    */
   override def transform(vector: Vector): Vector = {
-    if (mean == null || factor == null) {
-      throw new IllegalStateException(
-        "Haven't learned column summary statistics yet. Call fit first.")
-    }
-
-    require(vector.size == mean.length)
-
+    require(mean.size == vector.size)
     if (withMean) {
       vector.toBreeze match {
         case dv: BDV[Double] =>

@@ -115,5 +128,4 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransfor
       vector
     }
   }
-
 }
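As with IDF, StandardScaler now acts as a stateless estimator whose fit returns a StandardScalerModel. A minimal usage sketch, assuming a SparkContext named sc and illustrative dense vectors:

    import org.apache.spark.mllib.feature.StandardScaler
    import org.apache.spark.mllib.linalg.Vectors

    val data = sc.parallelize(Seq(Vectors.dense(1.0, 10.0), Vectors.dense(3.0, 30.0)))
    val model = new StandardScaler(withMean = true, withStd = true).fit(data)  // returns a StandardScalerModel
    val scaled = model.transform(Vectors.dense(2.0, 20.0))                     // centers and scales a single vector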
