Skip to content

Commit 3dcace4

Browse files
committed
address matei's comments
1 parent 91fa09d commit 3dcace4

File tree

8 files changed

+32
-37
lines changed

8 files changed

+32
-37
lines changed

core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ object ExecutorExitCode {
4141
/** DiskStore failed to create a local temporary directory after many attempts. */
4242
val DISK_STORE_FAILED_TO_CREATE_DIR = 53
4343

44-
/** TachyonStore failed to create a local temporary directory after many attempts. */
44+
/** TachyonStore failed to initialize after many attempts. */
4545
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
4646

4747
/** TachyonStore failed to create a local temporary directory after many attempts. */
@@ -54,6 +54,9 @@ object ExecutorExitCode {
5454
case OOM => "OutOfMemoryError"
5555
case DISK_STORE_FAILED_TO_CREATE_DIR =>
5656
"Failed to create local directory (bad spark.local.dir?)"
57+
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
58+
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
59+
"TachyonStore failed to create a local temporary directory."
5760
case _ =>
5861
"Unknown executor exit code (" + exitCode + ")" + (
5962
if (exitCode > 128) {

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private[spark] class BlockManager(
6262
val storeDir = conf.get("spark.tachyonStore.baseDir", System.getProperty("java.io.tmpdir"))
6363
val appFolderName = conf.get("spark.tachyonStore.folderName")
6464
val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
65-
val tachyonMaster = conf.get("spark.tachyonStore.URL", "tachyon://localhost:19998")
65+
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
6666
val tachyonBlockManager = new TachyonBlockManager(
6767
shuffleBlockManager, tachyonStorePath, tachyonMaster)
6868
tachyonInitialized = true
@@ -276,7 +276,7 @@ private[spark] class BlockManager(
276276
(StorageLevel.NONE, 0L, 0L, 0L)
277277
case level =>
278278
val inMem = level.useMemory && memoryStore.contains(blockId)
279-
val inTachyon = level.useTachyon && tachyonStore.contains(blockId)
279+
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
280280
val onDisk = level.useDisk && diskStore.contains(blockId)
281281
val deserialized = if (inMem) level.deserialized else false
282282
val replication = if (inMem || inTachyon || onDisk) level.replication else 1
@@ -369,7 +369,7 @@ private[spark] class BlockManager(
369369
}
370370

371371
// Look for the block in Tachyon
372-
if (level.useTachyon) {
372+
if (level.useOffHeap) {
373373
logDebug("Getting block " + blockId + " from tachyon")
374374
if (tachyonStore.contains(blockId)) {
375375
tachyonStore.getBytes(blockId) match {
@@ -651,7 +651,7 @@ private[spark] class BlockManager(
651651
}
652652
// Keep track of which blocks are dropped from memory
653653
res.droppedBlocks.foreach { block => updatedBlocks += block }
654-
} else if (level.useTachyon) {
654+
} else if (level.useOffHeap) {
655655
// Save to Tachyon.
656656
val askForBytes = level.replication > 1
657657
val res = data match {
@@ -757,7 +757,7 @@ private[spark] class BlockManager(
757757
var cachedPeers: Seq[BlockManagerId] = null
758758
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) {
759759
val tLevel = StorageLevel(
760-
level.useDisk, level.useMemory, level.useTachyon, level.deserialized, 1)
760+
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
761761
if (cachedPeers == null) {
762762
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
763763
}
@@ -922,7 +922,7 @@ private[spark] class BlockManager(
922922
if (level.useDisk) {
923923
diskStore.remove(id)
924924
}
925-
if (level.useTachyon) {
925+
if (level.useOffHeap) {
926926
tachyonStore.remove(id)
927927
}
928928
iterator.remove()

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ private[spark] class BlockManagerInfo(
375375
logInfo("Added %s on disk on %s (size: %s)".format(
376376
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
377377
}
378-
if (storageLevel.useTachyon) {
378+
if (storageLevel.useOffHeap) {
379379
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
380380
logInfo("Added %s on tachyon on %s (size: %s)".format(
381381
blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
@@ -394,7 +394,7 @@ private[spark] class BlockManagerInfo(
394394
logInfo("Removed %s on %s on disk (size: %s)".format(
395395
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
396396
}
397-
if (blockStatus.storageLevel.useTachyon) {
397+
if (blockStatus.storageLevel.useOffHeap) {
398398
logInfo("Removed %s on %s on tachyon (size: %s)".format(
399399
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
400400
}

core/src/main/scala/org/apache/spark/storage/StorageLevel.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
3131
class StorageLevel private(
3232
private var useDisk_ : Boolean,
3333
private var useMemory_ : Boolean,
34-
private var useTachyon_ : Boolean,
34+
private var useOffHeap_ : Boolean,
3535
private var deserialized_ : Boolean,
3636
private var replication_ : Int = 1)
3737
extends Externalizable {
@@ -45,27 +45,27 @@ class StorageLevel private(
4545

4646
def useDisk = useDisk_
4747
def useMemory = useMemory_
48-
def useTachyon = useTachyon_
48+
def useOffHeap = useOffHeap_
4949
def deserialized = deserialized_
5050
def replication = replication_
5151

5252
assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
5353

5454
override def clone(): StorageLevel = new StorageLevel(
55-
this.useDisk, this.useMemory, this.useTachyon, this.deserialized, this.replication)
55+
this.useDisk, this.useMemory, this.useOffHeap, this.deserialized, this.replication)
5656

5757
override def equals(other: Any): Boolean = other match {
5858
case s: StorageLevel =>
5959
s.useDisk == useDisk &&
6060
s.useMemory == useMemory &&
61-
s.useTachyon == useTachyon &&
61+
s.useOffHeap == useOffHeap &&
6262
s.deserialized == deserialized &&
6363
s.replication == replication
6464
case _ =>
6565
false
6666
}
6767

68-
def isValid = ((useMemory || useDisk || useTachyon) && (replication > 0))
68+
def isValid = ((useMemory || useDisk || useOffHeap) && (replication > 0))
6969

7070
def toInt: Int = {
7171
var ret = 0
@@ -75,7 +75,7 @@ class StorageLevel private(
7575
if (useMemory_) {
7676
ret |= 4
7777
}
78-
if (useTachyon_) {
78+
if (useOffHeap_) {
7979
ret |= 2
8080
}
8181
if (deserialized_) {
@@ -93,7 +93,7 @@ class StorageLevel private(
9393
val flags = in.readByte()
9494
useDisk_ = (flags & 8) != 0
9595
useMemory_ = (flags & 4) != 0
96-
useTachyon_ = (flags & 2) != 0
96+
useOffHeap_ = (flags & 2) != 0
9797
deserialized_ = (flags & 1) != 0
9898
replication_ = in.readByte()
9999
}
@@ -102,14 +102,14 @@ class StorageLevel private(
102102
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
103103

104104
override def toString: String = "StorageLevel(%b, %b, %b, %b, %d)".format(
105-
useDisk, useMemory, useTachyon, deserialized, replication)
105+
useDisk, useMemory, useOffHeap, deserialized, replication)
106106

107107
override def hashCode(): Int = toInt * 41 + replication
108108
def description : String = {
109109
var result = ""
110110
result += (if (useDisk) "Disk " else "")
111111
result += (if (useMemory) "Memory " else "")
112-
result += (if (useTachyon) "Tachyon " else "")
112+
result += (if (useOffHeap) "Tachyon " else "")
113113
result += (if (deserialized) "Deserialized " else "Serialized ")
114114
result += "%sx Replicated".format(replication)
115115
result
@@ -135,10 +135,10 @@ object StorageLevel {
135135
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
136136
val OFF_HEAP = new StorageLevel(false, false, true, false)
137137

138-
/** Create a new StorageLevel object without setting useTachyon*/
139-
def apply(useDisk: Boolean, useMemory: Boolean, useTachyon: Boolean,
138+
/** Create a new StorageLevel object without setting useOffHeap*/
139+
def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
140140
deserialized: Boolean, replication: Int) = getCachedStorageLevel(
141-
new StorageLevel(useDisk, useMemory, useTachyon, deserialized, replication))
141+
new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
142142

143143
/** Create a new StorageLevel object */
144144
def apply(useDisk: Boolean, useMemory: Boolean,

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ private[spark] object JsonProtocol {
281281
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
282282
("Use Disk" -> storageLevel.useDisk) ~
283283
("Use Memory" -> storageLevel.useMemory) ~
284-
("Use Tachyon" -> storageLevel.useTachyon) ~
284+
("Use Tachyon" -> storageLevel.useOffHeap) ~
285285
("Deserialized" -> storageLevel.deserialized) ~
286286
("Replication" -> storageLevel.replication)
287287
}

docs/configuration.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,12 @@ Apart from these, the following properties are also available, and may be useful
126126
<td>spark.tachyonStore.baseDir</td>
127127
<td>System.getProperty("java.io.tmpdir")</td>
128128
<td>
129-
Directories of the Tachyon File System that store RDDs. The Tachyon file system's URL is set by <code>spark.tachyonStore.URL</code>.
129+
Directories of the Tachyon File System that store RDDs. The Tachyon file system's URL is set by <code>spark.tachyonStore.url</code>.
130130
It can also be a comma-separated list of multiple directories on Tachyon file system.
131131
</td>
132132
</tr>
133133
<tr>
134-
<td>spark.tachyonStore.URL</td>
134+
<td>spark.tachyonStore.url</td>
135135
<td>tachyon://localhost:19998</td>
136136
<td>
137137
The URL of the underlying Tachyon file system in the TachyonStore.

examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import scala.math.random
2222
import org.apache.spark._
2323
import org.apache.spark.storage.StorageLevel
2424

25-
/** Computes an approximation to pi
25+
/**
26+
* Computes an approximation to pi
2627
* This example uses Tachyon to persist rdds during computation.
2728
*/
2829
object SparkTachyonPi {
@@ -44,16 +45,7 @@ object SparkTachyonPi {
4445
val y = random * 2 - 1
4546
if (x * x + y * y < 1) 1 else 0
4647
}.reduce(_ + _)
47-
println("1- Pi is roughly " + 4.0 * count / n)
48-
49-
val rdd2 = spark.parallelize(1 to n, slices)
50-
rdd2.persist(StorageLevel.OFF_HEAP)
51-
val count2 = rdd2.map { i =>
52-
val x = random * 2 - 1
53-
val y = random * 2 - 1
54-
if (x * x + y * y < 1) 1 else 0
55-
}.reduce(_ + _)
56-
println("2- Pi is roughly " + 4.0 * count2 / n)
48+
println("Pi is roughly " + 4.0 * count / n)
5749

5850
spark.stop()
5951
}

python/pyspark/storagelevel.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def __init__(self, useDisk, useMemory, useTachyon, deserialized, replication = 1
3434

3535
def __repr__(self):
3636
return "StorageLevel(%s, %s, %s, %s, %s)" % (
37-
self.useDisk, self.useMemory, self.useTachyon, self.deserialized, self.replication)
37+
self.useDisk, self.useMemory, self.useOffHeap, self.deserialized, self.replication)
3838

3939
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
4040
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
@@ -46,4 +46,4 @@ def __repr__(self):
4646
StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2)
4747
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False)
4848
StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
49-
StorageLevel.TACHYON = StorageLevel(False, False, True, False, 1)
49+
StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1)

0 commit comments

Comments
 (0)