Skip to content

Commit 8509519

Browse files
yinxusen and mengxr
authored and committed
[SPARK-5894] [ML] Add polynomial mapper
See [SPARK-5894](https://issues.apache.org/jira/browse/SPARK-5894). Author: Xusen Yin <[email protected]> Author: Xiangrui Meng <[email protected]> Closes #5245 from yinxusen/SPARK-5894 and squashes the following commits: dc461a6 [Xusen Yin] merge polynomial expansion v2 6d0c3cc [Xusen Yin] Merge branch 'SPARK-5894' of https://github.com/mengxr/spark into mengxr-SPARK-5894 57bfdd5 [Xusen Yin] Merge branch 'master' into SPARK-5894 3d02a7d [Xusen Yin] Merge branch 'master' into SPARK-5894 a067da2 [Xiangrui Meng] a new approach for poly expansion 0789d81 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5894 4e9aed0 [Xusen Yin] fix test suite 95d8fb9 [Xusen Yin] fix sparse vector indices 8d39674 [Xusen Yin] fix sparse vector expansion error 5998dd6 [Xusen Yin] fix dense vector fillin fa3ade3 [Xusen Yin] change the functional code into imperative one to speedup b70e7e1 [Xusen Yin] remove useless case class 6fa236f [Xusen Yin] fix vector slice error daff601 [Xusen Yin] fix index error of sparse vector 6bd0a10 [Xusen Yin] merge repeated features 419f8a2 [Xusen Yin] need to merge same columns 4ebf34e [Xusen Yin] add test suite of polynomial expansion 372227c [Xusen Yin] add polynomial expansion
1 parent 4c722d7 commit 8509519

File tree

2 files changed

+271
-0
lines changed

2 files changed

+271
-0
lines changed
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.ml.feature
19+
20+
import scala.collection.mutable
21+
22+
import org.apache.spark.annotation.AlphaComponent
23+
import org.apache.spark.ml.UnaryTransformer
24+
import org.apache.spark.ml.param.{IntParam, ParamMap}
25+
import org.apache.spark.mllib.linalg._
26+
import org.apache.spark.sql.types.DataType
27+
28+
/**
 * :: AlphaComponent ::
 * Perform feature expansion in a polynomial space. Polynomial expansion
 * ([[http://en.wikipedia.org/wiki/Polynomial_expansion]]) rewrites a product of sums as a sum of
 * products, using the fact that multiplication distributes over addition. For example, expanding
 * a 2-variable feature vector `(x, y)` with degree 2 yields `(x, y, x * x, x * y, y * y)`.
 */
@AlphaComponent
class PolynomialExpansion extends UnaryTransformer[Vector, Vector, PolynomialExpansion] {

  /**
   * The polynomial degree to expand, which must be >= 1. A degree of 1 leaves the features
   * unexpanded (identity, modulo the constant term).
   * Default: 2
   * @group param
   */
  val degree = new IntParam(this, "degree", "the polynomial degree to expand")
  setDefault(degree -> 2)

  /** @group getParam */
  def getDegree: Int = getOrDefault(degree)

  /**
   * Sets the expansion degree.
   * @throws IllegalArgumentException if `value` < 1, since the expansion is undefined below 1;
   *                                  failing fast here beats emitting a malformed vector later.
   * @group setParam
   */
  def setDegree(value: Int): this.type = {
    require(value >= 1, s"The polynomial degree must be >= 1, but got $value.")
    set(degree, value)
  }

  override protected def createTransformFunc(paramMap: ParamMap): Vector => Vector = { v =>
    // Resolve the degree once per transform invocation and delegate to the companion object.
    PolynomialExpansion.expand(v, paramMap(degree))
  }

  override protected def outputDataType: DataType = new VectorUDT()
}
59+
60+
/**
 * The expansion is done via recursion. Given n features and degree d, the size after expansion is
 * (n + d choose d) (including 1 and first-order values). For example, let f([a, b, c], 3) be the
 * function that expands [a, b, c] to their monomials of degree 3. We have the following recursion:
 *
 * {{{
 * f([a, b, c], 3) = f([a, b], 3) ++ f([a, b], 2) * c ++ f([a, b], 1) * c^2 ++ [c^3]
 * }}}
 *
 * To handle sparsity, if c is zero, we can skip all monomials that contain it. We remember the
 * current index and increment it properly for sparse input.
 */
object PolynomialExpansion {

  /**
   * Binomial coefficient (n choose k), computed as a ratio of falling products.
   * The division is exact because k! divides any product of k consecutive integers.
   * NOTE(review): intermediate Int products can overflow for large n/k; acceptable for the
   * feature-vector sizes this is used with, but worth confirming if inputs grow.
   */
  private def choose(n: Int, k: Int): Int = {
    Range(n, n - k, -1).product / Range(k, 1, -1).product
  }

  /** Size of the expansion of `numFeatures` features up to `degree`, including the constant 1. */
  private def getPolySize(numFeatures: Int, degree: Int): Int = choose(numFeatures + degree, degree)

  /**
   * Expands `values(0..lastIdx)` into monomials of total degree <= `degree`, each scaled by
   * `multiplier`, writing results into `polyValues` starting at `curPolyIdx`.
   * Returns the next free index in `polyValues` (curPolyIdx + size of this sub-expansion).
   */
  private def expandDense(
      values: Array[Double],
      lastIdx: Int,
      degree: Int,
      multiplier: Double,
      polyValues: Array[Double],
      curPolyIdx: Int): Int = {
    if (multiplier == 0.0) {
      // do nothing: every monomial in this branch would be zero
    } else if (degree == 0 || lastIdx < 0) {
      // Base case: the accumulated product is itself the monomial value.
      polyValues(curPolyIdx) = multiplier
    } else {
      val v = values(lastIdx)
      val lastIdx1 = lastIdx - 1
      var alpha = multiplier
      var i = 0
      var curStart = curPolyIdx
      // Branch on the power of values(lastIdx): i = 0..degree; alpha carries multiplier * v^i.
      // The alpha != 0.0 check skips whole subtrees once a zero feature is raised to a power.
      while (i <= degree && alpha != 0.0) {
        curStart = expandDense(values, lastIdx1, degree - i, alpha, polyValues, curStart)
        i += 1
        alpha *= v
      }
    }
    // Advance past this sub-expansion's full slot range even if nothing was written,
    // so output positions stay aligned with the dense layout.
    curPolyIdx + getPolySize(lastIdx + 1, degree)
  }

  /**
   * Sparse analogue of [[expandDense]]: only nonzero monomials are appended to
   * `polyIndices`/`polyValues`, but output indices are computed against the full dense layout
   * via `lastFeatureIdx` (the original feature index, as opposed to the position in `values`).
   * Returns the next output index after this sub-expansion's slot range.
   */
  private def expandSparse(
      indices: Array[Int],
      values: Array[Double],
      lastIdx: Int,
      lastFeatureIdx: Int,
      degree: Int,
      multiplier: Double,
      polyIndices: mutable.ArrayBuilder[Int],
      polyValues: mutable.ArrayBuilder[Double],
      curPolyIdx: Int): Int = {
    if (multiplier == 0.0) {
      // do nothing: every monomial in this branch would be zero
    } else if (degree == 0 || lastIdx < 0) {
      polyIndices += curPolyIdx
      polyValues += multiplier
    } else {
      // Skip all zeros at the tail.
      val v = values(lastIdx)
      val lastIdx1 = lastIdx - 1
      val lastFeatureIdx1 = indices(lastIdx) - 1
      var alpha = multiplier
      var curStart = curPolyIdx
      var i = 0
      while (i <= degree && alpha != 0.0) {
        curStart = expandSparse(indices, values, lastIdx1, lastFeatureIdx1, degree - i, alpha,
          polyIndices, polyValues, curStart)
        i += 1
        alpha *= v
      }
    }
    // Slot range is determined by the dense feature count (lastFeatureIdx + 1), not nnz,
    // so sparse output indices match the dense expansion's positions.
    curPolyIdx + getPolySize(lastFeatureIdx + 1, degree)
  }

  /** Expands a dense vector; output has (size + degree choose degree) entries including 1. */
  private def expand(dv: DenseVector, degree: Int): DenseVector = {
    val n = dv.size
    val polySize = getPolySize(n, degree)
    val polyValues = new Array[Double](polySize)
    expandDense(dv.values, n - 1, degree, 1.0, polyValues, 0)
    new DenseVector(polyValues)
  }

  /** Expands a sparse vector; only monomials of nonzero features are materialized. */
  private def expand(sv: SparseVector, degree: Int): SparseVector = {
    val polySize = getPolySize(sv.size, degree)
    val nnz = sv.values.length
    // Upper bound on the number of nonzero monomials: expansion over the nnz nonzero features.
    val nnzPolySize = getPolySize(nnz, degree)
    val polyIndices = mutable.ArrayBuilder.make[Int]
    polyIndices.sizeHint(nnzPolySize)
    val polyValues = mutable.ArrayBuilder.make[Double]
    polyValues.sizeHint(nnzPolySize)
    expandSparse(
      sv.indices, sv.values, nnz - 1, sv.size - 1, degree, 1.0, polyIndices, polyValues, 0)
    new SparseVector(polySize, polyIndices.result(), polyValues.result())
  }

  /**
   * Expands `v` into monomials up to total degree `degree`, preserving sparsity.
   * @throws IllegalArgumentException for vector implementations other than dense/sparse.
   */
  def expand(v: Vector, degree: Int): Vector = {
    v match {
      case dv: DenseVector => expand(dv, degree)
      case sv: SparseVector => expand(sv, degree)
      case _ => throw new IllegalArgumentException(s"Do not support vector type ${v.getClass}")
    }
  }
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.ml.feature
19+
20+
import org.scalatest.FunSuite
21+
22+
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
23+
import org.apache.spark.mllib.util.MLlibTestSparkContext
24+
import org.apache.spark.mllib.util.TestingUtils._
25+
import org.apache.spark.sql.{Row, SQLContext}
26+
import org.scalatest.exceptions.TestFailedException
27+
28+
class PolynomialExpansionSuite extends FunSuite with MLlibTestSparkContext {

  @transient var sqlContext: SQLContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    sqlContext = new SQLContext(sc)
  }

  // Input vectors shared by both tests: mixed sparse/dense, including an all-zero
  // vector and an empty sparse vector to exercise the sparsity short-circuits.
  private lazy val data: Array[Vector] = Array(
    Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
    Vectors.dense(-2.0, 2.3),
    Vectors.dense(0.0, 0.0, 0.0),
    Vectors.dense(0.6, -1.1, -3.0),
    Vectors.sparse(3, Seq())
  )

  /**
   * Applies `polynomialExpansion` to `data` paired with `expected` and checks that each
   * expanded vector matches its expectation both in representation (dense vs. sparse is
   * preserved) and in values (within absTol 1e-1).
   */
  private def checkExpansion(expected: Array[Vector], polynomialExpansion: PolynomialExpansion)
    : Unit = {
    val df = sqlContext.createDataFrame(data.zip(expected)).toDF("features", "expected")
    polynomialExpansion.transform(df).select("polyFeatures", "expected").collect().foreach {
      case Row(expanded: DenseVector, expected: DenseVector) =>
        assert(expanded ~== expected absTol 1e-1)
      case Row(expanded: SparseVector, expected: SparseVector) =>
        assert(expanded ~== expected absTol 1e-1)
      case _ =>
        throw new TestFailedException("Unmatched data types after polynomial expansion", 0)
    }
  }

  test("Polynomial expansion with default parameter") {
    // Default degree is 2: constant term, first-order terms, and all degree-2 monomials.
    val twoDegreeExpansion: Array[Vector] = Array(
      Vectors.sparse(10, Array(0, 1, 2, 3, 4, 5), Array(1.0, -2.0, 4.0, 2.3, -4.6, 5.29)),
      Vectors.dense(1.0, -2.0, 4.0, 2.3, -4.6, 5.29),
      Vectors.dense(Array(1.0) ++ Array.fill[Double](9)(0.0)),
      Vectors.dense(1.0, 0.6, 0.36, -1.1, -0.66, 1.21, -3.0, -1.8, 3.3, 9.0),
      Vectors.sparse(10, Array(0), Array(1.0)))

    val polynomialExpansion = new PolynomialExpansion()
      .setInputCol("features")
      .setOutputCol("polyFeatures")

    checkExpansion(twoDegreeExpansion, polynomialExpansion)
  }

  test("Polynomial expansion with setter") {
    // Degree 3: (3 + 3 choose 3) = 20 output slots per 3-feature vector.
    val threeDegreeExpansion: Array[Vector] = Array(
      Vectors.sparse(20, Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
        Array(1.0, -2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17)),
      Vectors.dense(1.0, -2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17),
      Vectors.dense(Array(1.0) ++ Array.fill[Double](19)(0.0)),
      Vectors.dense(1.0, 0.6, 0.36, 0.216, -1.1, -0.66, -0.396, 1.21, 0.726, -1.331, -3.0, -1.8,
        -1.08, 3.3, 1.98, -3.63, 9.0, 5.4, -9.9, -27.0),
      Vectors.sparse(20, Array(0), Array(1.0)))

    val polynomialExpansion = new PolynomialExpansion()
      .setInputCol("features")
      .setOutputCol("polyFeatures")
      .setDegree(3)

    checkExpansion(threeDegreeExpansion, polynomialExpansion)
  }
}
104+

0 commit comments

Comments
 (0)