
Commit 09ff163

Merge pull request apache#204 from cafreeman/sparkr-sql
[SparkR-209] Remaining DataFrame methods + `dropTempTable`
2 parents: 789be97 + 15a713f

File tree

4 files changed: +294 −1 lines changed


pkg/NAMESPACE

Lines changed: 8 additions & 1 deletion
@@ -92,6 +92,8 @@ exportMethods("columns",
               "filter",
               "groupBy",
               "head",
+              "insertInto",
+              "intersect",
               "isLocal",
               "limit",
               "orderBy",
@@ -108,9 +110,13 @@ exportMethods("columns",
               "selectExpr",
               "showDF",
               "sortDF",
+              "subtract",
               "toJSON",
               "toRDD",
-              "where")
+              "unionAll",
+              "where",
+              "withColumn",
+              "withColumnRenamed")
 
 exportClasses("Column")
 
@@ -141,6 +147,7 @@ exportMethods("agg")
 export("cacheTable",
        "clearCache",
        "createExternalTable",
+       "dropTempTable",
        "jsonFile",
        "jsonRDD",
        "loadDF",

pkg/R/DataFrame.R

Lines changed: 194 additions & 0 deletions
@@ -263,6 +263,36 @@ setMethod("registerTempTable",
             callJMethod(x@sdf, "registerTempTable", tableName)
           })
 
+#' insertInto
+#'
+#' Insert the contents of a DataFrame into a table registered in the current SQL Context.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param tableName A character vector containing the name of the table
+#' @param overwrite A logical argument indicating whether or not to overwrite
+#' the existing rows in the table.
+#'
+#' @rdname insertInto
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- loadDF(sqlCtx, path, "parquet")
+#' df2 <- loadDF(sqlCtx, path2, "parquet")
+#' registerTempTable(df, "table1")
+#' insertInto(df2, "table1", overwrite = TRUE)
+#'}
+setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertInto") })
+
+#' @rdname insertInto
+#' @export
+setMethod("insertInto",
+          signature(x = "DataFrame", tableName = "character"),
+          function(x, tableName, overwrite = FALSE) {
+            callJMethod(x@sdf, "insertInto", tableName, overwrite)
+          })
+
 #' Cache
 #'
 #' Persist with the default storage level (MEMORY_ONLY).
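Note on the insertInto() hunk above: the method appends by default and only replaces the table's existing rows when overwrite = TRUE. A minimal sketch of the two modes, assuming a SparkR session initialized as in the roxygen example and two DataFrames with matching schemas:

registerTempTable(df, "people")
insertInto(df2, "people")                    # append: df2's rows are added to df's
insertInto(df2, "people", overwrite = TRUE)  # replace: only df2's rows remain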
@@ -768,6 +798,20 @@ setMethod("select", signature(x = "DataFrame", col = "Column"),
             dataFrame(sdf)
           })
 
+setMethod("select",
+          signature(x = "DataFrame", col = "list"),
+          function(x, col) {
+            cols <- lapply(col, function(c) {
+              if (class(c) == "Column") {
+                c@jc
+              } else {
+                col(c)@jc
+              }
+            })
+            sdf <- callJMethod(x@sdf, "select", listToSeq(cols))
+            dataFrame(sdf)
+          })
+
 #' SelectExpr
 #'
 #' Select from a DataFrame using a set of SQL expressions.
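The new select() overload above accepts a plain R list mixing column names and Column expressions: strings are promoted to Columns via col(), then the whole list is handed to the JVM as a Seq through listToSeq(). A short usage sketch, assuming df carries the name and age fields from the JSON fixtures used elsewhere in this PR:

select(df, list("name", df$age + 1))  # strings and Column expressions may be mixed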
@@ -798,6 +842,70 @@ setMethod("selectExpr",
             dataFrame(sdf)
           })
 
+#' WithColumn
+#'
+#' Return a new DataFrame with the specified column added.
+#'
+#' @param x A DataFrame
+#' @param colName A string containing the name of the new column.
+#' @param col A Column expression.
+#' @return A DataFrame with the new column added.
+#' @rdname withColumn
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' newDF <- withColumn(df, "newCol", df$col1 * 5)
+#' }
+setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn") })
+
+#' @rdname withColumn
+#' @export
+setMethod("withColumn",
+          signature(x = "DataFrame", colName = "character", col = "Column"),
+          function(x, colName, col) {
+            select(x, x$"*", alias(col, colName))
+          })
+
+#' WithColumnRenamed
+#'
+#' Rename an existing column in a DataFrame.
+#'
+#' @param x A DataFrame
+#' @param existingCol The name of the column you want to change.
+#' @param newCol The new column name.
+#' @return A DataFrame with the column name changed.
+#' @rdname withColumnRenamed
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' newDF <- withColumnRenamed(df, "col1", "newCol1")
+#' }
+setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
+  standardGeneric("withColumnRenamed") })
+
+#' @rdname withColumnRenamed
+#' @export
+setMethod("withColumnRenamed",
+          signature(x = "DataFrame", existingCol = "character", newCol = "character"),
+          function(x, existingCol, newCol) {
+            cols <- lapply(columns(x), function(c) {
+              if (c == existingCol) {
+                alias(col(c), newCol)
+              } else {
+                col(c)
+              }
+            })
+            select(x, cols)
+          })
+
 #' SortDF
 #'
 #' Sort a DataFrame by the specified column(s).
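Both methods above are thin wrappers over select(): withColumn() appends alias(col, colName) to x$"*", and withColumnRenamed() rebuilds the full column list, aliasing only the matching name. One consequence of this implementation (an observation from the code, not documented behavior) is that withColumn() with an already-existing name adds a second column of that name rather than replacing the first. Sketch, assuming the JSON fixture:

withAge2   <- withColumn(df, "age2", df$age * 2)       # df's columns plus "age2"
renamedAge <- withColumnRenamed(df, "age", "ageYears")  # same data, "age" now "ageYears"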
@@ -939,6 +1047,92 @@ setMethod("join",
             dataFrame(sdf)
           })
 
+#' UnionAll
+#'
+#' Return a new DataFrame containing the union of rows in this DataFrame
+#' and another DataFrame. This is equivalent to `UNION ALL` in SQL.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @return A DataFrame containing the result of the union.
+#' @rdname unionAll
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' unioned <- unionAll(df1, df2)
+#' }
+setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
+
+#' @rdname unionAll
+#' @export
+setMethod("unionAll",
+          signature(x = "DataFrame", y = "DataFrame"),
+          function(x, y) {
+            unioned <- callJMethod(x@sdf, "unionAll", y@sdf)
+            dataFrame(unioned)
+          })
+
+#' Intersect
+#'
+#' Return a new DataFrame containing rows only in both this DataFrame
+#' and another DataFrame. This is equivalent to `INTERSECT` in SQL.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @return A DataFrame containing the result of the intersect.
+#' @rdname intersect
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' intersectDF <- intersect(df1, df2)
+#' }
+setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
+
+#' @rdname intersect
+#' @export
+setMethod("intersect",
+          signature(x = "DataFrame", y = "DataFrame"),
+          function(x, y) {
+            intersected <- callJMethod(x@sdf, "intersect", y@sdf)
+            dataFrame(intersected)
+          })
+
+#' Subtract
+#'
+#' Return a new DataFrame containing rows in this DataFrame
+#' but not in another DataFrame. This is equivalent to `EXCEPT` in SQL.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @return A DataFrame containing the result of the subtract operation.
+#' @rdname subtract
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' subtractDF <- subtract(df1, df2)
+#' }
+setGeneric("subtract", function(x, y) { standardGeneric("subtract") })
+
+#' @rdname subtract
+#' @export
+setMethod("subtract",
+          signature(x = "DataFrame", y = "DataFrame"),
+          function(x, y) {
+            subtracted <- callJMethod(x@sdf, "except", y@sdf)
+            dataFrame(subtracted)
+          })
 
 #' Save the contents of the DataFrame to a data source
 #'
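Note the JVM mapping in the hunk above: unionAll() and intersect() call the same-named Scala DataFrame methods, while subtract() maps to except. As in SQL generally, UNION ALL keeps duplicate rows, whereas INTERSECT and EXCEPT behave as the distinct set operations. A combined sketch, assuming df1 and df2 are loaded as in the roxygen examples:

both   <- unionAll(df1, df2)   # every row from both sides, duplicates kept
common <- intersect(df1, df2)  # rows occurring in both DataFrames
only1  <- subtract(df1, df2)   # rows of df1 absent from df2 (SQL EXCEPT)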

pkg/R/SQLContext.R

Lines changed: 21 additions & 0 deletions
@@ -230,6 +230,27 @@ clearCache <- function(sqlCtx) {
   callJMethod(sqlCtx, "clearCache")
 }
 
+#' Drop Temporary Table
+#'
+#' Drops the temporary table with the given table name in the catalog.
+#' If the table has been cached/persisted before, it's also unpersisted.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName The name of the SparkSQL table to be dropped.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- loadDF(sqlCtx, path, "parquet")
+#' registerTempTable(df, "table")
+#' dropTempTable(sqlCtx, "table")
+#' }
+
+dropTempTable <- function(sqlCtx, tableName) {
+  if (class(tableName) != "character") {
+    stop("tableName must be a string.")
+  }
+  callJMethod(sqlCtx, "dropTempTable", tableName)
+}
+
 #' Load a DataFrame
 #'
 #' Returns the dataset in a data source as a DataFrame
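dropTempTable() is exported as a plain function rather than an S4 generic, which is why the character check on tableName is done by hand. A short round trip, assuming a live sqlCtx and a DataFrame df as in the example:

registerTempTable(df, "people")
cacheTable(sqlCtx, "people")
dropTempTable(sqlCtx, "people")  # unregisters the table; also unpersists it if cached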

pkg/inst/tests/test_sparkSQL.R

Lines changed: 71 additions & 0 deletions
@@ -40,6 +40,7 @@ test_that("test cache, uncache and clearCache", {
   cacheTable(sqlCtx, "table1")
   uncacheTable(sqlCtx, "table1")
   clearCache(sqlCtx)
+  dropTempTable(sqlCtx, "table1")
 })
 
 test_that("test tableNames and tables", {
@@ -48,6 +49,7 @@ test_that("test tableNames and tables", {
   expect_true(length(tableNames(sqlCtx)) == 1)
   df <- tables(sqlCtx)
   expect_true(count(df) == 1)
+  dropTempTable(sqlCtx, "table1")
 })
 
 test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", {
@@ -56,12 +58,43 @@ test_that("registerTempTable() results in a queryable table and sql() results in
   newdf <- sql(sqlCtx, "SELECT * FROM table1 where name = 'Michael'")
   expect_true(inherits(newdf, "DataFrame"))
   expect_true(count(newdf) == 1)
+  dropTempTable(sqlCtx, "table1")
+})
+
+test_that("insertInto() on a registered table", {
+  df <- loadDF(sqlCtx, jsonPath, "json")
+  saveDF(df, parquetPath, "parquet", "overwrite")
+  dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")
+
+  lines <- c("{\"name\":\"Bob\", \"age\":24}",
+             "{\"name\":\"James\", \"age\":35}")
+  jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".tmp")
+  parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
+  writeLines(lines, jsonPath2)
+  df2 <- loadDF(sqlCtx, jsonPath2, "json")
+  saveDF(df2, parquetPath2, "parquet", "overwrite")
+  dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")
+
+  registerTempTable(dfParquet, "table1")
+  insertInto(dfParquet2, "table1")
+  expect_true(count(sql(sqlCtx, "select * from table1")) == 5)
+  expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Michael")
+  dropTempTable(sqlCtx, "table1")
+
+  registerTempTable(dfParquet, "table1")
+  insertInto(dfParquet2, "table1", overwrite = TRUE)
+  expect_true(count(sql(sqlCtx, "select * from table1")) == 2)
+  expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Bob")
+  dropTempTable(sqlCtx, "table1")
 })
 
 test_that("table() returns a new DataFrame", {
+  df <- jsonFile(sqlCtx, jsonPath)
+  registerTempTable(df, "table1")
   tabledf <- table(sqlCtx, "table1")
   expect_true(inherits(tabledf, "DataFrame"))
   expect_true(count(tabledf) == 3)
+  dropTempTable(sqlCtx, "table1")
 })
 
 test_that("toRDD() returns an RRDD", {
@@ -430,6 +463,44 @@ test_that("isLocal()", {
   expect_false(isLocal(df))
 })
 
+test_that("unionAll(), subtract(), and intersect() on a DataFrame", {
+  df <- jsonFile(sqlCtx, jsonPath)
+
+  lines <- c("{\"name\":\"Bob\", \"age\":24}",
+             "{\"name\":\"Andy\", \"age\":30}",
+             "{\"name\":\"James\", \"age\":35}")
+  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+  writeLines(lines, jsonPath2)
+  df2 <- loadDF(sqlCtx, jsonPath2, "json")
+
+  unioned <- sortDF(unionAll(df, df2), df$age)
+  expect_true(inherits(unioned, "DataFrame"))
+  expect_true(count(unioned) == 6)
+  expect_true(first(unioned)$name == "Michael")
+
+  subtracted <- sortDF(subtract(df, df2), desc(df$age))
+  expect_true(inherits(subtracted, "DataFrame"))
+  expect_true(count(subtracted) == 2)
+  expect_true(first(subtracted)$name == "Justin")
+
+  intersected <- sortDF(intersect(df, df2), df$age)
+  expect_true(inherits(intersected, "DataFrame"))
+  expect_true(count(intersected) == 1)
+  expect_true(first(intersected)$name == "Andy")
+})
+
+test_that("withColumn() and withColumnRenamed()", {
+  df <- jsonFile(sqlCtx, jsonPath)
+  newDF <- withColumn(df, "newAge", df$age + 2)
+  expect_true(length(columns(newDF)) == 3)
+  expect_true(columns(newDF)[3] == "newAge")
+  expect_true(first(filter(newDF, df$name != "Michael"))$newAge == 32)
+
+  newDF2 <- withColumnRenamed(df, "age", "newerAge")
+  expect_true(length(columns(newDF2)) == 2)
+  expect_true(columns(newDF2)[1] == "newerAge")
+})
+
 # TODO: Enable and test once the parquetFile PR has been merged
 # test_that("saveAsParquetFile() on DataFrame and works with parquetFile", {
 # df <- jsonFile(sqlCtx, jsonPath)
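The set-operation expectations trace back to the single overlapping row between the two fixtures: only the {Andy, 30} row appears in both files, so intersect() yields 1 row and subtract() the remaining 2 (Michael and Justin). With desc(df$age), Justin's 19 comes first because Michael's age is null and (assuming Spark's default nulls-last ordering for descending sorts, consistent with the assertion) sorts to the end:

first(sortDF(subtract(df, df2), desc(df$age)))$name  # "Justin"; Michael's null age sorts last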
