4 changes: 4 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -96,6 +96,10 @@ mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}
mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)

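# Pinning the timezone on Windows (below) keeps timestamp-sensitive tests
# deterministic across environments; this rationale is assumed, not stated
# in the diff itself.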
if (.Platform$OS.type == "windows") {
Sys.setenv(TZ = "GMT")
}

test_that("calling sparkRSQL.init returns existing SQL context", {
skip_on_cran()

36 changes: 18 additions & 18 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -65,7 +65,7 @@ We can view the first few rows of the `SparkDataFrame` by `head` or `showDF` fun
head(carsDF)
```

Common data processing operations such as `filter`, `select` are supported on the `SparkDataFrame`.
Common data processing operations such as `filter` and `select` are supported on the `SparkDataFrame`.
```{r}
carsSubDF <- select(carsDF, "model", "mpg", "hp")
carsSubDF <- filter(carsSubDF, carsSubDF$hp >= 200)
@@ -379,7 +379,7 @@ out <- dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)
head(collect(out))
```

Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of function should be a `data.frame`, but no schema is required in this case. Note that `dapplyCollect` can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.
Like `dapply`, `dapplyCollect` can apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of the function should be a `data.frame`, but no schema is required in this case. Note that `dapplyCollect` can fail if the output of the UDF on all partitions cannot be pulled into the driver's memory.

```{r}
out <- dapplyCollect(
@@ -405,7 +405,7 @@ result <- gapply(
head(arrange(result, "max_mpg", decreasing = TRUE))
```

Like gapply, `gapplyCollect` applies a function to each partition of a `SparkDataFrame` and collect the result back to R `data.frame`. The output of the function should be a `data.frame` but no schema is required in this case. Note that `gapplyCollect` can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.
Like `gapply`, `gapplyCollect` can apply a function to each partition of a `SparkDataFrame` and collect the result back to an R `data.frame`. The output of the function should be a `data.frame` but no schema is required in this case. Note that `gapplyCollect` can fail if the output of the UDF on all partitions cannot be pulled into the driver's memory.

```{r}
result <- gapplyCollect(
@@ -458,20 +458,20 @@ options(ops)


### SQL Queries
A `SparkDataFrame` can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data. The sql function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.
A `SparkDataFrame` can also be registered as a temporary view in Spark SQL so that one can run SQL queries over its data. The sql function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.

```{r}
people <- read.df(paste0(sparkR.conf("spark.home"),
"/examples/src/main/resources/people.json"), "json")
```

Register this SparkDataFrame as a temporary view.
Register this `SparkDataFrame` as a temporary view.

```{r}
createOrReplaceTempView(people, "people")
```

SQL statements can be run by using the sql method.
SQL statements can be run using the sql method.
```{r}
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
@@ -780,7 +780,7 @@ head(predict(isoregModel, newDF))
`spark.gbt` fits a [gradient-boosted tree](https://en.wikipedia.org/wiki/Gradient_boosting) classification or regression model on a `SparkDataFrame`.
Users can call `summary` to get a summary of the fitted model, `predict` to make predictions, and `write.ml`/`read.ml` to save/load fitted models.

Similar to the random forest example above, we use the `longley` dataset to train a gradient-boosted tree and make predictions:
We use the `longley` dataset to train a gradient-boosted tree and make predictions:

```{r, warning=FALSE}
df <- createDataFrame(longley)
@@ -820,7 +820,7 @@ head(select(fitted, "Class", "prediction"))

`spark.gaussianMixture` fits multivariate [Gaussian Mixture Model](https://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model) (GMM) against a `SparkDataFrame`. [Expectation-Maximization](https://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm) (EM) is used to approximate the maximum likelihood estimator (MLE) of the model.

We use a simulated example to demostrate the usage.
We use a simulated example to demonstrate the usage.
```{r}
X1 <- data.frame(V1 = rnorm(4), V2 = rnorm(4))
X2 <- data.frame(V1 = rnorm(6, 3), V2 = rnorm(6, 4))
@@ -851,9 +851,9 @@ head(select(kmeansPredictions, "model", "mpg", "hp", "wt", "prediction"), n = 20

* Topics and documents both exist in a feature space, where feature vectors are vectors of word counts (bag of words).

* Rather than estimating a clustering using a traditional distance, LDA uses a function based on a statistical model of how text documents are generated.
* Rather than clustering using a traditional distance, LDA uses a function based on a statistical model of how text documents are generated.

To use LDA, we need to specify a `features` column in `data` where each entry represents a document. There are two type options for the column:
To use LDA, we need to specify a `features` column in `data` where each entry represents a document. There are two options for the column:

* character string: This can be a string of the whole document. It will be parsed automatically. Additional stop words can be added in `customizedStopWords`.

@@ -901,7 +901,7 @@ perplexity

`spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](http://dl.acm.org/citation.cfm?id=1608614).

There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, `nonnegative`. For a complete list, refer to the help file.
There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, and `nonnegative`. For a complete list, refer to the help file.

```{r, eval=FALSE}
ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
@@ -981,7 +981,7 @@ testSummary


### Model Persistence
The following example shows how to save/load an ML model by SparkR.
The following example shows how to save/load an ML model in SparkR.
```{r}
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
Expand Down Expand Up @@ -1079,19 +1079,19 @@ There are three main object classes in SparkR you may be working with.
+ `sdf` stores a reference to the corresponding Spark Dataset in the Spark JVM backend.
+ `env` saves the meta-information of the object such as `isCached`.

It can be created by data import methods or by transforming an existing `SparkDataFrame`. We can manipulate `SparkDataFrame` by numerous data processing functions and feed that into machine learning algorithms.

* `Column`: an S4 class representing column of `SparkDataFrame`. The slot `jc` saves a reference to the corresponding Column object in the Spark JVM backend.
* `Column`: an S4 class representing a column of `SparkDataFrame`. The slot `jc` saves a reference to the corresponding `Column` object in the Spark JVM backend.

It can be obtained from a `SparkDataFrame` by `$` operator, `df$col`. More often, it is used together with other functions, for example, with `select` to select particular columns, with `filter` and constructed conditions to select rows, with aggregation functions to compute aggregate statistics for each group.
It can be obtained from a `SparkDataFrame` by `$` operator, e.g., `df$col`. More often, it is used together with other functions, for example, with `select` to select particular columns, with `filter` and constructed conditions to select rows, with aggregation functions to compute aggregate statistics for each group.

* `GroupedData`: an S4 class representing grouped data created by `groupBy` or by transforming other `GroupedData`. Its `sgd` slot saves a reference to a RelationalGroupedDataset object in the backend.
* `GroupedData`: an S4 class representing grouped data created by `groupBy` or by transforming other `GroupedData`. Its `sgd` slot saves a reference to a `RelationalGroupedDataset` object in the backend.

This is often an intermediate object with group information and followed up by aggregation operations.

### Architecture

A complete description of architecture can be seen in reference, in particular the paper *SparkR: Scaling R Programs with Spark*.
A complete description of architecture can be seen in the references, in particular the paper *SparkR: Scaling R Programs with Spark*.

Under the hood of SparkR is the Spark SQL engine. This avoids the overheads of running interpreted R code, and the optimized SQL execution engine in Spark uses structural information about data and computation flow to perform optimizations that speed up the computation.

1 change: 0 additions & 1 deletion conf/spark-env.sh.template
@@ -34,7 +34,6 @@

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)
@@ -879,7 +879,7 @@ private[spark] class PythonAccumulatorV2(
private val serverPort: Int)
extends CollectionAccumulator[Array[Byte]] {

Utils.checkHost(serverHost, "Expected hostname")
Utils.checkHost(serverHost)

val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)

@@ -43,7 +43,7 @@ private[deploy] object DeployMessages {
memory: Int,
workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
Utils.checkHost(host)
assert (port > 0)
}

@@ -131,7 +131,7 @@ private[deploy] object DeployMessages {

// TODO(matei): replace hostPort with host
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
Utils.checkHostPort(hostPort, "Required hostport")
Utils.checkHostPort(hostPort)
}

case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
@@ -183,7 +183,7 @@ private[deploy] object DeployMessages {
completedDrivers: Array[DriverInfo],
status: MasterState) {

Utils.checkHost(host, "Required hostname")
Utils.checkHost(host)
assert (port > 0)

def uri: String = "spark://" + host + ":" + port
@@ -201,7 +201,7 @@
drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String,
cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {

Utils.checkHost(host, "Required hostname")
Utils.checkHost(host)
assert (port > 0)
}

@@ -80,7 +80,7 @@ private[deploy] class Master(
private val waitingDrivers = new ArrayBuffer[DriverInfo]
private var nextDriverNumber = 0

Utils.checkHost(address.host, "Expected hostname")
Utils.checkHost(address.host)

private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
@@ -60,12 +60,12 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) exte
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

@@ -32,7 +32,7 @@ private[spark] class WorkerInfo(
val webUiAddress: String)
extends Serializable {

Utils.checkHost(host, "Expected hostname")
Utils.checkHost(host)
assert (port > 0)

@transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
@@ -55,7 +55,7 @@ private[deploy] class Worker(
private val host = rpcEnv.address.host
private val port = rpcEnv.address.port

Utils.checkHost(host, "Expected hostname")
Utils.checkHost(host)
assert (port > 0)

// A scheduled executor used to send messages at the specified time.
@@ -68,12 +68,12 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

@@ -71,7 +71,7 @@ private[spark] class Executor(
private val conf = env.conf

// No ip or host:port - just hostname
Utils.checkHost(executorHostname, "Expected executed slave to be a hostname")
Utils.checkHost(executorHostname)
// must not have port specified.
assert (0 == Utils.parseHostPort(executorHostname)._2)

Expand Down
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -612,12 +612,19 @@ private[spark] class BlockManager(

/**
* Return a list of locations for the given block, prioritizing the local machine since
* multiple block managers can share the same host.
* multiple block managers can share the same host, followed by hosts on the same rack.
*/
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
val locs = Random.shuffle(master.getLocations(blockId))
val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
preferredLocs ++ otherLocs
blockManagerId.topologyInfo match {
case None => preferredLocs ++ otherLocs
case Some(_) =>
val (sameRackLocs, differentRackLocs) = otherLocs.partition {
loc => blockManagerId.topologyInfo == loc.topologyInfo
}
preferredLocs ++ sameRackLocs ++ differentRackLocs
}
}

/**
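For reference, the ordering that the new `getLocations` implements can be sketched outside of Spark as a two-level stable partition: block managers on the local host first, then (when topology information is present) hosts on the same rack, then everything else. The sketch below uses a hypothetical `Loc` case class as a stand-in for `BlockManagerId`:

```scala
// Standalone sketch of the rack-aware location ordering above.
// Loc is a hypothetical stand-in for BlockManagerId: a host plus
// optional topology (rack) information.
case class Loc(host: String, rack: Option[String])

object LocationOrderingSketch {
  def order(self: Loc, locs: Seq[Loc]): Seq[Loc] = {
    // Block managers on the local host always come first.
    val (preferred, others) = locs.partition(_.host == self.host)
    self.rack match {
      // No topology info: fall back to the old local-first behavior.
      case None => preferred ++ others
      // With topology info, prefer same-rack peers before the rest.
      case Some(_) =>
        val (sameRack, otherRack) = others.partition(_.rack == self.rack)
        preferred ++ sameRack ++ otherRack
    }
  }

  def main(args: Array[String]): Unit = {
    val self = Loc("host1", Some("rackA"))
    val candidates = Seq(
      Loc("host3", Some("rackB")),
      Loc("host2", Some("rackA")),
      Loc("host1", Some("rackA")))
    // Prints: host1, host2, host3 (local, same rack, other rack)
    println(order(self, candidates).map(_.host).mkString(", "))
  }
}
```

As in the real method, the relative order within each group is preserved; Spark shuffles the candidate list beforehand (`Random.shuffle` above) so load is spread across equally preferred locations.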
@@ -46,7 +46,7 @@ class BlockManagerId private (
def executorId: String = executorId_

if (null != host_) {
Utils.checkHost(host_, "Expected hostname")
Utils.checkHost(host_)
assert (port_ > 0)
}

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -28,7 +28,7 @@ private[spark] object RpcUtils {
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
Utils.checkHost(driverHost)
rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}

9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -937,12 +937,13 @@ private[spark] object Utils extends Logging {
customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
}

def checkHost(host: String, message: String = "") {
assert(host.indexOf(':') == -1, message)
def checkHost(host: String) {
assert(host != null && host.indexOf(':') == -1, s"Expected hostname (not IP) but got $host")
}

def checkHostPort(hostPort: String, message: String = "") {
assert(hostPort.indexOf(':') != -1, message)
def checkHostPort(hostPort: String) {
assert(hostPort != null && hostPort.indexOf(':') != -1,
s"Expected host and port but got $hostPort")
}

// Typically, this will be of order of number of nodes in cluster
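With the message parameter gone, `checkHost` and `checkHostPort` supply their own uniform assertion messages, so every call site in this diff shrinks to a single argument. A minimal standalone sketch of the resulting behavior (`CheckHostSketch` is a hypothetical object, not Spark API):

```scala
// Sketch mirroring the simplified Utils helpers above; the assertion
// messages are taken verbatim from the new implementations.
object CheckHostSketch {
  def checkHost(host: String): Unit =
    assert(host != null && host.indexOf(':') == -1,
      s"Expected hostname (not IP) but got $host")

  def checkHostPort(hostPort: String): Unit =
    assert(hostPort != null && hostPort.indexOf(':') != -1,
      s"Expected host and port but got $hostPort")

  def main(args: Array[String]): Unit = {
    checkHost("master-node")            // ok: a bare hostname
    checkHostPort("master-node:7077")   // ok: host:port form
    // checkHost("master-node:7077")    // would throw AssertionError
  }
}
```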
@@ -496,8 +496,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
}

test("optimize a location order of blocks") {
val localHost = Utils.localHostName()
test("optimize a location order of blocks without topology information") {
val localHost = "localhost"
val otherHost = "otherHost"
val bmMaster = mock(classOf[BlockManagerMaster])
val bmId1 = BlockManagerId("id1", localHost, 1)
@@ -508,7 +508,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockManager = makeBlockManager(128, "exec", bmMaster)
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
}

test("optimize a location order of blocks with topology information") {
val localHost = "localhost"
val otherHost = "otherHost"
val localRack = "localRack"
val otherRack = "otherRack"

val bmMaster = mock(classOf[BlockManagerMaster])
val bmId1 = BlockManagerId("id1", localHost, 1, Some(localRack))
val bmId2 = BlockManagerId("id2", localHost, 2, Some(localRack))
val bmId3 = BlockManagerId("id3", otherHost, 3, Some(otherRack))
val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack))
val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack))
when(bmMaster.getLocations(mc.any[BlockId]))
.thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4))

val blockManager = makeBlockManager(128, "exec", bmMaster)
blockManager.blockManagerId =
BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack))
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost))
assert(locations.flatMap(_.topologyInfo)
=== Seq(localRack, localRack, localRack, otherRack, otherRack))
}

test("SPARK-9591: getRemoteBytes from another location when Exception throw") {