3 changes: 3 additions & 0 deletions R/pkg/NAMESPACE
@@ -336,6 +336,9 @@ export("as.DataFrame",
"read.parquet",
"read.text",
"spark.lapply",
"spark.addFile",
"spark.getSparkFilesRootDirectory",
"spark.getSparkFiles",
"sql",
"str",
"tableToDF",
48 changes: 48 additions & 0 deletions R/pkg/R/context.R
@@ -225,6 +225,54 @@ setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}

#' Add a file or directory to be downloaded with this Spark job on every node.
#'
#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported
#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
#' use spark.getSparkFiles(fileName) to find its download location.
#'
#' @rdname spark.addFile
#' @param path The path of the file to be added
#' @export
#' @examples
#'\dontrun{
Member: add @export
#' spark.addFile("~/myfile")
#'}
#' @note spark.addFile since 2.1.0
spark.addFile <- function(path) {
sc <- getSparkContext()
invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path))))
}

#' Get the root directory that contains files added through spark.addFile.
#'
#' @rdname spark.getSparkFilesRootDirectory
#' @return the root directory that contains files added through spark.addFile
#' @export
#' @examples
Member: add @export
#'\dontrun{
#' spark.getSparkFilesRootDirectory()
#'}
#' @note spark.getSparkFilesRootDirectory since 2.1.0
spark.getSparkFilesRootDirectory <- function() {
callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
}

#' Get the absolute path of a file added through spark.addFile.
#'
#' @rdname spark.getSparkFiles
#' @param fileName The name of the file added through spark.addFile
#' @return the absolute path of a file added through spark.addFile.
#' @export
#' @examples
Member: add @export
#'\dontrun{
#' spark.getSparkFiles("myfile")
#'}
#' @note spark.getSparkFiles since 2.1.0
spark.getSparkFiles <- function(fileName) {
callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
}
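
For reference, a minimal usage sketch tying the three new functions together (the temp file and its contents here are hypothetical; assumes an active SparkR session created with sparkR.session()):

path <- tempfile(pattern = "config", fileext = ".txt")   # hypothetical local file
writeLines("key=value", path)

spark.addFile(path)                                # distribute the file with this Spark job
spark.getSparkFilesRootDirectory()                 # root directory holding all added files
localPath <- spark.getSparkFiles(basename(path))   # absolute path of the downloaded copy
readLines(localPath)                               # "key=value"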

#' Run a function over a list of elements, distributing the computations with Spark
#'
#' Run a function over a list of elements, distributing the computations with Spark. Applies a
Expand Down
13 changes: 13 additions & 0 deletions R/pkg/inst/tests/testthat/test_context.R
@@ -166,3 +166,16 @@ test_that("spark.lapply should perform simple transforms", {
expect_equal(doubled, as.list(2 * 1:10))
sparkR.session.stop()
})

test_that("add and get file to be downloaded with Spark job on every node", {
sparkR.sparkContext()
path <- tempfile(pattern = "hello", fileext = ".txt")
filename <- basename(path)
words <- "Hello World!"
writeLines(words, path)
spark.addFile(path)
download_path <- spark.getSparkFiles(filename)
expect_equal(readLines(download_path), words)
unlink(path)
sparkR.session.stop()
})
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1426,7 +1426,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new URI(path)
val uri = new Path(path).toUri
Member: should there be some tests we can add for this change?

Member (@HyukjinKwon, Sep 20, 2016): I do understand your concern @felixcheung. However, IMHO, it'd be okay not to test the Hadoop library within Spark. I will try to find some tests/documentation related to Windows paths in Hadoop and share them to make sure.

FWIW, this case was verified by one of the committers before for Windows paths, so it'd be okay.

Member (@HyukjinKwon, Sep 20, 2016): Also, we could alternatively use Utils.resolveURI, which is already tested within Spark. However, that util does not seem to handle the C:/a/b/c case (as opposed to C:\a\b\c), which we should fix, so I suggested Path(...).toUri instead. If you feel strongly about this, though, we could use that. I will try to find and share the docs and tests for Path when I get home.

Contributor Author: I agree with @HyukjinKwon. Thanks!

Member: ok thanks!

val schemeCorrectedPath = uri.getScheme match {
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
Member (@HyukjinKwon, Sep 18, 2016): In Utils.fetchFile(path, ...) below, it seems we can't pass path as it is, because new URI(path) is called internally, which fails to parse a Windows path.

Could I please ask to change this to uri.toString? It'd work fine as far as I know.

import java.net.URI
import org.apache.hadoop.fs.Path

scala> val a = new Path("C:\\a\\b\\c").toUri
a: java.net.URI = /C:/a/b/c

scala> val b = new URI(a.toString)
b: java.net.URI = /C:/a/b/c

scala> val a = new Path("C:/a/b/c").toUri
a: java.net.URI = /C:/a/b/c

scala> val b = new URI(a.toString)
b: java.net.URI = /C:/a/b/c

scala> val a = new Path("/a/b/c").toUri
a: java.net.URI = /a/b/c

scala> val b = new URI(a.toString)
b: java.net.URI = /a/b/c

scala> val a = new Path("file:///a/b/c").toUri
a: java.net.URI = file:///a/b/c

scala> val b = new URI(a.toString)
b: java.net.URI = file:///a/b/c

scala> val a = new Path("http://localhost/a/b/c").toUri
a: java.net.URI = http://localhost/a/b/c

scala> val b = new URI(a.toString)
b: java.net.URI = http://localhost/a/b/c

Member: Just in case, I ran the tests after manually fixing this. Maybe we can wait for the result: https://ci.appveyor.com/project/HyukjinKwon/spark/build/108-pr-15131-path

Member: @yanboliang Yeap, it passes the tests at least.

Contributor Author: Updated, thanks!
@@ -1457,8 +1457,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo(s"Added file $path at $key with timestamp $timestamp")
// Fetch the file locally so that closures which are run on the driver can still use the
// SparkFiles API to access files.
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
hadoopConfiguration, timestamp, useCache = false)
Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
Member: should there be some tests we can add for this change?

Member: In my personal opinion, I thought it's okay (I thought about this for a while) because Utils.fetchFile wants its first argument as a URL, not a path. So this might be a valid correction without tests, because we fixed the argument to what the function initially wants. But no strong opinion.
env.securityManager, hadoopConfiguration, timestamp, useCache = false)
postEnvironmentUpdate()
}
}