Skip to content

Commit 53c2c06

Browse files
zhengruifengcmonkey
authored andcommitted
[SPARK-18206][ML] Add instrumentation for MLP,NB,LDA,AFT,GLM,Isotonic,LiR
## What changes were proposed in this pull request? add instrumentation for MLP,NB,LDA,AFT,GLM,Isotonic,LiR ## How was this patch tested? local test in spark-shell Author: Zheng RuiFeng <[email protected]> Author: Ruifeng Zheng <[email protected]> Closes apache#15671 from zhengruifeng/lir_instr.
1 parent c4079cf commit 53c2c06

File tree

13 files changed

+105
-33
lines changed

13 files changed

+105
-33
lines changed

mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@ class GBTClassifier @Since("1.4.0") (
160160
val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Classification)
161161

162162
val instr = Instrumentation.create(this, oldDataset)
163-
instr.logParams(params: _*)
163+
instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
164+
maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
165+
seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
164166
instr.logNumFeatures(numFeatures)
165167
instr.logNumClasses(2)
166168

mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,15 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
232232
* @return Fitted model
233233
*/
234234
override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = {
235+
val instr = Instrumentation.create(this, dataset)
236+
instr.logParams(labelCol, featuresCol, predictionCol, layers, maxIter, tol,
237+
blockSize, solver, stepSize, seed)
238+
235239
val myLayers = $(layers)
236240
val labels = myLayers.last
241+
instr.logNumClasses(labels)
242+
instr.logNumFeatures(myLayers.head)
243+
237244
val lpData = extractLabeledPoints(dataset)
238245
val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))
239246
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)
@@ -258,7 +265,10 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
258265
}
259266
trainer.setStackSize($(blockSize))
260267
val mlpModel = trainer.train(data)
261-
new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
268+
val model = new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
269+
270+
instr.logSuccess(model)
271+
model
262272
}
263273
}
264274

mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,12 @@ class NaiveBayes @Since("1.5.0") (
147147
}
148148
}
149149

150+
val instr = Instrumentation.create(this, dataset)
151+
instr.logParams(labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
152+
probabilityCol, modelType, smoothing, thresholds)
153+
150154
val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
155+
instr.logNumFeatures(numFeatures)
151156
val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
152157

153158
// Aggregates term frequencies per label.
@@ -169,6 +174,7 @@ class NaiveBayes @Since("1.5.0") (
169174
}).collect().sortBy(_._1)
170175

171176
val numLabels = aggregated.length
177+
instr.logNumClasses(numLabels)
172178
val numDocuments = aggregated.map(_._2._1).sum
173179

174180
val labelArray = new Array[Double](numLabels)
@@ -198,7 +204,9 @@ class NaiveBayes @Since("1.5.0") (
198204

199205
val pi = Vectors.dense(piArray)
200206
val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
201-
new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
207+
val model = new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
208+
instr.logSuccess(model)
209+
model
202210
}
203211

204212
@Since("1.5.0")

mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,9 @@ class RandomForestClassifier @Since("1.4.0") (
131131
super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)
132132

133133
val instr = Instrumentation.create(this, oldDataset)
134-
instr.logParams(params: _*)
134+
instr.logParams(labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
135+
impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
136+
minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval)
135137

136138
val trees = RandomForest
137139
.run(oldDataset, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))

mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,12 @@ class LDA @Since("1.6.0") (
888888
@Since("2.0.0")
889889
override def fit(dataset: Dataset[_]): LDAModel = {
890890
transformSchema(dataset.schema, logging = true)
891+
892+
val instr = Instrumentation.create(this, dataset)
893+
instr.logParams(featuresCol, topicDistributionCol, k, maxIter, subsamplingRate,
894+
checkpointInterval, keepLastCheckpoint, optimizeDocConcentration, topicConcentration,
895+
learningDecay, optimizer, learningOffset, seed)
896+
891897
val oldLDA = new OldLDA()
892898
.setK($(k))
893899
.setDocConcentration(getOldDocConcentration)
@@ -905,7 +911,11 @@ class LDA @Since("1.6.0") (
905911
case m: OldDistributedLDAModel =>
906912
new DistributedLDAModel(uid, m.vocabSize, m, dataset.sparkSession, None)
907913
}
908-
copyValues(newModel).setParent(this)
914+
915+
instr.logNumFeatures(newModel.vocabSize)
916+
val model = copyValues(newModel).setParent(this)
917+
instr.logSuccess(model)
918+
model
909919
}
910920

911921
@Since("1.6.0")

mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,10 +457,12 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
457457
.map { row =>
458458
Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
459459
}
460+
460461
val instr = Instrumentation.create(this, ratings)
461-
instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha,
462-
userCol, itemCol, ratingCol, predictionCol, maxIter,
463-
regParam, nonnegative, checkpointInterval, seed)
462+
instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
463+
itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
464+
seed, intermediateStorageLevel, finalStorageLevel)
465+
464466
val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
465467
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
466468
maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),

mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,12 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
227227
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
228228
val numFeatures = featuresStd.size
229229

230+
val instr = Instrumentation.create(this, dataset)
231+
instr.logParams(labelCol, featuresCol, censorCol, predictionCol, quantilesCol,
232+
fitIntercept, maxIter, tol, aggregationDepth)
233+
instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length)
234+
instr.logNumFeatures(numFeatures)
235+
230236
if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
231237
featuresStd(i) == 0.0 && featuresSummarizer.mean(i) != 0.0 }) {
232238
logWarning("Fitting AFTSurvivalRegressionModel without intercept on dataset with " +
@@ -276,8 +282,10 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
276282
val coefficients = Vectors.dense(rawCoefficients)
277283
val intercept = parameters(1)
278284
val scale = math.exp(parameters(0))
279-
val model = new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale)
280-
copyValues(model.setParent(this))
285+
val model = copyValues(new AFTSurvivalRegressionModel(uid, coefficients,
286+
intercept, scale).setParent(this))
287+
instr.logSuccess(model)
288+
model
281289
}
282290

283291
@Since("1.6.0")

mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
148148
val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression)
149149

150150
val instr = Instrumentation.create(this, oldDataset)
151-
instr.logParams(params: _*)
151+
instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
152+
maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
153+
seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
152154
instr.logNumFeatures(numFeatures)
153155

154156
val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy,

mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,11 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
251251
val familyAndLink = new FamilyAndLink(familyObj, linkObj)
252252

253253
val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
254+
val instr = Instrumentation.create(this, dataset)
255+
instr.logParams(labelCol, featuresCol, weightCol, predictionCol, linkPredictionCol,
256+
family, solver, fitIntercept, link, maxIter, regParam, tol)
257+
instr.logNumFeatures(numFeatures)
258+
254259
if (numFeatures > WeightedLeastSquares.MAX_NUM_FEATURES) {
255260
val msg = "Currently, GeneralizedLinearRegression only supports number of features" +
256261
s" <= ${WeightedLeastSquares.MAX_NUM_FEATURES}. Found $numFeatures in the input dataset."
@@ -264,7 +269,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
264269
Instance(label, weight, features)
265270
}
266271

267-
if (familyObj == Gaussian && linkObj == Identity) {
272+
val model = if (familyObj == Gaussian && linkObj == Identity) {
268273
// TODO: Make standardizeFeatures and standardizeLabel configurable.
269274
val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), elasticNetParam = 0.0,
270275
standardizeFeatures = true, standardizeLabel = true)
@@ -274,21 +279,23 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
274279
.setParent(this))
275280
val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
276281
wlsModel.diagInvAtWA.toArray, 1, getSolver)
277-
return model.setSummary(Some(trainingSummary))
282+
model.setSummary(Some(trainingSummary))
283+
} else {
284+
// Fit Generalized Linear Model by iteratively reweighted least squares (IRLS).
285+
val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
286+
val optimizer = new IterativelyReweightedLeastSquares(initialModel,
287+
familyAndLink.reweightFunc, $(fitIntercept), $(regParam), $(maxIter), $(tol))
288+
val irlsModel = optimizer.fit(instances)
289+
val model = copyValues(
290+
new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
291+
.setParent(this))
292+
val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
293+
irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver)
294+
model.setSummary(Some(trainingSummary))
278295
}
279296

280-
// Fit Generalized Linear Model by iteratively reweighted least squares (IRLS).
281-
val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
282-
val optimizer = new IterativelyReweightedLeastSquares(initialModel, familyAndLink.reweightFunc,
283-
$(fitIntercept), $(regParam), $(maxIter), $(tol))
284-
val irlsModel = optimizer.fit(instances)
285-
286-
val model = copyValues(
287-
new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
288-
.setParent(this))
289-
val trainingSummary = new GeneralizedLinearRegressionTrainingSummary(dataset, model,
290-
irlsModel.diagInvAtWA.toArray, irlsModel.numIterations, getSolver)
291-
model.setSummary(Some(trainingSummary))
297+
instr.logSuccess(model)
298+
model
292299
}
293300

294301
@Since("2.0.0")

mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,16 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri
171171
val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
172172
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
173173

174+
val instr = Instrumentation.create(this, dataset)
175+
instr.logParams(labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic)
176+
instr.logNumFeatures(1)
177+
174178
val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic))
175179
val oldModel = isotonicRegression.run(instances)
176180

177-
copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
181+
val model = copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
182+
instr.logSuccess(model)
183+
model
178184
}
179185

180186
@Since("1.5.0")

0 commit comments

Comments
 (0)