Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

import scala.annotation.tailrec

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.mllib.linalg._
import org.apache.spark.sql.types.DataType

/**
* :: AlphaComponent ::
* Perform feature expansion in a polynomial space. As said in wikipedia of Polynomial Expansion,
* which is available at [[http://en.wikipedia.org/wiki/Polynomial_expansion]], "In mathematics, an
* expansion of a product of sums expresses it as a sum of products by using the fact that
* multiplication distributes over addition". Take a 2-variable feature vector as an example:
* `(x, y)`, if we want to expand it with degree 2, then we get `(x, y, x * x, x * y, y * y)`.
*/
@AlphaComponent
class PolynomialMapper extends UnaryTransformer[Vector, Vector, PolynomialMapper] {

/**
* The polynomial degree to expand, which should be larger than 1.
* @group param
*/
val degree = new IntParam(this, "degree", "the polynomial degree to expand", Some(2))

/** @group getParam */
def getDegree: Int = get(degree)

/** @group setParam */
def setDegree(value: Int): this.type = set(degree, value)

override protected def createTransformFunc(paramMap: ParamMap): Vector => Vector = {
PolynomialMapper.transform(getDegree)
}

override protected def outputDataType: DataType = new VectorUDT()
}

object PolynomialMapper {
/**
* The number that combines k items from N items without repeat, i.e. the binomial coefficient.
*/
private def binomialCoefficient(N: Int, k: Int): Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary. We can have all coefficients be 1.0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the binomialCoefficient is used to compute the following two values: num of monomials and num expanded length.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def choose(n: Int, k: Int): Int

(N - k + 1 to N).product / (1 to k).product
}

/**
* The number of monomials of a `numVariables` vector after expanding at a specific polynomial
* degree `degree`.
*/
private def numMonomials(degree: Int, numVariables: Int): Int = {
binomialCoefficient(numVariables + degree - 1, degree)
}

/**
* The number of monomials of a `numVariables` vector after expanding from polynomial degree 1 to
* polynomial degree `degree`.
*/
private def numExpandedDims(degree: Int, numVariables: Int): Int = {
binomialCoefficient(numVariables + degree, numVariables) - 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

choose(n + 1, k) - 1?

}

/**
* Given a pre-built array of Double, fill it with expanded monomials until a given polynomial
* degree.
* @param values the array of Double, which represents a dense vector.
* @param prevStart the start offset of elements that filled in the last function call.
* @param prevLen the length of elements that filled in the last function.
* @param currDegree the current degree that we want to expand.
* @param finalDegree the final expected degree that we want to expand.
* @param nVariables number of variables in the original feature vector.
*/
@tailrec
private def fillDenseVector(values: Array[Double], prevStart: Int, prevLen: Int, currDegree: Int,
finalDegree: Int, nVariables: Int): Unit = {

if (currDegree > finalDegree) {
return
}

val currExpandedVecFrom = prevStart + prevLen
val currExpandedVecLen = numMonomials(currDegree, nVariables)

var leftIndex = 0
var currIndex = currExpandedVecFrom

while (leftIndex < nVariables) {
val numToKeep = numMonomials(currDegree - 1, nVariables - leftIndex)
val prevVecStartIndex = prevStart + prevLen - numToKeep

var rightIndex = 0
while (rightIndex < numToKeep) {
values(currIndex) =
values(leftIndex) * values(prevVecStartIndex + rightIndex)
currIndex += 1
rightIndex += 1
}

leftIndex += 1
}

fillDenseVector(values, currExpandedVecFrom, currExpandedVecLen, currDegree + 1, finalDegree,
nVariables)
}

/**
* For polynomial expanding a `SparseVector`, we treat it as a dense vector and call
* `fillDenseVector` to fill in the `values` of `SparseVector`. For its `indices` part, we encode
* the indices from `nVariables` one by one, because we do not care of the real indices.
*/
private def fillPseudoSparseVectorIndices(indices: Array[Int], startFrom: Int, startWith: Int) = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mengxr I notice that it is not a feasible solution. We have to compute the real indices of the sparse vector. I'll fix it ASAP.

var i = startFrom
var j = startWith
while (i < indices.size) {
indices(i) = j
i += 1
j += 1
}
}

