Skip to content

Commit bb465ca

Browse files
author
qiping.lqp
committed
Merge branch 'master' of https://github.com/apache/spark into dt-preprune
Conflicts: mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala
2 parents cadd569 + c419e4f commit bb465ca

File tree

17 files changed

+120
-74
lines changed

17 files changed

+120
-74
lines changed

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,13 @@ private[spark] class ApplicationInfo(
9696

9797
def retryCount = _retryCount
9898

99-
def incrementRetryCount = {
99+
def incrementRetryCount() = {
100100
_retryCount += 1
101101
_retryCount
102102
}
103103

104+
def resetRetryCount() = _retryCount = 0
105+
104106
def markFinished(endState: ApplicationState.Value) {
105107
state = endState
106108
endTime = System.currentTimeMillis()

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -296,28 +296,34 @@ private[spark] class Master(
296296
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
297297
execOption match {
298298
case Some(exec) => {
299+
val appInfo = idToApp(appId)
299300
exec.state = state
301+
if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
300302
exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
301303
if (ExecutorState.isFinished(state)) {
302-
val appInfo = idToApp(appId)
303304
// Remove this executor from the worker and app
304-
logInfo("Removing executor " + exec.fullId + " because it is " + state)
305+
logInfo(s"Removing executor ${exec.fullId} because it is $state")
305306
appInfo.removeExecutor(exec)
306307
exec.worker.removeExecutor(exec)
307308

308-
val normalExit = exitStatus.exists(_ == 0)
309+
val normalExit = exitStatus == Some(0)
309310
// Only retry a certain number of times so we don't go into an infinite loop.
310-
if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
311-
schedule()
312-
} else if (!normalExit) {
313-
logError("Application %s with ID %s failed %d times, removing it".format(
314-
appInfo.desc.name, appInfo.id, appInfo.retryCount))
315-
removeApplication(appInfo, ApplicationState.FAILED)
311+
if (!normalExit) {
312+
if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
313+
schedule()
314+
} else {
315+
val execs = appInfo.executors.values
316+
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
317+
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
318+
s"${appInfo.retryCount} times; removing it")
319+
removeApplication(appInfo, ApplicationState.FAILED)
320+
}
321+
}
316322
}
317323
}
318324
}
319325
case None =>
320-
logWarning("Got status update for unknown executor " + appId + "/" + execId)
326+
logWarning(s"Got status update for unknown executor $appId/$execId")
321327
}
322328
}
323329

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ private[spark] class ExecutorRunner(
159159
Files.write(header, stderr, Charsets.UTF_8)
160160
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
161161

162+
state = ExecutorState.RUNNING
163+
worker ! ExecutorStateChanged(appId, execId, state, None, None)
162164
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shut down)
163165
// or with nonzero exit code
164166
val exitCode = process.waitFor()

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ private[spark] class Worker(
234234
try {
235235
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
236236
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
237-
self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
237+
self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
238238
executors(appId + "/" + execId) = manager
239239
manager.start()
240240
coresUsed += cores_

dev/run-tests

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ echo "========================================================================="
9393
# echo "q" is needed because sbt, on encountering a build file with a failure
9494
# (either resolution or compilation), prompts the user for input (q, r,
9595
# etc.) to quit or retry. This echo keeps sbt from blocking on that prompt.
96-
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver "
96+
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive "
9797
echo -e "q\n" | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly | \
9898
grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
9999

docs/mllib-decision-tree.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ The ordered splits create "bins" and the maximum number of such
8080
bins can be specified using the `maxBins` parameter.
8181

8282
Note that the number of bins cannot be greater than the number of instances `$N$` (a rare scenario
83-
since the default `maxBins` value is 100). The tree algorithm automatically reduces the number of
83+
since the default `maxBins` value is 32). The tree algorithm automatically reduces the number of
8484
bins if the condition is not satisfied.
8585

8686
**Categorical features**
@@ -117,7 +117,7 @@ all nodes at each level of the tree. This could lead to high memory requirements
117117
of the tree, potentially leading to memory overflow errors. To alleviate this problem, a `maxMemoryInMB`
118118
training parameter specifies the maximum amount of memory at the workers (twice as much at the
119119
master) to be allocated to the histogram computation. The default value is conservatively chosen to
120-
be 128 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements
120+
be 256 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements
121121
for a level-wise computation cross the `maxMemoryInMB` threshold, the node training tasks at each
122122
subsequent level are split into smaller tasks.
123123

@@ -167,7 +167,7 @@ val numClasses = 2
167167
val categoricalFeaturesInfo = Map[Int, Int]()
168168
val impurity = "gini"
169169
val maxDepth = 5
170-
val maxBins = 100
170+
val maxBins = 32
171171

172172
val model = DecisionTree.trainClassifier(data, numClasses, categoricalFeaturesInfo, impurity,
173173
maxDepth, maxBins)
@@ -213,7 +213,7 @@ Integer numClasses = 2;
213213
HashMap<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
214214
String impurity = "gini";
215215
Integer maxDepth = 5;
216-
Integer maxBins = 100;
216+
Integer maxBins = 32;
217217

218218
// Train a DecisionTree model for classification.
219219
final DecisionTreeModel model = DecisionTree.trainClassifier(data, numClasses,
@@ -250,7 +250,7 @@ data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt').cache()
250250
# Train a DecisionTree model.
251251
# Empty categoricalFeaturesInfo indicates all features are continuous.
252252
model = DecisionTree.trainClassifier(data, numClasses=2, categoricalFeaturesInfo={},
253-
impurity='gini', maxDepth=5, maxBins=100)
253+
impurity='gini', maxDepth=5, maxBins=32)
254254

255255
# Evaluate model on training instances and compute training error
256256
predictions = model.predict(data.map(lambda x: x.features))
@@ -293,7 +293,7 @@ val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache
293293
val categoricalFeaturesInfo = Map[Int, Int]()
294294
val impurity = "variance"
295295
val maxDepth = 5
296-
val maxBins = 100
296+
val maxBins = 32
297297

298298
val model = DecisionTree.trainRegressor(data, categoricalFeaturesInfo, impurity,
299299
maxDepth, maxBins)
@@ -338,7 +338,7 @@ JavaSparkContext sc = new JavaSparkContext(sparkConf);
338338
HashMap<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
339339
String impurity = "variance";
340340
Integer maxDepth = 5;
341-
Integer maxBins = 100;
341+
Integer maxBins = 32;
342342

343343
// Train a DecisionTree model.
344344
final DecisionTreeModel model = DecisionTree.trainRegressor(data,
@@ -380,7 +380,7 @@ data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt').cache()
380380
# Train a DecisionTree model.
381381
# Empty categoricalFeaturesInfo indicates all features are continuous.
382382
model = DecisionTree.trainRegressor(data, categoricalFeaturesInfo={},
383-
impurity='variance', maxDepth=5, maxBins=100)
383+
impurity='variance', maxDepth=5, maxBins=32)
384384

385385
# Evaluate model on training instances and compute training error
386386
predictions = model.predict(data.map(lambda x: x.features))

examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public static void main(String[] args) {
6363
HashMap<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
6464
String impurity = "gini";
6565
Integer maxDepth = 5;
66-
Integer maxBins = 100;
66+
Integer maxBins = 32;
6767

6868
// Train a DecisionTree model for classification.
6969
final DecisionTreeModel model = DecisionTree.trainClassifier(data, numClasses,

examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ object DecisionTreeRunner {
5252
input: String = null,
5353
dataFormat: String = "libsvm",
5454
algo: Algo = Classification,
55-
maxDepth: Int = 4,
55+
maxDepth: Int = 5,
5656
impurity: ImpurityType = Gini,
57-
maxBins: Int = 100,
57+
maxBins: Int = 32,
5858
fracTest: Double = 0.2)
5959

6060
def main(args: Array[String]) {

mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,9 @@ object DecisionTree extends Serializable with Logging {
331331
* Supported values: "gini" (recommended) or "entropy".
332332
* @param maxDepth Maximum depth of the tree.
333333
* E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.
334-
* (suggested value: 4)
334+
* (suggested value: 5)
335335
* @param maxBins maximum number of bins used for splitting features
336-
* (suggested value: 100)
336+
* (suggested value: 32)
337337
* @return DecisionTreeModel that can be used for prediction
338338
*/
339339
def trainClassifier(
@@ -375,9 +375,9 @@ object DecisionTree extends Serializable with Logging {
375375
* Supported values: "variance".
376376
* @param maxDepth Maximum depth of the tree.
377377
* E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.
378-
* (suggested value: 4)
378+
* (suggested value: 5)
379379
* @param maxBins maximum number of bins used for splitting features
380-
* (suggested value: 100)
380+
* (suggested value: 32)
381381
* @return DecisionTreeModel that can be used for prediction
382382
*/
383383
def trainRegressor(

mllib/src/main/scala/org/apache/spark/mllib/tree/configuration/Strategy.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,20 @@ import org.apache.spark.mllib.tree.configuration.QuantileStrategy._
5656
* If a split has less information gain than minInfoGain,
5757
* this split will not be considered as a valid split.
5858
* @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation. Default value is
59-
* 128 MB.
59+
* 256 MB.
6060
*/
6161
@Experimental
6262
class Strategy (
6363
val algo: Algo,
6464
val impurity: Impurity,
6565
val maxDepth: Int,
6666
val numClassesForClassification: Int = 2,
67-
val maxBins: Int = 100,
67+
val maxBins: Int = 32,
6868
val quantileCalculationStrategy: QuantileStrategy = Sort,
6969
val categoricalFeaturesInfo: Map[Int, Int] = Map[Int, Int](),
7070
val minInstancesPerNode: Int = 0,
7171
val minInfoGain: Double = 0.0,
72-
val maxMemoryInMB: Int = 128) extends Serializable {
72+
val maxMemoryInMB: Int = 256) extends Serializable {
7373

7474
if (algo == Classification) {
7575
require(numClassesForClassification >= 2)

0 commit comments

Comments
 (0)