Commit 5e2940d

rebase
2 parents 083760a + 47bf406 commit 5e2940d

189 files changed: +8663 -1639 lines changed

LICENSE

Lines changed: 1 addition & 0 deletions

@@ -814,6 +814,7 @@ BSD-style licenses
 The following components are provided under a BSD-style license. See project link for details.
 
 (BSD 3 Clause) core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
+(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.1.15 - https://github.com/jpmml/jpmml-model)
 (BSD 3-clause style license) jblas (org.jblas:jblas:1.2.3 - http://jblas.org/)
 (BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
 (BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)

core/pom.xml

Lines changed: 5 additions & 0 deletions

@@ -95,6 +95,11 @@
       <artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>

core/src/main/java/org/apache/spark/api/java/function/Function0.java

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A zero-argument function that returns an R.
+ */
+public interface Function0<R> extends Serializable {
+  public R call() throws Exception;
+}
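
For orientation, a minimal sketch of how calling code can implement the new interface. Only Function0 and its package come from the diff above; the surrounding snippet and the string it returns are illustrative.

import org.apache.spark.api.java.function.Function0

// Hedged sketch: the caller supplies the zero-argument computation by implementing call().
val makeGreeting = new Function0[String] {
  override def call(): String = "computed when call() is invoked"
}
println(makeGreeting.call())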

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion

@@ -106,7 +106,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    */
   protected def askTracker[T: ClassTag](message: Any): T = {
     try {
-      trackerEndpoint.askWithReply[T](message)
+      trackerEndpoint.askWithRetry[T](message)
     } catch {
       case e: Exception =>
         logError("Error communicating with MapOutputTracker", e)

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 4 deletions

@@ -555,7 +555,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         SparkEnv.executorActorSystemName,
         RpcAddress(host, port),
         ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
-      Some(endpointRef.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump))
+      Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
     }
   } catch {
     case e: Exception =>
@@ -713,7 +713,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     RDD[(String, String)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new WholeTextFileRDD(
       this,
@@ -759,7 +761,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     RDD[(String, PortableDataStream)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new BinaryFileRDD(
       this,
@@ -935,7 +939,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updatedConf = job.getConfiguration
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
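
The comments added in the hunks above describe the behavior change: wholeTextFiles, binaryFiles, and newAPIHadoopFile now accept a comma-separated list of paths, matching textFile and hadoopFile (SPARK-7155). A minimal sketch of the resulting call pattern; the master, app name, and directory names are made up for illustration.

import org.apache.spark.SparkContext

// Hedged sketch: several input locations passed as one comma-separated string,
// the same form textFile already accepts. Paths below are hypothetical.
val sc = new SparkContext("local[2]", "comma-separated-paths-demo")
val pages = sc.wholeTextFiles("/data/logs/day1,/data/logs/day2")  // RDD[(filename, content)]
pages.keys.collect().foreach(println)
sc.stop()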

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 12 additions & 0 deletions

@@ -40,6 +40,7 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
 import org.apache.spark.storage._
+import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
 import org.apache.spark.util.{RpcUtils, Utils}
 
 /**
@@ -69,6 +70,7 @@ class SparkEnv (
     val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
     val shuffleMemoryManager: ShuffleMemoryManager,
+    val executorMemoryManager: ExecutorMemoryManager,
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
 
@@ -382,6 +384,15 @@ object SparkEnv extends Logging {
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
     outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
 
+    val executorMemoryManager: ExecutorMemoryManager = {
+      val allocator = if (conf.getBoolean("spark.unsafe.offHeap", false)) {
+        MemoryAllocator.UNSAFE
+      } else {
+        MemoryAllocator.HEAP
+      }
+      new ExecutorMemoryManager(allocator)
+    }
+
     val envInstance = new SparkEnv(
       executorId,
       rpcEnv,
@@ -398,6 +409,7 @@
       sparkFilesDir,
       metricsSystem,
       shuffleMemoryManager,
+      executorMemoryManager,
       outputCommitCoordinator,
       conf)
 
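
The third hunk above picks the executor's memory allocator from a boolean configuration key. A minimal sketch of setting that key; only "spark.unsafe.offHeap" comes from the diff, the rest of the configuration is illustrative.

import org.apache.spark.SparkConf

// Hedged sketch: opting in to the off-heap allocator selected in SparkEnv above.
val conf = new SparkConf()
  .setAppName("offheap-allocator-demo")   // hypothetical app name
  .set("spark.unsafe.offHeap", "true")    // SparkEnv then picks MemoryAllocator.UNSAFE
// Leaving the key unset (or "false") keeps the default, MemoryAllocator.HEAP.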

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 6 additions & 0 deletions

@@ -21,6 +21,7 @@ import java.io.Serializable
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util.TaskCompletionListener
 
 
@@ -133,4 +134,9 @@ abstract class TaskContext extends Serializable {
   /** ::DeveloperApi:: */
   @DeveloperApi
   def taskMetrics(): TaskMetrics
+
+  /**
+   * Returns the manager for this task's managed memory.
+   */
+  private[spark] def taskMemoryManager(): TaskMemoryManager
 }

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 2 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}
 
 import scala.collection.mutable.ArrayBuffer
@@ -27,6 +28,7 @@ private[spark] class TaskContextImpl(
     val partitionId: Int,
     override val taskAttemptId: Long,
     override val attemptNumber: Int,
+    override val taskMemoryManager: TaskMemoryManager,
     val runningLocally: Boolean = false,
     val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 26 additions & 8 deletions

@@ -734,13 +734,31 @@
   /**
    * Extracts maven coordinates from a comma-delimited string
    * @param remoteRepos Comma-delimited string of remote repositories
+   * @param ivySettings The Ivy settings for this session
    * @return A ChainResolver used by Ivy to search for and resolve dependencies.
    */
-  def createRepoResolvers(remoteRepos: Option[String]): ChainResolver = {
+  def createRepoResolvers(remoteRepos: Option[String], ivySettings: IvySettings): ChainResolver = {
     // We need a chain resolver if we want to check multiple repositories
     val cr = new ChainResolver
     cr.setName("list")
 
+    val localM2 = new IBiblioResolver
+    localM2.setM2compatible(true)
+    val m2Path = ".m2" + File.separator + "repository" + File.separator
+    localM2.setRoot(new File(System.getProperty("user.home"), m2Path).toURI.toString)
+    localM2.setUsepoms(true)
+    localM2.setName("local-m2-cache")
+    cr.add(localM2)
+
+    val localIvy = new IBiblioResolver
+    localIvy.setRoot(new File(ivySettings.getDefaultIvyUserDir,
+      "local" + File.separator).toURI.toString)
+    val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
+      "[artifact](-[classifier]).[ext]").mkString(File.separator)
+    localIvy.setPattern(ivyPattern)
+    localIvy.setName("local-ivy-cache")
+    cr.add(localIvy)
+
     // the biblio resolver resolves POM declared dependencies
     val br: IBiblioResolver = new IBiblioResolver
     br.setM2compatible(true)
@@ -773,8 +791,7 @@
 
   /**
    * Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
-   * (will append to jars in SparkSubmit). The name of the jar is given
-   * after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well.
+   * (will append to jars in SparkSubmit).
    * @param artifacts Sequence of dependencies that were resolved and retrieved
    * @param cacheDirectory directory where jars are cached
    * @return a comma-delimited list of paths for the dependencies
@@ -783,10 +800,9 @@
       artifacts: Array[AnyRef],
       cacheDirectory: File): String = {
     artifacts.map { artifactInfo =>
-      val artifactString = artifactInfo.toString
-      val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
+      val artifact = artifactInfo.asInstanceOf[Artifact].getModuleRevisionId
       cacheDirectory.getAbsolutePath + File.separator +
-        jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
+        s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}.jar"
     }.mkString(",")
   }
 
@@ -868,6 +884,7 @@
         if (alternateIvyCache.trim.isEmpty) {
           new File(ivySettings.getDefaultIvyUserDir, "jars")
         } else {
+          ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
           ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
           new File(alternateIvyCache, "jars")
         }
@@ -877,7 +894,7 @@
       // create a pattern matcher
       ivySettings.addMatcher(new GlobPatternMatcher)
       // create the dependency resolvers
-      val repoResolver = createRepoResolvers(remoteRepos)
+      val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
       ivySettings.addResolver(repoResolver)
       ivySettings.setDefaultResolver(repoResolver.getName)
 
@@ -911,7 +928,8 @@
       }
       // retrieve all resolved dependencies
       ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
-        packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
+        packagesDirectory.getAbsolutePath + File.separator +
+          "[organization]_[artifact]-[revision].[ext]",
        retrieveOptions.setConfs(Array(ivyConfName)))
       System.setOut(sysOut)
       resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
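
The resolveDependencyPaths and ivy.retrieve hunks above switch the retrieved-jar naming from Ivy's '!'-derived names to an [organization]_[artifact]-[revision].jar pattern. A standalone sketch of that naming; the Coordinate helper, coordinate values, and cache directory are hypothetical and not part of the diff.

// Hedged sketch: reproducing the jar-name pattern used in the hunks above.
case class Coordinate(organization: String, artifact: String, revision: String)

def retrievedJarPath(cacheDirectory: String, c: Coordinate): String =
  s"$cacheDirectory/${c.organization}_${c.artifact}-${c.revision}.jar"

println(retrievedJarPath("/tmp/spark-packages", Coordinate("com.example", "demo-lib", "1.0.0")))
// prints: /tmp/spark-packages/com.example_demo-lib-1.0.0.jar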

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 2 deletions

@@ -57,7 +57,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     logInfo("Connecting to driver: " + driverUrl)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       driver = Some(ref)
-      ref.sendWithReply[RegisteredExecutor.type](
+      ref.ask[RegisteredExecutor.type](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     } onComplete {
       case Success(msg) => Utils.tryLogNonFatalError {
@@ -154,7 +154,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       executorConf,
       new SecurityManager(executorConf))
     val driver = fetcher.setupEndpointRefByURI(driverUrl)
-    val props = driver.askWithReply[Seq[(String, String)]](RetrieveSparkProps) ++
+    val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
       Seq[(String, String)](("spark.app.id", appId))
     fetcher.shutdown()
 