Merged
19 commits
b14bfc3
[SPARK-19993][SQL] Caching logical plans containing subquery expressi…
dilipbiswal Apr 12, 2017
b938438
[MINOR][DOCS] Fix spacings in Structured Streaming Programming Guide
dongjinleekr Apr 12, 2017
bca4259
[MINOR][DOCS] JSON APIs related documentation fixes
HyukjinKwon Apr 12, 2017
044f7ec
[SPARK-20298][SPARKR][MINOR] fixed spelling mistake "charactor"
bdwyer2 Apr 12, 2017
ffc57b0
[SPARK-20302][SQL] Short circuit cast when from and to types are stru…
rxin Apr 12, 2017
2e1fd46
[SPARK-20296][TRIVIAL][DOCS] Count distinct error message for streaming
jtoka Apr 12, 2017
ceaf77a
[SPARK-18692][BUILD][DOCS] Test Java 8 unidoc build on Jenkins
HyukjinKwon Apr 12, 2017
504e62e
[SPARK-20303][SQL] Rename createTempFunction to registerFunction
gatorsmile Apr 12, 2017
5408553
[SPARK-20304][SQL] AssertNotNull should not include path in string re…
rxin Apr 12, 2017
99a9473
[SPARK-19570][PYSPARK] Allow to disable hive in pyspark shell
zjffdu Apr 12, 2017
924c424
[SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in St…
brkyvz Apr 12, 2017
a7b430b
[SPARK-15354][FLAKY-TEST] TopologyAwareBlockReplicationPolicyBehavior…
cloud-fan Apr 13, 2017
c5f1cc3
[SPARK-20131][CORE] Don't use `this` lock in StandaloneSchedulerBacke…
zsxwing Apr 13, 2017
ec68d8f
[SPARK-20189][DSTREAM] Fix spark kinesis testcases to remove deprecat…
yashs360 Apr 13, 2017
095d1cb
[SPARK-20265][MLLIB] Improve Prefix'span pre-processing efficiency
Syrux Apr 13, 2017
a4293c2
[SPARK-20284][CORE] Make {Des,S}erializationStream extend Closeable
Apr 13, 2017
fbe4216
[SPARK-20233][SQL] Apply star-join filter heuristics to dynamic progr…
ioana-delaney Apr 13, 2017
8ddf0d2
[SPARK-20232][PYTHON] Improve combineByKey docs
Apr 13, 2017
7536e28
[SPARK-20038][SQL] FileFormatWriter.ExecuteWriteTask.releaseResources…
steveloughran Apr 13, 2017
10 changes: 5 additions & 5 deletions R/pkg/R/DataFrame.R
@@ -2818,14 +2818,14 @@ setMethod("write.df",
signature(df = "SparkDataFrame"),
function(df, path = NULL, source = NULL, mode = "error", ...) {
if (!is.null(path) && !is.character(path)) {
stop("path should be charactor, NULL or omitted.")
stop("path should be character, NULL or omitted.")
}
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the datasource specified ",
"in 'spark.sql.sources.default' configuration by default.")
}
if (!is.character(mode)) {
stop("mode should be charactor or omitted. It is 'error' by default.")
stop("mode should be character or omitted. It is 'error' by default.")
}
if (is.null(source)) {
source <- getDefaultSqlSource()
@@ -3040,7 +3040,7 @@ setMethod("fillna",
signature(x = "SparkDataFrame"),
function(x, value, cols = NULL) {
if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
stop("value should be an integer, numeric, charactor or named list.")
stop("value should be an integer, numeric, character or named list.")
}

if (class(value) == "list") {
@@ -3052,7 +3052,7 @@ setMethod("fillna",
# Check each item in the named list is of valid type
lapply(value, function(v) {
if (!(class(v) %in% c("integer", "numeric", "character"))) {
stop("Each item in value should be an integer, numeric or charactor.")
stop("Each item in value should be an integer, numeric or character.")
}
})

@@ -3598,7 +3598,7 @@ setMethod("write.stream",
"in 'spark.sql.sources.default' configuration by default.")
}
if (!is.null(outputMode) && !is.character(outputMode)) {
stop("outputMode should be charactor or omitted.")
stop("outputMode should be character or omitted.")
}
if (is.null(source)) {
source <- getDefaultSqlSource()
2 changes: 1 addition & 1 deletion R/pkg/R/SQLContext.R
@@ -606,7 +606,7 @@ tableToDF <- function(tableName) {
#' @note read.df since 1.4.0
read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
if (!is.null(path) && !is.character(path)) {
stop("path should be charactor, NULL or omitted.")
stop("path should be character, NULL or omitted.")
}
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the datasource specified ",
6 changes: 3 additions & 3 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2926,9 +2926,9 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
paste("source should be character, NULL or omitted. It is the datasource specified",
"in 'spark.sql.sources.default' configuration by default."))
expect_error(write.df(df, path = c(3)),
"path should be charactor, NULL or omitted.")
"path should be character, NULL or omitted.")
expect_error(write.df(df, mode = TRUE),
"mode should be charactor or omitted. It is 'error' by default.")
"mode should be character or omitted. It is 'error' by default.")
})

test_that("Call DataFrameWriter.load() API in Java without path and check argument types", {
@@ -2947,7 +2947,7 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume

# Arguments checking in R side.
expect_error(read.df(path = c(3)),
"path should be charactor, NULL or omitted.")
"path should be character, NULL or omitted.")
expect_error(read.df(jsonPath, source = c(1, 2)),
paste("source should be character, NULL or omitted. It is the datasource specified",
"in 'spark.sql.sources.default' configuration by default."))
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
@@ -35,7 +35,7 @@ private[spark] trait RpcEnvFactory {
*
* The life-cycle of an endpoint is:
*
* constructor -> onStart -> receive* -> onStop
* {@code constructor -> onStart -> receive* -> onStop}
*
* Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
* [[ThreadSafeRpcEndpoint]]
@@ -63,16 +63,16 @@ private[spark] trait RpcEndpoint {
}

/**
* Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
* unmatched message, [[SparkException]] will be thrown and sent to `onError`.
* Process messages from `RpcEndpointRef.send` or `RpcCallContext.reply`. If receiving a
* unmatched message, `SparkException` will be thrown and sent to `onError`.
*/
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}

/**
* Process messages from [[RpcEndpointRef.ask]]. If receiving a unmatched message,
* [[SparkException]] will be thrown and sent to `onError`.
* Process messages from `RpcEndpointRef.ask`. If receiving a unmatched message,
* `SparkException` will be thrown and sent to `onError`.
*/
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
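For readers skimming the scaladoc changes above: the endpoint contract they describe (constructor -> onStart -> receive* -> onStop, with receive handling one-way messages and receiveAndReply handling ask-style messages) can be illustrated by a minimal endpoint. This is a hedged sketch, not code from this PR; the class name and message handling are assumptions.

package org.apache.spark.rpc

// Minimal sketch of an RpcEndpoint following the documented life-cycle:
// constructor -> onStart -> receive* -> onStop. RpcEndpoint is private[spark],
// so the sketch has to live inside Spark's source tree; all names are illustrative.
private[spark] class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  override def onStart(): Unit = {
    // Invoked before the endpoint begins handling messages.
  }

  // Handles fire-and-forget messages sent via RpcEndpointRef.send.
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => // nothing to reply to for one-way messages
  }

  // Handles ask-style messages; unmatched messages are reported through onError.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(msg)
  }

  override def onStop(): Unit = {
    // Invoked once the endpoint will no longer receive messages.
  }
}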
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* An exception thrown if RpcTimeout modifies a [[TimeoutException]].
* An exception thrown if RpcTimeout modifies a `TimeoutException`.
*/
private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
extends TimeoutException(message) { initCause(cause) }
@@ -607,7 +607,7 @@ class DAGScheduler(
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @throws Exception when the job fails
* @note Throws `Exception` when the job fails
*/
def runJob[T, U](
rdd: RDD[T],
@@ -644,7 +644,7 @@
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param evaluator [[ApproximateEvaluator]] to receive the partial results
* @param evaluator `ApproximateEvaluator` to receive the partial results
* @param callSite where in the user program this job was called
* @param timeout maximum time to wait for the job, in milliseconds
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
@@ -42,7 +42,7 @@ private[spark] trait ExternalClusterManager {

/**
* Create a scheduler backend for the given SparkContext and scheduler. This is
* called after task scheduler is created using [[ExternalClusterManager.createTaskScheduler()]].
* called after task scheduler is created using `ExternalClusterManager.createTaskScheduler()`.
* @param sc SparkContext
* @param masterURL the master URL
* @param scheduler TaskScheduler that will be used with the scheduler backend.
@@ -38,7 +38,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}

/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
* It can also work with a local setup by using a [[LocalSchedulerBackend]] and setting
* It can also work with a local setup by using a `LocalSchedulerBackend` and setting
* isLocal to true. It handles common logic, like determining a scheduling order across jobs, waking
* up to launch speculative tasks, etc.
*
@@ -704,12 +704,12 @@ private[spark] object TaskSchedulerImpl {
* Used to balance containers across hosts.
*
* Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
* resource offers representing the order in which the offers should be used. The resource
* resource offers representing the order in which the offers should be used. The resource
* offers are ordered such that we'll allocate one container on each host before allocating a
* second container on any host, and so on, in order to reduce the damage if a host fails.
*
* For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h1, [o5, o6]>, returns
* [o1, o5, o4, 02, o6, o3]
* For example, given {@literal <h1, [o1, o2, o3]>}, {@literal <h2, [o4]>} and
* {@literal <h3, [o5, o6]>}, returns {@literal [o1, o5, o4, o2, o6, o3]}.
*/
def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
val _keyList = new ArrayBuffer[K](map.size)
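The prioritization that the reworded scaladoc above describes is a per-host round robin: one offer is taken from each host per pass until all offers are consumed. A stand-alone sketch of that idea follows; it is an assumed re-implementation for illustration, not the body of prioritizeContainers, and the host order within a pass depends on the map's iteration order in the real method.

import scala.collection.mutable

// Round-robin interleaving sketch: take one offer per host per pass, so one container
// is allocated on every host before a second container lands on any host.
def roundRobinOffers[K, T](offersByHost: Seq[(K, Seq[T])]): List[T] = {
  val queues = offersByHost.map { case (_, offers) => mutable.Queue(offers: _*) }
  val ordered = mutable.ListBuffer.empty[T]
  while (queues.exists(_.nonEmpty)) {
    queues.foreach { q =>
      if (q.nonEmpty) ordered += q.dequeue()
    }
  }
  ordered.toList
}

// With this host ordering, <h1, [o1, o2, o3]>, <h2, [o4]>, <h3, [o5, o6]> interleaves to
// [o1, o4, o5, o2, o6, o3]:
// roundRobinOffers(Seq("h1" -> Seq("o1", "o2", "o3"), "h2" -> Seq("o4"), "h3" -> Seq("o5", "o6")))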
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster

import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.Future

@@ -42,7 +43,7 @@ private[spark] class StandaloneSchedulerBackend(
with Logging {

private var client: StandaloneAppClient = null
private var stopping = false
private val stopping = new AtomicBoolean(false)
private val launcherBackend = new LauncherBackend() {
override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
}
@@ -112,7 +113,7 @@ private[spark] class StandaloneSchedulerBackend(
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}

override def stop(): Unit = synchronized {
override def stop(): Unit = {
stop(SparkAppHandle.State.FINISHED)
}

@@ -125,14 +126,14 @@

override def disconnected() {
notifyContext()
if (!stopping) {
if (!stopping.get) {
logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
}
}

override def dead(reason: String) {
notifyContext()
if (!stopping) {
if (!stopping.get) {
launcherBackend.setState(SparkAppHandle.State.KILLED)
logError("Application has been killed. Reason: " + reason)
try {
@@ -206,20 +207,20 @@ private[spark] class StandaloneSchedulerBackend(
registrationBarrier.release()
}

private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
try {
stopping = true

super.stop()
client.stop()
private def stop(finalState: SparkAppHandle.State): Unit = {
if (stopping.compareAndSet(false, true)) {
try {
super.stop()
client.stop()

val callback = shutdownCallback
if (callback != null) {
callback(this)
val callback = shutdownCallback
if (callback != null) {
callback(this)
}
} finally {
launcherBackend.setState(finalState)
launcherBackend.close()
}
} finally {
launcherBackend.setState(finalState)
launcherBackend.close()
}
}

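The change above swaps a synchronized var for an AtomicBoolean, so shutdown runs at most once and no longer needs the lock on `this` that caused the flakiness tracked in SPARK-20301/SPARK-20131. The pattern in isolation, with assumed class and helper names rather than the actual Spark code, is roughly:

import java.util.concurrent.atomic.AtomicBoolean

// One-shot shutdown sketch: compareAndSet lets only the first caller run the cleanup,
// so stop() is idempotent and safe to call from several threads without synchronized.
class OneShotStop(cleanup: () => Unit, onStopped: () => Unit) {
  private val stopping = new AtomicBoolean(false)

  def stop(): Unit = {
    if (stopping.compareAndSet(false, true)) {
      try {
        cleanup()    // e.g. stop the scheduler backend and the app client
      } finally {
        onStopped()  // e.g. report the final state to the launcher
      }
    }
    // Later callers see stopping == true and return immediately.
  }

  def isStopping: Boolean = stopping.get
}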
@@ -125,15 +125,15 @@ abstract class SerializerInstance {
* A stream for writing serialized objects.
*/
@DeveloperApi
abstract class SerializationStream {
abstract class SerializationStream extends Closeable {
/** The most general-purpose method to write an object. */
def writeObject[T: ClassTag](t: T): SerializationStream
/** Writes the object representing the key of a key-value pair. */
def writeKey[T: ClassTag](key: T): SerializationStream = writeObject(key)
/** Writes the object representing the value of a key-value pair. */
def writeValue[T: ClassTag](value: T): SerializationStream = writeObject(value)
def flush(): Unit
def close(): Unit
override def close(): Unit

def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream = {
while (iter.hasNext) {
@@ -149,14 +149,14 @@ abstract class SerializationStream {
* A stream for reading serialized objects.
*/
@DeveloperApi
abstract class DeserializationStream {
abstract class DeserializationStream extends Closeable {
/** The most general-purpose method to read an object. */
def readObject[T: ClassTag](): T
/** Reads the object representing the key of a key-value pair. */
def readKey[T: ClassTag](): T = readObject[T]()
/** Reads the object representing the value of a key-value pair. */
def readValue[T: ClassTag](): T = readObject[T]()
def close(): Unit
override def close(): Unit

/**
* Read the elements of this stream through an iterator. This can only be called once, as
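Because SerializationStream and DeserializationStream now extend java.io.Closeable, they work with any resource-management helper written against Closeable. A small helper to show the idea; the helper itself is an assumption for illustration, not a Spark API.

import java.io.Closeable

// Generic loan-pattern helper for Closeable resources; usable with the serialization
// streams above now that they extend Closeable. Illustrative only.
def withCloseable[C <: Closeable, T](resource: C)(body: C => T): T = {
  try {
    body(resource)
  } finally {
    resource.close()
  }
}

// Usage sketch, assuming a SerializerInstance `instance` and an OutputStream `out` in scope:
// withCloseable(instance.serializeStream(out)) { stream =>
//   stream.writeObject("payload")
//   stream.flush()
// }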
@@ -66,7 +66,7 @@ private[spark] trait BlockData {
/**
* Returns a Netty-friendly wrapper for the block's data.
*
* @see [[ManagedBuffer#convertToNetty()]]
* Please see `ManagedBuffer.convertToNetty()` for more details.
*/
def toNetty(): Object

4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -243,7 +243,7 @@ private[spark] object AccumulatorSuite {
import InternalAccumulator._

/**
* Create a long accumulator and register it to [[AccumulatorContext]].
* Create a long accumulator and register it to `AccumulatorContext`.
*/
def createLongAccum(
name: String,
@@ -258,7 +258,7 @@
}

/**
* Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
* Make an `AccumulableInfo` out of an [[Accumulable]] with the intent to use the
* info as an accumulator update.
*/
def makeInfo(a: AccumulatorV2[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
@@ -27,7 +27,7 @@ import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalSh
/**
* This suite creates an external shuffle server and routes all shuffle fetches through it.
* Note that failures in this suite may arise due to changes in Spark that invalidate expectations
* set up in [[ExternalShuffleBlockHandler]], such as changing the format of shuffle files or how
* set up in `ExternalShuffleBlockHandler`, such as changing the format of shuffle files or how
* we hash files into folders.
*/
class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
@@ -22,7 +22,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach
import org.scalatest.Suite

/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it after each test. */
/** Manages a local `sc` `SparkContext` variable, correctly stopping it after each test. */
trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>

@transient var sc: SparkContext = _
@@ -95,12 +95,12 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}

/**
* A map from partition -> results for all tasks of a job when you call this test framework's
* A map from partition to results for all tasks of a job when you call this test framework's
* [[submit]] method. Two important considerations:
*
* 1. If there is a job failure, results may or may not be empty. If any tasks succeed before
* the job has failed, they will get included in `results`. Instead, check for job failure by
* checking [[failure]]. (Also see [[assertDataStructuresEmpty()]])
* checking [[failure]]. (Also see `assertDataStructuresEmpty()`)
*
* 2. This only gets cleared between tests. So you'll need to do special handling if you submit
* more than one job in one test.
@@ -29,7 +29,7 @@ import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset
/**
* Tests to ensure that [[Serializer]] implementations obey the API contracts for methods that
* describe properties of the serialized stream, such as
* [[Serializer.supportsRelocationOfSerializedObjects]].
* `Serializer.supportsRelocationOfSerializedObjects`.
*/
class SerializerPropertiesSuite extends SparkFunSuite {

@@ -70,9 +70,18 @@ class RandomBlockReplicationPolicyBehavior extends SparkFunSuite
}
}

/**
* Returns a sequence of [[BlockManagerId]], whose rack is randomly picked from the given `racks`.
* Note that, each rack will be picked at least once from `racks`, if `count` is greater or equal
* to the number of `racks`.
*/
protected def generateBlockManagerIds(count: Int, racks: Seq[String]): Seq[BlockManagerId] = {
(1 to count).map{i =>
BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(racks(Random.nextInt(racks.size))))
val randomizedRacks: Seq[String] = Random.shuffle(
racks ++ racks.length.until(count).map(_ => racks(Random.nextInt(racks.length)))
)

(0 until count).map { i =>
BlockManagerId(s"Exec-$i", s"Host-$i", 10000 + i, Some(randomizedRacks(i)))
}
}
}
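The new scaladoc above promises that every rack is picked at least once when count is at least the number of racks. A hedged, test-style check of that property could read as follows; it assumes the helper is reachable from the test and that BlockManagerId exposes the rack through topologyInfo.

// Property sketch for generateBlockManagerIds (illustrative, not part of this diff):
// with count >= racks.size, each rack should be assigned to at least one BlockManagerId.
val racks = Seq("/rack-1", "/rack-2", "/rack-3")
val ids = generateBlockManagerIds(count = 10, racks = racks)
assert(ids.size == 10)
assert(ids.flatMap(_.topologyInfo).toSet == racks.toSet)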
15 changes: 15 additions & 0 deletions dev/run-tests.py
@@ -344,6 +344,19 @@ def build_spark_sbt(hadoop_version):
exec_sbt(profiles_and_goals)


def build_spark_unidoc_sbt(hadoop_version):
set_title_and_block("Building Unidoc API Documentation", "BLOCK_DOCUMENTATION")
# Enable all of the profiles for the build:
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
sbt_goals = ["unidoc"]
profiles_and_goals = build_profiles + sbt_goals

print("[info] Building Spark unidoc (w/Hive 1.2.1) using SBT with these arguments: ",
" ".join(profiles_and_goals))

exec_sbt(profiles_and_goals)


def build_spark_assembly_sbt(hadoop_version):
# Enable all of the profiles for the build:
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
@@ -352,6 +365,8 @@ def build_spark_assembly_sbt(hadoop_version):
print("[info] Building Spark assembly (w/Hive 1.2.1) using SBT with these arguments: ",
" ".join(profiles_and_goals))
exec_sbt(profiles_and_goals)
# Make sure that Java and Scala API documentation can be generated
build_spark_unidoc_sbt(hadoop_version)


def build_apache_spark(build_tool, hadoop_version):