Commit d205d40

Michael Allman authored and cloud-fan committed
[SPARK-17204][CORE] Fix replicated off heap storage
(Jira: https://issues.apache.org/jira/browse/SPARK-17204)

## What changes were proposed in this pull request?

There are a couple of bugs in the `BlockManager` with respect to support for replicated off-heap storage. First, the locally-stored off-heap byte buffer is disposed of when it is replicated. It should not be. Second, the replica byte buffers are stored as heap byte buffers instead of direct byte buffers even when the storage level memory mode is off-heap. This PR addresses both of these problems.

## How was this patch tested?

`BlockManagerReplicationSuite` was enhanced to fill in the coverage gaps. It now fails if either of the bugs in this PR exist.

Author: Michael Allman <[email protected]>

Closes #16499 from mallman/spark-17204-replicated_off_heap_storage.

(cherry picked from commit 7fa116f)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent af8bf21 commit d205d40
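To make the first bug concrete, here is a minimal, self-contained Scala sketch (plain `java.nio`, not Spark code; the object name is invented for illustration) of why releasing the replicated bytes was unsafe: a duplicated `ByteBuffer` shares its backing memory with the original, so freeing or overwriting the "copy" also clobbers the data the local store still holds.

```scala
import java.nio.ByteBuffer

// Illustration only (not Spark code): a duplicated ByteBuffer shares the same
// backing memory as the original, so releasing or mutating the "copy" also
// affects the buffer still held by the local store.
object SharedBackingMemorySketch {
  def main(args: Array[String]): Unit = {
    val stored = ByteBuffer.allocateDirect(4)
    stored.put(0, 42.toByte)

    val replica = stored.duplicate() // independent position/limit, same memory
    replica.put(0, 7.toByte)         // stand-in for freeing/overwriting the replica

    println(stored.get(0))           // prints 7: the locally "stored" data changed too
  }
}
```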

File tree

5 files changed: +105 -25 lines


core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 20 additions & 3 deletions
@@ -318,6 +318,9 @@ private[spark] class BlockManager(
 
   /**
    * Put the block locally, using the given storage level.
+   *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
    */
   override def putBlockData(
       blockId: BlockId,
@@ -756,6 +759,9 @@ private[spark] class BlockManager(
   /**
    * Put a new block of serialized bytes to the block manager.
    *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
+   *
    * @param encrypt If true, asks the block manager to encrypt the data block before storing,
    *                when I/O encryption is enabled. This is required for blocks that have been
    *                read from unencrypted sources, since all the BlockManager read APIs
@@ -774,7 +780,7 @@ private[spark] class BlockManager(
     if (encrypt && securityManager.ioEncryptionKey.isDefined) {
       try {
         val data = bytes.toByteBuffer
-        val in = new ByteBufferInputStream(data, true)
+        val in = new ByteBufferInputStream(data)
         val byteBufOut = new ByteBufferOutputStream(data.remaining())
         val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
           securityManager.ioEncryptionKey.get)
@@ -801,6 +807,9 @@ private[spark] class BlockManager(
    *
    * If the block already exists, this method will not overwrite it.
    *
+   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
+   *
    * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
    *                     block already exists). If false, this method will hold no locks when it
    *                     returns.
@@ -844,7 +853,15 @@ private[spark] class BlockManager(
             false
           }
         } else {
-          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
+          val memoryMode = level.memoryMode
+          memoryStore.putBytes(blockId, size, memoryMode, () => {
+            if (memoryMode == MemoryMode.OFF_HEAP &&
+                bytes.chunks.exists(buffer => !buffer.isDirect)) {
+              bytes.copy(Platform.allocateDirectBuffer)
+            } else {
+              bytes
+            }
+          })
         }
         if (!putSucceeded && level.useDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
@@ -1049,7 +1066,7 @@ private[spark] class BlockManager(
       try {
         replicate(blockId, bytesToReplicate, level, remoteClassTag)
       } finally {
-        bytesToReplicate.dispose()
+        bytesToReplicate.unmap()
      }
       logDebug("Put block %s remotely took %s"
         .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
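The closure now passed to `memoryStore.putBytes` copies chunks only when the off-heap memory mode is requested and some chunk still lives on the JVM heap. The sketch below is a standalone restatement of that decision, using plain `java.nio` buffers in place of Spark's `ChunkedByteBuffer` and `Platform.allocateDirectBuffer`, so the helper and object names are illustrative only.

```scala
import java.nio.ByteBuffer

// Standalone restatement of the decision the new putBytes closure makes:
// only when the memory mode is off-heap and at least one chunk still lives on
// the JVM heap are the chunks copied into direct buffers before storage.
object EnsureOffHeapSketch {
  def ensureOffHeap(chunks: Array[ByteBuffer], offHeap: Boolean): Array[ByteBuffer] = {
    if (offHeap && chunks.exists(c => !c.isDirect)) {
      chunks.map { c =>
        val direct = ByteBuffer.allocateDirect(c.remaining())
        direct.put(c.duplicate()) // read through a duplicate so the source position is preserved
        direct.flip()
        direct
      }
    } else {
      chunks // already direct (or on-heap storage): store as-is, no copy
    }
  }

  def main(args: Array[String]): Unit = {
    val heapChunks = Array(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
    val stored = ensureOffHeap(heapChunks, offHeap = true)
    println(stored.forall(_.isDirect)) // true
  }
}
```

The other change in this file, `bytesToReplicate.dispose()` becoming `bytesToReplicate.unmap()`, follows the same ownership rule: the bytes handed to replication may share memory with the locally stored block, so only file mappings are released.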

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 45 additions & 7 deletions
@@ -236,22 +236,60 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 
 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+  // Ewwww... Reflection!!! See the unmap method for justification
+  private val memoryMappedBufferFileDescriptorField = {
+    val mappedBufferClass = classOf[java.nio.MappedByteBuffer]
+    val fdField = mappedBufferClass.getDeclaredField("fd")
+    fdField.setAccessible(true)
+    fdField
+  }
 
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
+   * API that will cause errors if one attempts to read from the disposed buffer. However, neither
+   * the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put
+   * pressure on the garbage collector. Waiting for garbage collection may lead to the depletion of
+   * off-heap memory or huge numbers of open files. There's unfortunately no standard API to
+   * manually dispose of these kinds of buffers.
+   *
+   * See also [[unmap]]
    */
   def dispose(buffer: ByteBuffer): Unit = {
     if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace(s"Unmapping $buffer")
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+      logTrace(s"Disposing of $buffer")
+      cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
+    }
+  }
+
+  /**
+   * Attempt to unmap a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that will
+   * cause errors if one attempts to read from the unmapped buffer. However, the file descriptors of
+   * memory-mapped buffers do not put pressure on the garbage collector. Waiting for garbage
+   * collection may lead to huge numbers of open files. There's unfortunately no standard API to
+   * manually unmap memory-mapped buffers.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(buffer: ByteBuffer): Unit = {
+    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+      // Note that direct buffers are instances of MappedByteBuffer. As things stand in Java 8, the
+      // JDK does not provide a public API to distinguish between direct buffers and memory-mapped
+      // buffers. As an alternative, we peek beneath the curtains and look for a non-null file
+      // descriptor in mappedByteBuffer
+      if (memoryMappedBufferFileDescriptorField.get(buffer) != null) {
+        logTrace(s"Unmapping $buffer")
+        cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
      }
    }
  }
 
+  private def cleanDirectBuffer(buffer: DirectBuffer) = {
+    val cleaner = buffer.cleaner()
+    if (cleaner != null) {
+      cleaner.clean()
+    }
+  }
+
   /**
    * Update the given list of RDDInfo with the given list of storage statuses.
    * This method overwrites the old values stored in the RDDInfo's.
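The sketch below demonstrates the reflection trick the new `unmap` relies on. It is written against Java 8, where plain direct buffers are also instances of `MappedByteBuffer` and the private `fd` field is the only way to tell a real file mapping (non-null `fd`) from a direct buffer (null `fd`); later JDKs restrict this kind of reflective access, and the file path used here is only an example.

```scala
import java.io.RandomAccessFile
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel

// Java 8 sketch of the "fd" reflection trick: a direct buffer has a null file
// descriptor, a memory-mapped buffer has a non-null one.
object MappedOrDirectSketch {
  private val fdField = {
    val f = classOf[MappedByteBuffer].getDeclaredField("fd")
    f.setAccessible(true)
    f
  }

  def isFileMapping(buffer: ByteBuffer): Boolean =
    buffer.isInstanceOf[MappedByteBuffer] && fdField.get(buffer) != null

  def main(args: Array[String]): Unit = {
    println(isFileMapping(ByteBuffer.allocateDirect(16))) // false: off-heap, but no file behind it

    val file = new RandomAccessFile("/tmp/mapped-or-direct-sketch.bin", "rw") // example path
    try {
      file.setLength(16)
      val mapped = file.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 16)
      println(isFileMapping(mapped)) // true: backed by an open file descriptor
    } finally {
      file.close()
    }
  }
}
```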

core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala

Lines changed: 2 additions & 6 deletions
@@ -23,11 +23,10 @@ import java.nio.ByteBuffer
 import org.apache.spark.storage.StorageUtils
 
 /**
- * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose()
- * at the end of the stream (e.g. to close a memory-mapped file).
+ * Reads data from a ByteBuffer.
  */
 private[spark]
-class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
+class ByteBufferInputStream(private var buffer: ByteBuffer)
   extends InputStream {
 
   override def read(): Int = {
@@ -72,9 +71,6 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f
    */
   private def cleanUp() {
     if (buffer != null) {
-      if (dispose) {
-        StorageUtils.dispose(buffer)
-      }
       buffer = null
     }
   }
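A small illustration of the ownership model this change enforces (plain `java.nio`, not Spark's `ByteBufferInputStream`; the names are invented): the stream no longer frees the buffer it reads from, so the code that owns the memory hands consumers an independent view and keeps the cleanup responsibility itself.

```scala
import java.nio.ByteBuffer

// The owner allocates (and later frees) the memory; consumers only ever see a
// duplicated view, so they can neither disturb the owner's position nor
// release the underlying buffer.
object ReadWithoutOwnershipSketch {
  def consume(view: ByteBuffer): Int = {
    var count = 0
    while (view.hasRemaining) { view.get(); count += 1 }
    count
  }

  def main(args: Array[String]): Unit = {
    val owned = ByteBuffer.allocateDirect(8)
    println(consume(owned.duplicate())) // consumer reads a view: prints 8
    println(owned.remaining())          // owner's position is untouched: prints 8
  }
}
```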

core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala

Lines changed: 22 additions & 5 deletions
@@ -86,7 +86,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   }
 
   /**
-   * Copy this buffer into a new ByteBuffer.
+   * Convert this buffer to a ByteBuffer. If this buffer is backed by a single chunk, its underlying
+   * data will not be copied. Instead, it will be duplicated. If this buffer is backed by multiple
+   * chunks, the data underlying this buffer will be copied into a new byte buffer. As a result, it
+   * is suggested to use this method only if the caller does not need to manage the memory
+   * underlying this buffer.
    *
    * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size.
    */
@@ -132,17 +136,30 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   }
 
   /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
+   * Attempt to clean up any ByteBuffer in this ChunkedByteBuffer which is direct or memory-mapped.
+   * See [[StorageUtils.dispose]] for more information.
+   *
+   * See also [[unmap]]
    */
   def dispose(): Unit = {
     if (!disposed) {
       chunks.foreach(StorageUtils.dispose)
       disposed = true
     }
   }
+
+  /**
+   * Attempt to unmap any ByteBuffer in this ChunkedByteBuffer if it is memory-mapped. See
+   * [[StorageUtils.unmap]] for more information.
+   *
+   * See also [[dispose]]
+   */
+  def unmap(): Unit = {
+    if (!disposed) {
+      chunks.foreach(StorageUtils.unmap)
+      disposed = true
+    }
+  }
 }
 
 /**
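A hypothetical usage of the two cleanup paths, assuming code that lives inside an `org.apache.spark` package (the class is `private[spark]`) and compiles against spark-core. Each call marks the buffer as disposed, so a given `ChunkedByteBuffer` should receive exactly one of them:

```scala
package org.apache.spark.util.io // needed only because ChunkedByteBuffer is private[spark]

import java.nio.ByteBuffer

object ChunkedCleanupSketch {
  def main(args: Array[String]): Unit = {
    // A buffer whose chunks may be shared with a locally stored block (the
    // replication path): unmap() releases only file mappings, so the shared
    // direct chunks remain valid.
    val shared = new ChunkedByteBuffer(Array(ByteBuffer.allocateDirect(1024)))
    shared.unmap()

    // A buffer that exclusively owns its memory: dispose() also frees direct
    // chunks, not just memory-mapped ones.
    val owned = new ChunkedByteBuffer(Array(ByteBuffer.allocateDirect(1024)))
    owned.dispose()
  }
}
```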

core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

Lines changed: 16 additions & 4 deletions
@@ -375,7 +375,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite
       // Put the block into one of the stores
       val blockId = new TestBlockId(
         "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
-      stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      val testValue = Array.fill[Byte](blockSize)(1)
+      stores(0).putSingle(blockId, testValue, storageLevel)
 
       // Assert that master know two locations for the block
       val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -387,12 +388,23 @@ class BlockManagerReplicationSuite extends SparkFunSuite
         testStore => blockLocations.contains(testStore.blockManagerId.executorId)
       }.foreach { testStore =>
         val testStoreName = testStore.blockManagerId.executorId
-        assert(
-          testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
-        testStore.releaseLock(blockId)
+        val blockResultOpt = testStore.getLocalValues(blockId)
+        assert(blockResultOpt.isDefined, s"$blockId was not found in $testStoreName")
+        val localValues = blockResultOpt.get.data.toSeq
+        assert(localValues.size == 1)
+        assert(localValues.head === testValue)
         assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
           s"master does not have status for ${blockId.name} in $testStoreName")
 
+        val memoryStore = testStore.memoryStore
+        if (memoryStore.contains(blockId) && !storageLevel.deserialized) {
+          memoryStore.getBytes(blockId).get.chunks.foreach { byteBuffer =>
+            assert(storageLevel.useOffHeap == byteBuffer.isDirect,
+              s"memory mode ${storageLevel.memoryMode} is not compatible with " +
+                byteBuffer.getClass.getSimpleName)
+          }
+        }
+
         val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
 
         // Assert that block status in the master for this store has expected storage level
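The new memory-mode check can be restated as a standalone sketch (the suite itself walks the memory store's chunks; the object and method names below are illustrative): for a serialized block, every chunk's allocation kind must agree with the storage level's memory mode.

```scala
import java.nio.ByteBuffer

// Standalone restatement of the new assertion: off-heap storage must be backed
// by direct buffers, on-heap storage by heap buffers.
object MemoryModeCheckSketch {
  def checkMemoryMode(chunks: Seq[ByteBuffer], useOffHeap: Boolean): Unit = {
    chunks.foreach { chunk =>
      assert(useOffHeap == chunk.isDirect,
        s"expected isDirect=$useOffHeap but found ${chunk.getClass.getSimpleName}")
    }
  }

  def main(args: Array[String]): Unit = {
    checkMemoryMode(Seq(ByteBuffer.allocateDirect(8)), useOffHeap = true) // passes
    checkMemoryMode(Seq(ByteBuffer.allocate(8)), useOffHeap = false)      // passes
  }
}
```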
