Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ export("as.DataFrame",
"clearCache",
"createDataFrame",
"createExternalTable",
"createTable",
"currentDatabase",
"dropTempTable",
"dropTempView",
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ setMethod("insertInto",
jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
callJMethod(write, "insertInto", tableName)
invisible(callJMethod(write, "insertInto", tableName))
})

#' Cache
Expand Down Expand Up @@ -2894,7 +2894,7 @@ setMethod("saveAsTable",
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
callJMethod(write, "saveAsTable", tableName)
invisible(callJMethod(write, "saveAsTable", tableName))
})

#' summary
Expand Down
11 changes: 7 additions & 4 deletions R/pkg/R/SQLContext.R
Original file line number Diff line number Diff line change
Expand Up @@ -544,12 +544,15 @@ sql <- function(x, ...) {
dispatchFunc("sql(sqlQuery)", x, ...)
}

#' Create a SparkDataFrame from a SparkSQL Table
#' Create a SparkDataFrame from a SparkSQL table or view
#'
#' Returns the specified Table as a SparkDataFrame. The Table must have already been registered
#' in the SparkSession.
#' Returns the specified table or view as a SparkDataFrame. The table or view must already exist or
#' have already been registered in the SparkSession.
#'
#' @param tableName The SparkSQL Table to convert to a SparkDataFrame.
#' @param tableName the qualified or unqualified name that designates a table or view. If a database
#' is specified, it identifies the table/view from the database.
#' Otherwise, it first attempts to find a temporary view with the given name
#' and then match the table/view from the current database.
#' @return SparkDataFrame
#' @rdname tableToDF
#' @name tableToDF
Expand Down
111 changes: 79 additions & 32 deletions R/pkg/R/catalog.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

# catalog.R: SparkSession catalog functions

#' Create an external table
#' (Deprecated) Create an external table
#'
#' Creates an external table based on the dataset in a data source.
#' Returns a SparkDataFrame associated with the external table.
Expand All @@ -29,10 +29,11 @@
#' @param tableName a name of the table.
#' @param path the path of files to load.
#' @param source the name of external data source.
#' @param schema the schema of the data for certain data source.
#' @param schema the schema of the data required for some data sources.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame.
#' @rdname createExternalTable
#' @rdname createExternalTable-deprecated
#' @seealso \link{createTable}
#' @export
#' @examples
#'\dontrun{
Expand All @@ -43,29 +44,70 @@
#' @method createExternalTable default
#' @note createExternalTable since 1.4.0
# Deprecated entry point kept for backward compatibility: emits a deprecation
# warning pointing callers at createTable(), then delegates to createTable()
# with the same arguments unchanged.
createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
.Deprecated("createTable", old = "createExternalTable")
createTable(tableName, path, source, schema, ...)
}

# Dispatcher for createExternalTable(): forwards the call through dispatchFunc.
# NOTE(review): presumably this preserves the legacy SQLContext-style calling
# convention -- confirm against dispatchFunc's definition, which is not visible here.
createExternalTable <- function(x, ...) {
dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
}

#' Creates a table based on the dataset in a data source
#'
#' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
#' the table.
#'
#' The data source is specified by the \code{source} and a set of options (...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used. When a \code{path} is specified, an external table is
#' created from the data at the given path. Otherwise a managed table is created.
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#'                  identifier is provided, it refers to a table in the current database.
#' @param path (optional) the path of files to load.
#' @param source (optional) the name of the data source.
#' @param schema (optional) the schema of the data required for some data sources.
#' @param ... additional named parameters as options for the data source.
#' @return A SparkDataFrame.
#' @rdname createTable
#' @seealso \link{createExternalTable}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createTable("myjson", path = "path/to/json", source = "json", schema = schema)
#'
#' createTable("people", source = "json", schema = schema)
#' insertInto(df, "people")
#' }
#' @name createTable
#' @note createTable since 2.2.0
createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
  sparkSession <- getSparkSession()
  # Fold the named varargs into a string environment of data source options;
  # a path, when given, is just another option understood by the JVM catalog.
  options <- varargsToStrEnv(...)
  if (!is.null(path)) {
    options[["path"]] <- path
  }
  if (is.null(source)) {
    source <- getDefaultSqlSource()
  }
  catalog <- callJMethod(sparkSession, "catalog")
  if (is.null(schema)) {
    # No schema: let the data source infer it.
    sdf <- callJMethod(catalog, "createTable", tableName, source, options)
  } else if (inherits(schema, "structType")) {
    # inherits() is safer than comparing class(), which may have length > 1.
    sdf <- callJMethod(catalog, "createTable", tableName, source, schema$jobj, options)
  } else {
    stop("schema must be a structType.")
  }
  dataFrame(sdf)
}

