Closed
Changes from 4 commits
Commits
56 commits
791189b
"Adding an option to persist Spark RDD blocks into Tachyon." move the…
RongGu Mar 16, 2014
556978b
fix the scalastyle errors
RongGu Mar 17, 2014
70ca182
a bit change in comment
RongGu Mar 21, 2014
8011a96
fix a brought-in mistake in StorageLevel
RongGu Mar 21, 2014
e01a271
update tachyon 0.4.1
haoyuan Mar 22, 2014
dc8ef24
add old storelevel constructor
haoyuan Mar 22, 2014
47304b3
make tachyonStore in BlockMananger lazy val; add more comments Storag…
haoyuan Mar 22, 2014
e554b1e
add python code
haoyuan Mar 22, 2014
fcaeab2
address Aaron's comment
haoyuan Mar 22, 2014
e3ddbba
add doc to use Tachyon cache mode.
haoyuan Mar 22, 2014
8859371
various minor fixes and clean up
haoyuan Mar 22, 2014
776a56c
address patrick's and ali's comments from the previous PR
haoyuan Mar 22, 2014
e82909c
minor cleanup
haoyuan Mar 22, 2014
bf278fa
fix python tests
haoyuan Mar 22, 2014
1dcadf9
typo
haoyuan Mar 22, 2014
77be7e8
address mateiz's comment about the temp folder name problem. The impl…
RongGu Mar 23, 2014
8968b67
exclude more libraries from tachyon dependency to be the same as refe…
haoyuan Mar 23, 2014
6a22c1a
fix scalastyle
haoyuan Mar 23, 2014
2825a13
up-merging to the current master branch of the apache spark
RongGu Mar 24, 2014
ca14469
bump tachyon version to 0.4.1-thrift
haoyuan Mar 24, 2014
716e93b
revert the version
haoyuan Mar 24, 2014
d827250
fix JsonProtocolSuie test failure
RongGu Mar 24, 2014
6adb58f
Merge branch 'master' of https://github.com/RongGu/spark-1
RongGu Mar 24, 2014
939e467
0.4.1-thrift from maven central
haoyuan Mar 25, 2014
bbeb4de
fix the JsonProtocolSuite test failure problem
RongGu Mar 25, 2014
eacb2e8
Merge branch 'master' of https://github.com/RongGu/spark-1
RongGu Mar 25, 2014
16c5798
make the dependency on tachyon as tachyon-0.4.1-thrift
RongGu Mar 25, 2014
86a2eab
tachyon 0.4.1-thrift is in the staging repo. but jenkins failed to do…
haoyuan Mar 24, 2014
fd84156
use randomUUID to generate sparkapp directory name on tachyon;minor c…
RongGu Mar 27, 2014
e700d9c
add the SparkTachyonHdfsLR example and some comments
RongGu Mar 27, 2014
76805aa
unifies the config properties name prefix; add the configs into docs/…
RongGu Mar 27, 2014
c9aeabf
rename the StorgeLevel.TACHYON as StorageLevel.OFF_HEAP
RongGu Mar 27, 2014
04301d3
rename StorageLevel.TACHYON to Storage.OFF_HEAP
RongGu Mar 27, 2014
4572f9f
reserving the old apply function API of StorageLevel
RongGu Mar 27, 2014
49cc724
update docs with off_headp option
haoyuan Mar 28, 2014
64348b2
update conf docs.
haoyuan Mar 28, 2014
589eafe
use TRY_CACHE instead of MUST_CACHE
haoyuan Mar 28, 2014
91fa09d
address patrick's comments
haoyuan Mar 28, 2014
be79d77
find a way to clean up some unnecessay metods and classed to make the…
RongGu Mar 29, 2014
619a9a8
set number of directories in TachyonStore back to 64; added a TODO ta…
RongGu Mar 29, 2014
ed73e19
Merge branch 'master' of github.com:RongGu/spark-1
haoyuan Mar 28, 2014
3dcace4
address matei's comments
haoyuan Apr 2, 2014
77d2703
change python api.git status
haoyuan Apr 2, 2014
d9a6438
fix for pspark
haoyuan Apr 2, 2014
9b97935
address aaron's comments
haoyuan Apr 2, 2014
5cc041c
address aaron's comments
haoyuan Apr 2, 2014
120e48a
changed the root-level dir name in Tachyon
RongGu Apr 3, 2014
8adfcfa
address arron's comment on inTachyonSize
RongGu Apr 3, 2014
51149e7
address aaron's comment on returning value of the remove() function i…
RongGu Apr 3, 2014
7cd4600
remove some logic code for tachyonstore's replication
RongGu Apr 3, 2014
55b5918
address matei's comment on the replication of offHeap storagelevel
RongGu Apr 4, 2014
e0f4891
better check offheap.
haoyuan Apr 4, 2014
a8b3ec6
merge master branch
haoyuan Apr 4, 2014
ae7834b
minor cleanup
haoyuan Apr 4, 2014
9f7fa1b
fix code style
haoyuan Apr 4, 2014
72b7768
merge master
haoyuan Apr 5, 2014
21 changes: 21 additions & 0 deletions core/pom.xml
@@ -205,6 +205,27 @@
       <artifactId>commons-io</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.tachyonproject</groupId>
Contributor

The exclusions here don't seem to match the exclusions in the sbt build (https://github.com/RongGu/spark-1/blob/master/project/SparkBuild.scala#L325) -- is there a reason for this difference?

Contributor

Seems this one excludes more than the sbt one:
excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),

This one also excludes JUnit. There is no particular reason to do so.

Contributor

Will Powermock and JUnit even be included in the tachyon-client artifact?

Contributor

No, they won't. So, starting with Tachyon 0.5.0, we use tachyon-client.

+      <artifactId>tachyon</artifactId>
+      <version>0.4.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-recipes</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-test</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
33 changes: 20 additions & 13 deletions core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -23,17 +23,19 @@
  * Expose some commonly useful storage level constants.
  */
 public class StorageLevels {
-  public static final StorageLevel NONE = create(false, false, false, 1);
-  public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
-  public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
-  public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
-  public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
-  public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
-  public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
-  public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
-  public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
-  public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
-  public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);
+  public static final StorageLevel NONE = new StorageLevel(false, false, false, false, 1);
Contributor

These should use create() instead of the private constructor of StorageLevel.

Contributor

Good point. Thanks!

+  public static final StorageLevel DISK_ONLY = new StorageLevel(true, false, false, false, 1);
+  public static final StorageLevel DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2);
+  public static final StorageLevel MEMORY_ONLY = new StorageLevel(false, true, false, true, 1);
+  public static final StorageLevel MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2);
+  public static final StorageLevel MEMORY_ONLY_SER = new StorageLevel(false, true, false, false, 1);
+  public static final StorageLevel MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2);
+  public static final StorageLevel MEMORY_AND_DISK = new StorageLevel(true, true, false, true, 1);
+  public static final StorageLevel MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2);
+  public static final StorageLevel MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false, 1);
+  public static final StorageLevel MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2);
+
+  public static final StorageLevel TACHYON = new StorageLevel(false, false, true, false, 1);

   /**
    * Create a new StorageLevel object.
@@ -42,7 +44,12 @@ public class StorageLevels {
    * @param deserialized saved as deserialized objects, if true
    * @param replication replication factor
    */
-  public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
-    return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
+  public static StorageLevel create(
+      boolean useDisk,
+      boolean useMemory,
+      boolean useTachyon,
+      boolean deserialized,
+      int replication) {
+    return StorageLevel.apply(useDisk, useMemory, useTachyon, deserialized, replication);
Contributor

Don't just delete the old method -- add a second version that takes the new useTachyon argument. Some user code will be using the old method and there's no reason to break it here.

Contributor

done.

   }
 }
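
The review request above (keep the old four-argument `create` and add a second version that takes `useTachyon`) can be sketched as a pair of overloads. This is a minimal, self-contained illustration and not the code from the PR: `Level` is a hypothetical stand-in for Spark's `StorageLevel`, `StorageLevelsSketch` is a made-up class name, and defaulting `useTachyon` to `false` for old callers is an assumption.

```java
/** Hypothetical stand-in for org.apache.spark.storage.StorageLevel (illustration only). */
final class Level {
    final boolean useDisk;
    final boolean useMemory;
    final boolean useTachyon;
    final boolean deserialized;
    final int replication;

    Level(boolean useDisk, boolean useMemory, boolean useTachyon,
          boolean deserialized, int replication) {
        this.useDisk = useDisk;
        this.useMemory = useMemory;
        this.useTachyon = useTachyon;
        this.deserialized = deserialized;
        this.replication = replication;
    }
}

final class StorageLevelsSketch {
    private StorageLevelsSketch() {}

    /** Old four-argument form, kept so existing callers still compile. */
    static Level create(boolean useDisk, boolean useMemory,
                        boolean deserialized, int replication) {
        // Assumption: callers of the old API never asked for Tachyon storage.
        return create(useDisk, useMemory, false, deserialized, replication);
    }

    /** New five-argument form that exposes the useTachyon flag. */
    static Level create(boolean useDisk, boolean useMemory, boolean useTachyon,
                        boolean deserialized, int replication) {
        return new Level(useDisk, useMemory, useTachyon, deserialized, replication);
    }
}
```

Because the old overload only delegates, both signatures stay in sync if the five-argument version changes later.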
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.TachyonFileSegment;
+
+public interface TachyonFilePathResolver {
Contributor

Does this need to be public?

Contributor

Yes. main/scala/org/apache/spark/storage/TachyonBlockManager.scala references it, in the same way that main/scala/org/apache/spark/storage/DiskBlockManager.scala references PathResolver.java.

Contributor

In that case please add a comment that this is not a user-facing API. Actually if we were writing this in Scala we'd mark it private[spark], but I guess we couldn't do that for the Netty code.

Contributor

I don't think this needs to exist at all... We don't want to store shuffle files in Tachyon AFAIK.

+  /** Get the file segment in which the given block resides. */
+  TachyonFileSegment getBlockLocation(BlockId blockId);
+}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -139,7 +139,8 @@ class SparkContext(
     conf.get("spark.driver.host"),
     conf.get("spark.driver.port").toInt,
     isDriver = true,
-    isLocal = isLocal)
+    isLocal = isLocal,
+    "<driver>" + appName)
   SparkEnv.set(env)

   // Used to store a URL for each static file/jar together with the file's local timestamp
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,6 +40,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 class SparkEnv private[spark] (
     val executorId: String,
+    val appId: String,
     val actorSystem: ActorSystem,
     val serializerManager: SerializerManager,
     val serializer: Serializer,
@@ -121,7 +122,8 @@ object SparkEnv extends Logging {
       hostname: String,
       port: Int,
       isDriver: Boolean,
-      isLocal: Boolean): SparkEnv = {
+      isLocal: Boolean,
+      appId: String = null): SparkEnv = {

     val securityManager = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
@@ -169,7 +171,7 @@ object SparkEnv extends Logging {
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf)), conf)
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, securityManager)
+      serializer, conf, securityManager, appId)

     val connectionManager = blockManager.connectionManager

@@ -219,6 +221,7 @@ object SparkEnv extends Logging {

     new SparkEnv(
       executorId,
+      appId,
       actorSystem,
       serializerManager,
       serializer,
@@ -92,6 +92,7 @@ private[spark] class ExecutorRunner(
   def substituteVariables(argument: String): String = argument match {
     case "{{WORKER_URL}}" => workerUrl
     case "{{EXECUTOR_ID}}" => execId.toString
+    case "{{APP_ID}}" => appId.toString
     case "{{HOSTNAME}}" => host
     case "{{CORES}}" => cores.toString
     case other => other
@@ -31,6 +31,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
+    appId: String,
     hostPort: String,
     cores: Int)
   extends Actor
@@ -53,7 +54,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor(sparkProperties) =>
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
-      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
+      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
+        false, appId)

     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -92,7 +94,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 }

 private[spark] object CoarseGrainedExecutorBackend {
-  def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
+  def run(driverUrl: String, appId: String, executorId: String, hostname: String, cores: Int,
       workerUrl: Option[String]) {
     // Debug code
     Utils.checkHost(hostname)
@@ -105,7 +107,8 @@ private[spark] object CoarseGrainedExecutorBackend {
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, appId, executorId,
+        sparkHostPort, cores),
       name = "Executor")
     workerUrl.foreach{ url =>
       actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
@@ -118,13 +121,13 @@ private[spark] object CoarseGrainedExecutorBackend {
       case x if x < 4 =>
         System.err.println(
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
-          "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
+          "Usage: CoarseGrainedExecutorBackend <driverUrl> <appId> <executorId> <hostname> " +
           "<cores> [<workerUrl>]")
         System.exit(1)
       case 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, None)
+        run(args(0), args(1), args(2), args(3), args(4).toInt, None)
       case x if x > 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
+        run(args(0), args(1), args(2), args(3), args(4).toInt, Some(args(5)))
     }
   }
 }
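
The new usage string lists five required arguments plus an optional `<workerUrl>`, while the unchanged `case` guards in this hunk still compare against 4, so an invocation with exactly four arguments would reach `args(4)`. A minimal Java sketch of an arity check that matches the new usage string is shown below. It is an illustration under stated assumptions, not the PR's code: `BackendArgsSketch` and its fields are hypothetical names, and throwing an exception stands in for printing the usage text and exiting.

```java
import java.util.Optional;

/** Hypothetical holder for the parsed command line (names are illustrative). */
final class BackendArgsSketch {
    static final String USAGE =
        "Usage: CoarseGrainedExecutorBackend <driverUrl> <appId> <executorId> <hostname> "
            + "<cores> [<workerUrl>]";

    final String driverUrl;
    final String appId;
    final String executorId;
    final String hostname;
    final int cores;
    final Optional<String> workerUrl;

    private BackendArgsSketch(String driverUrl, String appId, String executorId,
                              String hostname, int cores, Optional<String> workerUrl) {
        this.driverUrl = driverUrl;
        this.appId = appId;
        this.executorId = executorId;
        this.hostname = hostname;
        this.cores = cores;
        this.workerUrl = workerUrl;
    }

    /** Parses the arguments in the order given by the usage string. */
    static BackendArgsSketch parse(String[] args) {
        // Five positional arguments are required; the sixth is optional.
        if (args.length < 5) {
            throw new IllegalArgumentException(USAGE);
        }
        Optional<String> workerUrl =
            args.length > 5 ? Optional.of(args[5]) : Optional.<String>empty();
        return new BackendArgsSketch(
            args[0], args[1], args[2], args[3], Integer.parseInt(args[4]), workerUrl);
    }
}
```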
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -38,7 +38,8 @@ private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
-    isLocal: Boolean = false)
+    isLocal: Boolean = false,
+    appId: String = null)
   extends Logging
 {
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -103,7 +104,7 @@ private[spark] class Executor(
   private val env = {
     if (!isLocal) {
       val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
-        isDriver = false, isLocal = false)
+        isDriver = false, isLocal = false, appId)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)
       _env
@@ -41,6 +41,12 @@ object ExecutorExitCode {
   /** DiskStore failed to create a local temporary directory after many attempts. */
   val DISK_STORE_FAILED_TO_CREATE_DIR = 53

+  /** TachyonStore failed to initialize after many attempts. */
+  val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
+
+  /** TachyonStore failed to create a local temporary directory after many attempts. */
+  val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
+
   def explainExitCode(exitCode: Int): String = {
Contributor

Because you added two new special exit codes above, you should also modify this method to explain them. That's why we have the named exit codes here, to give users a meaningful message if the executor crashes.

Contributor

Done

     exitCode match {
       case UNCAUGHT_EXCEPTION => "Uncaught exception"
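
The reviewer's point above is that each named exit code needs a matching explanation. A minimal Java sketch of that mapping follows. The three constants and their values come from the diff; the class name, the message strings, and the fallback text are illustrative assumptions, not Spark's actual output.

```java
/** Illustrative sketch only; not the Scala object from the PR. */
final class ExecutorExitCodeSketch {
    static final int DISK_STORE_FAILED_TO_CREATE_DIR = 53;
    static final int TACHYON_STORE_FAILED_TO_INITIALIZE = 54;
    static final int TACHYON_STORE_FAILED_TO_CREATE_DIR = 55;

    private ExecutorExitCodeSketch() {}

    /** Returns a human-readable explanation for a known exit code. */
    static String explainExitCode(int exitCode) {
        switch (exitCode) {
            case DISK_STORE_FAILED_TO_CREATE_DIR:
                return "DiskStore failed to create a local temporary directory";
            case TACHYON_STORE_FAILED_TO_INITIALIZE:
                return "TachyonStore failed to initialize";
            case TACHYON_STORE_FAILED_TO_CREATE_DIR:
                return "TachyonStore failed to create a local temporary directory";
            default:
                // Hypothetical fallback so unknown codes still produce a message.
                return "Unknown executor exit code (" + exitCode + ")";
        }
    }
}
```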
@@ -45,7 +45,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       conf.get("spark.driver.host"), conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{APP_ID}}", "{{HOSTNAME}}",
+      "{{CORES}}", "{{WORKER_URL}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome()
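
The argument template above is expanded on the worker by `substituteVariables` in `ExecutorRunner`, which now also recognizes `{{APP_ID}}`. The Java sketch below shows that placeholder expansion in isolation. It is a hedged illustration: `ExecutorArgsSketch`, `substitute`, and `expand` are hypothetical names, and the values are passed as parameters rather than read from the runner's fields. Note that the template lists `{{EXECUTOR_ID}}` before `{{APP_ID}}`, whereas the usage string in `CoarseGrainedExecutorBackend` lists `<appId>` first; the sketch follows the template order as written.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative placeholder expansion; not the Scala code from the PR. */
final class ExecutorArgsSketch {
    private ExecutorArgsSketch() {}

    /** Replaces one known placeholder; any other argument passes through unchanged. */
    static String substitute(String argument, String workerUrl, int execId,
                             String appId, String host, int cores) {
        switch (argument) {
            case "{{WORKER_URL}}":
                return workerUrl;
            case "{{EXECUTOR_ID}}":
                return Integer.toString(execId);
            case "{{APP_ID}}":
                return appId;
            case "{{HOSTNAME}}":
                return host;
            case "{{CORES}}":
                return Integer.toString(cores);
            default:
                return argument;
        }
    }

    /** Expands every argument of a command template, preserving order. */
    static List<String> expand(List<String> template, String workerUrl, int execId,
                               String appId, String host, int cores) {
        List<String> expanded = new ArrayList<>();
        for (String argument : template) {
            expanded.add(substitute(argument, workerUrl, execId, appId, host, cores));
        }
        return expanded;
    }
}
```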