[SPARK-17204][CORE] Fix replicated off heap storage #16499
Conversation
N.B. We no longer need the releaseLock call because we exhaust the iterator returned by getLocalValues.
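A minimal sketch of what this looks like in the test, assuming the usual `blockManager` and `blockId` values from the surrounding suite (the exact code in the patch may differ):

```scala
// Fully consuming the iterator returned by getLocalValues releases the read
// lock on the block, so an explicit releaseLock(blockId) is no longer needed.
val result = blockManager.getLocalValues(blockId)
assert(result.isDefined)
result.get.data.foreach(_ => ())  // exhaust the iterator; this releases the lock
```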
Using an array of 1s instead of an array of 0s is my silly, paranoid, OCD way of adding a little extra entropy to the test. I think the chance that this change in test value will actually affect the outcome of this test is about 0%. I will revert to the original test value on request.
Test build #71022 has finished for PR 16499 at commit

@rxin, can you recommend someone I can reach out to for help reviewing this PR?

Ideally I think @JoshRosen is the person to take a look.

Josh, can you take a look at this when you have a chance?

also cc @sameeragarwal
Do we need to call dispose on an on-heap byte buffer? I think only off-heap byte buffers need to be disposed?
I think the name of the ChunkedByteBuffer.dispose() method is confusing. It actually only attempts to dispose a so-called "memory mapped" buffer. On-heap buffers are not memory mapped, therefore this is a no-op for them.
On the other hand, when the storage level uses off-heap memory in this context, bytesToReplicate is a reference to the actual off-heap memory buffer. Disposing of this buffer will erase it from the local memory store. Obviously, this is not the desired behavior. So we add the guard for off-heap memory buffers here.
As far as I can tell, there is no storage level for which bytesToReplicate.dispose() would actually do anything. However, technically, if bytesToReplicate were memory-mapped but not direct, this would dispose of that memory. Would we even want that behavior? Overall, this finally clause is attempting to destroy the data we get from doGetLocalBytes(). This does not seem safe or correct, because we do not want to destroy the data stored locally. Therefore, we should consider getting rid of this finally clause entirely. What do you think?
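To make the guard concrete, here is a minimal sketch of the shape being discussed, using the names from this thread (the actual patch may differ):

```scala
try {
  replicate(blockId, bytesToReplicate, level, remoteClassTag)
} finally {
  // When the block is stored off-heap, bytesToReplicate aliases the buffer held
  // by the local memory store, so disposing it would destroy the local block.
  if (!level.useOffHeap) {
    bytesToReplicate.dispose()
  }
}
```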
Similarly, aren't we deciding whether we should dispose bytesToReplicate based on the 'target' StorageLevel? Shouldn't we make that decision based on whether bytesToReplicate was stored off-heap or not?
So maybe use putBlockStatus.storageLevel instead?
As StorageUtils.dispose only cleans up a memory-mapped ByteBuffer (actually, a java.nio.MappedByteBuffer), I don't think calling bytesToReplicate.dispose() here would be a problem, because it is a no-op for other buffers.
+1
Hi guys. Sorry for the delay on the update. I started down the path that I proposed and it resulted in too many awkward changes in method signatures downstream. I don't think this is a viable step forward.
As another option, we could dispose the buffer if and only if it has a non-null fd field. Since that field is private, we would have to access it via reflection. I'd also include a unit test to validate that the field exists as expected, to guard against internal changes in future versions of Java.
On a broader level, I wonder if callers of the ChunkedByteBuffer.dispose method understand that it will dispose of non-memory-mapped direct buffers. The documentation of that method suggests it's only supposed to dispose of memory-mapped files in the strict sense (those actually memory mapped against a file descriptor by the OS). If other methods are accidentally calling this method on non-memory-mapped direct buffers, that suggests to me we need to push the fix into that method (or actually the StorageUtils.dispose() method). What do you think of that?
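For illustration, a hedged sketch of the reflective check floated above; `fd` is a JDK-internal field of `java.nio.MappedByteBuffer`, which is exactly why the proposed unit test is needed:

```scala
import java.nio.{ByteBuffer, MappedByteBuffer}

// A buffer counts as memory mapped only if the JDK recorded a file descriptor
// for it. Note that plain direct buffers also extend MappedByteBuffer in the
// JDK, but their fd field is null.
def isActuallyMapped(buffer: ByteBuffer): Boolean = buffer match {
  case mapped: MappedByteBuffer =>
    try {
      val fdField = classOf[MappedByteBuffer].getDeclaredField("fd")
      fdField.setAccessible(true)
      fdField.get(mapped) != null
    } catch {
      case _: NoSuchFieldException => false // field layout changed in a newer JDK
    }
  case _ => false // heap buffers are never memory mapped
}
```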
StorageUtils.dispose is a good place to put the fix.
Fixing it in StorageUtils.dispose() sounds good.
Ok. I'll see what else is calling that method to validate that a fix there won't break something else, and I'll add a unit test to validate that calling StorageUtils.dispose on a direct byte buffer that isn't memory mapped doesn't actually dispose it.
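Something along these lines, as a sketch (the suite and assertion style are placeholders):

```scala
test("StorageUtils.dispose is a no-op for a direct buffer that is not memory mapped") {
  val buffer = java.nio.ByteBuffer.allocateDirect(8)
  buffer.putLong(0, 42L)
  // With the fix, dispose should not clean this buffer: it is direct, but it
  // has no backing file descriptor.
  StorageUtils.dispose(buffer)
  assert(buffer.getLong(0) === 42L) // contents must still be intact
}
```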
Do we need to check if bytes is already a direct buffer?
(Actually, I think we need to check memoryMode.useOffHeap here.)
Assume memoryMode.useOffHeap is true. We have two cases to consider:
1. `bytes` is on-heap. In this case, we need to copy it into a new direct buffer, and that's what we're doing here.
2. `bytes` is off-heap. In this case, we assume that the caller upstream is managing the memory underlying `bytes`, and `bytes.copy(Platform.allocateDirectBuffer)` becomes a defensive copy. If the caller is not managing this memory, I would call that a bug in the caller's behavior.
In either case, I believe we should be calling bytes.copy(Platform.allocateDirectBuffer) when memoryMode.useOffHeap is true.
BTW, in my experience tracing this code in the debugger, bytes has always been an on-heap buffer.
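In code, the behavior I'm describing is roughly this (a sketch using the names from this thread, not necessarily the final patch):

```scala
// When the block is stored off-heap, copy the incoming bytes into freshly
// allocated direct memory so the memory store never aliases a caller's buffer.
val toStore =
  if (memoryMode == MemoryMode.OFF_HEAP) {
    bytes.copy(Platform.allocateDirectBuffer) // defensive copy into direct buffers
  } else {
    bytes
  }
```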
Never mind about checking memoryMode.useOffHeap. I got that confused with StorageLevel. There are actually only two values of MemoryMode: MemoryMode.OFF_HEAP and MemoryMode.ON_HEAP.
This condition can just check for level.useOffHeap, right? But more generally, I had the same question that @viirya has. Is there a way to check if bytes is already off-heap and avoid the defensive copy? Or will that never be the case?
Is it safe to store a ref to bytes if the memory is stored off-heap? If the caller changes the values in that memory or frees it, the buffer we put in the memory store will be affected. We don't want that kind of side-effect.
I mean, if all bytes are copied before being passed in, there is no need to do another copy.
So do a copy if and only if memoryMode == MemoryMode.OFF_HEAP and bytes is not direct and bytes is not a memory mapped file?
No. I meant: do we actually need the defensive copy here? All usages of putBytes across Spark duplicate the byte buffer before passing it in. Is there any missing case where we should do this defensive copy?
Oh, never mind. That duplicate doesn't actually copy the buffer contents...
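For reference, `ByteBuffer.duplicate` only creates a new view over the same storage:

```scala
// duplicate() gives the view its own position/limit/mark, but the bytes are
// shared with the original buffer.
val original = java.nio.ByteBuffer.allocate(4)
val view = original.duplicate()
original.put(0, 1.toByte)
assert(view.get(0) == 1.toByte) // the write is visible through the duplicate
```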
But I am still wondering if we need to do a copy like this here. Right, it is defensive, but since BlockManager is internal to Spark, if all callers to it do not modify or release the byte buffer they pass in, doesn't this defensive copy only cause a performance regression?
off-heap storage replication
storage level, do not copy the underlying data unless the buffer is not direct
Force-pushed from 320b548 to 25923f3.
    val memoryMode = level.memoryMode
    memoryStore.putBytes(blockId, size, memoryMode, () => {
      if (memoryMode == MemoryMode.OFF_HEAP &&
          bytes.chunks.exists(buffer => !buffer.isDirect)) {
I've refined this check for copying bytes to skip copying when the underlying buffers are already direct.
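The excerpt above is cut off; based on this discussion, the complete callback presumably looks roughly like the following (a reconstruction, not a verbatim quote of the patch):

```scala
val memoryMode = level.memoryMode
memoryStore.putBytes(blockId, size, memoryMode, () => {
  // Copy only when the block is stored off-heap and at least one underlying
  // chunk is not already a direct buffer.
  if (memoryMode == MemoryMode.OFF_HEAP &&
      bytes.chunks.exists(buffer => !buffer.isDirect)) {
    bytes.copy(Platform.allocateDirectBuffer)
  } else {
    bytes
  }
})
```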
  /**
   * Put the block locally, using the given storage level.
   *
   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
I've explicitly documented the fact that callers must not mutate the data buffers underlying bytes.
Test build #73223 has finished for PR 16499 at commit

Test build #73226 has finished for PR 16499 at commit

ChunkedByteBuffer.toByteBuffer method to ensure callers understand this method does not always return a copy of the underlying data

I looked into simply cleaning up the … I also found a new memory management bug in … I found that there were no uses of …
      replicate(blockId, bytesToReplicate, level, remoteClassTag)
    } finally {
-     bytesToReplicate.dispose()
+     bytesToReplicate.unmap()
One new thought: ideally we only want to dispose bytes returned by the DiskStore. ChunkedByteBuffer has a boolean flag, disposed. If this flag is true, ChunkedByteBuffer.dispose becomes a no-op.
When the MemoryStore returns a ChunkedByteBuffer, how about setting the disposed flag to true? Then we won't dispose it later.
I would like to explore this further to ensure it would really work well, but I like your idea, with one caveat. I think we should avoid using the disposed var for this purpose. Using it this way would introduce ambiguity in its meaning when disposed is true. In addition to its current meaning, it could also mean "this buffer is not disposed but we don't want to dispose it".
Instead, I suggest keeping the usage of disposed as-is and adding an additional var, e.g. indisposable, which makes the dispose method itself a no-op.
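A sketch of what I have in mind (the `indisposable` name and the exact wiring are hypothetical):

```scala
// Inside ChunkedByteBuffer: `disposed` keeps its single meaning ("has been
// disposed"), while a separate flag turns dispose() into a no-op for buffers
// whose memory is owned elsewhere (e.g. by the MemoryStore).
private[this] var disposed: Boolean = false
private[this] var indisposable: Boolean = false // hypothetical new flag

def dispose(): Unit = {
  if (!disposed && !indisposable) {
    chunks.foreach(StorageUtils.dispose)
    disposed = true
  }
}
```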
@cloud-fan I explored the approach of making the MemoryStore return a ChunkedByteBuffer that cannot be disposed; however, I don't think there's a clean way to safely support that behavior. In essence, if the memory manager marks a buffer as indisposable when it returns it to the block manager, then that buffer cannot be evicted later. Adding additional code to handle this behavior correctly was looking rather messy, and I abandoned the effort.
At this point, I think that explicitly separating unmap and dispose methods is still the best way to resolve this issue.
Yea, we should separate unmap and dispose. But instead of using a hack in https://github.com/apache/spark/pull/16499/files#diff-21027f5c826cd378daaae5f7c3eea2b5R240, shall we just use a flag needUnmap in ChunkedByteBuffer?
The best I think we can expect from such a flag is a hint. The constructor of a ChunkedByteBuffer will not always know if the underlying byte buffers are memory mapped or not. For example, see
    putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
In this case, data.nioByteBuffer() might or might not be memory-mapped.
I still think the current patch set is the best overall amongst the other options we've considered. I can add a unit test for StorageUtils.unmap to ensure it works as expected (only disposing memory-mapped buffers). I can also add an if clause around the call to bytesToReplicate.unmap() to ensure this is only called when the replication storage level is off-heap. This will ensure the reflective call on the fd field only occurs for off-heap replication. Given that off-heap replication is currently broken, I doubt anyone will notice a performance degradation... Besides that, I suspect that network and disk IO performance will dominate the reflective method call performance.
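Concretely, the guarded call would look roughly like this (a sketch; `unmap` is the method this patch introduces):

```scala
try {
  replicate(blockId, bytesToReplicate, level, remoteClassTag)
} finally {
  // Only pay for the reflective memory-mapped check when replicating with an
  // off-heap storage level; unmap only touches genuinely memory-mapped chunks.
  if (level.useOffHeap) {
    bytesToReplicate.unmap()
  }
}
```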
Test build #74127 has finished for PR 16499 at commit

Author: Michael Allman <[email protected]>
Closes #16499 from mallman/spark-17204-replicated_off_heap_storage.
(cherry picked from commit 7fa116f)
Signed-off-by: Wenchen Fan <[email protected]>
Thanks, merging to master/2.1! @mallman can you send a new PR for 2.0? Thanks!

Will do. Do I need to open a new JIRA ticket for that?

You do not need to open a new JIRA. You can still use the same JIRA number.
Backport PR is #17390.
(Jira: https://issues.apache.org/jira/browse/SPARK-17204)

What changes were proposed in this pull request?

There are a couple of bugs in the `BlockManager` with respect to support for replicated off-heap storage. First, the locally-stored off-heap byte buffer is disposed of when it is replicated. It should not be. Second, the replica byte buffers are stored as heap byte buffers instead of direct byte buffers even when the storage level memory mode is off-heap. This PR addresses both of these problems.

How was this patch tested?

`BlockManagerReplicationSuite` was enhanced to fill in the coverage gaps. It now fails if either of the bugs in this PR exist.
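For reference, the scenario this fix and the enhanced suite exercise can be triggered from user code with a replicated off-heap storage level, roughly like this (`rdd` is assumed to be an existing RDD):

```scala
import org.apache.spark.storage.StorageLevel

// Off-heap, serialized, replicated to two executors: the combination that
// previously disposed the local buffer and stored heap-backed replicas.
val offHeap2x = StorageLevel(useDisk = false, useMemory = true, useOffHeap = true,
  deserialized = false, replication = 2)
rdd.persist(offHeap2x)
```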