/**
* Transform a vector of variables into a larger vector which stores the polynomial expansion from
* degree 1 to degree `degree`.
*/
private def transform(degree: Int)(feature: Vector): Vector = {
val expectedDims = numExpandedDims(degree, feature.size)
feature match {
case f: DenseVector =>
val originalDims = f.size
val res = Array.fill[Double](expectedDims)(0.0)
for (i <- 0 until f.size) {
res(i) = f(i)
}
fillDenseVector(res, 0, originalDims, 2, degree, originalDims)
Vectors.dense(res)

case f: SparseVector =>
val originalDims = f.indices.size
val expandedDims = numExpandedDims(degree, f.indices.size)
val resIndices = Array.fill[Int](expandedDims)(0)
val resValues = Array.fill[Double](expandedDims)(0.0)
for (i <- 0 until f.indices.size) {
resIndices(i) = f.indices(i)
resValues(i) = f.values(i)
}
fillDenseVector(resValues, 0, f.indices.size, 2, degree, originalDims)
fillPseudoSparseVectorIndices(resIndices, f.indices.size, feature.size)
Vectors.sparse(expectedDims, resIndices, resValues)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

class PolynomialMapperSuite extends FunSuite with MLlibTestSparkContext {

def collectResult(result: DataFrame): Array[Vector] = {
result.select("poly_features").collect().map {
case Row(features: Vector) => features
}
}

def assertTypeOfVector(lhs: Array[Vector], rhs: Array[Vector]): Unit = {
assert((lhs, rhs).zipped.forall {
case (v1: DenseVector, v2: DenseVector) => true
case (v1: SparseVector, v2: SparseVector) => true
case _ => false
}, "The vector type should be preserved after normalization.")
}

def assertValues(lhs: Array[Vector], rhs: Array[Array[Double]]): Unit = {
assert((lhs, rhs).zipped.forall {
case (vector1: DenseVector, vector2) =>
Vectors.dense(vector1.values) ~== Vectors.dense(vector2) absTol 1E-1
case (vector1: SparseVector, vector2) =>
Vectors.dense(vector1.values) ~== Vectors.dense(vector2) absTol 1E-1
}, "The vector value is not correct after normalization.")
}

test("Polynomial expansion with default parameter") {
val data = Array(
Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
Vectors.dense(-2.0, 2.3),
Vectors.dense(0.0, 0.0, 0.0),
Vectors.dense(0.6, -1.1, -3.0),
Vectors.sparse(3, Seq())
)

val sqlContext = new SQLContext(sc)
val dataFrame = sqlContext
.createDataFrame(sc.parallelize(data, 2).map(Tuple1.apply)).toDF("features")

val polynomialMapper = new PolynomialMapper()
.setInputCol("features")
.setOutputCol("poly_features")

val twoDegreeExpansion: Array[Array[Double]] = Array(
Array(-2.0, 2.3, 4.0, -4.6, 5.29),
Array(-2.0, 2.3, 4.0, -4.6, 5.29),
Array.fill[Double](9)(0.0),
Array(0.6, -1.1, -3.0, 0.36, -0.66, -1.8, 1.21, 3.3, 9.0),
Array())

val result = collectResult(polynomialMapper.transform(dataFrame))

assertTypeOfVector(data, result)

assertValues(result, twoDegreeExpansion)

}

test("Polynomial expansion with setter") {
val data = Array(
Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
Vectors.dense(-2.0, 2.3),
Vectors.dense(0.0, 0.0, 0.0),
Vectors.dense(0.6, -1.1, -3.0),
Vectors.sparse(3, Seq())
)

val sqlContext = new SQLContext(sc)
val dataFrame = sqlContext
.createDataFrame(sc.parallelize(data, 2).map(Tuple1.apply)).toDF("features")

val polynomialMapper = new PolynomialMapper()
.setInputCol("features")
.setOutputCol("poly_features")
.setDegree(3)

val threeDegreeExpansion: Array[Array[Double]] = Array(
Array(-2.0, 2.3, 4.0, -4.6, 5.29, -8.0, 9.2, -10.58, 12.167),
Array(-2.0, 2.3, 4.0, -4.6, 5.29, -8.0, 9.2, -10.58, 12.167),
Array.fill[Double](19)(0.0),
Array(0.6, -1.1, -3.0, 0.36, -0.66, -1.8, 1.21, 3.3, 9.0, 0.216, -0.396, -1.08, 0.73, 1.98,
5.4, -1.33, -3.63, -9.9, -27.0),
Array())

val result = collectResult(polynomialMapper.transform(dataFrame))

assertTypeOfVector(data, result)

assertValues(result, threeDegreeExpansion)
}
}