diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R index 7cd072a1d6f8..68cf74be9818 100644 --- a/R/pkg/R/mllib_classification.R +++ b/R/pkg/R/mllib_classification.R @@ -58,6 +58,9 @@ setClass("NaiveBayesModel", representation(jobj = "jobj")) #' @param regParam The regularization parameter. Only supports L2 regularization currently. #' @param maxIter Maximum iteration number. #' @param tol Convergence tolerance of iterations. +#' @param solver Optimization solver, supported options: "owlqn" or "l-bfgs". Default is "l-bfgs" +#' @param loss Loss function, supported options: "hinge" and "squared_hinge". Default is +# "squared_hinge" #' @param standardization Whether to standardize the training features before fitting the model. #' The coefficients of models will be always returned on the original scale, #' so it will be transparent for users. Note that with/without @@ -107,7 +110,10 @@ setClass("NaiveBayesModel", representation(jobj = "jobj")) setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, regParam = 0.0, maxIter = 100, tol = 1E-6, standardization = TRUE, threshold = 0.0, weightCol = NULL, aggregationDepth = 2, - handleInvalid = c("error", "keep", "skip")) { + handleInvalid = c("error", "keep", "skip"), solver = c("l-bfgs", "owlqn"), + loss = c("squared_hinge", "hinge")) { + solver <- match.arg(solver) + loss <- match.arg(loss) formula <- paste(deparse(formula), collapse = "") if (!is.null(weightCol) && weightCol == "") { @@ -121,7 +127,8 @@ setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formu jobj <- callJStatic("org.apache.spark.ml.r.LinearSVCWrapper", "fit", data@sdf, formula, as.numeric(regParam), as.integer(maxIter), as.numeric(tol), as.logical(standardization), as.numeric(threshold), - weightCol, as.integer(aggregationDepth), handleInvalid) + weightCol, as.integer(aggregationDepth), handleInvalid, solver, + loss) new("LinearSVCModel", jobj = jobj) }) diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R index a4d0397236d1..7b1edc71b8b0 100644 --- a/R/pkg/tests/fulltests/test_mllib_classification.R +++ b/R/pkg/tests/fulltests/test_mllib_classification.R @@ -30,7 +30,8 @@ absoluteSparkPath <- function(x) { test_that("spark.svmLinear", { df <- suppressWarnings(createDataFrame(iris)) training <- df[df$Species %in% c("versicolor", "virginica"), ] - model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10) + model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10, + solver = "owlqn", loss = "hinge") summary <- summary(model) # test summary coefficients return matrix type diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index ce400f4f1faf..bc0757f836e0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -17,10 +17,12 @@ package org.apache.spark.ml.classification +import java.util.Locale + import scala.collection.mutable import breeze.linalg.{DenseVector => BDV} -import breeze.optimize.{CachedDiffFunction, OWLQN => BreezeOWLQN} +import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS, OWLQN => BreezeOWLQN} import org.apache.hadoop.fs.Path import org.apache.spark.SparkException @@ -28,7 +30,7 @@ import org.apache.spark.annotation.{Experimental, Since} 
import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ -import org.apache.spark.ml.optim.aggregator.HingeAggregator +import org.apache.spark.ml.optim.aggregator.{HingeAggregator, SquaredHingeAggregator} import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ @@ -42,7 +44,26 @@ import org.apache.spark.sql.functions.{col, lit} /** Params for linear SVM Classifier. */ private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol - with HasAggregationDepth with HasThreshold { + with HasAggregationDepth with HasThreshold with HasSolver { + + /** + * Specifies the loss function. Currently "hinge" and "squared_hinge" are supported. + * "hinge" is the standard SVM loss (a.k.a. L1 loss) while "squared_hinge" is the square of + * the hinge loss (a.k.a. L2 loss). + * + * @see Hinge loss (Wikipedia) + * + * @group param + */ + @Since("2.3.0") + final val loss: Param[String] = new Param(this, "loss", "Specifies the loss " + + "function. hinge is the standard SVM loss while squared_hinge is the square of the hinge loss.", + (s: String) => LinearSVC.supportedLoss.contains(s.toLowerCase(Locale.ROOT))) + + /** @group getParam */ + @Since("2.3.0") + def getLoss: String = $(loss) + /** * Param for threshold in binary classification prediction. @@ -63,8 +84,11 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR * * Linear SVM Classifier * - * This binary classifier optimizes the Hinge Loss using the OWLQN optimizer. - * Only supports L2 regularization currently. + * This binary classifier implements a linear SVM classifier. Currently "hinge" and + * "squared_hinge" loss functions are supported. "hinge" is the standard SVM loss (a.k.a. L1 loss) + * while "squared_hinge" is the square of the hinge loss (a.k.a. L2 loss). Both LBFGS and OWL-QN + * optimizers are supported and can be specified via setting the solver param. + * By default, L2 SVM (Squared Hinge Loss) and L-BFGS optimizer are used. * */ @Since("2.2.0") @@ -74,6 +98,8 @@ class LinearSVC @Since("2.2.0") ( extends Classifier[Vector, LinearSVC, LinearSVCModel] with LinearSVCParams with DefaultParamsWritable { + import LinearSVC._ + @Since("2.2.0") def this() = this(Identifiable.randomUID("linearsvc")) @@ -159,6 +185,31 @@ class LinearSVC @Since("2.2.0") ( def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) + /** + * Set the loss function. Default is "squared_hinge". + * + * @group setParam + */ + @Since("2.3.0") + def setLoss(value: String): this.type = set(loss, value) + setDefault(loss -> SQUARED_HINGE) + + /** + * Set solver for LinearSVC. Supported options: "l-bfgs" and "owlqn" (case insensitve). + * - "l-bfgs" denotes Limited-memory BFGS which is a limited-memory quasi-Newton + * optimization method. + * - "owlqn" denotes Orthant-Wise Limited-memory Quasi-Newton algorithm . + * (default: "owlqn") + * @group setParam + */ + @Since("2.3.0") + def setSolver(value: String): this.type = { + require(supportedSolvers.contains(value.toLowerCase(Locale.ROOT)), s"Solver $value was" + + s" not supported. 
Supported options: ${supportedSolvers.mkString(", ")}") + set(solver, value) + } + setDefault(solver -> LBFGS) + @Since("2.2.0") override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra) @@ -225,12 +276,27 @@ class LinearSVC @Since("2.2.0") ( None } - val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) - val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, - $(aggregationDepth)) + val costFun = $(loss) match { + case HINGE => + val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_) + new RDDLossFunction(instances, getAggregatorFunc, regularization, + $(aggregationDepth)) + case SQUARED_HINGE => + val getAggregatorFunc = new SquaredHingeAggregator(bcFeaturesStd, $(fitIntercept))(_) + new RDDLossFunction(instances, getAggregatorFunc, regularization, + $(aggregationDepth)) + case unexpected => throw new SparkException( + s"unexpected loss Function in LinearSVC: $unexpected") + } + + val optimizer = $(solver).toLowerCase(Locale.ROOT) match { + case LBFGS => new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol)) + case OWLQN => + def regParamL1Fun = (index: Int) => 0D + new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) + case _ => throw new SparkException ("unexpected solver: " + $(solver)) + } - def regParamL1Fun = (index: Int) => 0D - val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol)) val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept) val states = optimizer.iterations(new CachedDiffFunction(costFun), @@ -282,8 +348,27 @@ class LinearSVC @Since("2.2.0") ( @Since("2.2.0") object LinearSVC extends DefaultParamsReadable[LinearSVC] { + /** String name for Limited-memory BFGS. */ + private[classification] val LBFGS: String = "l-bfgs".toLowerCase(Locale.ROOT) + + /** String name for Orthant-Wise Limited-memory Quasi-Newton. */ + private[classification] val OWLQN: String = "owlqn".toLowerCase(Locale.ROOT) + + /* Set of optimizers that LinearSVC supports */ + private[classification] val supportedSolvers = Array(LBFGS, OWLQN) + + /** String name for Hinge Loss. */ + private[classification] val HINGE: String = "hinge".toLowerCase(Locale.ROOT) + + /** String name for Squared Hinge Loss. */ + private[classification] val SQUARED_HINGE: String = "squared_hinge".toLowerCase(Locale.ROOT) + + /* Set of loss function that LinearSVC supports */ + private[classification] val supportedLoss = Array(HINGE, SQUARED_HINGE) + @Since("2.2.0") override def load(path: String): LinearSVC = super.load(path) + } /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala new file mode 100644 index 000000000000..d9c245ba883e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregator.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg._ + +/** + * SquaredHingeAggregator computes the gradient and loss for squared Hinge loss function, as used in + * binary classification for instances in sparse or dense vector in an online fashion. + * + * Two SquaredHingeAggregator can be merged together to have a summary of loss and gradient of + * the corresponding joint dataset. + * + * This class standardizes feature values during computation using bcFeaturesStd. + * + * @param bcCoefficients The coefficients corresponding to the features. + * @param fitIntercept Whether to fit an intercept term. + * @param bcFeaturesStd The standard deviation values of the features. + */ +private[ml] class SquaredHingeAggregator( + bcFeaturesStd: Broadcast[Array[Double]], + fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[Instance, SquaredHingeAggregator] { + + private val numFeatures: Int = bcFeaturesStd.value.length + private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures + @transient private lazy val coefficientsArray = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + + s" but got type ${bcCoefficients.value.getClass}.") + } + protected override val dim: Int = numFeaturesPlusIntercept + + /** + * Add a new training instance to this SquaredHingeAggregator, and update the loss and gradient + * of the objective function. + * + * @param instance The instance of data point to be added. + * @return This SquaredHingeAggregator object. + */ + def add(instance: Instance): this.type = { + instance match { case Instance(label, weight, features) => + require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." 
+ + s" Expecting $numFeatures but got ${features.size}.") + require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") + + if (weight == 0.0) return this + val localFeaturesStd = bcFeaturesStd.value + val localCoefficients = coefficientsArray + val localGradientSumArray = gradientSumArray + + val dotProduct = { + var sum = 0.0 + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + sum += localCoefficients(index) * value / localFeaturesStd(index) + } + } + if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) + sum + } + // Our loss function with {0, 1} labels is (max(0, 1 - (2y - 1) (f_w(x))))^2 + // Therefore the gradient is 2 * ((2y - 1) f_w(x) - 1) * (2y - 1) * x + val labelScaled = 2 * label - 1.0 + val scaledDoctProduct = labelScaled * dotProduct + val loss = if (1.0 > scaledDoctProduct) { + val hingeLoss = 1.0 - scaledDoctProduct + hingeLoss * hingeLoss * weight + } else { + 0.0 + } + + if (1.0 > scaledDoctProduct) { + val gradientScale = (scaledDoctProduct - 1) * labelScaled * 2 * weight + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) + } + } + if (fitIntercept) { + localGradientSumArray(localGradientSumArray.length - 1) += gradientScale + } + } // else gradient will not be updated. + + lossSum += loss + weightSum += weight + this + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala index 7a22a71c3a81..ee919421f421 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala @@ -70,7 +70,7 @@ private[r] object LinearSVCWrapper val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" val PREDICTED_LABEL_COL = "prediction" - def fit( + def fit( // scalastyle:ignore data: DataFrame, formula: String, regParam: Double, @@ -80,7 +80,9 @@ private[r] object LinearSVCWrapper threshold: Double, weightCol: String, aggregationDepth: Int, - handleInvalid: String + handleInvalid: String, + solver: String, + loss: String ): LinearSVCWrapper = { val rFormula = new RFormula() @@ -107,6 +109,8 @@ private[r] object LinearSVCWrapper .setPredictionCol(PREDICTED_LABEL_INDEX_COL) .setThreshold(threshold) .setAggregationDepth(aggregationDepth) + .setSolver(solver) + .setLoss(loss) if (weightCol != null) svc.setWeightCol(weightCol) diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index 41a5d22dd628..2baf1604620a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -23,9 +23,8 @@ import breeze.linalg.{DenseVector => BDV} import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.LinearSVCSuite._ -import org.apache.spark.ml.feature.{Instance, LabeledPoint} +import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} -import org.apache.spark.ml.optim.aggregator.HingeAggregator import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ @@ -39,11 +38,12 @@ class LinearSVCSuite extends 
SparkFunSuite with MLlibTestSparkContext with Defau import testImplicits._ private val nPoints = 50 - @transient var smallBinaryDataset: Dataset[_] = _ + @transient var smallTrainingDataset: Dataset[_] = _ @transient var smallValidationDataset: Dataset[_] = _ @transient var binaryDataset: Dataset[_] = _ - @transient var smallSparseBinaryDataset: Dataset[_] = _ + // equivalent with smallTrainingDataset but in sparse Vectors. + @transient var smallSparseTrainingDataset: Dataset[_] = _ @transient var smallSparseValidationDataset: Dataset[_] = _ override def beforeAll(): Unit = { @@ -53,16 +53,15 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val A = 0.01 val B = -1.5 val C = 1.0 - smallBinaryDataset = generateSVMInput(A, Array[Double](B, C), nPoints, 42).toDF() + smallTrainingDataset = generateSVMInput(A, Array[Double](B, C), nPoints, 42).toDF() smallValidationDataset = generateSVMInput(A, Array[Double](B, C), nPoints, 17).toDF() binaryDataset = generateSVMInput(1.0, Array[Double](1.0, 2.0, 3.0, 4.0), 10000, 42).toDF() // Dataset for testing SparseVector val toSparse: Vector => SparseVector = _.asInstanceOf[DenseVector].toSparse val sparse = udf(toSparse) - smallSparseBinaryDataset = smallBinaryDataset.withColumn("features", sparse('features)) + smallSparseTrainingDataset = smallTrainingDataset.withColumn("features", sparse('features)) smallSparseValidationDataset = smallValidationDataset.withColumn("features", sparse('features)) - } /** @@ -76,21 +75,29 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau } test("Linear SVC binary classification") { - val svm = new LinearSVC() - val model = svm.fit(smallBinaryDataset) - assert(model.transform(smallValidationDataset) - .where("prediction=label").count() > nPoints * 0.8) - val sparseModel = svm.fit(smallSparseBinaryDataset) - checkModels(model, sparseModel) + LinearSVC.supportedSolvers.foreach { opt => + LinearSVC.supportedLoss.foreach { loss => + val svm = new LinearSVC().setLoss(loss).setSolver(opt) + val model = svm.fit(smallTrainingDataset) + assert(model.transform(smallValidationDataset) + .where("prediction=label").count() > nPoints * 0.8) + val sparseModel = svm.fit(smallSparseTrainingDataset) + checkModelsEqual(model, sparseModel) + } + } } test("Linear SVC binary classification with regularization") { - val svm = new LinearSVC() - val model = svm.setRegParam(0.1).fit(smallBinaryDataset) - assert(model.transform(smallValidationDataset) - .where("prediction=label").count() > nPoints * 0.8) - val sparseModel = svm.fit(smallSparseBinaryDataset) - checkModels(model, sparseModel) + LinearSVC.supportedSolvers.foreach { opt => + LinearSVC.supportedLoss.foreach { loss => + val svm = new LinearSVC().setSolver(opt).setLoss(loss).setMaxIter(10) + val model = svm.setRegParam(0.1).fit(smallTrainingDataset) + assert(model.transform(smallValidationDataset) + .where("prediction=label").count() > nPoints * 0.8) + val sparseModel = svm.fit(smallSparseTrainingDataset) + checkModelsEqual(model, sparseModel) + } + } } test("params") { @@ -113,8 +120,10 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(lsvc.getFeaturesCol === "features") assert(lsvc.getPredictionCol === "prediction") assert(lsvc.getRawPredictionCol === "rawPrediction") - val model = lsvc.setMaxIter(5).fit(smallBinaryDataset) - model.transform(smallBinaryDataset) + assert(lsvc.getSolver === LinearSVC.LBFGS) + assert(lsvc.getLoss === LinearSVC.SQUARED_HINGE) + val model = 
lsvc.setMaxIter(5).fit(smallTrainingDataset) + model.transform(smallTrainingDataset) .select("label", "prediction", "rawPrediction") .collect() assert(model.getThreshold === 0.0) @@ -126,6 +135,13 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model.numFeatures === 2) MLTestingUtils.checkCopyAndUids(lsvc, model) + withClue("lossFunction should be case-insensitive") { + lsvc.setLoss("HINGE") + lsvc.setLoss("Squared_hinge") + intercept[IllegalArgumentException] { + val model = lsvc.setLoss("hing") + } + } } test("LinearSVC threshold acts on rawPrediction") { @@ -163,51 +179,39 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("linear svc doesn't fit intercept when fitIntercept is off") { val lsvc = new LinearSVC().setFitIntercept(false).setMaxIter(5) - val model = lsvc.fit(smallBinaryDataset) + val model = lsvc.fit(smallTrainingDataset) assert(model.intercept === 0.0) val lsvc2 = new LinearSVC().setFitIntercept(true).setMaxIter(5) - val model2 = lsvc2.fit(smallBinaryDataset) + val model2 = lsvc2.fit(smallTrainingDataset) assert(model2.intercept !== 0.0) } - test("sparse coefficients in HingeAggregator") { - val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) - val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients) - val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { - intercept[IllegalArgumentException] { - agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) - } - } - assert(thrown.getMessage.contains("coefficients only supports dense")) - - bcCoefficients.destroy(blocking = false) - bcFeaturesStd.destroy(blocking = false) - } - test("linearSVC with sample weights") { def modelEquals(m1: LinearSVCModel, m2: LinearSVCModel): Unit = { - assert(m1.coefficients ~== m2.coefficients absTol 0.05) + assert(m1.coefficients ~== m2.coefficients absTol 0.07) assert(m1.intercept ~== m2.intercept absTol 0.05) } - - val estimator = new LinearSVC().setRegParam(0.01).setTol(0.01) - val dataset = smallBinaryDataset - MLTestingUtils.testArbitrarilyScaledWeights[LinearSVCModel, LinearSVC]( - dataset.as[LabeledPoint], estimator, modelEquals) - MLTestingUtils.testOutliersWithSmallWeights[LinearSVCModel, LinearSVC]( - dataset.as[LabeledPoint], estimator, 2, modelEquals, outlierRatio = 3) - MLTestingUtils.testOversamplingVsWeighting[LinearSVCModel, LinearSVC]( - dataset.as[LabeledPoint], estimator, modelEquals, 42L) + LinearSVC.supportedSolvers.foreach { opt => + val estimator = new LinearSVC().setRegParam(0.02).setTol(0.01).setSolver(opt) + .setLoss("hinge") + val dataset = smallTrainingDataset + MLTestingUtils.testArbitrarilyScaledWeights[LinearSVCModel, LinearSVC]( + dataset.as[LabeledPoint], estimator, modelEquals) + MLTestingUtils.testOutliersWithSmallWeights[LinearSVCModel, LinearSVC]( + dataset.as[LabeledPoint], estimator, 2, modelEquals, outlierRatio = 3) + MLTestingUtils.testOversamplingVsWeighting[LinearSVCModel, LinearSVC]( + dataset.as[LabeledPoint], estimator, modelEquals, 42L) + } } - test("linearSVC comparison with R e1071 and scikit-learn") { - val trainer1 = new LinearSVC() - .setRegParam(0.00002) // set regParam = 2.0 / datasize / c + test("linearSVC OWLQN hinge comparison with R e1071 and scikit-learn") { + val trainer = new LinearSVC().setSolver(LinearSVC.OWLQN) + .setRegParam(2.0 / 10 / 10000) // set regParam = 2.0 / datasize / c .setMaxIter(200) .setTol(1e-4) - val model1 = 
trainer1.fit(binaryDataset) + .setLoss("hinge") + val model = trainer.fit(binaryDataset) /* Use the following R code to load the data and train the model using glmnet package. @@ -230,8 +234,8 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau */ val coefficientsR = Vectors.dense(7.310338, 14.89741, 22.21005, 29.83508) val interceptR = 7.440177 - assert(model1.intercept ~== interceptR relTol 1E-2) - assert(model1.coefficients ~== coefficientsR relTol 1E-2) + assert(model.intercept ~== interceptR relTol 1E-2) + assert(model.coefficients ~== coefficientsR relTol 1E-2) /* Use the following python code to load the data and train the model using scikit-learn package. @@ -253,8 +257,85 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val coefficientsSK = Vectors.dense(7.24690165, 14.77029087, 21.99924004, 29.5575729) val interceptSK = 7.36947518 - assert(model1.intercept ~== interceptSK relTol 1E-3) - assert(model1.coefficients ~== coefficientsSK relTol 4E-3) + assert(model.intercept ~== interceptSK relTol 1E-3) + assert(model.coefficients ~== coefficientsSK relTol 4E-3) + } + + test("linearSVC L-BFGS squared_hinge loss comparison with scikit-learn (liblinear)") { + val linearSVC = new LinearSVC() + .setLoss("squared_hinge") + .setSolver("L-BFGS") + .setRegParam(2.0 / 10 / 1000) + .setMaxIter(100) + .setTol(1e-4) + .setFitIntercept(false) + val model = linearSVC.fit(binaryDataset.limit(1000)) + + /* + Use the following python code to load the data and train the model using scikit-learn package. + import numpy as np + from sklearn import svm + f = open("path/spark/assembly/target/tmp/LinearSVC/binaryDataset/part-00000") + data = np.loadtxt(f, delimiter=",")[:1000] + X = data[:, 1:] # select columns 1 through end + y = data[:, 0] # select column 0 as label + clf = svm.LinearSVC(fit_intercept=False, C=10, + loss='squared_hinge', tol=1e-4, random_state=42) + m = clf.fit(X, y) + print m.coef_ + print m.intercept_ + [[ 0.62836794 1.24577698 1.70704463 2.38387201]] + 0.0 + */ + + val coefficientsSK = Vectors.dense(0.62836794, 1.24577698, 1.70704463, 2.38387201) + assert(model.intercept === 0) + assert(model.coefficients ~== coefficientsSK relTol 1E-2) + } + + test("linearSVC L-BFGS and OWLQN get similar model for squared_hinge loss") { + val size = nPoints + val linearSVC = new LinearSVC() + .setLoss("squared_hinge") + .setSolver("L-BFGS") + .setRegParam(2.0 / 10 / size) // set regParam = 2.0 / datasize / c + .setMaxIter(200) + .setTol(1e-4) + val model = linearSVC.fit(smallTrainingDataset) + + val linearSVC2 = new LinearSVC() + .setLoss("squared_hinge") + .setSolver("OWLQN") + .setRegParam(2.0 / 10 / size) // set regParam = 2.0 / datasize / c + .setMaxIter(200) + .setTol(1e-4) + val model2 = linearSVC2.fit(smallTrainingDataset) + + assert(model.coefficients ~== model2.coefficients relTol 1E-3) + assert(model.intercept ~== model2.intercept relTol 1E-3) + } + + test("linearSVC L-BFGS and OWLQN get similar model for hinge loss") { + val linearSVC = new LinearSVC() + .setLoss("hinge") + .setSolver("L-BFGS") + .setRegParam(0.01) + .setMaxIter(200) + .setTol(1e-4) + .setFitIntercept(false) + val model = linearSVC.fit(smallTrainingDataset) + + val linearSVC2 = new LinearSVC() + .setLoss("hinge") + .setSolver("OWLQN") + .setRegParam(0.01) + .setMaxIter(200) + .setTol(1e-4) + .setFitIntercept(false) + val model2 = linearSVC2.fit(smallTrainingDataset) + assert(model.coefficients ~== model2.coefficients relTol 2E-2) + assert(model.intercept === 
0) + assert(model2.intercept === 0) } test("read/write: SVM") { @@ -264,7 +345,7 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model.numFeatures === model2.numFeatures) } val svm = new LinearSVC() - testEstimatorAndModelReadWrite(svm, smallBinaryDataset, LinearSVCSuite.allParamSettings, + testEstimatorAndModelReadWrite(svm, smallTrainingDataset, LinearSVCSuite.allParamSettings, LinearSVCSuite.allParamSettings, checkModelData) } } @@ -280,7 +361,9 @@ object LinearSVCSuite { "threshold" -> 0.6, "predictionCol" -> "myPredict", "rawPredictionCol" -> "myRawPredict", - "aggregationDepth" -> 3 + "aggregationDepth" -> 3, + "solver" -> "owlqn", + "loss" -> "hinge" ) // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) @@ -300,7 +383,7 @@ object LinearSVCSuite { y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2))) } - def checkModels(model1: LinearSVCModel, model2: LinearSVCModel): Unit = { + def checkModelsEqual(model1: LinearSVCModel, model2: LinearSVCModel): Unit = { assert(model1.intercept == model2.intercept) assert(model1.coefficients.equals(model2.coefficients)) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala index 61b48ffa1094..e7bcfe33adf9 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala @@ -130,7 +130,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val gradientCoef = new Array[Double](numFeatures) var gradientIntercept = 0.0 instances.foreach { case Instance(l, w, f) => - val margin = BLAS.dot(f, Vectors.dense(coefArray)) + intercept + val margin = BLAS.dot(f, Vectors.dense(stdCoef)) + intercept if (1.0 > (2 * l - 1.0) * margin) { gradientCoef.indices.foreach { i => gradientCoef(i) += f(i) * -(2 * l - 1.0) * w / featuresStd(i) @@ -160,4 +160,19 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) } + test("sparse coefficients in HingeAggregator") { + val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) + val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) + val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients) + val thrown = withClue("HingeAggregator cannot handle sparse coefficients") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) + } + } + assert(thrown.getMessage.contains("coefficients only supports dense")) + + bcCoefficients.destroy(blocking = false) + bcFeaturesStd.destroy(blocking = false) + } + } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala new file mode 100644 index 000000000000..f0c9d9affe54 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/SquaredHingeAggregatorSuite.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class SquaredHingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { + + import DifferentiableLossAggregatorSuite.getClassificationSummarizers + + @transient var instances: Array[Instance] = _ + @transient var instancesConstantFeature: Array[Instance] = _ + @transient var instancesConstantFeatureFiltered: Array[Instance] = _ + + override def beforeAll(): Unit = { + super.beforeAll() + instances = Array( + Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), + Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), + Instance(0.0, 0.3, Vectors.dense(4.0, 0.5)) + ) + instancesConstantFeature = Array( + Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), + Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), + Instance(1.0, 0.3, Vectors.dense(1.0, 0.5)) + ) + instancesConstantFeatureFiltered = Array( + Instance(0.0, 0.1, Vectors.dense(2.0)), + Instance(1.0, 0.5, Vectors.dense(1.0)), + Instance(2.0, 0.3, Vectors.dense(0.5)) + ) + } + + /** Get summary statistics for some data and create a new SquaredHingeAggregator. 
*/ + private def getNewAggregator( + instances: Array[Instance], + coefficients: Vector, + fitIntercept: Boolean): SquaredHingeAggregator = { + val (featuresSummarizer, ySummarizer) = + DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) + val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) + val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) + val bcCoefficients = spark.sparkContext.broadcast(coefficients) + new SquaredHingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients) + } + + test("SquaredHingeAggregator check add method input size") { + val coefArray = Array(1.0, 2.0) + val interceptArray = Array(2.0) + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + fitIntercept = true) + withClue("SquaredHingeAggregator features dimension must match coefficients dimension") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, 1.0, Vectors.dense(2.0))) + } + } + } + + test("SquaredHingeAggregator negative weight") { + val coefArray = Array(1.0, 2.0) + val interceptArray = Array(2.0) + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + fitIntercept = true) + withClue("SquaredHingeAggregator does not support negative instance weights") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))) + } + } + } + + test("SquaredHingeAggregator check sizes") { + val rng = new scala.util.Random + val numFeatures = instances.head.features.size + val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble)) + val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble)) + val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true) + val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, + fitIntercept = false) + instances.foreach(aggIntercept.add) + instances.foreach(aggNoIntercept.add) + + assert(aggIntercept.gradient.size === numFeatures + 1) + assert(aggNoIntercept.gradient.size === numFeatures) + } + + test("SquaredHingeAggregator check correctness") { + val coefArray = Array(1.0, 2.0) + val intercept = 1.0 + val numFeatures = instances.head.features.size + val (featuresSummarizer, _) = getClassificationSummarizers(instances) + val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) + val weightSum = instances.map(_.weight).sum + + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true) + instances.foreach(agg.add) + + // compute the loss + val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i)).toArray + val lossSum = instances.map { case Instance(l, w, f) => + val margin = BLAS.dot(Vectors.dense(stdCoef), f) + intercept + val labelScaled = 2 * l - 1.0 + if (1.0 > labelScaled * margin) { + val hingeLoss = 1.0 - labelScaled * margin + hingeLoss * hingeLoss * w + } else { + 0.0 + } + }.sum + val loss = lossSum / weightSum + + // compute the gradients + val gradientCoef = new Array[Double](numFeatures) + var gradientIntercept = 0.0 + instances.foreach { case Instance(l, w, f) => + val margin = BLAS.dot(f, Vectors.dense(stdCoef)) + intercept + if (1.0 > (2 * l - 1.0) * margin) { + val gradientScale = ((2 * l - 1) * margin - 1) * (2 * l - 1) * 2 + gradientCoef.indices.foreach { i => + gradientCoef(i) += f(i) * gradientScale * w / featuresStd(i) + } + gradientIntercept += gradientScale * w + } + } + val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ 
/ weightSum)) + + assert(loss ~== agg.loss relTol 0.01) + assert(gradient ~== agg.gradient relTol 0.01) + } + + test("check with zero standard deviation") { + val binaryCoefArray = Array(1.0, 2.0) + val intercept = 1.0 + val aggConstantFeatureBinary = getNewAggregator(instancesConstantFeature, + Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true) + instancesConstantFeature.foreach(aggConstantFeatureBinary.add) + + val aggConstantFeatureBinaryFiltered = getNewAggregator(instancesConstantFeatureFiltered, + Vectors.dense(binaryCoefArray.tail ++ Array(intercept)), fitIntercept = true) + instancesConstantFeatureFiltered.foreach(aggConstantFeatureBinaryFiltered.add) + + // constant features should not affect gradient + assert(aggConstantFeatureBinary.gradient(0) === 0.0) + assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0)) + } + + test("sparse coefficients in SquaredHingeAggregator") { + val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) + val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) + val agg = new SquaredHingeAggregator(bcFeaturesStd, true)(bcCoefficients) + val thrown = withClue("SquaredHingeAggregator cannot handle sparse coefficients") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) + } + } + assert(thrown.getMessage.contains("coefficients only supports dense")) + + bcCoefficients.destroy(blocking = false) + bcFeaturesStd.destroy(blocking = false) + } + +} diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 27ad1e80aa0d..38762a3671c0 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -81,9 +81,9 @@ class LinearSVC(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, Ha >>> svm = LinearSVC(maxIter=5, regParam=0.01) >>> model = svm.fit(df) >>> model.coefficients - DenseVector([0.0, -0.2792, -0.1833]) + DenseVector([0.0, 0.0844, -0.7532]) >>> model.intercept - 1.0206118982229047 + 1.5941035229606713 >>> model.numClasses 2 >>> model.numFeatures @@ -93,7 +93,7 @@ class LinearSVC(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, Ha >>> result.prediction 1.0 >>> result.rawPrediction - DenseVector([-1.4831, 1.4831]) + DenseVector([-2.2629, 2.2629]) >>> svm_path = temp_path + "/svm" >>> svm.save(svm_path) >>> svm2 = LinearSVC.load(svm_path)
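
A minimal usage sketch (Scala) of the `loss` and `solver` params introduced above; the `training` DataFrame name and its "label"/"features" columns are assumed placeholders, while the param values ("hinge", "squared_hinge", "l-bfgs", "owlqn") and setters come from the patch itself:

import org.apache.spark.ml.classification.{LinearSVC, LinearSVCModel}
import org.apache.spark.sql.DataFrame

object LinearSVCUsageSketch {
  // `training` is an assumed DataFrame with "label" and "features" columns.
  def fit(training: DataFrame): LinearSVCModel = {
    val svc = new LinearSVC()
      .setLoss("squared_hinge") // the new default; "hinge" restores the previous loss
      .setSolver("l-bfgs")      // the new default; "owlqn" restores the previous optimizer
      .setRegParam(0.01)
      .setMaxIter(100)
      .setTol(1e-6)
    svc.fit(training)
  }
}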