
Commit c121047

Author: Andrew Or
Commit message: Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
Parents: 65ef3e9 + 5db18ba

File tree: 199 files changed (+8507, -2224 lines)


R/pkg/NAMESPACE

Lines changed: 4 additions & 2 deletions

@@ -37,7 +37,7 @@ exportMethods("arrange",
               "registerTempTable",
               "rename",
               "repartition",
-              "sampleDF",
+              "sample",
               "sample_frac",
               "saveAsParquetFile",
               "saveAsTable",
@@ -53,7 +53,8 @@ exportMethods("arrange",
               "unpersist",
               "where",
               "withColumn",
-              "withColumnRenamed")
+              "withColumnRenamed",
+              "write.df")

 exportClasses("Column")

@@ -101,6 +102,7 @@ export("cacheTable",
        "jsonFile",
        "loadDF",
        "parquetFile",
+       "read.df",
        "sql",
        "table",
        "tableNames",

R/pkg/R/DataFrame.R

Lines changed: 26 additions & 16 deletions

@@ -150,7 +150,7 @@ setMethod("isLocal",
             callJMethod(x@sdf, "isLocal")
           })

-#' ShowDF
+#' showDF
 #'
 #' Print the first numRows rows of a DataFrame
 #'
@@ -170,7 +170,8 @@ setMethod("isLocal",
 setMethod("showDF",
           signature(x = "DataFrame"),
           function(x, numRows = 20) {
-            callJMethod(x@sdf, "showString", numToInt(numRows))
+            s <- callJMethod(x@sdf, "showString", numToInt(numRows))
+            cat(s)
           })

 #' show
@@ -187,7 +188,7 @@ setMethod("showDF",
 #' sqlCtx <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlCtx, path)
-#' show(df)
+#' df
 #'}
 setMethod("show", "DataFrame",
           function(object) {
@@ -293,8 +294,8 @@ setMethod("registerTempTable",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
-#' df <- loadDF(sqlCtx, path, "parquet")
-#' df2 <- loadDF(sqlCtx, path2, "parquet")
+#' df <- read.df(sqlCtx, path, "parquet")
+#' df2 <- read.df(sqlCtx, path2, "parquet")
 #' registerTempTable(df, "table1")
 #' insertInto(df2, "table1", overwrite = TRUE)
 #'}
@@ -472,14 +473,14 @@ setMethod("distinct",
             dataFrame(sdf)
           })

-#' SampleDF
+#' Sample
 #'
 #' Return a sampled subset of this DataFrame using a random seed.
 #'
 #' @param x A SparkSQL DataFrame
 #' @param withReplacement Sampling with replacement or not
 #' @param fraction The (rough) sample target fraction
-#' @rdname sampleDF
+#' @rdname sample
 #' @aliases sample_frac
 #' @export
 #' @examples
@@ -488,10 +489,10 @@ setMethod("distinct",
 #' sqlCtx <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlCtx, path)
-#' collect(sampleDF(df, FALSE, 0.5))
-#' collect(sampleDF(df, TRUE, 0.5))
+#' collect(sample(df, FALSE, 0.5))
+#' collect(sample(df, TRUE, 0.5))
 #'}
-setMethod("sampleDF",
+setMethod("sample",
           # TODO : Figure out how to send integer as java.lang.Long to JVM so
           # we can send seed as an argument through callJMethod
           signature(x = "DataFrame", withReplacement = "logical",
@@ -502,13 +503,13 @@ setMethod("sampleDF",
             dataFrame(sdf)
           })

-#' @rdname sampleDF
-#' @aliases sampleDF
+#' @rdname sample
+#' @aliases sample
 setMethod("sample_frac",
           signature(x = "DataFrame", withReplacement = "logical",
                     fraction = "numeric"),
           function(x, withReplacement, fraction) {
-            sampleDF(x, withReplacement, fraction)
+            sample(x, withReplacement, fraction)
           })

 #' Count
@@ -1302,17 +1303,17 @@ setMethod("except",
 #' @param source A name for external data source
 #' @param mode One of 'append', 'overwrite', 'error', 'ignore'
 #'
-#' @rdname saveAsTable
+#' @rdname write.df
 #' @export
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlCtx, path)
-#' saveAsTable(df, "myfile")
+#' write.df(df, "myfile", "parquet", "overwrite")
 #' }
-setMethod("saveDF",
+setMethod("write.df",
           signature(df = "DataFrame", path = 'character', source = 'character',
                     mode = 'character'),
           function(df, path = NULL, source = NULL, mode = "append", ...){
@@ -1333,6 +1334,15 @@ setMethod("saveDF",
             callJMethod(df@sdf, "save", source, jmode, options)
           })

+#' @rdname write.df
+#' @aliases saveDF
+#' @export
+setMethod("saveDF",
+          signature(df = "DataFrame", path = 'character', source = 'character',
+                    mode = 'character'),
+          function(df, path = NULL, source = NULL, mode = "append", ...){
+            write.df(df, path, source, mode, ...)
+          })

 #' saveAsTable
 #'
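
Net effect of this file: sampleDF becomes sample, saveDF becomes write.df (with the old name kept as a thin wrapper), and showDF now cats the string returned by the JVM instead of relying on the JVM side to print it. A minimal round-trip sketch with the renamed methods (assuming an initialized sqlCtx and the JSON example file used in the roxygen docs above):

    df <- jsonFile(sqlCtx, "path/to/file.json")
    # Roughly half the rows, sampled without replacement.
    collect(sample(df, FALSE, 0.5))
    # Write with the new name; the saveDF() wrapper added above delegates here.
    write.df(df, "path/to/out.parquet", "parquet", "overwrite")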

R/pkg/R/RDD.R

Lines changed: 4 additions & 4 deletions

@@ -67,8 +67,8 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
 })

 setMethod("show", "RDD",
-          function(.Object) {
-            cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
+          function(object) {
+            cat(paste(callJMethod(getJRDD(object), "toString"), "\n", sep=""))
 })

 setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
@@ -927,7 +927,7 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
                                               MAXINT)))))

   # TODO(zongheng): investigate if this call is an in-place shuffle?
-  sample(samples)[1:total]
+  base::sample(samples)[1:total]
 })

 # Creates tuples of the elements in this RDD by applying a function.
@@ -996,7 +996,7 @@ setMethod("coalesce",
            if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
              func <- function(partIndex, part) {
                set.seed(partIndex)  # partIndex as seed
-               start <- as.integer(sample(numPartitions, 1) - 1)
+               start <- as.integer(base::sample(numPartitions, 1) - 1)
                lapply(seq_along(part),
                       function(i) {
                         pos <- (start + i) %% numPartitions
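
The base:: qualifiers are needed because this commit defines a new S4 generic named sample for DataFrames (see generics.R below), which changes how an unqualified sample() call resolves inside the package. A minimal sketch of the masking, using toy values for illustration:

    # Once a generic with this signature exists, an unqualified sample(1:10)
    # dispatches through it and fails for plain vectors (no matching method).
    setGeneric("sample", function(x, withReplacement, fraction, seed) {
      standardGeneric("sample")
    })
    # Qualifying the call pins the original shuffling function in base:
    shuffled <- base::sample(1:10)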

R/pkg/R/SQLContext.R

Lines changed: 10 additions & 3 deletions

@@ -421,7 +421,7 @@ clearCache <- function(sqlCtx) {
 #' \dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
-#' df <- loadDF(sqlCtx, path, "parquet")
+#' df <- read.df(sqlCtx, path, "parquet")
 #' registerTempTable(df, "table")
 #' dropTempTable(sqlCtx, "table")
 #' }
@@ -450,10 +450,10 @@ dropTempTable <- function(sqlCtx, tableName) {
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
-#' df <- load(sqlCtx, "path/to/file.json", source = "json")
+#' df <- read.df(sqlCtx, "path/to/file.json", source = "json")
 #' }

-loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+read.df <- function(sqlCtx, path = NULL, source = NULL, ...) {
   options <- varargsToEnv(...)
   if (!is.null(path)) {
     options[['path']] <- path
@@ -462,6 +462,13 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
   dataFrame(sdf)
 }

+#' @aliases loadDF
+#' @export
+
+loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+  read.df(sqlCtx, path, source, ...)
+}
+
 #' Create an external table
 #'
 #' Creates an external table based on the dataset in a data source,
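
A short usage sketch of the renamed reader, taken straight from the examples above (assuming an initialized sqlCtx; paths are placeholders):

    # New name:
    df <- read.df(sqlCtx, "path/to/file.json", source = "json")
    # Old name still works; loadDF() is now a thin wrapper over read.df():
    df2 <- loadDF(sqlCtx, "path/to/file.json", "json")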

R/pkg/R/generics.R

Lines changed: 13 additions & 9 deletions

@@ -456,19 +456,19 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
 #' @export
 setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })

-#' @rdname sampleDF
+#' @rdname sample
 #' @export
-setGeneric("sample_frac",
+setGeneric("sample",
            function(x, withReplacement, fraction, seed) {
-             standardGeneric("sample_frac")
-           })
+             standardGeneric("sample")
+           })

-#' @rdname sampleDF
+#' @rdname sample
 #' @export
-setGeneric("sampleDF",
+setGeneric("sample_frac",
            function(x, withReplacement, fraction, seed) {
-             standardGeneric("sampleDF")
-           })
+             standardGeneric("sample_frac")
+           })

 #' @rdname saveAsParquetFile
 #' @export
@@ -480,7 +480,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
   standardGeneric("saveAsTable")
 })

-#' @rdname saveAsTable
+#' @rdname write.df
+#' @export
+setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })
+
+#' @rdname write.df
 #' @export
 setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
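
These generics are the dispatch entry points for the methods in DataFrame.R: a call like write.df(df, ...) first hits the generic defined here, then resolves to the DataFrame method by signature. A minimal sketch of the same setGeneric/setMethod pattern, with a hypothetical class and generic purely for illustration:

    # Hypothetical names; mirrors the write.df generic/method split above.
    setGeneric("describeIt", function(x, ...) { standardGeneric("describeIt") })
    setClass("Thing", representation(label = "character"))
    setMethod("describeIt", signature(x = "Thing"), function(x, ...) {
      cat("Thing:", x@label, "\n")
    })
    describeIt(new("Thing", label = "demo"))  # dispatches to the Thing method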

R/pkg/R/group.R

Lines changed: 1 addition & 3 deletions

@@ -102,9 +102,7 @@ setMethod("agg",
             }
           }
           jcols <- lapply(cols, function(c) { c@jc })
-          # the GroupedData.agg(col, cols*) API does not contain grouping Column
-          sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "aggWithGrouping",
-                             x@sgd, listToSeq(jcols))
+          sdf <- callJMethod(x@sgd, "agg", jcols[[1]], listToSeq(jcols[-1]))
         } else {
          stop("agg can only support Column or character")
        }
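
The replacement calls Scala's GroupedData.agg(Column, Column*) directly, passing the first Column separately because the JVM signature is variadic after the first argument; the R-side aggWithGrouping helper is dropped, presumably because GroupedData.agg now retains the grouping columns itself. A hedged sketch of a user-facing call that exercises this branch (assuming a df with name and age columns and the SparkR Column aggregate helpers):

    gd <- groupBy(df, df$name)
    # Column-based agg: jcols[[1]] is the Java Column behind max(df$age);
    # any further Columns would go through listToSeq(jcols[-1]).
    collect(agg(gd, max(df$age)))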

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 22 additions & 21 deletions

@@ -209,18 +209,18 @@ test_that("registerTempTable() results in a queryable table and sql() results in
 })

 test_that("insertInto() on a registered table", {
-  df <- loadDF(sqlCtx, jsonPath, "json")
-  saveDF(df, parquetPath, "parquet", "overwrite")
-  dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")
+  df <- read.df(sqlCtx, jsonPath, "json")
+  write.df(df, parquetPath, "parquet", "overwrite")
+  dfParquet <- read.df(sqlCtx, parquetPath, "parquet")

   lines <- c("{\"name\":\"Bob\", \"age\":24}",
              "{\"name\":\"James\", \"age\":35}")
   jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp")
   parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
   writeLines(lines, jsonPath2)
-  df2 <- loadDF(sqlCtx, jsonPath2, "json")
-  saveDF(df2, parquetPath2, "parquet", "overwrite")
-  dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")
+  df2 <- read.df(sqlCtx, jsonPath2, "json")
+  write.df(df2, parquetPath2, "parquet", "overwrite")
+  dfParquet2 <- read.df(sqlCtx, parquetPath2, "parquet")

   registerTempTable(dfParquet, "table1")
   insertInto(dfParquet2, "table1")
@@ -421,12 +421,12 @@ test_that("distinct() on DataFrames", {
   expect_true(count(uniques) == 3)
 })

-test_that("sampleDF on a DataFrame", {
+test_that("sample on a DataFrame", {
   df <- jsonFile(sqlCtx, jsonPath)
-  sampled <- sampleDF(df, FALSE, 1.0)
+  sampled <- sample(df, FALSE, 1.0)
   expect_equal(nrow(collect(sampled)), count(df))
   expect_true(inherits(sampled, "DataFrame"))
-  sampled2 <- sampleDF(df, FALSE, 0.1)
+  sampled2 <- sample(df, FALSE, 0.1)
   expect_true(count(sampled2) < 3)

   # Also test sample_frac
@@ -491,16 +491,16 @@ test_that("column calculation", {
   expect_true(count(df2) == 3)
 })

-test_that("load() from json file", {
-  df <- loadDF(sqlCtx, jsonPath, "json")
+test_that("read.df() from json file", {
+  df <- read.df(sqlCtx, jsonPath, "json")
   expect_true(inherits(df, "DataFrame"))
   expect_true(count(df) == 3)
 })

-test_that("save() as parquet file", {
-  df <- loadDF(sqlCtx, jsonPath, "json")
-  saveDF(df, parquetPath, "parquet", mode="overwrite")
-  df2 <- loadDF(sqlCtx, parquetPath, "parquet")
+test_that("write.df() as parquet file", {
+  df <- read.df(sqlCtx, jsonPath, "json")
+  write.df(df, parquetPath, "parquet", mode="overwrite")
+  df2 <- read.df(sqlCtx, parquetPath, "parquet")
   expect_true(inherits(df2, "DataFrame"))
   expect_true(count(df2) == 3)
 })
@@ -653,7 +653,8 @@ test_that("toJSON() returns an RDD of the correct values", {

 test_that("showDF()", {
   df <- jsonFile(sqlCtx, jsonPath)
-  expect_output(showDF(df), "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
+  s <- capture.output(showDF(df))
+  expect_output(s , "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
 })

 test_that("isLocal()", {
@@ -669,7 +670,7 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", {
            "{\"name\":\"James\", \"age\":35}")
   jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
   writeLines(lines, jsonPath2)
-  df2 <- loadDF(sqlCtx, jsonPath2, "json")
+  df2 <- read.df(sqlCtx, jsonPath2, "json")

   unioned <- arrange(unionAll(df, df2), df$age)
   expect_true(inherits(unioned, "DataFrame"))
@@ -711,19 +712,19 @@ test_that("mutate() and rename()", {
   expect_true(columns(newDF2)[1] == "newerAge")
 })

-test_that("saveDF() on DataFrame and works with parquetFile", {
+test_that("write.df() on DataFrame and works with parquetFile", {
   df <- jsonFile(sqlCtx, jsonPath)
-  saveDF(df, parquetPath, "parquet", mode="overwrite")
+  write.df(df, parquetPath, "parquet", mode="overwrite")
   parquetDF <- parquetFile(sqlCtx, parquetPath)
   expect_true(inherits(parquetDF, "DataFrame"))
   expect_equal(count(df), count(parquetDF))
 })

 test_that("parquetFile works with multiple input paths", {
   df <- jsonFile(sqlCtx, jsonPath)
-  saveDF(df, parquetPath, "parquet", mode="overwrite")
+  write.df(df, parquetPath, "parquet", mode="overwrite")
   parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
-  saveDF(df, parquetPath2, "parquet", mode="overwrite")
+  write.df(df, parquetPath2, "parquet", mode="overwrite")
   parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
   expect_true(inherits(parquetDF, "DataFrame"))
   expect_true(count(parquetDF) == count(df)*2)

README.md

Lines changed: 1 addition & 1 deletion

@@ -76,7 +76,7 @@ can be run using:
     ./dev/run-tests

 Please see the guidance on how to
-[run all automated tests](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-AutomatedTesting).
+[run tests for a module, or individual tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).

 ## A Note About Hadoop Versions
