From e8f5d89ab2c344d52ae245b1b22cb4425ae6ffa0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 1 May 2017 14:07:10 +0000 Subject: [PATCH 01/11] Add a Bucketizer that can bin multiple columns. --- .../apache/spark/ml/feature/Bucketizer.scala | 148 +++++++++++++++++- .../org/apache/spark/ml/param/params.scala | 39 +++++ .../spark/ml/feature/BucketizerSuite.scala | 2 +- .../apache/spark/ml/param/ParamsSuite.scala | 38 ++++- .../scala/org/apache/spark/sql/Dataset.scala | 50 ++++++ .../org/apache/spark/sql/DataFrameSuite.scala | 20 +++ 6 files changed, 291 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index d1f3b2af1e482..57f995591c9b0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -24,7 +24,7 @@ import org.apache.spark.annotation.Since import org.apache.spark.ml.Model import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.param._ -import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} +import org.apache.spark.ml.param.shared.{HasInputCol, HasInputCols, HasOutputCol} import org.apache.spark.ml.util._ import org.apache.spark.sql._ import org.apache.spark.sql.expressions.UserDefinedFunction @@ -140,6 +140,139 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String } } +/** + * `MultipleBucketizer` maps columns of continuous features to columns of feature buckets. + */ +@Since("2.3.0") +final class MultipleBucketizer @Since("2.3.0") (@Since("2.3.0") override val uid: String) + extends Model[MultipleBucketizer] with HasInputCols with DefaultParamsWritable { + + @Since("2.3.0") + def this() = this(Identifiable.randomUID("multipleBucketizer")) + + /** + * Parameter for mapping continuous features into buckets. With n+1 splits, there are n buckets. + * A bucket defined by splits x,y holds values in the range [x,y) except the last bucket, which + * also includes y. Splits should be of length greater than or equal to 3 and strictly increasing. + * Values at -inf, inf must be explicitly provided to cover all Double values; + * otherwise, values outside the splits specified will be treated as errors. + * + * See also [[handleInvalid]], which can optionally create an additional bucket for NaN values. + * + * @group param + */ + @Since("2.3.0") + val splitsArray: DoubleArrayArrayParam = new DoubleArrayArrayParam(this, "splitsArray", + "The array of split points for mapping continuous features into buckets for multiple " + + "columns. For each input column, with n+1 splits, there are n buckets. A bucket defined by " + + "splits x,y holds values in the range [x,y) except the last bucket, which also includes y. " + + "The splits should be of length >= 3 and strictly increasing. Values at -inf, inf must be " + + "explicitly provided to cover all Double values; otherwise, values outside the splits " + + "specified will be treated as errors.", + Bucketizer.checkSplitsArray) + + /** + * Param for output column names. 
+ * @group param + */ + @Since("2.3.0") + final val outputCols: StringArrayParam = new StringArrayParam(this, "outputCols", + "output column names") + + /** @group getParam */ + @Since("2.3.0") + def getSplitsArray: Array[Array[Double]] = $(splitsArray) + + /** @group getParam */ + @Since("2.3.0") + final def getOutputCols: Array[String] = $(outputCols) + + /** @group setParam */ + @Since("2.3.0") + def setSplitsArray(value: Array[Array[Double]]): this.type = set(splitsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + /** + * Param for how to handle invalid entries. Options are 'skip' (filter out rows with + * invalid values), 'error' (throw an error), or 'keep' (keep invalid values in a special + * additional bucket). + * Default: "error" + * @group param + */ + // TODO: Make MultipleBucketizer inherit from HasHandleInvalid. + @Since("2.3.0") + val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", "how to handle " + + "invalid entries. Options are skip (filter out rows with invalid values), " + + "error (throw an error), or keep (keep invalid values in a special additional bucket).", + ParamValidators.inArray(Bucketizer.supportedHandleInvalids)) + + /** @group getParam */ + @Since("2.3.0") + def getHandleInvalid: String = $(handleInvalid) + + /** @group setParam */ + @Since("2.3.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + setDefault(handleInvalid, Bucketizer.ERROR_INVALID) + + @Since("2.3.0") + override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema) + val (filteredDataset, keepInvalid) = { + if (getHandleInvalid == Bucketizer.SKIP_INVALID) { + // "skip" NaN option is set, will filter out NaN values in the dataset + (dataset.na.drop().toDF(), false) + } else { + (dataset.toDF(), getHandleInvalid == Bucketizer.KEEP_INVALID) + } + } + + val bucketizers: Seq[UserDefinedFunction] = $(splitsArray).map { splits => + udf { (feature: Double) => + Bucketizer.binarySearchForBuckets(splits, feature, keepInvalid) + } + } + + val newCols = $(inputCols).zipWithIndex.map { case (inputCol, idx) => + bucketizers(idx)(filteredDataset(inputCol)) + } + val newFields = $(outputCols).zipWithIndex.map { case (outputCol, idx) => + prepOutputField(idx, outputCol) + } + filteredDataset.withColumns($(outputCols), newCols, newFields.map(_.metadata)) + } + + private def prepOutputField(idx: Int, outputCol: String): StructField = { + val buckets = $(splitsArray)(idx).sliding(2).map(bucket => bucket.mkString(", ")).toArray + val attr = new NominalAttribute(name = Some(outputCol), isOrdinal = Some(true), + values = Some(buckets)) + attr.toStructField() + } + + @Since("2.3.0") + override def transformSchema(schema: StructType): StructType = { + var transformedSchema = schema + $(inputCols).zip($(outputCols)).zipWithIndex.map { case ((inputCol, outputCol), idx) => + SchemaUtils.checkColumnType(transformedSchema, inputCol, DoubleType) + transformedSchema = SchemaUtils.appendColumn(transformedSchema, + prepOutputField(idx, outputCol)) + } + transformedSchema + } + + @Since("2.3.0") + override def copy(extra: ParamMap): MultipleBucketizer = { + defaultCopy[MultipleBucketizer](extra).setParent(parent) + } +} + @Since("1.6.0") object Bucketizer extends DefaultParamsReadable[Bucketizer] { @@ -167,6 +300,13 @@ object 
Bucketizer extends DefaultParamsReadable[Bucketizer] { } } + /** + * Check each splits in the splits array. + */ + private[feature] def checkSplitsArray(splitsArray: Array[Array[Double]]): Boolean = { + splitsArray.forall(checkSplits(_)) + } + /** * Binary searching in several buckets to place each data point. * @param splits array of split points @@ -211,3 +351,9 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] { @Since("1.6.0") override def load(path: String): Bucketizer = super.load(path) } + +@Since("2.3.0") +object MultipleBucketizer extends DefaultParamsReadable[MultipleBucketizer] { + @Since("2.3.0") + override def load(path: String): MultipleBucketizer = super.load(path) +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 12ad800206463..97a9bb4653b4e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -490,6 +490,45 @@ class DoubleArrayParam(parent: Params, name: String, doc: String, isValid: Array } } +/** + * :: DeveloperApi :: + * Specialized version of `Param[Array[Array[Double]]]` for Java. + */ +@DeveloperApi +class DoubleArrayArrayParam( + parent: Params, + name: String, + doc: String, + isValid: Array[Array[Double]] => Boolean) + extends Param[Array[Array[Double]]](parent, name, doc, isValid) { + + def this(parent: Params, name: String, doc: String) = + this(parent, name, doc, ParamValidators.alwaysTrue) + + /** Creates a param pair with a `java.util.List` of values (for Java and Python). */ + def w(value: java.util.List[java.util.List[java.lang.Double]]): ParamPair[Array[Array[Double]]] = + w(value.asScala.map(_.asScala.map(_.asInstanceOf[Double]).toArray).toArray) + + override def jsonEncode(value: Array[Array[Double]]): String = { + import org.json4s.JsonDSL._ + compact(render(value.toSeq.map(_.toSeq.map(DoubleParam.jValueEncode)))) + } + + override def jsonDecode(json: String): Array[Array[Double]] = { + parse(json) match { + case JArray(values) => + values.map { + case JArray(values) => + values.map(DoubleParam.jValueDecode).toArray + case _ => + throw new IllegalArgumentException(s"Cannot decode $json to Array[Array[Double]].") + }.toArray + case _ => + throw new IllegalArgumentException(s"Cannot decode $json to Array[Array[Double]].") + } + } +} + /** * :: DeveloperApi :: * Specialized version of `Param[Array[Int]]` for Java. 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index aac29137d7911..fd7185b487aed 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -22,7 +22,7 @@ import scala.util.Random import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param.ParamsSuite -import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} +import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Row} diff --git a/mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala index 78a33e05e0e48..85198ad4c913a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/param/ParamsSuite.scala @@ -121,10 +121,10 @@ class ParamsSuite extends SparkFunSuite { { // DoubleArrayParam val param = new DoubleArrayParam(dummy, "name", "doc") val values: Seq[Array[Double]] = Seq( - Array(), - Array(1.0), - Array(Double.NaN, Double.NegativeInfinity, Double.MinValue, -1.0, 0.0, - Double.MinPositiveValue, 1.0, Double.MaxValue, Double.PositiveInfinity)) + Array(), + Array(1.0), + Array(Double.NaN, Double.NegativeInfinity, Double.MinValue, -1.0, 0.0, + Double.MinPositiveValue, 1.0, Double.MaxValue, Double.PositiveInfinity)) for (value <- values) { val json = param.jsonEncode(value) val decoded = param.jsonDecode(json) @@ -139,6 +139,36 @@ class ParamsSuite extends SparkFunSuite { } } + { // DoubleArrayArrayParam + val param = new DoubleArrayArrayParam(dummy, "name", "doc") + val values: Seq[Array[Array[Double]]] = Seq( + Array(Array()), + Array(Array(1.0)), + Array(Array(1.0), Array(2.0)), + Array( + Array(Double.NaN, Double.NegativeInfinity, Double.MinValue, -1.0, 0.0, + Double.MinPositiveValue, 1.0, Double.MaxValue, Double.PositiveInfinity), + Array(Double.MaxValue, Double.PositiveInfinity, Double.MinPositiveValue, 1.0, + Double.NaN, Double.NegativeInfinity, Double.MinValue, -1.0, 0.0) + )) + + for (value <- values) { + val json = param.jsonEncode(value) + val decoded = param.jsonDecode(json) + assert(decoded.length === value.length) + decoded.zip(value).foreach { case (actualArray, expectedArray) => + assert(actualArray.length === expectedArray.length) + actualArray.zip(expectedArray).foreach { case (actual, expected) => + if (expected.isNaN) { + assert(actual.isNaN) + } else { + assert(actual === expected) + } + } + } + } + } + { // StringArrayParam val param = new StringArrayParam(dummy, "name", "doc") val values: Seq[Array[String]] = Seq( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c6dcd93bbda66..93c75dc9b2b22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1882,6 +1882,56 @@ class Dataset[T] private[sql]( } } + /** + * Returns a new Dataset by adding columns or replacing the existing columns that has + * the same names. 
+ * + * @group untypedrel + * @since 2.3.0 + */ + def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = { + assert(colNames.size == cols.size, + s"The size of column names: ${colNames.size} isn't equal to " + + s"the size of columns: ${cols.size}") + + val resolver = sparkSession.sessionState.analyzer.resolver + val output = queryExecution.analyzed.output + + val columnMap = colNames.zip(cols).toMap + + val replacedAndExistingColumns = output.map { field => + val dupColumn = columnMap.find { case (colName, col) => + resolver(field.name, colName) + } + if (dupColumn.isDefined) { + val colName = dupColumn.get._1 + val col = dupColumn.get._2 + col.as(colName) + } else { + Column(field) + } + } + + val newColumns = columnMap.filter { case (colName, col) => + !output.exists(f => resolver(f.name, colName)) + }.map { case (colName, col) => col.as(colName) } + + select(replacedAndExistingColumns ++ newColumns : _*) + } + + /** + * Returns a new Dataset by adding columns with metadata. + */ + private[spark] def withColumns( + colNames: Seq[String], + cols: Seq[Column], + metadata: Seq[Metadata]): DataFrame = { + val newCols = colNames.zip(cols).zip(metadata).map { case ((colName, col), metadata) => + col.as(colName, metadata) + } + withColumns(colNames, newCols) + } + /** * Returns a new Dataset by adding a column with metadata. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index b4893b56a8a84..b82745c09a9b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -555,6 +555,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(df.schema.map(_.name) === Seq("key", "value", "newCol")) } + test("withColumns") { + val df = testData.toDF().withColumns(Seq("newCol1", "newCol2"), + Seq(col("key") + 1, col("key") + 2)) + checkAnswer( + df, + testData.collect().map { case Row(key: Int, value: String) => + Row(key, value, key + 1, key + 2) + }.toSeq) + assert(df.schema.map(_.name) === Seq("key", "value", "newCol1", "newCol2")) + } + test("replace column using withColumn") { val df2 = sparkContext.parallelize(Array(1, 2, 3)).toDF("x") val df3 = df2.withColumn("x", df2("x") + 1) @@ -563,6 +574,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row(2) :: Row(3) :: Row(4) :: Nil) } + test("replace column using withColumns") { + val df2 = sparkContext.parallelize(Array((1, 2), (2, 3), (3, 4))).toDF("x", "y") + val df3 = df2.withColumns(Seq("x", "newCol1", "newCol2"), + Seq(df2("x") + 1, df2("y"), df2("y") + 1)) + checkAnswer( + df3.select("x", "newCol1", "newCol2"), + Row(2, 2, 3) :: Row(3, 3, 4) :: Row(4, 4, 5) :: Nil) + } + test("drop column using drop") { val df = testData.drop("key") checkAnswer( From 38dce8b30ee3119d5c4c3c761e527fca5c2979f5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 2 May 2017 09:26:49 +0000 Subject: [PATCH 02/11] Make withColumns as private[spark]. 
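
This keeps the new multi-column withColumns helper out of the public Dataset API for
now; it is only needed internally by ML transformers. The semantics introduced in
PATCH 01 are unchanged: both sequences must have the same length, columns whose names
already exist in the schema are replaced, and the remaining ones are appended. A
minimal sketch of an internal call site (the data and column names below are
illustrative only, and the caller must live inside the org.apache.spark package since
the method is now private[spark]):

    // assuming an active SparkSession `spark` and import spark.implicits._ in scope
    val df = Seq((1, "a"), (2, "b")).toDF("key", "value")
    val result = df.withColumns(
      Seq("key", "keyPlusOne"),            // "key" already exists and is replaced
      Seq(df("key") + 10, df("key") + 1))  // "keyPlusOne" does not exist and is appended
    // resulting schema: key, value, keyPlusOne
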
--- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 93c75dc9b2b22..e431253b0ebd0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1885,11 +1885,8 @@ class Dataset[T] private[sql]( /** * Returns a new Dataset by adding columns or replacing the existing columns that has * the same names. - * - * @group untypedrel - * @since 2.3.0 */ - def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = { + private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = { assert(colNames.size == cols.size, s"The size of column names: ${colNames.size} isn't equal to " + s"the size of columns: ${cols.size}") From 6ff9c7998688107a835875ea41e6fe9576a1558c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 3 May 2017 03:21:37 +0000 Subject: [PATCH 03/11] Add test for MultipleBucketizer. --- .../ml/feature/MultipleBucketizerSuite.scala | 184 ++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala new file mode 100644 index 0000000000000..261b9fda588be --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import scala.util.Random + +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.param.ParamsSuite +import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.{DataFrame, Row} + +class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext + with DefaultReadWriteTest { + + import testImplicits._ + + test("params") { + ParamsSuite.checkParams(new MultipleBucketizer) + } + + test("Bucket continuous features, without -inf,inf") { + // Check a set of valid feature values. 
+ val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5)) + val validData1 = Array(-0.5, -0.3, 0.0, 0.2) + val validData2 = Array(0.5, 0.3, 0.0, -0.1) + val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0) + val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0) + + val data = (0 until validData1.length).map { idx => + (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) + } + val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") + + val bucketizer1: MultipleBucketizer = new MultipleBucketizer() + .setInputCols(Array("feature1", "feature2")) + .setOutputCols(Array("result1", "result2")) + .setSplitsArray(splits) + + bucketizer1.transform(dataFrame).select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => + assert(r1 === e1, + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + assert(r2 === e2, + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") + } + + // Check for exceptions when using a set of invalid feature values. + val invalidData1: Array[Double] = Array(-0.9) ++ validData1 + val invalidData2 = Array(0.51) ++ validData1 + val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx") + + val bucketizer2: MultipleBucketizer = new MultipleBucketizer() + .setInputCols(Array("feature")) + .setOutputCols(Array("result")) + .setSplitsArray(Array(splits(0))) + + withClue("Invalid feature value -0.9 was not caught as an invalid feature!") { + intercept[SparkException] { + bucketizer2.transform(badDF1).collect() + } + } + val badDF2 = invalidData2.zipWithIndex.toSeq.toDF("feature", "idx") + withClue("Invalid feature value 0.51 was not caught as an invalid feature!") { + intercept[SparkException] { + bucketizer2.transform(badDF2).collect() + } + } + } + + test("Bucket continuous features, with -inf,inf") { + val splits = Array( + Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity), + Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, Double.PositiveInfinity)) + + val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9) + val validData2 = Array(-0.1, -0.5, -0.2, 0.0, 0.1, 0.3, 0.5) + val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0) + val expectedBuckets2 = Array(1.0, 0.0, 1.0, 1.0, 1.0, 2.0, 3.0) + + val data = (0 until validData1.length).map { idx => + (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) + } + val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") + + val bucketizer: MultipleBucketizer = new MultipleBucketizer() + .setInputCols(Array("feature1", "feature2")) + .setOutputCols(Array("result1", "result2")) + .setSplitsArray(splits) + + bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => + assert(r1 === e1, + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + assert(r2 === e2, + s"The feature value is not correct after bucketing. 
Expected $e2 but found $r2") + } + } + + test("Bucket continuous features, with NaN data but non-NaN splits") { + val splits = Array( + Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity), + Array(Double.NegativeInfinity, -0.1, 0.2, 0.6, Double.PositiveInfinity)) + + val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN) + val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, Double.NaN, Double.NaN) + val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 4.0) + val expectedBuckets2 = Array(2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 2.0, 3.0, 4.0, 4.0) + + val data = (0 until validData1.length).map { idx => + (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) + } + val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") + + val bucketizer: MultipleBucketizer = new MultipleBucketizer() + .setInputCols(Array("feature1", "feature2")) + .setOutputCols(Array("result1", "result2")) + .setSplitsArray(splits) + + bucketizer.setHandleInvalid("keep") + bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => + assert(r1 === e1, + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + assert(r2 === e2, + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") + } + + bucketizer.setHandleInvalid("skip") + val skipResults1: Array[Double] = bucketizer.transform(dataFrame) + .select("result1").as[Double].collect() + assert(skipResults1.length === 7) + assert(skipResults1.forall(_ !== 4.0)) + + val skipResults2: Array[Double] = bucketizer.transform(dataFrame) + .select("result2").as[Double].collect() + assert(skipResults2.length === 7) + assert(skipResults2.forall(_ !== 4.0)) + + bucketizer.setHandleInvalid("error") + withClue("Bucketizer should throw error when setHandleInvalid=error and given NaN values") { + intercept[SparkException] { + bucketizer.transform(dataFrame).collect() + } + } + } + + test("Bucket continuous features, with NaN splits") { + val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity, Double.NaN) + withClue("Invalid NaN split was not caught during Bucketizer initialization") { + intercept[IllegalArgumentException] { + new MultipleBucketizer().setSplitsArray(Array(splits)) + } + } + } + + test("read/write") { + val t = new MultipleBucketizer() + .setInputCols(Array("myInputCol")) + .setOutputCols(Array("myOutputCol")) + .setSplitsArray(Array(Array(0.1, 0.8, 0.9))) + testDefaultReadWrite(t) + } +} From 8386d1ea52a5daecea24334deb8a122e46333d84 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 12 Jun 2017 08:29:22 +0000 Subject: [PATCH 04/11] Integrated multiple column bucketizing into Bucketizer. 
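
Rather than shipping a separate MultipleBucketizer model, multi-column support is now
provided by Bucketizer itself through the new MultipleBucketizerInterface trait: when
inputCols, outputCols and splitsArray are all set, isBucketizeMultipleInputCols()
returns true and transform() bucketizes every input column in one pass; otherwise the
existing single-column inputCol/outputCol/splits path is taken. A minimal usage sketch
based on the params added in this patch (the column names, data and splits below are
illustrative only):

    import org.apache.spark.ml.feature.Bucketizer
    // assuming an active SparkSession `spark` and import spark.implicits._ in scope
    val df = Seq((-0.5, 0.3), (0.2, -0.4)).toDF("feature1", "feature2")
    val bucketizer = new Bucketizer()
      .setInputCols(Array("feature1", "feature2"))
      .setOutputCols(Array("result1", "result2"))
      .setSplitsArray(Array(
        Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
        Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, Double.PositiveInfinity)))
    // adds result1 and result2 to the output in a single transform
    bucketizer.transform(df).show()
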
--- .../apache/spark/ml/feature/Bucketizer.scala | 86 +++++++------------ .../ml/feature/MultipleBucketizerSuite.scala | 25 +++--- 2 files changed, 45 insertions(+), 66 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index 57f995591c9b0..8e8e8a2ef7e24 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -32,11 +32,13 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, StructField, StructType} /** - * `Bucketizer` maps a column of continuous features to a column of feature buckets. + * `Bucketizer` maps a column of continuous features to a column of feature buckets. Since 2.3.0, + * `Bucketizer` can also map multiple columns at once. */ @Since("1.4.0") final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String) - extends Model[Bucketizer] with HasInputCol with HasOutputCol with DefaultParamsWritable { + extends Model[Bucketizer] with HasInputCol with HasOutputCol + with MultipleBucketizerInterface with DefaultParamsWritable { @Since("1.4.0") def this() = this(Identifiable.randomUID("bucketizer")) @@ -100,8 +102,25 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String def setHandleInvalid(value: String): this.type = set(handleInvalid, value) setDefault(handleInvalid, Bucketizer.ERROR_INVALID) + /** + * Determines whether this `Bucketizer` is going to map multiple columns. Only if all necessary + * params for bucketizing multiple columns are set, we go for the path to map multiple columns. + * By default `Bucketizer` just maps a column of continuous features. + */ + private[ml] def isBucketizeMultipleInputCols(): Boolean = { + isSet(inputCols) && isSet(splitsArray) && isSet(outputCols) + } + @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { + if (isBucketizeMultipleInputCols()) { + this.asInstanceOf[MultipleBucketizerInterface].transform(dataset, getHandleInvalid) + } else { + transformSingleColumn(dataset) + } + } + + private def transformSingleColumn(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema) val (filteredDataset, keepInvalid) = { if (getHandleInvalid == Bucketizer.SKIP_INVALID) { @@ -141,23 +160,13 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String } /** - * `MultipleBucketizer` maps columns of continuous features to columns of feature buckets. + * `MultipleBucketizerInterface` maps columns of continuous features to columns of feature buckets. */ @Since("2.3.0") -final class MultipleBucketizer @Since("2.3.0") (@Since("2.3.0") override val uid: String) - extends Model[MultipleBucketizer] with HasInputCols with DefaultParamsWritable { - - @Since("2.3.0") - def this() = this(Identifiable.randomUID("multipleBucketizer")) - +private[ml] trait MultipleBucketizerInterface extends HasInputCols { /** - * Parameter for mapping continuous features into buckets. With n+1 splits, there are n buckets. - * A bucket defined by splits x,y holds values in the range [x,y) except the last bucket, which - * also includes y. Splits should be of length greater than or equal to 3 and strictly increasing. - * Values at -inf, inf must be explicitly provided to cover all Double values; - * otherwise, values outside the splits specified will be treated as errors. 
- * - * See also [[handleInvalid]], which can optionally create an additional bucket for NaN values. + * Parameter for specifying multiple splits parameters. Each element in this array can be used to + * map continuous features into buckets. * * @group param */ @@ -199,38 +208,15 @@ final class MultipleBucketizer @Since("2.3.0") (@Since("2.3.0") override val uid @Since("2.3.0") def setOutputCols(value: Array[String]): this.type = set(outputCols, value) - /** - * Param for how to handle invalid entries. Options are 'skip' (filter out rows with - * invalid values), 'error' (throw an error), or 'keep' (keep invalid values in a special - * additional bucket). - * Default: "error" - * @group param - */ - // TODO: Make MultipleBucketizer inherit from HasHandleInvalid. @Since("2.3.0") - val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", "how to handle " + - "invalid entries. Options are skip (filter out rows with invalid values), " + - "error (throw an error), or keep (keep invalid values in a special additional bucket).", - ParamValidators.inArray(Bucketizer.supportedHandleInvalids)) - - /** @group getParam */ - @Since("2.3.0") - def getHandleInvalid: String = $(handleInvalid) - - /** @group setParam */ - @Since("2.3.0") - def setHandleInvalid(value: String): this.type = set(handleInvalid, value) - setDefault(handleInvalid, Bucketizer.ERROR_INVALID) - - @Since("2.3.0") - override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + private[ml] def transform(dataset: Dataset[_], handleInvalid: String): DataFrame = { + transformMultipleSchema(dataset.schema) val (filteredDataset, keepInvalid) = { - if (getHandleInvalid == Bucketizer.SKIP_INVALID) { + if (handleInvalid == Bucketizer.SKIP_INVALID) { // "skip" NaN option is set, will filter out NaN values in the dataset (dataset.na.drop().toDF(), false) } else { - (dataset.toDF(), getHandleInvalid == Bucketizer.KEEP_INVALID) + (dataset.toDF(), handleInvalid == Bucketizer.KEEP_INVALID) } } @@ -256,8 +242,7 @@ final class MultipleBucketizer @Since("2.3.0") (@Since("2.3.0") override val uid attr.toStructField() } - @Since("2.3.0") - override def transformSchema(schema: StructType): StructType = { + private def transformMultipleSchema(schema: StructType): StructType = { var transformedSchema = schema $(inputCols).zip($(outputCols)).zipWithIndex.map { case ((inputCol, outputCol), idx) => SchemaUtils.checkColumnType(transformedSchema, inputCol, DoubleType) @@ -266,11 +251,6 @@ final class MultipleBucketizer @Since("2.3.0") (@Since("2.3.0") override val uid } transformedSchema } - - @Since("2.3.0") - override def copy(extra: ParamMap): MultipleBucketizer = { - defaultCopy[MultipleBucketizer](extra).setParent(parent) - } } @Since("1.6.0") @@ -351,9 +331,3 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] { @Since("1.6.0") override def load(path: String): Bucketizer = super.load(path) } - -@Since("2.3.0") -object MultipleBucketizer extends DefaultParamsReadable[MultipleBucketizer] { - @Since("2.3.0") - override def load(path: String): MultipleBucketizer = super.load(path) -} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala index 261b9fda588be..6752278788da2 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala @@ -32,10 +32,6 @@ class 
MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext import testImplicits._ - test("params") { - ParamsSuite.checkParams(new MultipleBucketizer) - } - test("Bucket continuous features, without -inf,inf") { // Check a set of valid feature values. val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5)) @@ -49,11 +45,13 @@ class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext } val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") - val bucketizer1: MultipleBucketizer = new MultipleBucketizer() + val bucketizer1: Bucketizer = new Bucketizer() .setInputCols(Array("feature1", "feature2")) .setOutputCols(Array("result1", "result2")) .setSplitsArray(splits) + assert(bucketizer1.isBucketizeMultipleInputCols()) + bucketizer1.transform(dataFrame).select("result1", "expected1", "result2", "expected2") .collect().foreach { case Row(r1: Double, e1: Double, r2: Double, e2: Double) => @@ -68,11 +66,13 @@ class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext val invalidData2 = Array(0.51) ++ validData1 val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx") - val bucketizer2: MultipleBucketizer = new MultipleBucketizer() + val bucketizer2: Bucketizer = new Bucketizer() .setInputCols(Array("feature")) .setOutputCols(Array("result")) .setSplitsArray(Array(splits(0))) + assert(bucketizer2.isBucketizeMultipleInputCols()) + withClue("Invalid feature value -0.9 was not caught as an invalid feature!") { intercept[SparkException] { bucketizer2.transform(badDF1).collect() @@ -101,11 +101,13 @@ class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext } val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") - val bucketizer: MultipleBucketizer = new MultipleBucketizer() + val bucketizer: Bucketizer = new Bucketizer() .setInputCols(Array("feature1", "feature2")) .setOutputCols(Array("result1", "result2")) .setSplitsArray(splits) + assert(bucketizer.isBucketizeMultipleInputCols()) + bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") .collect().foreach { case Row(r1: Double, e1: Double, r2: Double, e2: Double) => @@ -131,11 +133,13 @@ class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext } val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") - val bucketizer: MultipleBucketizer = new MultipleBucketizer() + val bucketizer: Bucketizer = new Bucketizer() .setInputCols(Array("feature1", "feature2")) .setOutputCols(Array("result1", "result2")) .setSplitsArray(splits) + assert(bucketizer.isBucketizeMultipleInputCols()) + bucketizer.setHandleInvalid("keep") bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") .collect().foreach { @@ -169,16 +173,17 @@ class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity, Double.NaN) withClue("Invalid NaN split was not caught during Bucketizer initialization") { intercept[IllegalArgumentException] { - new MultipleBucketizer().setSplitsArray(Array(splits)) + new Bucketizer().setSplitsArray(Array(splits)) } } } test("read/write") { - val t = new MultipleBucketizer() + val t = new Bucketizer() .setInputCols(Array("myInputCol")) .setOutputCols(Array("myOutputCol")) .setSplitsArray(Array(Array(0.1, 0.8, 0.9))) + assert(t.isBucketizeMultipleInputCols()) 
testDefaultReadWrite(t) } } From 7c38b77c30747e316328afbe25cc8bff1a51cd40 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 18 Sep 2017 03:54:29 +0000 Subject: [PATCH 05/11] Address comment. --- .../apache/spark/ml/feature/Bucketizer.scala | 10 +++-- .../ml/feature/MultipleBucketizerSuite.scala | 37 ++++++++++++++++--- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index e2383eaa282ca..ba6cc7d12f898 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -144,8 +144,12 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String @Since("1.4.0") override def transformSchema(schema: StructType): StructType = { - SchemaUtils.checkNumericType(schema, $(inputCol)) - SchemaUtils.appendColumn(schema, prepOutputField(schema)) + if (isBucketizeMultipleInputCols()) { + this.asInstanceOf[MultipleBucketizerInterface].transformMultipleSchema(schema) + } else { + SchemaUtils.checkNumericType(schema, $(inputCol)) + SchemaUtils.appendColumn(schema, prepOutputField(schema)) + } } @Since("1.4.1") @@ -237,7 +241,7 @@ private[ml] trait MultipleBucketizerInterface extends HasInputCols { attr.toStructField() } - private def transformMultipleSchema(schema: StructType): StructType = { + private[ml] def transformMultipleSchema(schema: StructType): StructType = { var transformedSchema = schema $(inputCols).zip($(outputCols)).zipWithIndex.map { case ((inputCol, outputCol), idx) => SchemaUtils.checkColumnType(transformedSchema, inputCol, DoubleType) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala index 6752278788da2..2e7ab86dc29c4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.ml.feature import scala.util.Random import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.ml.Pipeline import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.DefaultReadWriteTest @@ -56,9 +57,9 @@ class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext .collect().foreach { case Row(r1: Double, e1: Double, r2: Double, e2: Double) => assert(r1 === e1, - s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") } // Check for exceptions when using a set of invalid feature values. @@ -112,9 +113,9 @@ class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext .collect().foreach { case Row(r1: Double, e1: Double, r2: Double, e2: Double) => assert(r1 === e1, - s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") assert(r2 === e2, - s"The feature value is not correct after bucketing. 
Expected $e2 but found $r2") + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") } } @@ -145,9 +146,9 @@ class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext .collect().foreach { case Row(r1: Double, e1: Double, r2: Double, e2: Double) => assert(r1 === e1, - s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") } bucketizer.setHandleInvalid("skip") @@ -186,4 +187,28 @@ class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext assert(t.isBucketizeMultipleInputCols()) testDefaultReadWrite(t) } + + test("Bucketizer in a pipeline") { + val df = Seq((0.5, 0.3, 1.0, 1.0), (0.5, -0.4, 1.0, 0.0)) + .toDF("feature1", "feature2", "expected1", "expected2") + + val bucket = new Bucketizer() + .setInputCols(Array("feature1", "feature2")) + .setOutputCols(Array("result1", "result2")) + .setSplitsArray(Array(Array(-0.5, 0.0, 0.5), Array(-0.5, 0.0, 0.5))) + + assert(bucket.isBucketizeMultipleInputCols()) + + val pl = new Pipeline() + .setStages(Array(bucket)) + .fit(df) + pl.transform(df).select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => + assert(r1 === e1, + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + assert(r2 === e2, + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") + } + } } From 92ef9bde1e048eef7e3b530286723cad5773debc Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 19 Sep 2017 08:14:12 +0000 Subject: [PATCH 06/11] Merge multiple input columns stuff into Bucketizer class. --- .../examples/ml/JavaBucketizerExample.java | 41 ++++ .../apache/spark/ml/feature/Bucketizer.scala | 142 +++++------- .../spark/ml/feature/JavaBucketizerSuite.java | 35 +++ .../spark/ml/feature/BucketizerSuite.scala | 180 +++++++++++++++ .../ml/feature/MultipleBucketizerSuite.scala | 214 ------------------ 5 files changed, 310 insertions(+), 302 deletions(-) delete mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java index f00993833321d..3e49bf04ac892 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java @@ -33,6 +33,13 @@ import org.apache.spark.sql.types.StructType; // $example off$ +/** + * An example for Bucketizer. + * Run with + *
+ * <pre>
+ * bin/run-example ml.JavaBucketizerExample
+ * </pre>
+ */ public class JavaBucketizerExample { public static void main(String[] args) { SparkSession spark = SparkSession @@ -68,6 +75,40 @@ public static void main(String[] args) { bucketedData.show(); // $example off$ + // $example on$ + // Bucketize multiple columns at one pass. + double[][] splitsArray = { + {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY}, + {Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY} + }; + + List data2 = Arrays.asList( + RowFactory.create(-999.9, -999.9), + RowFactory.create(-0.5, -0.2), + RowFactory.create(-0.3, -0.1), + RowFactory.create(0.0, 0.0), + RowFactory.create(0.2, 0.4), + RowFactory.create(999.9, 999.9) + ); + StructType schema2 = new StructType(new StructField[]{ + new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()), + new StructField("features2", DataTypes.DoubleType, false, Metadata.empty()) + }); + Dataset dataFrame2 = spark.createDataFrame(data2, schema2); + + Bucketizer bucketizer2 = new Bucketizer() + .setInputCols(new String[] {"features1", "features2"}) + .setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"}) + .setSplitsArray(splitsArray); + // Transform original data into its bucket index. + Dataset bucketedData2 = bucketizer2.transform(dataFrame2); + + System.out.println("Bucketizer output with [" + + (bucketizer2.getSplitsArray()[0].length-1) + ", " + + (bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column"); + bucketedData2.show(); + // $example off$ + spark.stop(); } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index ba6cc7d12f898..6f6883ab68987 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.types.{DoubleType, StructField, StructType} @Since("1.4.0") final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String) extends Model[Bucketizer] with HasHandleInvalid with HasInputCol with HasOutputCol - with MultipleBucketizerInterface with DefaultParamsWritable { + with HasInputCols with DefaultParamsWritable { @Since("1.4.0") def this() = this(Identifiable.randomUID("bucketizer")) @@ -97,72 +97,6 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String def setHandleInvalid(value: String): this.type = set(handleInvalid, value) setDefault(handleInvalid, Bucketizer.ERROR_INVALID) - /** - * Determines whether this `Bucketizer` is going to map multiple columns. Only if all necessary - * params for bucketizing multiple columns are set, we go for the path to map multiple columns. - * By default `Bucketizer` just maps a column of continuous features. 
- */ - private[ml] def isBucketizeMultipleInputCols(): Boolean = { - isSet(inputCols) && isSet(splitsArray) && isSet(outputCols) - } - - @Since("2.0.0") - override def transform(dataset: Dataset[_]): DataFrame = { - if (isBucketizeMultipleInputCols()) { - this.asInstanceOf[MultipleBucketizerInterface].transform(dataset, getHandleInvalid) - } else { - transformSingleColumn(dataset) - } - } - - private def transformSingleColumn(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) - val (filteredDataset, keepInvalid) = { - if (getHandleInvalid == Bucketizer.SKIP_INVALID) { - // "skip" NaN option is set, will filter out NaN values in the dataset - (dataset.na.drop().toDF(), false) - } else { - (dataset.toDF(), getHandleInvalid == Bucketizer.KEEP_INVALID) - } - } - - val bucketizer: UserDefinedFunction = udf { (feature: Double) => - Bucketizer.binarySearchForBuckets($(splits), feature, keepInvalid) - }.withName("bucketizer") - - val newCol = bucketizer(filteredDataset($(inputCol)).cast(DoubleType)) - val newField = prepOutputField(filteredDataset.schema) - filteredDataset.withColumn($(outputCol), newCol, newField.metadata) - } - - private def prepOutputField(schema: StructType): StructField = { - val buckets = $(splits).sliding(2).map(bucket => bucket.mkString(", ")).toArray - val attr = new NominalAttribute(name = Some($(outputCol)), isOrdinal = Some(true), - values = Some(buckets)) - attr.toStructField() - } - - @Since("1.4.0") - override def transformSchema(schema: StructType): StructType = { - if (isBucketizeMultipleInputCols()) { - this.asInstanceOf[MultipleBucketizerInterface].transformMultipleSchema(schema) - } else { - SchemaUtils.checkNumericType(schema, $(inputCol)) - SchemaUtils.appendColumn(schema, prepOutputField(schema)) - } - } - - @Since("1.4.1") - override def copy(extra: ParamMap): Bucketizer = { - defaultCopy[Bucketizer](extra).setParent(parent) - } -} - -/** - * `MultipleBucketizerInterface` maps columns of continuous features to columns of feature buckets. - */ -@Since("2.3.0") -private[ml] trait MultipleBucketizerInterface extends HasInputCols { /** * Parameter for specifying multiple splits parameters. Each element in this array can be used to * map continuous features into buckets. @@ -207,48 +141,80 @@ private[ml] trait MultipleBucketizerInterface extends HasInputCols { @Since("2.3.0") def setOutputCols(value: Array[String]): this.type = set(outputCols, value) - @Since("2.3.0") - private[ml] def transform(dataset: Dataset[_], handleInvalid: String): DataFrame = { - transformMultipleSchema(dataset.schema) + /** + * Determines whether this `Bucketizer` is going to map multiple columns. Only if all necessary + * params for bucketizing multiple columns are set, we go for the path to map multiple columns. + * By default `Bucketizer` just maps a column of continuous features. 
+ */ + private[ml] def isBucketizeMultipleInputCols(): Boolean = { + isSet(inputCols) && isSet(splitsArray) && isSet(outputCols) + } + + @Since("2.0.0") + override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema) + val (filteredDataset, keepInvalid) = { - if (handleInvalid == Bucketizer.SKIP_INVALID) { + if (getHandleInvalid == Bucketizer.SKIP_INVALID) { // "skip" NaN option is set, will filter out NaN values in the dataset (dataset.na.drop().toDF(), false) } else { - (dataset.toDF(), handleInvalid == Bucketizer.KEEP_INVALID) + (dataset.toDF(), getHandleInvalid == Bucketizer.KEEP_INVALID) } } - val bucketizers: Seq[UserDefinedFunction] = $(splitsArray).map { splits => + val seqOfSplits = if (isBucketizeMultipleInputCols()) { + $(splitsArray).toSeq + } else { + Seq($(splits)) + } + + val bucketizers: Seq[UserDefinedFunction] = seqOfSplits.zipWithIndex.map { case (splits, idx) => udf { (feature: Double) => Bucketizer.binarySearchForBuckets(splits, feature, keepInvalid) - } + }.withName(s"bucketizer_$idx") } - val newCols = $(inputCols).zipWithIndex.map { case (inputCol, idx) => - bucketizers(idx)(filteredDataset(inputCol)) + val (inputColumns, outputColumns) = if (isBucketizeMultipleInputCols()) { + ($(inputCols).toSeq, $(outputCols).toSeq) + } else { + (Seq($(inputCol)), Seq($(outputCol))) } - val newFields = $(outputCols).zipWithIndex.map { case (outputCol, idx) => - prepOutputField(idx, outputCol) + val newCols = inputColumns.zipWithIndex.map { case (inputCol, idx) => + bucketizers(idx)(filteredDataset(inputCol).cast(DoubleType)) } - filteredDataset.withColumns($(outputCols), newCols, newFields.map(_.metadata)) + val newFields = outputColumns.zipWithIndex.map { case (outputCol, idx) => + prepOutputField(seqOfSplits(idx), outputCol) + } + filteredDataset.withColumns(outputColumns, newCols, newFields.map(_.metadata)) } - private def prepOutputField(idx: Int, outputCol: String): StructField = { - val buckets = $(splitsArray)(idx).sliding(2).map(bucket => bucket.mkString(", ")).toArray + private def prepOutputField(splits: Array[Double], outputCol: String): StructField = { + val buckets = splits.sliding(2).map(bucket => bucket.mkString(", ")).toArray val attr = new NominalAttribute(name = Some(outputCol), isOrdinal = Some(true), values = Some(buckets)) attr.toStructField() } - private[ml] def transformMultipleSchema(schema: StructType): StructType = { - var transformedSchema = schema - $(inputCols).zip($(outputCols)).zipWithIndex.map { case ((inputCol, outputCol), idx) => - SchemaUtils.checkColumnType(transformedSchema, inputCol, DoubleType) - transformedSchema = SchemaUtils.appendColumn(transformedSchema, - prepOutputField(idx, outputCol)) + @Since("1.4.0") + override def transformSchema(schema: StructType): StructType = { + if (isBucketizeMultipleInputCols()) { + var transformedSchema = schema + $(inputCols).zip($(outputCols)).zipWithIndex.map { case ((inputCol, outputCol), idx) => + SchemaUtils.checkNumericType(transformedSchema, inputCol) + transformedSchema = SchemaUtils.appendColumn(transformedSchema, + prepOutputField($(splitsArray)(idx), outputCol)) + } + transformedSchema + } else { + SchemaUtils.checkNumericType(schema, $(inputCol)) + SchemaUtils.appendColumn(schema, prepOutputField($(splits), $(outputCol))) } - transformedSchema + } + + @Since("1.4.1") + override def copy(extra: ParamMap): Bucketizer = { + defaultCopy[Bucketizer](extra).setParent(parent) } } diff --git a/mllib/src/test/java/org/apache/spark/ml/feature/JavaBucketizerSuite.java 
b/mllib/src/test/java/org/apache/spark/ml/feature/JavaBucketizerSuite.java index 87639380bdcf4..e65265bf74a88 100644 --- a/mllib/src/test/java/org/apache/spark/ml/feature/JavaBucketizerSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/feature/JavaBucketizerSuite.java @@ -61,4 +61,39 @@ public void bucketizerTest() { Assert.assertTrue((index >= 0) && (index <= 1)); } } + + @Test + public void bucketizerMultipleColumnsTest() { + double[][] splitsArray = { + {-0.5, 0.0, 0.5}, + {-0.5, 0.0, 0.2, 0.5} + }; + + StructType schema = new StructType(new StructField[]{ + new StructField("feature1", DataTypes.DoubleType, false, Metadata.empty()), + new StructField("feature2", DataTypes.DoubleType, false, Metadata.empty()), + }); + Dataset dataset = spark.createDataFrame( + Arrays.asList( + RowFactory.create(-0.5, -0.5), + RowFactory.create(-0.3, -0.3), + RowFactory.create(0.0, 0.0), + RowFactory.create(0.2, 0.3)), + schema); + + Bucketizer bucketizer = new Bucketizer() + .setInputCols(new String[] {"feature1", "feature2"}) + .setOutputCols(new String[] {"result1", "result2"}) + .setSplitsArray(splitsArray); + + List result = bucketizer.transform(dataset).select("result1", "result2").collectAsList(); + + for (Row r : result) { + double index1 = r.getDouble(0); + Assert.assertTrue((index1 >= 0) && (index1 <= 1)); + + double index2 = r.getDouble(1); + Assert.assertTrue((index2 >= 0) && (index2 <= 2)); + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index ebda136756667..d399c1081363b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.ml.feature import scala.util.Random import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.ml.Pipeline import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.DefaultReadWriteTest @@ -187,6 +188,185 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa } } } + + test("multiple columns: Bucket continuous features, without -inf,inf") { + // Check a set of valid feature values. + val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5)) + val validData1 = Array(-0.5, -0.3, 0.0, 0.2) + val validData2 = Array(0.5, 0.3, 0.0, -0.1) + val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0) + val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0) + + val data = (0 until validData1.length).map { idx => + (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) + } + val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") + + val bucketizer1: Bucketizer = new Bucketizer() + .setInputCols(Array("feature1", "feature2")) + .setOutputCols(Array("result1", "result2")) + .setSplitsArray(splits) + + assert(bucketizer1.isBucketizeMultipleInputCols()) + + bucketizer1.transform(dataFrame).select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => + assert(r1 === e1, + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + assert(r2 === e2, + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") + } + + // Check for exceptions when using a set of invalid feature values. 
+ val invalidData1: Array[Double] = Array(-0.9) ++ validData1 + val invalidData2 = Array(0.51) ++ validData1 + val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx") + + val bucketizer2: Bucketizer = new Bucketizer() + .setInputCols(Array("feature")) + .setOutputCols(Array("result")) + .setSplitsArray(Array(splits(0))) + + assert(bucketizer2.isBucketizeMultipleInputCols()) + + withClue("Invalid feature value -0.9 was not caught as an invalid feature!") { + intercept[SparkException] { + bucketizer2.transform(badDF1).collect() + } + } + val badDF2 = invalidData2.zipWithIndex.toSeq.toDF("feature", "idx") + withClue("Invalid feature value 0.51 was not caught as an invalid feature!") { + intercept[SparkException] { + bucketizer2.transform(badDF2).collect() + } + } + } + + test("multiple columns: Bucket continuous features, with -inf,inf") { + val splits = Array( + Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity), + Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, Double.PositiveInfinity)) + + val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9) + val validData2 = Array(-0.1, -0.5, -0.2, 0.0, 0.1, 0.3, 0.5) + val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0) + val expectedBuckets2 = Array(1.0, 0.0, 1.0, 1.0, 1.0, 2.0, 3.0) + + val data = (0 until validData1.length).map { idx => + (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) + } + val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") + + val bucketizer: Bucketizer = new Bucketizer() + .setInputCols(Array("feature1", "feature2")) + .setOutputCols(Array("result1", "result2")) + .setSplitsArray(splits) + + assert(bucketizer.isBucketizeMultipleInputCols()) + + bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => + assert(r1 === e1, + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + assert(r2 === e2, + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") + } + } + + test("multiple columns: Bucket continuous features, with NaN data but non-NaN splits") { + val splits = Array( + Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity), + Array(Double.NegativeInfinity, -0.1, 0.2, 0.6, Double.PositiveInfinity)) + + val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN) + val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, Double.NaN, Double.NaN) + val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 4.0) + val expectedBuckets2 = Array(2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 2.0, 3.0, 4.0, 4.0) + + val data = (0 until validData1.length).map { idx => + (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) + } + val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") + + val bucketizer: Bucketizer = new Bucketizer() + .setInputCols(Array("feature1", "feature2")) + .setOutputCols(Array("result1", "result2")) + .setSplitsArray(splits) + + assert(bucketizer.isBucketizeMultipleInputCols()) + + bucketizer.setHandleInvalid("keep") + bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => + assert(r1 === e1, + s"The feature value is not correct after bucketing. 
Expected $e1 but found $r1") + assert(r2 === e2, + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") + } + + bucketizer.setHandleInvalid("skip") + val skipResults1: Array[Double] = bucketizer.transform(dataFrame) + .select("result1").as[Double].collect() + assert(skipResults1.length === 7) + assert(skipResults1.forall(_ !== 4.0)) + + val skipResults2: Array[Double] = bucketizer.transform(dataFrame) + .select("result2").as[Double].collect() + assert(skipResults2.length === 7) + assert(skipResults2.forall(_ !== 4.0)) + + bucketizer.setHandleInvalid("error") + withClue("Bucketizer should throw error when setHandleInvalid=error and given NaN values") { + intercept[SparkException] { + bucketizer.transform(dataFrame).collect() + } + } + } + + test("multiple columns: Bucket continuous features, with NaN splits") { + val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity, Double.NaN) + withClue("Invalid NaN split was not caught during Bucketizer initialization") { + intercept[IllegalArgumentException] { + new Bucketizer().setSplitsArray(Array(splits)) + } + } + } + + test("multiple columns:: read/write") { + val t = new Bucketizer() + .setInputCols(Array("myInputCol")) + .setOutputCols(Array("myOutputCol")) + .setSplitsArray(Array(Array(0.1, 0.8, 0.9))) + assert(t.isBucketizeMultipleInputCols()) + testDefaultReadWrite(t) + } + + test("Bucketizer in a pipeline") { + val df = Seq((0.5, 0.3, 1.0, 1.0), (0.5, -0.4, 1.0, 0.0)) + .toDF("feature1", "feature2", "expected1", "expected2") + + val bucket = new Bucketizer() + .setInputCols(Array("feature1", "feature2")) + .setOutputCols(Array("result1", "result2")) + .setSplitsArray(Array(Array(-0.5, 0.0, 0.5), Array(-0.5, 0.0, 0.5))) + + assert(bucket.isBucketizeMultipleInputCols()) + + val pl = new Pipeline() + .setStages(Array(bucket)) + .fit(df) + pl.transform(df).select("result1", "expected1", "result2", "expected2") + .collect().foreach { + case Row(r1: Double, e1: Double, r2: Double, e2: Double) => + assert(r1 === e1, + s"The feature value is not correct after bucketing. Expected $e1 but found $r1") + assert(r2 === e2, + s"The feature value is not correct after bucketing. Expected $e2 but found $r2") + } + } } private object BucketizerSuite extends SparkFunSuite { diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala deleted file mode 100644 index 2e7ab86dc29c4..0000000000000 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MultipleBucketizerSuite.scala +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.ml.feature - -import scala.util.Random - -import org.apache.spark.{SparkException, SparkFunSuite} -import org.apache.spark.ml.Pipeline -import org.apache.spark.ml.linalg.Vectors -import org.apache.spark.ml.param.ParamsSuite -import org.apache.spark.ml.util.DefaultReadWriteTest -import org.apache.spark.ml.util.TestingUtils._ -import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.{DataFrame, Row} - -class MultipleBucketizerSuite extends SparkFunSuite with MLlibTestSparkContext - with DefaultReadWriteTest { - - import testImplicits._ - - test("Bucket continuous features, without -inf,inf") { - // Check a set of valid feature values. - val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5)) - val validData1 = Array(-0.5, -0.3, 0.0, 0.2) - val validData2 = Array(0.5, 0.3, 0.0, -0.1) - val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0) - val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0) - - val data = (0 until validData1.length).map { idx => - (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) - } - val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") - - val bucketizer1: Bucketizer = new Bucketizer() - .setInputCols(Array("feature1", "feature2")) - .setOutputCols(Array("result1", "result2")) - .setSplitsArray(splits) - - assert(bucketizer1.isBucketizeMultipleInputCols()) - - bucketizer1.transform(dataFrame).select("result1", "expected1", "result2", "expected2") - .collect().foreach { - case Row(r1: Double, e1: Double, r2: Double, e2: Double) => - assert(r1 === e1, - s"The feature value is not correct after bucketing. Expected $e1 but found $r1") - assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") - } - - // Check for exceptions when using a set of invalid feature values. 
- val invalidData1: Array[Double] = Array(-0.9) ++ validData1 - val invalidData2 = Array(0.51) ++ validData1 - val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx") - - val bucketizer2: Bucketizer = new Bucketizer() - .setInputCols(Array("feature")) - .setOutputCols(Array("result")) - .setSplitsArray(Array(splits(0))) - - assert(bucketizer2.isBucketizeMultipleInputCols()) - - withClue("Invalid feature value -0.9 was not caught as an invalid feature!") { - intercept[SparkException] { - bucketizer2.transform(badDF1).collect() - } - } - val badDF2 = invalidData2.zipWithIndex.toSeq.toDF("feature", "idx") - withClue("Invalid feature value 0.51 was not caught as an invalid feature!") { - intercept[SparkException] { - bucketizer2.transform(badDF2).collect() - } - } - } - - test("Bucket continuous features, with -inf,inf") { - val splits = Array( - Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity), - Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, Double.PositiveInfinity)) - - val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9) - val validData2 = Array(-0.1, -0.5, -0.2, 0.0, 0.1, 0.3, 0.5) - val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0) - val expectedBuckets2 = Array(1.0, 0.0, 1.0, 1.0, 1.0, 2.0, 3.0) - - val data = (0 until validData1.length).map { idx => - (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) - } - val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") - - val bucketizer: Bucketizer = new Bucketizer() - .setInputCols(Array("feature1", "feature2")) - .setOutputCols(Array("result1", "result2")) - .setSplitsArray(splits) - - assert(bucketizer.isBucketizeMultipleInputCols()) - - bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") - .collect().foreach { - case Row(r1: Double, e1: Double, r2: Double, e2: Double) => - assert(r1 === e1, - s"The feature value is not correct after bucketing. Expected $e1 but found $r1") - assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") - } - } - - test("Bucket continuous features, with NaN data but non-NaN splits") { - val splits = Array( - Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity), - Array(Double.NegativeInfinity, -0.1, 0.2, 0.6, Double.PositiveInfinity)) - - val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, Double.NaN, Double.NaN) - val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, Double.NaN, Double.NaN) - val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 4.0) - val expectedBuckets2 = Array(2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 2.0, 3.0, 4.0, 4.0) - - val data = (0 until validData1.length).map { idx => - (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) - } - val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") - - val bucketizer: Bucketizer = new Bucketizer() - .setInputCols(Array("feature1", "feature2")) - .setOutputCols(Array("result1", "result2")) - .setSplitsArray(splits) - - assert(bucketizer.isBucketizeMultipleInputCols()) - - bucketizer.setHandleInvalid("keep") - bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") - .collect().foreach { - case Row(r1: Double, e1: Double, r2: Double, e2: Double) => - assert(r1 === e1, - s"The feature value is not correct after bucketing. 
Expected $e1 but found $r1") - assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") - } - - bucketizer.setHandleInvalid("skip") - val skipResults1: Array[Double] = bucketizer.transform(dataFrame) - .select("result1").as[Double].collect() - assert(skipResults1.length === 7) - assert(skipResults1.forall(_ !== 4.0)) - - val skipResults2: Array[Double] = bucketizer.transform(dataFrame) - .select("result2").as[Double].collect() - assert(skipResults2.length === 7) - assert(skipResults2.forall(_ !== 4.0)) - - bucketizer.setHandleInvalid("error") - withClue("Bucketizer should throw error when setHandleInvalid=error and given NaN values") { - intercept[SparkException] { - bucketizer.transform(dataFrame).collect() - } - } - } - - test("Bucket continuous features, with NaN splits") { - val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity, Double.NaN) - withClue("Invalid NaN split was not caught during Bucketizer initialization") { - intercept[IllegalArgumentException] { - new Bucketizer().setSplitsArray(Array(splits)) - } - } - } - - test("read/write") { - val t = new Bucketizer() - .setInputCols(Array("myInputCol")) - .setOutputCols(Array("myOutputCol")) - .setSplitsArray(Array(Array(0.1, 0.8, 0.9))) - assert(t.isBucketizeMultipleInputCols()) - testDefaultReadWrite(t) - } - - test("Bucketizer in a pipeline") { - val df = Seq((0.5, 0.3, 1.0, 1.0), (0.5, -0.4, 1.0, 0.0)) - .toDF("feature1", "feature2", "expected1", "expected2") - - val bucket = new Bucketizer() - .setInputCols(Array("feature1", "feature2")) - .setOutputCols(Array("result1", "result2")) - .setSplitsArray(Array(Array(-0.5, 0.0, 0.5), Array(-0.5, 0.0, 0.5))) - - assert(bucket.isBucketizeMultipleInputCols()) - - val pl = new Pipeline() - .setStages(Array(bucket)) - .fit(df) - pl.transform(df).select("result1", "expected1", "result2", "expected2") - .collect().foreach { - case Row(r1: Double, e1: Double, r2: Double, e2: Double) => - assert(r1 === e1, - s"The feature value is not correct after bucketing. Expected $e1 but found $r1") - assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") - } - } -} From 60d3ba1ec3c2c9d767e8f63f43aadda2de4c4e28 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 20 Sep 2017 03:08:33 +0000 Subject: [PATCH 07/11] Sync withColumns and related tests. --- .../main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 13 +++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 94ede8c53a8b3..0fc6e2a56e3da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2106,9 +2106,11 @@ class Dataset[T] private[sql]( * the same names. 
*/ private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = { - assert(colNames.size == cols.size, + require(colNames.size == cols.size, s"The size of column names: ${colNames.size} isn't equal to " + s"the size of columns: ${cols.size}") + require(colNames.distinct.size == colNames.size, + s"It is disallowed to use duplicate column names: $colNames") val resolver = sparkSession.sessionState.analyzer.resolver val output = queryExecution.analyzed.output diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index df7044ec6f71d..3a161efb5f2ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -650,6 +650,19 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Row(key, value, key + 1, key + 2) }.toSeq) assert(df.schema.map(_.name) === Seq("key", "value", "newCol1", "newCol2")) + + val err = intercept[IllegalArgumentException] { + testData.toDF().withColumns(Seq("newCol1"), + Seq(col("key") + 1, col("key") + 2)) + } + assert( + err.getMessage.contains("The size of column names: 1 isn't equal to the size of columns: 2")) + + val err2 = intercept[IllegalArgumentException] { + testData.toDF().withColumns(Seq("newCol1", "newCol1"), + Seq(col("key") + 1, col("key") + 2)) + } + assert(err2.getMessage.contains("It is disallowed to use duplicate column names")) } test("replace column using withColumn") { From f70fc2a956082a83859313f663649a9b17dbbf36 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 21 Sep 2017 03:19:06 +0000 Subject: [PATCH 08/11] Print log warning if inputCol and inputCols are both set. --- .../apache/spark/ml/feature/Bucketizer.scala | 28 +++++++++++++------ .../spark/ml/feature/BucketizerSuite.scala | 23 +++++++++++---- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index 6f6883ab68987..a5b1d016cf2c2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -33,7 +33,9 @@ import org.apache.spark.sql.types.{DoubleType, StructField, StructType} /** * `Bucketizer` maps a column of continuous features to a column of feature buckets. Since 2.3.0, - * `Bucketizer` can also map multiple columns at once. + * `Bucketizer` can also map multiple columns at once. Whether it goes to map a column or multiple + * columns, it depends on which parameter of `inputCol` and `inputCols` is set. When both are set, + * a log warning will be printed and by default it chooses `inputCol`. */ @Since("1.4.0") final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String) @@ -142,12 +144,20 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String def setOutputCols(value: Array[String]): this.type = set(outputCols, value) /** - * Determines whether this `Bucketizer` is going to map multiple columns. Only if all necessary - * params for bucketizing multiple columns are set, we go for the path to map multiple columns. - * By default `Bucketizer` just maps a column of continuous features. + * Determines whether this `Bucketizer` is going to map multiple columns. If and only if + * `inputCols` is set, it will map multiple columns. 
Otherwise, it just maps a column specified + * by `inputCol`. A warning will be printed if both are set. */ - private[ml] def isBucketizeMultipleInputCols(): Boolean = { - isSet(inputCols) && isSet(splitsArray) && isSet(outputCols) + private[ml] def isBucketizeMultipleColumns(): Boolean = { + if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + + "`Bucketizer` only map one column specified by `inputCol`") + false + } else if (isSet(inputCols)) { + true + } else { + false + } } @Since("2.0.0") @@ -163,7 +173,7 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String } } - val seqOfSplits = if (isBucketizeMultipleInputCols()) { + val seqOfSplits = if (isBucketizeMultipleColumns()) { $(splitsArray).toSeq } else { Seq($(splits)) @@ -175,7 +185,7 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String }.withName(s"bucketizer_$idx") } - val (inputColumns, outputColumns) = if (isBucketizeMultipleInputCols()) { + val (inputColumns, outputColumns) = if (isBucketizeMultipleColumns()) { ($(inputCols).toSeq, $(outputCols).toSeq) } else { (Seq($(inputCol)), Seq($(outputCol))) @@ -198,7 +208,7 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String @Since("1.4.0") override def transformSchema(schema: StructType): StructType = { - if (isBucketizeMultipleInputCols()) { + if (isBucketizeMultipleColumns()) { var transformedSchema = schema $(inputCols).zip($(outputCols)).zipWithIndex.map { case ((inputCol, outputCol), idx) => SchemaUtils.checkNumericType(transformedSchema, inputCol) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index d399c1081363b..ca934b677e1ba 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -207,7 +207,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa .setOutputCols(Array("result1", "result2")) .setSplitsArray(splits) - assert(bucketizer1.isBucketizeMultipleInputCols()) + assert(bucketizer1.isBucketizeMultipleColumns()) bucketizer1.transform(dataFrame).select("result1", "expected1", "result2", "expected2") .collect().foreach { @@ -228,7 +228,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa .setOutputCols(Array("result")) .setSplitsArray(Array(splits(0))) - assert(bucketizer2.isBucketizeMultipleInputCols()) + assert(bucketizer2.isBucketizeMultipleColumns()) withClue("Invalid feature value -0.9 was not caught as an invalid feature!") { intercept[SparkException] { @@ -263,7 +263,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa .setOutputCols(Array("result1", "result2")) .setSplitsArray(splits) - assert(bucketizer.isBucketizeMultipleInputCols()) + assert(bucketizer.isBucketizeMultipleColumns()) bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") .collect().foreach { @@ -295,7 +295,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa .setOutputCols(Array("result1", "result2")) .setSplitsArray(splits) - assert(bucketizer.isBucketizeMultipleInputCols()) + assert(bucketizer.isBucketizeMultipleColumns()) bucketizer.setHandleInvalid("keep") bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") @@ -340,7 +340,7 
@@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa .setInputCols(Array("myInputCol")) .setOutputCols(Array("myOutputCol")) .setSplitsArray(Array(Array(0.1, 0.8, 0.9))) - assert(t.isBucketizeMultipleInputCols()) + assert(t.isBucketizeMultipleColumns()) testDefaultReadWrite(t) } @@ -353,7 +353,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa .setOutputCols(Array("result1", "result2")) .setSplitsArray(Array(Array(-0.5, 0.0, 0.5), Array(-0.5, 0.0, 0.5))) - assert(bucket.isBucketizeMultipleInputCols()) + assert(bucket.isBucketizeMultipleColumns()) val pl = new Pipeline() .setStages(Array(bucket)) @@ -367,6 +367,17 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa s"The feature value is not correct after bucketing. Expected $e2 but found $r2") } } + + test("Both inputCol and inputCols are set") { + val bucket = new Bucketizer() + .setInputCol("feature1") + .setOutputCol("result") + .setSplits(Array(-0.5, 0.0, 0.5)) + .setInputCols(Array("feature1", "feature2")) + + // When both are set, we ignore `inputCols` and just map the column specified by `inputCol`. + assert(bucket.isBucketizeMultipleColumns() == false) + } } private object BucketizerSuite extends SparkFunSuite { From 000844ab1f0dffef9b51b96f7edc1e1ab9e9e0b7 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 2 Oct 2017 23:42:27 +0000 Subject: [PATCH 09/11] Add test for withColumns with metadata. --- .../scala/org/apache/spark/sql/Dataset.scala | 3 ++ .../org/apache/spark/sql/DataFrameSuite.scala | 28 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 32ecd5b171511..35e845dc5b912 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2126,6 +2126,9 @@ class Dataset[T] private[sql]( colNames: Seq[String], cols: Seq[Column], metadata: Seq[Metadata]): DataFrame = { + require(colNames.size == metadata.size, + s"The size of column names: ${colNames.size} isn't equal to " + + s"the size of metadata elements: ${metadata.size}") val newCols = colNames.zip(cols).zip(metadata).map { case ((colName, col), metadata) => col.as(colName, metadata) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 672deeac597f1..744ef91c93703 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -684,6 +684,34 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } } + test("withColumns: given metadata") { + def buildMetadata(num: Int): Seq[Metadata] = { + (0 until num).map { n => + val builder = new MetadataBuilder + builder.putLong("key", n.toLong) + builder.build() + } + } + + val df = testData.toDF().withColumns( + Seq("newCol1", "newCol2"), + Seq(col("key") + 1, col("key") + 2), + buildMetadata(2)) + + df.select("newCol1", "newCol2").schema.zipWithIndex.foreach { case (col, idx) => + assert(col.metadata.getLong("key").toInt === idx) + } + + val err = intercept[IllegalArgumentException] { + testData.toDF().withColumns( + Seq("newCol1", "newCol2"), + Seq(col("key") + 1, col("key") + 2), + buildMetadata(1)) + } + assert(err.getMessage.contains( + "The size of column names: 2 isn't equal to the size of metadata elements: 1")) 
+ } + test("replace column using withColumn") { val df2 = sparkContext.parallelize(Array(1, 2, 3)).toDF("x") val df3 = df2.withColumn("x", df2("x") + 1) From 1889995c12e55b2420726540756b4b0b69b1bb28 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 10 Oct 2017 13:00:02 +0000 Subject: [PATCH 10/11] Re-implement withColumn with metadata by withColumns, so same test can cover both. --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 35e845dc5b912..f7f37583e8223 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2138,9 +2138,8 @@ class Dataset[T] private[sql]( /** * Returns a new Dataset by adding a column with metadata. */ - private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = { - withColumn(colName, col.as(colName, metadata)) - } + private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = + withColumns(Seq(colName), Seq(col), Seq(metadata)) /** * Returns a new Dataset with a column renamed. From bb19708dcd359c434c0fac779125f949541f9b8c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Oct 2017 08:48:19 +0000 Subject: [PATCH 11/11] Address comments. --- .../spark/examples/ml/BucketizerExample.scala | 36 +++++- .../apache/spark/ml/feature/Bucketizer.scala | 37 +++--- .../ml/param/shared/SharedParamsCodeGen.scala | 1 + .../spark/ml/param/shared/sharedParams.scala | 15 +++ .../spark/ml/feature/BucketizerSuite.scala | 116 ++++++++++++------ 5 files changed, 146 insertions(+), 59 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala index 04e4eccd436ed..7e65f9c88907d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala @@ -22,7 +22,13 @@ package org.apache.spark.examples.ml import org.apache.spark.ml.feature.Bucketizer // $example off$ import org.apache.spark.sql.SparkSession - +/** + * An example for Bucketizer. + * Run with + * {{{ + * bin/run-example ml.BucketizerExample + * }}} + */ object BucketizerExample { def main(args: Array[String]): Unit = { val spark = SparkSession @@ -48,6 +54,34 @@ object BucketizerExample { bucketedData.show() // $example off$ + // $example on$ + val splitsArray = Array( + Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity), + Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity)) + + val data2 = Array( + (-999.9, -999.9), + (-0.5, -0.2), + (-0.3, -0.1), + (0.0, 0.0), + (0.2, 0.4), + (999.9, 999.9)) + val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2") + + val bucketizer2 = new Bucketizer() + .setInputCols(Array("features1", "features2")) + .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2")) + .setSplitsArray(splitsArray) + + // Transform original data into its bucket index. 
+ val bucketedData2 = bucketizer2.transform(dataFrame2) + + println(s"Bucketizer output with [" + + s"${bucketizer2.getSplitsArray(0).length-1}, " + + s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column") + bucketedData2.show() + // $example off$ + spark.stop() } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index a5b1d016cf2c2..e07f2a107badb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -24,7 +24,7 @@ import org.apache.spark.annotation.Since import org.apache.spark.ml.Model import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.param._ -import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol, HasInputCols, HasOutputCol} +import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol, HasInputCols, HasOutputCol, HasOutputCols} import org.apache.spark.ml.util._ import org.apache.spark.sql._ import org.apache.spark.sql.expressions.UserDefinedFunction @@ -33,14 +33,15 @@ import org.apache.spark.sql.types.{DoubleType, StructField, StructType} /** * `Bucketizer` maps a column of continuous features to a column of feature buckets. Since 2.3.0, - * `Bucketizer` can also map multiple columns at once. Whether it goes to map a column or multiple - * columns, it depends on which parameter of `inputCol` and `inputCols` is set. When both are set, - * a log warning will be printed and by default it chooses `inputCol`. + * `Bucketizer` can map multiple columns at once by setting the `inputCols` parameter. Note that + * when both the `inputCol` and `inputCols` parameters are set, a log warning will be printed and + * only `inputCol` will take effect, while `inputCols` will be ignored. The `splits` parameter is + * only used for single column usage, and `splitsArray` is for multiple columns. */ @Since("1.4.0") final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String) extends Model[Bucketizer] with HasHandleInvalid with HasInputCol with HasOutputCol - with HasInputCols with DefaultParamsWritable { + with HasInputCols with HasOutputCols with DefaultParamsWritable { @Since("1.4.0") def this() = this(Identifiable.randomUID("bucketizer")) @@ -84,7 +85,9 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String /** * Param for how to handle invalid entries. Options are 'skip' (filter out rows with * invalid values), 'error' (throw an error), or 'keep' (keep invalid values in a special - * additional bucket). + * additional bucket). Note that in the multiple column case, the invalid handling is applied + * to all columns. That said for 'error' it will throw an error if any invalids are found in + * any column, for 'skip' it will skip rows with any invalids in any columns, etc. * Default: "error" * @group param */ @@ -115,22 +118,10 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String "specified will be treated as errors.", Bucketizer.checkSplitsArray) - /** - * Param for output column names. 
- * @group param - */ - @Since("2.3.0") - final val outputCols: StringArrayParam = new StringArrayParam(this, "outputCols", - "output column names") - /** @group getParam */ @Since("2.3.0") def getSplitsArray: Array[Array[Double]] = $(splitsArray) - /** @group getParam */ - @Since("2.3.0") - final def getOutputCols: Array[String] = $(outputCols) - /** @group setParam */ @Since("2.3.0") def setSplitsArray(value: Array[Array[Double]]): this.type = set(splitsArray, value) @@ -148,7 +139,7 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String * `inputCols` is set, it will map multiple columns. Otherwise, it just maps a column specified * by `inputCol`. A warning will be printed if both are set. */ - private[ml] def isBucketizeMultipleColumns(): Boolean = { + private[feature] def isBucketizeMultipleColumns(): Boolean = { if (isSet(inputCols) && isSet(inputCol)) { logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + "`Bucketizer` only map one column specified by `inputCol`") @@ -162,7 +153,7 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - transformSchema(dataset.schema) + val transformedSchema = transformSchema(dataset.schema) val (filteredDataset, keepInvalid) = { if (getHandleInvalid == Bucketizer.SKIP_INVALID) { @@ -193,10 +184,10 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String val newCols = inputColumns.zipWithIndex.map { case (inputCol, idx) => bucketizers(idx)(filteredDataset(inputCol).cast(DoubleType)) } - val newFields = outputColumns.zipWithIndex.map { case (outputCol, idx) => - prepOutputField(seqOfSplits(idx), outputCol) + val metadata = outputColumns.map { col => + transformedSchema(col).metadata } - filteredDataset.withColumns(outputColumns, newCols, newFields.map(_.metadata)) + filteredDataset.withColumns(outputColumns, newCols, metadata) } private def prepOutputField(splits: Array[Double], outputCol: String): StructField = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 1860fe8361749..64163d3c16039 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -60,6 +60,7 @@ private[shared] object SharedParamsCodeGen { ParamDesc[String]("inputCol", "input column name"), ParamDesc[Array[String]]("inputCols", "input column names"), ParamDesc[String]("outputCol", "output column name", Some("uid + \"__output\"")), + ParamDesc[Array[String]]("outputCols", "output column names"), ParamDesc[Int]("checkpointInterval", "set checkpoint interval (>= 1) or " + "disable checkpoint (-1). E.g. 
10 means that the cache will get checkpointed " + "every 10 iterations", isValid = "(interval: Int) => interval == -1 || interval >= 1"), diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 6061d9ca0a084..b9f4791b37de1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -230,6 +230,21 @@ private[ml] trait HasOutputCol extends Params { final def getOutputCol: String = $(outputCol) } +/** + * Trait for shared param outputCols. + */ +private[ml] trait HasOutputCols extends Params { + + /** + * Param for output column names. + * @group param + */ + final val outputCols: StringArrayParam = new StringArrayParam(this, "outputCols", "output column names") + + /** @group getParam */ + final def getOutputCols: Array[String] = $(outputCols) +} + /** * Trait for shared param checkpointInterval. */ diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala index ca934b677e1ba..748dbd1b995d3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala @@ -200,7 +200,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa val data = (0 until validData1.length).map { idx => (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) } - val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") + val dataFrame: DataFrame = data.toDF("feature1", "feature2", "expected1", "expected2") val bucketizer1: Bucketizer = new Bucketizer() .setInputCols(Array("feature1", "feature2")) @@ -210,16 +210,12 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa assert(bucketizer1.isBucketizeMultipleColumns()) bucketizer1.transform(dataFrame).select("result1", "expected1", "result2", "expected2") - .collect().foreach { - case Row(r1: Double, e1: Double, r2: Double, e2: Double) => - assert(r1 === e1, - s"The feature value is not correct after bucketing. Expected $e1 but found $r1") - assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") - } + BucketizerSuite.checkBucketResults(bucketizer1.transform(dataFrame), + Seq("result1", "result2"), + Seq("expected1", "expected2")) // Check for exceptions when using a set of invalid feature values. 
- val invalidData1: Array[Double] = Array(-0.9) ++ validData1 + val invalidData1 = Array(-0.9) ++ validData1 val invalidData2 = Array(0.51) ++ validData1 val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx") @@ -256,7 +252,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa val data = (0 until validData1.length).map { idx => (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) } - val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") + val dataFrame: DataFrame = data.toDF("feature1", "feature2", "expected1", "expected2") val bucketizer: Bucketizer = new Bucketizer() .setInputCols(Array("feature1", "feature2")) @@ -265,14 +261,9 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa assert(bucketizer.isBucketizeMultipleColumns()) - bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") - .collect().foreach { - case Row(r1: Double, e1: Double, r2: Double, e2: Double) => - assert(r1 === e1, - s"The feature value is not correct after bucketing. Expected $e1 but found $r1") - assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") - } + BucketizerSuite.checkBucketResults(bucketizer.transform(dataFrame), + Seq("result1", "result2"), + Seq("expected1", "expected2")) } test("multiple columns: Bucket continuous features, with NaN data but non-NaN splits") { @@ -288,7 +279,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa val data = (0 until validData1.length).map { idx => (validData1(idx), validData2(idx), expectedBuckets1(idx), expectedBuckets2(idx)) } - val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", "expected1", "expected2") + val dataFrame: DataFrame = data.toDF("feature1", "feature2", "expected1", "expected2") val bucketizer: Bucketizer = new Bucketizer() .setInputCols(Array("feature1", "feature2")) @@ -298,14 +289,9 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa assert(bucketizer.isBucketizeMultipleColumns()) bucketizer.setHandleInvalid("keep") - bucketizer.transform(dataFrame).select("result1", "expected1", "result2", "expected2") - .collect().foreach { - case Row(r1: Double, e1: Double, r2: Double, e2: Double) => - assert(r1 === e1, - s"The feature value is not correct after bucketing. Expected $e1 but found $r1") - assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") - } + BucketizerSuite.checkBucketResults(bucketizer.transform(dataFrame), + Seq("result1", "result2"), + Seq("expected1", "expected2")) bucketizer.setHandleInvalid("skip") val skipResults1: Array[Double] = bucketizer.transform(dataFrame) @@ -335,7 +321,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa } } - test("multiple columns:: read/write") { + test("multiple columns: read/write") { val t = new Bucketizer() .setInputCols(Array("myInputCol")) .setOutputCols(Array("myOutputCol")) @@ -359,13 +345,51 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa .setStages(Array(bucket)) .fit(df) pl.transform(df).select("result1", "expected1", "result2", "expected2") - .collect().foreach { - case Row(r1: Double, e1: Double, r2: Double, e2: Double) => - assert(r1 === e1, - s"The feature value is not correct after bucketing. 
Expected $e1 but found $r1") - assert(r2 === e2, - s"The feature value is not correct after bucketing. Expected $e2 but found $r2") - } + + BucketizerSuite.checkBucketResults(pl.transform(df), + Seq("result1", "result2"), Seq("expected1", "expected2")) + } + + test("Compare single/multiple column(s) Bucketizer in pipeline") { + val df = Seq((0.5, 0.3, 1.0, 1.0), (0.5, -0.4, 1.0, 0.0)) + .toDF("feature1", "feature2", "expected1", "expected2") + + val multiColsBucket = new Bucketizer() + .setInputCols(Array("feature1", "feature2")) + .setOutputCols(Array("result1", "result2")) + .setSplitsArray(Array(Array(-0.5, 0.0, 0.5), Array(-0.5, 0.0, 0.5))) + + val plForMultiCols = new Pipeline() + .setStages(Array(multiColsBucket)) + .fit(df) + + val bucketForCol1 = new Bucketizer() + .setInputCol("feature1") + .setOutputCol("result1") + .setSplits(Array(-0.5, 0.0, 0.5)) + val bucketForCol2 = new Bucketizer() + .setInputCol("feature2") + .setOutputCol("result2") + .setSplits(Array(-0.5, 0.0, 0.5)) + + val plForSingleCol = new Pipeline() + .setStages(Array(bucketForCol1, bucketForCol2)) + .fit(df) + + val resultForSingleCol = plForSingleCol.transform(df) + .select("result1", "expected1", "result2", "expected2") + .collect() + val resultForMultiCols = plForMultiCols.transform(df) + .select("result1", "expected1", "result2", "expected2") + .collect() + + resultForSingleCol.zip(resultForMultiCols).foreach { + case (rowForSingle, rowForMultiCols) => + assert(rowForSingle.getDouble(0) == rowForMultiCols.getDouble(0) && + rowForSingle.getDouble(1) == rowForMultiCols.getDouble(1) && + rowForSingle.getDouble(2) == rowForMultiCols.getDouble(2) && + rowForSingle.getDouble(3) == rowForMultiCols.getDouble(3)) + } } test("Both inputCol and inputCols are set") { @@ -411,4 +435,26 @@ private object BucketizerSuite extends SparkFunSuite { i += 1 } } + + /** Checks if bucketized results match expected ones. */ + def checkBucketResults( + bucketResult: DataFrame, + resultColumns: Seq[String], + expectedColumns: Seq[String]): Unit = { + assert(resultColumns.length == expectedColumns.length, + s"Given ${resultColumns.length} result columns doesn't match " + + s"${expectedColumns.length} expected columns.") + assert(resultColumns.length > 0, "At least one result and expected columns are needed.") + + val allColumns = resultColumns ++ expectedColumns + bucketResult.select(allColumns.head, allColumns.tail: _*).collect().foreach { + case row => + for (idx <- 0 until row.length / 2) { + val result = row.getDouble(idx) + val expected = row.getDouble(idx + row.length / 2) + assert(result === expected, "The feature value is not correct after bucketing. " + + s"Expected $expected but found $result.") + } + } + } }
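
Usage sketch (not part of the patches above): the snippet below mirrors the BucketizerExample and BucketizerSuite code added in this series and shows how the multi-column API introduced here is expected to be driven end to end. The object name, column names, and split values are illustrative only; the calls themselves (setInputCols, setOutputCols, setSplitsArray, setHandleInvalid) are the ones added by these patches.

import org.apache.spark.ml.feature.Bucketizer
import org.apache.spark.sql.SparkSession

object MultiColumnBucketizerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("MultiColumnBucketizerSketch").getOrCreate()
    import spark.implicits._

    // Two continuous feature columns; the second row carries a NaN to exercise handleInvalid.
    val df = Seq(
      (-0.5, 0.3),
      (Double.NaN, -0.4),
      (0.2, 0.0)
    ).toDF("feature1", "feature2")

    // One splits array per input column; -inf/inf endpoints cover all finite Double values.
    val bucketizer = new Bucketizer()
      .setInputCols(Array("feature1", "feature2"))
      .setOutputCols(Array("result1", "result2"))
      .setSplitsArray(Array(
        Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
        Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, Double.PositiveInfinity)))
      .setHandleInvalid("keep") // NaN values land in an extra bucket instead of raising an error

    // Each output column gets its bucket indices from the matching splits array.
    bucketizer.transform(df).show()

    spark.stop()
  }
}

Setting only setInputCol/setOutputCol/setSplits keeps the pre-2.3.0 single-column behaviour; per the isBucketizeMultipleColumns logic above, if both inputCol and inputCols are set, inputCol wins and a warning is logged.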