createExternalTable <- function(x, ...) {
dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
}

#' Cache Table
#'
#' Caches the specified table in-memory.
#'
#' @param tableName The name of the table being cached
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' @return SparkDataFrame
#' @rdname cacheTable
#' @export
Expand Down Expand Up @@ -94,7 +136,8 @@ cacheTable <- function(x, ...) {
#'
#' Removes the specified table from the in-memory cache.
#'
#' @param tableName The name of the table being uncached
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' @return SparkDataFrame
#' @rdname uncacheTable
#' @export
Expand Down Expand Up @@ -162,14 +205,14 @@ clearCache <- function() {
#' @method dropTempTable default
#' @note dropTempTable since 1.4.0
# Deprecated alias for dropTempView(): emits a deprecation warning, validates
# the table name, then delegates to dropTempView().
dropTempTable.default <- function(tableName) {
  .Deprecated("dropTempView", old = "dropTempTable")
  # is.character() is more robust than comparing class(), which can return a
  # vector of length > 1 for objects carrying multiple classes.
  if (!is.character(tableName)) {
    stop("tableName must be a string.")
  }
  dropTempView(tableName)
}

# Dispatcher for the deprecated dropTempTable API. The deprecation warning is
# emitted by dropTempTable.default, so it is not repeated here (duplicating
# .Deprecated in the dispatcher would warn twice per call; the companion
# createExternalTable dispatcher likewise carries no .Deprecated).
dropTempTable <- function(x, ...) {
  dispatchFunc("dropTempView(viewName)", x, ...)
}

Expand All @@ -178,7 +221,7 @@ dropTempTable <- function(x, ...) {
#' Drops the temporary view with the given view name in the catalog.
#' If the view has been cached before, then it will also be uncached.
#'
#' @param viewName the name of the view to be dropped.
#' @param viewName the name of the temporary view to be dropped.
#' @return TRUE if the view is dropped successfully, FALSE otherwise.
#' @rdname dropTempView
#' @name dropTempView
Expand Down Expand Up @@ -317,10 +360,10 @@ listDatabases <- function() {
dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF"))
}

#' Returns a list of tables in the specified database
#' Returns a list of tables or views in the specified database
#'
#' Returns a list of tables in the specified database.
#' This includes all temporary tables.
#' Returns a list of tables or views in the specified database.
#' This includes all temporary views.
#'
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of tables.
Expand Down Expand Up @@ -349,11 +392,13 @@ listTables <- function(databaseName = NULL) {
dataFrame(callJMethod(jdst, "toDF"))
}

#' Returns a list of columns for the given table in the specified database
#' Returns a list of columns for the given table/view in the specified database
#'
#' Returns a list of columns for the given table in the specified database.
#' Returns a list of columns for the given table/view in the specified database.
#'
#' @param tableName a name of the table.
#' @param tableName the qualified or unqualified name that designates a table/view. If no database
#' identifier is provided, it refers to a table/view in the current database.
#' If \code{databaseName} parameter is specified, this must be an unqualified name.
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of column descriptions.
#' @rdname listColumns
Expand Down Expand Up @@ -409,12 +454,13 @@ listFunctions <- function(databaseName = NULL) {
dataFrame(callJMethod(jdst, "toDF"))
}

#' Recover all the partitions in the directory of a table and update the catalog
#' Recovers all the partitions in the directory of a table and updates the catalog
#'
#' Recover all the partitions in the directory of a table and update the catalog. The name should
#' reference a partitioned table, and not a temporary view.
#' Recovers all the partitions in the directory of a table and updates the catalog. The name should
#' reference a partitioned table, and not a view.
#'
#' @param tableName a name of the table.
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' @rdname recoverPartitions
#' @name recoverPartitions
#' @export
Expand All @@ -430,17 +476,18 @@ recoverPartitions <- function(tableName) {
invisible(handledCallJMethod(catalog, "recoverPartitions", tableName))
}

#' Invalidate and refresh all the cached metadata of the given table
#' Invalidates and refreshes all the cached data and metadata of the given table
#'
#' Invalidate and refresh all the cached metadata of the given table. For performance reasons,
#' Spark SQL or the external data source library it uses might cache certain metadata about a
#' table, such as the location of blocks. When those change outside of Spark SQL, users should
#' Invalidates and refreshes all the cached data and metadata of the given table. For performance
#' reasons, Spark SQL or the external data source library it uses might cache certain metadata about
#' a table, such as the location of blocks. When those change outside of Spark SQL, users should
#' call this function to invalidate the cache.
#'
#' If this table is cached as an InMemoryRelation, drop the original cached version and make the
#' new version cached lazily.
#'
#' @param tableName a name of the table.
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' @rdname refreshTable
#' @name refreshTable
#' @export
Expand All @@ -456,11 +503,11 @@ refreshTable <- function(tableName) {
invisible(handledCallJMethod(catalog, "refreshTable", tableName))
}

#' Invalidate and refresh all the cached data and metadata for SparkDataFrame containing path
#' Invalidates and refreshes all the cached data and metadata for SparkDataFrame containing path
#'
#' Invalidate and refresh all the cached data (and the associated metadata) for any SparkDataFrame
#' that contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
#' everything that is cached.
#' Invalidates and refreshes all the cached data (and the associated metadata) for any
#' SparkDataFrame that contains the given data source path. Path matching is by prefix, i.e. "/"
#' would invalidate everything that is cached.
#'
#' @param path the path of the data source.
#' @rdname refreshByPath
Expand Down
20 changes: 15 additions & 5 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ test_that("create DataFrame from RDD", {
setHiveContext(sc)
sql("CREATE TABLE people (name string, age double, height float)")
df <- read.df(jsonPathNa, "json", schema)
invisible(insertInto(df, "people"))
insertInto(df, "people")
expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
c(16))
expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
Expand Down Expand Up @@ -1268,33 +1268,43 @@ test_that("column calculation", {

# Exercises the catalog table APIs against a Hive-enabled session:
# createTable with an explicit schema + insertInto, createTable from a path,
# and saveAsTable in json/hive/parquet formats.
test_that("test HiveContext", {
  setHiveContext(sc)

  # createTable with an explicit schema creates a table insertInto can target.
  schema <- structType(structField("name", "string"), structField("age", "integer"),
                       structField("height", "float"))
  createTable("people", source = "json", schema = schema)
  df <- read.df(jsonPathNa, "json", schema)
  insertInto(df, "people")
  expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16))
  sql("DROP TABLE people")

  # createTable from an existing json path yields a queryable table.
  df <- createTable("json", jsonPath, "json")
  expect_is(df, "SparkDataFrame")
  expect_equal(count(df), 3)
  df2 <- sql("select * from json")
  expect_is(df2, "SparkDataFrame")
  expect_equal(count(df2), 3)

  # saveAsTable now returns invisibly, so no invisible() wrapper is needed.
  jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  saveAsTable(df, "json2", "json", "append", path = jsonPath2)
  df3 <- sql("select * from json2")
  expect_is(df3, "SparkDataFrame")
  expect_equal(count(df3), 3)
  unlink(jsonPath2)

  hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  saveAsTable(df, "hivetestbl", path = hivetestDataPath)
  df4 <- sql("select * from hivetestbl")
  expect_is(df4, "SparkDataFrame")
  expect_equal(count(df4), 3)
  unlink(hivetestDataPath)

  parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
  saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath)
  df5 <- sql("select * from parquetest")
  expect_is(df5, "SparkDataFrame")
  expect_equal(count(df5), 3)
  unlink(parquetDataPath)

  unsetHiveContext()
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ <h4 style="clear: left; display: inline-block;">Summary</h4>
<th></th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip"
title="Memory used / total available memory for storage of data like RDD partitions cached in memory. ">Storage Memory</span>
title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">Storage Memory</span>
</th>
<th class="on_heap_memory">
<span data-toggle="tooltip"
title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">On Heap Storage Memory</span>
</th>
<th class="off_heap_memory">
<span data-toggle="tooltip"
title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">Off Heap Storage Memory</span>
</th>
<th>Disk Used</th>
<th>Cores</th>
Expand Down Expand Up @@ -73,6 +81,14 @@ <h4 style="clear: left; display: inline-block;">Executors</h4>
<span data-toggle="tooltip" data-placement="top"
title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">
Storage Memory</span></th>
<th class="on_heap_memory">
<span data-toggle="tooltip" data-placement="top"
title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">
On Heap Storage Memory</span></th>
<th class="off_heap_memory">
<span data-toggle="tooltip"
title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">
Off Heap Storage Memory</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
Expand Down
Loading