Commit 8859371

various minor fixes and clean up
1 parent e3ddbba commit 8859371

9 files changed, with 34 additions and 60 deletions.

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 14 additions & 25 deletions

@@ -60,9 +60,16 @@ private[spark] class BlockManager(
 
   private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
   private[storage] val diskStore = new DiskStore(this, diskBlockManager)
-
   var tachyonInitialized = false
-  private[storage] var innerTachyonStore: TachyonStore = null
+  private[storage] lazy val tachyonStore : TachyonStore = {
+    val storeDir = conf.get("spark.tachyonstore.dir", System.getProperty("java.io.tmpdir"))
+    val tachyonStorePath = s"${storeDir}/${appId}/${this.executorId}"
+    val tachyonMaster = conf.get("spark.tachyonmaster.address", "localhost:19998")
+    val tachyonBlockManager = new TachyonBlockManager(
+      shuffleBlockManager, tachyonStorePath, tachyonMaster)
+    tachyonInitialized = true
+    new TachyonStore(this, tachyonBlockManager)
+  }
 
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
   private val nettyPort: Int = {

@@ -101,23 +108,6 @@ private[spark] class BlockManager(
   var asyncReregisterTask: Future[Unit] = null
   val asyncReregisterLock = new Object
 
-  private[storage] lazy val tachyonStore : TachyonStore = {
-    if (!tachyonInitialized) {
-      initializeTachyonStore()
-    }
-    this.innerTachyonStore
-  }
-
-  private def initializeTachyonStore() {
-    val storeDir = conf.get("spark.tachyonstore.dir", System.getProperty("java.io.tmpdir"))
-    val tachyonStorePath = s"${storeDir}/${appId}/${this.executorId}"
-    val tachyonMaster = conf.get("spark.tachyonmaster.address", "localhost:19998")
-    val tachyonBlockManager = new TachyonBlockManager(
-      shuffleBlockManager, tachyonStorePath, tachyonMaster)
-    this.innerTachyonStore = new TachyonStore(this, tachyonBlockManager)
-    this.tachyonInitialized = true
-  }
-
   private def heartBeat() {
     if (!master.sendHeartBeat(blockManagerId)) {
       reregister()

@@ -636,7 +626,7 @@ private[spark] class BlockManager(
         case Right(newBytes) => bytesAfterPut = newBytes
         case _ =>
       }
-    }else {
+    } else {
      // Save directly to disk.
      // Don't get back the bytes unless we replicate them.
      val askForBytes = level.replication > 1

@@ -658,7 +648,7 @@ private[spark] class BlockManager(
        }
      }
 
-      // Now that the block is in either the memory, tachyon or disk store,
+      // Now that the block is in either the memory, tachyon, or disk store,
      // let other threads read it, and tell the master about it.
      marked = true
      myInfo.markReady(size)

@@ -822,11 +812,10 @@ private[spark] class BlockManager(
      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
      val removedFromMemory = memoryStore.remove(blockId)
      val removedFromDisk = diskStore.remove(blockId)
-      val removedFromTachyon =
-        if (tachyonInitialized == true) tachyonStore.remove(blockId) else false
+      val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
      if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
        logWarning("Block " + blockId + " could not be removed as it was not found in either " +
-          "the disk, memory or tachyon store")
+          "the disk, memory, or tachyon store")
      }
      blockInfo.remove(blockId)
      if (tellMaster && info.tellMaster) {

@@ -939,7 +928,7 @@ private[spark] class BlockManager(
    blockInfo.clear()
    memoryStore.clear()
    diskStore.clear()
-    if(tachyonInitialized == true) {
+    if(tachyonInitialized) {
      tachyonStore.clear()
    }
    metadataCleaner.cancel()
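
For readers skimming the refactor above: the commit folds the mutable `innerTachyonStore` holder and the explicit `initializeTachyonStore()` method into a single `lazy val`, keeping the `tachyonInitialized` flag only so teardown paths can check whether the store was ever created without forcing its initialization. A minimal standalone sketch of that idiom, with hypothetical `Store`/`Manager` names (this is not Spark code):

// Sketch: a lazy val replaces an init method plus a mutable holder; a flag
// records whether initialization ever ran so shutdown can skip the store.
class Store {
  def clear(): Unit = println("store cleared")
}

class Manager {
  var storeInitialized = false

  lazy val store: Store = {
    // This block runs at most once, on first access to `store`.
    storeInitialized = true
    new Store
  }

  def shutdown(): Unit = {
    // Referencing `store` here would force initialization, so gate on the flag.
    if (storeInitialized) store.clear()
  }
}

object LazyInitDemo extends App {
  val m = new Manager
  m.shutdown() // prints nothing: the store was never touched
  m.store      // forces initialization
  m.shutdown() // prints "store cleared"
}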

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 6 additions & 13 deletions

@@ -48,11 +48,10 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
   extends Ordered[RDDInfo] {
   override def toString = {
     import Utils.bytesToString
-    ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s;" +
-        "TachyonSize: %s; DiskSize: %s").format(
-        name, id, storageLevel.toString, numCachedPartitions, numPartitions,
-        bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
-
+    ("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s;" +
+      "TachyonSize: %s; DiskSize: %s").format(
+      name, id, storageLevel.toString, numCachedPartitions, numPartitions,
+      bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
   }
 
   override def compare(that: RDDInfo) = {

@@ -96,14 +95,8 @@ object StorageUtils {
     sc.persistentRdds.get(rddId).map { r =>
       val rddName = Option(r.name).getOrElse(rddId.toString)
       val rddStorageLevel = r.getStorageLevel
-      RDDInfo(rddId,
-        rddName,
-        rddStorageLevel,
-        rddBlocks.length,
-        r.partitions.size,
-        memSize,
-        tachyonSize,
-        diskSize)
+      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize,
+        tachyonSize, diskSize)
     }
   }.flatten.toArray

core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala

Lines changed: 3 additions & 4 deletions

@@ -17,7 +17,6 @@
 
 package org.apache.spark.storage
 
-import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Date, Random}
 

@@ -47,7 +46,7 @@ private[spark] class TachyonBlockManager(
   val client = if (master != null && master != "") TachyonFS.get(master) else null
   if (client == null) {
     logError("Failed to connect to the Tachyon as the master address is not configured")
-   System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
+    System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
 
   private val MAX_DIR_CREATION_ATTEMPTS = 10

@@ -136,8 +135,8 @@ private[spark] class TachyonBlockManager(
       }
     }
     if (!foundLocalDir) {
-      logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
-        " attempts to create tachyon dir in " + rootDir)
+      logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
+        rootDir)
      System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
    }
    logInfo("Created tachyon directory at " + tachyonDir)

core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala

Lines changed: 3 additions & 6 deletions

@@ -17,15 +17,12 @@
 
 package org.apache.spark.storage
 
-import java.io.File
 import tachyon.client.TachyonFile
 
 /**
- * References a particular segment of a file (potentially the entire file),
- * based off an offset and a length.
+ * References a particular segment of a file (potentially the entire file), based off an offset and
+ * a length.
  */
-
-private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long)
-{
+private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) {
   override def toString = "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length)
 }

core/src/main/scala/org/apache/spark/storage/TachyonStore.scala

Lines changed: 2 additions & 6 deletions

@@ -17,25 +17,21 @@
 
 package org.apache.spark.storage
 
-import java.io.RandomAccessFile
 import java.nio.ByteBuffer
-import java.util.LinkedHashMap
-import java.io.FileOutputStream
-import java.nio.channels.FileChannel.MapMode
 
 import scala.collection.mutable.ArrayBuffer
 
-import tachyon.client.OutStream
 import tachyon.client.WriteType
 import tachyon.client.ReadType
-import tachyon.client.InStream
 
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.serializer.Serializer
 
 
 private class Entry(val size: Long)
+
+
 /**
  * Stores BlockManager blocks on Tachyon.
  */

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 3 additions & 3 deletions (whitespace-only: trailing spaces stripped from blank lines)

@@ -156,12 +156,12 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
     assert(master.getLocations("a2").size === 0, "master did not remove a2")
   }
-  
+
   test("master + 2 managers interaction") {
     store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf, securityMgr)
     store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf,
       securityMgr)
-  
+
     val peers = master.getPeers(store.blockManagerId, 1)
     assert(peers.size === 1, "master did not return the other manager as a peer")
     assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")

@@ -407,7 +407,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
     assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
   }
-  
+
   test("tachyon storage") {
     val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
     if (tachyonUnitTestEnabled) {
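
The "tachyon storage" test touched above is gated on an opt-in flag, so suites run cleanly on machines without a Tachyon master. A minimal standalone sketch of that gating pattern (not the actual suite; the config lookup is a stand-in for SparkConf.getBoolean):

// Sketch: run the expensive assertions only when explicitly enabled.
object GatedTestSketch extends App {
  // Stand-in for conf.getBoolean("spark.test.tachyon.enable", false).
  val tachyonUnitTestEnabled =
    sys.props.getOrElse("spark.test.tachyon.enable", "false").toBoolean

  if (tachyonUnitTestEnabled) {
    println("running tachyon-backed assertions")
  } else {
    println("skipped: pass -Dspark.test.tachyon.enable=true to enable")
  }
}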

examples/src/main/scala/org/apache/spark/examples/SparkPi.scala

Lines changed: 1 addition & 1 deletion

@@ -18,8 +18,8 @@
 package org.apache.spark.examples
 
 import scala.math.random
+
 import org.apache.spark._
-import SparkContext._
 
 /** Computes an approximation to pi */
 object SparkPi {

examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala

Lines changed: 1 addition & 1 deletion

@@ -18,8 +18,8 @@
 package org.apache.spark.examples
 
 import scala.math.random
+
 import org.apache.spark._
-import SparkContext._
 import org.apache.spark.storage.StorageLevel
 
 /** Computes an approximation to pi */

project/SparkBuild.scala

Lines changed: 1 addition & 1 deletion

@@ -311,7 +311,7 @@ object SparkBuild extends Build {
       "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
       "com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm),
       "com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm),
-      "org.tachyonproject" % "tachyon" % "0.4.0" excludeAll(excludeKyro, excludeHadoop, excludeCurator, excludeJackson, excludeNetty, excludeAsm),
+      "org.tachyonproject" % "tachyon" % "0.4.1" excludeAll(excludeKyro, excludeHadoop, excludeCurator, excludeJackson, excludeNetty, excludeAsm),
       "com.clearspring.analytics" % "stream" % "2.5.1"
     ),
     libraryDependencies ++= maybeAvro
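
For context on the bumped dependency: `excludeAll` strips transitive dependencies from an artifact via sbt ExclusionRules. A hedged, self-contained illustration of that mechanism (the single rule shown is an example; the actual rules are defined elsewhere in SparkBuild.scala):

// Illustrative sbt fragment, not the full SparkBuild.scala: exclude a
// transitive dependency of the tachyon artifact by organization name.
val excludeNetty = ExclusionRule(organization = "org.jboss.netty")

libraryDependencies += "org.tachyonproject" % "tachyon" % "0.4.1" excludeAll(excludeNetty)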
