185 changes: 185 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

import org.apache.spark.annotation.Since
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCol}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.OpenHashMap

/**
* Feature hashing projects a set of categorical or numerical features into a feature vector of
* specified dimension (typically substantially smaller than that of the original feature
* space). This is done using the hashing trick (https://en.wikipedia.org/wiki/Feature_hashing)
* to map features to indices in the feature vector.
*
* The [[FeatureHasher]] transformer operates on multiple columns. Each column may be numeric
* (representing a real feature) or string (representing a categorical feature). Boolean columns
Contributor:
It might be good to make the behavior for each type of column clearer here. Specifically for numeric columns that are meant to be categories. Something like:

/**
 * Behavior
 *  -Numeric columns: For numeric features, the hash value of the column name is used to map the
 *                    feature value to its index in the feature vector. Numeric features are never
 *                    treated as categorical, even when they are integers. You must convert
 *                    categorical columns to strings first.
 *  -String columns: ...
 *  -Boolean columns: ...
 */

Anyway, this is a very minor suggestion and I think it's also ok to leave as is.
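
To make the suggestion concrete, a minimal sketch of the conversion it calls for, using a hypothetical column name categoryId (not part of this PR): a numeric column that really holds category ids must be cast to string before hashing, or it will be hashed as a real-valued feature.

```scala
import org.apache.spark.sql.functions.col

// "categoryId" is illustrative; cast numeric category ids to string so
// FeatureHasher one-hot encodes them instead of keeping the raw value.
val dfForHashing = df.withColumn("categoryId", col("categoryId").cast("string"))
```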

* are also supported, and treated as categorical features. For numeric features, the hash value of
* the column name is used to map the feature value to its index in the feature vector.
* For categorical features, the hash value of the string "column_name=value" is used to map to the
* vector index, with an indicator value of `1.0`. Thus, categorical features are "one-hot" encoded
* (similarly to using [[OneHotEncoder]] with `dropLast=false`).
*
* Null (missing) values are ignored (implicitly zero in the resulting feature vector).
*
* Since a simple modulo on the hash value is used to determine the vector index,
* it is advisable to use a power of two as the numFeatures parameter;
* otherwise the features will not be mapped evenly to the vector indices.
*
* {{{
* val df = Seq(
* (2.0, true, "1", "foo"),
* (3.0, false, "2", "bar")
* ).toDF("real", "bool", "stringNum", "string")
*
* val hasher = new FeatureHasher()
* .setInputCols("real", "bool", "stringNum", "string")
* .setOutputCol("features")
*
* hasher.transform(df).show()
*
* +----+-----+---------+------+--------------------+
* |real| bool|stringNum|string| features|
* +----+-----+---------+------+--------------------+
* | 2.0| true| 1| foo|(262144,[51871,63...|
* | 3.0|false| 2| bar|(262144,[6031,806...|
* +----+-----+---------+------+--------------------+
* }}}
*/
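
As a rough illustration of the mapping described above, a sketch of the hash-then-modulo step. The real implementation uses the private[spark] helpers OldHashingTF.murmur3Hash and Utils.nonNegativeMod, so the hashCode calls below are stand-ins, not the actual hash function.

```scala
// Stand-in for Utils.nonNegativeMod: modulo that always returns a value >= 0.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

val n = 1 << 18 // default numFeatures; a power of two spreads indices evenly

// Numeric column: index from the hash of the column name, value kept as is.
val realIdx = nonNegativeMod("real".hashCode, n) // carries value 2.0

// Categorical column: index from the hash of "column_name=value", value 1.0.
val catIdx = nonNegativeMod("string=foo".hashCode, n) // carries value 1.0
```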
@Since("2.3.0")
class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transformer
with HasInputCols with HasOutputCol with DefaultParamsWritable {

@Since("2.3.0")
def this() = this(Identifiable.randomUID("featureHasher"))

/**
* Number of features. Should be greater than 0.
* (default = 2^18^)
* @group param
*/
@Since("2.3.0")
val numFeatures = new IntParam(this, "numFeatures", "number of features (> 0)",
ParamValidators.gt(0))

setDefault(numFeatures -> (1 << 18))

/** @group getParam */
@Since("2.3.0")
def getNumFeatures: Int = $(numFeatures)

/** @group setParam */
@Since("2.3.0")
def setNumFeatures(value: Int): this.type = set(numFeatures, value)
Contributor: need a way to know the default value.

Contributor Author: Not sure what you mean exactly


/** @group setParam */
@Since("2.3.0")
def setInputCols(values: String*): this.type = setInputCols(values.toArray)

/** @group setParam */
@Since("2.3.0")
def setInputCols(value: Array[String]): this.type = set(inputCols, value)

/** @group setParam */
@Since("2.3.0")
def setOutputCol(value: String): this.type = set(outputCol, value)

override def transform(dataset: Dataset[_]): DataFrame = {
val hashFunc: Any => Int = OldHashingTF.murmur3Hash
val n = $(numFeatures)

val outputSchema = transformSchema(dataset.schema)
val realFields = outputSchema.fields.filter { f =>
f.dataType.isInstanceOf[NumericType]
}.map(_.name).toSet

def getDouble(x: Any): Double = {
Contributor: maybe val getDouble...

Contributor Author: why?

Contributor Author: Hmm, this is a method not a function - so I don't think it will be faster to do val in this case?
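
For readers following this exchange, a hedged sketch of the two forms under discussion. Since getDouble is only ever invoked directly (never passed around as a value), the def costs a plain method call; a val would allocate one Function1 instance up front, with no obvious win here.

```scala
// Method form, as in this PR: resolved as an ordinary method call.
def getDoubleDef(x: Any): Double = x match {
  case n: java.lang.Number => n.doubleValue()
  case other => other.asInstanceOf[Double]
}

// Function-value form, as suggested: a single Function1 instance.
val getDoubleVal: Any => Double = {
  case n: java.lang.Number => n.doubleValue()
  case other => other.asInstanceOf[Double]
}
```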

x match {
case n: java.lang.Number =>
n.doubleValue()
case other =>
// will throw ClassCastException if it cannot be cast, as would row.getDouble
other.asInstanceOf[Double]
}
}

val hashFeatures = udf { row: Row =>
val map = new OpenHashMap[Int, Double]()
$(inputCols).foreach { case colName =>
Contributor: case does nothing here

Contributor: also, I think you'll serialize the entire object here by using $(inputCols). Maybe you can make a local pointer to it before the udf.

Contributor Author: Ah thanks - this was left over from a previous code version
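
A sketch of the "local pointer" fix the reviewer suggests: read the param values into locals before building the udf, so the closure serializes only small values rather than the whole transformer. The indicator-only update below is a simplification standing in for the full numeric/categorical branch.

```scala
val localInputCols = $(inputCols)     // Array[String]; cheap to serialize
val localNumFeatures = $(numFeatures) // Int

val hashFeaturesSketch = udf { row: Row =>
  val map = new OpenHashMap[Int, Double]()
  localInputCols.foreach { colName =>            // plain lambda, no `case`
    val i = row.fieldIndex(colName)
    if (!row.isNullAt(i)) {
      val idx = Utils.nonNegativeMod(hashFunc(colName), localNumFeatures)
      map.changeValue(idx, 1.0, _ + 1.0)         // indicator only, for brevity
    }
  }
  Vectors.sparse(localNumFeatures, map.toSeq)
}
```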

val fieldIndex = row.fieldIndex(colName)
if (!row.isNullAt(fieldIndex)) {
val (rawIdx, value) = if (realFields(colName)) {
// numeric values are kept as is, with vector index based on hash of "column_name"
val value = getDouble(row.get(fieldIndex))
val hash = hashFunc(colName)
(hash, value)
} else {
// string and boolean values are treated as categorical, with an indicator value of 1.0
// and vector index based on hash of "column_name=value"
val value = row.get(fieldIndex).toString
val fieldName = s"$colName=$value"
val hash = hashFunc(fieldName)
(hash, 1.0)
}
val idx = Utils.nonNegativeMod(rawIdx, n)
map.changeValue(idx, value, v => v + value)
}
}
Vectors.sparse(n, map.toSeq)
}

val metadata = outputSchema($(outputCol)).metadata
dataset.select(
col("*"),
hashFeatures(struct($(inputCols).map(col(_)): _*)).as($(outputCol), metadata))
Contributor: .map(col), i.e. pass col directly rather than the lambda col(_)

}

override def copy(extra: ParamMap): FeatureHasher = defaultCopy(extra)
Contributor: @Since tags on all public methods (copy, transformSchema, transform)


override def transformSchema(schema: StructType): StructType = {
val fields = schema($(inputCols).toSet)
fields.foreach { case fieldSchema =>
Contributor: case does nothing

Contributor Author: Again, think it was left over from some previous version, will update

val dataType = fieldSchema.dataType
val fieldName = fieldSchema.name
require(dataType.isInstanceOf[NumericType] ||
dataType.isInstanceOf[StringType] ||
dataType.isInstanceOf[BooleanType],
s"FeatureHasher requires columns to be of NumericType, BooleanType or StringType. " +
s"Column $fieldName was $dataType")
}
val attrGroup = new AttributeGroup($(outputCol), $(numFeatures))
Contributor: It seems that we didn't store Attributes in the AttributeGroup, but we did in VectorAssembler, and both FeatureHasher and VectorAssembler can be followed directly by ML algorithms. I'd like to confirm: is this intentional? I understand it may be due to performance considerations, and users may not be interested in the attributes of hashed features. We can leave it as is until we find it affects some scenarios.

Contributor Author:
Feature hashing doesn't keep the feature -> idx mapping for memory efficiency, so by extension it won't keep attribute info. This is by design, and the tradeoff is speed & efficiency vs. not being able to do the reverse mapping (or knowing the cardinality of each feature, for example).

If users want to keep the mapping & attribute info, then of course they can just use one-hot encoding and vector assembler.

Contributor: @MLnick Thanks for clarifying.
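
To make the alternative in the author's reply concrete, a sketch of the encode-then-assemble route for when the feature -> index mapping and attributes must be preserved (column names here are illustrative, and OneHotEncoder is the single-column transformer available as of this PR):

```scala
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val indexer = new StringIndexer().setInputCol("category").setOutputCol("catIdx")
val encoder = new OneHotEncoder().setInputCol("catIdx").setOutputCol("catVec")
val assembler = new VectorAssembler()
  .setInputCols(Array("catVec", "real"))
  .setOutputCol("features")
// Unlike FeatureHasher, this path keeps per-feature attributes in the
// output column's metadata, at the cost of storing the full mapping.
```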

SchemaUtils.appendColumn(schema, attrGroup.toStructField())
}
}

@Since("2.3.0")
object FeatureHasher extends DefaultParamsReadable[FeatureHasher] {

@Since("2.3.0")
override def load(path: String): FeatureHasher = super.load(path)
}
193 changes: 193 additions & 0 deletions mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

class FeatureHasherSuite extends SparkFunSuite
with MLlibTestSparkContext
with DefaultReadWriteTest {

import testImplicits._

import HashingTFSuite.murmur3FeatureIdx

implicit val vectorEncoder = ExpressionEncoder[Vector]()
Contributor: private


test("params") {
Contributor: Maybe add a test for the Unicode column name (like Chinese, "中文")

ParamsSuite.checkParams(new FeatureHasher)
}

test("specify input cols using varargs or array") {
val featureHasher1 = new FeatureHasher()
.setInputCols("int", "double", "float", "stringNum", "string")
val featureHasher2 = new FeatureHasher()
.setInputCols(Array("int", "double", "float", "stringNum", "string"))
assert(featureHasher1.getInputCols === featureHasher2.getInputCols)
}

test("feature hashing") {
val df = Seq(
(2.0, true, "1", "foo"),
(3.0, false, "2", "bar")
).toDF("real", "bool", "stringNum", "string")

val n = 100
val hasher = new FeatureHasher()
.setInputCols("real", "bool", "stringNum", "string")
.setOutputCol("features")
.setNumFeatures(n)
val output = hasher.transform(df)
val attrGroup = AttributeGroup.fromStructField(output.schema("features"))
require(attrGroup.numAttributes === Some(n))
Contributor: make this an assert


val features = output.select("features").as[Vector].collect()
// Assume perfect hash on field names
def idx: Any => Int = murmur3FeatureIdx(n)
// check expected indices
val expected = Seq(
Vectors.sparse(n, Seq((idx("real"), 2.0), (idx("bool=true"), 1.0),
(idx("stringNum=1"), 1.0), (idx("string=foo"), 1.0))),
Vectors.sparse(n, Seq((idx("real"), 3.0), (idx("bool=false"), 1.0),
(idx("stringNum=2"), 1.0), (idx("string=bar"), 1.0)))
)
assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 })
}

test("hashing works for all numeric types") {
val df = Seq(5.0, 10.0, 15.0).toDF("real")

val hasher = new FeatureHasher()
.setInputCols("real")
.setOutputCol("features")

val expectedResult = hasher.transform(df).select("features").as[Vector].collect()
// check all numeric types work as expected. String & boolean types are tested in default case
val types =
Seq(ShortType, LongType, IntegerType, FloatType, ByteType, DoubleType, DecimalType(10, 0))
types.foreach { t =>
val castDF = df.select(col("real").cast(t))
val castResult = hasher.transform(castDF).select("features").as[Vector].collect()
withClue(s"FeatureHasher works for all numeric types (testing $t): ") {
assert(castResult.zip(expectedResult).forall { case (actual, expected) =>
actual ~== expected absTol 1e-14
})
}
}
}

test("invalid input type should fail") {
val df = Seq(
Vectors.dense(1),
Vectors.dense(2)
).toDF("vec")

intercept[IllegalArgumentException] {
new FeatureHasher().setInputCols("vec").transform(df)
}
}

test("hash collisions sum feature values") {
val df = Seq(
(1.0, "foo", "foo"),
(2.0, "bar", "baz")
).toDF("real", "string1", "string2")

val n = 1
val hasher = new FeatureHasher()
.setInputCols("real", "string1", "string2")
.setOutputCol("features")
.setNumFeatures(n)

val features = hasher.transform(df).select("features").as[Vector].collect()
def idx: Any => Int = murmur3FeatureIdx(n)
// everything should hash into one field
assert(idx("real") === idx("string1=foo"))
assert(idx("string1=foo") === idx("string2=foo"))
assert(idx("string2=foo") === idx("string1=bar"))
assert(idx("string1=bar") === idx("string2=baz"))
val expected = Seq(
Vectors.sparse(n, Seq((idx("string1=foo"), 3.0))),
Vectors.sparse(n, Seq((idx("string2=bar"), 4.0)))
)
assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 })
}

test("ignores null values in feature hashing") {
import org.apache.spark.sql.functions._

val df = Seq(
(2.0, "foo", null),
(3.0, "bar", "baz")
).toDF("real", "string1", "string2").select(
when(col("real") === 3.0, null).otherwise(col("real")).alias("real"),
col("string1"),
col("string2")
)

val n = 100
val hasher = new FeatureHasher()
.setInputCols("real", "string1", "string2")
.setOutputCol("features")
.setNumFeatures(n)

val features = hasher.transform(df).select("features").as[Vector].collect()
def idx: Any => Int = murmur3FeatureIdx(n)
val expected = Seq(
Vectors.sparse(n, Seq((idx("real"), 2.0), (idx("string1=foo"), 1.0))),
Vectors.sparse(n, Seq((idx("string1=bar"), 1.0), (idx("string2=baz"), 1.0)))
)
assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 })
}

test("unicode column names and values") {
// scalastyle:off nonascii
val df = Seq((2.0, "中文")).toDF("中文", "unicode")

val n = 100
val hasher = new FeatureHasher()
.setInputCols("中文", "unicode")
.setOutputCol("features")
.setNumFeatures(n)

val features = hasher.transform(df).select("features").as[Vector].collect()
def idx: Any => Int = murmur3FeatureIdx(n)
val expected = Seq(
Vectors.sparse(n, Seq((idx("中文"), 2.0), (idx("unicode=中文"), 1.0)))
)
assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 })
// scalastyle:on nonascii
}

test("read/write") {
val t = new FeatureHasher()
.setInputCols(Array("myCol1", "myCol2", "myCol3"))
.setOutputCol("myOutputCol")
.setNumFeatures(10)
testDefaultReadWrite(t)
}
}