Skip to content

Commit 91fa09d

Browse files
committed
address patrick's comments
1 parent 589eafe commit 91fa09d

File tree

1 file changed

+21
-5
lines changed

1 file changed

+21
-5
lines changed

core/src/main/scala/org/apache/spark/storage/TachyonStore.scala

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.storage
1919

20+
import java.io.IOException
2021
import java.nio.ByteBuffer
2122

2223
import scala.collection.mutable.ArrayBuffer
@@ -109,13 +110,28 @@ private class TachyonStore(
109110

110111
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
111112
val file = tachyonManager.getFile(blockId)
113+
if (file == null || file.getLocationHosts().size == 0) {
114+
return None
115+
}
112116
val is = file.getInStream(ReadType.CACHE)
113117
var buffer: ByteBuffer = null
114-
if (is != null){
115-
val size = file.length
116-
val bs = new Array[Byte](size.asInstanceOf[Int])
117-
is.read(bs, 0, size.asInstanceOf[Int])
118-
buffer = ByteBuffer.wrap(bs)
118+
try {
119+
if (is != null) {
120+
val size = file.length
121+
val bs = new Array[Byte](size.asInstanceOf[Int])
122+
val fetchSize = is.read(bs, 0, size.asInstanceOf[Int])
123+
buffer = ByteBuffer.wrap(bs)
124+
if (fetchSize != size) {
125+
logWarning("Failed to fetch the block " + blockId + " from Tachyon : Size " + size +
126+
" is not equal to fetched size " + fetchSize)
127+
return None
128+
}
129+
}
130+
} catch {
131+
case ioe: IOException => {
132+
logWarning("Failed to fetch the block " + blockId + " from Tachyon", ioe)
133+
return None
134+
}
119135
}
120136
Some(buffer)
121137
}

0 commit comments

Comments
 (0)