From 6ec068e5f48d393d539f4600bca3cbd1ea7d65a3 Mon Sep 17 00:00:00 2001
From: actuaryzhang
Date: Wed, 2 Aug 2017 23:37:41 -0700
Subject: [PATCH 1/3] add offset to SparkR

---
 R/pkg/R/mllib_regression.R                         | 18 ++++++++++++++----
 R/pkg/tests/fulltests/test_mllib_regression.R      |  7 +++++++
 .../r/GeneralizedLinearRegressionWrapper.scala     |  2 ++
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/R/pkg/R/mllib_regression.R b/R/pkg/R/mllib_regression.R
index 9ecd887f2c12..12c6e690e962 100644
--- a/R/pkg/R/mllib_regression.R
+++ b/R/pkg/R/mllib_regression.R
@@ -65,6 +65,8 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
 #' @param maxIter integer giving the maximal number of IRLS iterations.
 #' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
 #'                  weights as 1.0.
+#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
+#'                  as 0.0. The feature specified as offset has a constant coefficient of 1.0.
 #' @param regParam regularization parameter for L2 regularization.
 #' @param var.power the power in the variance function of the Tweedie distribution which provides
 #'                  the relationship between the variance and mean of the distribution. Only
@@ -125,7 +127,7 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
 #' @seealso \link{glm}, \link{read.ml}
 setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
           function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, weightCol = NULL,
-                   regParam = 0.0, var.power = 0.0, link.power = 1.0 - var.power,
+                   offsetCol = NULL, regParam = 0.0, var.power = 0.0, link.power = 1.0 - var.power,
                    stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
                                               "alphabetDesc", "alphabetAsc")) {

@@ -159,10 +161,16 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
               weightCol <- as.character(weightCol)
             }
+            if (!is.null(offsetCol) && offsetCol == "") {
+              offsetCol <- NULL
+            } else if (!is.null(offsetCol)) {
+              offsetCol <- as.character(offsetCol)
+            }
+
             # For known families, Gamma is upper-cased
             jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper", "fit",
                                 formula, data@sdf, tolower(family$family), family$link,
-                                tol, as.integer(maxIter), weightCol, regParam,
+                                tol, as.integer(maxIter), weightCol, offsetCol, regParam,
                                 as.double(var.power), as.double(link.power),
                                 stringIndexerOrderType)
             new("GeneralizedLinearRegressionModel", jobj = jobj)
           })
@@ -182,6 +190,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
 #'                \code{poisson}, \code{Gamma}, and \code{tweedie}.
 #' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
 #'                  weights as 1.0.
+#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
+#'                  as 0.0. The feature specified as offset has a constant coefficient of 1.0.
 #' @param epsilon positive convergence tolerance of iterations.
 #' @param maxit integer giving the maximal number of IRLS iterations.
 #' @param var.power the index of the power variance function in the Tweedie family.
@@ -207,11 +217,11 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
 #' @seealso \link{spark.glm}
 setMethod("glm", signature(formula = "formula", family = "ANY", data = "SparkDataFrame"),
           function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 25, weightCol = NULL,
-                   var.power = 0.0, link.power = 1.0 - var.power,
+                   offsetCol = NULL, var.power = 0.0, link.power = 1.0 - var.power,
                    stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
                                               "alphabetDesc", "alphabetAsc")) {
             spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, weightCol = weightCol,
-                      var.power = var.power, link.power = link.power,
+                      offsetCol = offsetCol, var.power = var.power, link.power = link.power,
                       stringIndexerOrderType = stringIndexerOrderType)
           })

diff --git a/R/pkg/tests/fulltests/test_mllib_regression.R b/R/pkg/tests/fulltests/test_mllib_regression.R
index 6b72a09b200d..92b9523127ff 100644
--- a/R/pkg/tests/fulltests/test_mllib_regression.R
+++ b/R/pkg/tests/fulltests/test_mllib_regression.R
@@ -173,6 +173,13 @@ test_that("spark.glm summary", {
   expect_equal(stats$df.residual, rStats$df.residual)
   expect_equal(stats$aic, rStats$aic)

+  # Test spark.glm works with offset
+  stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
+                             family = poisson(), offsetCol = "Pedal_Length"))
+  rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species,
+                        data = iris, family = poisson(), offset = Pedal_Length))
+  expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3))
+
   # Test summary works on base GLM models
   baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
   baseSummary <- summary(baseModel)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index 176a6cf85291..6692ce3fc642 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -74,6 +74,7 @@ private[r] object GeneralizedLinearRegressionWrapper
       tol: Double,
       maxIter: Int,
       weightCol: String,
+      offsetCol: String,
       regParam: Double,
       variancePower: Double,
       linkPower: Double,
@@ -99,6 +100,7 @@ private[r] object GeneralizedLinearRegressionWrapper
       glr.setLink(link)
     }
     if (weightCol != null) glr.setWeightCol(weightCol)
+    if (offsetCol != null) glr.setOffsetCol(offsetCol)

     val pipeline = new Pipeline()
       .setStages(Array(rFormulaModel, glr))

From dc8ccbc19b93b527898f9d20ba71228c3c8c895e Mon Sep 17 00:00:00 2001
From: actuaryzhang
Date: Thu, 3 Aug 2017 13:30:57 -0700
Subject: [PATCH 2/3] fix unit test

---
 R/pkg/R/mllib_regression.R                         | 34 +++++++++++--------
 R/pkg/tests/fulltests/test_mllib_regression.R      |  6 ++--
 .../GeneralizedLinearRegressionWrapper.scala       |  4 +--
 3 files changed, 24 insertions(+), 20 deletions(-)

diff --git a/R/pkg/R/mllib_regression.R b/R/pkg/R/mllib_regression.R
index 12c6e690e962..ebaeae970218 100644
--- a/R/pkg/R/mllib_regression.R
+++ b/R/pkg/R/mllib_regression.R
@@ -65,8 +65,6 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
 #' @param maxIter integer giving the maximal number of IRLS iterations.
 #' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
 #'                  weights as 1.0.
-#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
-#'                  as 0.0. The feature specified as offset has a constant coefficient of 1.0.
 #' @param regParam regularization parameter for L2 regularization.
 #' @param var.power the power in the variance function of the Tweedie distribution which provides
 #'                  the relationship between the variance and mean of the distribution. Only
@@ -78,6 +76,8 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
 #'                               "frequencyDesc", "frequencyAsc", "alphabetDesc", and "alphabetAsc".
 #'                               The default value is "frequencyDesc". When the ordering is set to
 #'                               "alphabetDesc", this drops the same category as R when encoding strings.
+#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
+#'                  as 0.0. The feature specified as offset has a constant coefficient of 1.0.
 #' @param ... additional arguments passed to the method.
 #' @aliases spark.glm,SparkDataFrame,formula-method
 #' @return \code{spark.glm} returns a fitted generalized linear model.
@@ -127,9 +127,10 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
 #' @seealso \link{glm}, \link{read.ml}
 setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
           function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, weightCol = NULL,
-                   offsetCol = NULL, regParam = 0.0, var.power = 0.0, link.power = 1.0 - var.power,
+                   regParam = 0.0, var.power = 0.0, link.power = 1.0 - var.power,
                    stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
-                                              "alphabetDesc", "alphabetAsc")) {
+                                              "alphabetDesc", "alphabetAsc"),
+                   offsetCol = NULL) {

             stringIndexerOrderType <- match.arg(stringIndexerOrderType)
             if (is.character(family)) {
@@ -161,18 +162,19 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
               weightCol <- as.character(weightCol)
             }

-            if (!is.null(offsetCol) && offsetCol == "") {
-              offsetCol <- NULL
-            } else if (!is.null(offsetCol)) {
+            if (!is.null(offsetCol)) {
               offsetCol <- as.character(offsetCol)
+              if (nchar(offsetCol) == 0) {
+                offsetCol <- NULL
+              }
             }

             # For known families, Gamma is upper-cased
             jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper", "fit",
                                 formula, data@sdf, tolower(family$family), family$link,
-                                tol, as.integer(maxIter), weightCol, offsetCol, regParam,
+                                tol, as.integer(maxIter), weightCol, regParam,
                                 as.double(var.power), as.double(link.power),
-                                stringIndexerOrderType)
+                                stringIndexerOrderType, offsetCol)
             new("GeneralizedLinearRegressionModel", jobj = jobj)
           })

@@ -190,8 +192,6 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
 #'                \code{poisson}, \code{Gamma}, and \code{tweedie}.
 #' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
 #'                  weights as 1.0.
-#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
-#'                  as 0.0. The feature specified as offset has a constant coefficient of 1.0.
 #' @param epsilon positive convergence tolerance of iterations.
 #' @param maxit integer giving the maximal number of IRLS iterations.
 #' @param var.power the index of the power variance function in the Tweedie family.
@@ -202,6 +202,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
 #'                               "frequencyDesc", "frequencyAsc", "alphabetDesc", and "alphabetAsc".
 #'                               The default value is "frequencyDesc". When the ordering is set to
 #'                               "alphabetDesc", this drops the same category as R when encoding strings.
+#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
+#'                  as 0.0. The feature specified as offset has a constant coefficient of 1.0.
 #' @return \code{glm} returns a fitted generalized linear model.
 #' @rdname glm
 #' @export
@@ -217,12 +219,14 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
 #' @seealso \link{spark.glm}
 setMethod("glm", signature(formula = "formula", family = "ANY", data = "SparkDataFrame"),
           function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 25, weightCol = NULL,
-                   offsetCol = NULL, var.power = 0.0, link.power = 1.0 - var.power,
+                   var.power = 0.0, link.power = 1.0 - var.power,
                    stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
-                                              "alphabetDesc", "alphabetAsc")) {
+                                              "alphabetDesc", "alphabetAsc"),
+                   offsetCol = NULL) {
             spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, weightCol = weightCol,
-                      offsetCol = offsetCol, var.power = var.power, link.power = link.power,
-                      stringIndexerOrderType = stringIndexerOrderType)
+                      var.power = var.power, link.power = link.power,
+                      stringIndexerOrderType = stringIndexerOrderType,
+                      offsetCol = offsetCol)
           })

 # Returns the summary of a model produced by glm() or spark.glm(), similarly to R's summary().
diff --git a/R/pkg/tests/fulltests/test_mllib_regression.R b/R/pkg/tests/fulltests/test_mllib_regression.R
index 92b9523127ff..8c78ed8d8547 100644
--- a/R/pkg/tests/fulltests/test_mllib_regression.R
+++ b/R/pkg/tests/fulltests/test_mllib_regression.R
@@ -175,9 +175,9 @@ test_that("spark.glm summary", {

   # Test spark.glm works with offset
   stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
-                             family = poisson(), offsetCol = "Pedal_Length"))
-  rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species,
-                        data = iris, family = poisson(), offset = Pedal_Length))
+                             family = poisson(), offsetCol = "Petal_Length"))
+  rStats <- suppressWarnings(summary(glm(Sepal.Width ~ Sepal.Length + Species,
+                                         data = iris, family = poisson(), offset = iris$Petal.Length)))
   expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3))

   # Test summary works on base GLM models
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index 6692ce3fc642..64575b0cb0cb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -74,11 +74,11 @@ private[r] object GeneralizedLinearRegressionWrapper
       tol: Double,
       maxIter: Int,
       weightCol: String,
-      offsetCol: String,
       regParam: Double,
       variancePower: Double,
       linkPower: Double,
-      stringIndexerOrderType: String): GeneralizedLinearRegressionWrapper = {
+      stringIndexerOrderType: String,
+      offsetCol: String): GeneralizedLinearRegressionWrapper = {
     // scalastyle:on
     val rFormula = new RFormula().setFormula(formula)
       .setStringIndexerOrderType(stringIndexerOrderType)

From 3c4ebf9df54a2e54684b2da5ea977b5d3255b62a Mon Sep 17 00:00:00 2001
From: actuaryzhang
Date: Thu, 3 Aug 2017 15:42:01 -0700
Subject: [PATCH 3/3] fix unit test

---
 R/pkg/tests/fulltests/test_mllib_regression.R | 1 +
 1 file changed, 1 insertion(+)

diff --git a/R/pkg/tests/fulltests/test_mllib_regression.R b/R/pkg/tests/fulltests/test_mllib_regression.R
index 8c78ed8d8547..23daca75fcc2 100644
--- a/R/pkg/tests/fulltests/test_mllib_regression.R
+++ b/R/pkg/tests/fulltests/test_mllib_regression.R
@@ -174,6 +174,7 @@ test_that("spark.glm summary", {
   expect_equal(stats$aic, rStats$aic)

   # Test spark.glm works with offset
+  training <- suppressWarnings(createDataFrame(iris))
   stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
                              family = poisson(), offsetCol = "Petal_Length"))
   rStats <- suppressWarnings(summary(glm(Sepal.Width ~ Sepal.Length + Species,
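
Usage sketch (not taken from the patches themselves): once the series is applied, the new offsetCol argument can be exercised as below. The formula, Poisson family, and Petal_Length offset column mirror the unit test above; the session setup lines are illustrative assumptions.

  library(SparkR)
  sparkR.session()

  # As in the test, createDataFrame(iris) warns while replacing "." with "_"
  # in the column names (Sepal.Width -> Sepal_Width, and so on).
  training <- suppressWarnings(createDataFrame(iris))

  # Petal_Length enters the linear predictor with a fixed coefficient of 1.0,
  # playing the same role as offset = iris$Petal.Length in base R's glm().
  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
                     family = poisson(), offsetCol = "Petal_Length")
  summary(model)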