diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0fe82ac0cedc..c01a45315191 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -133,8 +133,6 @@ private[spark] class BlockManager(
 
   private[spark] val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
 
-  private val chunkSize =
-    conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString).toInt
   private val remoteReadNioBufferConversion =
     conf.getBoolean("spark.network.remoteReadNioBufferConversion", false)
 
@@ -451,7 +449,7 @@ private[spark] class BlockManager(
           new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
 
         case None =>
-          ChunkedByteBuffer.fromFile(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+          ChunkedByteBuffer.fromFile(tmpFile)
       }
       putBytes(blockId, buffer, level)(classTag)
       tmpFile.delete()
@@ -797,7 +795,7 @@ private[spark] class BlockManager(
       if (remoteReadNioBufferConversion) {
         return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
       } else {
-        return Some(ChunkedByteBuffer.fromManagedBuffer(data, chunkSize))
+        return Some(ChunkedByteBuffer.fromManagedBuffer(data))
       }
     }
     logDebug(s"The value of block $blockId is null")
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 4aa8d45ec740..9547cb49bbee 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
 import org.apache.spark.storage.StorageUtils
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 
 /**
@@ -169,24 +170,25 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 
 }
 
-object ChunkedByteBuffer {
+private[spark] object ChunkedByteBuffer {
+
+  // TODO eliminate this method if we switch BlockManager to getting InputStreams
-  def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
+  def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
-        fromFile(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+        fromFile(f.getFile, f.getOffset, f.getLength)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
   }
 
-  def fromFile(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
-    fromFile(file, maxChunkSize, 0, file.length())
+  def fromFile(file: File): ChunkedByteBuffer = {
+    fromFile(file, 0, file.length())
   }
 
   private def fromFile(
       file: File,
-      maxChunkSize: Int,
       offset: Long,
       length: Long): ChunkedByteBuffer = {
     // We do *not* memory map the file, because we may end up putting this into the memory store,
@@ -195,7 +197,7 @@ object ChunkedByteBuffer {
     val is = new FileInputStream(file)
     ByteStreams.skipFully(is, offset)
     val in = new LimitedInputStream(is, length)
-    val chunkSize = math.min(maxChunkSize, length).toInt
+    val chunkSize = math.min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, length).toInt
     val out = new ChunkedByteBufferOutputStream(chunkSize, ByteBuffer.allocate _)
     Utils.tryWithSafeFinally {
       IOUtils.copy(in, out)
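
For context on the new fixed cap: a ChunkedByteBuffer backed by on-heap buffers can never hold a single chunk larger than a JVM byte array allows, so the patch drops the test-only configurable limit and caps each chunk at ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, a value slightly below Int.MaxValue that leaves headroom for array object overhead. The sketch below is a minimal standalone illustration of that chunked-read pattern, not the Spark implementation; the object name, the readFileChunked helper, and the hard-coded cap are illustrative assumptions.

// Standalone sketch (not Spark code): read a file into ByteBuffer chunks, capping each
// chunk the way the patched fromFile() caps chunks at ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH.
import java.io.{File, FileInputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

object ChunkedReadSketch {
  // Stand-in for ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH: a little under Int.MaxValue,
  // since JVM array headers keep the true maximum array length slightly below 2^31 - 1.
  val MaxChunkBytes: Int = Int.MaxValue - 15

  def readFileChunked(file: File, maxChunk: Int = MaxChunkBytes): Array[ByteBuffer] = {
    val in = new FileInputStream(file)
    try {
      var remaining = file.length()
      val chunks = ArrayBuffer[ByteBuffer]()
      while (remaining > 0) {
        // Never allocate a single chunk larger than maxChunk, even for huge files.
        val size = math.min(maxChunk.toLong, remaining).toInt
        val bytes = new Array[Byte](size)
        var read = 0
        while (read < size) {
          val n = in.read(bytes, read, size - read)
          if (n < 0) throw new java.io.EOFException(s"Unexpected EOF while reading $file")
          read += n
        }
        chunks += ByteBuffer.wrap(bytes)
        remaining -= size
      }
      chunks.toArray
    } finally {
      in.close()
    }
  }
}

A caller could pass a deliberately small cap, e.g. readFileChunked(new File("/tmp/data.bin"), 4 * 1024 * 1024), to exercise the multi-chunk path without multi-gigabyte inputs; forcing small chunks in tests is what the removed spark.storage.memoryMapLimitForTests setting was for.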