@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
with HasAggregationDepth with HasThreshold with HasBlockSize {
with HasAggregationDepth with HasThreshold with HasMaxBlockSizeInMB {

/**
* Param for threshold in binary classification prediction.
@@ -57,7 +57,7 @@ private[classification] trait LinearSVCParams extends ClassifierParams with HasR
"threshold in binary classification prediction applied to rawPrediction")

setDefault(regParam -> 0.0, maxIter -> 100, fitIntercept -> true, tol -> 1E-6,
standardization -> true, threshold -> 0.0, aggregationDepth -> 2, blockSize -> 1)
standardization -> true, threshold -> 0.0, aggregationDepth -> 2, maxBlockSizeInMB -> 0.0)
}

/**
@@ -153,22 +153,13 @@ class LinearSVC @Since("2.2.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)

/**
* Set block size for stacking input data in matrices.
* If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
* If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines
* will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
* Recommended size is between 10 and 1000. An appropriate choice of the block size depends
* on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
* f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
* Note that existing BLAS implementations are mainly optimized for dense matrices, if the
* input dataset is sparse, stacking may bring no performance gain, the worse is possible
* performance regression.
* Default is 1.
* Sets the value of param [[maxBlockSizeInMB]].
* Default is 0.0.
*
* @group expertSetParam
*/
@Since("3.1.0")
def setBlockSize(value: Int): this.type = set(blockSize, value)
def setMaxBlockSizeInMB(value: Double): this.type = set(maxBlockSizeInMB, value)

@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
@@ -177,19 +168,19 @@ class LinearSVC @Since("2.2.0") (
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth,
maxBlockSizeInMB)

if (dataset.storageLevel != StorageLevel.NONE) {
instr.logWarning(s"Input instances will be standardized, blockified to blocks, and " +
s"then cached during training. Be careful of double caching!")
}

val instances = extractInstances(dataset)
.setName("training instances")

if (dataset.storageLevel == StorageLevel.NONE && $(blockSize) == 1) {
instances.persist(StorageLevel.MEMORY_AND_DISK)
}

var requestedMetrics = Seq("mean", "std", "count")
if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros"
val (summarizer, labelSummarizer) = Summarizer
.getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics)
.getClassificationSummarizers(instances, $(aggregationDepth), Seq("mean", "std", "count"))

val histogram = labelSummarizer.histogram
val numInvalid = labelSummarizer.countInvalid
@@ -199,14 +190,12 @@ class LinearSVC @Since("2.2.0") (
instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
instr.logSumOfWeights(summarizer.weightSum)
if ($(blockSize) > 1) {
val scale = 1.0 / summarizer.count / numFeatures
val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
instr.logNamedValue("sparsity", sparsity.toString)
if (sparsity > 0.5) {
instr.logWarning(s"sparsity of input dataset is $sparsity, " +
s"which may hurt performance in high-level BLAS.")
}

var actualBlockSizeInMB = $(maxBlockSizeInMB)
if (actualBlockSizeInMB == 0) {
actualBlockSizeInMB = InstanceBlock.DefaultBlockSizeInMB
require(actualBlockSizeInMB > 0, "inferred actual BlockSizeInMB must > 0")
instr.logNamedValue("actualBlockSizeInMB", actualBlockSizeInMB.toString)
}

val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
@@ -245,12 +234,8 @@ class LinearSVC @Since("2.2.0") (
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
trainOnRows(instances, featuresStd, regularization, optimizer)
} else {
trainOnBlocks(instances, featuresStd, regularization, optimizer)
}
if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
val (rawCoefficients, objectiveHistory) =
trainImpl(instances, actualBlockSizeInMB, featuresStd, regularization, optimizer)

if (rawCoefficients == null) {
val msg = s"${optimizer.getClass.getName} failed."
@@ -284,35 +269,9 @@ class LinearSVC @Since("2.2.0") (
model.setSummary(Some(summary))
}

private def trainOnRows(
instances: RDD[Instance],
featuresStd: Array[Double],
regularization: Option[L2Regularization],
optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
val numFeatures = featuresStd.length
val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures

val bcFeaturesStd = instances.context.broadcast(featuresStd)
val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
val costFun = new RDDLossFunction(instances, getAggregatorFunc,
regularization, $(aggregationDepth))

val states = optimizer.iterations(new CachedDiffFunction(costFun),
Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)

val arrayBuilder = mutable.ArrayBuilder.make[Double]
var state: optimizer.State = null
while (states.hasNext) {
state = states.next()
arrayBuilder += state.adjustedValue
}
bcFeaturesStd.destroy()

(if (state != null) state.x.toArray else null, arrayBuilder.result)
}

private def trainOnBlocks(
private def trainImpl(
instances: RDD[Instance],
actualBlockSizeInMB: Double,
featuresStd: Array[Double],
regularization: Option[L2Regularization],
optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
@@ -326,9 +285,11 @@ class LinearSVC @Since("2.2.0") (
val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
}
val blocks = InstanceBlock.blokify(standardized, $(blockSize))

val maxMemUsage = (actualBlockSizeInMB * 1024L * 1024L).ceil.toLong
val blocks = InstanceBlock.blokifyWithMaxMemUsage(standardized, maxMemUsage)
.persist(StorageLevel.MEMORY_AND_DISK)
.setName(s"training blocks (blockSize=${$(blockSize)})")
.setName(s"training blocks (blockSizeInMB=$actualBlockSizeInMB)")

val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
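As a usage note (not part of the diff), a minimal sketch of training a LinearSVC with the new parameter; the toy dataset and the 0.25 MB cap are illustrative assumptions:

import org.apache.spark.ml.classification.LinearSVC
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("LinearSVCMaxBlockSize").getOrCreate()
import spark.implicits._

// Toy binary-classification data using the default "label"/"features" column names.
val df = Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
).toDF("label", "features")

val svc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)
  .setMaxBlockSizeInMB(0.25) // cap each stacked instance block at roughly 0.25 MB

val model = svc.fit(df)
println(s"intercept=${model.intercept}, coefficients=${model.coefficients}")

With the default of 0.0, train falls back to InstanceBlock.DefaultBlockSizeInMB (1.0 MB), as the actualBlockSizeInMB block above shows.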
71 changes: 71 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,6 +17,8 @@

package org.apache.spark.ml.feature

import scala.collection.mutable

import org.apache.spark.ml.linalg._
import org.apache.spark.rdd.RDD

@@ -100,6 +102,32 @@ private[spark] case class InstanceBlock(

private[spark] object InstanceBlock {

/**
* Suggested value for BlockSizeInMB in Level-2 routine cases.
* According to performance tests of BLAS routine (see SPARK-31714) and
* LinearSVC (see SPARK-32907), 1.0 MB should be an acceptable value for
* linear models using Level-2 routine (GEMV) to perform prediction and
* gradient computation.
*/
val DefaultBlockSizeInMB = 1.0

private def getBlockMemUsage(
numCols: Long,
numRows: Long,
nnz: Long,
allUnitWeight: Boolean): Long = {
val doubleBytes = java.lang.Double.BYTES
val arrayHeader = 12L
val denseSize = Matrices.getDenseSize(numCols, numRows)
val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
val matrixSize = math.min(denseSize, sparseSize)
if (allUnitWeight) {
matrixSize + doubleBytes * numRows + arrayHeader * 2
} else {
matrixSize + doubleBytes * numRows * 2 + arrayHeader * 2
}
}

def fromInstances(instances: Seq[Instance]): InstanceBlock = {
val labels = instances.map(_.label).toArray
val weights = if (instances.exists(_.weight != 1)) {
@@ -114,6 +142,49 @@ private[spark] object InstanceBlock {
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}

def blokifyWithMaxMemUsage(
instanceIterator: Iterator[Instance],
maxMemUsage: Long): Iterator[InstanceBlock] = {
require(maxMemUsage > 0)

new Iterator[InstanceBlock]() {
private var numCols = -1L

override def hasNext: Boolean = instanceIterator.hasNext

override def next(): InstanceBlock = {
val buff = mutable.ArrayBuilder.make[Instance]
var buffCnt = 0L
var buffNnz = 0L
var buffUnitWeight = true
var blockMemUsage = 0L

while (instanceIterator.hasNext && blockMemUsage < maxMemUsage) {
val instance = instanceIterator.next()
if (numCols < 0L) numCols = instance.features.size
require(numCols == instance.features.size)

buff += instance
buffCnt += 1L
buffNnz += instance.features.numNonzeros
buffUnitWeight &&= (instance.weight == 1)
blockMemUsage = getBlockMemUsage(numCols, buffCnt, buffNnz, buffUnitWeight)
}

// The block memory usage may slightly exceed the threshold; this is not a big issue.
// It also ensures that even if a single row exceeds the block limit, each block still contains at least one row.
InstanceBlock.fromInstances(buff.result())
}
}
}

def blokifyWithMaxMemUsage(
instances: RDD[Instance],
maxMemUsage: Long): RDD[InstanceBlock] = {
require(maxMemUsage > 0)
instances.mapPartitions(iter => blokifyWithMaxMemUsage(iter, maxMemUsage))
}
}


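To make the getBlockMemUsage estimate concrete, a rough standalone sketch of the dense, all-unit-weight case (not part of the diff; it re-derives the arithmetic with 8-byte doubles and the 12-byte array header used above, while the real code delegates to Matrices.getDenseSize / Matrices.getSparseSize, which add small constant overheads):

object BlockSizeEstimate {
  private val doubleBytes = 8L
  private val arrayHeader = 12L

  // Rows that fit in one block when features are dense and all weights are 1:
  //   8 * numRows * numCols (matrix values) + 8 * numRows (labels) + 2 * arrayHeader <= maxMemUsage
  def denseRowsPerBlock(numCols: Long, maxMemUsageBytes: Long): Long = {
    val bytesPerRow = doubleBytes * numCols + doubleBytes
    math.max(1L, (maxMemUsageBytes - 2L * arrayHeader) / bytesPerRow)
  }

  def main(args: Array[String]): Unit = {
    // With the 1.0 MB default and 100 dense features, a block holds roughly 1300 rows.
    println(denseRowsPerBlock(numCols = 100L, maxMemUsageBytes = 1L << 20))
  }
}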
@@ -108,7 +108,12 @@ private[shared] object SharedParamsCodeGen {
ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
"stacked within partitions. If block size is more than remaining data in a partition " +
"then it is adjusted to the size of this data.",
isValid = "ParamValidators.gt(0)", isExpertParam = true)
isValid = "ParamValidators.gt(0)", isExpertParam = true),
ParamDesc[Double]("maxBlockSizeInMB", "Maximum memory in MB for stacking input data " +
"into blocks. Data is stacked within partitions. If more than remaining data size in a " +
"partition then it is adjusted to the data size. If 0, try to infer an appropriate " +
"value. Must be >= 0.",
Some("0.0"), isValid = "ParamValidators.gtEq(0.0)", isExpertParam = true)
)

val code = genSharedParams(params)
@@ -562,4 +562,22 @@ trait HasBlockSize extends Params {
/** @group expertGetParam */
final def getBlockSize: Int = $(blockSize)
}

/**
* Trait for shared param maxBlockSizeInMB (default: 0.0). This trait may be changed or
* removed between minor versions.
*/
trait HasMaxBlockSizeInMB extends Params {

/**
* Param for Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be &gt;= 0..
* @group expertParam
*/
final val maxBlockSizeInMB: DoubleParam = new DoubleParam(this, "maxBlockSizeInMB", "Maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. If 0, try to infer an appropriate value. Must be >= 0.", ParamValidators.gtEq(0.0))

setDefault(maxBlockSizeInMB, 0.0)

/** @group expertGetParam */
final def getMaxBlockSizeInMB: Double = $(maxBlockSizeInMB)
}
// scalastyle:on
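A small sketch of how the generated trait behaves from the user side (not part of the diff; it assumes, as is standard for ML params, that the gtEq(0.0) validator fires when the value is set):

import org.apache.spark.ml.classification.LinearSVC

val svc = new LinearSVC()

// Default comes from setDefault(maxBlockSizeInMB, 0.0).
assert(svc.getMaxBlockSizeInMB == 0.0)

// ParamValidators.gtEq(0.0) should reject negative values at set time.
try {
  svc.setMaxBlockSizeInMB(-1.0)
} catch {
  case e: IllegalArgumentException => println(s"rejected as expected: ${e.getMessage}")
}

svc.setMaxBlockSizeInMB(1.5)
assert(svc.getMaxBlockSizeInMB == 1.5)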
@@ -214,8 +214,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
.setFitIntercept(fitIntercept)
.setMaxIter(5)
val model = lsvc.fit(dataset)
Seq(4, 16, 64).foreach { blockSize =>
val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
Seq(0, 0.01, 0.1, 1, 2, 4).foreach { s =>
val model2 = lsvc.setMaxBlockSizeInMB(s).fit(dataset)
assert(model.intercept ~== model2.intercept relTol 1e-9)
assert(model.coefficients ~== model2.coefficients relTol 1e-9)
}
@@ -74,4 +74,58 @@ class InstanceSuite extends SparkFunSuite{
}
}

test("InstanceBlock: blokify with max memory usage") {
val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
val instances = Seq(instance1, instance2)

val blocks = InstanceBlock
.blokifyWithMaxMemUsage(Iterator.apply(instance1, instance2), 128).toArray
require(blocks.length == 1)
val block = blocks.head
assert(block.size === 2)
assert(block.numFeatures === 2)
block.instanceIterator.zipWithIndex.foreach {
case (instance, i) =>
assert(instance.label === instances(i).label)
assert(instance.weight === instances(i).weight)
assert(instance.features.toArray === instances(i).features.toArray)
}
Seq(0, 1).foreach { i =>
val nzIter = block.getNonZeroIter(i)
val vec = Vectors.sparse(2, nzIter.toSeq)
assert(vec.toArray === instances(i).features.toArray)
}

// instances larger than maxMemUsage
val denseInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(1000)(1.0)))
InstanceBlock.blokifyWithMaxMemUsage(Iterator.single(denseInstance), 64).size
InstanceBlock.blokifyWithMaxMemUsage(Iterator.fill(10)(denseInstance), 64).size

// different numFeatures
intercept[IllegalArgumentException] {
InstanceBlock.blokifyWithMaxMemUsage(Iterator.apply(instance1, denseInstance), 64).size
}

// nnz = 10
val sparseInstance = Instance(-2.0, 3.0,
Vectors.sparse(1000, Array.range(0, 1000, 100), Array.fill(10)(0.1)))

// normally, memory usage of a block does not exceed maxMemUsage too much
val maxMemUsage = 1 << 18
val mixedIter = Iterator.fill(100)(denseInstance) ++
Iterator.fill(1000)(sparseInstance) ++
Iterator.fill(10)(denseInstance) ++
Iterator.fill(10)(sparseInstance) ++
Iterator.fill(100)(denseInstance) ++
Iterator.fill(100)(sparseInstance)
InstanceBlock.blokifyWithMaxMemUsage(mixedIter, maxMemUsage)
.foreach { block =>
val doubleBytes = java.lang.Double.BYTES
val arrayHeader = 12L
val blockMemUsage = block.matrix.getSizeInBytes +
(block.labels.length + block.weights.length) * doubleBytes + arrayHeader * 2
require(blockMemUsage < maxMemUsage * 1.05)
}
}
Review comment from a project contributor:
Add a test:

  • Generate a mixed list of sparse and dense instances (a list in which some segments are dense and others are very sparse), and verify that each block's size does not exceed the block memory limit by too much (for example: (actual block mem size) / config <= 1.1?)

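The mixed dense/sparse block test above already covers this with a 1.05 factor; a sketch of the exact ratio-style assertion proposed here (the test name is hypothetical, the 1.1 bound comes from this comment, and the suite's existing imports are assumed) could look like:

test("InstanceBlock: block mem usage stays within ~10% of the configured limit") {
  val denseInstance = Instance(-1.0, 2.0, Vectors.dense(Array.fill(1000)(1.0)))
  val sparseInstance = Instance(-2.0, 3.0,
    Vectors.sparse(1000, Array.range(0, 1000, 100), Array.fill(10)(0.1)))
  val maxMemUsage = 1 << 18
  val mixedIter = Iterator.fill(100)(denseInstance) ++ Iterator.fill(1000)(sparseInstance)
  InstanceBlock.blokifyWithMaxMemUsage(mixedIter, maxMemUsage).foreach { block =>
    val doubleBytes = java.lang.Double.BYTES
    val arrayHeader = 12L
    val blockMemUsage = block.matrix.getSizeInBytes +
      (block.labels.length + block.weights.length) * doubleBytes + arrayHeader * 2
    // (actual block mem size) / configured limit <= 1.1, as suggested
    assert(blockMemUsage.toDouble / maxMemUsage <= 1.1)
  }
}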
}