-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19556][core] Do not encrypt block manager data in memory. #17295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This change modifies the way block data is encrypted to make the more
common cases faster, while penalizing an edge case. As a side effect
of the change, all data that goes through the block manager is now
encrypted only when needed, including the previous path (broadcast
variables) where that did not happen.
The way the change works is by not encrypting data that is stored in
memory; so if a serialized block is in memory, it will only be encrypted
once it is evicted to disk.
The penalty comes when transferring that encrypted data from disk. If the
data ends up in memory again, it is as efficient as before; but if the
evicted block needs to be transferred directly to a remote executor, then
there's now a performance penalty, since the code now uses a custom
FileRegion implementation to decrypt the data before transferring.
This also means that block data transferred between executors now is
not encrypted (and thus relies on the network library encryption support
for secrecy). Shuffle blocks are still transferred in encrypted form,
since they're handled in a slightly different way by the code. This also
keeps compatibility with existing external shuffle services, which transfer
encrypted shuffle blocks, and avoids having to make the external service
aware of encryption at all.
Another change in the disk store is that it now stores a tiny metadata
file next to the file holding the block data; this is needed to accurately
account for the decrypted block size, which may be significantly different
from the size of the encrypted file on disk.
The serialization and deserialization APIs in the SerializerManager now
do not do encryption automatically; callers need to explicitly wrap their
streams with an appropriate crypto stream before using those.
As a result of these changes, some of the workarounds added in SPARK-19520
are removed here.
Testing: a new trait ("EncryptionFunSuite") was added that provides an easy
way to run a test twice, with encryption on and off; broadcast, block manager
and caching tests were modified to use this new trait so that the existing
tests exercise both encrypted and non-encrypted paths. I also ran some
applications with encryption turned on to verify that they still work,
including streaming tests that failed without the fix for SPARK-19520.
|
I have not looked at the implementation in detail, but can you comment on why the change w.r.t plain text block data to remote executor ? Isn't it not simpler to transmit block contents in encrypted format without decryption ? Remote fetch of RDD blocks is not uncommon (for any task other than PROCESS_LOCAL); and I wanted to better understand why this is required. |
First, keep in mind that there's no metadata that tells the receiver whether a block is encrypted or not. This means that methods like This leaves two choices:
That's fine. This change makes the data read from the BlockManager instance not encrypted. But when transmitting the data over to another executor, there's RPC-level encryption ( |
This can be solved by tagging the block data with a prefix byte - we do something similar for MapStatus (direct or broadcast). |
|
Test build #74555 has finished for PR 17295 at commit
|
|
Just to be clear, I would prefer if we consistently did things - either encrypt all blocks while transferring (irrespective of sasl being enabled or not); or depend only on sasl for channel encryption. (The workaround is, what I mentioned above, tagging) |
Sure, it could be solved in different ways. I just happened to prefer the one in this patch, since I think it's less intrusive; if you look closely, the majority of changes are in a single class ( |
Not really sure what you mean here. But transferring encrypted data without RPC encryption is not really secure, since the encryption key is transferred to executors using an RPC. There's even a warning message if RPC encryption is not on and you enable disk encryption. Shuffle is a different beast - I explain why the shuffle blocks are transferred in encrypted form in the PR description. |
Good point, I overlooked that. I agree about shuffle being special case'd; I was looking at only non-shuffle blocks. |
No, with these changes, only shuffle data is transferred in encrypted form. |
What's the actual difference? previously we transfer encrypted data? |
Yes. The previous version of the code would transfer the encrypted file over to the receiver, and the encrypted data for serialized blocks would also be stored in But this caused all the other issues with making the |
|
makes sense. one more question, ideally, shall we also transfer shuffle blocks after decryption? |
| new CryptoInputStream(transformationStr, properties, is, | ||
| new SecretKeySpec(key, "AES"), new IvParameterSpec(iv)) | ||
| var read = 0 | ||
| while (read < iv.length) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does this while loop do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It avoids issues with short reads. It's unlikely to happen but I always write read code like this to be safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you can just use ByteStreams.readFully(is, iv).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, missed that one. +1 for shorter code.
| /** | ||
| * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the | ||
| * underlying channel. Since the callers of this API are using blocking I/O, there are no | ||
| * concerns with regards to CPU usage here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it a separated bug fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. As the comment states, it's a workaround for a bug in the commons-crypto library, which would affect the code being added.
| assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.") | ||
| val is = toInputStream() | ||
| try { | ||
| ByteBuffer.wrap(ByteStreams.toByteArray(is)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will we read all data out here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a comment explaining it a few lines above...
No. That's explained in the PR description. |
mridulm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did an initial pass and added some comments/queries. Overall, as I mentioned earlier, I like the fact that we have a more consistent approach to transfering data.
Thanks for the work @vanzin !
| val params = new CryptoParams(key, sparkConf) | ||
| val iv = createInitializationVector(params.conf) | ||
| val buf = ByteBuffer.wrap(iv) | ||
| while (buf.remaining() > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: buf.hasRemaining for this pattern of use
| throw new EOFException("Failed to read IV from stream.") | ||
| } | ||
| read += _read | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ByteStreams.readFully instead of the loop
| key: Array[Byte]): ReadableByteChannel = { | ||
| val iv = new Array[Byte](IV_LENGTH_IN_BYTES) | ||
| val buf = ByteBuffer.wrap(iv) | ||
| buf.clear() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: The clear is not required.
| } | ||
|
|
||
| /** | ||
| * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a lousy bug ! Good thing that we dont seem to be hit by it (yet).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a pretty nasty workaround for it in the network library... (the non-blocking workaround is a lot worse than this.)
| override def toManagedBuffer(): ManagedBuffer = new NettyManagedBuffer(buffer.toNetty) | ||
|
|
||
| override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = { | ||
| buffer.copy(allocator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
autoDispose is not honored for toManagedBuffer and toByteBuffer ?
On first pass, it looks like it is not ...
Also, is the expectation that invoker must manually invoke dispose when not using toInputStream ?
Would be good to add a comment about this to BlockData trait detailing the expectation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I had traced through that stuff 2 or 3 times, and now I did it again and I think I finally understood all that's going on. Basically, the old code was really bad at explicitly disposing of the buffers, meaning a bunch of paths (like the ones that used managed buffers) didn't bother to do it and just left the work to the GC.
I changed the code a bit to make the dispose more explicit and added comments in a few key places.
| */ | ||
| private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging { | ||
|
|
||
| private val METADATA_FILE_SUFFIX = ".meta" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming I am not missing something, shuffle does not use (require) block length from meta file.
If yes, for all others, why not simply keep the block size in memory ? On executor failure, the on disk block is lost anyway, and we already maintain block info for each block in executor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, good point... there's currently no metadata kept in the DiskStore class, but then this shouldn't be a lot of data.
| } finally { | ||
| try { | ||
| Closeables.close(fileOutputStream, threwException) | ||
| Closeables.close(out, threwException) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IOException can be thrown in close(), we will need to remove block (and meta) in that case as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the previous behavior, but well, doesn't hurt to fix it.
| Utils.tryWithSafeFinally { | ||
| val buf = ByteBuffer.allocate(blockSize.toInt) | ||
| while (buf.remaining() > 0) { | ||
| channel.read(buf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to handle case where read() returns EOF (-1) in case of data corruption, file removal from underneath, etc : we will end up in infinite loop otherwise.
I might have missed more places where this pattern exists in this change.
| remaining -= chunkSize | ||
|
|
||
| while (chunk.remaining() > 0) { | ||
| source.read(chunk) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as mentioned above, needs EOF error handling.
| written | ||
| } | ||
|
|
||
| override def deallocate(): Unit = source.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
release buffer as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StorageUtils.dispose specifically checks for mapped buffers, which is not the case here. It could be changed, but in this case I wonder if it's necessary or if waiting for GC is good enough.
This wasn't really caused by the new code, but by old code that was not consistent in its disposal of BlockManager buffers. This commit fixes the places where the underlying buffer was just left for the GC instead of being explicitly disposed.
|
Test build #74906 has finished for PR 17295 at commit
|
|
retest this please |
|
Test build #74905 has finished for PR 17295 at commit
|
|
Test build #74911 has finished for PR 17295 at commit
|
|
Test build #74989 has finished for PR 17295 at commit
|
|
Test build #74997 has finished for PR 17295 at commit
|
| } | ||
| obj | ||
| } finally { | ||
| blocks.foreach(_.dispose()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah good catch! we should dispose the blocks here
| val params = new CryptoParams(key, sparkConf) | ||
| val iv = createInitializationVector(params.conf) | ||
| val buf = ByteBuffer.wrap(iv) | ||
| while (buf.hasRemaining()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any possibility this may be an infinite loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually this logic is same with CryptoHelperChannel.write. Shall we create CryptoHelperChannel first and simply call helper.write(buf) here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, there's no infinite loop here, because a failure would cause an exception. Yeah, using the helper should work too.
| key: Array[Byte]): ReadableByteChannel = { | ||
| val iv = new Array[Byte](IV_LENGTH_IN_BYTES) | ||
| val buf = ByteBuffer.wrap(iv) | ||
| JavaUtils.readFully(channel, buf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not use ByteStreams.readFully? the buf is not used else where
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no ByteStreams.readFully for ReadableByteChannel that I'm aware of.
| val byteStream = new BufferedOutputStream(outputStream) | ||
| val autoPick = !blockId.isInstanceOf[StreamBlockId] | ||
| val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance() | ||
| ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the wrapStream and wrapForEncryption methods can be removed from this class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They're still used in a bunch of places.
| def toNetty(): Object | ||
|
|
||
| def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will be great to add some document for these 4 methods about when they will be called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added scaladoc for toNetty(), but the others seem self-explanatory to me.
|
|
||
| /** | ||
| * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]] | ||
| * This [[ManagedBuffer]] wraps a ManagedBuffer retrieved from the [[BlockManager]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wraps a [[BlockData]]
| data: BlockData, | ||
| dispose: Boolean) extends ManagedBuffer { | ||
|
|
||
| private val refCount = new AtomicInteger(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we should mention it in the class doc that the BlockData will be disposed automatically via reference count.
| blockId: BlockId, | ||
| chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) { | ||
| data: BlockData, | ||
| dispose: Boolean) extends ManagedBuffer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needDispose may be a better name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I prefer dispose, because it's not about needing to dispose the buffer, but wanting to dispose the buffer.
| replicate(blockId, bytesToReplicate, level, remoteClassTag) | ||
| } finally { | ||
| bytesToReplicate.unmap() | ||
| bytesToReplicate.dispose() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why change unmap to dispose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because there's no BlockData.unmap().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BlockData.dispose calls ChunkedByteBuffer.unmap.
| Closeables.close(fileOutputStream, threwException) | ||
| out.close() | ||
| } catch { | ||
| case ioe: IOException => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this? threwException starts with true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code needs to catch any exception thrown by out.close() and also remove the block in that case. That wasn't done before.
| private[spark] trait BlockData { | ||
|
|
||
| def toInputStream(): InputStream | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the return type is Object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See ManagedBuffer.convertToNetty().
|
Test build #75120 has finished for PR 17295 at commit
|
| override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer | ||
|
|
||
| override def size: Long = buffer.size | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we define the semantic of the BlockData.dispose clearly? It's quite confusing here that the dispose method call buffer.unmap while ChunkedByteBuffer also has a dispose method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think BlockData.dispose() is pretty well defined. "Release any resources held by the object." What's confusing is that there's both dispose() and unmap() in ChunkedByteBuffer, when there used to be only dispose(). It's confusing to have two different methods for releasing resources, and that confusion is not being caused by this patch.
BlockData is not just a wrapper around ChunkedByteBuffer; if it were there wouldn't be a need for it. Which is why calling the method unmmap() wouldn't make any sense here, since that's very specific to memory-mapped byte buffers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW I'm really starting to think the fix in #16499, while technically correct, is more confusing that it should be. The problem is not that the code was disposing of off-heap buffers; the problem is that buffers read from the memory store should not be disposed of, while buffers read from the disk store should.
So it's not really a matter of dispose vs. unmap, but a matter of where the buffer come from. (Which is kinda what I had in this patch with the autoDispose parameter to ByteBufferBlockData. Perhaps I should revive that and get rid of StorageUtils.unmap, which is just confusing.)
It is not the type of buffer that defines whether it should be disposed or not, but rather where it comes from: if it comes from the memory store, it should not be disposed. Any other case (disk store, temporary serialized buffers, etc), the buffer should be disposed. It just happened that this sort of aligned with the types (buffers from the memory store are normal buffers, buffers from the disk store are mapped buffers in certain cases). But the origin defines who owns the buffer and, thus, who should dispose of it.
|
I removed The replication tests fail from time to time but they seem to be flaky without this patch. See; |
|
Test build #75226 has finished for PR 17295 at commit
|
| * so that the corresponding block's read lock can be released once this buffer's references | ||
| * are released. | ||
| * | ||
| * If `dispose` is set to try, the [[BlockData]]will be disposed when the buffer's reference |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is set to try -> is set to true
|
LGTM, cc @mallman to check the |
|
Test build #75267 has finished for PR 17295 at commit
|
|
Test build #75323 has finished for PR 17295 at commit
|
|
thanks, merging to master! |
LGTM, too. Sorry for the late reply... I've been away the past two weeks. |
This change modifies the way block data is encrypted to make the more
common cases faster, while penalizing an edge case. As a side effect
of the change, all data that goes through the block manager is now
encrypted only when needed, including the previous path (broadcast
variables) where that did not happen.
The way the change works is by not encrypting data that is stored in
memory; so if a serialized block is in memory, it will only be encrypted
once it is evicted to disk.
The penalty comes when transferring that encrypted data from disk. If the
data ends up in memory again, it is as efficient as before; but if the
evicted block needs to be transferred directly to a remote executor, then
there's now a performance penalty, since the code now uses a custom
FileRegion implementation to decrypt the data before transferring.
This also means that block data transferred between executors now is
not encrypted (and thus relies on the network library encryption support
for secrecy). Shuffle blocks are still transferred in encrypted form,
since they're handled in a slightly different way by the code. This also
keeps compatibility with existing external shuffle services, which transfer
encrypted shuffle blocks, and avoids having to make the external service
aware of encryption at all.
The serialization and deserialization APIs in the SerializerManager now
do not do encryption automatically; callers need to explicitly wrap their
streams with an appropriate crypto stream before using those.
As a result of these changes, some of the workarounds added in SPARK-19520
are removed here.
Testing: a new trait ("EncryptionFunSuite") was added that provides an easy
way to run a test twice, with encryption on and off; broadcast, block manager
and caching tests were modified to use this new trait so that the existing
tests exercise both encrypted and non-encrypted paths. I also ran some
applications with encryption turned on to verify that they still work,
including streaming tests that failed without the fix for SPARK-19520.