Merged
82 commits
9e5b4ce
[SPARK-19084][SQL] Ensure context class loader is set when initializi…
Mar 4, 2017
fbc4058
[SPARK-19816][SQL][TESTS] Fix an issue that DataFrameCallbackSuite do…
zsxwing Mar 4, 2017
6b0cfd9
[SPARK-19550][SPARKR][DOCS] Update R document to use JDK8
wangyum Mar 4, 2017
42c4cd9
[SPARK-19792][WEBUI] In the Master Page,the column named “Memory per …
10110346 Mar 5, 2017
f48461a
[SPARK-19805][TEST] Log the row type when query result dose not match
uncleGen Mar 5, 2017
14bb398
[SPARK-19254][SQL] Support Seq, Map, and Struct in functions.lit
maropu Mar 5, 2017
80d5338
[SPARK-19795][SPARKR] add column functions to_json, from_json
felixcheung Mar 5, 2017
28210cc
Merge pull request #6 from apache/master
lxsmnv Mar 5, 2017
369a148
[SPARK-19595][SQL] Support json array in from_json
HyukjinKwon Mar 5, 2017
70f9d7f
[SPARK-19535][ML] RecommendForAllUsers RecommendForAllItems for ALS o…
sueann Mar 6, 2017
224e0e7
[SPARK-19701][SQL][PYTHON] Throws a correct exception for 'in' operat…
HyukjinKwon Mar 6, 2017
207067e
[SPARK-19822][TEST] CheckpointSuite.testCheckpointedOperation: should…
uncleGen Mar 6, 2017
2a0bc86
[SPARK-17495][SQL] Support Decimal type in Hive-hash
tejasapatil Mar 6, 2017
339b53a
[SPARK-19737][SQL] New analysis rule for reporting unregistered funct…
liancheng Mar 6, 2017
46a64d1
[SPARK-19304][STREAMING][KINESIS] fix kinesis slow checkpoint recovery
Gauravshah Mar 6, 2017
096df6d
[SPARK-19257][SQL] location for table/partition/database should be ja…
windpiger Mar 6, 2017
12bf832
[SPARK-19796][CORE] Fix serialization of long property values in Task…
squito Mar 6, 2017
9991c2d
[SPARK-19211][SQL] Explicitly prevent Insert into View or Create View…
jiangxb1987 Mar 6, 2017
9265436
[SPARK-19382][ML] Test sparse vectors in LinearSVCSuite
wangmiao1981 Mar 6, 2017
f6471dc
[SPARK-19709][SQL] Read empty file with CSV data source
wojtek-szymanski Mar 6, 2017
b0a5cd8
[SPARK-19719][SS] Kafka writer for both structured streaming and batc…
Mar 7, 2017
9909f6d
[SPARK-19350][SQL] Cardinality estimation of Limit and Sample
Mar 7, 2017
1f6c090
[SPARK-19818][SPARKR] rbind should check for name consistency of inpu…
actuaryzhang Mar 7, 2017
e52499e
[SPARK-19832][SQL] DynamicPartitionWriteTask get partitionPath should…
windpiger Mar 7, 2017
932196d
[SPARK-17075][SQL][FOLLOWUP] fix filter estimation issues
Mar 7, 2017
030acdd
[SPARK-19637][SQL] Add to_json in FunctionRegistry
maropu Mar 7, 2017
c05baab
[SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cac…
cloud-fan Mar 7, 2017
4a9034b
[SPARK-17498][ML] StringIndexer enhancement for handling unseen labels
Mar 7, 2017
d69aeea
[SPARK-19516][DOC] update public doc to use SparkSession instead of S…
cloud-fan Mar 7, 2017
49570ed
[SPARK-19803][TEST] flaky BlockManagerReplicationSuite test failure
uncleGen Mar 7, 2017
6f46846
[SPARK-19561] [PYTHON] cast TimestampType.toInternal output to long
Mar 7, 2017
2e30c0b
[SPARK-19702][MESOS] Increase default refuse_seconds timeout in the M…
Mar 7, 2017
8e41c2e
[SPARK-19857][YARN] Correctly calculate next credential update time.
Mar 8, 2017
47b2f68
Revert "[SPARK-19561] [PYTHON] cast TimestampType.toInternal output t…
cloud-fan Mar 8, 2017
c96d14a
[SPARK-19843][SQL] UTF8String => (int / long) conversion expensive fo…
tejasapatil Mar 8, 2017
b9783a9
[SPARK-18389][SQL] Disallow cyclic view reference
jiangxb1987 Mar 8, 2017
ca849ac
[SPARK-19841][SS] watermarkPredicate should filter based on keys
zsxwing Mar 8, 2017
d8830c5
[SPARK-19859][SS] The new watermark should override the old one
zsxwing Mar 8, 2017
56e1bd3
[SPARK-17629][ML] methods to return synonyms directly
Mar 8, 2017
314e48a
[SPARK-18055][SQL] Use correct mirror in ExpresionEncoder
marmbrus Mar 8, 2017
1fa5886
[ML][MINOR] Separate estimator and model params for read/write test.
yanboliang Mar 8, 2017
81303f7
[SPARK-19806][ML][PYSPARK] PySpark GeneralizedLinearRegression suppor…
yanboliang Mar 8, 2017
3f9f918
[SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically c…
wangyum Mar 8, 2017
9ea201c
[SPARK-16440][MLLIB] Ensure broadcasted variables are destroyed even …
Mar 8, 2017
e442748
[SPARK-17080][SQL] join reorder
Mar 8, 2017
5f7d835
[SPARK-19865][SQL] remove the view identifier in SubqueryAlias
jiangxb1987 Mar 8, 2017
9a6ac72
[SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-e…
gatorsmile Mar 8, 2017
e420fd4
[SPARK-19843][SQL][FOLLOWUP] Classdoc for `IntWrapper` and `LongWrapper`
tejasapatil Mar 8, 2017
f3387d9
[SPARK-19864][SQL][TEST] provide a makeQualifiedPath functions to opt…
windpiger Mar 8, 2017
e9e2c61
[SPARK-19727][SQL] Fix for round function that modifies original column
wojtek-szymanski Mar 8, 2017
1bf9012
[SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disal…
zsxwing Mar 8, 2017
6570cfd
[SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned s…
kunalkhamar Mar 8, 2017
4551290
[SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] …
HyukjinKwon Mar 8, 2017
a3648b5
[SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files…
brkyvz Mar 8, 2017
d809cee
[MINOR][SQL] The analyzer rules are fired twice for cases when Analys…
dilipbiswal Mar 9, 2017
09829be
[SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Met…
gatorsmile Mar 9, 2017
029e40b
[SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal
zsxwing Mar 9, 2017
eeb1d6d
[SPARK-19859][SS][FOLLOW-UP] The new watermark should override the ol…
uncleGen Mar 9, 2017
274973d
[SPARK-19763][SQL] qualified external datasource table location store…
windpiger Mar 9, 2017
206030b
[SPARK-19561][SQL] add int case handling for TimestampType
Mar 9, 2017
b60b9fc
[SPARK-19757][CORE] DriverEndpoint#makeOffers race against CoarseGrai…
jxiang Mar 9, 2017
3232e54
[SPARK-19793] Use clock.getTimeMillis when mark task as finished in T…
Mar 9, 2017
40da4d1
[SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource
lw-lin Mar 9, 2017
30b18e6
[SPARK-19861][SS] watermark should not be a negative time.
uncleGen Mar 9, 2017
cabe1df
[SPARK-12334][SQL][PYSPARK] Support read from multiple input paths fo…
zjffdu Mar 9, 2017
f79371a
[SPARK-19611][SQL] Introduce configurable table schema inference
Mar 9, 2017
82138e0
[SPARK-19886] Fix reportDataLoss if statement in SS KafkaSource
brkyvz Mar 10, 2017
5949e6c
[SPARK-19008][SQL] Improve performance of Dataset.map by eliminating …
kiszk Mar 10, 2017
501b711
[SPARK-19891][SS] Await Batch Lock notified on stream execution exit
Mar 10, 2017
fcb68e0
[SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler re…
kiszk Mar 10, 2017
dd9049e
[SPARK-19620][SQL] Fix incorrect exchange coordinator id in the physi…
carsonwang Mar 10, 2017
8f0490e
[SPARK-17979][SPARK-14453] Remove deprecated SPARK_YARN_USER_ENV and …
yongtang Mar 10, 2017
bc30351
[SPARK-19611][SQL] Preserve metastore field order when merging inferr…
Mar 10, 2017
ffee4f1
[SPARK-19905][SQL] Bring back Dataset.inputFiles for Hive SerDe tables
liancheng Mar 10, 2017
fb9beda
[SPARK-19893][SQL] should not run DataFrame set oprations with map type
cloud-fan Mar 11, 2017
f6fdf92
[SPARK-19723][SQL] create datasource table with an non-existent locat…
windpiger Mar 11, 2017
f851ecc
Merge pull request #7 from apache/master
lxsmnv Mar 11, 2017
e29a74d
[DOCS][SS] fix structured streaming python example
uncleGen Mar 12, 2017
2f5187b
[SPARK-19831][CORE] Reuse the existing cleanupThreadExecutor to clean…
hustfxj Mar 12, 2017
9f8ce48
[SPARK-19282][ML][SPARKR] RandomForest Wrapper and GBT Wrapper return…
keypointt Mar 12, 2017
0eaba14
Merge pull request #9 from apache/master
lxsmnv Mar 12, 2017
b346adb
Merge branch 'SPARK-19340' into master
lxsmnv Mar 12, 2017
2 changes: 1 addition & 1 deletion R/WINDOWS.md
@@ -6,7 +6,7 @@ To build SparkR on Windows, the following steps are required
include Rtools and R in `PATH`.

2. Install
[JDK7](http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html) and set
[JDK8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) and set
`JAVA_HOME` in the system environment variables.

3. Download and install [Maven](http://maven.apache.org/download.html). Also include the `bin`
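For reference, a minimal sketch of making the JDK from step 2 visible to an R session; the install path below is illustrative, and the build itself still expects `JAVA_HOME` to be set in the system environment variables:

# Illustrative only -- replace the path with the actual JDK8 install directory.
Sys.setenv(JAVA_HOME = "C:\\Program Files\\Java\\jdk1.8.0_121")
Sys.getenv("JAVA_HOME")  # verify the variable is visible to this session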
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -229,6 +229,7 @@ exportMethods("%in%",
"floor",
"format_number",
"format_string",
"from_json",
"from_unixtime",
"from_utc_timestamp",
"getField",
@@ -327,6 +328,7 @@ exportMethods("%in%",
"toDegrees",
"toRadians",
"to_date",
"to_json",
"to_timestamp",
"to_utc_timestamp",
"translate",
8 changes: 7 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -2642,6 +2642,7 @@ generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
#'
#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
#' and another SparkDataFrame. This is equivalent to \code{UNION ALL} in SQL.
#' Input SparkDataFrames can have different schemas (names and data types).
#'
#' Note: This does not remove duplicate rows across the two SparkDataFrames.
#'
@@ -2685,7 +2686,8 @@ setMethod("unionAll",

#' Union two or more SparkDataFrames
#'
#' Union two or more SparkDataFrames. This is equivalent to \code{UNION ALL} in SQL.
#' Union two or more SparkDataFrames by row. As in R's \code{rbind}, this method
#' requires that the input SparkDataFrames have the same column names.
#'
#' Note: This does not remove duplicate rows across the two SparkDataFrames.
#'
@@ -2709,6 +2711,10 @@ setMethod("unionAll",
setMethod("rbind",
signature(... = "SparkDataFrame"),
function(x, ..., deparse.level = 1) {
nm <- lapply(list(x, ...), names)
if (length(unique(nm)) != 1) {
stop("Names of input data frames are different.")
}
if (nargs() == 3) {
union(x, ...)
} else {
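For reference, a minimal SparkR sketch of the name check that rbind now performs; the data and column names below are illustrative, and an active sparkR.session() is assumed:

df1 <- createDataFrame(data.frame(name = c("Alice", "Bob"), age = c(20, 16)))
df2 <- createDataFrame(data.frame(name = "David", age = 60))
nrow(rbind(df1, df2))   # 3: column names match, rows are appended

df3 <- createDataFrame(data.frame(fullName = "Eve", age = 35))
# rbind(df1, df3)       # stops with "Names of input data frames are different."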
57 changes: 57 additions & 0 deletions R/pkg/R/functions.R
@@ -1793,6 +1793,33 @@ setMethod("to_date",
column(jc)
})

#' to_json
#'
#' Converts a column containing a \code{structType} into a Column of JSON strings.
#' Resolving the Column can fail if an unsupported type is encountered.
#'
#' @param x Column containing the struct
#' @param ... additional named properties to control how it is converted, accepts the same options
#' as the JSON data source.
#'
#' @family normal_funcs
#' @rdname to_json
#' @name to_json
#' @aliases to_json,Column-method
#' @export
#' @examples
#' \dontrun{
#' to_json(df$t, dateFormat = 'dd/MM/yyyy')
#' select(df, to_json(df$t))
#'}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
function(x, ...) {
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions", "to_json", x@jc, options)
column(jc)
})

#' to_timestamp
#'
#' Converts the column into a TimestampType. You may optionally specify a format
@@ -2403,6 +2430,36 @@ setMethod("date_format", signature(y = "Column", x = "character"),
column(jc)
})

#' from_json
#'
#' Parses a column containing a JSON string into a Column of \code{structType} with the specified
#' \code{schema}. If the string is unparseable, the Column will contain the value NA.
#'
#' @param x Column containing the JSON string.
#' @param schema a structType object to use as the schema when parsing the JSON string.
#' @param ... additional named properties to control how the json is parsed, accepts the same
#' options as the JSON data source.
#'
#' @family normal_funcs
#' @rdname from_json
#' @name from_json
#' @aliases from_json,Column,structType-method
#' @export
#' @examples
#' \dontrun{
#' schema <- structType(structField("name", "string"))
#' select(df, from_json(df$value, schema, dateFormat = "dd/MM/yyyy"))
#'}
#' @note from_json since 2.2.0
setMethod("from_json", signature(x = "Column", schema = "structType"),
function(x, schema, ...) {
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"from_json",
x@jc, schema$jobj, options)
column(jc)
})

#' from_utc_timestamp
#'
#' Given a timestamp, which corresponds to a certain time of day in UTC, returns another timestamp
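For reference, a short usage sketch of the two new column functions; the data is illustrative and mirrors the round trip exercised in the tests further below (an active sparkR.session() is assumed):

# Build a DataFrame with a struct column, serialize it to JSON, then parse it back.
df <- sql("SELECT named_struct('age', 16, 'height', 176.5) AS info")
j <- select(df, alias(to_json(df$info), "json"))
collect(j)$json                          # '{"age":16,"height":176.5}'

schema <- structType(structField("age", "integer"),
                     structField("height", "double"))
head(select(j, alias(from_json(j$json, schema), "info")))

# Options are forwarded to the JSON data source, e.g. a custom date format:
dates <- sql("SELECT '{\"date\":\"21/10/2014\"}' AS col")
head(select(dates, from_json(dates$col, structType(structField("date", "date")),
                             dateFormat = "dd/MM/yyyy")))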
8 changes: 8 additions & 0 deletions R/pkg/R/generics.R
@@ -991,6 +991,10 @@ setGeneric("format_number", function(y, x) { standardGeneric("format_number") })
#' @export
setGeneric("format_string", function(format, x, ...) { standardGeneric("format_string") })

#' @rdname from_json
#' @export
setGeneric("from_json", function(x, schema, ...) { standardGeneric("from_json") })

#' @rdname from_unixtime
#' @export
setGeneric("from_unixtime", function(x, ...) { standardGeneric("from_unixtime") })
@@ -1265,6 +1269,10 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
#' @export
setGeneric("to_date", function(x, format) { standardGeneric("to_date") })

#' @rdname to_json
#' @export
setGeneric("to_json", function(x, ...) { standardGeneric("to_json") })

#' @rdname to_timestamp
#' @export
setGeneric("to_timestamp", function(x, format) { standardGeneric("to_timestamp") })
11 changes: 7 additions & 4 deletions R/pkg/R/mllib_tree.R
@@ -52,12 +52,14 @@ summary.treeEnsemble <- function(model) {
numFeatures <- callJMethod(jobj, "numFeatures")
features <- callJMethod(jobj, "features")
featureImportances <- callJMethod(callJMethod(jobj, "featureImportances"), "toString")
maxDepth <- callJMethod(jobj, "maxDepth")
numTrees <- callJMethod(jobj, "numTrees")
treeWeights <- callJMethod(jobj, "treeWeights")
list(formula = formula,
numFeatures = numFeatures,
features = features,
featureImportances = featureImportances,
maxDepth = maxDepth,
numTrees = numTrees,
treeWeights = treeWeights,
jobj = jobj)
@@ -70,6 +72,7 @@ print.summary.treeEnsemble <- function(x) {
cat("\nNumber of features: ", x$numFeatures)
cat("\nFeatures: ", unlist(x$features))
cat("\nFeature importances: ", x$featureImportances)
cat("\nMax Depth: ", x$maxDepth)
cat("\nNumber of trees: ", x$numTrees)
cat("\nTree weights: ", unlist(x$treeWeights))

@@ -197,8 +200,8 @@ setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"),
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#' The list of components includes \code{formula} (formula),
#' \code{numFeatures} (number of features), \code{features} (list of features),
#' \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
#' and \code{treeWeights} (tree weights).
#' \code{featureImportances} (feature importances), \code{maxDepth} (max depth of trees),
#' \code{numTrees} (number of trees), and \code{treeWeights} (tree weights).
#' @rdname spark.gbt
#' @aliases summary,GBTRegressionModel-method
#' @export
@@ -403,8 +406,8 @@ setMethod("spark.randomForest", signature(data = "SparkDataFrame", formula = "fo
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#' The list of components includes \code{formula} (formula),
#' \code{numFeatures} (number of features), \code{features} (list of features),
#' \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
#' and \code{treeWeights} (tree weights).
#' \code{featureImportances} (feature importances), \code{maxDepth} (max depth of trees),
#' \code{numTrees} (number of trees), and \code{treeWeights} (tree weights).
#' @rdname spark.randomForest
#' @aliases summary,RandomForestRegressionModel-method
#' @export
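For reference, a brief sketch of how the added maxDepth field surfaces to users; it mirrors the regression tests below (longley ships with base R, and an active sparkR.session() is assumed):

df <- suppressWarnings(createDataFrame(longley))
model <- spark.randomForest(df, Employed ~ ., "regression",
                            maxDepth = 5, maxBins = 16, numTrees = 20)
s <- summary(model)
s$maxDepth   # 5, now reported alongside numTrees and treeWeights
print(s)     # the printed summary includes a new "Max Depth:" line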
10 changes: 10 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_tree.R
@@ -39,6 +39,7 @@ test_that("spark.gbt", {
tolerance = 1e-4)
stats <- summary(model)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)
expect_equal(stats$formula, "Employed ~ .")
expect_equal(stats$numFeatures, 6)
expect_equal(length(stats$treeWeights), 20)
@@ -53,6 +54,7 @@ test_that("spark.gbt", {
expect_equal(stats$numFeatures, stats2$numFeatures)
expect_equal(stats$features, stats2$features)
expect_equal(stats$featureImportances, stats2$featureImportances)
expect_equal(stats$maxDepth, stats2$maxDepth)
expect_equal(stats$numTrees, stats2$numTrees)
expect_equal(stats$treeWeights, stats2$treeWeights)

@@ -66,6 +68,7 @@ test_that("spark.gbt", {
stats <- summary(model)
expect_equal(stats$numFeatures, 2)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)
expect_error(capture.output(stats), NA)
expect_true(length(capture.output(stats)) > 6)
predictions <- collect(predict(model, data))$prediction
@@ -93,6 +96,7 @@ test_that("spark.gbt", {
expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction))
expect_equal(s$numFeatures, 5)
expect_equal(s$numTrees, 20)
expect_equal(s$maxDepth, 5)

# spark.gbt classification can work on libsvm data
data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"),
@@ -116,6 +120,7 @@ test_that("spark.randomForest", {

stats <- summary(model)
expect_equal(stats$numTrees, 1)
expect_equal(stats$maxDepth, 5)
expect_error(capture.output(stats), NA)
expect_true(length(capture.output(stats)) > 6)

@@ -129,6 +134,7 @@ test_that("spark.randomForest", {
tolerance = 1e-4)
stats <- summary(model)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)

modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext = ".tmp")
write.ml(model, modelPath)
@@ -141,6 +147,7 @@ test_that("spark.randomForest", {
expect_equal(stats$features, stats2$features)
expect_equal(stats$featureImportances, stats2$featureImportances)
expect_equal(stats$numTrees, stats2$numTrees)
expect_equal(stats$maxDepth, stats2$maxDepth)
expect_equal(stats$treeWeights, stats2$treeWeights)

unlink(modelPath)
@@ -153,6 +160,7 @@ test_that("spark.randomForest", {
stats <- summary(model)
expect_equal(stats$numFeatures, 2)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)
expect_error(capture.output(stats), NA)
expect_true(length(capture.output(stats)) > 6)
# Test string prediction values
@@ -187,6 +195,8 @@ test_that("spark.randomForest", {
stats <- summary(model)
expect_equal(stats$numFeatures, 2)
expect_equal(stats$numTrees, 20)
expect_equal(stats$maxDepth, 5)

# Test numeric prediction values
predictions <- collect(predict(model, data))$prediction
expect_equal(length(grep("1.0", predictions)), 50)
54 changes: 45 additions & 9 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -88,6 +88,13 @@ mockLinesComplexType <-
complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)

# For test map type and struct type in DataFrame
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
"{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)

test_that("calling sparkRSQL.init returns existing SQL context", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
expect_equal(suppressWarnings(sparkRSQL.init(sc)), sqlContext)
@@ -466,13 +473,6 @@ test_that("create DataFrame from a data.frame with complex types", {
expect_equal(ldf$an_envir, collected$an_envir)
})

# For test map type and struct type in DataFrame
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
"{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)

test_that("Collect DataFrame with complex types", {
# ArrayType
df <- read.json(complexTypeJsonPath)
@@ -1337,6 +1337,33 @@ test_that("column functions", {
df <- createDataFrame(data.frame(x = c(2.5, 3.5)))
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

# Test to_json(), from_json()
df <- read.json(mapTypeJsonPath)
j <- collect(select(df, alias(to_json(df$info), "json")))
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
df <- as.DataFrame(j)
schema <- structType(structField("age", "integer"),
structField("height", "double"))
s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
expect_equal(ncol(s), 1)
expect_equal(nrow(s), 3)
expect_is(s[[1]][[1]], "struct")
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))

# passing option
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
schema2 <- structType(structField("date", "date"))
expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
error = function(e) { stop(e) }),
paste0(".*(java.lang.NumberFormatException: For input string:).*"))
s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
expect_is(s[[1]][[1]]$date, "Date")
expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")

# check for unparseable
df <- as.DataFrame(list(list("a" = "")))
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
})

test_that("column binary mathfunctions", {
@@ -1823,6 +1850,13 @@ test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
expect_equal(count(unioned2), 12)
expect_equal(first(unioned2)$name, "Michael")

df3 <- df2
names(df3)[1] <- "newName"
expect_error(rbind(df, df3),
"Names of input data frames are different.")
expect_error(rbind(df, df2, df3),
"Names of input data frames are different.")

excepted <- arrange(except(df, df2), desc(df$age))
expect_is(unioned, "SparkDataFrame")
expect_equal(count(excepted), 2)
@@ -2558,8 +2592,8 @@ test_that("coalesce, repartition, numPartitions", {

df2 <- repartition(df1, 10)
expect_equal(getNumPartitions(df2), 10)
expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
expect_equal(getNumPartitions(coalesce(df2, 13)), 10)
expect_equal(getNumPartitions(coalesce(df2, 7)), 7)
expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
})
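The corrected expectations above reflect that coalesce can only reduce the number of partitions, while repartition may both grow and shrink them (with a shuffle); a small illustrative sketch, assuming an active sparkR.session():

df <- createDataFrame(data.frame(x = 1:100))
df2 <- repartition(df, 10)
getNumPartitions(coalesce(df2, 13))     # 10: coalesce cannot increase partitions
getNumPartitions(coalesce(df2, 7))      # 7
getNumPartitions(repartition(df2, 13))  # 13: repartition can increase them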

@@ -2867,5 +2901,7 @@ unlink(parquetPath)
unlink(orcPath)
unlink(jsonPath)
unlink(jsonPathNa)
unlink(complexTypeJsonPath)
unlink(mapTypeJsonPath)

sparkR.session.stop()