From 6ab19a963f35de29af0a6b7b1598d5add78f200a Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Tue, 23 Aug 2016 12:29:06 +0200 Subject: [PATCH 01/13] initial WIP --- .../spark/ml/feature/FeatureHasher.scala | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala new file mode 100644 index 000000000000..1722eec00c4c --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.SparkException +import org.apache.spark.annotation.Since +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.{HasInputCol, HasInputCols, HasOutputCol} +import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable, SchemaUtils} +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.hash.Murmur3_x86_32._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils +import org.apache.spark.util.collection.OpenHashMap + + +@Since("2.1.0") +class FeatureHasher(@Since("2.1.0") override val uid: String) extends Transformer + with HasInputCols with HasOutputCol with DefaultParamsWritable { + + @Since("2.1.0") + def this() = this(Identifiable.randomUID("featHasher")) + + /** + * Number of features. Should be > 0. 
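+ * Choosing a power of two is advisable, since a simple modulo of the hash is used to determine the vector index.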
+ * (default = 2^18^) + * @group param + */ + @Since("2.1.0") + val numFeatures = new IntParam(this, "numFeatures", "number of features (> 0)", + ParamValidators.gt(0)) + setDefault(numFeatures -> (1 << 18)) + + /** @group getParam */ + @Since("2.1.0") + def getNumFeatures: Int = $(numFeatures) + + /** @group setParam */ + @Since("2.1.0") + def setNumFeatures(value: Int): this.type = set(numFeatures, value) + + /** @group setParam */ + @Since("2.1.0") + def setInputCols(values: String*): this.type = setInputCols(values.toArray) + + /** @group setParam */ + @Since("2.1.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + override def transform(dataset: Dataset[_]): DataFrame = { + val os = transformSchema(dataset.schema) + + val inputFields = $(inputCols).map(c => dataset.schema(c)) + val featureCols = inputFields.map { f => + f.dataType match { + case DoubleType | StringType => dataset(f.name) + case _: NumericType | BooleanType => dataset(f.name).cast(DoubleType) + } + } + + val realFields = os.fields.filter(f => f.dataType.isInstanceOf[NumericType]).map(_.name).toSet + val hashFunc: Any => Int = FeatureHasher.murmur3Hash + + def hashFeatures = udf { row: Row => + val map = new OpenHashMap[Int, Double]() + val s = $(inputCols).zipWithIndex.map { case (field, i) => + if (realFields(field)) { + val value = row.getDouble(i) + val hash = hashFunc(field) + val idx = Utils.nonNegativeMod(hash, $(numFeatures)) + (idx, value) + } else { + val value = row.getString(i) + val idx = Utils.nonNegativeMod(hashFunc(value), $(numFeatures)) + (idx, 1.0) + } + } + Vectors.sparse($(numFeatures), s.toSeq) + } + + dataset.select( + col("*"), + hashFeatures(struct(featureCols: _*)).as($(outputCol))) // , featureAttrs.toMetadata())) + + /* + dataset.select($(inputCols).map(col(_)): _*).map { case row => + val map = new OpenHashMap[Int, Double]() + val s = $(inputCols).zipWithIndex.map { case (field, i) => + if (realFields(field)) { + val value = row.getDouble(i) + val hash = hashFunc(field) + val idx = Utils.nonNegativeMod(hash, $(numFeatures)) + (idx, value) + } else { + val value = row.getString(i) + val idx = Utils.nonNegativeMod(hashFunc(value), $(numFeatures)) + (idx, 1.0) + } + } + Row(Vectors.sparse($(numFeatures), s.toSeq)) + }.toDF($(outputCol)) + */ + } + + override def copy(extra: ParamMap): Transformer = defaultCopy(extra) + + override def transformSchema(schema: StructType): StructType = { + val fields = schema($(inputCols).toSet) + require(fields.map(_.dataType).forall { case dt => + dt.isInstanceOf[NumericType] || dt.isInstanceOf[StringType] + }) + val attrGroup = new AttributeGroup($(outputCol), $(numFeatures)) + SchemaUtils.appendColumn(schema, attrGroup.toStructField()) + } +} + +object FeatureHasher { + + private val seed = 42 + + /** + * Calculate a hash code value for the term object using + * Austin Appleby's MurmurHash 3 algorithm (MurmurHash3_x86_32). + * This is the default hash algorithm used from Spark 2.0 onwards. 
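+ * It matches the hash used by HashingTF, so identical terms map to identical indices.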
+ */ + private[feature] def murmur3Hash(term: Any): Int = { + term match { + case null => seed + case b: Boolean => hashInt(if (b) 1 else 0, seed) + case b: Byte => hashInt(b, seed) + case s: Short => hashInt(s, seed) + case i: Int => hashInt(i, seed) + case l: Long => hashLong(l, seed) + case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed) + case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed) + case s: String => + val utf8 = UTF8String.fromString(s) + hashUnsafeBytes(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed) + case _ => throw new SparkException("HashingTF with murmur3 algorithm does not " + + s"support type ${term.getClass.getCanonicalName} of input data.") + } + } +} From ebd2cbf3467f26121c602f7c77c2018253cbdf18 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Wed, 1 Feb 2017 12:43:07 +0200 Subject: [PATCH 02/13] Further work --- .../spark/ml/feature/FeatureHasher.scala | 46 +++++++++++-------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index 1722eec00c4c..e4f1d05c9a85 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -34,39 +34,44 @@ import org.apache.spark.util.Utils import org.apache.spark.util.collection.OpenHashMap -@Since("2.1.0") -class FeatureHasher(@Since("2.1.0") override val uid: String) extends Transformer +@Since("2.2.0") +class FeatureHasher(@Since("2.2.0") override val uid: String) extends Transformer with HasInputCols with HasOutputCol with DefaultParamsWritable { - @Since("2.1.0") - def this() = this(Identifiable.randomUID("featHasher")) + @Since("2.2.0") + def this() = this(Identifiable.randomUID("featureHasher")) /** * Number of features. Should be > 0. 
* (default = 2^18^) * @group param */ - @Since("2.1.0") + @Since("2.2.0") val numFeatures = new IntParam(this, "numFeatures", "number of features (> 0)", ParamValidators.gt(0)) + setDefault(numFeatures -> (1 << 18)) /** @group getParam */ - @Since("2.1.0") + @Since("2.2.0") def getNumFeatures: Int = $(numFeatures) /** @group setParam */ - @Since("2.1.0") + @Since("2.2.0") def setNumFeatures(value: Int): this.type = set(numFeatures, value) /** @group setParam */ - @Since("2.1.0") + @Since("2.2.0") def setInputCols(values: String*): this.type = setInputCols(values.toArray) /** @group setParam */ - @Since("2.1.0") + @Since("2.2.0") def setInputCols(value: Array[String]): this.type = set(inputCols, value) + /** @group setParam */ + @Since("2.2.0") + def setOutputCol(value: String): this.type = set(outputCol, value) + override def transform(dataset: Dataset[_]): DataFrame = { val os = transformSchema(dataset.schema) @@ -74,7 +79,7 @@ class FeatureHasher(@Since("2.1.0") override val uid: String) extends Transforme val featureCols = inputFields.map { f => f.dataType match { case DoubleType | StringType => dataset(f.name) - case _: NumericType | BooleanType => dataset(f.name).cast(DoubleType) + case _: NumericType | BooleanType => dataset(f.name).cast(DoubleType).alias(f.name) } } @@ -83,19 +88,22 @@ class FeatureHasher(@Since("2.1.0") override val uid: String) extends Transforme def hashFeatures = udf { row: Row => val map = new OpenHashMap[Int, Double]() - val s = $(inputCols).zipWithIndex.map { case (field, i) => - if (realFields(field)) { - val value = row.getDouble(i) + $(inputCols).foreach { case field => + val (rawIdx, value) = if (realFields(field)) { + val value = row.getDouble(row.fieldIndex(field)) val hash = hashFunc(field) - val idx = Utils.nonNegativeMod(hash, $(numFeatures)) - (idx, value) + (hash, value) } else { - val value = row.getString(i) - val idx = Utils.nonNegativeMod(hashFunc(value), $(numFeatures)) - (idx, 1.0) + val value = row.getString(row.fieldIndex(field)) + val fieldName = s"$field=$value" + val hash = hashFunc(fieldName) + (hash, 1.0) } + val idx = Utils.nonNegativeMod(rawIdx, $(numFeatures)) + map.changeValue(idx, value, v => v + value) + (idx, value) } - Vectors.sparse($(numFeatures), s.toSeq) + Vectors.sparse($(numFeatures), map.toSeq) } dataset.select( From ba255bfda792d58aaded892e49c6cf48f0391159 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 22 Jun 2017 12:52:12 +0200 Subject: [PATCH 03/13] Clean up --- .../spark/ml/feature/FeatureHasher.scala | 105 +++++------------- 1 file changed, 30 insertions(+), 75 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index e4f1d05c9a85..d39f6c616e80 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -17,28 +17,26 @@ package org.apache.spark.ml.feature -import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators} -import org.apache.spark.ml.param.shared.{HasInputCol, HasInputCols, HasOutputCol} +import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCol} import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable, SchemaUtils} +import 
org.apache.spark.mllib.feature.{HashingTF => OldHashingTF} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.hash.Murmur3_x86_32._ -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils import org.apache.spark.util.collection.OpenHashMap -@Since("2.2.0") -class FeatureHasher(@Since("2.2.0") override val uid: String) extends Transformer +@Since("2.3.0") +class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transformer with HasInputCols with HasOutputCol with DefaultParamsWritable { - @Since("2.2.0") + @Since("2.3.0") def this() = this(Identifiable.randomUID("featureHasher")) /** @@ -46,88 +44,73 @@ class FeatureHasher(@Since("2.2.0") override val uid: String) extends Transforme * (default = 2^18^) * @group param */ - @Since("2.2.0") + @Since("2.3.0") val numFeatures = new IntParam(this, "numFeatures", "number of features (> 0)", ParamValidators.gt(0)) setDefault(numFeatures -> (1 << 18)) /** @group getParam */ - @Since("2.2.0") + @Since("2.3.0") def getNumFeatures: Int = $(numFeatures) /** @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setNumFeatures(value: Int): this.type = set(numFeatures, value) /** @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setInputCols(values: String*): this.type = setInputCols(values.toArray) /** @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setInputCols(value: Array[String]): this.type = set(inputCols, value) /** @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setOutputCol(value: String): this.type = set(outputCol, value) override def transform(dataset: Dataset[_]): DataFrame = { + val hashFunc: Any => Int = OldHashingTF.murmur3Hash + val numFeatures = $(numFeatures) + val os = transformSchema(dataset.schema) - val inputFields = $(inputCols).map(c => dataset.schema(c)) - val featureCols = inputFields.map { f => - f.dataType match { - case DoubleType | StringType => dataset(f.name) - case _: NumericType | BooleanType => dataset(f.name).cast(DoubleType).alias(f.name) + val featureCols = $(inputCols).map { colName => + val field = dataset.schema(colName) + field.dataType match { + case DoubleType | StringType => dataset(field.name) + case _: NumericType | BooleanType => dataset(field.name).cast(DoubleType).alias(field.name) } } val realFields = os.fields.filter(f => f.dataType.isInstanceOf[NumericType]).map(_.name).toSet - val hashFunc: Any => Int = FeatureHasher.murmur3Hash def hashFeatures = udf { row: Row => val map = new OpenHashMap[Int, Double]() - $(inputCols).foreach { case field => - val (rawIdx, value) = if (realFields(field)) { - val value = row.getDouble(row.fieldIndex(field)) - val hash = hashFunc(field) + $(inputCols).foreach { case colName => + val fieldIndex = row.fieldIndex(colName) + val (rawIdx, value) = if (realFields(colName)) { + val value = row.getDouble(fieldIndex) + val hash = hashFunc(colName) (hash, value) } else { - val value = row.getString(row.fieldIndex(field)) - val fieldName = s"$field=$value" + val value = row.getString(fieldIndex) + val fieldName = s"$colName=$value" val hash = hashFunc(fieldName) (hash, 1.0) } - val idx = Utils.nonNegativeMod(rawIdx, $(numFeatures)) + val idx = Utils.nonNegativeMod(rawIdx, numFeatures) map.changeValue(idx, value, v => v + value) (idx, value) } - Vectors.sparse($(numFeatures), map.toSeq) + Vectors.sparse(numFeatures, map.toSeq) } + val metadata = os($(outputCol)).metadata dataset.select( col("*"), 
- hashFeatures(struct(featureCols: _*)).as($(outputCol))) // , featureAttrs.toMetadata())) - - /* - dataset.select($(inputCols).map(col(_)): _*).map { case row => - val map = new OpenHashMap[Int, Double]() - val s = $(inputCols).zipWithIndex.map { case (field, i) => - if (realFields(field)) { - val value = row.getDouble(i) - val hash = hashFunc(field) - val idx = Utils.nonNegativeMod(hash, $(numFeatures)) - (idx, value) - } else { - val value = row.getString(i) - val idx = Utils.nonNegativeMod(hashFunc(value), $(numFeatures)) - (idx, 1.0) - } - } - Row(Vectors.sparse($(numFeatures), s.toSeq)) - }.toDF($(outputCol)) - */ + hashFeatures(struct(featureCols: _*)).as($(outputCol), metadata)) } override def copy(extra: ParamMap): Transformer = defaultCopy(extra) @@ -141,31 +124,3 @@ class FeatureHasher(@Since("2.2.0") override val uid: String) extends Transforme SchemaUtils.appendColumn(schema, attrGroup.toStructField()) } } - -object FeatureHasher { - - private val seed = 42 - - /** - * Calculate a hash code value for the term object using - * Austin Appleby's MurmurHash 3 algorithm (MurmurHash3_x86_32). - * This is the default hash algorithm used from Spark 2.0 onwards. - */ - private[feature] def murmur3Hash(term: Any): Int = { - term match { - case null => seed - case b: Boolean => hashInt(if (b) 1 else 0, seed) - case b: Byte => hashInt(b, seed) - case s: Short => hashInt(s, seed) - case i: Int => hashInt(i, seed) - case l: Long => hashLong(l, seed) - case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed) - case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed) - case s: String => - val utf8 = UTF8String.fromString(s) - hashUnsafeBytes(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed) - case _ => throw new SparkException("HashingTF with murmur3 algorithm does not " + - s"support type ${term.getClass.getCanonicalName} of input data.") - } - } -} From 0be1e6572110d7d550f69fd86d3dd4e96660fde6 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 22 Jun 2017 12:52:37 +0200 Subject: [PATCH 04/13] Add tests --- .../spark/ml/feature/FeatureHasherSuite.scala | 81 +++++++++++++++++++ .../spark/ml/feature/HashingTFSuite.scala | 8 +- 2 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala new file mode 100644 index 000000000000..a4a56e547593 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.feature + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.linalg.{Vector, Vectors} +import org.apache.spark.ml.param.ParamsSuite +import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class FeatureHasherSuite extends SparkFunSuite + with MLlibTestSparkContext + with DefaultReadWriteTest { + + import testImplicits._ + import HashingTFSuite.murmur3FeatureIdx + + test("params") { + ParamsSuite.checkParams(new FeatureHasher) + } + + test("specify input cols using varargs or array") { + val featureHasher1 = new FeatureHasher() + .setInputCols("int", "double", "float", "stringNum", "string") + val featureHasher2 = new FeatureHasher() + .setInputCols(Array("int", "double", "float", "stringNum", "string")) + assert(featureHasher1.getInputCols === featureHasher2.getInputCols) + } + + test("feature hashing") { + val df = Seq( + (2, 2.0, 2.0f, "2", "foo"), + (3, 3.0, 3.0f, "3", "bar") + ).toDF("int", "double", "float", "stringNum", "string") + val n = 100 + val featureHasher = new FeatureHasher() + .setInputCols("int", "double", "float", "stringNum", "string") + .setOutputCol("features") + .setNumFeatures(n) + val output = featureHasher.transform(df) + val attrGroup = AttributeGroup.fromStructField(output.schema("features")) + require(attrGroup.numAttributes === Some(n)) + val features = output.select("features").as[Vector].collect() + // Assume perfect hash on field names + def idx: Any => Int = murmur3FeatureIdx(n) + + // check expected indices + val expected = Seq( + Vectors.sparse(n, Seq((idx("int"), 2.0), (idx("double"), 2.0), (idx("float"), 2.0), + (idx("stringNum=2"), 1.0), (idx("string=foo"), 1.0))), + Vectors.sparse(n, Seq((idx("int"), 3.0), (idx("double"), 3.0), (idx("float"), 3.0), + (idx("stringNum=3"), 1.0), (idx("string=bar"), 1.0))) + ) + assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 }) + } + + test("read/write") { + val t = new FeatureHasher() + .setInputCols(Array("myCol1", "myCol2", "myCol3")) + .setOutputCol("myOutputCol") + .setNumFeatures(10) + testDefaultReadWrite(t) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala index 1d14866cc933..a46272fdce1f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/HashingTFSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.util.Utils class HashingTFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { import testImplicits._ + import HashingTFSuite.murmur3FeatureIdx test("params") { ParamsSuite.checkParams(new HashingTF) @@ -77,7 +78,12 @@ class HashingTFSuite extends SparkFunSuite with MLlibTestSparkContext with Defau testDefaultReadWrite(t) } - private def murmur3FeatureIdx(numFeatures: Int)(term: Any): Int = { +} + +object HashingTFSuite { + + private[feature] def murmur3FeatureIdx(numFeatures: Int)(term: Any): Int = { Utils.nonNegativeMod(MLlibHashingTF.murmur3Hash(term), numFeatures) } + } From 2f3ea21e2e1835d7218e8c7bd096cc0787ed595c Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Thu, 22 Jun 2017 15:08:26 +0200 Subject: [PATCH 05/13] Copy, save/load, clean up --- .../apache/spark/ml/feature/FeatureHasher.scala | 17 ++++++++++++----- 
.../spark/ml/feature/FeatureHasherSuite.scala | 6 +++++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index d39f6c616e80..c6885520de7b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -23,7 +23,7 @@ import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCol} -import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable, SchemaUtils} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -72,7 +72,7 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme override def transform(dataset: Dataset[_]): DataFrame = { val hashFunc: Any => Int = OldHashingTF.murmur3Hash - val numFeatures = $(numFeatures) + val n = $(numFeatures) val os = transformSchema(dataset.schema) @@ -100,11 +100,11 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme val hash = hashFunc(fieldName) (hash, 1.0) } - val idx = Utils.nonNegativeMod(rawIdx, numFeatures) + val idx = Utils.nonNegativeMod(rawIdx, n) map.changeValue(idx, value, v => v + value) (idx, value) } - Vectors.sparse(numFeatures, map.toSeq) + Vectors.sparse(n, map.toSeq) } val metadata = os($(outputCol)).metadata @@ -113,7 +113,7 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme hashFeatures(struct(featureCols: _*)).as($(outputCol), metadata)) } - override def copy(extra: ParamMap): Transformer = defaultCopy(extra) + override def copy(extra: ParamMap): FeatureHasher = defaultCopy(extra) override def transformSchema(schema: StructType): StructType = { val fields = schema($(inputCols).toSet) @@ -124,3 +124,10 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme SchemaUtils.appendColumn(schema, attrGroup.toStructField()) } } + +@Since("2.3.0") +object FeatureHasher extends DefaultParamsReadable[FeatureHasher] { + + @Since("2.3.0") + override def load(path: String): FeatureHasher = super.load(path) +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala index a4a56e547593..f56e387ea808 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder class FeatureHasherSuite extends SparkFunSuite with MLlibTestSparkContext @@ -49,6 +50,7 @@ class FeatureHasherSuite extends SparkFunSuite (2, 2.0, 2.0f, "2", "foo"), (3, 3.0, 3.0f, "3", "bar") ).toDF("int", "double", "float", "stringNum", "string") + val n = 100 val featureHasher = new FeatureHasher() .setInputCols("int", "double", "float", "stringNum", 
"string") @@ -57,10 +59,12 @@ class FeatureHasherSuite extends SparkFunSuite val output = featureHasher.transform(df) val attrGroup = AttributeGroup.fromStructField(output.schema("features")) require(attrGroup.numAttributes === Some(n)) + + implicit val vectorEncoder = ExpressionEncoder[Vector]() val features = output.select("features").as[Vector].collect() + // Assume perfect hash on field names def idx: Any => Int = murmur3FeatureIdx(n) - // check expected indices val expected = Seq( Vectors.sparse(n, Seq((idx("int"), 2.0), (idx("double"), 2.0), (idx("float"), 2.0), From 7d678fbf5f88d377b79153212a3e0a2596039b17 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Mon, 26 Jun 2017 14:38:02 +0200 Subject: [PATCH 06/13] Move numFeatures to HasNumFeatures shared trait --- .../spark/ml/feature/FeatureHasher.scala | 19 ++---------- .../apache/spark/ml/feature/HashingTF.scala | 21 +++----------- .../ml/param/shared/SharedParamsCodeGen.scala | 4 ++- .../spark/ml/param/shared/sharedParams.scala | 29 +++++++++++++++---- 4 files changed, 32 insertions(+), 41 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index c6885520de7b..d1d7b5edbe2d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -22,7 +22,7 @@ import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators} -import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCol} +import org.apache.spark.ml.param.shared.{HasInputCols, HasNumFeatures, HasOutputCol} import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF} import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -34,26 +34,11 @@ import org.apache.spark.util.collection.OpenHashMap @Since("2.3.0") class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transformer - with HasInputCols with HasOutputCol with DefaultParamsWritable { + with HasInputCols with HasOutputCol with HasNumFeatures with DefaultParamsWritable { @Since("2.3.0") def this() = this(Identifiable.randomUID("featureHasher")) - /** - * Number of features. Should be > 0. 
- * (default = 2^18^) - * @group param - */ - @Since("2.3.0") - val numFeatures = new IntParam(this, "numFeatures", "number of features (> 0)", - ParamValidators.gt(0)) - - setDefault(numFeatures -> (1 << 18)) - - /** @group getParam */ - @Since("2.3.0") - def getNumFeatures: Int = $(numFeatures) - /** @group setParam */ @Since("2.3.0") def setNumFeatures(value: Int): this.type = set(numFeatures, value) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala index db432b6fefaf..1f9f429dc643 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala @@ -21,7 +21,7 @@ import org.apache.spark.annotation.Since import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.param._ -import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} +import org.apache.spark.ml.param.shared.{HasInputCol, HasNumFeatures, HasOutputCol} import org.apache.spark.ml.util._ import org.apache.spark.mllib.feature import org.apache.spark.sql.{DataFrame, Dataset} @@ -37,8 +37,8 @@ import org.apache.spark.sql.types.{ArrayType, StructType} * otherwise the features will not be mapped evenly to the columns. */ @Since("1.2.0") -class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) - extends Transformer with HasInputCol with HasOutputCol with DefaultParamsWritable { +class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) extends Transformer + with HasInputCol with HasOutputCol with HasNumFeatures with DefaultParamsWritable { @Since("1.2.0") def this() = this(Identifiable.randomUID("hashingTF")) @@ -51,15 +51,6 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) - /** - * Number of features. Should be greater than 0. - * (default = 2^18^) - * @group param - */ - @Since("1.2.0") - val numFeatures = new IntParam(this, "numFeatures", "number of features (> 0)", - ParamValidators.gt(0)) - /** * Binary toggle to control term frequency counts. * If true, all non-zero counts are set to 1. This is useful for discrete probabilistic @@ -72,11 +63,7 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) "This is useful for discrete probabilistic models that model binary events rather " + "than integer counts") - setDefault(numFeatures -> (1 << 18), binary -> false) - - /** @group getParam */ - @Since("1.2.0") - def getNumFeatures: Int = $(numFeatures) + setDefault(binary -> false) /** @group setParam */ @Since("1.2.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 013817a41baf..cf65b9ab2518 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -83,7 +83,9 @@ private[shared] object SharedParamsCodeGen { ParamDesc[String]("solver", "the solver algorithm for optimization. 
If this is not set or " + "empty, default value is 'auto'", Some("\"auto\"")), ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"), - isValid = "ParamValidators.gtEq(2)", isExpertParam = true)) + isValid = "ParamValidators.gtEq(2)", isExpertParam = true), + ParamDesc[Int]("numFeatures", "number of features (> 0)", Some("1<<18"), + isValid = "ParamValidators.gt(0)")) val code = genSharedParams(params) val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala" diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 50619607a505..6e8cc9aaf6c8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -29,7 +29,7 @@ import org.apache.spark.ml.param._ private[ml] trait HasRegParam extends Params { /** - * Param for regularization parameter (>= 0). + * Param for regularization parameter (>= 0). * @group param */ final val regParam: DoubleParam = new DoubleParam(this, "regParam", "regularization parameter (>= 0)", ParamValidators.gtEq(0)) @@ -44,7 +44,7 @@ private[ml] trait HasRegParam extends Params { private[ml] trait HasMaxIter extends Params { /** - * Param for maximum number of iterations (>= 0). + * Param for maximum number of iterations (>= 0). * @group param */ final val maxIter: IntParam = new IntParam(this, "maxIter", "maximum number of iterations (>= 0)", ParamValidators.gtEq(0)) @@ -238,7 +238,7 @@ private[ml] trait HasOutputCol extends Params { private[ml] trait HasCheckpointInterval extends Params { /** - * Param for set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. + * Param for set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. * @group param */ final val checkpointInterval: IntParam = new IntParam(this, "checkpointInterval", "set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations", (interval: Int) => interval == -1 || interval >= 1) @@ -334,7 +334,7 @@ private[ml] trait HasElasticNetParam extends Params { private[ml] trait HasTol extends Params { /** - * Param for the convergence tolerance for iterative algorithms (>= 0). + * Param for the convergence tolerance for iterative algorithms (>= 0). * @group param */ final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms (>= 0)", ParamValidators.gtEq(0)) @@ -349,7 +349,7 @@ private[ml] trait HasTol extends Params { private[ml] trait HasStepSize extends Params { /** - * Param for Step size to be used for each iteration of optimization (> 0). + * Param for Step size to be used for each iteration of optimization (> 0). * @group param */ final val stepSize: DoubleParam = new DoubleParam(this, "stepSize", "Step size to be used for each iteration of optimization (> 0)", ParamValidators.gt(0)) @@ -396,7 +396,7 @@ private[ml] trait HasSolver extends Params { private[ml] trait HasAggregationDepth extends Params { /** - * Param for suggested depth for treeAggregate (>= 2). + * Param for suggested depth for treeAggregate (>= 2). 
* @group expertParam */ final val aggregationDepth: IntParam = new IntParam(this, "aggregationDepth", "suggested depth for treeAggregate (>= 2)", ParamValidators.gtEq(2)) @@ -406,4 +406,21 @@ private[ml] trait HasAggregationDepth extends Params { /** @group expertGetParam */ final def getAggregationDepth: Int = $(aggregationDepth) } + +/** + * Trait for shared param numFeatures (default: 1<<18). + */ +private[ml] trait HasNumFeatures extends Params { + + /** + * Param for number of features (> 0). + * @group param + */ + final val numFeatures: IntParam = new IntParam(this, "numFeatures", "number of features (> 0)", ParamValidators.gt(0)) + + setDefault(numFeatures, 1<<18) + + /** @group getParam */ + final def getNumFeatures: Int = $(numFeatures) +} // scalastyle:on From 60572776de80ebcf1782c3d7def749557c8bec61 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Mon, 3 Jul 2017 09:18:25 +0200 Subject: [PATCH 07/13] Update shared params from codegen run --- .../spark/ml/param/shared/sharedParams.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 6e8cc9aaf6c8..488153f8bec2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -29,7 +29,7 @@ import org.apache.spark.ml.param._ private[ml] trait HasRegParam extends Params { /** - * Param for regularization parameter (>= 0). + * Param for regularization parameter (>= 0). * @group param */ final val regParam: DoubleParam = new DoubleParam(this, "regParam", "regularization parameter (>= 0)", ParamValidators.gtEq(0)) @@ -44,7 +44,7 @@ private[ml] trait HasRegParam extends Params { private[ml] trait HasMaxIter extends Params { /** - * Param for maximum number of iterations (>= 0). + * Param for maximum number of iterations (>= 0). * @group param */ final val maxIter: IntParam = new IntParam(this, "maxIter", "maximum number of iterations (>= 0)", ParamValidators.gtEq(0)) @@ -238,7 +238,7 @@ private[ml] trait HasOutputCol extends Params { private[ml] trait HasCheckpointInterval extends Params { /** - * Param for set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. + * Param for set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. * @group param */ final val checkpointInterval: IntParam = new IntParam(this, "checkpointInterval", "set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations", (interval: Int) => interval == -1 || interval >= 1) @@ -334,7 +334,7 @@ private[ml] trait HasElasticNetParam extends Params { private[ml] trait HasTol extends Params { /** - * Param for the convergence tolerance for iterative algorithms (>= 0). + * Param for the convergence tolerance for iterative algorithms (>= 0). * @group param */ final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms (>= 0)", ParamValidators.gtEq(0)) @@ -349,7 +349,7 @@ private[ml] trait HasTol extends Params { private[ml] trait HasStepSize extends Params { /** - * Param for Step size to be used for each iteration of optimization (> 0). + * Param for Step size to be used for each iteration of optimization (> 0). 
* @group param */ final val stepSize: DoubleParam = new DoubleParam(this, "stepSize", "Step size to be used for each iteration of optimization (> 0)", ParamValidators.gt(0)) @@ -396,7 +396,7 @@ private[ml] trait HasSolver extends Params { private[ml] trait HasAggregationDepth extends Params { /** - * Param for suggested depth for treeAggregate (>= 2). + * Param for suggested depth for treeAggregate (>= 2). * @group expertParam */ final val aggregationDepth: IntParam = new IntParam(this, "aggregationDepth", "suggested depth for treeAggregate (>= 2)", ParamValidators.gtEq(2)) @@ -413,7 +413,7 @@ private[ml] trait HasAggregationDepth extends Params { private[ml] trait HasNumFeatures extends Params { /** - * Param for number of features (> 0). + * Param for number of features (> 0). * @group param */ final val numFeatures: IntParam = new IntParam(this, "numFeatures", "number of features (> 0)", ParamValidators.gt(0)) From 9edb3bda8cbc4e00f05b91718249edf2750fc028 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Mon, 3 Jul 2017 11:32:32 +0200 Subject: [PATCH 08/13] Update tests. Null values ignored in feature hashing. --- .../spark/ml/feature/FeatureHasher.scala | 25 +++---- .../spark/ml/feature/FeatureHasherSuite.scala | 71 ++++++++++++++++--- 2 files changed, 76 insertions(+), 20 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index d1d7b5edbe2d..db6a6fd4a57a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -75,19 +75,20 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme val map = new OpenHashMap[Int, Double]() $(inputCols).foreach { case colName => val fieldIndex = row.fieldIndex(colName) - val (rawIdx, value) = if (realFields(colName)) { - val value = row.getDouble(fieldIndex) - val hash = hashFunc(colName) - (hash, value) - } else { - val value = row.getString(fieldIndex) - val fieldName = s"$colName=$value" - val hash = hashFunc(fieldName) - (hash, 1.0) + if (!row.isNullAt(fieldIndex)) { + val (rawIdx, value) = if (realFields(colName)) { + val value = row.getDouble(fieldIndex) + val hash = hashFunc(colName) + (hash, value) + } else { + val value = row.getString(fieldIndex) + val fieldName = s"$colName=$value" + val hash = hashFunc(fieldName) + (hash, 1.0) + } + val idx = Utils.nonNegativeMod(rawIdx, n) + map.changeValue(idx, value, v => v + value) } - val idx = Utils.nonNegativeMod(rawIdx, n) - map.changeValue(idx, value, v => v + value) - (idx, value) } Vectors.sparse(n, map.toSeq) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala index f56e387ea808..ef18b1b79043 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala @@ -33,6 +33,8 @@ class FeatureHasherSuite extends SparkFunSuite import testImplicits._ import HashingTFSuite.murmur3FeatureIdx + implicit val vectorEncoder = ExpressionEncoder[Vector]() + test("params") { ParamsSuite.checkParams(new FeatureHasher) } @@ -47,8 +49,8 @@ class FeatureHasherSuite extends SparkFunSuite test("feature hashing") { val df = Seq( - (2, 2.0, 2.0f, "2", "foo"), - (3, 3.0, 3.0f, "3", "bar") + (3, 4.0, 5.0f, "1", "foo"), + (6, 7.0, 8.0f, "2", "bar") ).toDF("int", "double", 
"float", "stringNum", "string") val n = 100 @@ -60,17 +62,70 @@ class FeatureHasherSuite extends SparkFunSuite val attrGroup = AttributeGroup.fromStructField(output.schema("features")) require(attrGroup.numAttributes === Some(n)) - implicit val vectorEncoder = ExpressionEncoder[Vector]() val features = output.select("features").as[Vector].collect() - // Assume perfect hash on field names def idx: Any => Int = murmur3FeatureIdx(n) // check expected indices val expected = Seq( - Vectors.sparse(n, Seq((idx("int"), 2.0), (idx("double"), 2.0), (idx("float"), 2.0), - (idx("stringNum=2"), 1.0), (idx("string=foo"), 1.0))), - Vectors.sparse(n, Seq((idx("int"), 3.0), (idx("double"), 3.0), (idx("float"), 3.0), - (idx("stringNum=3"), 1.0), (idx("string=bar"), 1.0))) + Vectors.sparse(n, Seq((idx("int"), 3.0), (idx("double"), 4.0), (idx("float"), 5.0), + (idx("stringNum=1"), 1.0), (idx("string=foo"), 1.0))), + Vectors.sparse(n, Seq((idx("int"), 6.0), (idx("double"), 7.0), (idx("float"), 8.0), + (idx("stringNum=2"), 1.0), (idx("string=bar"), 1.0))) + ) + assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 }) + } + + test("hash collisions sum feature values") { + val df = Seq( + (1.0, "foo", "foo"), + (2.0, "bar", "baz") + ).toDF("double", "string1", "string2") + + val n = 1 + val featureHasher = new FeatureHasher() + .setInputCols("double", "string1", "string2") + .setOutputCol("features") + .setNumFeatures(n) + val output = featureHasher.transform(df) + + val features = output.select("features").as[Vector].collect() + def idx: Any => Int = murmur3FeatureIdx(n) + // everything should hash into one field + assert(idx("double") === idx("string1=foo")) + assert(idx("string1=foo") === idx("string2=foo")) + assert(idx("string2=foo") === idx("string1=bar")) + assert(idx("string1=bar") === idx("string2=baz")) + val expected = Seq( + Vectors.sparse(n, Seq((idx("string1=foo"), 3.0))), + Vectors.sparse(n, Seq((idx("string2=bar"), 4.0))) + ) + assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 }) + } + + test("ignores null values in feature hashing") { + import org.apache.spark.sql.functions._ + + val df = Seq( + (2.0, "foo", null), + (3.0, "bar", "baz") + ).toDF("double", "string1", "string2").select( + when(col("double") === 3.0, null).otherwise(col("double")).alias("double"), + col("string1"), + col("string2") + ) + + val n = 100 + val featureHasher = new FeatureHasher() + .setInputCols("double", "string1", "string2") + .setOutputCol("features") + .setNumFeatures(n) + val output = featureHasher.transform(df) + + val features = output.select("features").as[Vector].collect() + def idx: Any => Int = murmur3FeatureIdx(n) + val expected = Seq( + Vectors.sparse(n, Seq((idx("double"), 2.0), (idx("string1=foo"), 1.0))), + Vectors.sparse(n, Seq((idx("string1=bar"), 1.0), (idx("string2=baz"), 1.0))) ) assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 }) } From 8c5cb302012df7b2b39b22d77312625f06b779ec Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Wed, 12 Jul 2017 12:22:32 +0200 Subject: [PATCH 09/13] Revert "Move numFeatures to HasNumFeatures shared trait" This reverts commit 7d678fbf5f88d377b79153212a3e0a2596039b17. 
--- .../spark/ml/feature/FeatureHasher.scala | 19 +++++++++++++++-- .../apache/spark/ml/feature/HashingTF.scala | 21 +++++++++++++++---- .../ml/param/shared/SharedParamsCodeGen.scala | 4 +--- .../spark/ml/param/shared/sharedParams.scala | 17 --------------- 4 files changed, 35 insertions(+), 26 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index db6a6fd4a57a..23ad01d262f1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -22,7 +22,7 @@ import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators} -import org.apache.spark.ml.param.shared.{HasInputCols, HasNumFeatures, HasOutputCol} +import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCol} import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF} import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -34,11 +34,26 @@ import org.apache.spark.util.collection.OpenHashMap @Since("2.3.0") class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transformer - with HasInputCols with HasOutputCol with HasNumFeatures with DefaultParamsWritable { + with HasInputCols with HasOutputCol with DefaultParamsWritable { @Since("2.3.0") def this() = this(Identifiable.randomUID("featureHasher")) + /** + * Number of features. Should be > 0. + * (default = 2^18^) + * @group param + */ + @Since("2.3.0") + val numFeatures = new IntParam(this, "numFeatures", "number of features (> 0)", + ParamValidators.gt(0)) + + setDefault(numFeatures -> (1 << 18)) + + /** @group getParam */ + @Since("2.3.0") + def getNumFeatures: Int = $(numFeatures) + /** @group setParam */ @Since("2.3.0") def setNumFeatures(value: Int): this.type = set(numFeatures, value) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala index 1f9f429dc643..db432b6fefaf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala @@ -21,7 +21,7 @@ import org.apache.spark.annotation.Since import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.param._ -import org.apache.spark.ml.param.shared.{HasInputCol, HasNumFeatures, HasOutputCol} +import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} import org.apache.spark.ml.util._ import org.apache.spark.mllib.feature import org.apache.spark.sql.{DataFrame, Dataset} @@ -37,8 +37,8 @@ import org.apache.spark.sql.types.{ArrayType, StructType} * otherwise the features will not be mapped evenly to the columns. 
*/ @Since("1.2.0") -class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) extends Transformer - with HasInputCol with HasOutputCol with HasNumFeatures with DefaultParamsWritable { +class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) + extends Transformer with HasInputCol with HasOutputCol with DefaultParamsWritable { @Since("1.2.0") def this() = this(Identifiable.randomUID("hashingTF")) @@ -51,6 +51,15 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) exten @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** + * Number of features. Should be greater than 0. + * (default = 2^18^) + * @group param + */ + @Since("1.2.0") + val numFeatures = new IntParam(this, "numFeatures", "number of features (> 0)", + ParamValidators.gt(0)) + /** * Binary toggle to control term frequency counts. * If true, all non-zero counts are set to 1. This is useful for discrete probabilistic @@ -63,7 +72,11 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) exten "This is useful for discrete probabilistic models that model binary events rather " + "than integer counts") - setDefault(binary -> false) + setDefault(numFeatures -> (1 << 18), binary -> false) + + /** @group getParam */ + @Since("1.2.0") + def getNumFeatures: Int = $(numFeatures) /** @group setParam */ @Since("1.2.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index cf65b9ab2518..013817a41baf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -83,9 +83,7 @@ private[shared] object SharedParamsCodeGen { ParamDesc[String]("solver", "the solver algorithm for optimization. If this is not set or " + "empty, default value is 'auto'", Some("\"auto\"")), ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"), - isValid = "ParamValidators.gtEq(2)", isExpertParam = true), - ParamDesc[Int]("numFeatures", "number of features (> 0)", Some("1<<18"), - isValid = "ParamValidators.gt(0)")) + isValid = "ParamValidators.gtEq(2)", isExpertParam = true)) val code = genSharedParams(params) val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala" diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 488153f8bec2..50619607a505 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -406,21 +406,4 @@ private[ml] trait HasAggregationDepth extends Params { /** @group expertGetParam */ final def getAggregationDepth: Int = $(aggregationDepth) } - -/** - * Trait for shared param numFeatures (default: 1<<18). - */ -private[ml] trait HasNumFeatures extends Params { - - /** - * Param for number of features (> 0). 
- * @group param - */ - final val numFeatures: IntParam = new IntParam(this, "numFeatures", "number of features (> 0)", ParamValidators.gt(0)) - - setDefault(numFeatures, 1<<18) - - /** @group getParam */ - final def getNumFeatures: Int = $(numFeatures) -} // scalastyle:on From b580a5c80421256e8d82f4e7cda7879ecc59bbbd Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Wed, 12 Jul 2017 15:25:16 +0200 Subject: [PATCH 10/13] Address various review comments --- .../spark/ml/feature/FeatureHasher.scala | 89 +++++++++++++---- .../spark/ml/feature/FeatureHasherSuite.scala | 95 +++++++++++++++---- 2 files changed, 144 insertions(+), 40 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index 23ad01d262f1..fa993288a2be 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -31,7 +31,46 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils import org.apache.spark.util.collection.OpenHashMap - +/** + * Feature hashing projects a set of categorical or numerical features into a feature vector of + * specified dimension (typically substantially smaller than that of the original feature + * space). This is done using the hashing trick (https://en.wikipedia.org/wiki/Feature_hashing) + * to map features to indices in the feature vector. + * + * The [[FeatureHasher]] transformer operates on multiple columns. Each column may be numeric + * (representing a real feature) or string (representing a categorical feature). Boolean columns + * are also supported, and treated as categorical features. For numeric features, the hash value of + * the column name is used to map the feature value to its index in the feature vector. + * For categorical features, the hash value of the string "column_name=value" is used to map to the + * vector index, with an indicator value of `1.0`. Thus, categorical features are "one-hot" encoded + * (similarly to using [[OneHotEncoder]] with `dropLast=false`). + * + * Null (missing) values are ignored (implicitly zero in the resulting feature vector). + * + * Since a simple modulo is used to transform the hash function to a vector index, + * it is advisable to use a power of two as the numFeatures parameter; + * otherwise the features will not be mapped evenly to the vector indices. + * + * {{{ + * val df = Seq( + * (2.0, true, "1", "foo"), + * (3.0, false, "2", "bar") + * ).toDF("real", "bool", "stringNum", "string") + * + * val hasher = new FeatureHasher() + * .setInputCols("real", "bool", "stringNum", "num") + * .setOutputCol("features") + * + * hasher.transform(df).show() + * + * +----+-----+---------+------+--------------------+ + * |real| bool|stringNum|string| features| + * +----+-----+---------+------+--------------------+ + * | 2.0| true| 1| foo|(262144,[51871,63...| + * | 3.0|false| 2| bar|(262144,[6031,806...| + * +----+-----+---------+------+--------------------+ + * }}} + */ @Since("2.3.0") class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transformer with HasInputCols with HasOutputCol with DefaultParamsWritable { @@ -40,7 +79,7 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme def this() = this(Identifiable.randomUID("featureHasher")) /** - * Number of features. Should be > 0. + * Number of features. Should be greater than 0. 
* (default = 2^18^) * @group param */ @@ -74,29 +113,35 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme val hashFunc: Any => Int = OldHashingTF.murmur3Hash val n = $(numFeatures) - val os = transformSchema(dataset.schema) - - val featureCols = $(inputCols).map { colName => - val field = dataset.schema(colName) - field.dataType match { - case DoubleType | StringType => dataset(field.name) - case _: NumericType | BooleanType => dataset(field.name).cast(DoubleType).alias(field.name) + val outputSchema = transformSchema(dataset.schema) + val realFields = outputSchema.fields.filter { f => + f.dataType.isInstanceOf[NumericType] + }.map(_.name).toSet + + def getDouble(x: Any): Double = { + x match { + case n: java.lang.Number => + n.doubleValue() + case other => + // will throw ClassCastException if it cannot be cast, as would row.getDouble + other.asInstanceOf[Double] } } - val realFields = os.fields.filter(f => f.dataType.isInstanceOf[NumericType]).map(_.name).toSet - - def hashFeatures = udf { row: Row => + val hashFeatures = udf { row: Row => val map = new OpenHashMap[Int, Double]() $(inputCols).foreach { case colName => val fieldIndex = row.fieldIndex(colName) if (!row.isNullAt(fieldIndex)) { val (rawIdx, value) = if (realFields(colName)) { - val value = row.getDouble(fieldIndex) + // numeric values are kept as is, with vector index based on hash of "column_name" + val value = getDouble(row.get(fieldIndex)) val hash = hashFunc(colName) (hash, value) } else { - val value = row.getString(fieldIndex) + // string and boolean values are treated as categorical, with an indicator value of 1.0 + // and vector index based on hash of "column_name=value" + val value = row.get(fieldIndex).toString val fieldName = s"$colName=$value" val hash = hashFunc(fieldName) (hash, 1.0) @@ -108,19 +153,25 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme Vectors.sparse(n, map.toSeq) } - val metadata = os($(outputCol)).metadata + val metadata = outputSchema($(outputCol)).metadata dataset.select( col("*"), - hashFeatures(struct(featureCols: _*)).as($(outputCol), metadata)) + hashFeatures(struct($(inputCols).map(col(_)): _*)).as($(outputCol), metadata)) } override def copy(extra: ParamMap): FeatureHasher = defaultCopy(extra) override def transformSchema(schema: StructType): StructType = { val fields = schema($(inputCols).toSet) - require(fields.map(_.dataType).forall { case dt => - dt.isInstanceOf[NumericType] || dt.isInstanceOf[StringType] - }) + fields.foreach { case fieldSchema => + val dataType = fieldSchema.dataType + val fieldName = fieldSchema.name + require(dataType.isInstanceOf[NumericType] || + dataType.isInstanceOf[StringType] || + dataType.isInstanceOf[BooleanType], + s"FeatureHasher requires columns to be of NumericType, BooleanType or StringType. 
" + + s"Column $fieldName was $dataType") + } val attrGroup = new AttributeGroup($(outputCol), $(numFeatures)) SchemaUtils.appendColumn(schema, attrGroup.toStructField()) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala index ef18b1b79043..06cfabf52198 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala @@ -25,12 +25,15 @@ import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types._ class FeatureHasherSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { import testImplicits._ + import HashingTFSuite.murmur3FeatureIdx implicit val vectorEncoder = ExpressionEncoder[Vector]() @@ -49,16 +52,16 @@ class FeatureHasherSuite extends SparkFunSuite test("feature hashing") { val df = Seq( - (3, 4.0, 5.0f, "1", "foo"), - (6, 7.0, 8.0f, "2", "bar") - ).toDF("int", "double", "float", "stringNum", "string") + (2.0, true, "1", "foo"), + (3.0, false, "2", "bar") + ).toDF("real", "bool", "stringNum", "string") val n = 100 - val featureHasher = new FeatureHasher() - .setInputCols("int", "double", "float", "stringNum", "string") + val hasher = new FeatureHasher() + .setInputCols("real", "bool", "stringNum", "string") .setOutputCol("features") .setNumFeatures(n) - val output = featureHasher.transform(df) + val output = hasher.transform(df) val attrGroup = AttributeGroup.fromStructField(output.schema("features")) require(attrGroup.numAttributes === Some(n)) @@ -67,31 +70,63 @@ class FeatureHasherSuite extends SparkFunSuite def idx: Any => Int = murmur3FeatureIdx(n) // check expected indices val expected = Seq( - Vectors.sparse(n, Seq((idx("int"), 3.0), (idx("double"), 4.0), (idx("float"), 5.0), + Vectors.sparse(n, Seq((idx("real"), 2.0), (idx("bool=true"), 1.0), (idx("stringNum=1"), 1.0), (idx("string=foo"), 1.0))), - Vectors.sparse(n, Seq((idx("int"), 6.0), (idx("double"), 7.0), (idx("float"), 8.0), + Vectors.sparse(n, Seq((idx("real"), 3.0), (idx("bool=false"), 1.0), (idx("stringNum=2"), 1.0), (idx("string=bar"), 1.0))) ) assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 }) } + test("hashing works for all numeric types") { + val df = Seq(5.0, 10.0, 15.0).toDF("real") + + val hasher = new FeatureHasher() + .setInputCols("real") + .setOutputCol("features") + + val expectedResult = hasher.transform(df).select("features").as[Vector].collect() + // check all numeric types work as expected. 
String & boolean types are tested in default case + val types = + Seq(ShortType, LongType, IntegerType, FloatType, ByteType, DoubleType, DecimalType(10, 0)) + types.foreach { t => + val castDF = df.select(col("real").cast(t)) + val castResult = hasher.transform(castDF).select("features").as[Vector].collect() + withClue(s"FeatureHasher works for all numeric types (testing $t): ") { + assert(castResult.zip(expectedResult).forall { case (actual, expected) => + actual ~== expected absTol 1e-14 + }) + } + } + } + + test("invalid input type should fail") { + val df = Seq( + Vectors.dense(1), + Vectors.dense(2) + ).toDF("vec") + + intercept[IllegalArgumentException] { + new FeatureHasher().setInputCols("vec").transform(df) + } + } + test("hash collisions sum feature values") { val df = Seq( (1.0, "foo", "foo"), (2.0, "bar", "baz") - ).toDF("double", "string1", "string2") + ).toDF("real", "string1", "string2") val n = 1 - val featureHasher = new FeatureHasher() - .setInputCols("double", "string1", "string2") + val hasher = new FeatureHasher() + .setInputCols("real", "string1", "string2") .setOutputCol("features") .setNumFeatures(n) - val output = featureHasher.transform(df) - val features = output.select("features").as[Vector].collect() + val features = hasher.transform(df).select("features").as[Vector].collect() def idx: Any => Int = murmur3FeatureIdx(n) // everything should hash into one field - assert(idx("double") === idx("string1=foo")) + assert(idx("real") === idx("string1=foo")) assert(idx("string1=foo") === idx("string2=foo")) assert(idx("string2=foo") === idx("string1=bar")) assert(idx("string1=bar") === idx("string2=baz")) @@ -108,28 +143,46 @@ class FeatureHasherSuite extends SparkFunSuite val df = Seq( (2.0, "foo", null), (3.0, "bar", "baz") - ).toDF("double", "string1", "string2").select( - when(col("double") === 3.0, null).otherwise(col("double")).alias("double"), + ).toDF("real", "string1", "string2").select( + when(col("real") === 3.0, null).otherwise(col("real")).alias("real"), col("string1"), col("string2") ) val n = 100 - val featureHasher = new FeatureHasher() - .setInputCols("double", "string1", "string2") + val hasher = new FeatureHasher() + .setInputCols("real", "string1", "string2") .setOutputCol("features") .setNumFeatures(n) - val output = featureHasher.transform(df) - val features = output.select("features").as[Vector].collect() + val features = hasher.transform(df).select("features").as[Vector].collect() def idx: Any => Int = murmur3FeatureIdx(n) val expected = Seq( - Vectors.sparse(n, Seq((idx("double"), 2.0), (idx("string1=foo"), 1.0))), + Vectors.sparse(n, Seq((idx("real"), 2.0), (idx("string1=foo"), 1.0))), Vectors.sparse(n, Seq((idx("string1=bar"), 1.0), (idx("string2=baz"), 1.0))) ) assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 }) } + test("unicode column names and values") { + // scalastyle:off nonascii + val df = Seq((2.0, "中文")).toDF("中文", "unicode") + + val n = 100 + val hasher = new FeatureHasher() + .setInputCols("中文", "unicode") + .setOutputCol("features") + .setNumFeatures(n) + + val features = hasher.transform(df).select("features").as[Vector].collect() + def idx: Any => Int = murmur3FeatureIdx(n) + val expected = Seq( + Vectors.sparse(n, Seq((idx("中文"), 2.0), (idx("unicode=中文"), 1.0))) + ) + assert(features.zip(expected).forall { case (e, a) => e ~== a absTol 1e-14 }) + // scalastyle:on nonascii + } + test("read/write") { val t = new FeatureHasher() .setInputCols(Array("myCol1", "myCol2", "myCol3")) From 
990b816428f8e5b94c08749650be05a3f52d07db Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Tue, 18 Jul 2017 09:21:04 +0200 Subject: [PATCH 11/13] Address review comments 2 --- .../org/apache/spark/ml/feature/FeatureHasher.scala | 10 +++++++--- .../apache/spark/ml/feature/FeatureHasherSuite.scala | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index fa993288a2be..57660d52b71d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -109,9 +109,11 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme @Since("2.3.0") def setOutputCol(value: String): this.type = set(outputCol, value) + @Since("2.3.0") override def transform(dataset: Dataset[_]): DataFrame = { val hashFunc: Any => Int = OldHashingTF.murmur3Hash val n = $(numFeatures) + val localInputCols = $(inputCols) val outputSchema = transformSchema(dataset.schema) val realFields = outputSchema.fields.filter { f => @@ -130,7 +132,7 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme val hashFeatures = udf { row: Row => val map = new OpenHashMap[Int, Double]() - $(inputCols).foreach { case colName => + localInputCols.foreach { colName => val fieldIndex = row.fieldIndex(colName) if (!row.isNullAt(fieldIndex)) { val (rawIdx, value) = if (realFields(colName)) { @@ -156,14 +158,16 @@ class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transforme val metadata = outputSchema($(outputCol)).metadata dataset.select( col("*"), - hashFeatures(struct($(inputCols).map(col(_)): _*)).as($(outputCol), metadata)) + hashFeatures(struct($(inputCols).map(col): _*)).as($(outputCol), metadata)) } + @Since("2.3.0") override def copy(extra: ParamMap): FeatureHasher = defaultCopy(extra) + @Since("2.3.0") override def transformSchema(schema: StructType): StructType = { val fields = schema($(inputCols).toSet) - fields.foreach { case fieldSchema => + fields.foreach { fieldSchema => val dataType = fieldSchema.dataType val fieldName = fieldSchema.name require(dataType.isInstanceOf[NumericType] || diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala index 06cfabf52198..407371a82666 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/FeatureHasherSuite.scala @@ -36,7 +36,7 @@ class FeatureHasherSuite extends SparkFunSuite import HashingTFSuite.murmur3FeatureIdx - implicit val vectorEncoder = ExpressionEncoder[Vector]() + implicit private val vectorEncoder = ExpressionEncoder[Vector]() test("params") { ParamsSuite.checkParams(new FeatureHasher) @@ -63,7 +63,7 @@ class FeatureHasherSuite extends SparkFunSuite .setNumFeatures(n) val output = hasher.transform(df) val attrGroup = AttributeGroup.fromStructField(output.schema("features")) - require(attrGroup.numAttributes === Some(n)) + assert(attrGroup.numAttributes === Some(n)) val features = output.select("features").as[Vector].collect() // Assume perfect hash on field names From a91b53f7482b8a05734e77f42491a70f1e3e77f1 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Tue, 25 Jul 2017 13:21:03 +0200 Subject: [PATCH 12/13] Update doc string with more detailed behavior info --- 
.../spark/ml/feature/FeatureHasher.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index 57660d52b71d..ef1d1f309674 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -37,13 +37,19 @@ import org.apache.spark.util.collection.OpenHashMap * space). This is done using the hashing trick (https://en.wikipedia.org/wiki/Feature_hashing) * to map features to indices in the feature vector. * - * The [[FeatureHasher]] transformer operates on multiple columns. Each column may be numeric - * (representing a real feature) or string (representing a categorical feature). Boolean columns - * are also supported, and treated as categorical features. For numeric features, the hash value of - * the column name is used to map the feature value to its index in the feature vector. - * For categorical features, the hash value of the string "column_name=value" is used to map to the - * vector index, with an indicator value of `1.0`. Thus, categorical features are "one-hot" encoded - * (similarly to using [[OneHotEncoder]] with `dropLast=false`). + * The [[FeatureHasher]] transformer operates on multiple columns. Each column may contain either + * numeric or categorical features. Behavior and handling of column data types is as follows: + * -Numeric columns: For numeric features, the hash value of the column name is used to map the + * feature value to its index in the feature vector. Numeric features are never + * treated as categorical, even when they are integers. You must explicitly + * convert numeric columns containing categorical features to strings first. + * -String columns: For categorical features, the hash value of the string "column_name=value" + * is used to map to the vector index, with an indicator value of `1.0`. + * Thus, categorical features are "one-hot" encoded + * (similarly to using [[OneHotEncoder]] with `dropLast=false`). + * -Boolean columns: Boolean values are treated in the same way as string columns. That is, + * boolean features are represented as "column_name=true" or "column_name=false", + * with an indicator value of `1.0`. * * Null (missing) values are ignored (implicitly zero in the resulting feature vector). 
* From d6a311748486490215264fbdc0a6f8cb4cf7e6e1 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Wed, 26 Jul 2017 09:29:50 +0200 Subject: [PATCH 13/13] Add @Experimental tag --- .../main/scala/org/apache/spark/ml/feature/FeatureHasher.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala index ef1d1f309674..d22bf164c313 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.feature -import org.apache.spark.annotation.Since +import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.linalg.Vectors @@ -77,6 +77,7 @@ import org.apache.spark.util.collection.OpenHashMap * +----+-----+---------+------+--------------------+ * }}} */ +@Experimental @Since("2.3.0") class FeatureHasher(@Since("2.3.0") override val uid: String) extends Transformer with HasInputCols with HasOutputCol with DefaultParamsWritable {
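
Editor's note: the transform logic introduced in PATCH 10 is compact but easy to misread in diff form. The standalone sketch below restates the per-row hashing rules: numeric values keep their value and index on the hash of "column_name"; string and boolean values get an indicator value of 1.0 indexed on the hash of "column_name=value"; nulls are skipped; colliding indices sum. It is illustrative only — it uses scala.util.hashing.MurmurHash3.stringHash as a stand-in hash, whereas the patch hashes UTF-8 bytes with Spark's internal Murmur3_x86_32 (seed 42) via OldHashingTF.murmur3Hash, so concrete indices will differ. The names hashRow and nonNegativeMod are invented here, and a Double case stands in for the full set of supported numeric types.

{{{
import scala.collection.mutable
import scala.util.hashing.MurmurHash3

object FeatureHashingSketch {

  // Stand-in for Spark's Utils.nonNegativeMod: maps a (possibly negative)
  // hash code into the range [0, mod).
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Hash one row of (columnName -> value) pairs into the (index, value)
  // pairs of a sparse vector of dimension n.
  def hashRow(row: Map[String, Any], n: Int, seed: Int = 42): Seq[(Int, Double)] = {
    val acc = mutable.Map.empty[Int, Double]
    row.foreach { case (colName, value) =>
      value match {
        case null =>
          // null values are ignored: implicitly zero in the output vector
        case d: Double =>
          // numeric feature: value kept as is, index from the hash of the column name
          val idx = nonNegativeMod(MurmurHash3.stringHash(colName, seed), n)
          acc(idx) = acc.getOrElse(idx, 0.0) + d
        case other =>
          // categorical (string or boolean) feature: indicator value 1.0,
          // index from the hash of "column_name=value" (one-hot style)
          val idx = nonNegativeMod(MurmurHash3.stringHash(s"$colName=$other", seed), n)
          acc(idx) = acc.getOrElse(idx, 0.0) + 1.0
      }
    }
    // the accumulation above is what makes hash collisions sum feature values,
    // as exercised by the n = 1 test in FeatureHasherSuite
    acc.toSeq.sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    // prints (index, value) pairs; exact indices depend on the stand-in hash
    println(hashRow(Map("real" -> 2.0, "bool" -> true, "string" -> "foo"), 1 << 8))
  }
}
}}}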
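
A second usage note, prompted by the doc change in PATCH 12: numeric columns are never treated as categorical, so integer-coded categories must be explicitly cast to string before hashing. A minimal sketch of that cast, assuming a SparkSession with spark.implicits._ in scope; the "userId" and "score" column names are invented for illustration:

{{{
import org.apache.spark.ml.feature.FeatureHasher
import org.apache.spark.sql.functions.col

// "userId" is an integer-coded categorical feature. Cast it to string so the
// hasher one-hot encodes "userId=<value>" instead of hashing a real-valued
// feature under the column name alone.
val raw = Seq((101, 0.5), (102, 1.5)).toDF("userId", "score")
val prepared = raw.withColumn("userId", col("userId").cast("string"))

val hasher = new FeatureHasher()
  .setInputCols("userId", "score")
  .setOutputCol("features")
  .setNumFeatures(1 << 10) // keep numFeatures a power of two, per the class doc

hasher.transform(prepared).show(false)
}}}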