From 735eca68d8efcd150d47631644cf848b4d98603e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 14 Mar 2016 21:57:16 -0700 Subject: [PATCH 01/17] Split MemoryEntry into two separate classes (serialized and deserialized) --- .../spark/storage/memory/MemoryStore.scala | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 02d44dc732951..61469f24edaae 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -29,7 +29,11 @@ import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel} import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector -private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) +private sealed trait MemoryEntry { + val size: Long +} +private case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry +private case class SerializedMemoryEntry(buffer: ByteBuffer, size: Long) extends MemoryEntry /** * Stores blocks in memory, either as Arrays of deserialized Java objects or as @@ -97,7 +101,7 @@ private[spark] class MemoryStore( // Work on a duplicate - since the original input might be used elsewhere. val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] assert(bytes.limit == size) - val entry = new MemoryEntry(bytes, size, deserialized = false) + val entry = new SerializedMemoryEntry(bytes, size) entries.synchronized { entries.put(blockId, entry) } @@ -183,10 +187,10 @@ private[spark] class MemoryStore( val arrayValues = vector.toArray vector = null val entry = if (level.deserialized) { - new MemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues), deserialized = true) + new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues)) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - new MemoryEntry(bytes, bytes.limit, deserialized = false) + new SerializedMemoryEntry(bytes, bytes.limit) } val size = entry.size def transferUnrollToStorage(amount: Long): Unit = { @@ -241,26 +245,22 @@ private[spark] class MemoryStore( } def getBytes(blockId: BlockId): Option[ByteBuffer] = { - val entry = entries.synchronized { - entries.get(blockId) - } - if (entry == null) { - None - } else { - require(!entry.deserialized, "should only call getBytes on blocks stored in serialized form") - Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data + val entry = entries.synchronized { entries.get(blockId) } + entry match { + case null => None + case e: DeserializedMemoryEntry => + throw new IllegalArgumentException("should only call getBytes on serialized blocks") + case SerializedMemoryEntry(bytes, _) => Some(bytes.duplicate()) // Doesn't actually copy data } } def getValues(blockId: BlockId): Option[Iterator[Any]] = { - val entry = entries.synchronized { - entries.get(blockId) - } - if (entry == null) { - None - } else { - require(entry.deserialized, "should only call getValues on deserialized blocks") - Some(entry.value.asInstanceOf[Array[Any]].iterator) + val entry = entries.synchronized { entries.get(blockId) } + entry match { + case null => None + case e: SerializedMemoryEntry => + throw new IllegalArgumentException("should only call getValues on deserialized blocks") + case 
DeserializedMemoryEntry(values, _) => Some(values.iterator) } } From 8f0828986b72ce722cfe0360ae863971547fc58b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 15 Mar 2016 11:53:54 -0700 Subject: [PATCH 02/17] Add ChunkedByteBuffer and use it in storage layer. --- .../network/buffer/NettyManagedBuffer.java | 2 +- .../spark/broadcast/TorrentBroadcast.scala | 11 +- .../org/apache/spark/executor/Executor.scala | 5 +- .../spark/scheduler/TaskResultGetter.scala | 3 +- .../apache/spark/storage/BlockManager.scala | 75 ++++++----- .../storage/BlockManagerManagedBuffer.scala | 9 +- .../org/apache/spark/storage/DiskStore.scala | 16 +-- .../spark/storage/memory/MemoryStore.scala | 22 ++-- .../spark/util/io/ChunkedByteBuffer.scala | 124 ++++++++++++++++++ .../spark/storage/BlockManagerSuite.scala | 19 +-- .../apache/spark/storage/DiskStoreSuite.scala | 14 +- .../rdd/WriteAheadLogBackedBlockRDD.scala | 5 +- .../receiver/ReceivedBlockHandler.scala | 8 +- 13 files changed, 225 insertions(+), 88 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java index 4c8802af7ae67..acc49d968c186 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java @@ -28,7 +28,7 @@ /** * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}. */ -public final class NettyManagedBuffer extends ManagedBuffer { +public class NettyManagedBuffer extends ManagedBuffer { private final ByteBuf buf; public NettyManagedBuffer(ByteBuf buf) { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 8091aa8062a21..02b99b24a04a4 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -29,7 +29,7 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel} import org.apache.spark.util.{ByteBufferInputStream, Utils} -import org.apache.spark.util.io.ByteArrayChunkOutputStream +import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer} /** * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]]. @@ -106,7 +106,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec) blocks.zipWithIndex.foreach { case (block, i) => val pieceId = BroadcastBlockId(id, "piece" + i) - if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) { + val bytes = new ChunkedByteBuffer(block) + if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager") } } @@ -114,10 +115,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) } /** Fetch torrent blocks from the driver and/or other executors. */ - private def readBlocks(): Array[ByteBuffer] = { + private def readBlocks(): Array[ChunkedByteBuffer] = { // Fetch chunks of data. 
Note that all these chunks are stored in the BlockManager and reported // to the driver, so other executors can pull these chunks from this executor as well. - val blocks = new Array[ByteBuffer](numBlocks) + val blocks = new Array[ChunkedByteBuffer](numBlocks) val bm = SparkEnv.get.blockManager for (pid <- Random.shuffle(Seq.range(0, numBlocks))) { @@ -181,7 +182,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) case None => logInfo("Started reading broadcast variable " + id) val startTimeMs = System.currentTimeMillis() - val blocks = readBlocks() + val blocks = readBlocks().flatMap(_.getChunks()) logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) val obj = TorrentBroadcast.unBlockifyObject[T]( diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 48372d70d52a9..fff80a0abf12e 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -35,6 +35,7 @@ import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTa import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} import org.apache.spark.util._ +import org.apache.spark.util.io.ChunkedByteBuffer /** * Spark executor, backed by a threadpool to run tasks. @@ -296,7 +297,9 @@ private[spark] class Executor( } else if (resultSize > maxDirectResultSize) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( - blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) + blockId, + new ChunkedByteBuffer(serializedDirectResult), + StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). 
$resultSize bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index c94c4f55e9ced..ae8467f57929d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -81,8 +81,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul taskSetManager, tid, TaskState.FINISHED, TaskResultLost) return } + // TODO(josh): assumption that there is only one chunk here is a hack val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( - serializedTaskResult.get) + serializedTaskResult.get.getChunks().head) sparkEnv.blockManager.master.removeBlock(blockId) (deserializedResult, size) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d21df4b95b3cd..82d4b007589a6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -33,7 +33,7 @@ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.MemoryManager import org.apache.spark.network._ -import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} +import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.ExternalShuffleClient import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo @@ -42,6 +42,7 @@ import org.apache.spark.serializer.{Serializer, SerializerInstance} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.memory._ import org.apache.spark.util._ +import org.apache.spark.util.io.ChunkedByteBuffer /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( @@ -295,7 +296,7 @@ private[spark] class BlockManager( * Put the block locally, using the given storage level. */ override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = { - putBytes(blockId, data.nioByteBuffer(), level) + putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level) } /** @@ -443,7 +444,7 @@ private[spark] class BlockManager( /** * Get block from the local block manager as serialized bytes. */ - def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = { + def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { logDebug(s"Getting local block $blockId as bytes") // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work @@ -452,7 +453,8 @@ private[spark] class BlockManager( // TODO: This should gracefully handle case where local block is not available. Currently // downstream code will throw an exception. 
Option( - shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) + new ChunkedByteBuffer( + shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())) } else { blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) } } @@ -464,7 +466,7 @@ private[spark] class BlockManager( * Must be called while holding a read lock on the block. * Releases the read lock upon exception; keeps the read lock upon successful return. */ - private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = { + private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = { val level = info.level logDebug(s"Level for block $blockId is $level") // In order, try to read the serialized bytes from memory, then from disk, then fall back to @@ -479,7 +481,7 @@ private[spark] class BlockManager( diskStore.getBytes(blockId) } else if (level.useMemory && memoryStore.contains(blockId)) { // The block was not found on disk, so serialize an in-memory copy: - dataSerialize(blockId, memoryStore.getValues(blockId).get) + new ChunkedByteBuffer(dataSerialize(blockId, memoryStore.getValues(blockId).get)) } else { releaseLock(blockId) throw new SparkException(s"Block $blockId was not found even though it's read-locked") @@ -503,7 +505,7 @@ private[spark] class BlockManager( */ def getRemoteValues(blockId: BlockId): Option[BlockResult] = { getRemoteBytes(blockId).map { data => - new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit()) + new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit) } } @@ -520,7 +522,7 @@ private[spark] class BlockManager( /** * Get block from remote block managers as serialized bytes. */ - def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { + def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { logDebug(s"Getting remote block $blockId") require(blockId != null, "BlockId is null") var runningFailureCount = 0 @@ -566,7 +568,7 @@ private[spark] class BlockManager( } if (data != null) { - return Some(data) + return Some(new ChunkedByteBuffer(data)) } logDebug(s"The value of block $blockId is null") } @@ -704,7 +706,7 @@ private[spark] class BlockManager( */ def putBytes( blockId: BlockId, - bytes: ByteBuffer, + bytes: ChunkedByteBuffer, level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(bytes != null, "Bytes is null") @@ -724,7 +726,7 @@ private[spark] class BlockManager( */ private def doPutBytes( blockId: BlockId, - bytes: ByteBuffer, + bytes: ChunkedByteBuffer, level: StorageLevel, tellMaster: Boolean = true, keepReadLock: Boolean = false): Boolean = { @@ -733,25 +735,22 @@ private[spark] class BlockManager( // Since we're storing bytes, initiate the replication before storing them locally. // This is faster as data is already serialized and ready to send. val replicationFuture = if (level.replication > 1) { - // Duplicate doesn't copy the bytes, but just creates a wrapper - val bufferView = bytes.duplicate() Future { // This is a blocking action and should run in futureExecutionContext which is a cached // thread pool - replicate(blockId, bufferView, level) + replicate(blockId, bytes, level) }(futureExecutionContext) } else { null } - bytes.rewind() - val size = bytes.limit() + val size = bytes.limit if (level.useMemory) { // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. 
val putSucceeded = if (level.deserialized) { - val values = dataDeserialize(blockId, bytes.duplicate()) + val values = dataDeserialize(blockId, bytes) memoryStore.putIterator(blockId, values, level) match { case Right(_) => true case Left(iter) => @@ -921,7 +920,7 @@ private[spark] class BlockManager( try { replicate(blockId, bytesToReplicate, level) } finally { - BlockManager.dispose(bytesToReplicate) + bytesToReplicate.dispose() } logDebug("Put block %s remotely took %s" .format(blockId, Utils.getUsedTimeMs(remoteStartTime))) @@ -943,29 +942,27 @@ private[spark] class BlockManager( blockInfo: BlockInfo, blockId: BlockId, level: StorageLevel, - diskBytes: ByteBuffer): ByteBuffer = { + diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = { require(!level.deserialized) if (level.useMemory) { // Synchronize on blockInfo to guard against a race condition where two readers both try to // put values read from disk into the MemoryStore. blockInfo.synchronized { if (memoryStore.contains(blockId)) { - BlockManager.dispose(diskBytes) + diskBytes.dispose() memoryStore.getBytes(blockId).get } else { - val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => { + val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit, () => { // https://issues.apache.org/jira/browse/SPARK-6076 // If the file size is bigger than the free memory, OOM will happen. So if we // cannot put it into MemoryStore, copyForMemory should not be created. That's why - // this action is put into a `() => ByteBuffer` and created lazily. - val copyForMemory = ByteBuffer.allocate(diskBytes.limit) - copyForMemory.put(diskBytes) + // this action is put into a `() => ChunkedByteBuffer` and created lazily. + diskBytes.copy() }) if (putSucceeded) { - BlockManager.dispose(diskBytes) + diskBytes.dispose() memoryStore.getBytes(blockId).get } else { - diskBytes.rewind() diskBytes } } @@ -1031,7 +1028,7 @@ private[spark] class BlockManager( * Replicate block to another node. Not that this is a blocking call that returns after * the block has been replicated. 
*/ - private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = { + private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel): Unit = { val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) val numPeersToReplicateTo = level.replication - 1 val peersForReplication = new ArrayBuffer[BlockManagerId] @@ -1084,11 +1081,15 @@ private[spark] class BlockManager( case Some(peer) => try { val onePeerStartTime = System.currentTimeMillis - data.rewind() - logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer") + logTrace(s"Trying to replicate $blockId of ${data.limit} bytes to $peer") blockTransferService.uploadBlockSync( - peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel) - logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms" + peer.host, + peer.port, + peer.executorId, + blockId, + new NettyManagedBuffer(data.toNetty), + tLevel) + logTrace(s"Replicated $blockId of ${data.limit} bytes to $peer in %s ms" .format(System.currentTimeMillis - onePeerStartTime)) peersReplicatedTo += peer peersForReplication -= peer @@ -1111,7 +1112,7 @@ private[spark] class BlockManager( } } val timeTakeMs = (System.currentTimeMillis - startTime) - logDebug(s"Replicating $blockId of ${data.limit()} bytes to " + + logDebug(s"Replicating $blockId of ${data.limit} bytes to " + s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { logWarning(s"Block $blockId replicated to only " + @@ -1153,7 +1154,7 @@ private[spark] class BlockManager( */ def dropFromMemory( blockId: BlockId, - data: () => Either[Array[Any], ByteBuffer]): StorageLevel = { + data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = { logInfo(s"Dropping block $blockId from memory") val info = blockInfoManager.assertBlockIsLockedForWriting(blockId) var blockIsUpdated = false @@ -1296,6 +1297,14 @@ private[spark] class BlockManager( dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true)) } + /** + * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of + * the iterator is reached. + */ + def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[Any] = { + dataDeserializeStream(blockId, bytes.toInputStream(dispose = true)) + } + /** * Deserializes a InputStream into an iterator of values and disposes of it when the end of * the iterator is reached. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala index 5886b9c00b557..12594e6a2bc0c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala @@ -17,12 +17,11 @@ package org.apache.spark.storage -import java.nio.ByteBuffer - -import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} +import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer} +import org.apache.spark.util.io.ChunkedByteBuffer /** - * This [[ManagedBuffer]] wraps a [[ByteBuffer]] which was retrieved from the [[BlockManager]] + * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]] * so that the corresponding block's read lock can be released once this buffer's references * are released. 
* @@ -32,7 +31,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} private[storage] class BlockManagerManagedBuffer( blockManager: BlockManager, blockId: BlockId, - buf: ByteBuffer) extends NioManagedBuffer(buf) { + chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) { override def retain(): ManagedBuffer = { super.retain() diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index caecd97a0b722..00cab8cb379ae 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -25,6 +25,7 @@ import com.google.common.io.Closeables import org.apache.spark.{Logging, SparkConf} import org.apache.spark.util.Utils +import org.apache.spark.util.io.ChunkedByteBuffer /** * Stores BlockManager blocks on disk. @@ -70,23 +71,18 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e finishTime - startTime)) } - def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = { - // So that we do not modify the input offsets ! - // duplicate does not copy buffer, so inexpensive - val bytes = _bytes.duplicate() + def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { put(blockId) { fileOutputStream => val channel = fileOutputStream.getChannel Utils.tryWithSafeFinally { - while (bytes.remaining > 0) { - channel.write(bytes) - } + bytes.writeFully(channel) } { channel.close() } } } - def getBytes(blockId: BlockId): ByteBuffer = { + def getBytes(blockId: BlockId): ChunkedByteBuffer = { val file = diskManager.getFile(blockId.name) val channel = new RandomAccessFile(file, "r").getChannel Utils.tryWithSafeFinally { @@ -101,9 +97,9 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } } buf.flip() - buf + new ChunkedByteBuffer(buf) } else { - channel.map(MapMode.READ_ONLY, 0, file.length) + new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) } } { channel.close() diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 61469f24edaae..eca93073e13de 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -17,7 +17,6 @@ package org.apache.spark.storage.memory -import java.nio.ByteBuffer import java.util.LinkedHashMap import scala.collection.mutable @@ -28,12 +27,13 @@ import org.apache.spark.memory.MemoryManager import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel} import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector +import org.apache.spark.util.io.ChunkedByteBuffer private sealed trait MemoryEntry { val size: Long } private case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry -private case class SerializedMemoryEntry(buffer: ByteBuffer, size: Long) extends MemoryEntry +private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry /** * Stores blocks in memory, either as Arrays of deserialized Java objects or as @@ -94,12 +94,11 @@ private[spark] class MemoryStore( * * @return true if the put() succeeded, false otherwise. 
*/ - def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = { + def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") if (memoryManager.acquireStorageMemory(blockId, size)) { // We acquired enough memory for the block, so go ahead and put it - // Work on a duplicate - since the original input might be used elsewhere. - val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] + val bytes = _bytes() assert(bytes.limit == size) val entry = new SerializedMemoryEntry(bytes, size) entries.synchronized { @@ -189,7 +188,7 @@ private[spark] class MemoryStore( val entry = if (level.deserialized) { new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues)) } else { - val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) + val bytes = new ChunkedByteBuffer(blockManager.dataSerialize(blockId, arrayValues.iterator)) new SerializedMemoryEntry(bytes, bytes.limit) } val size = entry.size @@ -244,13 +243,13 @@ private[spark] class MemoryStore( } } - def getBytes(blockId: BlockId): Option[ByteBuffer] = { + def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = { val entry = entries.synchronized { entries.get(blockId) } entry match { case null => None case e: DeserializedMemoryEntry => throw new IllegalArgumentException("should only call getBytes on serialized blocks") - case SerializedMemoryEntry(bytes, _) => Some(bytes.duplicate()) // Doesn't actually copy data + case SerializedMemoryEntry(bytes, _) => Some(bytes) } } @@ -341,10 +340,9 @@ private[spark] class MemoryStore( // blocks and removing entries. However the check is still here for // future safety. if (entry != null) { - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[Array[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + val data = entry match { + case DeserializedMemoryEntry(values, _) => Left(values) + case SerializedMemoryEntry(buffer, _) => Right(buffer) } val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data) if (newEffectiveStorageLevel.isValid) { diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala new file mode 100644 index 0000000000000..b61f37671fcc1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.io + +import java.io.InputStream +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import io.netty.buffer.{ByteBuf, Unpooled} + +import org.apache.spark.network.util.ByteArrayWritableChannel +import org.apache.spark.storage.BlockManager + +private[spark] class ChunkedByteBuffer(_chunks: Array[ByteBuffer]) { + + def this(byteBuffer: ByteBuffer) = { + this(Array(byteBuffer)) + } + + private[this] val chunks: Array[ByteBuffer] = { + _chunks.map(_.duplicate().rewind().asInstanceOf[ByteBuffer]) // doesn't actually copy bytes + } + + val limit: Long = chunks.map(_.limit().asInstanceOf[Long]).sum + + def writeFully(channel: WritableByteChannel): Unit = { + for (chunk <- chunks) { + // So that we do not modify the input offsets ! + // duplicate does not copy buffer, so inexpensive + val bytes = chunk.duplicate() + while (bytes.remaining > 0) { + channel.write(bytes) + } + } + } + + def toNetty: ByteBuf = Unpooled.wrappedBuffer(chunks: _*) + + def toArray: Array[Byte] = { + // TODO(josh): assert on the limit range / size + val byteChannel = new ByteArrayWritableChannel(limit.toInt) + writeFully(byteChannel) + byteChannel.close() + byteChannel.getData + } + + def toInputStream(dispose: Boolean): InputStream = new ChunkedByteBufferInputStream(this, dispose) + + def getChunks(): Array[ByteBuffer] = chunks.map(_.duplicate()) + + def copy(): ChunkedByteBuffer = { + val copiedChunks = chunks.map { chunk => + // TODO: accept an allocator in this copy method, etc. + val newChunk = ByteBuffer.allocate(chunk.limit()) + newChunk.put(chunk) + } + new ChunkedByteBuffer(copiedChunks) + } + + def dispose(): Unit = { + chunks.foreach(BlockManager.dispose) + } +} + + +// TODO(josh): implement dispose + +private class ChunkedByteBufferInputStream( + chunkedBuffer: ChunkedByteBuffer, + dispose: Boolean = false) extends InputStream { + + // TODO(josh): assumption of non-empty iterator needs to be enforced elsewhere + private[this] val chunksIterator: Iterator[ByteBuffer] = chunkedBuffer.getChunks().iterator + private[this] var currentChunk: ByteBuffer = chunksIterator.next() + + override def available(): Int = { + currentChunk.remaining() + } + +// override def skip(n: Long): Long = { +// // TODO(josh): check contract +// var i = n +// while (i > 0) { +// read() +// i -= 1 +// } +// n +// } + + override def read(): Int = { + if (!currentChunk.hasRemaining && chunksIterator.hasNext) { + currentChunk = chunksIterator.next() + } + if (currentChunk.hasRemaining) { + currentChunk.get() + } else { + -1 + } + } + + // TODO(josh): implement +// override def read(b: Array[Byte]): Int = super.read(b) +// +// override def read(b: Array[Byte], off: Int, len: Int): Int = super.read(b, off, len) + + override def close(): Unit = { + // TODO(josh): implement + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 2e0c0596a75bb..edf5cd35e40ee 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -44,6 +44,7 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat import org.apache.spark.util._ +import org.apache.spark.util.io.ChunkedByteBuffer class BlockManagerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterEach with PrivateMethodTester with ResetSystemProperties { @@ -192,8 +193,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations("a3").size === 0, "master was told about a3") // Drop a1 and a2 from memory; this should be reported back to the master - store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer]) - store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer]) + store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer]) assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store") assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store") assert(master.getLocations("a1").size === 0, "master did not remove a1") @@ -434,8 +435,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE t2.join() t3.join() - store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer]) - store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer]) + store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer]) store.waitForAsyncReregister() } } @@ -1253,9 +1254,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE store = makeBlockManager(12000) val memoryStore = store.memoryStore val blockId = BlockId("rdd_3_10") - var bytes: ByteBuffer = null + var bytes: ChunkedByteBuffer = null memoryStore.putBytes(blockId, 10000, () => { - bytes = ByteBuffer.allocate(10000) + bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000)) bytes }) assert(memoryStore.getSize(blockId) === 10000) @@ -1364,7 +1365,7 @@ private object BlockManagerSuite { def dropFromMemoryIfExists( blockId: BlockId, - data: () => Either[Array[Any], ByteBuffer]): Unit = { + data: () => Either[Array[Any], ChunkedByteBuffer]): Unit = { store.blockInfoManager.lockForWriting(blockId).foreach { info => val newEffectiveStorageLevel = store.dropFromMemory(blockId, data) if (newEffectiveStorageLevel.isValid) { @@ -1394,7 +1395,9 @@ private object BlockManagerSuite { val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues) val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get) val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle) - val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes) + val getLocalBytesAndReleaseLock: (BlockId) => Option[ChunkedByteBuffer] = { + wrapGet(store.getLocalBytes) + } } } diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index 97e74fe706002..9ed5016510d56 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -21,6 +21,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer} import java.util.Arrays import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.util.io.ChunkedByteBuffer class DiskStoreSuite extends SparkFunSuite { @@ -29,7 +30,7 @@ class DiskStoreSuite extends SparkFunSuite { // Create a non-trivial (not all zeros) byte array val bytes = Array.tabulate[Byte](1000)(_.toByte) - val byteBuffer = ByteBuffer.wrap(bytes) + 
val byteBuffer = new ChunkedByteBuffer(ByteBuffer.wrap(bytes)) val blockId = BlockId("rdd_1_2") val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true) @@ -44,9 +45,10 @@ class DiskStoreSuite extends SparkFunSuite { val notMapped = diskStoreNotMapped.getBytes(blockId) // Not possible to do isInstanceOf due to visibility of HeapByteBuffer - assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"), + assert(notMapped.getChunks().forall(_.getClass.getName.endsWith("HeapByteBuffer")), "Expected HeapByteBuffer for un-mapped read") - assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read") + assert(mapped.getChunks().forall(_.isInstanceOf[MappedByteBuffer]), + "Expected MappedByteBuffer for mapped read") def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = { val array = new Array[Byte](in.remaining()) @@ -54,9 +56,7 @@ class DiskStoreSuite extends SparkFunSuite { array } - val mappedAsArray = arrayFromByteBuffer(mapped) - val notMappedAsArray = arrayFromByteBuffer(notMapped) - assert(Arrays.equals(mappedAsArray, bytes)) - assert(Arrays.equals(notMappedAsArray, bytes)) + assert(Arrays.equals(mapped.toArray, bytes)) + assert(Arrays.equals(notMapped.toArray, bytes)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index f811784b25c82..1926b4c2eecd9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -28,11 +28,13 @@ import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.streaming.util._ import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.io.ChunkedByteBuffer /** * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]]. * It contains information about the id of the blocks having this partition's data and * the corresponding record handle in the write ahead log that backs the partition. + * * @param index index of the partition * @param blockId id of the block having the partition data * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark @@ -59,7 +61,6 @@ class WriteAheadLogBackedBlockRDDPartition( * correctness, and it can be used in situations where it is known that the block * does not exist in the Spark executors (e.g. after a failed driver is restarted). 
* - * * @param sc SparkContext * @param _blockIds Ids of the blocks that contains this RDD's data * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data @@ -156,7 +157,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logInfo(s"Read partition data of $this from write ahead log, record handle " + partition.walRecordHandle) if (storeInBlockManager) { - blockManager.putBytes(blockId, dataRead, storageLevel) + blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead), storageLevel) logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index e22e320b17126..d3c0162dca77a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -29,6 +29,7 @@ import org.apache.spark.storage._ import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._ import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils} import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} +import org.apache.spark.util.io.ChunkedByteBuffer /** Trait that represents the metadata related to storage of blocks */ private[streaming] trait ReceivedBlockStoreResult { @@ -83,7 +84,8 @@ private[streaming] class BlockManagerBasedBlockHandler( numRecords = countIterator.count putResult case ByteBufferBlock(byteBuffer) => - blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) + blockManager.putBytes( + blockId, new ChunkedByteBuffer(byteBuffer), storageLevel, tellMaster = true) case o => throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") @@ -184,8 +186,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in block manager val storeInBlockManagerFuture = Future { - val putSucceeded = - blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true) + val putSucceeded = blockManager.putBytes( + blockId, new ChunkedByteBuffer(serializedBlock), effectiveStorageLevel, tellMaster = true) if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") From 79b1a6a31236b81c444dda1e8ee1cfdf2f3c36ae Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 15 Mar 2016 13:53:27 -0700 Subject: [PATCH 03/17] Add test cases and fix bug in ChunkedByteBuffer.toInputStream() --- .../spark/util/io/ChunkedByteBuffer.scala | 31 +++-- .../spark/io/ChunkedByteBufferSuite.scala | 113 ++++++++++++++++++ 2 files changed, 133 insertions(+), 11 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index b61f37671fcc1..b566cac2de472 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -21,6 +21,7 @@ import java.io.InputStream import java.nio.ByteBuffer import java.nio.channels.WritableByteChannel +import com.google.common.primitives.UnsignedBytes import io.netty.buffer.{ByteBuf, Unpooled} import 
org.apache.spark.network.util.ByteArrayWritableChannel @@ -28,6 +29,9 @@ import org.apache.spark.storage.BlockManager private[spark] class ChunkedByteBuffer(_chunks: Array[ByteBuffer]) { + require(_chunks.nonEmpty, "Cannot create a ChunkedByteBuffer with no chunks") + require(_chunks.forall(_.limit() > 0), "chunks must be non-empty") + def this(byteBuffer: ByteBuffer) = { this(Array(byteBuffer)) } @@ -39,20 +43,20 @@ private[spark] class ChunkedByteBuffer(_chunks: Array[ByteBuffer]) { val limit: Long = chunks.map(_.limit().asInstanceOf[Long]).sum def writeFully(channel: WritableByteChannel): Unit = { - for (chunk <- chunks) { - // So that we do not modify the input offsets ! - // duplicate does not copy buffer, so inexpensive - val bytes = chunk.duplicate() + for (bytes <- getChunks()) { while (bytes.remaining > 0) { channel.write(bytes) } } } - def toNetty: ByteBuf = Unpooled.wrappedBuffer(chunks: _*) + def toNetty: ByteBuf = Unpooled.wrappedBuffer(getChunks(): _*) def toArray: Array[Byte] = { - // TODO(josh): assert on the limit range / size + if (limit >= Integer.MAX_VALUE) { + throw new UnsupportedOperationException( + s"cannot call toArray because buffer size ($limit bytes) exceeds maximum array size") + } val byteChannel = new ByteArrayWritableChannel(limit.toInt) writeFully(byteChannel) byteChannel.close() @@ -64,7 +68,7 @@ private[spark] class ChunkedByteBuffer(_chunks: Array[ByteBuffer]) { def getChunks(): Array[ByteBuffer] = chunks.map(_.duplicate()) def copy(): ChunkedByteBuffer = { - val copiedChunks = chunks.map { chunk => + val copiedChunks = getChunks().map { chunk => // TODO: accept an allocator in this copy method, etc. val newChunk = ByteBuffer.allocate(chunk.limit()) newChunk.put(chunk) @@ -84,12 +88,16 @@ private class ChunkedByteBufferInputStream( chunkedBuffer: ChunkedByteBuffer, dispose: Boolean = false) extends InputStream { - // TODO(josh): assumption of non-empty iterator needs to be enforced elsewhere private[this] val chunksIterator: Iterator[ByteBuffer] = chunkedBuffer.getChunks().iterator private[this] var currentChunk: ByteBuffer = chunksIterator.next() + assert(currentChunk.position() == 0) override def available(): Int = { - currentChunk.remaining() + while (!currentChunk.hasRemaining && chunksIterator.hasNext) { + currentChunk = chunksIterator.next() + assert(currentChunk.position() == 0) + } + currentChunk.remaining() } // override def skip(n: Long): Long = { @@ -105,9 +113,10 @@ private class ChunkedByteBufferInputStream( override def read(): Int = { if (!currentChunk.hasRemaining && chunksIterator.hasNext) { currentChunk = chunksIterator.next() + assert(currentChunk.position() == 0) } if (currentChunk.hasRemaining) { - currentChunk.get() + UnsignedBytes.toInt(currentChunk.get()) } else { -1 } @@ -119,6 +128,6 @@ private class ChunkedByteBufferInputStream( // override def read(b: Array[Byte], off: Int, len: Int): Int = super.read(b, off, len) override def close(): Unit = { - // TODO(josh): implement + currentChunk = null } } diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala new file mode 100644 index 0000000000000..b614c426fc5a2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.io + +import java.nio.ByteBuffer + +import com.google.common.io.ByteStreams + +import org.apache.spark.SparkFunSuite +import org.apache.spark.network.util.ByteArrayWritableChannel +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferSuite extends SparkFunSuite { + + test("must have at least one chunk") { + intercept[IllegalArgumentException] { + new ChunkedByteBuffer(Array.empty[ByteBuffer]) + } + } + + test("chunks must be non-empty") { + intercept[IllegalArgumentException] { + new ChunkedByteBuffer(Array(ByteBuffer.allocate(0))) + } + } + + test("constructor duplicates chunks") { + val byteBuffer = ByteBuffer.allocate(8) + byteBuffer.limit(4) + val chunkedByteBuffer = new ChunkedByteBuffer(Array(byteBuffer)) + assert(chunkedByteBuffer.limit === 4) + assert(chunkedByteBuffer.getChunks().head.limit() === 4) + // Changing the original ByteBuffer's position and limit does not affect the ChunkedByteBuffer: + byteBuffer.limit(8) + byteBuffer.position(4) + assert(chunkedByteBuffer.limit === 4) + assert(chunkedByteBuffer.getChunks().head.limit() === 4) + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } + + test("constructor rewinds chunks") { + val byteBuffer = ByteBuffer.allocate(8) + byteBuffer.get() + byteBuffer.get() + assert(byteBuffer.position() === 2) + val chunkedByteBuffer = new ChunkedByteBuffer(Array(byteBuffer)) + assert(chunkedByteBuffer.limit === 8) + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } + + test("getChunks() duplicates chunks") { + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8))) + chunkedByteBuffer.getChunks().head.position(4) + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } + + test("copy() does not affect original buffer's position") { + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8))) + chunkedByteBuffer.copy() + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } + + test("writeFully() does not affect original buffer's position") { + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8))) + chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.limit.toInt)) + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } + + test("toArray()") { + val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte)) + val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes)) + assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array()) + } + + test("toArray() throws UnsupportedOperationException if size exceeds 2GB") { + val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4) + fourMegabyteBuffer.limit(fourMegabyteBuffer.capacity()) + val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer)) + assert(chunkedByteBuffer.limit === (1024L * 1024L * 1024L * 4L)) + intercept[UnsupportedOperationException] { + chunkedByteBuffer.toArray + } + } + + 
// TODO(josh) test dispose behavior + test("toInputStream()") { + val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte)) + val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte)) + val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2)) + assert(chunkedByteBuffer.limit === bytes1.limit() + bytes2.limit()) + + val inputStream = chunkedByteBuffer.toInputStream(false) + val bytesFromStream = new Array[Byte](chunkedByteBuffer.limit.toInt) + ByteStreams.readFully(inputStream, bytesFromStream) + assert(bytesFromStream === bytes1.array() ++ bytes2.array()) + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } +} From 7dbcd5a9ef0c669f5db97990af944d8b63300e97 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 15 Mar 2016 15:05:23 -0700 Subject: [PATCH 04/17] WIP towards understanding destruction. --- .../spark/broadcast/TorrentBroadcast.scala | 2 +- .../org/apache/spark/executor/Executor.scala | 2 +- .../apache/spark/storage/BlockManager.scala | 9 +- .../spark/storage/memory/MemoryStore.scala | 1 + .../spark/util/io/ChunkedByteBuffer.scala | 85 ++++++++++++------- .../spark/io/ChunkedByteBufferSuite.scala | 24 ------ .../rdd/WriteAheadLogBackedBlockRDD.scala | 2 +- .../receiver/ReceivedBlockHandler.scala | 7 +- 8 files changed, 70 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 02b99b24a04a4..0225f72bb07fd 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -106,7 +106,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec) blocks.zipWithIndex.foreach { case (block, i) => val pieceId = BroadcastBlockId(id, "piece" + i) - val bytes = new ChunkedByteBuffer(block) + val bytes = new ChunkedByteBuffer(block.duplicate()) if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager") } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index fff80a0abf12e..3f3be37077b29 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -298,7 +298,7 @@ private[spark] class Executor( val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, - new ChunkedByteBuffer(serializedDirectResult), + new ChunkedByteBuffer(serializedDirectResult.duplicate()), StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). 
$resultSize bytes result sent via BlockManager)") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 82d4b007589a6..e6adc40f83fa3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -418,7 +418,7 @@ private[spark] class BlockManager( val iter: Iterator[Any] = if (level.deserialized) { memoryStore.getValues(blockId).get } else { - dataDeserialize(blockId, memoryStore.getBytes(blockId).get) + dataDeserializeStream(blockId, memoryStore.getBytes(blockId).get.toInputStream) } val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) @@ -426,7 +426,7 @@ private[spark] class BlockManager( val iterToReturn: Iterator[Any] = { val diskBytes = diskStore.getBytes(blockId) if (level.deserialized) { - val diskValues = dataDeserialize(blockId, diskBytes) + val diskValues = dataDeserializeStream(blockId, diskBytes.toDestructiveInputStream) maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) } else { dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)) @@ -505,7 +505,8 @@ private[spark] class BlockManager( */ def getRemoteValues(blockId: BlockId): Option[BlockResult] = { getRemoteBytes(blockId).map { data => - new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit) + val values = dataDeserializeStream(blockId, data.toInputStream) + new BlockResult(values, DataReadMethod.Network, data.limit) } } @@ -750,7 +751,7 @@ private[spark] class BlockManager( // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. 
val putSucceeded = if (level.deserialized) { - val values = dataDeserialize(blockId, bytes) + val values = dataDeserializeStream(blockId, bytes.toInputStream) memoryStore.putIterator(blockId, values, level) match { case Right(_) => true case Left(iter) => diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index eca93073e13de..daef5477ac4a6 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -188,6 +188,7 @@ private[spark] class MemoryStore( val entry = if (level.deserialized) { new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues)) } else { + // TODO(josh): incrementally serialize val bytes = new ChunkedByteBuffer(blockManager.dataSerialize(blockId, arrayValues.iterator)) new SerializedMemoryEntry(bytes, bytes.limit) } diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index b566cac2de472..c9f8e6578c936 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -27,22 +27,20 @@ import io.netty.buffer.{ByteBuf, Unpooled} import org.apache.spark.network.util.ByteArrayWritableChannel import org.apache.spark.storage.BlockManager -private[spark] class ChunkedByteBuffer(_chunks: Array[ByteBuffer]) { +private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { + require(chunks != null, "chunks must not be null") + require(chunks.nonEmpty, "Cannot create a ChunkedByteBuffer with no chunks") + require(chunks.forall(_.limit() > 0), "chunks must be non-empty") + require(chunks.forall(_.position() == 0), "chunks' positions must be 0") - require(_chunks.nonEmpty, "Cannot create a ChunkedByteBuffer with no chunks") - require(_chunks.forall(_.limit() > 0), "chunks must be non-empty") + val limit: Long = chunks.map(_.limit().asInstanceOf[Long]).sum def this(byteBuffer: ByteBuffer) = { this(Array(byteBuffer)) } - private[this] val chunks: Array[ByteBuffer] = { - _chunks.map(_.duplicate().rewind().asInstanceOf[ByteBuffer]) // doesn't actually copy bytes - } - - val limit: Long = chunks.map(_.limit().asInstanceOf[Long]).sum - def writeFully(channel: WritableByteChannel): Unit = { + assertNotDisposed() for (bytes <- getChunks()) { while (bytes.remaining > 0) { channel.write(bytes) @@ -50,9 +48,13 @@ private[spark] class ChunkedByteBuffer(_chunks: Array[ByteBuffer]) { } } - def toNetty: ByteBuf = Unpooled.wrappedBuffer(getChunks(): _*) + def toNetty: ByteBuf = { + assertNotDisposed() + Unpooled.wrappedBuffer(getChunks(): _*) + } def toArray: Array[Byte] = { + assertNotDisposed() if (limit >= Integer.MAX_VALUE) { throw new UnsupportedOperationException( s"cannot call toArray because buffer size ($limit bytes) exceeds maximum array size") @@ -63,39 +65,55 @@ private[spark] class ChunkedByteBuffer(_chunks: Array[ByteBuffer]) { byteChannel.getData } - def toInputStream(dispose: Boolean): InputStream = new ChunkedByteBufferInputStream(this, dispose) + def toInputStream: InputStream = { + assertNotDisposed() + new ChunkedByteBufferInputStream(getChunks().iterator) + } - def getChunks(): Array[ByteBuffer] = chunks.map(_.duplicate()) + def toDestructiveInputStream: InputStream = { + val is = new ChunkedByteBufferInputStream(chunks.iterator) + chunks = null + is + } + + def 
getChunks(): Array[ByteBuffer] = { + assertNotDisposed() + chunks.map(_.duplicate()) + } def copy(): ChunkedByteBuffer = { + assertNotDisposed() val copiedChunks = getChunks().map { chunk => - // TODO: accept an allocator in this copy method, etc. + // TODO: accept an allocator in this copy method to integrate with mem. accounting systems val newChunk = ByteBuffer.allocate(chunk.limit()) newChunk.put(chunk) + newChunk.flip() + newChunk } new ChunkedByteBuffer(copiedChunks) } def dispose(): Unit = { + assertNotDisposed() chunks.foreach(BlockManager.dispose) + chunks = null } -} + private def assertNotDisposed(): Unit = { + if (chunks == null) { + throw new IllegalStateException("Cannot call methods on a disposed ChunkedByteBuffer") + } + } +} -// TODO(josh): implement dispose - -private class ChunkedByteBufferInputStream( - chunkedBuffer: ChunkedByteBuffer, - dispose: Boolean = false) extends InputStream { +private class ChunkedByteBufferInputStream(chunks: Iterator[ByteBuffer]) extends InputStream { - private[this] val chunksIterator: Iterator[ByteBuffer] = chunkedBuffer.getChunks().iterator - private[this] var currentChunk: ByteBuffer = chunksIterator.next() - assert(currentChunk.position() == 0) + private[this] var currentChunk: ByteBuffer = chunks.next() override def available(): Int = { - while (!currentChunk.hasRemaining && chunksIterator.hasNext) { - currentChunk = chunksIterator.next() - assert(currentChunk.position() == 0) + while (!currentChunk.hasRemaining && chunks.hasNext) { + BlockManager.dispose(currentChunk) + currentChunk = chunks.next() } currentChunk.remaining() } @@ -111,13 +129,15 @@ private class ChunkedByteBufferInputStream( // } override def read(): Int = { - if (!currentChunk.hasRemaining && chunksIterator.hasNext) { - currentChunk = chunksIterator.next() - assert(currentChunk.position() == 0) + if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { + BlockManager.dispose(currentChunk) + currentChunk = chunks.next() } - if (currentChunk.hasRemaining) { + if (currentChunk != null && currentChunk.hasRemaining) { UnsignedBytes.toInt(currentChunk.get()) } else { + BlockManager.dispose(currentChunk) + currentChunk = null -1 } } @@ -128,6 +148,13 @@ private class ChunkedByteBufferInputStream( // override def read(b: Array[Byte], off: Int, len: Int): Int = super.read(b, off, len) override def close(): Unit = { + if (currentChunk != null) { + BlockManager.dispose(currentChunk) + while (chunks.hasNext) { + currentChunk = chunks.next() + BlockManager.dispose(currentChunk) + } + } currentChunk = null } } diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index b614c426fc5a2..44b0f16693cc8 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -39,30 +39,6 @@ class ChunkedByteBufferSuite extends SparkFunSuite { } } - test("constructor duplicates chunks") { - val byteBuffer = ByteBuffer.allocate(8) - byteBuffer.limit(4) - val chunkedByteBuffer = new ChunkedByteBuffer(Array(byteBuffer)) - assert(chunkedByteBuffer.limit === 4) - assert(chunkedByteBuffer.getChunks().head.limit() === 4) - // Changing the original ByteBuffer's position and limit does not affect the ChunkedByteBuffer: - byteBuffer.limit(8) - byteBuffer.position(4) - assert(chunkedByteBuffer.limit === 4) - assert(chunkedByteBuffer.getChunks().head.limit() === 4) - 
assert(chunkedByteBuffer.getChunks().head.position() === 0) - } - - test("constructor rewinds chunks") { - val byteBuffer = ByteBuffer.allocate(8) - byteBuffer.get() - byteBuffer.get() - assert(byteBuffer.position() === 2) - val chunkedByteBuffer = new ChunkedByteBuffer(Array(byteBuffer)) - assert(chunkedByteBuffer.limit === 8) - assert(chunkedByteBuffer.getChunks().head.position() === 0) - } - test("getChunks() duplicates chunks") { val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8))) chunkedByteBuffer.getChunks().head.position(4) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 1926b4c2eecd9..8625882b04421 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -157,7 +157,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logInfo(s"Read partition data of $this from write ahead log, record handle " + partition.walRecordHandle) if (storeInBlockManager) { - blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead), storageLevel) + blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel) logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index d3c0162dca77a..58c3cd01fed88 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -85,7 +85,7 @@ private[streaming] class BlockManagerBasedBlockHandler( putResult case ByteBufferBlock(byteBuffer) => blockManager.putBytes( - blockId, new ChunkedByteBuffer(byteBuffer), storageLevel, tellMaster = true) + blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true) case o => throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") @@ -187,7 +187,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in block manager val storeInBlockManagerFuture = Future { val putSucceeded = blockManager.putBytes( - blockId, new ChunkedByteBuffer(serializedBlock), effectiveStorageLevel, tellMaster = true) + blockId, + new ChunkedByteBuffer(serializedBlock.duplicate()), + effectiveStorageLevel, + tellMaster = true) if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") From 3fbec212d9f714386121b4aed791d6c9fb1359a2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 15 Mar 2016 15:39:27 -0700 Subject: [PATCH 05/17] Small fixes to dispose behavior. 
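
Reviewer note (not part of the patch): the intent of these fixes is that chunk clean-up belongs to close() on the stream, or to an explicit dispose() on the buffer, rather than to the per-chunk reads. A minimal Scala sketch of that contract, written against the ChunkedByteBuffer API in the hunks below; the object name and test data are made up for illustration.

    // Illustrative sketch only, not part of the patch.
    package org.apache.spark.io

    import java.nio.ByteBuffer

    import com.google.common.io.ByteStreams

    import org.apache.spark.util.io.ChunkedByteBuffer

    object ChunkedByteBufferDisposeSketch {
      def main(args: Array[String]): Unit = {
        // Two heap-backed chunks; dispose() is a no-op for these, but the call pattern is the
        // same as for memory-mapped chunks read back from disk.
        val buf = new ChunkedByteBuffer(Array(
          ByteBuffer.wrap(Array.tabulate[Byte](16)(_.toByte)),
          ByteBuffer.wrap(Array.tabulate[Byte](16)(_.toByte))))
        val in = buf.toInputStream(dispose = true)
        val bytes = new Array[Byte](buf.limit.toInt) // `limit` is renamed to `size` later in the series
        ByteStreams.readFully(in, bytes)
        // After this change, clean-up happens once, in close(), instead of chunk-by-chunk while
        // the stream is still being read:
        in.close()
      }
    }
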
--- .../apache/spark/storage/BlockManager.scala | 9 ++- .../spark/util/io/ChunkedByteBuffer.scala | 71 ++++++------------- .../spark/io/ChunkedByteBufferSuite.scala | 5 +- 3 files changed, 30 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e6adc40f83fa3..82d4b007589a6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -418,7 +418,7 @@ private[spark] class BlockManager( val iter: Iterator[Any] = if (level.deserialized) { memoryStore.getValues(blockId).get } else { - dataDeserializeStream(blockId, memoryStore.getBytes(blockId).get.toInputStream) + dataDeserialize(blockId, memoryStore.getBytes(blockId).get) } val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId)) Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) @@ -426,7 +426,7 @@ private[spark] class BlockManager( val iterToReturn: Iterator[Any] = { val diskBytes = diskStore.getBytes(blockId) if (level.deserialized) { - val diskValues = dataDeserializeStream(blockId, diskBytes.toDestructiveInputStream) + val diskValues = dataDeserialize(blockId, diskBytes) maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) } else { dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)) @@ -505,8 +505,7 @@ private[spark] class BlockManager( */ def getRemoteValues(blockId: BlockId): Option[BlockResult] = { getRemoteBytes(blockId).map { data => - val values = dataDeserializeStream(blockId, data.toInputStream) - new BlockResult(values, DataReadMethod.Network, data.limit) + new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit) } } @@ -751,7 +750,7 @@ private[spark] class BlockManager( // Put it in memory first, even if it also has useDisk set to true; // We will drop it to disk later if the memory store can't hold it. 
val putSucceeded = if (level.deserialized) { - val values = dataDeserializeStream(blockId, bytes.toInputStream) + val values = dataDeserialize(blockId, bytes) memoryStore.putIterator(blockId, values, level) match { case Right(_) => true case Left(iter) => diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index c9f8e6578c936..5809548c4a10d 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -40,7 +40,6 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } def writeFully(channel: WritableByteChannel): Unit = { - assertNotDisposed() for (bytes <- getChunks()) { while (bytes.remaining > 0) { channel.write(bytes) @@ -49,12 +48,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } def toNetty: ByteBuf = { - assertNotDisposed() Unpooled.wrappedBuffer(getChunks(): _*) } def toArray: Array[Byte] = { - assertNotDisposed() if (limit >= Integer.MAX_VALUE) { throw new UnsupportedOperationException( s"cannot call toArray because buffer size ($limit bytes) exceeds maximum array size") @@ -65,24 +62,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { byteChannel.getData } - def toInputStream: InputStream = { - assertNotDisposed() - new ChunkedByteBufferInputStream(getChunks().iterator) - } - - def toDestructiveInputStream: InputStream = { - val is = new ChunkedByteBufferInputStream(chunks.iterator) - chunks = null - is + def toInputStream(dispose: Boolean = false): InputStream = { + new ChunkedByteBufferInputStream(this, dispose) } def getChunks(): Array[ByteBuffer] = { - assertNotDisposed() chunks.map(_.duplicate()) } def copy(): ChunkedByteBuffer = { - assertNotDisposed() val copiedChunks = getChunks().map { chunk => // TODO: accept an allocator in this copy method to integrate with mem. accounting systems val newChunk = ByteBuffer.allocate(chunk.limit()) @@ -93,41 +81,29 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { new ChunkedByteBuffer(copiedChunks) } + /** + * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that + * might cause errors if one attempts to read from the unmapped buffer, but it's better than + * waiting for the GC to find it because that could lead to huge numbers of open files. There's + * unfortunately no standard API to do this. + */ def dispose(): Unit = { - assertNotDisposed() chunks.foreach(BlockManager.dispose) - chunks = null - } - - private def assertNotDisposed(): Unit = { - if (chunks == null) { - throw new IllegalStateException("Cannot call methods on a disposed ChunkedByteBuffer") - } } } -private class ChunkedByteBufferInputStream(chunks: Iterator[ByteBuffer]) extends InputStream { +/** + * Reads data from a ChunkedByteBuffer, and optionally cleans it up using BlockManager.dispose() + * at the end of the stream (e.g. to close a memory-mapped file). 
+ */ +private class ChunkedByteBufferInputStream( + var chunkedByteBuffer: ChunkedByteBuffer, + dispose: Boolean) + extends InputStream { + private[this] var chunks = chunkedByteBuffer.getChunks().iterator private[this] var currentChunk: ByteBuffer = chunks.next() - override def available(): Int = { - while (!currentChunk.hasRemaining && chunks.hasNext) { - BlockManager.dispose(currentChunk) - currentChunk = chunks.next() - } - currentChunk.remaining() - } - -// override def skip(n: Long): Long = { -// // TODO(josh): check contract -// var i = n -// while (i > 0) { -// read() -// i -= 1 -// } -// n -// } - override def read(): Int = { if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { BlockManager.dispose(currentChunk) @@ -136,25 +112,24 @@ private class ChunkedByteBufferInputStream(chunks: Iterator[ByteBuffer]) extends if (currentChunk != null && currentChunk.hasRemaining) { UnsignedBytes.toInt(currentChunk.get()) } else { - BlockManager.dispose(currentChunk) - currentChunk = null + close() -1 } } // TODO(josh): implement // override def read(b: Array[Byte]): Int = super.read(b) -// // override def read(b: Array[Byte], off: Int, len: Int): Int = super.read(b, off, len) +// override def skip(n: Long): Long = super.skip(n) override def close(): Unit = { if (currentChunk != null) { - BlockManager.dispose(currentChunk) - while (chunks.hasNext) { - currentChunk = chunks.next() - BlockManager.dispose(currentChunk) + if (dispose) { + chunkedByteBuffer.dispose() } } + chunkedByteBuffer = null + chunks = null currentChunk = null } } diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index 44b0f16693cc8..952b3f0924eed 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -73,17 +73,18 @@ class ChunkedByteBufferSuite extends SparkFunSuite { } } - // TODO(josh) test dispose behavior test("toInputStream()") { val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte)) val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte)) val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2)) assert(chunkedByteBuffer.limit === bytes1.limit() + bytes2.limit()) - val inputStream = chunkedByteBuffer.toInputStream(false) + val inputStream = chunkedByteBuffer.toInputStream(dispose = false) val bytesFromStream = new Array[Byte](chunkedByteBuffer.limit.toInt) ByteStreams.readFully(inputStream, bytesFromStream) assert(bytesFromStream === bytes1.array() ++ bytes2.array()) assert(chunkedByteBuffer.getChunks().head.position() === 0) } + + // TODO(josh): figure out how to test the dispose=true case. } From e5e663f22094333dac6e184c78176ee658e3441e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 15 Mar 2016 15:49:24 -0700 Subject: [PATCH 06/17] Modify BlockManager.dataSerialize to write ChunkedByteBuffers. 
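
Reviewer note: the interesting part is the new dataSerialize body, which streams serialized output into a ByteArrayChunkOutputStream and wraps the resulting arrays, so no single contiguous buffer is ever allocated for a large block. A rough standalone sketch of that pattern, assuming ByteArrayChunkOutputStream's fixed-size chunks and its toArrays accessor as used in the hunk below; the tiny chunk size and data are illustrative only.

    // Illustrative sketch only, not part of the patch.
    package org.apache.spark.util.io

    import java.nio.ByteBuffer

    object ChunkedSerializationSketch {
      def main(args: Array[String]): Unit = {
        // dataSerialize below uses 4 MB chunks; a tiny chunk size keeps the sketch readable.
        val out = new ByteArrayChunkOutputStream(8)
        (0 until 20).foreach(i => out.write(i)) // stand-in for dataSerializeStream(...)
        // Each completed chunk is wrapped as-is; no contiguous copy of the whole block is made.
        val buffer = new ChunkedByteBuffer(out.toArrays.map(ByteBuffer.wrap))
        assert(buffer.getChunks().length == 3) // 8 + 8 + 4 bytes
        assert(buffer.toArray.sameElements(Array.tabulate[Byte](20)(_.toByte)))
      }
    }

The 4 MB chunk size used in the real method bounds the largest single allocation while keeping the number of chunks per block small.
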
--- .../org/apache/spark/storage/BlockManager.scala | 14 +++++++------- .../apache/spark/storage/memory/MemoryStore.scala | 3 +-- .../apache/spark/util/io/ChunkedByteBuffer.scala | 8 ++++++++ .../streaming/receiver/ReceivedBlockHandler.scala | 6 +++--- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 82d4b007589a6..94452cf0b99dd 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -42,7 +42,7 @@ import org.apache.spark.serializer.{Serializer, SerializerInstance} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage.memory._ import org.apache.spark.util._ -import org.apache.spark.util.io.ChunkedByteBuffer +import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer} /* Class for returning a fetched block and associated metrics. */ private[spark] class BlockResult( @@ -481,7 +481,7 @@ private[spark] class BlockManager( diskStore.getBytes(blockId) } else if (level.useMemory && memoryStore.contains(blockId)) { // The block was not found on disk, so serialize an in-memory copy: - new ChunkedByteBuffer(dataSerialize(blockId, memoryStore.getValues(blockId).get)) + dataSerialize(blockId, memoryStore.getValues(blockId).get) } else { releaseLock(blockId) throw new SparkException(s"Block $blockId was not found even though it's read-locked") @@ -1281,11 +1281,11 @@ private[spark] class BlockManager( ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() } - /** Serializes into a byte buffer. */ - def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = { - val byteStream = new ByteBufferOutputStream(4096) - dataSerializeStream(blockId, byteStream, values) - byteStream.toByteBuffer + /** Serializes into a chunked byte buffer. 
*/ + def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = { + val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4) + dataSerializeStream(blockId, byteArrayChunkOutputStream, values) + new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index daef5477ac4a6..c8cc1e38f976c 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -188,8 +188,7 @@ private[spark] class MemoryStore( val entry = if (level.deserialized) { new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues)) } else { - // TODO(josh): incrementally serialize - val bytes = new ChunkedByteBuffer(blockManager.dataSerialize(blockId, arrayValues.iterator)) + val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) new SerializedMemoryEntry(bytes, bytes.limit) } val size = entry.size diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 5809548c4a10d..5381127aca210 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -62,6 +62,14 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { byteChannel.getData } + def toByteBuffer: ByteBuffer = { + if (chunks.length == 1) { + chunks.head + } else { + ByteBuffer.wrap(toArray) + } + } + def toInputStream(dispose: Boolean = false): InputStream = { new ChunkedByteBufferInputStream(this, dispose) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 58c3cd01fed88..35627962b2b01 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -179,7 +179,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( numRecords = countIterator.count serializedBlock case ByteBufferBlock(byteBuffer) => - byteBuffer + new ChunkedByteBuffer(byteBuffer.duplicate()) case _ => throw new Exception(s"Could not push $blockId to block manager, unexpected block type") } @@ -188,7 +188,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( val storeInBlockManagerFuture = Future { val putSucceeded = blockManager.putBytes( blockId, - new ChunkedByteBuffer(serializedBlock.duplicate()), + serializedBlock, effectiveStorageLevel, tellMaster = true) if (!putSucceeded) { @@ -199,7 +199,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in write ahead log val storeInWriteAheadLogFuture = Future { - writeAheadLog.write(serializedBlock, clock.getTimeMillis()) + writeAheadLog.write(serializedBlock.toByteBuffer, clock.getTimeMillis()) } // Combine the futures, wait for both to complete, and return the write ahead log record handle From 0a347fdd9ec0e94eab17eb0f33c93acd1afbdcfb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 15 Mar 2016 23:56:02 -0700 Subject: [PATCH 07/17] Fix test compilation in streaming. 
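
Reviewer note: the streaming tests below only need a plain ByteBuffer, so they adapt by calling the toByteBuffer helper added in the previous patch. A small sketch of its two cases, a single chunk handed back without copying versus multiple chunks flattened into a freshly allocated buffer; the data is made up.

    // Illustrative sketch only, not part of the patch.
    package org.apache.spark.util.io

    import java.nio.ByteBuffer

    object ToByteBufferSketch {
      def main(args: Array[String]): Unit = {
        // One chunk: the backing buffer comes back directly, no copy.
        val single = new ChunkedByteBuffer(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
        assert(single.toByteBuffer.remaining() == 3)

        // Several chunks: the contents are flattened into one new ByteBuffer via toArray.
        val multi = new ChunkedByteBuffer(Array(
          ByteBuffer.wrap(Array[Byte](1, 2)),
          ByteBuffer.wrap(Array[Byte](3, 4))))
        assert(multi.toByteBuffer.remaining() == 4)
      }
    }
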
--- .../org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 2 +- .../spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 95c1609d8e9a0..f23fb2b74c93b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -338,7 +338,7 @@ class ReceivedBlockHandlerSuite storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) }) storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) }) - storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b)) }) + storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) }) } /** Test error handling when blocks that cannot be stored */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index 79ac833c1846b..c4bf42d0f272d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -223,7 +223,7 @@ class WriteAheadLogBackedBlockRDDSuite require(blockData.size === blockIds.size) val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf) val segments = blockData.zip(blockIds).map { case (data, id) => - writer.write(blockManager.dataSerialize(id, data.iterator)) + writer.write(blockManager.dataSerialize(id, data.iterator).toByteBuffer) } writer.close() segments From 43f8fa6ae5ba093655cdbd55ca56959a7652de56 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 16 Mar 2016 13:54:55 -0700 Subject: [PATCH 08/17] Allow ChunkedByteBuffer to contain no chunks. 
--- .../apache/spark/util/io/ChunkedByteBuffer.scala | 9 +++++++-- .../apache/spark/io/ChunkedByteBufferSuite.scala | 13 +++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 5381127aca210..15bce5ccf2dd9 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -29,7 +29,6 @@ import org.apache.spark.storage.BlockManager private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { require(chunks != null, "chunks must not be null") - require(chunks.nonEmpty, "Cannot create a ChunkedByteBuffer with no chunks") require(chunks.forall(_.limit() > 0), "chunks must be non-empty") require(chunks.forall(_.position() == 0), "chunks' positions must be 0") @@ -110,7 +109,13 @@ private class ChunkedByteBufferInputStream( extends InputStream { private[this] var chunks = chunkedByteBuffer.getChunks().iterator - private[this] var currentChunk: ByteBuffer = chunks.next() + private[this] var currentChunk: ByteBuffer = { + if (chunks.hasNext) { + chunks.next() + } else { + null + } + } override def read(): Int = { if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index 952b3f0924eed..16d137ab4887d 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -27,10 +27,15 @@ import org.apache.spark.util.io.ChunkedByteBuffer class ChunkedByteBufferSuite extends SparkFunSuite { - test("must have at least one chunk") { - intercept[IllegalArgumentException] { - new ChunkedByteBuffer(Array.empty[ByteBuffer]) - } + test("no chunks") { + val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer]) + assert(emptyChunkedByteBuffer.limit === 0) + assert(emptyChunkedByteBuffer.getChunks().isEmpty) + assert(emptyChunkedByteBuffer.toArray === Array.empty) + assert(emptyChunkedByteBuffer.toByteBuffer.capacity() === 0) + assert(emptyChunkedByteBuffer.toNetty.capacity() === 0) + emptyChunkedByteBuffer.toInputStream(dispose = false).close() + emptyChunkedByteBuffer.toInputStream(dispose = true).close() } test("chunks must be non-empty") { From 25e68841541b45d7eedc0447cc8154d746ee8db2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 16 Mar 2016 14:00:21 -0700 Subject: [PATCH 09/17] Document toByteBuffer() and toArray() size limitations. --- .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 15bce5ccf2dd9..53d363df47c51 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -50,6 +50,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { Unpooled.wrappedBuffer(getChunks(): _*) } + /** + * Copy this buffer into a new byte array. + * + * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size. 
+ */ def toArray: Array[Byte] = { if (limit >= Integer.MAX_VALUE) { throw new UnsupportedOperationException( @@ -61,9 +66,14 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { byteChannel.getData } + /** + * Copy this buffer into a new ByteBuffer. + * + * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size. + */ def toByteBuffer: ByteBuffer = { if (chunks.length == 1) { - chunks.head + chunks.head.duplicate() } else { ByteBuffer.wrap(toArray) } From 325c83d8909472428ae65620033fff4887c36e06 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 16 Mar 2016 14:07:42 -0700 Subject: [PATCH 10/17] Move dispose() from BlockManager to StorageUtils. It was a static method before, but its location was confusing. --- .../apache/spark/storage/BlockManager.scala | 21 ++---------------- .../apache/spark/storage/StorageUtils.scala | 22 ++++++++++++++++++- .../spark/util/ByteBufferInputStream.scala | 8 +++---- .../spark/util/io/ChunkedByteBuffer.scala | 8 +++---- 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 94452cf0b99dd..f7248c36567b0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -18,7 +18,7 @@ package org.apache.spark.storage import java.io._ -import java.nio.{ByteBuffer, MappedByteBuffer} +import java.nio.ByteBuffer import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, ExecutionContext, Future} @@ -26,8 +26,6 @@ import scala.concurrent.duration._ import scala.util.Random import scala.util.control.NonFatal -import sun.nio.ch.DirectBuffer - import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.io.CompressionCodec @@ -1333,24 +1331,9 @@ private[spark] class BlockManager( } -private[spark] object BlockManager extends Logging { +private[spark] object BlockManager { private val ID_GENERATOR = new IdGenerator - /** - * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that - * might cause errors if one attempts to read from the unmapped buffer, but it's better than - * waiting for the GC to find it because that could lead to huge numbers of open files. There's - * unfortunately no standard API to do this. 
- */ - def dispose(buffer: ByteBuffer): Unit = { - if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) { - logTrace(s"Unmapping $buffer") - if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) { - buffer.asInstanceOf[DirectBuffer].cleaner().clean() - } - } - } - def blockIdsToHosts( blockIds: Array[BlockId], env: SparkEnv, diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 43cd15921cc97..0537f44bc0a6b 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -17,10 +17,15 @@ package org.apache.spark.storage +import java.nio.{ByteBuffer, MappedByteBuffer} + import scala.collection.Map import scala.collection.mutable +import sun.nio.ch.DirectBuffer + import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.Logging /** * :: DeveloperApi :: @@ -222,7 +227,22 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { } /** Helper methods for storage-related objects. */ -private[spark] object StorageUtils { +private[spark] object StorageUtils extends Logging { + + /** + * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that + * might cause errors if one attempts to read from the unmapped buffer, but it's better than + * waiting for the GC to find it because that could lead to huge numbers of open files. There's + * unfortunately no standard API to do this. + */ + def dispose(buffer: ByteBuffer): Unit = { + if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) { + logTrace(s"Unmapping $buffer") + if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) { + buffer.asInstanceOf[DirectBuffer].cleaner().clean() + } + } + } /** * Update the given list of RDDInfo with the given list of storage statuses. diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala index 54de4d4ee8ca7..dce2ac63a664c 100644 --- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala +++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala @@ -20,10 +20,10 @@ package org.apache.spark.util import java.io.InputStream import java.nio.ByteBuffer -import org.apache.spark.storage.BlockManager +import org.apache.spark.storage.StorageUtils /** - * Reads data from a ByteBuffer, and optionally cleans it up using BlockManager.dispose() + * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose() * at the end of the stream (e.g. to close a memory-mapped file). */ private[spark] @@ -68,12 +68,12 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f } /** - * Clean up the buffer, and potentially dispose of it using BlockManager.dispose(). + * Clean up the buffer, and potentially dispose of it using StorageUtils.dispose(). 
*/ private def cleanUp() { if (buffer != null) { if (dispose) { - BlockManager.dispose(buffer) + StorageUtils.dispose(buffer) } buffer = null } diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 53d363df47c51..019893e795ce7 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -25,7 +25,7 @@ import com.google.common.primitives.UnsignedBytes import io.netty.buffer.{ByteBuf, Unpooled} import org.apache.spark.network.util.ByteArrayWritableChannel -import org.apache.spark.storage.BlockManager +import org.apache.spark.storage.StorageUtils private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { require(chunks != null, "chunks must not be null") @@ -105,12 +105,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { * unfortunately no standard API to do this. */ def dispose(): Unit = { - chunks.foreach(BlockManager.dispose) + chunks.foreach(StorageUtils.dispose) } } /** - * Reads data from a ChunkedByteBuffer, and optionally cleans it up using BlockManager.dispose() + * Reads data from a ChunkedByteBuffer, and optionally cleans it up using StorageUtils.dispose() * at the end of the stream (e.g. to close a memory-mapped file). */ private class ChunkedByteBufferInputStream( @@ -129,7 +129,7 @@ private class ChunkedByteBufferInputStream( override def read(): Int = { if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { - BlockManager.dispose(currentChunk) + StorageUtils.dispose(currentChunk) currentChunk = chunks.next() } if (currentChunk != null && currentChunk.hasRemaining) { From 4f5074ece49030a6e7134f7ece706ed441c02ee4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 16 Mar 2016 14:11:14 -0700 Subject: [PATCH 11/17] Better documentation for dispose() methods. --- .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 019893e795ce7..cff622ecaaf81 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -79,6 +79,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } } + /** + * Creates an input stream to read data from this ChunkedByteBuffer. + * @param dispose if true, [[dispose()]] will be called at the end of the stream + * in order to close any memory-mapped files which back this buffer. + */ def toInputStream(dispose: Boolean = false): InputStream = { new ChunkedByteBufferInputStream(this, dispose) } @@ -110,8 +115,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } /** - * Reads data from a ChunkedByteBuffer, and optionally cleans it up using StorageUtils.dispose() - * at the end of the stream (e.g. to close a memory-mapped file). + * Reads data from a ChunkedByteBuffer. + * + * @param dispose if true, [[ChunkedByteBuffer.dispose()]] will be called at the end of the stream + * in order to close any memory-mapped files which back the buffer. 
*/ private class ChunkedByteBufferInputStream( var chunkedByteBuffer: ChunkedByteBuffer, From b6ddf3ed40cc90ec94b7e4917808f8a726b597ee Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 16 Mar 2016 14:12:39 -0700 Subject: [PATCH 12/17] Rename limit to size. --- .../org/apache/spark/storage/BlockManager.scala | 12 ++++++------ .../apache/spark/storage/memory/MemoryStore.scala | 4 ++-- .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 10 ++++++---- .../org/apache/spark/io/ChunkedByteBufferSuite.scala | 10 +++++----- 4 files changed, 19 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f7248c36567b0..b3cf57de03207 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -503,7 +503,7 @@ private[spark] class BlockManager( */ def getRemoteValues(blockId: BlockId): Option[BlockResult] = { getRemoteBytes(blockId).map { data => - new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit) + new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.size) } } @@ -742,7 +742,7 @@ private[spark] class BlockManager( null } - val size = bytes.limit + val size = bytes.size if (level.useMemory) { // Put it in memory first, even if it also has useDisk set to true; @@ -950,7 +950,7 @@ private[spark] class BlockManager( diskBytes.dispose() memoryStore.getBytes(blockId).get } else { - val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit, () => { + val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => { // https://issues.apache.org/jira/browse/SPARK-6076 // If the file size is bigger than the free memory, OOM will happen. So if we // cannot put it into MemoryStore, copyForMemory should not be created. 
That's why @@ -1079,7 +1079,7 @@ private[spark] class BlockManager( case Some(peer) => try { val onePeerStartTime = System.currentTimeMillis - logTrace(s"Trying to replicate $blockId of ${data.limit} bytes to $peer") + logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer") blockTransferService.uploadBlockSync( peer.host, peer.port, @@ -1087,7 +1087,7 @@ private[spark] class BlockManager( blockId, new NettyManagedBuffer(data.toNetty), tLevel) - logTrace(s"Replicated $blockId of ${data.limit} bytes to $peer in %s ms" + logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms" .format(System.currentTimeMillis - onePeerStartTime)) peersReplicatedTo += peer peersForReplication -= peer @@ -1110,7 +1110,7 @@ private[spark] class BlockManager( } } val timeTakeMs = (System.currentTimeMillis - startTime) - logDebug(s"Replicating $blockId of ${data.limit} bytes to " + + logDebug(s"Replicating $blockId of ${data.size} bytes to " + s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms") if (peersReplicatedTo.size < numPeersToReplicateTo) { logWarning(s"Block $blockId replicated to only " + diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index c8cc1e38f976c..64697e920ba97 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -99,7 +99,7 @@ private[spark] class MemoryStore( if (memoryManager.acquireStorageMemory(blockId, size)) { // We acquired enough memory for the block, so go ahead and put it val bytes = _bytes() - assert(bytes.limit == size) + assert(bytes.size == size) val entry = new SerializedMemoryEntry(bytes, size) entries.synchronized { entries.put(blockId, entry) @@ -189,7 +189,7 @@ private[spark] class MemoryStore( new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues)) } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - new SerializedMemoryEntry(bytes, bytes.limit) + new SerializedMemoryEntry(bytes, bytes.size) } val size = entry.size def transferUnrollToStorage(amount: Long): Unit = { diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index cff622ecaaf81..04dd24f72af94 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -27,12 +27,13 @@ import io.netty.buffer.{ByteBuf, Unpooled} import org.apache.spark.network.util.ByteArrayWritableChannel import org.apache.spark.storage.StorageUtils + private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { require(chunks != null, "chunks must not be null") require(chunks.forall(_.limit() > 0), "chunks must be non-empty") require(chunks.forall(_.position() == 0), "chunks' positions must be 0") - val limit: Long = chunks.map(_.limit().asInstanceOf[Long]).sum + val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum def this(byteBuffer: ByteBuffer) = { this(Array(byteBuffer)) @@ -56,11 +57,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size. 
*/ def toArray: Array[Byte] = { - if (limit >= Integer.MAX_VALUE) { + if (size >= Integer.MAX_VALUE) { throw new UnsupportedOperationException( - s"cannot call toArray because buffer size ($limit bytes) exceeds maximum array size") + s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size") } - val byteChannel = new ByteArrayWritableChannel(limit.toInt) + val byteChannel = new ByteArrayWritableChannel(size.toInt) writeFully(byteChannel) byteChannel.close() byteChannel.getData @@ -81,6 +82,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { /** * Creates an input stream to read data from this ChunkedByteBuffer. + * * @param dispose if true, [[dispose()]] will be called at the end of the stream * in order to close any memory-mapped files which back this buffer. */ diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index 16d137ab4887d..417734e99a02a 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -29,7 +29,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite { test("no chunks") { val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer]) - assert(emptyChunkedByteBuffer.limit === 0) + assert(emptyChunkedByteBuffer.size === 0) assert(emptyChunkedByteBuffer.getChunks().isEmpty) assert(emptyChunkedByteBuffer.toArray === Array.empty) assert(emptyChunkedByteBuffer.toByteBuffer.capacity() === 0) @@ -58,7 +58,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite { test("writeFully() does not affect original buffer's position") { val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8))) - chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.limit.toInt)) + chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) assert(chunkedByteBuffer.getChunks().head.position() === 0) } @@ -72,7 +72,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite { val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4) fourMegabyteBuffer.limit(fourMegabyteBuffer.capacity()) val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer)) - assert(chunkedByteBuffer.limit === (1024L * 1024L * 1024L * 4L)) + assert(chunkedByteBuffer.size === (1024L * 1024L * 1024L * 4L)) intercept[UnsupportedOperationException] { chunkedByteBuffer.toArray } @@ -82,10 +82,10 @@ class ChunkedByteBufferSuite extends SparkFunSuite { val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte)) val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte)) val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2)) - assert(chunkedByteBuffer.limit === bytes1.limit() + bytes2.limit()) + assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit()) val inputStream = chunkedByteBuffer.toInputStream(dispose = false) - val bytesFromStream = new Array[Byte](chunkedByteBuffer.limit.toInt) + val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt) ByteStreams.readFully(inputStream, bytesFromStream) assert(bytesFromStream === bytes1.array() ++ bytes2.array()) assert(chunkedByteBuffer.getChunks().head.position() === 0) From 719ad3c4e9e942ce62cbcf288788aca785690a7e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 16 Mar 2016 14:20:08 -0700 Subject: [PATCH 13/17] Implement missing InputStream methods. 
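
Reviewer note: a short sketch (made-up data) of the semantics these overrides aim for. Both skip(n) and read(dest, offset, length) may stop early at a chunk boundary and return only the amount actually consumed, so callers are expected to loop, for example via Guava's skipFully and readFully.

    // Illustrative sketch only, not part of the patch.
    package org.apache.spark.util.io

    import java.nio.ByteBuffer

    import com.google.common.io.ByteStreams

    object BulkReadSketch {
      def main(args: Array[String]): Unit = {
        val buf = new ChunkedByteBuffer(Array(
          ByteBuffer.wrap(Array[Byte](0, 1, 2, 3)),
          ByteBuffer.wrap(Array[Byte](4, 5, 6, 7))))
        val in = buf.toInputStream()
        // A single skip() or read() stops at the end of the current chunk and reports how far
        // it got; ByteStreams.skipFully / readFully do the looping for us.
        ByteStreams.skipFully(in, 2)
        val dest = new Array[Byte](6)
        ByteStreams.readFully(in, dest) // 2 bytes from the first chunk, then 4 from the second
        assert(dest.sameElements(Array[Byte](2, 3, 4, 5, 6, 7)))
        in.close()
      }
    }
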
--- .../spark/util/io/ChunkedByteBuffer.scala | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 04dd24f72af94..01fe52ff3b4fa 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -82,7 +82,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { /** * Creates an input stream to read data from this ChunkedByteBuffer. - * + * * @param dispose if true, [[dispose()]] will be called at the end of the stream * in order to close any memory-mapped files which back this buffer. */ @@ -149,10 +149,37 @@ private class ChunkedByteBufferInputStream( } } - // TODO(josh): implement -// override def read(b: Array[Byte]): Int = super.read(b) -// override def read(b: Array[Byte], off: Int, len: Int): Int = super.read(b, off, len) -// override def skip(n: Long): Long = super.skip(n) + override def read(dest: Array[Byte], offset: Int, length: Int): Int = { + if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { + StorageUtils.dispose(currentChunk) + currentChunk = chunks.next() + } + if (currentChunk != null && currentChunk.hasRemaining) { + val amountToGet = math.min(currentChunk.remaining(), length) + currentChunk.get(dest, offset, amountToGet) + amountToGet + } else { + close() + -1 + } + } + + override def skip(bytes: Long): Long = { + if (currentChunk != null) { + val amountToSkip = math.min(bytes, currentChunk.remaining).toInt + currentChunk.position(currentChunk.position + amountToSkip) + if (currentChunk.remaining() == 0) { + if (chunks.hasNext) { + currentChunk = chunks.next() + } else { + close() + } + } + amountToSkip + } else { + 0L + } + } override def close(): Unit = { if (currentChunk != null) { From 23006076dcb73095a9eaa7e2524a10c048bae646 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 16 Mar 2016 15:00:10 -0700 Subject: [PATCH 14/17] More comments. --- .../spark/util/io/ChunkedByteBuffer.scala | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 01fe52ff3b4fa..01f8bc8f579ba 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -27,18 +27,32 @@ import io.netty.buffer.{ByteBuf, Unpooled} import org.apache.spark.network.util.ByteArrayWritableChannel import org.apache.spark.storage.StorageUtils - +/** + * Read-only byte buffer which is physically stored as multiple chunks rather than a single + * contiguous array. + * + * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have + * position == 0. Ownership of these buffers is transferred to the ChunkedByteBuffer, + * so if these buffers may also be used elsewhere then the caller is responsible for + * copying them as needed. + */ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { require(chunks != null, "chunks must not be null") require(chunks.forall(_.limit() > 0), "chunks must be non-empty") require(chunks.forall(_.position() == 0), "chunks' positions must be 0") + /** + * This size of this buffer, in bytes. 
+ */ val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum def this(byteBuffer: ByteBuffer) = { this(Array(byteBuffer)) } + /** + * Write this buffer to a channel. + */ def writeFully(channel: WritableByteChannel): Unit = { for (bytes <- getChunks()) { while (bytes.remaining > 0) { @@ -47,6 +61,9 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } } + /** + * Wrap this buffer to view it as a Netty ByteBuf. + */ def toNetty: ByteBuf = { Unpooled.wrappedBuffer(getChunks(): _*) } @@ -90,10 +107,17 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { new ChunkedByteBufferInputStream(this, dispose) } + /** + * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer. Visible for testing + */ def getChunks(): Array[ByteBuffer] = { chunks.map(_.duplicate()) } + /** + * Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers. + * The new buffer will share no resources with the original buffer. + */ def copy(): ChunkedByteBuffer = { val copiedChunks = getChunks().map { chunk => // TODO: accept an allocator in this copy method to integrate with mem. accounting systems From 3fc0b66981aa2d45be129986f0dc5bd595e08b22 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 16 Mar 2016 15:02:42 -0700 Subject: [PATCH 15/17] Fix confusing getChunks().head --- .../scala/org/apache/spark/scheduler/TaskResultGetter.scala | 3 +-- .../scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index ae8467f57929d..7a0d938fae58d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -81,9 +81,8 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul taskSetManager, tid, TaskState.FINISHED, TaskResultLost) return } - // TODO(josh): assumption that there is only one chunk here is a hack val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( - serializedTaskResult.get.getChunks().head) + serializedTaskResult.get.toByteBuffer) sparkEnv.blockManager.master.removeBlock(blockId) (deserializedResult, size) } diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 01f8bc8f579ba..de705a7aa3ae7 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -108,7 +108,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { } /** - * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer. Visible for testing + * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer. */ def getChunks(): Array[ByteBuffer] = { chunks.map(_.duplicate()) From cb9311b30636ced3854fd035340092e497750b47 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 17 Mar 2016 11:09:29 -0700 Subject: [PATCH 16/17] Fix logging import. 
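
Reviewer note, unrelated to this one-line import fix: per the new class comment above, the constructor now takes ownership of its chunks instead of duplicating them, so call sites that keep using a ByteBuffer afterwards pass in a duplicate(), as the streaming handlers in earlier patches do. A small sketch of that convention; the object name and data are illustrative.

    // Illustrative sketch only, not part of the patch.
    package org.apache.spark.util.io

    import java.nio.ByteBuffer

    object DuplicateOwnershipSketch {
      def main(args: Array[String]): Unit = {
        val original = ByteBuffer.wrap(Array.tabulate[Byte](8)(_.toByte))
        // Hand over a duplicate(): same bytes, but independent position and limit.
        val buf = new ChunkedByteBuffer(original.duplicate())
        // The caller can keep consuming its own buffer...
        original.get()
        original.get()
        // ...without changing what the ChunkedByteBuffer sees.
        assert(original.position() == 2)
        assert(buf.size == 8)
      }
    }
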
--- core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 0537f44bc0a6b..199a5fc270a41 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -25,7 +25,7 @@ import scala.collection.mutable import sun.nio.ch.DirectBuffer import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.Logging +import org.apache.spark.internal.Logging /** * :: DeveloperApi :: From 2970932bf11ab9cfc8cac33c05a13912ddb345d4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 17 Mar 2016 17:49:08 -0700 Subject: [PATCH 17/17] Clean up dispose logic to address review comments. --- .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 8 ++------ .../org/apache/spark/io/ChunkedByteBufferSuite.scala | 2 -- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index de705a7aa3ae7..c643c4b63c601 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -162,7 +162,6 @@ private class ChunkedByteBufferInputStream( override def read(): Int = { if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { - StorageUtils.dispose(currentChunk) currentChunk = chunks.next() } if (currentChunk != null && currentChunk.hasRemaining) { @@ -175,7 +174,6 @@ private class ChunkedByteBufferInputStream( override def read(dest: Array[Byte], offset: Int, length: Int): Int = { if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { - StorageUtils.dispose(currentChunk) currentChunk = chunks.next() } if (currentChunk != null && currentChunk.hasRemaining) { @@ -206,10 +204,8 @@ private class ChunkedByteBufferInputStream( } override def close(): Unit = { - if (currentChunk != null) { - if (dispose) { - chunkedByteBuffer.dispose() - } + if (chunkedByteBuffer != null && dispose) { + chunkedByteBuffer.dispose() } chunkedByteBuffer = null chunks = null diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index 417734e99a02a..aab70e7431e07 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -90,6 +90,4 @@ class ChunkedByteBufferSuite extends SparkFunSuite { assert(bytesFromStream === bytes1.array() ++ bytes2.array()) assert(chunkedByteBuffer.getChunks().head.position() === 0) } - - // TODO(josh): figure out how to test the dispose=true case. }