78 changes: 58 additions & 20 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

import com.google.common.io.{ByteStreams, Closeables, Files}
-import io.netty.channel.FileRegion
+import io.netty.channel.{DefaultFileRegion, FileRegion}
import io.netty.util.AbstractReferenceCounted

import org.apache.spark.{SecurityManager, SparkConf}
@@ -108,25 +108,7 @@ private[spark] class DiskStore(
new EncryptedBlockData(file, blockSize, conf, key)

case _ =>
-val channel = new FileInputStream(file).getChannel()
-if (blockSize < minMemoryMapBytes) {
-  // For small files, directly read rather than memory map.
-  Utils.tryWithSafeFinally {
-    val buf = ByteBuffer.allocate(blockSize.toInt)
-    JavaUtils.readFully(channel, buf)
-    buf.flip()
-    new ByteBufferBlockData(new ChunkedByteBuffer(buf), true)
-  } {
-    channel.close()
-  }
-} else {
-  Utils.tryWithSafeFinally {
-    new ByteBufferBlockData(
-      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)), true)
-  } {
-    channel.close()
-  }
-}
+new DiskBlockData(conf, file, blockSize)
}
}

@@ -165,6 +147,62 @@ private[spark] class DiskStore(

}

private class DiskBlockData(
conf: SparkConf,
[Contributor] We can pass in minMemoryMapBytes directly.

file: File,
blockSize: Long) extends BlockData {

private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")

override def toInputStream(): InputStream = new FileInputStream(file)

/**
* Returns a Netty-friendly wrapper for the block's data.
*
* Please see `ManagedBuffer.convertToNetty()` for more details.
*/
override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)

override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
Utils.tryWithResource(open()) { channel =>
var remaining = blockSize
val chunks = new ListBuffer[ByteBuffer]()
while (remaining > 0) {
val chunkSize = math.min(remaining, Int.MaxValue)
val chunk = allocator(chunkSize.toInt)
remaining -= chunkSize
JavaUtils.readFully(channel, chunk)
chunk.flip()
chunks += chunk
}
new ChunkedByteBuffer(chunks.toArray)
}
}

override def toByteBuffer(): ByteBuffer = {
[Contributor] We will still hit the 2 GB limitation here; I'm wondering which end-to-end use cases are affected by it.

[Author @eyalfa, Aug 7, 2017] Indeed. I chose to postpone the failure from DiskStore.getBytes to this place, as I believe it introduces no regression while still allowing the more common streaming-like use case.

Furthermore, I think this plays well with the comment about the future deprecation of org.apache.spark.network.buffer.ManagedBuffer#nioByteBuffer, which seems to be the main reason BlockData exposes the toByteBuffer method.

[Author] @cloud-fan, it took me roughly 4 hours, but I looked at both the shuffle code path and BlockManager.getRemoteBytes: the former is robust to large blocks because it uses Netty's streaming capabilities, while the latter seems to be broken, since it does not use Netty's streaming capabilities and actually tries to copy the result buffer into a heap-based buffer. I think this deserves its own JIRA/PR.

I think these two places plus the external shuffle server cover most of the relevant use cases (aside from local caching, which I believe this PR makes 2 GB-proof).

require( blockSize < Int.MaxValue
[Contributor] No space after the opening parenthesis; the comma should be on this line.

, s"can't create a byte buffer of size $blockSize"
+ s" since it exceeds Int.MaxValue ${Int.MaxValue}.")
Utils.tryWithResource(open()) { channel =>
if (blockSize < minMemoryMapBytes) {
// For small files, directly read rather than memory map.
val buf = ByteBuffer.allocate(blockSize.toInt)
JavaUtils.readFully(channel, buf)
buf.flip()
buf
} else {
channel.map(MapMode.READ_ONLY, 0, file.length)
}
}
}

override def size: Long = blockSize

override def dispose(): Unit = {}

private def open() = new FileInputStream(file).getChannel
}
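
A minimal, self-contained sketch of the chunking arithmetic used in toChunkedByteBuffer above, with the maximum chunk size pulled out as a parameter (the review discussion further down suggests making it configurable so tests can exercise the logic with small inputs). The object ChunkingSketch, the helper chunkSizes, and the maxChunkSize parameter are illustrative only and not part of this patch.

import scala.collection.mutable.ListBuffer

object ChunkingSketch {
  // Split a block of blockSize bytes into chunk sizes of at most maxChunkSize bytes,
  // mirroring the while loop in DiskBlockData.toChunkedByteBuffer (which uses Int.MaxValue).
  def chunkSizes(blockSize: Long, maxChunkSize: Int): Seq[Int] = {
    require(blockSize >= 0 && maxChunkSize > 0)
    val chunks = new ListBuffer[Int]()
    var remaining = blockSize
    while (remaining > 0) {
      val chunkSize = math.min(remaining, maxChunkSize.toLong)
      chunks += chunkSize.toInt
      remaining -= chunkSize
    }
    chunks.toList
  }

  def main(args: Array[String]): Unit = {
    // A 3 GB block with the default Int.MaxValue limit splits into two chunks.
    println(chunkSizes(3L * 1024 * 1024 * 1024, Int.MaxValue)) // List(2147483647, 1073741825)
    // With a small limit the same arithmetic can be exercised cheaply in a test.
    println(chunkSizes(10L * 1024, 4096)) // List(4096, 4096, 2048)
  }
}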

private class EncryptedBlockData(
file: File,
blockSize: Long,
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
super.fetchBlockSync(host, port, execId, blockId)
}
}

def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
[Contributor] Have you measured how long these tests take? I've seen this tried before in other changes related to the 2 GB limits, and this kind of test was always ridiculously slow.

You can avoid this kind of test by making the chunk size configurable, e.g. in this line you're adding above:

    val chunkSize = math.min(remaining, Int.MaxValue)

Then your test can run fast and not use a lot of memory. You just need to add extra checks that the data is being chunked properly, instead of relying on the JVM not throwing errors at you.

[Author] @vanzin, I've measured: test case times range from 7 to 25 seconds on my laptop. Point well taken 😎

[Contributor] 7-25 seconds is really a long time for a unit test...

[Author] @cloud-fan, I know; it gets even worse when using the === operator.

I'm currently exploring the second direction pointed out by @vanzin, introducing a test-only configuration key to configure the max page size.

[Author] @cloud-fan, @vanzin, taking the parameterized approach, I'd remove most of the tests from BlockManagerSuite, as they'd require propagating this parameter to too many subsystems. So I'm going to modify DiskStore and DiskStoreSuite to use such a parameter. I'm not sure about leaving a test case in BlockManagerSuite that tests DISK_ONLY persistence; what do you guys think?

[Contributor] Shall we do them together in a follow-up PR? I think the test case in DiskStoreSuite is enough.

[Author] Yes, currently working on:

  1. parameterizing DiskStore and DiskStoreSuite
  2. reverting the tests in BlockManagerSuite
  3. reverting the 6 GB change in sbt

[Contributor] It would probably be easier to propagate the chunk size as a SparkConf entry that is not documented. But up to you guys.

store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
def mkBlobs() = {
val rng = new java.util.Random(42)
val buff = new Array[Byte](1024 * 1024)
rng.nextBytes(buff)
Iterator.fill(2 * 1024 + 1) {
buff
}
}
val res1 = store.getOrElseUpdate(
RDDBlockId(42, 0),
storageLevel,
implicitly[ClassTag[Array[Byte]]],
mkBlobs _
)
withClue(res1) {
[Contributor] Does res1 have a reasonable string representation?

[Author] I think it would print an Either whose left side is a case class with these members: an iterator (which prints as an empty/non-empty iterator), an enum, and a number of bytes. The right side is an iterator, which again prints as an empty/non-empty iterator.

assert(res1.isLeft)
assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
case (a, b) =>
[Contributor] Just a === b?

[Author] You can't compare Arrays that way; you get identity equality, which is usually not what you want. Hence the .seq, which forces the array to be wrapped in a Seq.

[Contributor] === is a helper method in ScalaTest and should be able to compare arrays.

[Contributor] Even if === does not work, you have Arrays.equals, which is null-safe.

(A short illustration of these array-equality semantics follows the test helper below.)

a != null &&
b != null &&
a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq
})
}
val getResult = store.get(RDDBlockId(42, 0))
withClue(getResult) {
assert(getResult.isDefined)
assert(getResult.get.data.zipAll(mkBlobs(), null, null).forall {
case (a, b) =>
a != null &&
b != null &&
a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq
})
}
val getBlockRes = store.getBlockData(RDDBlockId(42, 0))
withClue(getBlockRes) {
try {
assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024)
Utils.tryWithResource(getBlockRes.createInputStream()) { inpStrm =>
val iter = store
.serializerManager
.dataDeserializeStream(RDDBlockId(42, 0)
, inpStrm)(implicitly[ClassTag[Array[Byte]]])
[Contributor] The comma goes on the previous line. inpStrm is kind of an ugly variable name; pick one of: is, in, inputStream.

assert(iter.zipAll(mkBlobs(), null, null).forall {
case (a, b) =>
a != null &&
b != null &&
a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq
})
}
} finally {
getBlockRes.release()
}
}
}
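
The exchange above about a === b comes down to how the JVM and Scala compare arrays. A short, standalone illustration of the semantics the reviewers are discussing (not part of the patch):

import java.util.Arrays

object ArrayEqualitySketch {
  def main(args: Array[String]): Unit = {
    val a = Array[Byte](1, 2, 3)
    val b = Array[Byte](1, 2, 3)

    // == on JVM arrays is reference equality, so equal-content arrays compare as false.
    println(a == b) // false

    // Wrapping the arrays (e.g. via .seq, as the test does) compares element by element.
    println(a.seq == b.seq) // true

    // java.util.Arrays.equals also compares contents and handles nulls.
    println(Arrays.equals(a, b)) // true
    println(Arrays.equals(null: Array[Byte], null: Array[Byte])) // true
  }
}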

test("getOrElseUpdate > 2gb, storage level = disk only") {
[Contributor] Shall we just write a test in DiskStoreSuite?

[Contributor] Oh, we already have one; then why do we have these tests?

[Author] These tests cover more than just the DISK_ONLY storage level; they were crafted when I had bigger ambitions of solving the entire 2 GB issue 😎, before seeing some ~100-file pull requests being abandoned or rejected. Aside from that, these tests also exercise the entire orchestration done by BlockManager when an RDD requests a cached partition; notice that they intentionally make two calls to the BlockManager in order to cover both code paths (cache hit and cache miss).

testGetOrElseUpdateForLargeBlock(StorageLevel.DISK_ONLY)
}

test("getOrElseUpdate > 2gb, storage level = memory deserialized") {
testGetOrElseUpdateForLargeBlock(StorageLevel.MEMORY_ONLY)
}

test("getOrElseUpdate > 2gb, storage level = off-heap") {
testGetOrElseUpdateForLargeBlock(StorageLevel.OFF_HEAP)
}

test("getOrElseUpdate > 2gb, storage level = memory serialized") {
testGetOrElseUpdateForLargeBlock(StorageLevel.MEMORY_ONLY_SER)
}
}

private object BlockManagerSuite {
37 changes: 31 additions & 6 deletions core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -50,18 +50,18 @@ class DiskStoreSuite extends SparkFunSuite {
val diskStoreMapped = new DiskStore(conf.clone().set(confKey, "0"), diskBlockManager,
securityManager)
diskStoreMapped.putBytes(blockId, byteBuffer)
-val mapped = diskStoreMapped.getBytes(blockId).asInstanceOf[ByteBufferBlockData].buffer
+val mapped = diskStoreMapped.getBytes(blockId).toByteBuffer()
assert(diskStoreMapped.remove(blockId))

val diskStoreNotMapped = new DiskStore(conf.clone().set(confKey, "1m"), diskBlockManager,
securityManager)
diskStoreNotMapped.putBytes(blockId, byteBuffer)
-val notMapped = diskStoreNotMapped.getBytes(blockId).asInstanceOf[ByteBufferBlockData].buffer
+val notMapped = diskStoreNotMapped.getBytes(blockId).toByteBuffer()

// Not possible to do isInstanceOf due to visibility of HeapByteBuffer
-assert(notMapped.getChunks().forall(_.getClass.getName.endsWith("HeapByteBuffer")),
+assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
"Expected HeapByteBuffer for un-mapped read")
-assert(mapped.getChunks().forall(_.isInstanceOf[MappedByteBuffer]),
+assert(mapped.isInstanceOf[MappedByteBuffer],
"Expected MappedByteBuffer for mapped read")

def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
@@ -70,8 +70,8 @@ class DiskStoreSuite extends SparkFunSuite {
array
}

-assert(Arrays.equals(mapped.toArray, bytes))
-assert(Arrays.equals(notMapped.toArray, bytes))
+assert(Arrays.equals(new ChunkedByteBuffer(mapped).toArray, bytes))
+assert(Arrays.equals(new ChunkedByteBuffer(notMapped).toArray, bytes))
}

test("block size tracking") {
@@ -92,6 +92,31 @@
assert(diskStore.getSize(blockId) === 0L)
}

test("blocks larger than 2gb") {
val conf = new SparkConf()
val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf))

[Contributor] Nit: remove this empty line.

val mb = 1024 * 1024
val gb = 1024L * mb

val blockId = BlockId("rdd_1_2")
diskStore.put(blockId) { chan =>
val arr = new Array[Byte](mb)
for {
_ <- 0 until 3072
} {
val buf = ByteBuffer.wrap(arr)
while (buf.hasRemaining()) {
chan.write(buf)
}
}
}

val blockData = diskStore.getBytes(blockId)
[Author] @kiszk, this is the test case I was referring to. I introduced it before (hopefully) fixing the bug in DiskStore.getBytes.

assert(blockData.size == 3 * gb)
}
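
If the chunk size were made configurable, as discussed in the review comments above, this test could also verify chunk boundaries cheaply. Even without that, one inexpensive extra check is that toByteBuffer rejects blocks over Int.MaxValue, since DiskBlockData guards it with require (which throws IllegalArgumentException). A sketch of such assertions, not part of the patch; it assumes the diskStore, blockId and gb values defined in the test above:

    val blockData = diskStore.getBytes(blockId)
    assert(blockData.size === 3 * gb)

    // toByteBuffer cannot represent more than Int.MaxValue bytes in a single buffer,
    // so the require(...) guard in DiskBlockData should reject this block.
    intercept[IllegalArgumentException] {
      blockData.toByteBuffer()
    }

    // toChunkedByteBuffer would split the block into Int.MaxValue-sized chunks, but
    // materializing 3 GB on the heap is expensive, which is exactly why the review
    // suggests a configurable chunk size for tests:
    // val chunked = blockData.toChunkedByteBuffer(ByteBuffer.allocate)
    // assert(chunked.getChunks().map(_.limit().toLong).sum === 3 * gb)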

test("block data encryption") {
val testDir = Utils.createTempDir()
val testData = new Array[Byte](128 * 1024)
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
@@ -790,7 +790,7 @@ object TestSettings {
javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark"))
.map { case (k,v) => s"-D$k=$v" }.toSeq,
javaOptions in Test += "-ea",
javaOptions in Test ++= "-Xmx3g -Xss4096k"
javaOptions in Test ++= "-Xmx6g -Xss4096k"
[Contributor] I'm a little worried about this change. Since the change to BlockManagerSuite is not very related to this PR, can we revert it and revisit it in a follow-up PR? Then we can unblock this PR.

[Author] @cloud-fan, let's wait a few hours and see what the other people CCed on this (the last ones to edit the build) have to say about it. If they are also worried, or do not comment, I'll revert it.

I must say I'm reluctant to revert these tests, as I personally believe that the lack of such tests contributed to Spark's 2 GB issues, including this one.

[Member] I am +1 for separating it if that can be done. Let's get the changes we are sure of into the code base first.

.split(" ").toSeq,
javaOptions += "-Xmx3g",
// Exclude tags defined in a system property