
Commit 11fe67d

Fixed conflict

2 parents: 79ac03d + c24aeb6

251 files changed: +11529 −2302 lines changed


LICENSE

Lines changed: 1 addition & 0 deletions

@@ -814,6 +814,7 @@ BSD-style licenses
 The following components are provided under a BSD-style license. See project link for details.

 (BSD 3 Clause) core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
+(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.1.15 - https://github.com/jpmml/jpmml-model)
 (BSD 3-clause style license) jblas (org.jblas:jblas:1.2.3 - http://jblas.org/)
 (BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
 (BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)

core/pom.xml

Lines changed: 5 additions & 0 deletions

@@ -95,6 +95,11 @@
       <artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>

core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala renamed to core/src/main/java/org/apache/spark/api/java/function/Function0.java

Lines changed: 5 additions & 8 deletions

@@ -15,16 +15,13 @@
  * limitations under the License.
  */

-package org.apache.spark.storage
+package org.apache.spark.api.java.function;

-import tachyon.client.TachyonFile
+import java.io.Serializable;

 /**
- * References a particular segment of a file (potentially the entire file), based off an offset and
- * a length.
+ * A zero-argument function that returns an R.
  */
-private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) {
-  override def toString: String = {
-    "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length)
-  }
+public interface Function0<R> extends Serializable {
+  public R call() throws Exception;
 }
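
The rename swaps the now-unused Tachyon file segment for a Java-friendly zero-argument function interface. Callers typically implement it anonymously; a minimal sketch (the greeting value is purely illustrative, not part of this commit):

import org.apache.spark.api.java.function.Function0

// Illustrative only: implement the new interface anonymously and invoke it.
val makeGreeting = new Function0[String] {
  override def call(): String = "hello from Function0"
}
assert(makeGreeting.call() == "hello from Function0")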

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion

@@ -106,7 +106,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    */
   protected def askTracker[T: ClassTag](message: Any): T = {
     try {
-      trackerEndpoint.askWithReply[T](message)
+      trackerEndpoint.askWithRetry[T](message)
     } catch {
       case e: Exception =>
         logError("Error communicating with MapOutputTracker", e)
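
The new name reflects that the call retries the synchronous RPC ask on failure rather than merely asking once. A rough sketch of that retry pattern in plain Scala (names, defaults, and the RuntimeException wrapper are illustrative, not the RpcEndpointRef implementation):

// Illustrative sketch of ask-with-retry semantics; not Spark's actual code.
def askWithRetrySketch[T](ask: () => T, maxAttempts: Int = 3, waitMs: Long = 3000): T = {
  var lastError: Exception = null
  for (attempt <- 1 to maxAttempts) {
    try {
      return ask()
    } catch {
      case e: Exception =>
        lastError = e          // remember the failure and back off before retrying
        Thread.sleep(waitMs)
    }
  }
  throw new RuntimeException(s"Ask failed after $maxAttempts attempts", lastError)
}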

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 15 additions & 7 deletions

@@ -247,9 +247,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   private[spark] def eventLogDir: Option[URI] = _eventLogDir
   private[spark] def eventLogCodec: Option[String] = _eventLogCodec

-  // Generate the random name for a temp folder in Tachyon
+  // Generate the random name for a temp folder in external block store.
   // Add a timestamp as the suffix here to make it more safe
-  val tachyonFolderName = "spark-" + randomUUID.toString()
+  val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
+  @deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
+  val tachyonFolderName = externalBlockStoreFolderName

   def isLocal: Boolean = (master == "local" || master.startsWith("local["))

@@ -386,7 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     }
   }

-  _conf.set("spark.tachyonStore.folderName", tachyonFolderName)
+  _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)

   if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

@@ -555,7 +557,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
         SparkEnv.executorActorSystemName,
         RpcAddress(host, port),
         ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
-      Some(endpointRef.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump))
+      Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
     }
   } catch {
     case e: Exception =>

@@ -713,7 +715,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
       RDD[(String, String)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new WholeTextFileRDD(
       this,

@@ -759,7 +763,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
       RDD[(String, PortableDataStream)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new BinaryFileRDD(
       this,

@@ -935,7 +941,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updatedConf = job.getConfiguration
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
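
The practical effect of the setInputPaths change is that wholeTextFiles, binaryFiles, and newAPIHadoopFile now accept a comma separated list of paths, matching textFile and hadoopFile. A small usage sketch (the paths and the sc handle are illustrative; sc is assumed to be an existing SparkContext):

// Illustrative: both directories are read in a single call after SPARK-7155.
val pages = sc.wholeTextFiles("/data/logs-2015-04,/data/logs-2015-05")
val blobs = sc.binaryFiles("/data/blobs-a,/data/blobs-b")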

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 12 additions & 0 deletions

@@ -40,6 +40,7 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
 import org.apache.spark.storage._
+import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
 import org.apache.spark.util.{RpcUtils, Utils}

 /**

@@ -69,6 +70,7 @@ class SparkEnv (
     val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
     val shuffleMemoryManager: ShuffleMemoryManager,
+    val executorMemoryManager: ExecutorMemoryManager,
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {

@@ -382,6 +384,15 @@ object SparkEnv extends Logging {
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
     outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

+    val executorMemoryManager: ExecutorMemoryManager = {
+      val allocator = if (conf.getBoolean("spark.unsafe.offHeap", false)) {
+        MemoryAllocator.UNSAFE
+      } else {
+        MemoryAllocator.HEAP
+      }
+      new ExecutorMemoryManager(allocator)
+    }
+
     val envInstance = new SparkEnv(
       executorId,
       rpcEnv,

@@ -398,6 +409,7 @@
       sparkFilesDir,
       metricsSystem,
       shuffleMemoryManager,
+      executorMemoryManager,
       outputCommitCoordinator,
       conf)
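
Whether the new ExecutorMemoryManager hands out on-heap or off-heap (sun.misc.Unsafe) memory is chosen by the spark.unsafe.offHeap flag read above, which defaults to false. Opting in is a one-line conf change; a minimal sketch (the val name is illustrative):

import org.apache.spark.SparkConf

// Illustrative: select the UNSAFE (off-heap) allocator; omit the setting for the HEAP default.
val conf = new SparkConf().set("spark.unsafe.offHeap", "true")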

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 6 additions & 0 deletions

@@ -21,6 +21,7 @@ import java.io.Serializable

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util.TaskCompletionListener


@@ -133,4 +134,9 @@ abstract class TaskContext extends Serializable {
   /** ::DeveloperApi:: */
   @DeveloperApi
   def taskMetrics(): TaskMetrics
+
+  /**
+   * Returns the manager for this task's managed memory.
+   */
+  private[spark] def taskMemoryManager(): TaskMemoryManager
 }
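
The accessor is private[spark], so only Spark internals can reach the per-task memory manager. A sketch of such internal use, assuming TaskContext.get() is called from code running inside a task (illustrative, not from this commit):

import org.apache.spark.TaskContext
import org.apache.spark.unsafe.memory.TaskMemoryManager

// Illustrative; compiles only under the org.apache.spark package tree,
// since taskMemoryManager() is private[spark].
val ctx: TaskContext = TaskContext.get()
val memoryManager: TaskMemoryManager = ctx.taskMemoryManager()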

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 2 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark

 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}

 import scala.collection.mutable.ArrayBuffer

@@ -27,6 +28,7 @@ private[spark] class TaskContextImpl(
     val partitionId: Int,
     override val taskAttemptId: Long,
     override val attemptNumber: Int,
+    override val taskMemoryManager: TaskMemoryManager,
     val runningLocally: Boolean = false,
     val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 18 additions & 9 deletions

@@ -105,23 +105,18 @@ private[spark] object TestUtils {
     URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
   }

-  private class JavaSourceFromString(val name: String, val code: String)
+  private[spark] class JavaSourceFromString(val name: String, val code: String)
     extends SimpleJavaFileObject(createURI(name), SOURCE) {
     override def getCharContent(ignoreEncodingErrors: Boolean): String = code
   }

-  /** Creates a compiled class with the given name. Class file will be placed in destDir. */
+  /** Creates a compiled class with the source file. Class file will be placed in destDir. */
   def createCompiledClass(
       className: String,
       destDir: File,
-      toStringValue: String = "",
-      baseClass: String = null,
-      classpathUrls: Seq[URL] = Seq()): File = {
+      sourceFile: JavaSourceFromString,
+      classpathUrls: Seq[URL]): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
-    val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
-    val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + extendsText + " implements java.io.Serializable {" +
-      " @Override public String toString() { return \"" + toStringValue + "\"; }}")

     // Calling this outputs a class file in pwd. It's easier to just rename the file than
     // build a custom FileManager that controls the output location.

@@ -144,4 +139,18 @@
     assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath())
     out
   }
+
+  /** Creates a compiled class with the given name. Class file will be placed in destDir. */
+  def createCompiledClass(
+      className: String,
+      destDir: File,
+      toStringValue: String = "",
+      baseClass: String = null,
+      classpathUrls: Seq[URL] = Seq()): File = {
+    val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
+    val sourceFile = new JavaSourceFromString(className,
+      "public class " + className + extendsText + " implements java.io.Serializable {" +
+      " @Override public String toString() { return \"" + toStringValue + "\"; }}")
+    createCompiledClass(className, destDir, sourceFile, classpathUrls)
+  }
 }
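
With the refactor, callers inside the spark package can compile a hand-written source through the new overload instead of the generated toString stub. A minimal sketch (class name, body, and output directory are illustrative):

import java.io.File
import org.apache.spark.TestUtils
import org.apache.spark.TestUtils.JavaSourceFromString

// Illustrative; JavaSourceFromString is private[spark], so this sketch must
// live under the org.apache.spark package.
val source = new JavaSourceFromString("Hello",
  "public class Hello { public String greet() { return \"hi\"; } }")
val classFile: File = TestUtils.createCompiledClass("Hello", new File("/tmp"), source, Seq())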
