From bee486814c3290caeeede0236d245ab3395f07ee Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Sat, 4 Jun 2016 04:29:41 -0700 Subject: [PATCH 1/7] DecisionTree Wrapper in SparkR --- R/pkg/NAMESPACE | 3 +- R/pkg/R/generics.R | 5 + R/pkg/R/mllib.R | 68 ++++++++++ .../ml/r/DecisionTreeClassifierWrapper.scala | 123 ++++++++++++++++++ .../ml/r/DecisionTreeRegressorWrapper.scala | 122 +++++++++++++++++ .../org/apache/spark/ml/r/RWrappers.scala | 2 + 6 files changed, 322 insertions(+), 1 deletion(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala create mode 100644 mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 267a38c21530..a9227830e0db 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -43,7 +43,8 @@ exportMethods("glm", "spark.isoreg", "spark.gaussianMixture", "spark.als", - "spark.kstest") + "spark.kstest", + "spark.decisionTree") # Job group lifecycle management methods export("setJobGroup", diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 90a02e277831..02c3bfc1b00b 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1358,6 +1358,11 @@ setGeneric("spark.perplexity", function(object, data) { standardGeneric("spark.p #' @export setGeneric("spark.isoreg", function(data, formula, ...) { standardGeneric("spark.isoreg") }) +#' @rdname spark.decisionTree +#' @export +setGeneric("spark.decisionTree", + function(data, formula, ...) 
{ standardGeneric("spark.decisionTree") }) + #' @rdname spark.gaussianMixture #' @export setGeneric("spark.gaussianMixture", diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index b901307f8f40..5c85cfea9100 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -95,6 +95,20 @@ setClass("ALSModel", representation(jobj = "jobj")) #' @note KSTest since 2.1.0 setClass("KSTest", representation(jobj = "jobj")) +#' S4 class that represents a DecisionTreeRegressionModel +#' +#' @param jobj a Java object reference to the backing Scala DecisionTreeRegressionModel +#' @export +#' @note DecisionTreeRegressionModel since 2.1.0 +setClass("DecisionTreeRegressionModel", representation(jobj = "jobj")) + +#' S4 class that represents a DecisionTreeClassificationModel +#' +#' @param jobj a Java object reference to the backing Scala DecisionTreeClassificationModel +#' @export +#' @note DecisionTreeClassificationModel since 2.1.0 +setClass("DecisionTreeClassificationModel", representation(jobj = "jobj")) + #' Saves the MLlib model to the input path #' #' Saves the MLlib model to the input path. For more information, see the specific @@ -897,6 +911,22 @@ setMethod("write.ml", signature(object = "GaussianMixtureModel", path = "charact write_internal(object, path, overwrite) }) +#' Save the Decision Tree Regression model to the input path. +#' +#' @param object A fitted Decision tree regression model +#' @param path The directory where the model is saved +#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE +#' which means throw exception if the output path exists. 
+#' +#' @aliases write.ml,DecisionTreeRegressionModel,character-method +#' @rdname spark.decisionTreeRegression +#' @export +#' @note write.ml(DecisionTreeRegressionModel, character) since 2.1.0 +setMethod("write.ml", signature(object = "DecisionTreeRegressionModel", path = "character"), + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + }) + #' Load a fitted MLlib model from the input path. #' #' @param path path of the model to read. @@ -932,6 +962,8 @@ read.ml <- function(path) { new("GaussianMixtureModel", jobj = jobj) } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.ALSWrapper")) { new("ALSModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.DecisionTreeRegressorWrapper")) { + new("DecisionTreeRegressionModel", jobj = jobj) } else { stop("Unsupported model: ", jobj) } @@ -1427,3 +1459,39 @@ print.summary.KSTest <- function(x, ...) { cat(summaryStr, "\n") invisible(x) } + +#' Decision tree regression model. +#' +#' Fit Decision Tree regression model on a SparkDataFrame. +#' +#' @param data SparkDataFrame for training. +#' @param formula A symbolic description of the model to be fitted. Currently only a few formula +#' operators are supported, including '~', ':', '+', and '-'. +#' Note that operator '.' is not supported currently. 
+#' @return a fitted decision tree regression model +#' @rdname spark.decisionTreeRegressor +#' @seealso rpart: \url{https://cran.r-project.org/web/packages/rpart/} +#' @export +#' @examples +#' \dontrun{ +#' df <- createDataFrame(sqlContext, kyphosis) +#' model <- spark.decisionTree(df, Kyphosis ~ Age + Number + Start) +#' } +setMethod("spark.decisionTree", signature(data = "SparkDataFrame", formula = "formula"), + function(data, formula, type = c("regression", "classification")) { + formula <- paste(deparse(formula), collapse = "") + if (identical(type, "regression")) { + jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeRegressorWrapper", "fit", + data@sdf, formula) + new("DecisionTreeRegressionModel", jobj = jobj) + } else if (identical(type, "classification")) { + jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeClassificationWrapper", "fit", + data@sdf, formula) + new("DecisionTreeClassificationModel", jobj = jobj) + } + }) + +setMethod("predict", signature(object = "DecisionTreeRegressionModel"), + function(object, newData) { + return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf))) + }) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala new file mode 100644 index 000000000000..d59f40af9c15 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.r + +import org.apache.hadoop.fs.Path +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute} +import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, DecisionTreeClassifier} +import org.apache.spark.ml.feature.{IndexToString, RFormula} +import org.apache.spark.ml.util._ +import org.apache.spark.sql.{DataFrame, Dataset} + +private[r] class DecisionTreeClassifierWrapper private ( + val pipeline: PipelineModel, + val features: Array[String], + val labels: Array[String]) extends MLWritable { + + import DecisionTreeClassifierWrapper.PREDICTED_LABEL_INDEX_COL + + private val DTModel: DecisionTreeClassificationModel = + pipeline.stages(1).asInstanceOf[DecisionTreeClassificationModel] + + lazy val maxDepth: Int = DTModel.getMaxDepth + + lazy val maxBins: Int = DTModel.getMaxBins + + def transform(dataset: Dataset[_]): DataFrame = { + pipeline.transform(dataset) + .drop(PREDICTED_LABEL_INDEX_COL) + .drop(DTModel.getFeaturesCol) + } + + override def write: MLWriter = new + DecisionTreeClassifierWrapper.DecisionTreeClassifierWrapperWriter(this) +} + +private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeClassifierWrapper] { + + val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" + val PREDICTED_LABEL_COL = "prediction" + + def fit(data: DataFrame, formula: String): DecisionTreeClassifierWrapper = { + val rFormula 
= new RFormula() + .setFormula(formula) + .fit(data) + // get labels and feature names from output schema + val schema = rFormula.transform(data).schema + val labelAttr = Attribute.fromStructField(schema(rFormula.getLabelCol)) + .asInstanceOf[NominalAttribute] + val labels = labelAttr.values.get + val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol)) + .attributes.get + val features = featureAttrs.map(_.name.get) + // assemble and fit the pipeline + val decisionTree = new DecisionTreeClassifier() + .setPredictionCol(PREDICTED_LABEL_INDEX_COL) + val idxToStr = new IndexToString() + .setInputCol(PREDICTED_LABEL_INDEX_COL) + .setOutputCol(PREDICTED_LABEL_COL) + .setLabels(labels) + val pipeline = new Pipeline() + .setStages(Array(rFormula, decisionTree, idxToStr)) + .fit(data) + new DecisionTreeClassifierWrapper(pipeline, features, labels) + } + + override def read: MLReader[DecisionTreeClassifierWrapper] = + new DecisionTreeClassifierWrapperReader + + override def load(path: String): DecisionTreeClassifierWrapper = super.load(path) + + class DecisionTreeClassifierWrapperWriter(instance: DecisionTreeClassifierWrapper) + extends MLWriter { + + override protected def saveImpl(path: String): Unit = { + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + + val rMetadata = ("class" -> instance.getClass.getName) ~ + ("features" -> instance.features.toSeq) ~ + ("labels" -> instance.labels.toSeq) + val rMetadataJson: String = compact(render(rMetadata)) + + sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + instance.pipeline.save(pipelinePath) + } + } + + class DecisionTreeClassifierWrapperReader extends MLReader[DecisionTreeClassifierWrapper] { + + override def load(path: String): DecisionTreeClassifierWrapper = { + implicit val format = DefaultFormats + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString 
+ val pipeline = PipelineModel.load(pipelinePath) + + val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadata = parse(rMetadataStr) + val features = (rMetadata \ "features").extract[Array[String]] + val labels = (rMetadata \ "labels").extract[Array[String]] + new DecisionTreeClassifierWrapper(pipeline, features, labels) + } + } +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala new file mode 100644 index 000000000000..89a82c452153 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.r + +import org.apache.hadoop.fs.Path +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute} +import org.apache.spark.ml.feature.{IndexToString, RFormula} +import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor} +import org.apache.spark.ml.util._ +import org.apache.spark.sql.{DataFrame, Dataset} + +private[r] class DecisionTreeRegressorWrapper private ( + val pipeline: PipelineModel, + val features: Array[String], + val labels: Array[String]) extends MLWritable { + + import DecisionTreeRegressorWrapper.PREDICTED_LABEL_INDEX_COL + + private val DTModel: DecisionTreeRegressionModel = + pipeline.stages(1).asInstanceOf[DecisionTreeRegressionModel] + + lazy val maxDepth: Int = DTModel.getMaxDepth + + lazy val maxBins: Int = DTModel.getMaxBins + + def transform(dataset: Dataset[_]): DataFrame = { + pipeline.transform(dataset) + .drop(PREDICTED_LABEL_INDEX_COL) + .drop(DTModel.getFeaturesCol) + } + + override def write: MLWriter = new + DecisionTreeRegressorWrapper.DecisionTreeRegressorWrapperWriter(this) +} + +private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRegressorWrapper] { + + val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" + val PREDICTED_LABEL_COL = "prediction" + + def fit(data: DataFrame, formula: String): DecisionTreeRegressorWrapper = { + val rFormula = new RFormula() + .setFormula(formula) + .fit(data) + // get labels and feature names from output schema + val schema = rFormula.transform(data).schema + val labelAttr = Attribute.fromStructField(schema(rFormula.getLabelCol)) + .asInstanceOf[NominalAttribute] + val labels = labelAttr.values.get + val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol)) + .attributes.get + val features = featureAttrs.map(_.name.get) 
+ // assemble and fit the pipeline + val decisionTree = new DecisionTreeRegressor() + .setPredictionCol(PREDICTED_LABEL_INDEX_COL) + val idxToStr = new IndexToString() + .setInputCol(PREDICTED_LABEL_INDEX_COL) + .setOutputCol(PREDICTED_LABEL_COL) + .setLabels(labels) + val pipeline = new Pipeline() + .setStages(Array(rFormula, decisionTree, idxToStr)) + .fit(data) + new DecisionTreeRegressorWrapper(pipeline, features, labels) + } + + override def read: MLReader[DecisionTreeRegressorWrapper] = new DecisionTreeRegressorWrapperReader + + override def load(path: String): DecisionTreeRegressorWrapper = super.load(path) + + class DecisionTreeRegressorWrapperWriter(instance: DecisionTreeRegressorWrapper) + extends MLWriter { + + override protected def saveImpl(path: String): Unit = { + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + + val rMetadata = ("class" -> instance.getClass.getName) ~ + ("features" -> instance.features.toSeq) ~ + ("labels" -> instance.labels.toSeq) + val rMetadataJson: String = compact(render(rMetadata)) + + sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + instance.pipeline.save(pipelinePath) + } + } + + class DecisionTreeRegressorWrapperReader extends MLReader[DecisionTreeRegressorWrapper] { + + override def load(path: String): DecisionTreeRegressorWrapper = { + implicit val format = DefaultFormats + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + val pipeline = PipelineModel.load(pipelinePath) + + val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadata = parse(rMetadataStr) + val features = (rMetadata \ "features").extract[Array[String]] + val labels = (rMetadata \ "labels").extract[Array[String]] + new DecisionTreeRegressorWrapper(pipeline, features, labels) + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala index d64de1b6abb6..5de3a2101eae 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala @@ -54,6 +54,8 @@ private[r] object RWrappers extends MLReader[Object] { GaussianMixtureWrapper.load(path) case "org.apache.spark.ml.r.ALSWrapper" => ALSWrapper.load(path) + case "org.apache.spark.ml.r.DecisionTreeRegressorWrapper" => + DecisionTreeRegressorWrapper.load(path) case _ => throw new SparkException(s"SparkR read.ml does not support load $className") } From 463f9656279eac489d91c8fba89671786c2d78bb Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Sat, 1 Oct 2016 14:57:37 -0700 Subject: [PATCH 2/7] add --- R/pkg/R/mllib.R | 67 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 5c85cfea9100..a94a0bd3663a 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -118,7 +118,8 @@ setClass("DecisionTreeClassificationModel", representation(jobj = "jobj")) #' @export #' @seealso \link{spark.glm}, \link{glm}, #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, -#' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg} +#' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}, +#' @seealso \link{spark.decisionTree}, #' @seealso \link{read.ml} NULL @@ -131,7 +132,7 @@ NULL #' @export #' @seealso \link{spark.glm}, \link{glm}, #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, -#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg} +#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}, \link{spark.decisionTree} NULL write_internal <- function(object, path, overwrite = FALSE) { @@ -911,22 +912,6 @@ setMethod("write.ml", 
signature(object = "GaussianMixtureModel", path = "charact write_internal(object, path, overwrite) }) -#' Save the Decision Tree Regression model to the input path. -#' -#' @param object A fitted Decision tree regression model -#' @param path The directory where the model is saved -#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE -#' which means throw exception if the output path exists. -#' -#' @aliases write.ml,DecisionTreeRegressionModel,character-method -#' @rdname spark.decisionTreeRegression -#' @export -#' @note write.ml(DecisionTreeRegressionModel, character) since 2.1.0 -setMethod("write.ml", signature(object = "DecisionTreeRegressionModel", path = "character"), - function(object, path, overwrite = FALSE) { - write_internal(object, path, overwrite) - }) - #' Load a fitted MLlib model from the input path. #' #' @param path path of the model to read. @@ -964,6 +949,8 @@ read.ml <- function(path) { new("ALSModel", jobj = jobj) } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.DecisionTreeRegressorWrapper")) { new("DecisionTreeRegressionModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.DecisionTreeClassifierWrapper")) { + new("DecisionTreeClassificationModel", jobj = jobj) } else { stop("Unsupported model: ", jobj) } @@ -1477,6 +1464,7 @@ print.summary.KSTest <- function(x, ...) { #' df <- createDataFrame(sqlContext, kyphosis) #' model <- spark.decisionTree(df, Kyphosis ~ Age + Number + Start) #' } +#' @note spark.decisionTree since 2.1.0 setMethod("spark.decisionTree", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, type = c("regression", "classification")) { formula <- paste(deparse(formula), collapse = "") @@ -1491,7 +1479,48 @@ setMethod("spark.decisionTree", signature(data = "SparkDataFrame", formula = "fo } }) +# Makes predictions from a Decision Tree model or a model produced by spark.decisionTree() + +#' @param newData a SparkDataFrame for testing. 
+#' @return \code{predict} returns a SparkDataFrame containing predicted labeled in a column named +#' "prediction" +#' @rdname spark.decisionTree +#' @export +#' @note predict(decisionTreeRegressionModel) since 2.1.0 setMethod("predict", signature(object = "DecisionTreeRegressionModel"), function(object, newData) { - return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf))) + predict_internal(object, newData) }) + +#' Save the Decision Tree Regression model to the input path. +#' +#' @param object A fitted Decision tree regression model +#' @param path The directory where the model is saved +#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE +#' which means throw exception if the output path exists. +#' +#' @aliases write.ml,DecisionTreeRegressionModel,character-method +#' @rdname spark.decisionTreeRegression +#' @export +#' @note write.ml(DecisionTreeRegressionModel, character) since 2.1.0 +setMethod("write.ml", signature(object = "DecisionTreeRegressionModel", path = "character"), +function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) +}) + +# Get the summary of an IsotonicRegressionModel model + +#' @param object a fitted IsotonicRegressionModel +#' @param ... Other optional arguments to summary of an IsotonicRegressionModel +#' @return \code{summary} returns the model's boundaries and prediction as lists +#' @rdname spark.isoreg +#' @aliases summary,IsotonicRegressionModel-method +#' @export +#' @note summary(IsotonicRegressionModel) since 2.1.0 +setMethod("summary", signature(object = "DecisionTreeRegressionModel"), + function(object, ...) 
{ + jobj <- object@jobj + boundaries <- callJMethod(jobj, "boundaries") + predictions <- callJMethod(jobj, "predictions") + return(list(boundaries = boundaries, predictions = predictions)) + }) \ No newline at end of file From b18b7181799ee082793e5036e7c91966c0034fa7 Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Wed, 5 Oct 2016 07:08:32 -0700 Subject: [PATCH 3/7] regression pass unit test --- R/pkg/NAMESPACE | 6 +- R/pkg/R/mllib.R | 139 +++++++++++++----- R/pkg/inst/tests/testthat/test_mllib.R | 30 ++++ .../ml/r/DecisionTreeClassifierWrapper.scala | 64 ++++---- .../ml/r/DecisionTreeRegressorWrapper.scala | 65 ++++---- 5 files changed, 207 insertions(+), 97 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index a9227830e0db..ea06df9363f2 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -348,7 +348,9 @@ export("as.DataFrame", "uncacheTable", "print.summary.GeneralizedLinearRegressionModel", "read.ml", - "print.summary.KSTest") + "print.summary.KSTest", + "print.summary.DecisionTreeRegressionModel", + "print.summary.DecisionTreeClassificationModel") export("structField", "structField.jobj", @@ -373,6 +375,8 @@ S3method(print, structField) S3method(print, structType) S3method(print, summary.GeneralizedLinearRegressionModel) S3method(print, summary.KSTest) +S3method(print, summary.DecisionTreeRegressionModel) +S3method(print, summary.DecisionTreeClassificationModel) S3method(structField, character) S3method(structField, jobj) S3method(structType, jobj) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index a94a0bd3663a..f44b6e7f1c5c 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -1447,39 +1447,31 @@ print.summary.KSTest <- function(x, ...) { invisible(x) } -#' Decision tree regression model. +#' Decision Tree #' -#' Fit Decision Tree regression model on a SparkDataFrame. +#' @description +#' \code{spark.decisionTree} tree #' -#' @param data SparkDataFrame for training. -#' @param formula A symbolic description of the model to be fitted. 
Currently only a few formula -#' operators are supported, including '~', ':', '+', and '-'. -#' Note that operator '.' is not supported currently. -#' @return a fitted decision tree regression model -#' @rdname spark.decisionTreeRegressor -#' @seealso rpart: \url{https://cran.r-project.org/web/packages/rpart/} -#' @export -#' @examples -#' \dontrun{ -#' df <- createDataFrame(sqlContext, kyphosis) -#' model <- spark.decisionTree(df, Kyphosis ~ Age + Number + Start) -#' } +#' Decision Tree +#' +#' @param data a SparkDataFrame of user data. #' @note spark.decisionTree since 2.1.0 setMethod("spark.decisionTree", signature(data = "SparkDataFrame", formula = "formula"), - function(data, formula, type = c("regression", "classification")) { + function(data, formula, type = c("regression", "classification"), maxDepth = 5, maxBins = 32 ) { formula <- paste(deparse(formula), collapse = "") if (identical(type, "regression")) { jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeRegressorWrapper", "fit", - data@sdf, formula) + data@sdf, formula, as.integer(maxDepth), as.integer(maxBins)) new("DecisionTreeRegressionModel", jobj = jobj) } else if (identical(type, "classification")) { - jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeClassificationWrapper", "fit", - data@sdf, formula) + jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeClassifierWrapper", "fit", + data@sdf, formula, as.integer(maxDepth), as.integer(maxBins)) new("DecisionTreeClassificationModel", jobj = jobj) } }) -# Makes predictions from a Decision Tree model or a model produced by spark.decisionTree() +# Makes predictions from a Decision Tree Regression model or +# a model produced by spark.decisionTree() #' @param newData a SparkDataFrame for testing. 
#' @return \code{predict} returns a SparkDataFrame containing predicted labeled in a column named @@ -1492,6 +1484,20 @@ setMethod("predict", signature(object = "DecisionTreeRegressionModel"), predict_internal(object, newData) }) +# Makes predictions from a Decision Tree Classification model or +# a model produced by spark.decisionTree() + +#' @param newData a SparkDataFrame for testing. +#' @return \code{predict} returns a SparkDataFrame containing predicted labeled in a column named +#' "prediction" +#' @rdname spark.decisionTree +#' @export +#' @note predict(decisionTreeClassificationModel) since 2.1.0 +setMethod("predict", signature(object = "DecisionTreeClassificationModel"), + function(object, newData) { + predict_internal(object, newData) + }) + #' Save the Decision Tree Regression model to the input path. #' #' @param object A fitted Decision tree regression model @@ -1504,23 +1510,88 @@ setMethod("predict", signature(object = "DecisionTreeRegressionModel"), #' @export #' @note write.ml(DecisionTreeRegressionModel, character) since 2.1.0 setMethod("write.ml", signature(object = "DecisionTreeRegressionModel", path = "character"), -function(object, path, overwrite = FALSE) { - write_internal(object, path, overwrite) -}) + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + }) -# Get the summary of an IsotonicRegressionModel model +#' Save the Decision Tree Classification model to the input path. +#' +#' @param object A fitted Decision tree classification model +#' @param path The directory where the model is saved +#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE +#' which means throw exception if the output path exists. 
+#' +#' @aliases write.ml,DecisionTreeClassificationModel,character-method +#' @rdname spark.decisionTreeClassification +#' @export +#' @note write.ml(DecisionTreeClassificationModel, character) since 2.1.0 +setMethod("write.ml", signature(object = "DecisionTreeClassificationModel", path = "character"), + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + }) -#' @param object a fitted IsotonicRegressionModel -#' @param ... Other optional arguments to summary of an IsotonicRegressionModel -#' @return \code{summary} returns the model's boundaries and prediction as lists -#' @rdname spark.isoreg -#' @aliases summary,IsotonicRegressionModel-method +# Get the summary of an DecisionTreeRegressionModel model + +#' @param object a fitted DecisionTreeRegressionModel +#' @param ... Other optional arguments to summary of a DecisionTreeRegressionModel +#' @return \code{summary} returns the model's features as lists, depth and number of nodes +#' @rdname spark.decisionTree +#' @aliases summary,DecisionTreeRegressionModel-method #' @export -#' @note summary(IsotonicRegressionModel) since 2.1.0 +#' @note summary(DecisionTreeRegressionModel) since 2.1.0 setMethod("summary", signature(object = "DecisionTreeRegressionModel"), function(object, ...) { jobj <- object@jobj - boundaries <- callJMethod(jobj, "boundaries") - predictions <- callJMethod(jobj, "predictions") - return(list(boundaries = boundaries, predictions = predictions)) - }) \ No newline at end of file + features <- callJMethod(jobj, "features") + depth <- callJMethod(jobj, "depth") + numNodes <- callJMethod(jobj, "numNodes") + ans <- list(features = features, depth = depth, numNodes = numNodes) + class(ans) <- "summary.DecisionTreeRegressionModel" + ans + }) + +# Get the summary of an DecisionTreeClassificationModel model + +#' @param object a fitted DecisionTreeClassificationModel +#' @param ... 
Other optional arguments to summary of a DecisionTreeClassificationModel +#' @return \code{summary} returns the model's features as lists, depth and number of nodes +#' @rdname spark.decisionTree +#' @aliases summary,DecisionTreeClassificationModel-method +#' @export +#' @note summary(DecisionTreeRegressionModel) since 2.1.0 +setMethod("summary", signature(object = "DecisionTreeClassificationModel"), +function(object, ...) { + jobj <- object@jobj + features <- callJMethod(jobj, "features") + depth <- callJMethod(jobj, "depth") + numNodes <- callJMethod(jobj, "numNodes") + ans <- list(features = features, depth = depth, numNodes = numNodes) + class(ans) <- "summary.DecisionTreeClassificationModel" + ans +}) + +# Prints the summary of Decision Tree Regression Model + +#' @rdname spark.decisionTree +#' @param x summary object of decisionTreeRegressionModel returned by \code{summary}. +#' @export +#' @note print.summary.DecisionTreeRegressionModel since 2.1.0 +print.summary.DecisionTreeRegressionModel <- function(x, ...) { + jobj <- x@jobj + summaryStr <- callJMethod(jobj, "summary") + cat(summaryStr, "\n") + invisible(x) + } + +# Prints the summary of Decision Tree Classification Model + +#' @rdname spark.decisionTree +#' @param x summary object of decisionTreeClassificationModel returned by \code{summary}. +#' @export +#' @note print.summary.DecisionTreeClassificationModel since 2.1.0 +print.summary.DecisionTreeClassificationModel <- function(x, ...) 
{ + jobj <- x@jobj + summaryStr <- callJMethod(jobj, "summary") + cat(summaryStr, "\n") + invisible(x) +} diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index a1eaaf20916a..337e4d6b0c38 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -791,4 +791,34 @@ test_that("spark.kstest", { expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:") }) +test_that("spark.decisionTree Regression", { + data <- suppressWarnings(createDataFrame(longley)) + model <- spark.decisionTree(data, Employed~., "regression", maxDepth=5, maxBins=16) + + #Test summary + stats <- summary(model) + expect_equal(stats$depth, 5) + expect_equal(stats$numNodes, 31) + + #Test model predict + predictions <- collect(predict(model, data)) + expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187, + 63.221, 63.639, 64.989, 63.761, + 66.019, 67.857, 68.169, 66.513, + 68.655, 69.564, 69.331, 70.551), + tolerance = 1e-4) + + # Test model save/load + modelPath <- tempfile(pattern = "spark-decisionTreeRegression", fileext = ".tmp") + write.ml(model, modelPath) + expect_error(write.ml(model, modelPath)) + write.ml(model, modelPath, overwrite = TRUE) + model2 <- read.ml(modelPath) + stats2 <- summary(model2) + expect_equal(stats$depth, stats2$depth) + expect_equal(stats$numNodes, stats2$numNodes) + + unlink(modelPath) +}) + sparkR.session.stop() diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala index d59f40af9c15..c31e90de2492 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala @@ -23,29 +23,23 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.ml.{Pipeline, PipelineModel} -import 
org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute} +import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, DecisionTreeClassifier} -import org.apache.spark.ml.feature.{IndexToString, RFormula} +import org.apache.spark.ml.feature.RFormula import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset} private[r] class DecisionTreeClassifierWrapper private ( val pipeline: PipelineModel, val features: Array[String], - val labels: Array[String]) extends MLWritable { - - import DecisionTreeClassifierWrapper.PREDICTED_LABEL_INDEX_COL + val maxDepth: Int, + val maxBins: Int) extends MLWritable { private val DTModel: DecisionTreeClassificationModel = pipeline.stages(1).asInstanceOf[DecisionTreeClassificationModel] - lazy val maxDepth: Int = DTModel.getMaxDepth - - lazy val maxBins: Int = DTModel.getMaxBins - def transform(dataset: Dataset[_]): DataFrame = { pipeline.transform(dataset) - .drop(PREDICTED_LABEL_INDEX_COL) .drop(DTModel.getFeaturesCol) } @@ -54,33 +48,36 @@ private[r] class DecisionTreeClassifierWrapper private ( } private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeClassifierWrapper] { + def fit(data: DataFrame, + formula: String, + maxDepth: Int, + maxBins: Int): DecisionTreeClassifierWrapper = { - val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" - val PREDICTED_LABEL_COL = "prediction" - - def fit(data: DataFrame, formula: String): DecisionTreeClassifierWrapper = { val rFormula = new RFormula() .setFormula(formula) - .fit(data) - // get labels and feature names from output schema - val schema = rFormula.transform(data).schema - val labelAttr = Attribute.fromStructField(schema(rFormula.getLabelCol)) - .asInstanceOf[NominalAttribute] - val labels = labelAttr.values.get - val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol)) + .setFeaturesCol("features") + .setLabelCol("label") + + 
RWrapperUtils.checkDataColumns(rFormula, data) + val rFormulaModel = rFormula.fit(data) + + // get feature names from output schema + val schema = rFormulaModel.transform(data).schema + val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol)) .attributes.get val features = featureAttrs.map(_.name.get) + // assemble and fit the pipeline - val decisionTree = new DecisionTreeClassifier() - .setPredictionCol(PREDICTED_LABEL_INDEX_COL) - val idxToStr = new IndexToString() - .setInputCol(PREDICTED_LABEL_INDEX_COL) - .setOutputCol(PREDICTED_LABEL_COL) - .setLabels(labels) + val decisionTreeClassification = new DecisionTreeClassifier() + .setMaxDepth(maxDepth) + .setMaxBins(maxBins) + .setFeaturesCol(rFormula.getFeaturesCol) + val pipeline = new Pipeline() - .setStages(Array(rFormula, decisionTree, idxToStr)) + .setStages(Array(rFormulaModel, decisionTreeClassification)) .fit(data) - new DecisionTreeClassifierWrapper(pipeline, features, labels) + + new DecisionTreeClassifierWrapper(pipeline, features, maxDepth, maxBins) } override def read: MLReader[DecisionTreeClassifierWrapper] = @@ -97,7 +94,8 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toSeq) ~ - ("labels" -> instance.labels.toSeq) + ("maxDepth" -> instance.maxDepth) ~ + ("maxBins" -> instance.maxBins) val rMetadataJson: String = compact(render(rMetadata)) sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) @@ -116,8 +114,10 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC val rMetadataStr = sc.textFile(rMetadataPath, 1).first() val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] - val labels = (rMetadata \ "labels").extract[Array[String]] - new DecisionTreeClassifierWrapper(pipeline, features, labels) + val maxDepth = (rMetadata \ "maxDepth").extract[Int] + val 
maxBins = (rMetadata \ "maxBins").extract[Int] + + new DecisionTreeClassifierWrapper(pipeline, features, maxDepth, maxBins) } } } \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala index 89a82c452153..138fb826dae1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala @@ -23,8 +23,8 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.ml.{Pipeline, PipelineModel} -import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute} -import org.apache.spark.ml.feature.{IndexToString, RFormula} +import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.feature.RFormula import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor} import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset} @@ -32,20 +32,19 @@ import org.apache.spark.sql.{DataFrame, Dataset} private[r] class DecisionTreeRegressorWrapper private ( val pipeline: PipelineModel, val features: Array[String], - val labels: Array[String]) extends MLWritable { - - import DecisionTreeRegressorWrapper.PREDICTED_LABEL_INDEX_COL + val maxDepth: Int, + val maxBins: Int) extends MLWritable { private val DTModel: DecisionTreeRegressionModel = pipeline.stages(1).asInstanceOf[DecisionTreeRegressionModel] - lazy val maxDepth: Int = DTModel.getMaxDepth + lazy val depth: Int = DTModel.depth + lazy val numNodes: Int = DTModel.numNodes - lazy val maxBins: Int = DTModel.getMaxBins + def summary: String = DTModel.toDebugString def transform(dataset: Dataset[_]): DataFrame = { pipeline.transform(dataset) - .drop(PREDICTED_LABEL_INDEX_COL) .drop(DTModel.getFeaturesCol) } @@ -54,33 +53,36 @@ private[r] class DecisionTreeRegressorWrapper 
private ( } private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRegressorWrapper] { + def fit(data: DataFrame, + formula: String, + maxDepth: Int, + maxBins: Int): DecisionTreeRegressorWrapper = { - val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" - val PREDICTED_LABEL_COL = "prediction" - - def fit(data: DataFrame, formula: String): DecisionTreeRegressorWrapper = { val rFormula = new RFormula() .setFormula(formula) - .fit(data) - // get labels and feature names from output schema - val schema = rFormula.transform(data).schema - val labelAttr = Attribute.fromStructField(schema(rFormula.getLabelCol)) - .asInstanceOf[NominalAttribute] - val labels = labelAttr.values.get - val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol)) + .setFeaturesCol("features") + .setLabelCol("label") + + RWrapperUtils.checkDataColumns(rFormula, data) + val rFormulaModel = rFormula.fit(data) + + // get feature names from output schema + val schema = rFormulaModel.transform(data).schema + val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol)) .attributes.get val features = featureAttrs.map(_.name.get) + // assemble and fit the pipeline - val decisionTree = new DecisionTreeRegressor() - .setPredictionCol(PREDICTED_LABEL_INDEX_COL) - val idxToStr = new IndexToString() - .setInputCol(PREDICTED_LABEL_INDEX_COL) - .setOutputCol(PREDICTED_LABEL_COL) - .setLabels(labels) + val decisionTreeRegression = new DecisionTreeRegressor() + .setMaxDepth(maxDepth) + .setMaxBins(maxBins) + .setFeaturesCol(rFormula.getFeaturesCol) + val pipeline = new Pipeline() - .setStages(Array(rFormula, decisionTree, idxToStr)) + .setStages(Array(rFormulaModel, decisionTreeRegression)) .fit(data) - new DecisionTreeRegressorWrapper(pipeline, features, labels) + + new DecisionTreeRegressorWrapper(pipeline, features, maxDepth, maxBins) } override def read: MLReader[DecisionTreeRegressorWrapper] = new DecisionTreeRegressorWrapperReader @@ 
-96,7 +98,8 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toSeq) ~ - ("labels" -> instance.labels.toSeq) + ("maxDepth" -> instance.maxDepth) ~ + ("maxBins" -> instance.maxBins) val rMetadataJson: String = compact(render(rMetadata)) sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) @@ -115,8 +118,10 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe val rMetadataStr = sc.textFile(rMetadataPath, 1).first() val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] - val labels = (rMetadata \ "labels").extract[Array[String]] - new DecisionTreeRegressorWrapper(pipeline, features, labels) + val maxDepth = (rMetadata \ "maxDepth").extract[Int] + val maxBins = (rMetadata \ "maxBins").extract[Int] + + new DecisionTreeRegressorWrapper(pipeline, features, maxDepth, maxBins) } } } From 9787219defef01ad435608d328c4014874d9ea89 Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Thu, 6 Oct 2016 01:24:02 -0700 Subject: [PATCH 4/7] decision tree documentation --- R/pkg/R/mllib.R | 43 ++++++++++++++++--- .../org/apache/spark/ml/r/RWrappers.scala | 2 + 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index f44b6e7f1c5c..2ed555516edb 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -1447,14 +1447,47 @@ print.summary.KSTest <- function(x, ...) { invisible(x) } -#' Decision Tree +#' Decision Tree Model for Regression and Classification #' -#' @description -#' \code{spark.decisionTree} tree +#' \code{spark.decisionTree} fits a Decision Tree Regression model or Classification model on +#' a SparkDataFrame. Users can call \code{summary} to get a summary of the fitted Decision Tree +#' model, \code{predict} to make predictions on new data, and \code{write.ml}/\code{read.ml} to +#' save/load fitted models. 
+#' For more details, see \href{https://en.wikipedia.org/wiki/Decision_tree_learning}{Decision Tree} +#' +#' @param data a SparkDataFrame for training. +#' @param formula a symbolic description of the model to be fitted. Currently only a few formula +#' operators are supported, including '~', ':', '+', and '-'. +#' @param type type of model to fit +#' @param maxDepth Maximum depth of the tree (>= 0). +#' @param maxBins Maximum number of bins used for discretizing continuous features and for choosing +#' how to split on features at each node. More bins give higher granularity. Must be +#' >= 2 and >= number of categories in any categorical feature. (default = 32) +#' @param ... additional arguments passed to the method. +#' @aliases spark.decisionTree,SparkDataFrame,formula-method +#' @return \code{spark.decisionTree} returns a fitted Decision Tree model. +#' @rdname spark.decisionTree +#' @name spark.decisionTree +#' @export +#' @examples +#' \dontrun{ +#' df <- createDataFrame(longley) #' -#' Decision Tree +#' # fit a Decision Tree Regression Model +#' model <- spark.decisionTree(data, Employed~., "regression", maxDepth=5, maxBins=16) #' -#' @param data a SparkDataFrame of user data. 
+#' # get the summary of the model +#' summary(model) +#' +#' # make predictions +#' predictions <- predict(model, df) +#' +#' # save and load the model +#' path <- "path/to/model" +#' write.ml(model, path) +#' savedModel <- read.ml(path) +#' summary(savedModel) +#' } #' @note spark.decisionTree since 2.1.0 setMethod("spark.decisionTree", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, type = c("regression", "classification"), maxDepth = 5, maxBins = 32 ) { diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala index 5de3a2101eae..e31de4689afe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala @@ -56,6 +56,8 @@ private[r] object RWrappers extends MLReader[Object] { ALSWrapper.load(path) case "org.apache.spark.ml.r.DecisionTreeRegressorWrapper" => DecisionTreeRegressorWrapper.load(path) + case "org.apache.spark.ml.r.DecisionTreeClassifierWrapper" => + DecisionTreeClassifierWrapper.load(path) case _ => throw new SparkException(s"SparkR read.ml does not support load $className") } From d107ab9d5651fed0bb5b5ef6fe0aecf87baac759 Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Thu, 6 Oct 2016 01:41:48 -0700 Subject: [PATCH 5/7] formatting --- R/pkg/R/mllib.R | 3 ++- R/pkg/inst/tests/testthat/test_mllib.R | 2 +- .../org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 2ed555516edb..08b9388fd4e1 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -1490,7 +1490,8 @@ print.summary.KSTest <- function(x, ...) 
{ #' } #' @note spark.decisionTree since 2.1.0 setMethod("spark.decisionTree", signature(data = "SparkDataFrame", formula = "formula"), - function(data, formula, type = c("regression", "classification"), maxDepth = 5, maxBins = 32 ) { + function(data, formula, type = c("regression", "classification"), + maxDepth = 5, maxBins = 32 ) { formula <- paste(deparse(formula), collapse = "") if (identical(type, "regression")) { jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeRegressorWrapper", "fit", diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 337e4d6b0c38..dfb744c55d6a 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -793,7 +793,7 @@ test_that("spark.kstest", { test_that("spark.decisionTree Regression", { data <- suppressWarnings(createDataFrame(longley)) - model <- spark.decisionTree(data, Employed~., "regression", maxDepth=5, maxBins=16) + model <- spark.decisionTree(data, Employed~., "regression", maxDepth = 5, maxBins = 16) #Test summary stats <- summary(model) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala index c31e90de2492..130bf9de9542 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala @@ -120,4 +120,4 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC new DecisionTreeClassifierWrapper(pipeline, features, maxDepth, maxBins) } } -} \ No newline at end of file +} From d034735b31e83d182d92152230716d0d10a2c39d Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Thu, 6 Oct 2016 03:44:09 -0700 Subject: [PATCH 6/7] classification unit test --- R/pkg/R/mllib.R | 36 ++++++++++--------- R/pkg/inst/tests/testthat/test_mllib.R | 25 +++++++++++++ .../ml/r/DecisionTreeClassifierWrapper.scala | 
6 ++++ 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 08b9388fd4e1..defb8290381a 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -1474,7 +1474,7 @@ print.summary.KSTest <- function(x, ...) { #' df <- createDataFrame(longley) #' #' # fit a Decision Tree Regression Model -#' model <- spark.decisionTree(data, Employed~., "regression", maxDepth=5, maxBins=16) +#' model <- spark.decisionTree(data, Employed~., type = "regression", maxDepth = 5, maxBins = 16) #' #' # get the summary of the model #' summary(model) @@ -1579,7 +1579,7 @@ setMethod("summary", signature(object = "DecisionTreeRegressionModel"), features <- callJMethod(jobj, "features") depth <- callJMethod(jobj, "depth") numNodes <- callJMethod(jobj, "numNodes") - ans <- list(features = features, depth = depth, numNodes = numNodes) + ans <- list(features = features, depth = depth, numNodes = numNodes, jobj = jobj) class(ans) <- "summary.DecisionTreeRegressionModel" ans }) @@ -1594,15 +1594,17 @@ setMethod("summary", signature(object = "DecisionTreeRegressionModel"), #' @export #' @note summary(DecisionTreeRegressionModel) since 2.1.0 setMethod("summary", signature(object = "DecisionTreeClassificationModel"), -function(object, ...) { - jobj <- object@jobj - features <- callJMethod(jobj, "features") - depth <- callJMethod(jobj, "depth") - numNodes <- callJMethod(jobj, "numNodes") - ans <- list(features = features, depth = depth, numNodes = numNodes) - class(ans) <- "summary.DecisionTreeClassificationModel" - ans -}) + function(object, ...) 
{ + jobj <- object@jobj + features <- callJMethod(jobj, "features") + depth <- callJMethod(jobj, "depth") + numNodes <- callJMethod(jobj, "numNodes") + numClasses <- callJMethod(jobj, "numClasses") + ans <- list(features = features, depth = depth, + numNodes = numNodes, numClasses = numClasses, jobj = jobj) + class(ans) <- "summary.DecisionTreeClassificationModel" + ans + }) # Prints the summary of Decision Tree Regression Model @@ -1611,11 +1613,11 @@ function(object, ...) { #' @export #' @note print.summary.DecisionTreeRegressionModel since 2.1.0 print.summary.DecisionTreeRegressionModel <- function(x, ...) { - jobj <- x@jobj - summaryStr <- callJMethod(jobj, "summary") - cat(summaryStr, "\n") - invisible(x) - } + jobj <- x$jobj + summaryStr <- callJMethod(jobj, "summary") + cat(summaryStr, "\n") + invisible(x) +} # Prints the summary of Decision Tree Classification Model @@ -1624,7 +1626,7 @@ print.summary.DecisionTreeRegressionModel <- function(x, ...) { #' @export #' @note print.summary.DecisionTreeClassificationModel since 2.1.0 print.summary.DecisionTreeClassificationModel <- function(x, ...) 
{ - jobj <- x@jobj + jobj <- x$jobj summaryStr <- callJMethod(jobj, "summary") cat(summaryStr, "\n") invisible(x) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index dfb744c55d6a..560eb17ac309 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -821,4 +821,29 @@ test_that("spark.decisionTree Regression", { unlink(modelPath) }) +test_that("spark.decisionTree Classification", { + data <- suppressWarnings(createDataFrame(iris)) + model <- spark.decisionTree(data, Species ~ Petal_Length + Petal_Width, "classification", + maxDepth = 5, maxBins = 16) + + #Test summary + stats <- summary(model) + expect_equal(stats$depth, 5) + expect_equal(stats$numNodes, 19) + expect_equal(stats$numClasses, 3) + + # Test model save/load + modelPath <- tempfile(pattern = "spark-decisionTreeClassification", fileext = ".tmp") + write.ml(model, modelPath) + expect_error(write.ml(model, modelPath)) + write.ml(model, modelPath, overwrite = TRUE) + model2 <- read.ml(modelPath) + stats2 <- summary(model2) + expect_equal(stats$depth, stats2$depth) + expect_equal(stats$numNodes, stats2$numNodes) + expect_equal(stats$numClasses, stats2$numClasses) + + unlink(modelPath) +}) + sparkR.session.stop() diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala index 130bf9de9542..36a8670ceb1f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala @@ -38,6 +38,12 @@ private[r] class DecisionTreeClassifierWrapper private ( private val DTModel: DecisionTreeClassificationModel = pipeline.stages(1).asInstanceOf[DecisionTreeClassificationModel] + lazy val depth: Int = DTModel.depth + lazy val numNodes: Int = DTModel.numNodes + lazy val numClasses: Int = DTModel.numClasses + + def 
summary: String = DTModel.toDebugString + def transform(dataset: Dataset[_]): DataFrame = { pipeline.transform(dataset) .drop(DTModel.getFeaturesCol) From 0694f84e86a18179f06abf028d6e573c5c1af4b0 Mon Sep 17 00:00:00 2001 From: Kai Jiang Date: Fri, 7 Oct 2016 23:35:17 -0700 Subject: [PATCH 7/7] address comments --- R/pkg/R/mllib.R | 56 +++++++++++++------------- R/pkg/inst/tests/testthat/test_mllib.R | 2 +- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index defb8290381a..ec779bf6fc08 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -117,9 +117,9 @@ setClass("DecisionTreeClassificationModel", representation(jobj = "jobj")) #' @name write.ml #' @export #' @seealso \link{spark.glm}, \link{glm}, -#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, -#' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}, -#' @seealso \link{spark.decisionTree}, +#' @seealso \link{spark.als}, \link{spark.decisionTree}, \link{spark.gaussianMixture}, +#' @seealso \link{spark.isoreg}, \link{spark.kmeans}, \link{spark.lda}, \link{spark.mlp}, +#' @seealso \link{spark.naiveBayes}, \link{spark.survreg}, #' @seealso \link{read.ml} NULL @@ -131,8 +131,9 @@ NULL #' @name predict #' @export #' @seealso \link{spark.glm}, \link{glm}, -#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, -#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}, \link{spark.decisionTree} +#' @seealso \link{spark.als}, \link{spark.decisionTree}, \link{spark.gaussianMixture}, +#' @seealso \link{spark.isoreg}, \link{spark.kmeans}, \link{spark.mlp}, \link{spark.naiveBayes}, +#' @seealso \link{spark.survreg}, NULL write_internal <- function(object, path, overwrite = FALSE) { @@ -1474,7 +1475,7 @@ print.summary.KSTest <- function(x, ...)
{ #' df <- createDataFrame(longley) #' #' # fit a Decision Tree Regression Model -#' model <- spark.decisionTree(data, Employed~., type = "regression", maxDepth = 5, maxBins = 16) +#' model <- spark.decisionTree(df, Employed ~ ., type = "regression", maxDepth = 5, maxBins = 16) #' #' # get the summary of the model #' summary(model) @@ -1492,16 +1493,22 @@ print.summary.KSTest <- function(x, ...) { setMethod("spark.decisionTree", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, type = c("regression", "classification"), maxDepth = 5, maxBins = 32 ) { + type <- match.arg(type) formula <- paste(deparse(formula), collapse = "") - if (identical(type, "regression")) { - jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeRegressorWrapper", "fit", - data@sdf, formula, as.integer(maxDepth), as.integer(maxBins)) - new("DecisionTreeRegressionModel", jobj = jobj) - } else if (identical(type, "classification")) { - jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeClassifierWrapper", "fit", - data@sdf, formula, as.integer(maxDepth), as.integer(maxBins)) - new("DecisionTreeClassificationModel", jobj = jobj) - } + switch(type, + regression = { + jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeRegressorWrapper", + "fit", data@sdf, formula, as.integer(maxDepth), + as.integer(maxBins)) + new("DecisionTreeRegressionModel", jobj = jobj) + }, + classification = { + jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeClassifierWrapper", + "fit", data@sdf, formula, as.integer(maxDepth), + as.integer(maxBins)) + new("DecisionTreeClassificationModel", jobj = jobj) + } + ) }) # Makes predictions from a Decision Tree Regression model or @@ -1518,12 +1525,6 @@ setMethod("predict", signature(object = "DecisionTreeRegressionModel"), predict_internal(object, newData) }) -# Makes predictions from a Decision Tree Classification model or -# a model produced by spark.decisionTree() - -#' @param newData a SparkDataFrame for testing.
-#' @return \code{predict} returns a SparkDataFrame containing predicted labeled in a column named -#' "prediction" #' @rdname spark.decisionTree #' @export #' @note predict(decisionTreeClassificationModel) since 2.1.0 @@ -1566,9 +1567,9 @@ setMethod("write.ml", signature(object = "DecisionTreeClassificationModel", path # Get the summary of an DecisionTreeRegressionModel model -#' @param object a fitted DecisionTreeRegressionModel -#' @param ... Other optional arguments to summary of a DecisionTreeRegressionModel +#' @param object a fitted DecisionTreeRegressionModel or DecisionTreeClassificationModel #' @return \code{summary} returns the model's features as lists, depth and number of nodes +#' or number of classes. #' @rdname spark.decisionTree #' @aliases summary,DecisionTreeRegressionModel-method #' @export @@ -1586,13 +1587,10 @@ setMethod("summary", signature(object = "DecisionTreeRegressionModel"), # Get the summary of an DecisionTreeClassificationModel model -#' @param object a fitted DecisionTreeClassificationModel -#' @param ... Other optional arguments to summary of a DecisionTreeClassificationModel -#' @return \code{summary} returns the model's features as lists, depth and number of nodes #' @rdname spark.decisionTree #' @aliases summary,DecisionTreeClassificationModel-method #' @export -#' @note summary(DecisionTreeRegressionModel) since 2.1.0 +#' @note summary(DecisionTreeClassificationModel) since 2.1.0 setMethod("summary", signature(object = "DecisionTreeClassificationModel"), function(object, ...) { jobj <- object@jobj @@ -1609,7 +1607,8 @@ setMethod("summary", signature(object = "DecisionTreeClassificationModel"), # Prints the summary of Decision Tree Regression Model #' @rdname spark.decisionTree -#' @param x summary object of decisionTreeRegressionModel returned by \code{summary}. +#' @param x summary object of decisionTreeRegressionModel or decisionTreeClassificationModel +#' returned by \code{summary}. 
#' @export #' @note print.summary.DecisionTreeRegressionModel since 2.1.0 print.summary.DecisionTreeRegressionModel <- function(x, ...) { @@ -1622,7 +1621,6 @@ print.summary.DecisionTreeRegressionModel <- function(x, ...) { # Prints the summary of Decision Tree Classification Model #' @rdname spark.decisionTree -#' @param x summary object of decisionTreeClassificationModel returned by \code{summary}. #' @export #' @note print.summary.DecisionTreeClassificationModel since 2.1.0 print.summary.DecisionTreeClassificationModel <- function(x, ...) { diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 560eb17ac309..3833b0370667 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -793,7 +793,7 @@ test_that("spark.kstest", { test_that("spark.decisionTree Regression", { data <- suppressWarnings(createDataFrame(longley)) - model <- spark.decisionTree(data, Employed~., "regression", maxDepth = 5, maxBins = 16) + model <- spark.decisionTree(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16) #Test summary stats <- summary(model)