From a121bd0f6e5f2387bd502976940c08f6a4a2e4b1 Mon Sep 17 00:00:00 2001 From: Aaron Staple Date: Fri, 13 Feb 2015 14:24:07 -0800 Subject: [PATCH] [SPARK-1503][MLLIB] Initial AcceleratedGradientDescent implementation. --- .../AcceleratedGradientDescent.scala | 237 +++++++++++++++++ .../AcceleratedGradientDescentSuite.scala | 242 ++++++++++++++++++ 2 files changed, 479 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/optimization/AcceleratedGradientDescent.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/optimization/AcceleratedGradientDescentSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/AcceleratedGradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/AcceleratedGradientDescent.scala new file mode 100644 index 0000000000000..abd814a0e583d --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/AcceleratedGradientDescent.scala @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import scala.collection.mutable.ArrayBuffer + +import breeze.linalg.{DenseVector => BDV, norm} + +import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.rdd.RDD + +/** + * :: DeveloperApi :: + * This class optimizes a vector of weights via accelerated (proximal) gradient descent. + * The implementation is based on TFOCS [[http://cvxr.com/tfocs]], described in Becker, Candes, and + * Grant 2010. + * @param gradient Delegate that computes the loss function value and gradient for a vector of + * weights. + * @param updater Delegate that updates weights in the direction of a gradient. + */ +@DeveloperApi +class AcceleratedGradientDescent (private var gradient: Gradient, private var updater: Updater) + extends Optimizer { + + private var stepSize: Double = 1.0 + private var convergenceTol: Double = 1e-4 + private var numIterations: Int = 100 + private var regParam: Double = 0.0 + + /** + * Set the initial step size, used for the first step. Default 1.0. + * On subsequent steps, the step size will be adjusted by the acceleration algorithm. + */ + def setStepSize(step: Double): this.type = { + this.stepSize = step + this + } + + /** + * Set the optimization convergence tolerance. Default 1e-4. + * Smaller values will increase accuracy but require additional iterations. + */ + def setConvergenceTol(tol: Double): this.type = { + this.convergenceTol = tol + this + } + + /** + * Set the maximum number of iterations. Default 100. + */ + def setNumIterations(iters: Int): this.type = { + this.numIterations = iters + this + } + + /** + * Set the regularization parameter. Default 0.0. + */ + def setRegParam(regParam: Double): this.type = { + this.regParam = regParam + this + } + + /** + * Set a Gradient delegate for computing the loss function value and gradient. + */ + def setGradient(gradient: Gradient): this.type = { + this.gradient = gradient + this + } + + /** + * Set an Updater delegate for updating weights in the direction of a gradient. + * If regularization is used, the Updater will implement the regularization term's proximity + * operator. Thus the type of regularization penalty is configured by providing a corresponding + * Updater implementation. + */ + def setUpdater(updater: Updater): this.type = { + this.updater = updater + this + } + + /** + * Run accelerated gradient descent on the provided training data. + * @param data training data + * @param initialWeights initial weights + * @return solution vector + */ + def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = { + val (weights, _) = AcceleratedGradientDescent.run( + data, + gradient, + updater, + stepSize, + convergenceTol, + numIterations, + regParam, + initialWeights) + weights + } +} + +/** + * :: DeveloperApi :: + * Top-level method to run accelerated (proximal) gradient descent. + */ +@DeveloperApi +object AcceleratedGradientDescent extends Logging { + /** + * Run accelerated proximal gradient descent. + * The implementation is based on TFOCS [[http://cvxr.com/tfocs]], described in Becker, Candes, + * and Grant 2010. A limited but useful subset of the TFOCS feature set is implemented, including + * support for composite loss functions, the Auslender and Teboulle acceleration method, and + * automatic restart using the gradient test. A global Lipschitz bound is supported in preference + * to local Lipschitz estimation via backtracking. On each iteration, the loss function and + * gradient are caclculated from the full training dataset, requiring one Spark map reduce. + * + * @param data Input data. RDD containing data examples of the form (label, [feature values]). + * @param gradient Delegate that computes the loss function value and gradient for a vector of + weights (for one single data example). + * @param updater Delegate that updates weights in the direction of a gradient. + * @param stepSize Initial step size for the first step. + * @param convergenceTol Tolerance for convergence of the optimization algorithm. When the norm of + * the change in weight vectors between successive iterations falls below + * this relative tolerance, optimization is complete. + * @param numIterations Maximum number of iterations to run the algorithm. + * @param regParam The regularization parameter. + * @param initialWeights The initial weight values. + * + * @return A tuple containing two elements. The first element is a Vector containing the optimized + * weight for each feature, and the second element is an array containing the approximate + * loss computed on each iteration. + */ + def run( + data: RDD[(Double, Vector)], + gradient: Gradient, + updater: Updater, + stepSize: Double, + convergenceTol: Double, + numIterations: Int, + regParam: Double, + initialWeights: Vector): (Vector, Array[Double]) = { + + /** Returns the loss function and gradient for the provided weights 'x'. */ + def applySmooth(x: BDV[Double]): (Double, BDV[Double]) = { + val bcX = data.context.broadcast(Vectors.fromBreeze(x)) + + // Sum the loss function and gradient computed for each training example. + val (loss, grad, count) = data.treeAggregate((0.0, BDV.zeros[Double](x.size), 0L))( + seqOp = (c, v) => (c, v) match { case ((loss, grad, count), (label, features)) => + val l = gradient.compute(features, label, bcX.value, Vectors.fromBreeze(grad)) + (loss + l, grad, count + 1) + }, + combOp = (c1, c2) => (c1, c2) match { + case ((loss1, grad1, count1), (loss2, grad2, count2)) => + (loss1 + loss2, grad1 += grad2, count1 + count2) + }) + + // Divide the summed loss and gradient by the number of training examples. + (loss / count, grad / (count: Double)) + } + + /** + * Returns the regularization loss and updates weights according to the gradient and the + * proximity operator. + */ + def applyProjector(x: BDV[Double], g: BDV[Double], step: Double): (Double, BDV[Double]) = { + val (weights, regularization) = updater.compute(Vectors.fromBreeze(x), + Vectors.fromBreeze(g), + step, + iter = 1, // Passing 1 avoids step size + // rescaling within the updater. + regParam) + (regularization, BDV[Double](weights.toArray)) + } + + var x = BDV[Double](initialWeights.toArray) + var z = x + val L = 1.0 / stepSize // Infer a (global) Lipshitz bound from the provided stepSize. + var theta = Double.PositiveInfinity + var hasConverged = false + val lossHistory = new ArrayBuffer[Double](numIterations) + + for (i <- 1 to numIterations if !hasConverged) { + + // Auslender and Teboulle's accelerated method. + val (x_old, z_old) = (x, z) + theta = 2.0 / (1.0 + math.sqrt(1.0 + 4.0 / (theta * theta))) + val y = x_old * (1.0 - theta) + z_old * theta + val (f_y, g_y) = applySmooth(y) + val step = 1.0 / (theta * L) + z = applyProjector(z_old, g_y, step)._2 + x = x_old * (1.0 - theta) + z * theta + val d_x = x - x_old + + // Track loss history using the loss function at y, since f_y is already avaialble and + // computing f_x would require another (distributed) call to applySmooth. Start by finding + // c_y, the regularization component of the loss function at y. + val (c_y, _) = applyProjector(y, g_y, 0.0) + lossHistory.append(f_y + c_y) + + // Restart acceleration if indicated by the gradient test from O'Donoghue and Candes 2013. + if (g_y.dot(d_x) > 0.0) { + z = x + theta = Double.PositiveInfinity + } + + // Check convergence. + hasConverged = norm(d_x) match { + case 0.0 => i > 1 + case norm_dx => norm_dx < convergenceTol * math.max(norm(x), 1.0) + } + } + + logInfo("AcceleratedGradientDescent.run finished. Last 10 approximate losses %s".format( + lossHistory.takeRight(10).mkString(", "))) + + (Vectors.fromBreeze(x), lossHistory.toArray) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/AcceleratedGradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/AcceleratedGradientDescentSuite.scala new file mode 100644 index 0000000000000..bfb5ba600e676 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/AcceleratedGradientDescentSuite.scala @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.optimization + +import scala.util.Random + +import breeze.linalg.{DenseVector => BDV, norm} +import org.scalatest.{FunSuite, Matchers} + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext} +import org.apache.spark.mllib.util.TestingUtils._ + +class AcceleratedGradientDescentSuite extends FunSuite with MLlibTestSparkContext with Matchers { + + val nPoints = 10000 + val A = 2.0 + val B = -1.5 + + val initialB = -1.0 + val initialWeights = Array(initialB) + + val gradient = new LogisticGradient() + val miniBatchFrac = 1.0 + + val simpleUpdater = new SimpleUpdater() + val squaredL2Updater = new SquaredL2Updater() + + // Add an extra variable consisting of all 1.0's for the intercept. + val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42) + val data = testData.map { case LabeledPoint(label, features) => + label -> Vectors.dense(1.0 +: features.toArray) + } + + lazy val dataRDD = sc.parallelize(data, 2).cache() + + test("The optimal loss returned by Accelerated Gradient Descent should be similar to that from" + + " Gradient Descent.") { + val regParam = 0 + + val initialWeightsWithIntercept = Vectors.dense(1.0 +: initialWeights.toArray) + val stepSize = 1.0 + val convergenceTol = 1e-12 + val numIterations = 10 + + val (_, lossAGD) = AcceleratedGradientDescent.run( + dataRDD, + gradient, + simpleUpdater, + stepSize, + convergenceTol, + numIterations, + regParam, + initialWeightsWithIntercept) + + // GD converges more slowly, requiring more iterations. + val numGDIterations = 50 + val (_, lossGD) = GradientDescent.runMiniBatchSGD( + dataRDD, + gradient, + simpleUpdater, + stepSize, + numGDIterations, + regParam, + miniBatchFrac, + initialWeightsWithIntercept) + + assert(lossAGD.last ~= lossGD.last relTol 0.02, + "The optimal losses of AGD and GD should match within 2%.") + // The 2% difference is based on observation and is not theoretically guaranteed. + } + + test("The L2-regularized optimal loss returned by Accelerated Gradient Descent should be" + + " similar to that from Gradient Descent.") { + val regParam = 0.2 + + // Provide non-zero weights to compare the loss values for these weights on the first iteration. + val initialWeightsWithIntercept = Vectors.dense(0.3, 0.12) + val stepSize = 1.0 + val convergenceTol = 1e-12 + val numIterations = 10 + + val (weightsAGD, lossAGD) = AcceleratedGradientDescent.run( + dataRDD, + gradient, + squaredL2Updater, + stepSize, + convergenceTol, + numIterations, + regParam, + initialWeightsWithIntercept) + + val numGDIterations = 50 + val (weightsGD, lossGD) = GradientDescent.runMiniBatchSGD( + dataRDD, + gradient, + squaredL2Updater, + stepSize, + numGDIterations, + regParam, + miniBatchFrac, + initialWeightsWithIntercept) + + assert(lossAGD.last ~= lossGD.last relTol 0.02, + "The optimal losses of AGD and GD should match within 2%.") + // The 2% difference is based on observation and is not theoretically guaranteed. + + assert( + (weightsAGD(0) ~= weightsGD(0) relTol 0.02) && (weightsAGD(1) ~= weightsGD(1) relTol 0.02), + "The optimal weights returned by AGD and GD should match within 2%.") + // The 2% difference is based on observation and is not theoretically guaranteed. + } + + test("The convergenceTol parameter should behave as expected.") { + val regParam = 0.0 + + val initialWeightsWithIntercept = Vectors.dense(0.0, 0.0) + val stepSize = 1.0 + + // Optimize with a high maximum number of iterations and a loose convergenceTol. The + // optimization will complete upon reaching convergenceTol, without reaching the iteration + // limit. + var numIterations = 1000 + var convergenceTol = 0.1 + val (weights1, loss1) = AcceleratedGradientDescent.run( + dataRDD, + gradient, + squaredL2Updater, + stepSize, + convergenceTol, + numIterations, + regParam, + initialWeightsWithIntercept) + + // Optimize using one fewer step than the above run, and a strict convergenceTol. The returned + // weights come from one iteration prior to the iteration producing weights1 above. + numIterations = loss1.length - 1 + convergenceTol = 0.0 + val (weights2, loss2) = AcceleratedGradientDescent.run( + dataRDD, + gradient, + squaredL2Updater, + stepSize, + convergenceTol, + numIterations, + regParam, + initialWeightsWithIntercept) + + assert(loss2.length == numIterations, + "AGD should run for the specified number of iterations.") + + val w1 = BDV[Double](weights1.toArray) + val w2 = BDV[Double](weights2.toArray) + assert(norm(w1 - w2) / norm(w1) < 0.1, + "The weights of AGD's last two steps should meet the convergence tolerance.") + + numIterations = 100 + convergenceTol = 0.01 + val (weights3, loss3) = AcceleratedGradientDescent.run( + dataRDD, + gradient, + squaredL2Updater, + stepSize, + convergenceTol, + numIterations, + regParam, + initialWeightsWithIntercept) + + assert(loss3.length > loss1.length, + "AGD runs for more iterations with a tighter convergence tolerance.") + } + + test("Optimize by calling the AcceleratedGradientDescent class directly.") { + val regParam = 0.2 + + val initialWeightsWithIntercept = Vectors.dense(1.0 +: initialWeights.toArray) + val stepSize = 1.0 + val convergenceTol = 1e-12 + val numIterations = 10 + + val agdOptimizer = new AcceleratedGradientDescent(gradient, squaredL2Updater) + .setStepSize(stepSize) + .setConvergenceTol(convergenceTol) + .setNumIterations(numIterations) + .setRegParam(regParam) + + val weightsAGD = agdOptimizer.optimize(dataRDD, initialWeightsWithIntercept) + + val numGDIterations = 50 + val (weightsGD, _) = GradientDescent.runMiniBatchSGD( + dataRDD, + gradient, + squaredL2Updater, + stepSize, + numGDIterations, + regParam, + miniBatchFrac, + initialWeightsWithIntercept) + + assert( + (weightsAGD(0) ~= weightsGD(0) relTol 0.02) && (weightsAGD(1) ~= weightsGD(1) relTol 0.02), + "The optimal weights returned by AGD and GD should match within 2%.") + // The 2% difference is based on observation and is not theoretically guaranteed. + } +} + +class AcceleratedGradientDescentClusterSuite extends FunSuite with LocalClusterSparkContext { + + test("task size should be small") { + val m = 10 + val n = 200000 + val examples = sc.parallelize(0 until m, 2).mapPartitionsWithIndex { (idx, iter) => + val random = new Random(idx) + iter.map(i => (1.0, Vectors.dense(Array.fill(n)(random.nextDouble)))) + }.cache() + val agd = new AcceleratedGradientDescent(new LogisticGradient, new SquaredL2Updater) + .setStepSize(1) + .setConvergenceTol(1e-12) + .setNumIterations(1) + .setRegParam(1.0) + val random = new Random(0) + // If we serialize data directly in the task closure, the size of the serialized task would be + // greater than 1MB and hence Spark would throw an error. + val weights = agd.optimize(examples, Vectors.dense(Array.fill(n)(random.nextDouble))) + } +}