38 changes: 15 additions & 23 deletions R/pkg/R/mllib_classification.R
@@ -46,15 +46,16 @@ setClass("MultilayerPerceptronClassificationModel", representation(jobj = "jobj"
 #' @note NaiveBayesModel since 2.0.0
 setClass("NaiveBayesModel", representation(jobj = "jobj"))
 
-#' linear SVM Model
+#' Linear SVM Model
 #'
-#' Fits an linear SVM model against a SparkDataFrame. It is a binary classifier, similar to svm in glmnet package
+#' Fits a linear SVM model against a SparkDataFrame, similar to svm in e1071 package.
+#' Currently only supports binary classification model with linear kernel.
 #' Users can print, make predictions on the produced model and save the model to the input path.
 #'
 #' @param data SparkDataFrame for training.
 #' @param formula A symbolic description of the model to be fitted. Currently only a few formula
 #'                operators are supported, including '~', '.', ':', '+', and '-'.
-#' @param regParam The regularization parameter.
+#' @param regParam The regularization parameter. Only supports L2 regularization currently.
 #' @param maxIter Maximum iteration number.
 #' @param tol Convergence tolerance of iterations.
 #' @param standardization Whether to standardize the training features before fitting the model. The coefficients
@@ -111,10 +112,10 @@ setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formu
             new("LinearSVCModel", jobj = jobj)
           })
 
-# Predicted values based on an LinearSVCModel model
+# Predicted values based on a LinearSVCModel model
 
 #' @param newData a SparkDataFrame for testing.
-#' @return \code{predict} returns the predicted values based on an LinearSVCModel.
+#' @return \code{predict} returns the predicted values based on a LinearSVCModel.
 #' @rdname spark.svmLinear
 #' @aliases predict,LinearSVCModel,SparkDataFrame-method
 #' @export
@@ -124,36 +125,27 @@ setMethod("predict", signature(object = "LinearSVCModel"),
             predict_internal(object, newData)
           })
 
-# Get the summary of an LinearSVCModel
+# Get the summary of a LinearSVCModel
 
-#' @param object an LinearSVCModel fitted by \code{spark.svmLinear}.
+#' @param object a LinearSVCModel fitted by \code{spark.svmLinear}.
 #' @return \code{summary} returns summary information of the fitted model, which is a list.
 #'         The list includes \code{coefficients} (coefficients of the fitted model),
-#'         \code{intercept} (intercept of the fitted model), \code{numClasses} (number of classes),
-#'         \code{numFeatures} (number of features).
+#'         \code{numClasses} (number of classes), \code{numFeatures} (number of features).
 #' @rdname spark.svmLinear
 #' @aliases summary,LinearSVCModel-method
 #' @export
 #' @note summary(LinearSVCModel) since 2.2.0
 setMethod("summary", signature(object = "LinearSVCModel"),
           function(object) {
             jobj <- object@jobj
-            features <- callJMethod(jobj, "features")
-            labels <- callJMethod(jobj, "labels")
-            coefficients <- callJMethod(jobj, "coefficients")
-            nCol <- length(coefficients) / length(features)
-            coefficients <- matrix(unlist(coefficients), ncol = nCol)
-            intercept <- callJMethod(jobj, "intercept")
+            features <- callJMethod(jobj, "rFeatures")
+            coefficients <- callJMethod(jobj, "rCoefficients")
+            coefficients <- as.matrix(unlist(coefficients))
+            colnames(coefficients) <- c("Estimate")
+            rownames(coefficients) <- unlist(features)
             numClasses <- callJMethod(jobj, "numClasses")
             numFeatures <- callJMethod(jobj, "numFeatures")
-            if (nCol == 1) {
-              colnames(coefficients) <- c("Estimate")
-            } else {
-              colnames(coefficients) <- unlist(labels)
-            }
-            rownames(coefficients) <- unlist(features)
-            list(coefficients = coefficients, intercept = intercept,
-                 numClasses = numClasses, numFeatures = numFeatures)
+            list(coefficients = coefficients, numClasses = numClasses, numFeatures = numFeatures)
           })
 
 # Save fitted LinearSVCModel to the input path
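The net effect of this `summary()` rework: the intercept is no longer a separate list element; it arrives folded into the coefficients matrix via the backend's `rFeatures`/`rCoefficients` accessors. A minimal sketch of the new shape, assuming a binary-label SparkDataFrame `df`; the numbers and the `(Intercept)` row label are illustrative assumptions, not actual output:

```r
# Sketch only: assumes an active SparkR session and a binary-label
# SparkDataFrame `df`; values below are placeholders.
model <- spark.svmLinear(df, Species ~ ., regParam = 0.01)
s <- summary(model)

# Before this patch: s$coefficients (features only) plus a separate s$intercept.
# After: one matrix with a single "Estimate" column, the intercept reported as
# a row like any other coefficient (row names come from the backend's
# rFeatures, so the exact label here is an assumption).
s$coefficients
#               Estimate
# (Intercept)     -0.060
# Sepal_Length    -0.156
s$numClasses    # 2
s$numFeatures   # 4
```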
16 changes: 16 additions & 0 deletions R/pkg/R/utils.R
@@ -907,3 +907,19 @@ basenameSansExtFromUrl <- function(url) {
 isAtomicLengthOne <- function(x) {
   is.atomic(x) && length(x) == 1
 }
+
+is_cran <- function() {
+  !identical(Sys.getenv("NOT_CRAN"), "true")
+}
+
+is_windows <- function() {
+  .Platform$OS.type == "windows"
+}
+
+hadoop_home_set <- function() {
+  !identical(Sys.getenv("HADOOP_HOME"), "")
+}
+
+not_cran_or_windows_with_hadoop <- function() {
+  !is_cran() && (!is_windows() || hadoop_home_set())
+}
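These four helpers gate the slow model save/load blocks in the test files below (see the wrapped `if` blocks in every hunk that follows). A sketch of how the predicate resolves; the environment-variable semantics are exactly those in the definitions above, and actual results depend on your machine:

```r
# Illustration only, not part of the patch.
# not_cran_or_windows_with_hadoop() is TRUE iff:
#   NOT_CRAN == "true" (a developer/CI run rather than a CRAN check), AND
#   either the OS is not Windows, or HADOOP_HOME is set.
Sys.setenv(NOT_CRAN = "true")
not_cran_or_windows_with_hadoop()   # TRUE on Linux/macOS

Sys.unsetenv("NOT_CRAN")
not_cran_or_windows_with_hadoop()   # FALSE: CRAN-style run, heavy tests skipped

# On Windows the save/load paths additionally need a working Hadoop layout
# (presumably winutils.exe), hence the HADOOP_HOME check.
```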
93 changes: 50 additions & 43 deletions R/pkg/inst/tests/testthat/test_mllib_classification.R
@@ -38,9 +38,8 @@ test_that("spark.svmLinear", {
   expect_true(class(summary$coefficients[, 1]) == "numeric")
 
   coefs <- summary$coefficients[, "Estimate"]
-  expected_coefs <- c(-0.1563083, -0.460648, 0.2276626, 1.055085)
+  expected_coefs <- c(-0.06004978, -0.1563083, -0.460648, 0.2276626, 1.055085)
   expect_true(all(abs(coefs - expected_coefs) < 0.1))
-  expect_equal(summary$intercept, -0.06004978, tolerance = 1e-2)
 
   # Test prediction with string label
   prediction <- predict(model, training)
@@ -50,15 +49,17 @@
   expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected)
 
   # Test model save and load
-  modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  coefs <- summary(model)$coefficients
-  coefs2 <- summary(model2)$coefficients
-  expect_equal(coefs, coefs2)
-  unlink(modelPath)
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    coefs <- summary(model)$coefficients
+    coefs2 <- summary(model2)$coefficients
+    expect_equal(coefs, coefs2)
+    unlink(modelPath)
+  }
 
   # Test prediction with numeric label
   label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
@@ -128,15 +129,17 @@ test_that("spark.logit", {
   expect_true(all(abs(setosaCoefs - setosaCoefs) < 0.1))
 
   # Test model save and load
-  modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  coefs <- summary(model)$coefficients
-  coefs2 <- summary(model2)$coefficients
-  expect_equal(coefs, coefs2)
-  unlink(modelPath)
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    coefs <- summary(model)$coefficients
+    coefs2 <- summary(model2)$coefficients
+    expect_equal(coefs, coefs2)
+    unlink(modelPath)
+  }
 
   # R code to reproduce the result.
   # nolint start
@@ -243,19 +246,21 @@ test_that("spark.mlp", {
   expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
 
   # Test model save/load
-  modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  summary2 <- summary(model2)
-
-  expect_equal(summary2$numOfInputs, 4)
-  expect_equal(summary2$numOfOutputs, 3)
-  expect_equal(summary2$layers, c(4, 5, 4, 3))
-  expect_equal(length(summary2$weights), 64)
-
-  unlink(modelPath)
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    summary2 <- summary(model2)
+
+    expect_equal(summary2$numOfInputs, 4)
+    expect_equal(summary2$numOfOutputs, 3)
+    expect_equal(summary2$layers, c(4, 5, 4, 3))
+    expect_equal(length(summary2$weights), 64)
+
+    unlink(modelPath)
+  }
 
   # Test default parameter
   model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
@@ -354,16 +359,18 @@ test_that("spark.naiveBayes", {
                                    "Yes", "Yes", "No", "No"))
 
   # Test model save/load
-  modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp")
-  write.ml(m, modelPath)
-  expect_error(write.ml(m, modelPath))
-  write.ml(m, modelPath, overwrite = TRUE)
-  m2 <- read.ml(modelPath)
-  s2 <- summary(m2)
-  expect_equal(s$apriori, s2$apriori)
-  expect_equal(s$tables, s2$tables)
-
-  unlink(modelPath)
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp")
+    write.ml(m, modelPath)
+    expect_error(write.ml(m, modelPath))
+    write.ml(m, modelPath, overwrite = TRUE)
+    m2 <- read.ml(modelPath)
+    s2 <- summary(m2)
+    expect_equal(s$apriori, s2$apriori)
+    expect_equal(s$tables, s2$tables)
+
+    unlink(modelPath)
+  }
 
   # Test e1071::naiveBayes
   if (requireNamespace("e1071", quietly = TRUE)) {
112 changes: 60 additions & 52 deletions R/pkg/inst/tests/testthat/test_mllib_clustering.R
@@ -53,18 +53,20 @@ test_that("spark.bisectingKmeans", {
                c(0, 1, 2, 3))
 
   # Test model save/load
-  modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  summary2 <- summary(model2)
-  expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
-  expect_equal(summary.model$coefficients, summary2$coefficients)
-  expect_true(!summary.model$is.loaded)
-  expect_true(summary2$is.loaded)
-
-  unlink(modelPath)
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    summary2 <- summary(model2)
+    expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
+    expect_equal(summary.model$coefficients, summary2$coefficients)
+    expect_true(!summary.model$is.loaded)
+    expect_true(summary2$is.loaded)
+
+    unlink(modelPath)
+  }
 })
 
 test_that("spark.gaussianMixture", {
@@ -125,18 +127,20 @@ test_that("spark.gaussianMixture", {
   expect_equal(p$prediction, c(0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1))
 
   # Test model save/load
-  modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-  expect_equal(stats$lambda, stats2$lambda)
-  expect_equal(unlist(stats$mu), unlist(stats2$mu))
-  expect_equal(unlist(stats$sigma), unlist(stats2$sigma))
-  expect_equal(unlist(stats$loglik), unlist(stats2$loglik))
-
-  unlink(modelPath)
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+    expect_equal(stats$lambda, stats2$lambda)
+    expect_equal(unlist(stats$mu), unlist(stats2$mu))
+    expect_equal(unlist(stats$sigma), unlist(stats2$sigma))
+    expect_equal(unlist(stats$loglik), unlist(stats2$loglik))
+
+    unlink(modelPath)
+  }
 })
 
 test_that("spark.kmeans", {
@@ -171,18 +175,20 @@ test_that("spark.kmeans", {
   expect_true(class(summary.model$coefficients[1, ]) == "numeric")
 
   # Test model save/load
-  modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  summary2 <- summary(model2)
-  expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
-  expect_equal(summary.model$coefficients, summary2$coefficients)
-  expect_true(!summary.model$is.loaded)
-  expect_true(summary2$is.loaded)
-
-  unlink(modelPath)
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    summary2 <- summary(model2)
+    expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
+    expect_equal(summary.model$coefficients, summary2$coefficients)
+    expect_true(!summary.model$is.loaded)
+    expect_true(summary2$is.loaded)
+  }
 
   # Test Kmeans on dataset that is sensitive to seed value
   col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
@@ -236,22 +242,24 @@ test_that("spark.lda with libsvm", {
   expect_true(logPrior <= 0 & !is.na(logPrior))
 
   # Test model save/load
-  modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
-  write.ml(model, modelPath)
-  expect_error(write.ml(model, modelPath))
-  write.ml(model, modelPath, overwrite = TRUE)
-  model2 <- read.ml(modelPath)
-  stats2 <- summary(model2)
-
-  expect_true(stats2$isDistributed)
-  expect_equal(logLikelihood, stats2$logLikelihood)
-  expect_equal(logPerplexity, stats2$logPerplexity)
-  expect_equal(vocabSize, stats2$vocabSize)
-  expect_equal(vocabulary, stats2$vocabulary)
-  expect_equal(trainingLogLikelihood, stats2$trainingLogLikelihood)
-  expect_equal(logPrior, stats2$logPrior)
-
-  unlink(modelPath)
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+
+    expect_true(stats2$isDistributed)
+    expect_equal(logLikelihood, stats2$logLikelihood)
+    expect_equal(logPerplexity, stats2$logPerplexity)
+    expect_equal(vocabSize, stats2$vocabSize)
+    expect_equal(vocabulary, stats2$vocabulary)
+    expect_equal(trainingLogLikelihood, stats2$trainingLogLikelihood)
+    expect_equal(logPrior, stats2$logPrior)
+
+    unlink(modelPath)
+  }
 })
 
 test_that("spark.lda with text input", {
16 changes: 9 additions & 7 deletions R/pkg/inst/tests/testthat/test_mllib_fpm.R
@@ -62,15 +62,17 @@ test_that("spark.fpGrowth", {
 
   expect_equivalent(expected_predictions, collect(predict(model, new_data)))
 
-  modelPath <- tempfile(pattern = "spark-fpm", fileext = ".tmp")
-  write.ml(model, modelPath, overwrite = TRUE)
-  loaded_model <- read.ml(modelPath)
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-fpm", fileext = ".tmp")
+    write.ml(model, modelPath, overwrite = TRUE)
+    loaded_model <- read.ml(modelPath)
 
-  expect_equivalent(
-    itemsets,
-    collect(spark.freqItemsets(loaded_model)))
+    expect_equivalent(
+      itemsets,
+      collect(spark.freqItemsets(loaded_model)))
 
-  unlink(modelPath)
+    unlink(modelPath)
+  }
 
   model_without_numpartitions <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8)
   expect_equal(
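All five test files apply the same mechanical change, so the PR reduces to one pattern. A condensed sketch of that pattern, with generic model and path names (the expectations are the ones repeated in the hunks above):

```r
# The recurring pattern: every model persistence test is now wrapped so it
# only runs outside CRAN checks (and, on Windows, only with HADOOP_HOME set).
if (not_cran_or_windows_with_hadoop()) {
  modelPath <- tempfile(pattern = "spark-model", fileext = ".tmp")
  write.ml(model, modelPath)                  # first write succeeds
  expect_error(write.ml(model, modelPath))    # rewrite without overwrite errors
  write.ml(model, modelPath, overwrite = TRUE)
  model2 <- read.ml(modelPath)
  # round-trip check: the reloaded model summarizes identically
  expect_equal(summary(model)$coefficients, summary(model2)$coefficients)
  unlink(modelPath)
}
```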