2 changes: 1 addition & 1 deletion R/pkg/R/sparkR.R
@@ -269,7 +269,7 @@ sparkR.sparkContext <- function(
#' sparkR.session("yarn-client", "SparkR", "/home/spark",
#' list(spark.executor.memory="4g"),
#' c("one.jar", "two.jar", "three.jar"),
#' c("com.databricks:spark-avro_2.11:2.0.1"))
#' c("com.databricks:spark-avro_2.12:2.0.1"))
Member Author:
@felixcheung is it OK to refer to _2.12 artifacts here? I don't think this one actually exists, but is it just an example?

Contributor:
I think there's even a separate discussion about whether to use this as an example at all, since that package has been part of Spark since 2.4.

Member Author:
@felixcheung was the conclusion that we can make this a dummy package? I just want to avoid showing _2.11 usage here.

Member:
Yes, a dummy name is completely fine with me.

#' sparkR.session(spark.master = "yarn-client", spark.executor.memory = "4g")
#'}
#' @note sparkR.session since 2.0.0
4 changes: 2 additions & 2 deletions R/pkg/tests/fulltests/test_client.R
@@ -37,7 +37,7 @@ test_that("multiple packages don't produce a warning", {

test_that("sparkJars sparkPackages as character vectors", {
args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
c("com.databricks:spark-avro_2.11:2.0.1"))
c("com.databricks:spark-avro_2.12:2.0.1"))
expect_match(args, "--jars one.jar,two.jar,three.jar")
expect_match(args, "--packages com.databricks:spark-avro_2.11:2.0.1")
expect_match(args, "--packages com.databricks:spark-avro_2.12:2.0.1")
})
2 changes: 1 addition & 1 deletion R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -219,7 +219,7 @@ SparkR supports operating on a variety of data sources through the `SparkDataFra
The general method for creating `SparkDataFrame` from data sources is `read.df`. This method takes in the path for the file to load and the type of data source, and the currently active Spark Session will be used automatically. SparkR supports reading CSV, JSON and Parquet files natively and through Spark Packages you can find data source connectors for popular file formats like Avro. These packages can be added with `sparkPackages` parameter when initializing SparkSession using `sparkR.session`.

```{r, eval=FALSE}
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.12:3.0.0")
```

We can see how to use data sources using an example CSV input file. For more information please refer to SparkR [read.df](https://spark.apache.org/docs/latest/api/R/read.df.html) API documentation.
49 changes: 27 additions & 22 deletions bin/load-spark-env.cmd
@@ -21,37 +21,42 @@ rem This script loads spark-env.cmd if it exists, and ensures it is only loaded
rem spark-env.cmd is loaded from SPARK_CONF_DIR if set, or within the current directory's
rem conf\ subdirectory.

set SPARK_ENV_CMD=spark-env.cmd
if [%SPARK_ENV_LOADED%] == [] (
set SPARK_ENV_LOADED=1

if [%SPARK_CONF_DIR%] == [] (
set SPARK_CONF_DIR=%~dp0..\conf
)

call :LoadSparkEnv
set SPARK_ENV_CMD=%SPARK_CONF_DIR%\%SPARK_ENV_CMD%
if exist %SPARK_ENV_CMD% (
call %SPARK_ENV_CMD%
)
)

rem Setting SPARK_SCALA_VERSION if not already set.

set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"

if [%SPARK_SCALA_VERSION%] == [] (

if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
echo "Presence of build for multiple Scala versions detected."
echo "Either clean one of them or, set SPARK_SCALA_VERSION in spark-env.cmd."
exit 1
)
if exist %ASSEMBLY_DIR2% (
set SPARK_SCALA_VERSION=2.11
) else (
set SPARK_SCALA_VERSION=2.12
)
)
rem TODO: revisit for Scala 2.13 support
set SPARK_SCALA_VERSION=2.12
Member (@gengliangwang, Nov 21, 2018):
I am not familiar with .cmd scripts. Should we keep the quotes here, i.e. "2.12"?

Member (@HyukjinKwon, Nov 21, 2018):
Nope, on Windows the string is taken as-is, including the quotes, if the value is quoted ... haha, odd (to me).

Member Author:
Oh I see, so we shouldn't add quotes to values like SPARK_ENV_CMD above, but should use them in if conditions, call, etc.?

Member:
Yeah, I think we shouldn't quote, if I remember correctly. Let me test and get back to you today or tomorrow. I'll have to fly to Korea for my one-week vacation starting tomorrow :D

Member (@HyukjinKwon, Nov 22, 2018):
For call, it's a bit different (it can be quoted, IIRC): https://ss64.com/nt/call.html

Member:
Here, I ran some simple commands:

C:\>set A=aa

C:\>ECHO %A%
aa

C:\>set A="aa"

C:\>ECHO %A%
"aa"

C:\>call "python.exe"
Python 3.6.4 (v3.6.4:d48eceb, Dec 19 2017, 06:54:40) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> exit(0)

C:\>call python.exe
Python 3.6.4 (v3.6.4:d48eceb, Dec 19 2017, 06:54:40) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> exit(0)

rem if [%SPARK_SCALA_VERSION%] == [] (
Member Author:
@gengliangwang this was the update to the .cmd script I was talking about. If you like, you can follow up with this change, uncommented, separately from this PR.

rem set SCALA_VERSION_1=2.12
rem set SCALA_VERSION_2=2.11
rem
rem set ASSEMBLY_DIR1=%SPARK_HOME%\assembly\target\scala-%SCALA_VERSION_1%
rem set ASSEMBLY_DIR2=%SPARK_HOME%\assembly\target\scala-%SCALA_VERSION_2%
rem set ENV_VARIABLE_DOC=https://spark.apache.org/docs/latest/configuration.html#environment-variables
rem if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
rem echo "Presence of build for multiple Scala versions detected (%ASSEMBLY_DIR1% and %ASSEMBLY_DIR2%)."
rem echo "Remove one of them or, set SPARK_SCALA_VERSION=%SCALA_VERSION_1% in %SPARK_ENV_CMD%."
rem echo "Visit %ENV_VARIABLE_DOC% for more details about setting environment variables in spark-env.cmd."
rem echo "Either clean one of them or, set SPARK_SCALA_VERSION in spark-env.cmd."
rem exit 1
rem )
rem if exist %ASSEMBLY_DIR1% (
rem set SPARK_SCALA_VERSION=%SCALA_VERSION_1%
rem ) else (
rem set SPARK_SCALA_VERSION=%SCALA_VERSION_2%
rem )
rem )
exit /b 0

:LoadSparkEnv
if exist "%SPARK_CONF_DIR%\spark-env.cmd" (
call "%SPARK_CONF_DIR%\spark-env.cmd"
)
42 changes: 22 additions & 20 deletions bin/load-spark-env.sh
@@ -43,23 +43,25 @@ fi

# Setting SPARK_SCALA_VERSION if not already set.

if [ -z "$SPARK_SCALA_VERSION" ]; then
SCALA_VERSION_1=2.12
SCALA_VERSION_2=2.11

ASSEMBLY_DIR_1="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_1}"
ASSEMBLY_DIR_2="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_2}"
ENV_VARIABLE_DOC="https://spark.apache.org/docs/latest/configuration.html#environment-variables"
if [[ -d "$ASSEMBLY_DIR_1" && -d "$ASSEMBLY_DIR_2" ]]; then
echo "Presence of build for multiple Scala versions detected ($ASSEMBLY_DIR_1 and $ASSEMBLY_DIR_2)." 1>&2
echo "Remove one of them or, export SPARK_SCALA_VERSION=$SCALA_VERSION_1 in ${SPARK_ENV_SH}." 1>&2
echo "Visit ${ENV_VARIABLE_DOC} for more details about setting environment variables in spark-env.sh." 1>&2
exit 1
fi

if [[ -d "$ASSEMBLY_DIR_1" ]]; then
export SPARK_SCALA_VERSION=${SCALA_VERSION_1}
else
export SPARK_SCALA_VERSION=${SCALA_VERSION_2}
fi
fi
# TODO: revisit for Scala 2.13 support
export SPARK_SCALA_VERSION=2.12
#if [ -z "$SPARK_SCALA_VERSION" ]; then
# SCALA_VERSION_1=2.12
# SCALA_VERSION_2=2.11
#
# ASSEMBLY_DIR_1="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_1}"
# ASSEMBLY_DIR_2="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_2}"
# ENV_VARIABLE_DOC="https://spark.apache.org/docs/latest/configuration.html#environment-variables"
# if [[ -d "$ASSEMBLY_DIR_1" && -d "$ASSEMBLY_DIR_2" ]]; then
# echo "Presence of build for multiple Scala versions detected ($ASSEMBLY_DIR_1 and $ASSEMBLY_DIR_2)." 1>&2
# echo "Remove one of them or, export SPARK_SCALA_VERSION=$SCALA_VERSION_1 in ${SPARK_ENV_SH}." 1>&2
# echo "Visit ${ENV_VARIABLE_DOC} for more details about setting environment variables in spark-env.sh." 1>&2
# exit 1
# fi
#
# if [[ -d "$ASSEMBLY_DIR_1" ]]; then
# export SPARK_SCALA_VERSION=${SCALA_VERSION_1}
# else
# export SPARK_SCALA_VERSION=${SCALA_VERSION_2}
# fi
#fi
63 changes: 4 additions & 59 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -89,18 +89,6 @@ trait FutureAction[T] extends Future[T] {
*/
override def value: Option[Try[T]]

// These two methods must be implemented in Scala 2.12. They're implemented as a no-op here
// and then filled in with a real implementation in the two subclasses below. The no-op exists
// here so that those implementations can declare "override", necessary in 2.12, while working
// in 2.11, where the method doesn't exist in the superclass.
// After 2.11 support goes away, remove these two:

def transform[S](f: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] =
throw new UnsupportedOperationException()

def transformWith[S](f: (Try[T]) => Future[S])(implicit executor: ExecutionContext): Future[S] =
throw new UnsupportedOperationException()

/**
* Blocks and returns the result of this job.
*/
@@ -117,43 +105,6 @@ trait FutureAction[T] extends Future[T] {

}

/**
Scala 2.12 defines the two new transform/transformWith methods mentioned above. Implementing
* these for 2.12 in the Spark class here requires delegating to these same methods in an
* underlying Future object. But that only exists in 2.12. But these methods are only called
* in 2.12. So define helper shims to access these methods on a Future by reflection.
*/
private[spark] object FutureAction {

private val transformTryMethod =
try {
classOf[Future[_]].getMethod("transform", classOf[(_) => _], classOf[ExecutionContext])
} catch {
case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
}

private val transformWithTryMethod =
try {
classOf[Future[_]].getMethod("transformWith", classOf[(_) => _], classOf[ExecutionContext])
} catch {
case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11
}

private[spark] def transform[T, S](
future: Future[T],
f: (Try[T]) => Try[S],
executor: ExecutionContext): Future[S] =
transformTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]

private[spark] def transformWith[T, S](
future: Future[T],
f: (Try[T]) => Future[S],
executor: ExecutionContext): Future[S] =
transformWithTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]

}


/**
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
@@ -195,16 +146,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
def jobIds: Seq[Int] = Seq(jobWaiter.jobId)

override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transform(
jobWaiter.completionFuture,
(u: Try[Unit]) => f(u.map(_ => resultFunc)),
e)
jobWaiter.completionFuture.transform((u: Try[Unit]) => f(u.map(_ => resultFunc)))

override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transformWith(
jobWaiter.completionFuture,
(u: Try[Unit]) => f(u.map(_ => resultFunc)),
e)
jobWaiter.completionFuture.transformWith((u: Try[Unit]) => f(u.map(_ => resultFunc)))
}


@@ -299,10 +244,10 @@ class ComplexFutureAction[T](run : JobSubmitter => Future[T])
def jobIds: Seq[Int] = subActions.flatMap(_.jobIds)

override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transform(p.future, f, e)
p.future.transform(f)

override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] =
FutureAction.transformWith(p.future, f, e)
p.future.transformWith(f)
}


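For reference, here is a minimal standalone sketch (not Spark code; `TransformSketch` and `resultFunc` are illustrative names) of what the new implementation relies on: in Scala 2.12, `Future` itself declares `transform` and `transformWith`, so `SimpleFutureAction` and `ComplexFutureAction` can delegate straight to the underlying future instead of going through the reflection shims removed above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object TransformSketch {
  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Stands in for jobWaiter.completionFuture and the action's result computation.
    val completion: Future[Unit] = Future.successful(())
    val resultFunc: Int = 42

    // Mirrors SimpleFutureAction.transform:
    //   jobWaiter.completionFuture.transform((u: Try[Unit]) => f(u.map(_ => resultFunc)))
    val transformed: Future[String] =
      completion.transform((u: Try[Unit]) => u.map(_ => s"result=$resultFunc"))

    // Mirrors transformWith, where the continuation itself returns a Future.
    val chained: Future[String] =
      completion.transformWith((u: Try[Unit]) => Future.fromTry(u.map(_ => s"result=$resultFunc")))

    println(Await.result(transformed, 1.second)) // result=42
    println(Await.result(chained, 1.second))     // result=42
  }
}
```

Both overloads take an implicit `ExecutionContext`, which is why the explicit executor plumbing in the old `FutureAction.transform`/`transformWith` helpers could be dropped.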
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -21,13 +21,12 @@ import java.io.{IOException, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.reflect.ClassTag

import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Partition for UnionRDD.
@@ -61,7 +60,7 @@ private[spark] class UnionPartition[T: ClassTag](

object UnionRDD {
private[spark] lazy val partitionEvalTaskSupport =
new ForkJoinTaskSupport(new ForkJoinPool(8))
new ForkJoinTaskSupport(ThreadUtils.newForkJoinPool("partition-eval-task-support", 8))
}

@DeveloperApi
core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -34,8 +34,6 @@ import org.apache.spark.internal.Logging
*/
private[spark] object ClosureCleaner extends Logging {

private val isScala2_11 = scala.util.Properties.versionString.contains("2.11")

// Get an ASM class reader for a given class from the JAR that loaded it
private[util] def getClassReader(cls: Class[_]): ClassReader = {
// Copy data over, before delegating to ClassReader - else we can run out of open file handles.
@@ -168,9 +166,6 @@ private[spark] object ClosureCleaner extends Logging {
* @param closure the closure to check.
*/
private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
if (isScala2_11) {
return None
}
val isClosureCandidate =
closure.getClass.isSynthetic &&
closure
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -26,7 +26,6 @@ import scala.language.higherKinds
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal

import org.apache.spark.SparkException
@@ -181,17 +180,17 @@
}

/**
* Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
* Construct a new ForkJoinPool with a specified max parallelism and name prefix.
*/
def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
// Custom factory to set thread names
val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
override def newThread(pool: SForkJoinPool) =
new SForkJoinWorkerThread(pool) {
val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
override def newThread(pool: ForkJoinPool) =
new ForkJoinWorkerThread(pool) {
setName(prefix + "-" + super.getName)
}
}
new SForkJoinPool(maxThreadNumber, factory,
new ForkJoinPool(maxThreadNumber, factory,
null, // handler
false // asyncMode
)
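As a usage note, the sketch below is hypothetical and standalone (`NamedForkJoinPoolSketch` and `demo-pool` are made-up names): it applies the same thread-naming factory pattern to the JDK `java.util.concurrent.ForkJoinPool` and feeds the pool to a `ForkJoinTaskSupport`, the way `UnionRDD.partitionEvalTaskSupport` does after this change.

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}

import scala.collection.parallel.ForkJoinTaskSupport

object NamedForkJoinPoolSketch {
  // Same pattern as ThreadUtils.newForkJoinPool: a factory that prefixes worker thread names.
  def namedPool(prefix: String, parallelism: Int): ForkJoinPool = {
    val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
      override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
        new ForkJoinWorkerThread(pool) {
          setName(prefix + "-" + super.getName)
        }
    }
    new ForkJoinPool(parallelism, factory, null /* handler */, false /* asyncMode */)
  }

  def main(args: Array[String]): Unit = {
    val pool = namedPool("demo-pool", 4)
    // Drive a parallel collection with the named pool, as UnionRDD now does.
    val work = (1 to 8).par
    work.tasksupport = new ForkJoinTaskSupport(pool)
    work.foreach(_ => println(Thread.currentThread().getName)) // demo-pool-ForkJoinPool-...
    pool.shutdown()
  }
}
```

On 2.12 the old `scala.concurrent.forkjoin.ForkJoinPool` is only a deprecated alias for the JDK class, so `ForkJoinTaskSupport` accepts the JDK pool directly and the alias imports can go away.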
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -94,8 +94,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

test("add dependencies works correctly") {
val md = SparkSubmitUtils.getModuleDescriptor
val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.11:0.1," +
"com.databricks:spark-avro_2.11:0.1")
val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.12:0.1," +
"com.databricks:spark-avro_2.12:0.1")

SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
assert(md.getDependencies.length === 2)
@@ -189,15 +189,15 @@

test("neglects Spark and Spark's dependencies") {
val coordinates = SparkSubmitUtils.IVY_DEFAULT_EXCLUDES
.map(comp => s"org.apache.spark:spark-${comp}2.11:2.1.1")
.map(comp => s"org.apache.spark:spark-${comp}2.12:2.4.0")
.mkString(",") + ",org.apache.spark:spark-core_fake:1.2.0"

val path = SparkSubmitUtils.resolveMavenCoordinates(
coordinates,
SparkSubmitUtils.buildIvySettings(None, None),
isTest = true)
assert(path === "", "should return empty path")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.11", "1.2.0")
val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.12", "1.2.0")
IvyTestUtils.withRepository(main, None, None) { repo =>
val files = SparkSubmitUtils.resolveMavenCoordinates(
coordinates + "," + main.toString,