Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ exportMethods("arrange",
"selectExpr",
"show",
"showDF",
"storageLevel",
"subset",
"summarize",
"summary",
Expand Down
28 changes: 27 additions & 1 deletion R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ setMethod("persist",
#' @param ... further arguments to be passed to or from other methods.
#'
#' @family SparkDataFrame functions
#' @rdname unpersist-methods
#' @rdname unpersist
#' @aliases unpersist,SparkDataFrame-method
#' @name unpersist
#' @export
Expand All @@ -654,6 +654,32 @@ setMethod("unpersist",
x
})

#' StorageLevel
#'
#' Get storagelevel of this SparkDataFrame.
#'
#' @param x the SparkDataFrame to get the storageLevel.
#'
#' @family SparkDataFrame functions
#' @rdname storageLevel
#' @aliases storageLevel,SparkDataFrame-method
#' @name storageLevel
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' persist(df, "MEMORY_AND_DISK")
#' storageLevel(df)
#'}
#' @note storageLevel since 2.1.0
setMethod("storageLevel",
signature(x = "SparkDataFrame"),
function(x) {
storageLevelToString(callJMethod(x@sdf, "storageLevel"))
})

#' Repartition
#'
#' The following options for repartition are possible:
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ setMethod("persistRDD",
#' cache(rdd) # rdd@@env$isCached == TRUE
#' unpersistRDD(rdd) # rdd@@env$isCached == FALSE
#'}
#' @rdname unpersist-methods
#' @rdname unpersist
#' @aliases unpersist,RDD-method
#' @noRd
setMethod("unpersistRDD",
Expand Down
6 changes: 5 additions & 1 deletion R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,10 @@ setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr")
#' @export
setGeneric("showDF", function(x, ...) { standardGeneric("showDF") })

# @rdname storageLevel
# @export
setGeneric("storageLevel", function(x) { standardGeneric("storageLevel") })

#' @rdname subset
#' @export
setGeneric("subset", function(x, ...) { standardGeneric("subset") })
Expand All @@ -711,7 +715,7 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
#' @export
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })

#' @rdname unpersist-methods
#' @rdname unpersist
#' @export
setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })

Expand Down
41 changes: 41 additions & 0 deletions R/pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,47 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
"OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
}

storageLevelToString <- function(levelObj) {
useDisk <- callJMethod(levelObj, "useDisk")
useMemory <- callJMethod(levelObj, "useMemory")
useOffHeap <- callJMethod(levelObj, "useOffHeap")
deserialized <- callJMethod(levelObj, "deserialized")
replication <- callJMethod(levelObj, "replication")
shortName <- if (!useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
"NONE"
} else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
"DISK_ONLY"
} else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
"DISK_ONLY_2"
} else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
"MEMORY_ONLY"
} else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
"MEMORY_ONLY_2"
} else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
"MEMORY_ONLY_SER"
} else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
"MEMORY_ONLY_SER_2"
} else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
"MEMORY_AND_DISK"
} else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
"MEMORY_AND_DISK_2"
} else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
"MEMORY_AND_DISK_SER"
} else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
"MEMORY_AND_DISK_SER_2"
} else if (useDisk && useMemory && useOffHeap && !deserialized && replication == 1) {
"OFF_HEAP"
} else {
NULL
}
fullInfo <- callJMethod(levelObj, "toString")
if (is.null(shortName)) {
fullInfo
} else {
paste(shortName, "-", fullInfo)
}
}

# Utility function for functions where an argument needs to be integer but we want to allow
# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
numToInt <- function(num) {
Expand Down
5 changes: 4 additions & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ test_that("multiple pipeline transformations result in an RDD with the correct v
expect_false(collectRDD(second)[[3]]$testCol)
})

test_that("cache(), persist(), and unpersist() on a DataFrame", {
test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame", {
df <- read.json(jsonPath)
expect_false(df@env$isCached)
cache(df)
Expand All @@ -795,6 +795,9 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", {
persist(df, "MEMORY_AND_DISK")
expect_true(df@env$isCached)

expect_equal(storageLevel(df),
"MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)")

unpersist(df)
expect_false(df@env$isCached)

Expand Down