10 changes: 10 additions & 0 deletions R/pkg/NAMESPACE
@@ -28,6 +28,7 @@ importFrom("utils", "download.file", "object.size", "packageVersion", "tail", "u

# S3 methods exported
export("sparkR.session")
export("sparkR.init")
export("sparkR.session.stop")
export("sparkR.stop")
export("sparkR.conf")
@@ -41,6 +42,9 @@ export("sparkR.callJStatic")

export("install.spark")

export("sparkRSQL.init",
"sparkRHive.init")

# MLlib integration
exportMethods("glm",
"spark.glm",
@@ -148,13 +152,15 @@ exportMethods("arrange",
"printSchema",
"randomSplit",
"rbind",
"registerTempTable",
"rename",
"repartition",
"repartitionByRange",
"rollup",
"sample",
"sample_frac",
"sampleBy",
"saveAsParquetFile",
"saveAsTable",
"saveDF",
"schema",
@@ -431,14 +437,18 @@ export("as.DataFrame",
"cacheTable",
"clearCache",
"createDataFrame",
"createExternalTable",
"createTable",
"currentDatabase",
"dropTempTable",
"dropTempView",
"jsonFile",
"listColumns",
"listDatabases",
"listFunctions",
"listTables",
"loadDF",
"parquetFile",
"read.df",
"read.jdbc",
"read.json",
38 changes: 38 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -521,6 +521,32 @@ setMethod("createOrReplaceTempView",
invisible(callJMethod(x@sdf, "createOrReplaceTempView", viewName))
})

#' (Deprecated) Register Temporary Table
#'
#' Registers a SparkDataFrame as a temporary table in the SparkSession.
#'
#' @param x A SparkDataFrame
#' @param tableName A character vector containing the name of the table
#'
#' @seealso \link{createOrReplaceTempView}
#' @rdname registerTempTable-deprecated
#' @name registerTempTable
#' @aliases registerTempTable,SparkDataFrame,character-method
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' registerTempTable(df, "json_df")
#' new_df <- sql("SELECT * FROM json_df")
#'}
#' @note registerTempTable since 1.4.0
setMethod("registerTempTable",
signature(x = "SparkDataFrame", tableName = "character"),
function(x, tableName) {
.Deprecated("createOrReplaceTempView")
invisible(callJMethod(x@sdf, "createOrReplaceTempView", tableName))
})
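
For context, a minimal sketch of what the deprecation looks like from the caller's side (assumes an active SparkR session and a SparkDataFrame `df`; the view name is illustrative). `.Deprecated` signals an ordinary R warning, so existing code keeps working while the warning can be observed or muffled during migration:

```r
# Minimal sketch: the deprecated name still works but emits a warning
# that migration code can observe with withCallingHandlers.
withCallingHandlers(
  registerTempTable(df, "json_df"),
  warning = function(w) {
    message("caught: ", conditionMessage(w))
    invokeRestart("muffleWarning")
  }
)

# Equivalent non-deprecated call:
createOrReplaceTempView(df, "json_df")
```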

#' insertInto
#'
#' Insert the contents of a SparkDataFrame into a table registered in the current SparkSession.
@@ -936,6 +962,7 @@ setMethod("write.orc",
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' write.parquet(df, "/tmp/sparkr-tmp1/")
#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/")
#'}
#' @note write.parquet since 1.6.0
setMethod("write.parquet",
@@ -946,6 +973,17 @@ setMethod("write.parquet",
invisible(handledCallJMethod(write, "parquet", path))
})

#' @rdname write.parquet
#' @name saveAsParquetFile
#' @aliases saveAsParquetFile,SparkDataFrame,character-method
#' @note saveAsParquetFile since 1.4.0
setMethod("saveAsParquetFile",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
.Deprecated("write.parquet")
write.parquet(x, path)
})

#' Save the content of SparkDataFrame in a text file at the specified path.
#'
#' Save the content of the SparkDataFrame in a text file at the specified path.
50 changes: 50 additions & 0 deletions R/pkg/R/SQLContext.R
@@ -389,6 +389,7 @@ setMethod("toDF", signature(x = "RDD"),
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' df <- read.json(path, multiLine = TRUE)
#' df <- jsonFile(path)
#' }
#' @name read.json
#' @note read.json since 1.6.0
@@ -403,6 +404,46 @@ read.json <- function(path, ...) {
dataFrame(sdf)
}

#' @rdname read.json
#' @name jsonFile
#' @note jsonFile since 1.4.0
jsonFile <- function(path) {
.Deprecated("read.json")
read.json(path)
}

#' JSON RDD
#'
#' Loads an RDD storing one JSON object per string as a SparkDataFrame.
#'
#' @param sqlContext SQLContext to use
#' @param rdd An RDD of JSON strings
#' @param schema A StructType object to use as the schema
#' @param samplingRatio The sampling ratio used to infer the schema
#' @return A SparkDataFrame
#' @noRd
#' @examples
#'\dontrun{
#' sparkR.session()
#' rdd <- textFile(sc, "path/to/json")
#' df <- jsonRDD(sqlContext, rdd)
#'}

# TODO: remove - this method is no longer exported
# TODO: support schema
jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
.Deprecated("read.json")
rdd <- serializeToString(rdd)
if (is.null(schema)) {
read <- callJMethod(sqlContext, "read")
# samplingRatio is deprecated
sdf <- callJMethod(read, "json", callJMethod(getJRDD(rdd), "rdd"))
dataFrame(sdf)
} else {
stop("not implemented")
}
}

#' Create a SparkDataFrame from an ORC file.
#'
#' Loads an ORC file, returning the result as a SparkDataFrame.
@@ -445,6 +486,15 @@ read.parquet <- function(path, ...) {
dataFrame(sdf)
}

#' @param ... the path(s) of the parquet file(s) to read.
#' @rdname read.parquet
#' @name parquetFile
#' @note parquetFile since 1.4.0
parquetFile <- function(...) {
.Deprecated("read.parquet")
read.parquet(unlist(list(...)))
}
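
Since `parquetFile` gathers its arguments with `unlist(list(...))`, several paths passed as separate arguments are flattened into one character vector before reaching `read.parquet`. A small sketch of that equivalence (the paths are illustrative):

```r
# Both calls read the same files; parquetFile flattens its ... arguments
# into one character vector before delegating to read.parquet.
df1 <- parquetFile("/tmp/part-1.parquet", "/tmp/part-2.parquet")   # deprecated, warns
df2 <- read.parquet(c("/tmp/part-1.parquet", "/tmp/part-2.parquet"))
```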

#' Create a SparkDataFrame from a text file.
#'
#' Loads text files and returns a SparkDataFrame whose schema starts with
52 changes: 52 additions & 0 deletions R/pkg/R/catalog.R
@@ -17,6 +17,35 @@

# catalog.R: SparkSession catalog functions

#' (Deprecated) Create an external table
#'
#' Creates an external table based on the dataset in a data source and
#' returns a SparkDataFrame associated with the external table.
#'
#' The data source is specified by the \code{source} and a set of options (...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#'
#' @param tableName the name of the table.
#' @param path the path of files to load.
#' @param source the name of the external data source.
#' @param schema the schema of the data required for some data sources.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame.
#' @rdname createExternalTable-deprecated
#' @seealso \link{createTable}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createExternalTable("myjson", path = "path/to/json", source = "json", schema)
#' }
#' @name createExternalTable
#' @note createExternalTable since 1.4.0
createExternalTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
.Deprecated("createTable", old = "createExternalTable")
createTable(tableName, path, source, schema, ...)
}
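
A hedged migration sketch (the table name, path, and source below are illustrative): the `old = "createExternalTable"` argument makes the warning name the deprecated entry point, and `createTable` accepts the same arguments.

```r
# Deprecated spelling: warns that createExternalTable is deprecated
# and names createTable as the replacement.
df <- createExternalTable("myjson", path = "path/to/json", source = "json")

# Direct replacement, same arguments:
df <- createTable("myjson", path = "path/to/json", source = "json")
```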

#' Creates a table based on the dataset in a data source
#'
#' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
@@ -130,6 +159,29 @@ clearCache <- function() {
invisible(callJMethod(catalog, "clearCache"))
}

#' (Deprecated) Drop a temporary table
#'
#' Drops the temporary table with the given table name in the catalog.
#' If the table has been cached/persisted before, it is also unpersisted.
#'
#' @param tableName The name of the SparkSQL table to be dropped.
#' @seealso \link{dropTempView}
#' @rdname dropTempTable-deprecated
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' createOrReplaceTempView(df, "table")
#' dropTempTable("table")
#' }
#' @name dropTempTable
#' @note dropTempTable since 1.4.0
dropTempTable <- function(tableName) {
.Deprecated("dropTempView", old = "dropTempTable")
  if (!is.character(tableName)) {
    stop("tableName must be a string.")
  }
dropTempView(tableName)
}

#' Drops the temporary view with the given view name in the catalog.
#'
#' Drops the temporary view with the given view name in the catalog.
6 changes: 6 additions & 0 deletions R/pkg/R/generics.R
@@ -528,6 +528,9 @@ setGeneric("persist", function(x, newLevel) { standardGeneric("persist") })
#' @rdname printSchema
setGeneric("printSchema", function(x) { standardGeneric("printSchema") })

#' @rdname registerTempTable-deprecated
setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })

#' @rdname rename
setGeneric("rename", function(x, ...) { standardGeneric("rename") })

@@ -592,6 +595,9 @@ setGeneric("write.parquet", function(x, path, ...) {
standardGeneric("write.parquet")
})

#' @rdname write.parquet
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname write.stream
setGeneric("write.stream", function(df, source = NULL, outputMode = NULL, ...) {
standardGeneric("write.stream")
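
These generics follow the standard S4 pattern: `setGeneric` declares the name and dispatch arguments via `standardGeneric`, and the class-specific implementations are registered with `setMethod` elsewhere (for example in DataFrame.R above). A toy, Spark-free sketch of the same mechanism, with all names invented for illustration:

```r
# Toy illustration of the setGeneric/setMethod pattern used above,
# independent of SparkR: the generic declares the name and arguments,
# and a method is registered against a class signature.
setClass("Frame", slots = c(name = "character"))

setGeneric("describeFrame", function(x, label) { standardGeneric("describeFrame") })

setMethod("describeFrame",
          signature(x = "Frame", label = "character"),
          function(x, label) {
            paste0(label, ": ", x@name)
          })

describeFrame(new("Frame", name = "people"), "frame")  # "frame: people"
```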
98 changes: 98 additions & 0 deletions R/pkg/R/sparkR.R
@@ -88,6 +88,49 @@ sparkR.stop <- function() {
sparkR.session.stop()
}

#' (Deprecated) Initialize a new Spark Context
#'
#' This function initializes a new SparkContext.
#'
#' @param master The Spark master URL
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of environment variables to set on worker nodes
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors
#' @param sparkJars Character vector of jar files to pass to the worker nodes
#' @param sparkPackages Character vector of package coordinates
#' @seealso \link{sparkR.session}
#' @rdname sparkR.init-deprecated
#' @examples
#'\dontrun{
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
#' list(spark.executor.memory="1g"))
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#' list(spark.executor.memory="4g"),
#' list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#' c("one.jar", "two.jar", "three.jar"),
#' c("com.databricks:spark-avro_2.11:2.0.1"))
#'}
#' @note sparkR.init since 1.4.0
sparkR.init <- function(
master = "",
appName = "SparkR",
sparkHome = Sys.getenv("SPARK_HOME"),
sparkEnvir = list(),
sparkExecutorEnv = list(),
sparkJars = "",
sparkPackages = "") {
.Deprecated("sparkR.session")
sparkR.sparkContext(master,
appName,
sparkHome,
convertNamedListToEnv(sparkEnvir),
convertNamedListToEnv(sparkExecutorEnv),
sparkJars,
sparkPackages)
}
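
A rough migration sketch (values illustrative): the `sparkEnvir` list maps onto `sparkR.session`'s `sparkConfig` argument, and the session call creates or reuses the underlying SparkSession.

```r
# Deprecated initialization:
sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
                  list(spark.executor.memory = "1g"))

# Rough sparkR.session equivalent: sparkEnvir-style settings become sparkConfig.
sparkR.session(master = "local[2]", appName = "SparkR",
               sparkHome = "/home/spark",
               sparkConfig = list(spark.executor.memory = "1g"))
```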

# Internal function to handle creating the SparkContext.
sparkR.sparkContext <- function(
master = "",
@@ -229,6 +272,61 @@
sc
}

#' (Deprecated) Initialize a new SQLContext
#'
#' This function creates a SparkContext from an existing JavaSparkContext and
#' then uses it to initialize a new SQLContext.
#'
#' Starting with SparkR 2.0, a SparkSession is initialized and returned instead.
#' This API is deprecated and kept for backward compatibility only.
#'
#' @param jsc The existing JavaSparkContext created with sparkR.init()
#' @seealso \link{sparkR.session}
#' @rdname sparkRSQL.init-deprecated
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#'}
#' @note sparkRSQL.init since 1.4.0
sparkRSQL.init <- function(jsc = NULL) {
.Deprecated("sparkR.session")

if (exists(".sparkRsession", envir = .sparkREnv)) {
return(get(".sparkRsession", envir = .sparkREnv))
}

# Default to without Hive support for backward compatibility.
sparkR.session(enableHiveSupport = FALSE)
}

#' (Deprecated) Initialize a new HiveContext
#'
#' This function creates a HiveContext from an existing JavaSparkContext.
#'
#' Starting with SparkR 2.0, a SparkSession is initialized and returned instead.
#' This API is deprecated and kept for backward compatibility only.
#'
#' @param jsc The existing JavaSparkContext created with sparkR.init()
#' @seealso \link{sparkR.session}
#' @rdname sparkRHive.init-deprecated
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRHive.init(sc)
#'}
#' @note sparkRHive.init since 1.4.0
sparkRHive.init <- function(jsc = NULL) {
.Deprecated("sparkR.session")

if (exists(".sparkRsession", envir = .sparkREnv)) {
return(get(".sparkRsession", envir = .sparkREnv))
}

  # Default to Hive support enabled, matching the old HiveContext behavior.
  sparkR.session(enableHiveSupport = TRUE)
}
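
Both shims reduce to `sparkR.session` with Hive support toggled; a brief sketch of the preferred, explicit form:

```r
# What the two deprecated entry points reduce to:
sqlContext  <- sparkRSQL.init()   # ~ sparkR.session(enableHiveSupport = FALSE)
hiveContext <- sparkRHive.init()  # ~ sparkR.session(enableHiveSupport = TRUE)

# Preferred explicit form:
spark <- sparkR.session(enableHiveSupport = TRUE)
```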

#' Get the existing SparkSession or initialize a new SparkSession.
#'
#' SparkSession is the entry point into SparkR. \code{sparkR.session} gets the existing